diff --git a/driver-core/src/main/com/mongodb/internal/operation/VoidWriteOperationThenCursorReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/VoidWriteOperationThenCursorReadOperation.java new file mode 100644 index 0000000000..b6311f0084 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/VoidWriteOperationThenCursorReadOperation.java @@ -0,0 +1,78 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncReadWriteBinding; +import com.mongodb.internal.binding.ReadWriteBinding; +import com.mongodb.internal.connection.OperationContext; + +/** + * A {@link WriteThenReadOperationCursor} that performs a {@link WriteOperation} returning + * {@code Void} followed by a {@link ReadOperationCursor}, using a single read-write + * binding for both phases. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class VoidWriteOperationThenCursorReadOperation implements WriteThenReadOperationCursor { + private final WriteOperation writeOperation; + private final ReadOperationCursor cursorReadOperation; + + public VoidWriteOperationThenCursorReadOperation(final WriteOperation writeOperation, + final ReadOperationCursor cursorReadOperation) { + this.writeOperation = writeOperation; + this.cursorReadOperation = cursorReadOperation; + } + + public WriteOperation getWriteOperation() { + return writeOperation; + } + + public ReadOperationCursor getCursorReadOperation() { + return cursorReadOperation; + } + + @Override + public String getCommandName() { + return writeOperation.getCommandName(); + } + + @Override + public MongoNamespace getNamespace() { + return writeOperation.getNamespace(); + } + + @Override + public BatchCursor execute(final ReadWriteBinding binding, final OperationContext operationContext) { + writeOperation.execute(binding, operationContext); + return cursorReadOperation.execute(binding, operationContext); + } + + @Override + public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback> callback) { + writeOperation.executeAsync(binding, operationContext, (result, t) -> { + if (t != null) { + callback.onResult(null, t); + } else { + cursorReadOperation.executeAsync(binding, operationContext, callback); + } + }); + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/WriteThenReadOperationCursor.java b/driver-core/src/main/com/mongodb/internal/operation/WriteThenReadOperationCursor.java new file mode 100644 index 0000000000..964cfb8847 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/WriteThenReadOperationCursor.java @@ -0,0 +1,68 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncReadWriteBinding; +import com.mongodb.internal.binding.ReadWriteBinding; +import com.mongodb.internal.connection.OperationContext; + +/** + * An operation that performs a write followed by a read that returns a cursor, using a + * single read-write binding for both phases. + * + *

Having a dedicated interface lets the executor hand the operation a binding that is + * both a {@link ReadWriteBinding} and an {@link AsyncReadWriteBinding}, avoiding the + * narrowing cast from a read-only binding that would otherwise be required. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public interface WriteThenReadOperationCursor { + + /** + * @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate") + */ + String getCommandName(); + + /** + * @return the namespace the write phase targets + */ + MongoNamespace getNamespace(); + + /** + * Executes the write phase followed by the read phase, returning a {@link BatchCursor} + * over the results of the read. + * + * @param binding the read-write binding used by both phases + * @param operationContext the operation context to use + * @return the batch cursor produced by the read phase + */ + BatchCursor execute(ReadWriteBinding binding, OperationContext operationContext); + + /** + * Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor} + * over the results of the read. + * + * @param binding the read-write binding used by both phases + * @param operationContext the operation context to use + * @param callback receives the {@link AsyncBatchCursor} on success, or the failure of either phase + */ + void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext, + SingleResultCallback> callback); +} diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 511f9f62c6..9fa6cd383d 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -31,6 +31,7 @@ import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteConcernHelper; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.session.BaseClientSessionImpl; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.lang.Nullable; @@ -88,7 +89,8 @@ public boolean notifyMessageSent() { @Override public void notifyOperationInitiated(final Object operation) { - assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation); + assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation + || operation instanceof WriteThenReadOperationCursor); if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) { assertTrue(getPinnedServerAddress() == null || (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE)); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index 1e8e7fa223..5fdc154707 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -34,12 +34,14 @@ import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.ReadOperationCursor; +import com.mongodb.internal.operation.VoidWriteOperationThenCursorReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import org.bson.BsonDocument; import org.bson.conversions.Bson; import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -201,10 +203,46 @@ public ReadOperationCursor asReadOperation(final int initialBatchSize) { if (inline) { // initialBatchSize is ignored for map reduce operations. return createMapReduceInlineOperation(); - } else { - return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), - createFindOperation(initialBatchSize)); } + throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; " + + "asReadOperation must not be called."); + } + + @Override + public Mono> batchCursor(final int initialBatchSize) { + if (inline) { + return super.batchCursor(initialBatchSize); + } + return writeThenReadBatchCursor(initialBatchSize); + } + + @Override + public Publisher first() { + if (inline) { + return super.first(); + } + return writeThenReadBatchCursor(1) + .flatMap(batchCursor -> { + batchCursor.setBatchSize(1); + return Mono.from(batchCursor.next()) + .doOnTerminate(batchCursor::close) + .flatMap(results -> { + if (results == null || results.isEmpty()) { + return Mono.empty(); + } + return Mono.fromCallable(() -> results.get(0)); + }); + }); + } + + private Mono> writeThenReadBatchCursor(final int initialBatchSize) { + return getMongoOperationPublisher() + .createWriteThenReadOperationMono( + operations -> operations.createTimeoutSettings(maxTimeMS), + () -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), + createFindOperation(initialBatchSize)), + getClientSession()) + .map(BatchCursor::new); } private WrappedMapReduceReadOperation createMapReduceInlineOperation() { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java index 84c810f1b5..e25074a07d 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java @@ -60,6 +60,8 @@ import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.bulk.WriteRequest; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.operation.IndexHelper; import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; @@ -513,6 +515,15 @@ Mono createReadOperationMono(final Supplier timeoutSetti .execute(readOperation, readPreference, getReadConcern(), clientSession); } + Mono> createWriteThenReadOperationMono( + final Function, TimeoutSettings> timeoutSettingsFunction, + final Supplier> operationSupplier, + @Nullable final ClientSession clientSession) { + WriteThenReadOperationCursor operation = operationSupplier.get(); + return getExecutor(timeoutSettingsFunction.apply(operations)) + .execute(operation, getReadConcern(), clientSession); + } + Mono createWriteOperationMono(final Function, TimeoutSettings> timeoutSettingsFunction, final Supplier> operationSupplier, @Nullable final ClientSession clientSession) { return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java index cd666720f3..f8f1d1b9a2 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java @@ -19,8 +19,10 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import reactor.core.publisher.Mono; @@ -54,6 +56,20 @@ Mono execute(ReadOperation operation, ReadPreference readPreference */ Mono execute(WriteOperation operation, ReadConcern readConcern, @Nullable ClientSession session); + /** + * Execute an operation that writes and then reads a cursor within a single read-write binding. + * + *

The binding is acquired once and used for both phases, avoiding the need to narrow an + * {@code AsyncReadBinding} to an {@code AsyncWriteBinding}. + * + * @param operation the write-then-read operation. + * @param readConcern the read concern + * @param session the session to associate this operation with + * @param the document type returned by the cursor. + */ + Mono> execute(WriteThenReadOperationCursor operation, + ReadConcern readConcern, @Nullable ClientSession session); + /** * Create a new OperationExecutor with a specific timeout settings * diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index 62a4431cc9..b9ce24b589 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -33,9 +33,11 @@ import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext; import com.mongodb.internal.observability.micrometer.Span; import com.mongodb.internal.observability.micrometer.TracingManager; +import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import com.mongodb.reactivestreams.client.ReactiveContextProvider; @@ -179,6 +181,51 @@ public Mono execute(final WriteOperation operation, final ReadConcern ); } + @Override + public Mono> execute(final WriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + isTrue("open", !mongoClient.getCluster().isClosed()); + notNull("operation", operation); + notNull("readConcern", readConcern); + + if (session != null) { + session.notifyOperationInitiated(operation); + } + + return Mono.from(subscriber -> + clientSessionHelper.withClientSession(session, this) + .flatMap(actualClientSession -> { + AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null); + RequestContext requestContext = getContext(subscriber); + OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName()) + .withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession, + isImplicitSession(session), readConcern)); + Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(), + operationContext, operation.getCommandName(), operation.getNamespace()); + + return Mono.>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> { + try { + binding.release(); + } finally { + if (t != null) { + Throwable exceptionToHandle = t instanceof MongoException + ? OperationHelper.unwrap((MongoException) t) : t; + labelException(session, exceptionToHandle); + unpinServerAddressOnTransientTransactionError(session, exceptionToHandle); + if (span != null) { + span.error(t); + } + } + if (span != null) { + span.end(); + } + sinkToCallback(sink).onResult(result, t); + } + })); + }).subscribe(subscriber) + ); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) { if (Objects.equals(timeoutSettings, newTimeoutSettings)) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java deleted file mode 100644 index a7d1191c8b..0000000000 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.reactivestreams.client.internal; - -import com.mongodb.MongoNamespace; -import com.mongodb.internal.async.AsyncBatchCursor; -import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.binding.AsyncReadBinding; -import com.mongodb.internal.binding.AsyncWriteBinding; -import com.mongodb.internal.connection.OperationContext; -import com.mongodb.internal.operation.ReadOperationCursor; -import com.mongodb.internal.operation.WriteOperation; - -class VoidWriteOperationThenCursorReadOperation implements ReadOperationCursorAsyncOnly { - private final WriteOperation writeOperation; - private final ReadOperationCursor cursorReadOperation; - - VoidWriteOperationThenCursorReadOperation(final WriteOperation writeOperation, - final ReadOperationCursor cursorReadOperation) { - this.writeOperation = writeOperation; - this.cursorReadOperation = cursorReadOperation; - } - - @Override - public String getCommandName() { - return writeOperation.getCommandName(); - } - - @Override - public MongoNamespace getNamespace() { - return writeOperation.getNamespace(); - } - - @Override - public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback> callback) { - writeOperation.executeAsync((AsyncWriteBinding) binding, operationContext, (result, t) -> { - if (t != null) { - callback.onResult(null, t); - } else { - cursorReadOperation.executeAsync(binding, operationContext, callback); - } - }); - } -} diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java index c112395a81..98c84eb7f8 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java @@ -20,9 +20,13 @@ import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.client.model.Sorts; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceStatistics; import com.mongodb.internal.operation.MapReduceToCollectionOperation; import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation; +import com.mongodb.reactivestreams.client.MapReducePublisher; + + import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonJavaScript; @@ -39,6 +43,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings({"rawtypes", "deprecation"}) @@ -48,6 +53,44 @@ public class MapReducePublisherImplTest extends TestHelper { private static final String REDUCE_FUNCTION = "reduceFunction(){}"; private static final String FINALIZE_FUNCTION = "finalizeFunction(){}"; + @DisplayName("Non-inline MapReduce routes through the write-then-read executor path") + @Test + void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() { + configureBatchCursor(); + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor())); + + MapReducePublisher publisher = + new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor), + MAP_FUNCTION, REDUCE_FUNCTION) + .collectionName(NAMESPACE.getCollectionName()); + + Flux.from(publisher).blockFirst(); + + WriteThenReadOperationCursor op = executor.getWriteThenReadOperation(); + assertNotNull(op, "expected a write-then-read operation"); + assertEquals(NAMESPACE, op.getNamespace()); + // Must not fall through to the plain read-operation path. + assertNull(executor.getReadOperation()); + } + + + @DisplayName("Inline MapReduce still routes through the read-operation path") + @Test + void shouldRouteInlineMapReduceThroughReadOperationPath() { + configureBatchCursor(); + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor())); + + MapReducePublisher publisher = + new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor), + MAP_FUNCTION, REDUCE_FUNCTION); // no collectionName -> inline + + Flux.from(publisher).blockFirst(); + + assertNotNull(executor.getReadOperation()); + assertNull(executor.getWriteThenReadOperation()); + } + + @DisplayName("Should build the expected MapReduceWithInlineResultsOperation") @Test void shouldBuildTheExpectedMapReduceWithInlineResultsOperation() { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java index 450536df2b..a340b2977c 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java @@ -26,6 +26,7 @@ import com.mongodb.internal.bulk.IndexRequest; import com.mongodb.internal.bulk.WriteRequest; import com.mongodb.internal.client.model.FindOptions; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.NonNull; @@ -93,7 +94,10 @@ public class TestHelper { Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) - .execute(any(), any(), any()); + .execute(any(WriteOperation.class), any(), any()); + Mockito.lenient().doAnswer(invocation -> Mono.empty()) + .when(executor) + .execute(any(WriteThenReadOperationCursor.class), any(), any()); Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) .execute(any(), any(), any(), any()); diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java index 831d22b308..df2314a52c 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java @@ -19,6 +19,8 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; @@ -37,6 +39,7 @@ public class TestOperationExecutor implements OperationExecutor { private final List readOperations = new ArrayList<>(); private final List writeOperations = new ArrayList<>(); + private final List writeThenReadOperations = new ArrayList<>(); public TestOperationExecutor(final List responses) { this.responses = new ArrayList<>(responses); @@ -60,6 +63,14 @@ public Mono execute(final WriteOperation operation, final ReadConcern return createMono(); } + @Override + public Mono> execute(final WriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + clientSessions.add(session); + writeThenReadOperations.add(operation); + return createMono(); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings timeoutSettings) { return this; @@ -106,4 +117,9 @@ WriteOperation getWriteOperation() { return writeOperations.isEmpty() ? null : writeOperations.remove(0); } + @Nullable + WriteThenReadOperationCursor getWriteThenReadOperation() { + return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0); + } + } diff --git a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java index b5b41b375f..c3c12d086d 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ChangeStreamIterableImpl.java @@ -136,7 +136,7 @@ public MongoCursor iterator() { @Override public MongoChangeStreamCursor cursor() { - return new MongoChangeStreamCursorImpl<>(execute(), codecRegistry.get(clazz), initialResumeToken()); + return new MongoChangeStreamCursorImpl<>(executeChangeStream(), codecRegistry.get(clazz), initialResumeToken()); } @Override @@ -190,7 +190,7 @@ public MongoCursor> iterator() { @Override public MongoChangeStreamCursor> cursor() { - return new MongoChangeStreamCursorImpl<>(execute(), codec, initialResumeToken()); + return new MongoChangeStreamCursorImpl<>(executeChangeStream(), codec, initialResumeToken()); } @Nullable @@ -219,7 +219,7 @@ private ReadOperationCursor createChangeStreamOperation() { getBatchSize(), collation, comment, resumeToken, startAtOperationTime, startAfter, showExpandedEvents); } - private BatchCursor execute() { + private BatchCursor executeChangeStream() { return getExecutor().execute(createChangeStreamOperation(), getReadPreference(), getReadConcern(), getClientSession()); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java index c717a539a8..a5555b9eb6 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java @@ -34,6 +34,7 @@ import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteConcernHelper; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.session.BaseClientSessionImpl; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.internal.observability.micrometer.TracingManager; @@ -88,7 +89,8 @@ public boolean notifyMessageSent() { @Override public void notifyOperationInitiated(final Object operation) { - assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation); + assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation + || operation instanceof WriteThenReadOperationCursor); if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) { assertTrue(getPinnedServerAddress() == null || (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE)); diff --git a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java index fd79242766..722751b8b5 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java @@ -27,7 +27,9 @@ import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; +import com.mongodb.internal.binding.AsyncWriteBinding; import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.client.model.FindOptions; import com.mongodb.internal.connection.OperationContext; import com.mongodb.internal.operation.BatchCursor; @@ -35,6 +37,7 @@ import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperationCursor; import com.mongodb.internal.operation.ReadOperationMapReduceCursor; +import com.mongodb.internal.operation.VoidWriteOperationThenCursorReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -203,25 +206,35 @@ public ReadOperationCursor asReadOperation() { ReadOperationMapReduceCursor operation = operations.mapReduce(mapFunction, reduceFunction, finalizeFunction, resultClass, filter, limit, jsMode, scope, sort, verbose, collation); return new WrappedMapReduceReadOperation<>(operation); - } else { - getExecutor().execute(createMapReduceToCollectionOperation(), getReadConcern(), getClientSession()); - - String dbName = databaseName != null ? databaseName : namespace.getDatabaseName(); + } + throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; " + + "asReadOperation must not be called."); + } - FindOptions findOptions = new FindOptions().collation(collation); - Integer batchSize = getBatchSize(); - if (batchSize != null) { - findOptions.batchSize(batchSize); - } - return operations.find(new MongoNamespace(dbName, collectionName), new BsonDocument(), resultClass, findOptions); + @Override + BatchCursor execute() { + if (inline) { + return super.execute(); } + return getExecutor().execute( + new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), createFindOperation()), + getReadConcern(), getClientSession()); + } + private WriteOperation createMapReduceToCollectionOperation() { + return new WrappedMapReduceWriteOperation( + operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, + limit, jsMode, scope, sort, verbose, action, bypassDocumentValidation, collation)); } - private WriteOperation createMapReduceToCollectionOperation() { - return operations.mapReduceToCollection(databaseName, collectionName, mapFunction, reduceFunction, finalizeFunction, filter, - limit, jsMode, scope, sort, verbose, action, bypassDocumentValidation, collation - ); + private ReadOperationCursor createFindOperation() { + String dbName = databaseName != null ? databaseName : namespace.getDatabaseName(); + FindOptions findOptions = new FindOptions().collation(collation); + Integer batchSize = getBatchSize(); + if (batchSize != null) { + findOptions.batchSize(batchSize); + } + return operations.find(new MongoNamespace(dbName, collectionName), new BsonDocument(), resultClass, findOptions); } // this could be inlined, but giving it a name so that it's unit-testable @@ -256,4 +269,37 @@ public void executeAsync(final AsyncReadBinding binding, final OperationContext throw new UnsupportedOperationException("This operation is sync only"); } } + + static class WrappedMapReduceWriteOperation implements WriteOperation { + private final WriteOperation operation; + + WrappedMapReduceWriteOperation(final WriteOperation operation) { + this.operation = operation; + } + + WriteOperation getOperation() { + return operation; + } + + @Override + public String getCommandName() { + return operation.getCommandName(); + } + + @Override + public MongoNamespace getNamespace() { + return operation.getNamespace(); + } + + @Override + public Void execute(final WriteBinding binding, final OperationContext operationContext) { + operation.execute(binding, operationContext); + return null; + } + + @Override + public void executeAsync(final AsyncWriteBinding binding, final OperationContext operationContext, final SingleResultCallback callback) { + throw new UnsupportedOperationException("This operation is sync only"); + } + } } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java index 8cd885f646..38173912af 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java @@ -54,10 +54,12 @@ import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext; import com.mongodb.internal.observability.micrometer.Span; import com.mongodb.internal.observability.micrometer.TracingManager; +import com.mongodb.internal.operation.BatchCursor; import com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.internal.session.ServerSessionPool; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -487,6 +489,38 @@ public T execute(final WriteOperation operation, final ReadConcern readCo } } + @Override + public BatchCursor execute(final WriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + if (session != null) { + session.notifyOperationInitiated(operation); + } + + ClientSession actualClientSession = getClientSession(session); + OperationContext operationContext = getOperationContext(actualClientSession, readConcern, operation.getCommandName()) + .withSessionContext(new ClientSessionBinding.SyncClientSessionContext(actualClientSession, readConcern, isImplicitSession(session))); + Span span = operationContext.getTracingManager().createOperationSpan( + actualClientSession.getTransactionSpan(), operationContext, operation.getCommandName(), operation.getNamespace()); + ReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, isImplicitSession(session)); + + try { + return operation.execute(binding, operationContext); + } catch (MongoException e) { + MongoException exceptionToHandle = OperationHelper.unwrap(e); + labelException(actualClientSession, exceptionToHandle); + clearTransactionContextOnTransientTransactionError(session, exceptionToHandle); + if (span != null) { + span.error(e); + } + throw e; + } finally { + binding.release(); + if (span != null) { + span.end(); + } + } + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) { if (Objects.equals(executorTimeoutSettings, newTimeoutSettings)) { diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java index b642f0f118..d04691606d 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoIterableImpl.java @@ -152,7 +152,7 @@ public > A into(final A target) { return target; } - private BatchCursor execute() { + BatchCursor execute() { return getExecutor().execute(asReadOperation(), readPreference, readConcern, clientSession); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/OperationExecutor.java b/driver-sync/src/main/com/mongodb/client/internal/OperationExecutor.java index 1ec19483af..b8bbc2f8a8 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/OperationExecutor.java +++ b/driver-sync/src/main/com/mongodb/client/internal/OperationExecutor.java @@ -20,8 +20,10 @@ import com.mongodb.ReadPreference; import com.mongodb.client.ClientSession; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.operation.BatchCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.lang.Nullable; /** @@ -75,6 +77,20 @@ public interface OperationExecutor { */ T execute(WriteOperation operation, ReadConcern readConcern, @Nullable ClientSession session); + /** + * Execute an operation that writes and then reads a cursor within a single read-write binding. + * + *

The binding is acquired once and used for both phases, avoiding the need to narrow a + * {@code ReadBinding} to a {@code WriteBinding}. + * + * @param the document type returned by the cursor. + * @param operation the write-then-read operation. + * @param readConcern the read concern + * @param session the session to associate this operation with + * @return the batch cursor produced by the read phase. + */ + BatchCursor execute(WriteThenReadOperationCursor operation, ReadConcern readConcern, @Nullable ClientSession session); + /** * Create a new OperationExecutor with a specific timeout settings * diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/TestOperationExecutor.java b/driver-sync/src/test/unit/com/mongodb/client/internal/TestOperationExecutor.java index adcfaa0f90..10b6c9e9ba 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/TestOperationExecutor.java +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/TestOperationExecutor.java @@ -20,8 +20,10 @@ import com.mongodb.ReadPreference; import com.mongodb.client.ClientSession; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.operation.BatchCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; +import com.mongodb.internal.operation.WriteThenReadOperationCursor; import com.mongodb.lang.Nullable; import java.util.ArrayList; @@ -36,6 +38,7 @@ public class TestOperationExecutor implements OperationExecutor { private final List readConcerns = new ArrayList<>(); private final List readOperations = new ArrayList<>(); private final List writeOperations = new ArrayList<>(); + private final List writeThenReadOperations = new ArrayList<>(); public TestOperationExecutor(final List responses) { this.responses = responses; @@ -69,6 +72,15 @@ public T execute(final WriteOperation operation, final ReadConcern readCo return getResponse(); } + @Override + public BatchCursor execute(final WriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + clientSessions.add(session); + writeThenReadOperations.add(operation); + readConcerns.add(readConcern); + return getResponse(); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings timeoutSettings) { return this; @@ -111,4 +123,9 @@ public ReadConcern getReadConcern() { public WriteOperation getWriteOperation() { return writeOperations.isEmpty() ? null : writeOperations.remove(0); } + + @Nullable + public WriteThenReadOperationCursor getWriteThenReadOperation() { + return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0); + } }