Skip to content

Commit c79bc72

Browse files
author
Almas Abdrazak
committed
1 parent e9c0c4b commit c79bc72

9 files changed

Lines changed: 214 additions & 9 deletions

File tree

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.operation;
18+
19+
import com.mongodb.MongoNamespace;
20+
import com.mongodb.internal.async.AsyncBatchCursor;
21+
import com.mongodb.internal.async.SingleResultCallback;
22+
import com.mongodb.internal.binding.AsyncReadWriteBinding;
23+
import com.mongodb.internal.connection.OperationContext;
24+
25+
/**
26+
* An async-only operation that performs a write followed by a read that returns a cursor.
27+
*
28+
* <p>Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding}
29+
* so that both the write and the read portions can be executed without narrowing casts.
30+
*
31+
* <p>This class is not part of the public API and may be removed or changed at any time</p>
32+
*/
33+
public interface AsyncWriteThenReadOperationCursor<T> {
34+
35+
/**
36+
* @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc.
37+
*/
38+
String getCommandName();
39+
40+
/**
41+
* @return the namespace of the operation
42+
*/
43+
MongoNamespace getNamespace();
44+
45+
/**
46+
* General execute which can return anything of type T
47+
*
48+
* @param binding the binding to execute in the context of
49+
* @param operationContext the operation context to use
50+
* @param callback the callback to be called when the operation has been executed
51+
*/
52+
void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext,
53+
SingleResultCallback<AsyncBatchCursor<T>> callback);
54+
}

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.mongodb.internal.binding.WriteBinding;
2929
import com.mongodb.internal.client.model.FindOptions;
3030
import com.mongodb.internal.connection.OperationContext;
31+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
3132
import com.mongodb.internal.operation.MapReduceAsyncBatchCursor;
3233
import com.mongodb.internal.operation.MapReduceBatchCursor;
3334
import com.mongodb.internal.operation.MapReduceStatistics;
@@ -40,6 +41,7 @@
4041
import org.bson.BsonDocument;
4142
import org.bson.conversions.Bson;
4243
import org.reactivestreams.Publisher;
44+
import reactor.core.publisher.Mono;
4345

4446
import java.util.concurrent.TimeUnit;
4547
import java.util.function.Function;
@@ -201,10 +203,46 @@ public ReadOperationCursor<T> asReadOperation(final int initialBatchSize) {
201203
if (inline) {
202204
// initialBatchSize is ignored for map reduce operations.
203205
return createMapReduceInlineOperation();
204-
} else {
205-
return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
206-
createFindOperation(initialBatchSize));
207206
}
207+
throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; "
208+
+ "asReadOperation must not be called.");
209+
}
210+
211+
@Override
212+
public Mono<BatchCursor<T>> batchCursor(final int initialBatchSize) {
213+
if (inline) {
214+
return super.batchCursor(initialBatchSize);
215+
}
216+
return writeThenReadBatchCursor(initialBatchSize);
217+
}
218+
219+
@Override
220+
public Publisher<T> first() {
221+
if (inline) {
222+
return super.first();
223+
}
224+
return writeThenReadBatchCursor(1)
225+
.flatMap(batchCursor -> {
226+
batchCursor.setBatchSize(1);
227+
return Mono.from(batchCursor.next())
228+
.doOnTerminate(batchCursor::close)
229+
.flatMap(results -> {
230+
if (results == null || results.isEmpty()) {
231+
return Mono.empty();
232+
}
233+
return Mono.fromCallable(() -> results.get(0));
234+
});
235+
});
236+
}
237+
238+
private Mono<BatchCursor<T>> writeThenReadBatchCursor(final int initialBatchSize) {
239+
return getMongoOperationPublisher()
240+
.createWriteThenReadOperationMono(
241+
operations -> operations.createTimeoutSettings(maxTimeMS),
242+
() -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
243+
createFindOperation(initialBatchSize)),
244+
getClientSession())
245+
.map(BatchCursor::new);
208246
}
209247

210248
private WrappedMapReduceReadOperation<T> createMapReduceInlineOperation() {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import com.mongodb.internal.TimeoutSettings;
6161
import com.mongodb.internal.async.SingleResultCallback;
6262
import com.mongodb.internal.bulk.WriteRequest;
63+
import com.mongodb.internal.async.AsyncBatchCursor;
64+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
6365
import com.mongodb.internal.operation.IndexHelper;
6466
import com.mongodb.internal.operation.Operations;
6567
import com.mongodb.internal.operation.ReadOperation;
@@ -513,6 +515,15 @@ <T> Mono<T> createReadOperationMono(final Supplier<TimeoutSettings> timeoutSetti
513515
.execute(readOperation, readPreference, getReadConcern(), clientSession);
514516
}
515517

518+
<R> Mono<AsyncBatchCursor<R>> createWriteThenReadOperationMono(
519+
final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
520+
final Supplier<AsyncWriteThenReadOperationCursor<R>> operationSupplier,
521+
@Nullable final ClientSession clientSession) {
522+
AsyncWriteThenReadOperationCursor<R> operation = operationSupplier.get();
523+
return getExecutor(timeoutSettingsFunction.apply(operations))
524+
.execute(operation, getReadConcern(), clientSession);
525+
}
526+
516527
<R> Mono<R> createWriteOperationMono(final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
517528
final Supplier<WriteOperation<R>> operationSupplier, @Nullable final ClientSession clientSession) {
518529
return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession);

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.mongodb.ReadConcern;
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.internal.TimeoutSettings;
22+
import com.mongodb.internal.async.AsyncBatchCursor;
23+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
2224
import com.mongodb.internal.operation.ReadOperation;
2325
import com.mongodb.internal.operation.WriteOperation;
2426
import com.mongodb.lang.Nullable;
@@ -54,6 +56,20 @@ <T> Mono<T> execute(ReadOperation<?, T> operation, ReadPreference readPreference
5456
*/
5557
<T> Mono<T> execute(WriteOperation<T> operation, ReadConcern readConcern, @Nullable ClientSession session);
5658

59+
/**
60+
* Execute an operation that writes and then reads a cursor within a single read-write binding.
61+
*
62+
* <p>The binding is acquired once and used for both phases, avoiding the need to narrow an
63+
* {@code AsyncReadBinding} to an {@code AsyncWriteBinding}.
64+
*
65+
* @param operation the write-then-read operation.
66+
* @param readConcern the read concern
67+
* @param session the session to associate this operation with
68+
* @param <T> the document type returned by the cursor.
69+
*/
70+
<T> Mono<AsyncBatchCursor<T>> execute(AsyncWriteThenReadOperationCursor<T> operation,
71+
ReadConcern readConcern, @Nullable ClientSession session);
72+
5773
/**
5874
* Create a new OperationExecutor with a specific timeout settings
5975
*

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
3434
import com.mongodb.internal.observability.micrometer.Span;
3535
import com.mongodb.internal.observability.micrometer.TracingManager;
36+
import com.mongodb.internal.async.AsyncBatchCursor;
37+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
3638
import com.mongodb.internal.operation.OperationHelper;
3739
import com.mongodb.internal.operation.ReadOperation;
3840
import com.mongodb.internal.operation.WriteOperation;
@@ -179,6 +181,47 @@ public <T> Mono<T> execute(final WriteOperation<T> operation, final ReadConcern
179181
);
180182
}
181183

184+
@Override
185+
public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
186+
@Nullable final ClientSession session) {
187+
isTrue("open", !mongoClient.getCluster().isClosed());
188+
notNull("operation", operation);
189+
notNull("readConcern", readConcern);
190+
191+
return Mono.from(subscriber ->
192+
clientSessionHelper.withClientSession(session, this)
193+
.flatMap(actualClientSession -> {
194+
AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null);
195+
RequestContext requestContext = getContext(subscriber);
196+
OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName())
197+
.withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession,
198+
isImplicitSession(session), readConcern));
199+
Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(),
200+
operationContext, operation.getCommandName(), operation.getNamespace());
201+
202+
return Mono.<AsyncBatchCursor<T>>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> {
203+
try {
204+
binding.release();
205+
} finally {
206+
if (t != null) {
207+
Throwable exceptionToHandle = t instanceof MongoException
208+
? OperationHelper.unwrap((MongoException) t) : t;
209+
labelException(session, exceptionToHandle);
210+
unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
211+
if (span != null) {
212+
span.error(t);
213+
}
214+
}
215+
if (span != null) {
216+
span.end();
217+
}
218+
sinkToCallback(sink).onResult(result, t);
219+
}
220+
}));
221+
}).subscribe(subscriber)
222+
);
223+
}
224+
182225
@Override
183226
public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) {
184227
if (Objects.equals(timeoutSettings, newTimeoutSettings)) {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.internal.async.AsyncBatchCursor;
2121
import com.mongodb.internal.async.SingleResultCallback;
22-
import com.mongodb.internal.binding.AsyncReadBinding;
23-
import com.mongodb.internal.binding.AsyncWriteBinding;
22+
import com.mongodb.internal.binding.AsyncReadWriteBinding;
2423
import com.mongodb.internal.connection.OperationContext;
24+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
2525
import com.mongodb.internal.operation.ReadOperationCursor;
2626
import com.mongodb.internal.operation.WriteOperation;
2727

28-
class VoidWriteOperationThenCursorReadOperation<T> implements ReadOperationCursorAsyncOnly<T> {
28+
class VoidWriteOperationThenCursorReadOperation<T> implements AsyncWriteThenReadOperationCursor<T> {
2929
private final WriteOperation<Void> writeOperation;
3030
private final ReadOperationCursor<T> cursorReadOperation;
3131

@@ -46,8 +46,9 @@ public MongoNamespace getNamespace() {
4646
}
4747

4848
@Override
49-
public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback<AsyncBatchCursor<T>> callback) {
50-
writeOperation.executeAsync((AsyncWriteBinding) binding, operationContext, (result, t) -> {
49+
public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext,
50+
final SingleResultCallback<AsyncBatchCursor<T>> callback) {
51+
writeOperation.executeAsync(binding, operationContext, (result, t) -> {
5152
if (t != null) {
5253
callback.onResult(null, t);
5354
} else {

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
import com.mongodb.internal.operation.MapReduceStatistics;
2424
import com.mongodb.internal.operation.MapReduceToCollectionOperation;
2525
import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation;
26+
27+
import com.mongodb.reactivestreams.client.MapReducePublisher;
28+
29+
2630
import org.bson.BsonDocument;
2731
import org.bson.BsonInt32;
2832
import org.bson.BsonJavaScript;
@@ -39,6 +43,7 @@
3943
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4044
import static org.junit.jupiter.api.Assertions.assertEquals;
4145
import static org.junit.jupiter.api.Assertions.assertNotNull;
46+
import static org.junit.jupiter.api.Assertions.assertNull;
4247
import static org.junit.jupiter.api.Assertions.assertThrows;
4348

4449
@SuppressWarnings({"rawtypes", "deprecation"})
@@ -48,6 +53,23 @@ public class MapReducePublisherImplTest extends TestHelper {
4853
private static final String REDUCE_FUNCTION = "reduceFunction(){}";
4954
private static final String FINALIZE_FUNCTION = "finalizeFunction(){}";
5055

56+
@DisplayName("Inline MapReduce still routes through the read-operation path")
57+
@Test
58+
void shouldRouteInlineMapReduceThroughReadOperationPath() {
59+
configureBatchCursor();
60+
TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor()));
61+
62+
MapReducePublisher<Document> publisher =
63+
new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor),
64+
MAP_FUNCTION, REDUCE_FUNCTION); // no collectionName -> inline
65+
66+
Flux.from(publisher).blockFirst();
67+
68+
assertNotNull(executor.getReadOperation());
69+
assertNull(executor.getWriteThenReadOperation());
70+
}
71+
72+
5173
@DisplayName("Should build the expected MapReduceWithInlineResultsOperation")
5274
@Test
5375
void shouldBuildTheExpectedMapReduceWithInlineResultsOperation() {

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.mongodb.internal.bulk.IndexRequest;
2727
import com.mongodb.internal.bulk.WriteRequest;
2828
import com.mongodb.internal.client.model.FindOptions;
29+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
2930
import com.mongodb.internal.operation.ReadOperation;
3031
import com.mongodb.internal.operation.WriteOperation;
3132
import com.mongodb.lang.NonNull;
@@ -93,7 +94,10 @@ public class TestHelper {
9394

9495
Mockito.lenient().doAnswer(invocation -> Mono.empty())
9596
.when(executor)
96-
.execute(any(), any(), any());
97+
.execute(any(WriteOperation.class), any(), any());
98+
Mockito.lenient().doAnswer(invocation -> Mono.empty())
99+
.when(executor)
100+
.execute(any(AsyncWriteThenReadOperationCursor.class), any(), any());
97101
Mockito.lenient().doAnswer(invocation -> Mono.empty())
98102
.when(executor)
99103
.execute(any(), any(), any(), any());

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.mongodb.ReadConcern;
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.internal.TimeoutSettings;
22+
import com.mongodb.internal.async.AsyncBatchCursor;
23+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
2224
import com.mongodb.internal.operation.ReadOperation;
2325
import com.mongodb.internal.operation.WriteOperation;
2426
import com.mongodb.lang.Nullable;
@@ -37,6 +39,7 @@ public class TestOperationExecutor implements OperationExecutor {
3739

3840
private final List<ReadOperation> readOperations = new ArrayList<>();
3941
private final List<WriteOperation> writeOperations = new ArrayList<>();
42+
private final List<AsyncWriteThenReadOperationCursor> writeThenReadOperations = new ArrayList<>();
4043

4144
public TestOperationExecutor(final List<Object> responses) {
4245
this.responses = new ArrayList<>(responses);
@@ -60,6 +63,14 @@ public <T> Mono<T> execute(final WriteOperation<T> operation, final ReadConcern
6063
return createMono();
6164
}
6265

66+
@Override
67+
public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
68+
@Nullable final ClientSession session) {
69+
clientSessions.add(session);
70+
writeThenReadOperations.add(operation);
71+
return createMono();
72+
}
73+
6374
@Override
6475
public OperationExecutor withTimeoutSettings(final TimeoutSettings timeoutSettings) {
6576
return this;
@@ -106,4 +117,9 @@ WriteOperation getWriteOperation() {
106117
return writeOperations.isEmpty() ? null : writeOperations.remove(0);
107118
}
108119

120+
@Nullable
121+
AsyncWriteThenReadOperationCursor getWriteThenReadOperation() {
122+
return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0);
123+
}
124+
109125
}

0 commit comments

Comments
 (0)