Skip to content

Commit f164af5

Browse files
author
Almas Abdrazak
committed
address Ross comments
1 parent fa700ae commit f164af5

17 files changed

Lines changed: 196 additions & 47 deletions

File tree

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java renamed to driver-core/src/main/com/mongodb/internal/operation/VoidWriteOperationThenCursorReadOperation.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,40 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.mongodb.reactivestreams.client.internal;
17+
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.internal.async.AsyncBatchCursor;
2121
import com.mongodb.internal.async.SingleResultCallback;
2222
import com.mongodb.internal.binding.AsyncReadWriteBinding;
23+
import com.mongodb.internal.binding.ReadWriteBinding;
2324
import com.mongodb.internal.connection.OperationContext;
24-
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
25-
import com.mongodb.internal.operation.ReadOperationCursor;
26-
import com.mongodb.internal.operation.WriteOperation;
2725

28-
class VoidWriteOperationThenCursorReadOperation<T> implements AsyncWriteThenReadOperationCursor<T> {
26+
/**
27+
* A {@link WriteThenReadOperationCursor} that performs a {@link WriteOperation} returning
28+
* {@code Void} followed by a {@link ReadOperationCursor}, using a single read-write
29+
* binding for both phases.
30+
*
31+
* <p>This class is not part of the public API and may be removed or changed at any time</p>
32+
*/
33+
public final class VoidWriteOperationThenCursorReadOperation<T> implements WriteThenReadOperationCursor<T> {
2934
private final WriteOperation<Void> writeOperation;
3035
private final ReadOperationCursor<T> cursorReadOperation;
3136

32-
VoidWriteOperationThenCursorReadOperation(final WriteOperation<Void> writeOperation,
33-
final ReadOperationCursor<T> cursorReadOperation) {
37+
public VoidWriteOperationThenCursorReadOperation(final WriteOperation<Void> writeOperation,
38+
final ReadOperationCursor<T> cursorReadOperation) {
3439
this.writeOperation = writeOperation;
3540
this.cursorReadOperation = cursorReadOperation;
3641
}
3742

43+
public WriteOperation<Void> getWriteOperation() {
44+
return writeOperation;
45+
}
46+
47+
public ReadOperationCursor<T> getCursorReadOperation() {
48+
return cursorReadOperation;
49+
}
50+
3851
@Override
3952
public String getCommandName() {
4053
return writeOperation.getCommandName();
@@ -45,6 +58,12 @@ public MongoNamespace getNamespace() {
4558
return writeOperation.getNamespace();
4659
}
4760

61+
@Override
62+
public BatchCursor<T> execute(final ReadWriteBinding binding, final OperationContext operationContext) {
63+
writeOperation.execute(binding, operationContext);
64+
return cursorReadOperation.execute(binding, operationContext);
65+
}
66+
4867
@Override
4968
public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext,
5069
final SingleResultCallback<AsyncBatchCursor<T>> callback) {

driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java renamed to driver-core/src/main/com/mongodb/internal/operation/WriteThenReadOperationCursor.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@
2020
import com.mongodb.internal.async.AsyncBatchCursor;
2121
import com.mongodb.internal.async.SingleResultCallback;
2222
import com.mongodb.internal.binding.AsyncReadWriteBinding;
23+
import com.mongodb.internal.binding.ReadWriteBinding;
2324
import com.mongodb.internal.connection.OperationContext;
2425

2526
/**
26-
* An async-only operation that performs a write followed by a read that returns a cursor.
27+
* An operation that performs a write followed by a read that returns a cursor, using a
28+
* single read-write binding for both phases.
2729
*
28-
* <p>Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding}
29-
* so that both the write and the read portions can be executed without narrowing casts.
30+
* <p>Having a dedicated interface lets the executor hand the operation a binding that is
31+
* both a {@link ReadWriteBinding} and an {@link AsyncReadWriteBinding}, avoiding the
32+
* narrowing cast from a read-only binding that would otherwise be required.
3033
*
3134
* <p>This class is not part of the public API and may be removed or changed at any time</p>
3235
*/
33-
public interface AsyncWriteThenReadOperationCursor<T> {
36+
public interface WriteThenReadOperationCursor<T> {
3437

3538
/**
3639
* @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate")
@@ -42,6 +45,16 @@ public interface AsyncWriteThenReadOperationCursor<T> {
4245
*/
4346
MongoNamespace getNamespace();
4447

48+
/**
49+
* Executes the write phase followed by the read phase, returning a {@link BatchCursor}
50+
* over the results of the read.
51+
*
52+
* @param binding the read-write binding used by both phases
53+
* @param operationContext the operation context to use
54+
* @return the batch cursor produced by the read phase
55+
*/
56+
BatchCursor<T> execute(ReadWriteBinding binding, OperationContext operationContext);
57+
4558
/**
4659
* Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor}
4760
* over the results of the read.

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
import com.mongodb.internal.observability.micrometer.TracingManager;
2828
import com.mongodb.internal.observability.micrometer.TransactionSpan;
2929
import com.mongodb.internal.operation.AbortTransactionOperation;
30-
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
3130
import com.mongodb.internal.operation.CommitTransactionOperation;
3231
import com.mongodb.internal.operation.ReadOperation;
3332
import com.mongodb.internal.operation.WriteConcernHelper;
3433
import com.mongodb.internal.operation.WriteOperation;
34+
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
3535
import com.mongodb.internal.session.BaseClientSessionImpl;
3636
import com.mongodb.internal.session.ServerSessionPool;
3737
import com.mongodb.lang.Nullable;
@@ -89,7 +89,8 @@ public boolean notifyMessageSent() {
8989

9090
@Override
9191
public void notifyOperationInitiated(final Object operation) {
92-
assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation || operation instanceof AsyncWriteThenReadOperationCursor);
92+
assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation
93+
|| operation instanceof WriteThenReadOperationCursor);
9394
if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) {
9495
assertTrue(getPinnedServerAddress() == null
9596
|| (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE));

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.mongodb.internal.operation.Operations;
3535
import com.mongodb.internal.operation.ReadOperation;
3636
import com.mongodb.internal.operation.ReadOperationCursor;
37+
import com.mongodb.internal.operation.VoidWriteOperationThenCursorReadOperation;
3738
import com.mongodb.internal.operation.WriteOperation;
3839
import com.mongodb.lang.Nullable;
3940
import com.mongodb.reactivestreams.client.ClientSession;

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
import com.mongodb.internal.async.SingleResultCallback;
6262
import com.mongodb.internal.bulk.WriteRequest;
6363
import com.mongodb.internal.async.AsyncBatchCursor;
64-
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
64+
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
6565
import com.mongodb.internal.operation.IndexHelper;
6666
import com.mongodb.internal.operation.Operations;
6767
import com.mongodb.internal.operation.ReadOperation;
@@ -517,9 +517,9 @@ <T> Mono<T> createReadOperationMono(final Supplier<TimeoutSettings> timeoutSetti
517517

518518
<R> Mono<AsyncBatchCursor<R>> createWriteThenReadOperationMono(
519519
final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
520-
final Supplier<AsyncWriteThenReadOperationCursor<R>> operationSupplier,
520+
final Supplier<WriteThenReadOperationCursor<R>> operationSupplier,
521521
@Nullable final ClientSession clientSession) {
522-
AsyncWriteThenReadOperationCursor<R> operation = operationSupplier.get();
522+
WriteThenReadOperationCursor<R> operation = operationSupplier.get();
523523
return getExecutor(timeoutSettingsFunction.apply(operations))
524524
.execute(operation, getReadConcern(), clientSession);
525525
}

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.internal.TimeoutSettings;
2222
import com.mongodb.internal.async.AsyncBatchCursor;
23-
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
2423
import com.mongodb.internal.operation.ReadOperation;
2524
import com.mongodb.internal.operation.WriteOperation;
25+
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
2626
import com.mongodb.lang.Nullable;
2727
import com.mongodb.reactivestreams.client.ClientSession;
2828
import reactor.core.publisher.Mono;
@@ -67,7 +67,7 @@ <T> Mono<T> execute(ReadOperation<?, T> operation, ReadPreference readPreference
6767
* @param session the session to associate this operation with
6868
* @param <T> the document type returned by the cursor.
6969
*/
70-
<T> Mono<AsyncBatchCursor<T>> execute(AsyncWriteThenReadOperationCursor<T> operation,
70+
<T> Mono<AsyncBatchCursor<T>> execute(WriteThenReadOperationCursor<T> operation,
7171
ReadConcern readConcern, @Nullable ClientSession session);
7272

7373
/**

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@
3434
import com.mongodb.internal.observability.micrometer.Span;
3535
import com.mongodb.internal.observability.micrometer.TracingManager;
3636
import com.mongodb.internal.async.AsyncBatchCursor;
37-
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
3837
import com.mongodb.internal.operation.OperationHelper;
3938
import com.mongodb.internal.operation.ReadOperation;
4039
import com.mongodb.internal.operation.WriteOperation;
40+
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
4141
import com.mongodb.lang.Nullable;
4242
import com.mongodb.reactivestreams.client.ClientSession;
4343
import com.mongodb.reactivestreams.client.ReactiveContextProvider;
@@ -182,7 +182,7 @@ public <T> Mono<T> execute(final WriteOperation<T> operation, final ReadConcern
182182
}
183183

184184
@Override
185-
public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
185+
public <T> Mono<AsyncBatchCursor<T>> execute(final WriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
186186
@Nullable final ClientSession session) {
187187
isTrue("open", !mongoClient.getCluster().isClosed());
188188
notNull("operation", operation);

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.WriteConcern;
2222
import com.mongodb.client.model.Sorts;
23-
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
23+
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
2424
import com.mongodb.internal.operation.MapReduceStatistics;
2525
import com.mongodb.internal.operation.MapReduceToCollectionOperation;
2626
import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation;
@@ -66,7 +66,7 @@ void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() {
6666

6767
Flux.from(publisher).blockFirst();
6868

69-
AsyncWriteThenReadOperationCursor<?> op = executor.getWriteThenReadOperation();
69+
WriteThenReadOperationCursor<?> op = executor.getWriteThenReadOperation();
7070
assertNotNull(op, "expected a write-then-read operation");
7171
assertEquals(NAMESPACE, op.getNamespace());
7272
// Must not fall through to the plain read-operation path.

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import com.mongodb.internal.bulk.IndexRequest;
2727
import com.mongodb.internal.bulk.WriteRequest;
2828
import com.mongodb.internal.client.model.FindOptions;
29-
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
29+
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
3030
import com.mongodb.internal.operation.ReadOperation;
3131
import com.mongodb.internal.operation.WriteOperation;
3232
import com.mongodb.lang.NonNull;
@@ -97,7 +97,7 @@ public class TestHelper {
9797
.execute(any(WriteOperation.class), any(), any());
9898
Mockito.lenient().doAnswer(invocation -> Mono.empty())
9999
.when(executor)
100-
.execute(any(AsyncWriteThenReadOperationCursor.class), any(), any());
100+
.execute(any(WriteThenReadOperationCursor.class), any(), any());
101101
Mockito.lenient().doAnswer(invocation -> Mono.empty())
102102
.when(executor)
103103
.execute(any(), any(), any(), any());

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.internal.TimeoutSettings;
2222
import com.mongodb.internal.async.AsyncBatchCursor;
23-
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
23+
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
2424
import com.mongodb.internal.operation.ReadOperation;
2525
import com.mongodb.internal.operation.WriteOperation;
2626
import com.mongodb.lang.Nullable;
@@ -39,7 +39,7 @@ public class TestOperationExecutor implements OperationExecutor {
3939

4040
private final List<ReadOperation> readOperations = new ArrayList<>();
4141
private final List<WriteOperation> writeOperations = new ArrayList<>();
42-
private final List<AsyncWriteThenReadOperationCursor> writeThenReadOperations = new ArrayList<>();
42+
private final List<WriteThenReadOperationCursor> writeThenReadOperations = new ArrayList<>();
4343

4444
public TestOperationExecutor(final List<Object> responses) {
4545
this.responses = new ArrayList<>(responses);
@@ -64,7 +64,7 @@ public <T> Mono<T> execute(final WriteOperation<T> operation, final ReadConcern
6464
}
6565

6666
@Override
67-
public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
67+
public <T> Mono<AsyncBatchCursor<T>> execute(final WriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
6868
@Nullable final ClientSession session) {
6969
clientSessions.add(session);
7070
writeThenReadOperations.add(operation);
@@ -118,7 +118,7 @@ WriteOperation getWriteOperation() {
118118
}
119119

120120
@Nullable
121-
AsyncWriteThenReadOperationCursor getWriteThenReadOperation() {
121+
WriteThenReadOperationCursor getWriteThenReadOperation() {
122122
return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0);
123123
}
124124

0 commit comments

Comments
 (0)