Skip to content

Commit ac3f5c0

Browse files
authored
#3588 fix: gRPC query and streaming query now propagate the language parameter (#3589)
1 parent 43f1922 commit ac3f5c0

6 files changed

Lines changed: 143 additions & 18 deletions

File tree

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# gRPC Query Language Support
2+
3+
## Problem
4+
5+
The gRPC client (`RemoteGrpcDatabase`) ignores the `language` parameter for query operations. Only `command()` correctly propagates the language. This prevents using Cypher, Gremlin, or any non-SQL language through the gRPC query and streaming query paths.
6+
7+
The bug spans three layers:
8+
9+
1. **Proto**: `ExecuteQueryRequest` is missing a `language` field entirely
10+
2. **Client**: `query()` can't pass language (proto missing it); `queryStream()` and `streamQuery()` never call `.setLanguage()` despite the proto supporting it
11+
3. **Server**: `executeQuery()` hardcodes `db.query("sql", ...)`; all three `streamQuery` modes (`streamCursor`, `streamMaterialized`, `streamPaged`) hardcode `db.query("sql", ...)`
12+
13+
## Design Decisions
14+
15+
- Add a `language` field to `ExecuteQueryRequest` (additive, backward-compatible)
16+
- Keep using `db.query(language, ...)` on the server for the Query RPC (caller chose read-only)
17+
- Default to `"sql"` when the language field is empty/unset (backward-compatible)
18+
19+
## Changes
20+
21+
### 1. Proto (`grpc/src/main/proto/arcadedb-server.proto`)
22+
23+
Add `string language = 9;` to `ExecuteQueryRequest`:
24+
25+
```protobuf
26+
message ExecuteQueryRequest {
27+
string database = 1;
28+
string query = 2;
29+
map<string, GrpcValue> parameters = 3;
30+
DatabaseCredentials credentials = 4;
31+
TransactionContext transaction = 5;
32+
int32 limit = 6;
33+
int32 timeout_ms = 7;
34+
ProjectionSettings projectionSettings = 8;
35+
string language = 9; // "sql" if empty (default)
36+
}
37+
```
38+
39+
### 2. Server (`grpcw/.../ArcadeDbGrpcService.java`)
40+
41+
**`executeQuery()`**: Replace hardcoded `"sql"` with language from request, defaulting to `"sql"` when empty.
42+
43+
**`streamQuery()`**: Extract language from `StreamQueryRequest.getLanguage()` (proto field 7, already exists), resolve default, and pass to `streamCursor`/`streamMaterialized`/`streamPaged`. Each mode method gains a `String language` parameter.
44+
45+
### 3. Client (`grpc-client/.../RemoteGrpcDatabase.java`)
46+
47+
- `query()` path (line 556): Add `.setLanguage(language)` to `ExecuteQueryRequest` builder
48+
- `queryStream()` path (line 780): Add `.setLanguage(language)` to `StreamQueryRequest` builder
49+
- Private `streamQuery()` (line 1767): Add `.setLanguage("sql")` since it's SQL-only by design
50+
51+
### Testing
52+
53+
- Existing gRPC e2e tests verify backward compatibility (SQL still works)
54+
- Add test that runs a query with a non-SQL language through `query()` and `queryStream()` to verify language propagation
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# gRPC Query Language Support - Implementation Plan
2+
3+
## Step 1: Proto change
4+
5+
- File: `grpc/src/main/proto/arcadedb-server.proto`
6+
- Add `string language = 9;` to `ExecuteQueryRequest` (after `projectionSettings`)
7+
- Rebuild proto: `cd grpc && mvn clean install -DskipTests`
8+
9+
## Step 2: Server - `executeQuery()` language support
10+
11+
- File: `grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java`
12+
- In `executeQuery()` (~line 823): extract language from `request.getLanguage()`, default to `"sql"`
13+
- Replace `db.query("sql", ...)` with `db.query(language, ...)`
14+
15+
## Step 3: Server - `streamQuery()` language support
16+
17+
- File: `grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java`
18+
- In `streamQuery()`: extract language from `request.getLanguage()`, default to `"sql"`
19+
- Add `String language` parameter to `streamCursor()`, `streamMaterialized()`, `streamPaged()`
20+
- Replace hardcoded `"sql"` in each mode's `db.query()` call
21+
- Build server: `cd grpcw && mvn clean install -DskipTests`
22+
23+
## Step 4: Client - wire language through query paths
24+
25+
- File: `grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcDatabase.java`
26+
- `query()` at line 556: add `.setLanguage(language)` to `ExecuteQueryRequest` builder
27+
- `queryStream()` at line 780: add `.setLanguage(language)` to `StreamQueryRequest` builder
28+
- Private `streamQuery()` at line 1767: add `.setLanguage("sql")` to `StreamQueryRequest` builder
29+
- Build client: `cd grpc-client && mvn clean install -DskipTests`
30+
31+
## Step 5: Test
32+
33+
- Add e2e test verifying a non-SQL query (e.g. Cypher `MATCH (n) RETURN n LIMIT 1`) works via gRPC `query()`
34+
- Run existing gRPC e2e tests to verify no regressions

e2e/src/test/java/com/arcadedb/e2e/RemoteGrpcDatabaseTest.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.arcadedb.utility.CollectionUtils;
2525
import org.junit.jupiter.api.AfterEach;
2626
import org.junit.jupiter.api.BeforeEach;
27-
import org.junit.jupiter.api.Disabled;
2827
import org.junit.jupiter.api.Test;
2928

3029
import java.util.List;
@@ -57,16 +56,35 @@ void simpleSQLQuery() {
5756
}
5857

5958
@Test
60-
@Disabled("Gremlin not supported yet")
6159
void simpleGremlinQuery() {
6260
final ResultSet result = database.query("gremlin", "g.V().limit(10)");
6361
assertThat(CollectionUtils.countEntries(result)).isEqualTo(10);
6462
}
6563

6664
@Test
67-
@Disabled("Cypher not supported yet")
6865
void simpleCypherQuery() {
6966
final ResultSet result = database.query("cypher", "MATCH(p:Beer) RETURN * LIMIT 10");
7067
assertThat(CollectionUtils.countEntries(result)).isEqualTo(10);
7168
}
69+
70+
@Test
71+
void simpleOpenCypherQuery() {
72+
database.transaction(() -> {
73+
final ResultSet result = database.query("opencypher", "MATCH(p:Beer) RETURN * LIMIT 10");
74+
assertThat(CollectionUtils.countEntries(result)).isEqualTo(10);
75+
}, false, 10);
76+
}
77+
78+
@Test
79+
void streamQueryWithSQL() {
80+
final ResultSet result = database.queryStream("sql", "select * from Beer limit 10");
81+
assertThat(CollectionUtils.countEntries(result)).isEqualTo(10);
82+
}
83+
84+
@Test
85+
void streamQueryWithOpenCypher() {
86+
final ResultSet result = database.queryStream("opencypher", "MATCH(p:Beer) RETURN p LIMIT 10");
87+
assertThat(CollectionUtils.countEntries(result)).isEqualTo(10);
88+
}
89+
7290
}

grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcDatabase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,10 @@ public ResultSet query(final String language, final String query, RemoteGrpcConf
553553
checkDatabaseIsOpen();
554554
stats.queries.incrementAndGet();
555555

556-
ExecuteQueryRequest.Builder requestBuilder = ExecuteQueryRequest.newBuilder().setDatabase(getName()).setQuery(query)
556+
ExecuteQueryRequest.Builder requestBuilder = ExecuteQueryRequest.newBuilder()
557+
.setDatabase(getName())
558+
.setQuery(query)
559+
.setLanguage(langOrDefault(language))
557560
.setCredentials(buildCredentials());
558561

559562
if (transactionId != null) {
@@ -776,6 +779,7 @@ public ResultSet queryStream(final String language, final String query, final Re
776779
stats.queries.incrementAndGet();
777780

778781
StreamQueryRequest.Builder b = StreamQueryRequest.newBuilder().setDatabase(getName()).setQuery(query)
782+
.setLanguage(langOrDefault(language))
779783
.setCredentials(buildCredentials())
780784
.setBatchSize(batchSize > 0 ? batchSize : 100).setRetrievalMode(mode);
781785

@@ -1763,6 +1767,7 @@ private static String langOrDefault(String language) {
17631767

17641768
private Iterator<Record> streamQuery(final String query) {
17651769
StreamQueryRequest request = StreamQueryRequest.newBuilder().setDatabase(getName()).setQuery(query)
1770+
.setLanguage("sql")
17661771
.setCredentials(buildCredentials())
17671772
.setBatchSize(100).build();
17681773

grpc/src/main/proto/arcadedb-server.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ message ExecuteQueryRequest {
286286
int32 timeout_ms = 7;
287287

288288
ProjectionSettings projectionSettings = 8;
289+
string language = 9; // "sql" if empty (default)
289290
}
290291

291292
message ExecuteQueryResponse {

grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,7 @@ private ExecuteCommandResponse executeCommandInternal(ExecuteCommandRequest req,
252252
try {
253253
final Map<String, Object> params = GrpcTypeConverter.convertParameters(req.getParametersMap());
254254

255-
// Language defaults to "sql" when empty
256-
final String language = (req.getLanguage() == null || req.getLanguage().isEmpty()) ? "sql" : req.getLanguage();
255+
final String language = langOrDefault(req.getLanguage());
257256

258257
// Transaction: begin if requested
259258
final boolean hasTx = req.hasTransaction();
@@ -818,9 +817,11 @@ public void executeQuery(ExecuteQueryRequest request, StreamObserver<ExecuteQuer
818817
// Execute the query
819818
long startTime = System.currentTimeMillis();
820819

821-
LogManager.instance().log(this, Level.FINE, "executeQuery(): query = %s", request.getQuery());
820+
final String language = langOrDefault(request.getLanguage());
822821

823-
ResultSet resultSet = database.query("sql", request.getQuery(),
822+
LogManager.instance().log(this, Level.FINE, "executeQuery(): language = %s query = %s", language, request.getQuery());
823+
824+
ResultSet resultSet = database.query(language, request.getQuery(),
824825
GrpcTypeConverter.convertParameters(request.getParametersMap()));
825826

826827
LogManager.instance()
@@ -1105,12 +1106,20 @@ public void streamQuery(StreamQueryRequest request, StreamObserver<QueryResult>
11051106
beganHere = true;
11061107
}
11071108

1109+
final String language = langOrDefault(request.getLanguage());
1110+
11081111
// --- Dispatch on mode (helpers do NOT manage transactions) ---
1112+
// PAGED mode uses SQL-specific SKIP/LIMIT wrapping, so fall back to CURSOR for non-SQL languages
11091113
switch (request.getRetrievalMode()) {
1110-
case MATERIALIZE_ALL -> streamMaterialized(db, request, batchSize, scso, cancelled, projectionConfig);
1111-
case PAGED -> streamPaged(db, request, batchSize, scso, cancelled, projectionConfig);
1112-
case CURSOR -> streamCursor(db, request, batchSize, scso, cancelled, projectionConfig);
1113-
default -> streamCursor(db, request, batchSize, scso, cancelled, projectionConfig);
1114+
case MATERIALIZE_ALL -> streamMaterialized(db, request, batchSize, scso, cancelled, projectionConfig, language);
1115+
case PAGED -> {
1116+
if (!"sql".equalsIgnoreCase(language))
1117+
streamCursor(db, request, batchSize, scso, cancelled, projectionConfig, language);
1118+
else
1119+
streamPaged(db, request, batchSize, scso, cancelled, projectionConfig, language);
1120+
}
1121+
case CURSOR -> streamCursor(db, request, batchSize, scso, cancelled, projectionConfig, language);
1122+
default -> streamCursor(db, request, batchSize, scso, cancelled, projectionConfig, language);
11141123
}
11151124

11161125
// If the client cancelled mid-stream, choose rollback unless caller explicitly
@@ -1171,14 +1180,14 @@ public void streamQuery(StreamQueryRequest request, StreamObserver<QueryResult>
11711180
*/
11721181
private void streamCursor(Database db, StreamQueryRequest request, int batchSize,
11731182
ServerCallStreamObserver<QueryResult> scso,
1174-
AtomicBoolean cancelled, ProjectionConfig projectionConfig) {
1183+
AtomicBoolean cancelled, ProjectionConfig projectionConfig, String language) {
11751184

11761185
long running = 0L;
11771186

11781187
QueryResult.Builder batch = QueryResult.newBuilder();
11791188
int inBatch = 0;
11801189

1181-
try (ResultSet rs = db.query("sql", request.getQuery(),
1190+
try (ResultSet rs = db.query(language, request.getQuery(),
11821191
GrpcTypeConverter.convertParameters(request.getParametersMap()))) {
11831192

11841193
while (rs.hasNext()) {
@@ -1242,11 +1251,11 @@ private void streamCursor(Database db, StreamQueryRequest request, int batchSize
12421251
*/
12431252
private void streamMaterialized(Database db, StreamQueryRequest request, int batchSize,
12441253
ServerCallStreamObserver<QueryResult> scso,
1245-
AtomicBoolean cancelled, ProjectionConfig projectionConfig) {
1254+
AtomicBoolean cancelled, ProjectionConfig projectionConfig, String language) {
12461255

12471256
final List<GrpcRecord> all = new ArrayList<>();
12481257

1249-
try (ResultSet rs = db.query("sql", request.getQuery(),
1258+
try (ResultSet rs = db.query(language, request.getQuery(),
12501259
GrpcTypeConverter.convertParameters(request.getParametersMap()))) {
12511260

12521261
while (rs.hasNext()) {
@@ -1295,7 +1304,7 @@ private void streamMaterialized(Database db, StreamQueryRequest request, int bat
12951304
*/
12961305
private void streamPaged(Database db, StreamQueryRequest request, int batchSize,
12971306
ServerCallStreamObserver<QueryResult> scso,
1298-
AtomicBoolean cancelled, ProjectionConfig projectionConfig) {
1307+
AtomicBoolean cancelled, ProjectionConfig projectionConfig, String language) {
12991308

13001309
final String pagedSql = wrapWithSkipLimit(request.getQuery()); // see helper below
13011310
int offset = 0;
@@ -1313,7 +1322,7 @@ private void streamPaged(Database db, StreamQueryRequest request, int batchSize,
13131322
int count = 0;
13141323
QueryResult.Builder b = QueryResult.newBuilder();
13151324

1316-
try (ResultSet rs = db.query("sql", pagedSql, params)) {
1325+
try (ResultSet rs = db.query(language, pagedSql, params)) {
13171326
while (rs.hasNext()) {
13181327
if (cancelled.get())
13191328
return;
@@ -3063,6 +3072,10 @@ private String generateTransactionId() {
30633072
return "tx_" + System.nanoTime();
30643073
}
30653074

3075+
private static String langOrDefault(String language) {
3076+
return (language == null || language.isEmpty()) ? "sql" : language;
3077+
}
3078+
30663079
// ---- Debug helpers ----
30673080
private static String summarizeJava(Object o) {
30683081
if (o == null)

0 commit comments

Comments
 (0)