From c79bc7277a2f396dc2f074ce6fa55f9bb106e963 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Tue, 28 Apr 2026 17:07:37 -0700 Subject: [PATCH 1/5] JAVA-2673 --- .../AsyncWriteThenReadOperationCursor.java | 54 +++++++++++++++++++ .../internal/MapReducePublisherImpl.java | 44 +++++++++++++-- .../internal/MongoOperationPublisher.java | 11 ++++ .../client/internal/OperationExecutor.java | 16 ++++++ .../internal/OperationExecutorImpl.java | 43 +++++++++++++++ ...WriteOperationThenCursorReadOperation.java | 11 ++-- .../internal/MapReducePublisherImplTest.java | 22 ++++++++ .../client/internal/TestHelper.java | 6 ++- .../internal/TestOperationExecutor.java | 16 ++++++ 9 files changed, 214 insertions(+), 9 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java new file mode 100644 index 00000000000..9c9505f42bd --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java @@ -0,0 +1,54 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncReadWriteBinding; +import com.mongodb.internal.connection.OperationContext; + +/** + * An async-only operation that performs a write followed by a read that returns a cursor. + * + *

Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding} + * so that both the write and the read portions can be executed without narrowing casts. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public interface AsyncWriteThenReadOperationCursor { + + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + + /** + * @return the namespace of the operation + */ + MongoNamespace getNamespace(); + + /** + * General execute which can return anything of type T + * + * @param binding the binding to execute in the context of + * @param operationContext the operation context to use + * @param callback the callback to be called when the operation has been executed + */ + void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext, + SingleResultCallback> callback); +} diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index 1e8e7fa223b..4ac7f503254 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -28,6 +28,7 @@ import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.client.model.FindOptions; import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceAsyncBatchCursor; import com.mongodb.internal.operation.MapReduceBatchCursor; import com.mongodb.internal.operation.MapReduceStatistics; @@ -40,6 +41,7 @@ import org.bson.BsonDocument; import org.bson.conversions.Bson; import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -201,10 +203,46 @@ public ReadOperationCursor asReadOperation(final int initialBatchSize) { if (inline) { // initialBatchSize is ignored for map reduce operations. return createMapReduceInlineOperation(); - } else { - return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), - createFindOperation(initialBatchSize)); } + throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; " + + "asReadOperation must not be called."); + } + + @Override + public Mono> batchCursor(final int initialBatchSize) { + if (inline) { + return super.batchCursor(initialBatchSize); + } + return writeThenReadBatchCursor(initialBatchSize); + } + + @Override + public Publisher first() { + if (inline) { + return super.first(); + } + return writeThenReadBatchCursor(1) + .flatMap(batchCursor -> { + batchCursor.setBatchSize(1); + return Mono.from(batchCursor.next()) + .doOnTerminate(batchCursor::close) + .flatMap(results -> { + if (results == null || results.isEmpty()) { + return Mono.empty(); + } + return Mono.fromCallable(() -> results.get(0)); + }); + }); + } + + private Mono> writeThenReadBatchCursor(final int initialBatchSize) { + return getMongoOperationPublisher() + .createWriteThenReadOperationMono( + operations -> operations.createTimeoutSettings(maxTimeMS), + () -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), + createFindOperation(initialBatchSize)), + getClientSession()) + .map(BatchCursor::new); } private WrappedMapReduceReadOperation createMapReduceInlineOperation() { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java index 84c810f1b5e..8d561859af4 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java @@ -60,6 +60,8 @@ import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.bulk.WriteRequest; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.IndexHelper; import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; @@ -513,6 +515,15 @@ Mono createReadOperationMono(final Supplier timeoutSetti .execute(readOperation, readPreference, getReadConcern(), clientSession); } + Mono> createWriteThenReadOperationMono( + final Function, TimeoutSettings> timeoutSettingsFunction, + final Supplier> operationSupplier, + @Nullable final ClientSession clientSession) { + AsyncWriteThenReadOperationCursor operation = operationSupplier.get(); + return getExecutor(timeoutSettingsFunction.apply(operations)) + .execute(operation, getReadConcern(), clientSession); + } + Mono createWriteOperationMono(final Function, TimeoutSettings> timeoutSettingsFunction, final Supplier> operationSupplier, @Nullable final ClientSession clientSession) { return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java index cd666720f33..7c61ecf547e 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java @@ -19,6 +19,8 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; @@ -54,6 +56,20 @@ Mono execute(ReadOperation operation, ReadPreference readPreference */ Mono execute(WriteOperation operation, ReadConcern readConcern, @Nullable ClientSession session); + /** + * Execute an operation that writes and then reads a cursor within a single read-write binding. + * + *

The binding is acquired once and used for both phases, avoiding the need to narrow an + * {@code AsyncReadBinding} to an {@code AsyncWriteBinding}. + * + * @param operation the write-then-read operation. + * @param readConcern the read concern + * @param session the session to associate this operation with + * @param the document type returned by the cursor. + */ + Mono> execute(AsyncWriteThenReadOperationCursor operation, + ReadConcern readConcern, @Nullable ClientSession session); + /** * Create a new OperationExecutor with a specific timeout settings * diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index 62a4431cc9a..a29f15ad55b 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -33,6 +33,8 @@ import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext; import com.mongodb.internal.observability.micrometer.Span; import com.mongodb.internal.observability.micrometer.TracingManager; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; @@ -179,6 +181,47 @@ public Mono execute(final WriteOperation operation, final ReadConcern ); } + @Override + public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + isTrue("open", !mongoClient.getCluster().isClosed()); + notNull("operation", operation); + notNull("readConcern", readConcern); + + return Mono.from(subscriber -> + clientSessionHelper.withClientSession(session, this) + .flatMap(actualClientSession -> { + AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null); + RequestContext requestContext = getContext(subscriber); + OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName()) + .withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession, + isImplicitSession(session), readConcern)); + Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(), + operationContext, operation.getCommandName(), operation.getNamespace()); + + return Mono.>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> { + try { + binding.release(); + } finally { + if (t != null) { + Throwable exceptionToHandle = t instanceof MongoException + ? OperationHelper.unwrap((MongoException) t) : t; + labelException(session, exceptionToHandle); + unpinServerAddressOnTransientTransactionError(session, exceptionToHandle); + if (span != null) { + span.error(t); + } + } + if (span != null) { + span.end(); + } + sinkToCallback(sink).onResult(result, t); + } + })); + }).subscribe(subscriber) + ); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) { if (Objects.equals(timeoutSettings, newTimeoutSettings)) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java index a7d1191c8bf..fb253dafa88 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java @@ -19,13 +19,13 @@ import com.mongodb.MongoNamespace; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.binding.AsyncReadBinding; -import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.AsyncReadWriteBinding; import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperationCursor; import com.mongodb.internal.operation.WriteOperation; -class VoidWriteOperationThenCursorReadOperation implements ReadOperationCursorAsyncOnly { +class VoidWriteOperationThenCursorReadOperation implements AsyncWriteThenReadOperationCursor { private final WriteOperation writeOperation; private final ReadOperationCursor cursorReadOperation; @@ -46,8 +46,9 @@ public MongoNamespace getNamespace() { } @Override - public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback> callback) { - writeOperation.executeAsync((AsyncWriteBinding) binding, operationContext, (result, t) -> { + public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback> callback) { + writeOperation.executeAsync(binding, operationContext, (result, t) -> { if (t != null) { callback.onResult(null, t); } else { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java index c112395a818..953e5e0fc13 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java @@ -23,6 +23,10 @@ import com.mongodb.internal.operation.MapReduceStatistics; import com.mongodb.internal.operation.MapReduceToCollectionOperation; import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation; + +import com.mongodb.reactivestreams.client.MapReducePublisher; + + import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonJavaScript; @@ -39,6 +43,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings({"rawtypes", "deprecation"}) @@ -48,6 +53,23 @@ public class MapReducePublisherImplTest extends TestHelper { private static final String REDUCE_FUNCTION = "reduceFunction(){}"; private static final String FINALIZE_FUNCTION = "finalizeFunction(){}"; + @DisplayName("Inline MapReduce still routes through the read-operation path") + @Test + void shouldRouteInlineMapReduceThroughReadOperationPath() { + configureBatchCursor(); + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor())); + + MapReducePublisher publisher = + new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor), + MAP_FUNCTION, REDUCE_FUNCTION); // no collectionName -> inline + + Flux.from(publisher).blockFirst(); + + assertNotNull(executor.getReadOperation()); + assertNull(executor.getWriteThenReadOperation()); + } + + @DisplayName("Should build the expected MapReduceWithInlineResultsOperation") @Test void shouldBuildTheExpectedMapReduceWithInlineResultsOperation() { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java index 450536df2b8..685b6361d04 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java @@ -26,6 +26,7 @@ import com.mongodb.internal.bulk.IndexRequest; import com.mongodb.internal.bulk.WriteRequest; import com.mongodb.internal.client.model.FindOptions; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.NonNull; @@ -93,7 +94,10 @@ public class TestHelper { Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) - .execute(any(), any(), any()); + .execute(any(WriteOperation.class), any(), any()); + Mockito.lenient().doAnswer(invocation -> Mono.empty()) + .when(executor) + .execute(any(AsyncWriteThenReadOperationCursor.class), any(), any()); Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) .execute(any(), any(), any(), any()); diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java index 831d22b3080..731eb251374 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java @@ -19,6 +19,8 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; @@ -37,6 +39,7 @@ public class TestOperationExecutor implements OperationExecutor { private final List readOperations = new ArrayList<>(); private final List writeOperations = new ArrayList<>(); + private final List writeThenReadOperations = new ArrayList<>(); public TestOperationExecutor(final List responses) { this.responses = new ArrayList<>(responses); @@ -60,6 +63,14 @@ public Mono execute(final WriteOperation operation, final ReadConcern return createMono(); } + @Override + public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + clientSessions.add(session); + writeThenReadOperations.add(operation); + return createMono(); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings timeoutSettings) { return this; @@ -106,4 +117,9 @@ WriteOperation getWriteOperation() { return writeOperations.isEmpty() ? null : writeOperations.remove(0); } + @Nullable + AsyncWriteThenReadOperationCursor getWriteThenReadOperation() { + return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0); + } + } From 71e84a5c5aa3be9da5775aa76ed4acf62fb0aa74 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Wed, 29 Apr 2026 09:54:54 -0700 Subject: [PATCH 2/5] remove unused import --- .../reactivestreams/client/internal/MapReducePublisherImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index 4ac7f503254..86a2239f759 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -28,7 +28,6 @@ import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.client.model.FindOptions; import com.mongodb.internal.connection.OperationContext; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceAsyncBatchCursor; import com.mongodb.internal.operation.MapReduceBatchCursor; import com.mongodb.internal.operation.MapReduceStatistics; From 2f1a51725549f5d031d5136f161c42445dc5b5d1 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Wed, 29 Apr 2026 12:13:25 -0700 Subject: [PATCH 3/5] address copilot feedback --- .../internal/ClientSessionPublisherImpl.java | 3 ++- .../internal/OperationExecutorImpl.java | 4 ++++ .../internal/MapReducePublisherImplTest.java | 23 ++++++++++++++++++- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 511f9f62c6b..87b90781150 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -27,6 +27,7 @@ import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.observability.micrometer.TransactionSpan; import com.mongodb.internal.operation.AbortTransactionOperation; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.CommitTransactionOperation; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteConcernHelper; @@ -88,7 +89,7 @@ public boolean notifyMessageSent() { @Override public void notifyOperationInitiated(final Object operation) { - assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation); + assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation || operation instanceof AsyncWriteThenReadOperationCursor); if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) { assertTrue(getPinnedServerAddress() == null || (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE)); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index a29f15ad55b..9a7b0e2c592 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -188,6 +188,10 @@ public Mono> execute(final AsyncWriteThenReadOperationCu notNull("operation", operation); notNull("readConcern", readConcern); + if (session != null) { + session.notifyOperationInitiated(operation); + } + return Mono.from(subscriber -> clientSessionHelper.withClientSession(session, this) .flatMap(actualClientSession -> { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java index 953e5e0fc13..36c8ac93705 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java @@ -20,10 +20,10 @@ import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.client.model.Sorts; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceStatistics; import com.mongodb.internal.operation.MapReduceToCollectionOperation; import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation; - import com.mongodb.reactivestreams.client.MapReducePublisher; @@ -53,6 +53,27 @@ public class MapReducePublisherImplTest extends TestHelper { private static final String REDUCE_FUNCTION = "reduceFunction(){}"; private static final String FINALIZE_FUNCTION = "finalizeFunction(){}"; + @DisplayName("Non-inline MapReduce routes through the write-then-read executor path") + @Test + void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() { + configureBatchCursor(); + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor())); + + MapReducePublisher publisher = + new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor), + MAP_FUNCTION, REDUCE_FUNCTION) + .collectionName(NAMESPACE.getCollectionName()); + + Flux.from(publisher).blockFirst(); + + AsyncWriteThenReadOperationCursor op = executor.getWriteThenReadOperation(); + assertNotNull(op, "expected a write-then-read operation"); + assertEquals(NAMESPACE, op.getNamespace()); + // Must not fall through to the plain read-operation path. + assertNull(executor.getReadOperation()); + } + + @DisplayName("Inline MapReduce still routes through the read-operation path") @Test void shouldRouteInlineMapReduceThroughReadOperationPath() { From fa700ae908a8444fb4e80d84868a3fa5c165f480 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Wed, 29 Apr 2026 16:56:23 -0700 Subject: [PATCH 4/5] update the docs --- .../operation/AsyncWriteThenReadOperationCursor.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java index 9c9505f42bd..0a48079bf5f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java @@ -33,21 +33,22 @@ public interface AsyncWriteThenReadOperationCursor { /** - * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + * @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate") */ String getCommandName(); /** - * @return the namespace of the operation + * @return the namespace the write phase targets */ MongoNamespace getNamespace(); /** - * General execute which can return anything of type T + * Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor} + * over the results of the read. * - * @param binding the binding to execute in the context of + * @param binding the read-write binding used by both phases * @param operationContext the operation context to use - * @param callback the callback to be called when the operation has been executed + * @param callback receives the {@link AsyncBatchCursor} on success, or the failure of either phase */ void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext, SingleResultCallback> callback); From f164af58d3c8819ee5f75dd5a0ce40d5305134b8 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Mon, 4 May 2026 21:50:04 -0700 Subject: [PATCH 5/5] address Ross comments --- ...WriteOperationThenCursorReadOperation.java | 33 +++++++-- ...java => WriteThenReadOperationCursor.java} | 21 +++++- .../internal/ClientSessionPublisherImpl.java | 5 +- .../internal/MapReducePublisherImpl.java | 1 + .../internal/MongoOperationPublisher.java | 6 +- .../client/internal/OperationExecutor.java | 4 +- .../internal/OperationExecutorImpl.java | 4 +- .../internal/MapReducePublisherImplTest.java | 4 +- .../client/internal/TestHelper.java | 4 +- .../internal/TestOperationExecutor.java | 8 +- .../internal/ChangeStreamIterableImpl.java | 6 +- .../client/internal/ClientSessionImpl.java | 4 +- .../internal/MapReduceIterableImpl.java | 74 +++++++++++++++---- .../client/internal/MongoClusterImpl.java | 34 +++++++++ .../client/internal/MongoIterableImpl.java | 2 +- .../client/internal/OperationExecutor.java | 16 ++++ .../internal/TestOperationExecutor.java | 17 +++++ 17 files changed, 196 insertions(+), 47 deletions(-) rename {driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal => driver-core/src/main/com/mongodb/internal/operation}/VoidWriteOperationThenCursorReadOperation.java (60%) rename driver-core/src/main/com/mongodb/internal/operation/{AsyncWriteThenReadOperationCursor.java => WriteThenReadOperationCursor.java} (66%) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/VoidWriteOperationThenCursorReadOperation.java similarity index 60% rename from driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java rename to driver-core/src/main/com/mongodb/internal/operation/VoidWriteOperationThenCursorReadOperation.java index fb253dafa88..b6311f00840 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/VoidWriteOperationThenCursorReadOperation.java @@ -14,27 +14,40 @@ * limitations under the License. */ -package com.mongodb.reactivestreams.client.internal; +package com.mongodb.internal.operation; import com.mongodb.MongoNamespace; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadWriteBinding; +import com.mongodb.internal.binding.ReadWriteBinding; import com.mongodb.internal.connection.OperationContext; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; -import com.mongodb.internal.operation.ReadOperationCursor; -import com.mongodb.internal.operation.WriteOperation; -class VoidWriteOperationThenCursorReadOperation implements AsyncWriteThenReadOperationCursor { +/** + * A {@link WriteThenReadOperationCursor} that performs a {@link WriteOperation} returning + * {@code Void} followed by a {@link ReadOperationCursor}, using a single read-write + * binding for both phases. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class VoidWriteOperationThenCursorReadOperation implements WriteThenReadOperationCursor { private final WriteOperation writeOperation; private final ReadOperationCursor cursorReadOperation; - VoidWriteOperationThenCursorReadOperation(final WriteOperation writeOperation, - final ReadOperationCursor cursorReadOperation) { + public VoidWriteOperationThenCursorReadOperation(final WriteOperation writeOperation, + final ReadOperationCursor cursorReadOperation) { this.writeOperation = writeOperation; this.cursorReadOperation = cursorReadOperation; } + public WriteOperation getWriteOperation() { + return writeOperation; + } + + public ReadOperationCursor getCursorReadOperation() { + return cursorReadOperation; + } + @Override public String getCommandName() { return writeOperation.getCommandName(); @@ -45,6 +58,12 @@ public MongoNamespace getNamespace() { return writeOperation.getNamespace(); } + @Override + public BatchCursor execute(final ReadWriteBinding binding, final OperationContext operationContext) { + writeOperation.execute(binding, operationContext); + return cursorReadOperation.execute(binding, operationContext); + } + @Override public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext, final SingleResultCallback> callback) { diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java b/driver-core/src/main/com/mongodb/internal/operation/WriteThenReadOperationCursor.java similarity index 66% rename from driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java rename to driver-core/src/main/com/mongodb/internal/operation/WriteThenReadOperationCursor.java index 0a48079bf5f..964cfb8847d 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/WriteThenReadOperationCursor.java @@ -20,17 +20,20 @@ import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadWriteBinding; +import com.mongodb.internal.binding.ReadWriteBinding; import com.mongodb.internal.connection.OperationContext; /** - * An async-only operation that performs a write followed by a read that returns a cursor. + * An operation that performs a write followed by a read that returns a cursor, using a + * single read-write binding for both phases. * - *

Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding} - * so that both the write and the read portions can be executed without narrowing casts. + *

Having a dedicated interface lets the executor hand the operation a binding that is + * both a {@link ReadWriteBinding} and an {@link AsyncReadWriteBinding}, avoiding the + * narrowing cast from a read-only binding that would otherwise be required. * *

This class is not part of the public API and may be removed or changed at any time

*/ -public interface AsyncWriteThenReadOperationCursor { +public interface WriteThenReadOperationCursor { /** * @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate") @@ -42,6 +45,16 @@ public interface AsyncWriteThenReadOperationCursor { */ MongoNamespace getNamespace(); + /** + * Executes the write phase followed by the read phase, returning a {@link BatchCursor} + * over the results of the read. + * + * @param binding the read-write binding used by both phases + * @param operationContext the operation context to use + * @return the batch cursor produced by the read phase + */ + BatchCursor execute(ReadWriteBinding binding, OperationContext operationContext); + /** * Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor} * over the results of the read. diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 87b90781150..9fa6cd383d8 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -27,11 +27,11 @@ import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.observability.micrometer.TransactionSpan; import com.mongodb.internal.operation.AbortTransactionOperation; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.CommitTransactionOperation; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteConcernHelper; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.session.BaseClientSessionImpl; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.lang.Nullable; @@ -89,7 +89,8 @@ public boolean notifyMessageSent() { @Override public void notifyOperationInitiated(final Object operation) { - assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation || operation instanceof AsyncWriteThenReadOperationCursor); + assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation + || operation instanceof WriteThenReadOperationCursor); if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) { assertTrue(getPinnedServerAddress() == null || (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE)); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index 86a2239f759..5fdc154707e 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -34,6 +34,7 @@ import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.ReadOperationCursor; +import com.mongodb.internal.operation.VoidWriteOperationThenCursorReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java index 8d561859af4..e25074a07d2 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java @@ -61,7 +61,7 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.bulk.WriteRequest; import com.mongodb.internal.async.AsyncBatchCursor; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.operation.IndexHelper; import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; @@ -517,9 +517,9 @@ Mono createReadOperationMono(final Supplier timeoutSetti Mono> createWriteThenReadOperationMono( final Function, TimeoutSettings> timeoutSettingsFunction, - final Supplier> operationSupplier, + final Supplier> operationSupplier, @Nullable final ClientSession clientSession) { - AsyncWriteThenReadOperationCursor operation = operationSupplier.get(); + WriteThenReadOperationCursor operation = operationSupplier.get(); return getExecutor(timeoutSettingsFunction.apply(operations)) .execute(operation, getReadConcern(), clientSession); } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java index 7c61ecf547e..f8f1d1b9a2e 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java @@ -20,9 +20,9 @@ import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.AsyncBatchCursor; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import reactor.core.publisher.Mono; @@ -67,7 +67,7 @@ Mono execute(ReadOperation operation, ReadPreference readPreference * @param session the session to associate this operation with * @param the document type returned by the cursor. */ - Mono> execute(AsyncWriteThenReadOperationCursor operation, + Mono> execute(WriteThenReadOperationCursor operation, ReadConcern readConcern, @Nullable ClientSession session); /** diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index 9a7b0e2c592..b9ce24b5898 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -34,10 +34,10 @@ import com.mongodb.internal.observability.micrometer.Span; import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.async.AsyncBatchCursor; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import com.mongodb.reactivestreams.client.ReactiveContextProvider; @@ -182,7 +182,7 @@ public Mono execute(final WriteOperation operation, final ReadConcern } @Override - public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern, + public Mono> execute(final WriteThenReadOperationCursor operation, final ReadConcern readConcern, @Nullable final ClientSession session) { isTrue("open", !mongoClient.getCluster().isClosed()); notNull("operation", operation); diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java index 36c8ac93705..98c84eb7f8b 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java @@ -20,7 +20,7 @@ import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.client.model.Sorts; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceStatistics; import com.mongodb.internal.operation.MapReduceToCollectionOperation; import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation; @@ -66,7 +66,7 @@ void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() { Flux.from(publisher).blockFirst(); - AsyncWriteThenReadOperationCursor op = executor.getWriteThenReadOperation(); + WriteThenReadOperationCursor op = executor.getWriteThenReadOperation(); assertNotNull(op, "expected a write-then-read operation"); assertEquals(NAMESPACE, op.getNamespace()); // Must not fall through to the plain read-operation path. diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java index 685b6361d04..a340b2977c4 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java @@ -26,7 +26,7 @@ import com.mongodb.internal.bulk.IndexRequest; import com.mongodb.internal.bulk.WriteRequest; import com.mongodb.internal.client.model.FindOptions; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.NonNull; @@ -97,7 +97,7 @@ public class TestHelper { .execute(any(WriteOperation.class), any(), any()); Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) - .execute(any(AsyncWriteThenReadOperationCursor.class), any(), any()); + .execute(any(WriteThenReadOperationCursor.class), any(), any()); Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) .execute(any(), any(), any(), any()); diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java index 731eb251374..df2314a52c9 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java @@ -20,7 +20,7 @@ import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.AsyncBatchCursor; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; @@ -39,7 +39,7 @@ public class TestOperationExecutor implements OperationExecutor { private final List readOperations = new ArrayList<>(); private final List writeOperations = new ArrayList<>(); - private final List writeThenReadOperations = new ArrayList<>(); + private final List writeThenReadOperations = new ArrayList<>(); public TestOperationExecutor(final List responses) { this.responses = new ArrayList<>(responses); @@ -64,7 +64,7 @@ public Mono execute(final WriteOperation operation, final ReadConcern } @Override - public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern, + public Mono> execute(final WriteThenReadOperationCursor operation, final ReadConcern readConcern, @Nullable final ClientSession session) { clientSessions.add(session); writeThenReadOperations.add(operation); @@ -118,7 +118,7 @@ WriteOperation getWriteOperation() { } @Nullable - AsyncWriteThenReadOperationCursor getWriteThenReadOperation() { + WriteThenReadOperationCursor getWriteThenReadOperation() { return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java index b5b41b375f5..c3c12d086d4 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java @@ -136,7 +136,7 @@ public MongoCursor iterator() { @Override public MongoChangeStreamCursor cursor() { - return new MongoChangeStreamCursorImpl<>(execute(), codecRegistry.get(clazz), initialResumeToken()); + return new MongoChangeStreamCursorImpl<>(executeChangeStream(), codecRegistry.get(clazz), initialResumeToken()); } @Override @@ -190,7 +190,7 @@ public MongoCursor> iterator() { @Override public MongoChangeStreamCursor> cursor() { - return new MongoChangeStreamCursorImpl<>(execute(), codec, initialResumeToken()); + return new MongoChangeStreamCursorImpl<>(executeChangeStream(), codec, initialResumeToken()); } @Nullable @@ -219,7 +219,7 @@ private ReadOperationCursor createChangeStreamOperation() { getBatchSize(), collation, comment, resumeToken, startAtOperationTime, startAfter, showExpandedEvents); } - private BatchCursor execute() { + private BatchCursor executeChangeStream() { return getExecutor().execute(createChangeStreamOperation(), getReadPreference(), getReadConcern(), getClientSession()); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java index aa1414dce5d..f6245e875f1 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java @@ -34,6 +34,7 @@ import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteConcernHelper; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.session.BaseClientSessionImpl; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.internal.observability.micrometer.TracingManager; @@ -88,7 +89,8 @@ public boolean notifyMessageSent() { @Override public void notifyOperationInitiated(final Object operation) { - assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation); + assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation + || operation instanceof WriteThenReadOperationCursor); if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) { assertTrue(getPinnedServerAddress() == null || (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE)); diff --git a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java index fd79242766e..722751b8b5e 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java @@ -27,7 +27,9 @@ import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; +import com.mongodb.internal.binding.AsyncWriteBinding; import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.client.model.FindOptions; import com.mongodb.internal.connection.OperationContext; import com.mongodb.internal.operation.BatchCursor; @@ -35,6 +37,7 @@ import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperationCursor; import com.mongodb.internal.operation.ReadOperationMapReduceCursor; +import com.mongodb.internal.operation.VoidWriteOperationThenCursorReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -203,25 +206,35 @@ public ReadOperationCursor asReadOperation() { ReadOperationMapReduceCursor operation = operations.mapReduce(mapFunction, reduceFunction, finalizeFunction, resultClass, filter, limit, jsMode, scope, sort, verbose, collation); return new WrappedMapReduceReadOperation<>(operation); - } else { - getExecutor().execute(createMapReduceToCollectionOperation(), getReadConcern(), getClientSession()); - - String dbName = databaseName != null ? databaseName : namespace.getDatabaseName(); + } + throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; " + + "asReadOperation must not be called."); + } - FindOptions findOptions = new FindOptions().collation(collation); - Integer batchSize = getBatchSize(); - if (batchSize != null) { - findOptions.batchSize(batchSize); - } - return operations.find(new MongoNamespace(dbName, collectionName), new BsonDocument(), resultClass, findOptions); + @Override + BatchCursor execute() { + if (inline) { + return super.execute(); } + return getExecutor().execute( + new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), createFindOperation()), + getReadConcern(), getClientSession()); + } + private WriteOperation createMapReduceToCollectionOperation() { + return new WrappedMapReduceWriteOperation( + operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, + limit, jsMode, scope, sort, verbose, action, bypassDocumentValidation, collation)); } - private WriteOperation createMapReduceToCollectionOperation() { - return operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, - limit, jsMode, scope, sort, verbose, action, bypassDocumentValidation, collation - ); + private ReadOperationCursor createFindOperation() { + String dbName = databaseName != null ? databaseName : namespace.getDatabaseName(); + FindOptions findOptions = new FindOptions().collation(collation); + Integer batchSize = getBatchSize(); + if (batchSize != null) { + findOptions.batchSize(batchSize); + } + return operations.find(new MongoNamespace(dbName, collectionName), new BsonDocument(), resultClass, findOptions); } // this could be inlined, but giving it a name so that it's unit-testable @@ -256,4 +269,37 @@ public void executeAsync(final AsyncReadBinding binding, final OperationContext throw new UnsupportedOperationException("This operation is sync only"); } } + + static class WrappedMapReduceWriteOperation implements WriteOperation { + private final WriteOperation operation; + + WrappedMapReduceWriteOperation(final WriteOperation operation) { + this.operation = operation; + } + + WriteOperation getOperation() { + return operation; + } + + @Override + public String getCommandName() { + return operation.getCommandName(); + } + + @Override + public MongoNamespace getNamespace() { + return operation.getNamespace(); + } + + @Override + public Void execute(final WriteBinding binding, final OperationContext operationContext) { + operation.execute(binding, operationContext); + return null; + } + + @Override + public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, final SingleResultCallback callback) { + throw new UnsupportedOperationException("This operation is sync only"); + } + } } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java index eb36678761a..dcb8d5e2989 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java @@ -54,10 +54,12 @@ import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext; import com.mongodb.internal.observability.micrometer.Span; import com.mongodb.internal.observability.micrometer.TracingManager; +import com.mongodb.internal.operation.BatchCursor; import com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -482,6 +484,38 @@ public T execute(final WriteOperation operation, final ReadConcern readCo } } + @Override + public BatchCursor execute(final WriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + if (session != null) { + session.notifyOperationInitiated(operation); + } + + ClientSession actualClientSession = getClientSession(session); + OperationContext operationContext = getOperationContext(actualClientSession, readConcern, operation.getCommandName()) + .withSessionContext(new ClientSessionBinding.SyncClientSessionContext(actualClientSession, readConcern, isImplicitSession(session))); + Span span = operationContext.getTracingManager().createOperationSpan( + actualClientSession.getTransactionSpan(), operationContext, operation.getCommandName(), operation.getNamespace()); + ReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, isImplicitSession(session)); + + try { + return operation.execute(binding, operationContext); + } catch (MongoException e) { + MongoException exceptionToHandle = OperationHelper.unwrap(e); + labelException(actualClientSession, exceptionToHandle); + clearTransactionContextOnTransientTransactionError(session, exceptionToHandle); + if (span != null) { + span.error(e); + } + throw e; + } finally { + binding.release(); + if (span != null) { + span.end(); + } + } + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) { if (Objects.equals(executorTimeoutSettings, newTimeoutSettings)) { diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java index b642f0f1189..d04691606d9 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java @@ -152,7 +152,7 @@ public > A into(final A target) { return target; } - private BatchCursor execute() { + BatchCursor execute() { return getExecutor().execute(asReadOperation(), readPreference, readConcern, clientSession); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/OperationExecutor.java b/driver-sync/src/main/com/mongodb/client/internal/OperationExecutor.java index 1ec19483afc..b8bbc2f8a88 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/OperationExecutor.java +++ b/driver-sync/src/main/com/mongodb/client/internal/OperationExecutor.java @@ -20,8 +20,10 @@ import com.mongodb.ReadPreference; import com.mongodb.client.ClientSession; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.operation.BatchCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.lang.Nullable; /** @@ -75,6 +77,20 @@ public interface OperationExecutor { */ T execute(WriteOperation operation, ReadConcern readConcern, @Nullable ClientSession session); + /** + * Execute an operation that writes and then reads a cursor within a single read-write binding. + * + *

The binding is acquired once and used for both phases, avoiding the need to narrow a + * {@code ReadBinding} to a {@code WriteBinding}. + * + * @param the document type returned by the cursor. + * @param operation the write-then-read operation. + * @param readConcern the read concern + * @param session the session to associate this operation with + * @return the batch cursor produced by the read phase. + */ + BatchCursor execute(WriteThenReadOperationCursor operation, ReadConcern readConcern, @Nullable ClientSession session); + /** * Create a new OperationExecutor with a specific timeout settings * diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/TestOperationExecutor.java b/driver-sync/src/test/unit/com/mongodb/client/internal/TestOperationExecutor.java index adcfaa0f903..10b6c9e9bae 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/TestOperationExecutor.java +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/TestOperationExecutor.java @@ -20,8 +20,10 @@ import com.mongodb.ReadPreference; import com.mongodb.client.ClientSession; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.operation.BatchCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.lang.Nullable; import java.util.ArrayList; @@ -36,6 +38,7 @@ public class TestOperationExecutor implements OperationExecutor { private final List readConcerns = new ArrayList<>(); private final List readOperations = new ArrayList<>(); private final List writeOperations = new ArrayList<>(); + private final List writeThenReadOperations = new ArrayList<>(); public TestOperationExecutor(final List responses) { this.responses = responses; @@ -69,6 +72,15 @@ public T execute(final WriteOperation operation, final ReadConcern readCo return getResponse(); } + @Override + public BatchCursor execute(final WriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + clientSessions.add(session); + writeThenReadOperations.add(operation); + readConcerns.add(readConcern); + return getResponse(); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings timeoutSettings) { return this; @@ -111,4 +123,9 @@ public ReadConcern getReadConcern() { public WriteOperation getWriteOperation() { return writeOperations.isEmpty() ? null : writeOperations.remove(0); } + + @Nullable + public WriteThenReadOperationCursor getWriteThenReadOperation() { + return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0); + } }