Skip to content

Commit 2f1a517

Browse files
author
Almas Abdrazak
committed
address copilot feedback
1 parent 71e84a5 commit 2f1a517

3 files changed

Lines changed: 28 additions & 2 deletions

File tree

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.mongodb.internal.observability.micrometer.TracingManager;
2828
import com.mongodb.internal.observability.micrometer.TransactionSpan;
2929
import com.mongodb.internal.operation.AbortTransactionOperation;
30+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
3031
import com.mongodb.internal.operation.CommitTransactionOperation;
3132
import com.mongodb.internal.operation.ReadOperation;
3233
import com.mongodb.internal.operation.WriteConcernHelper;
@@ -88,7 +89,7 @@ public boolean notifyMessageSent() {
8889

8990
@Override
9091
public void notifyOperationInitiated(final Object operation) {
91-
assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation);
92+
assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation || operation instanceof AsyncWriteThenReadOperationCursor);
9293
if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) {
9394
assertTrue(getPinnedServerAddress() == null
9495
|| (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE));

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCu
188188
notNull("operation", operation);
189189
notNull("readConcern", readConcern);
190190

191+
if (session != null) {
192+
session.notifyOperationInitiated(operation);
193+
}
194+
191195
return Mono.from(subscriber ->
192196
clientSessionHelper.withClientSession(session, this)
193197
.flatMap(actualClientSession -> {

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.WriteConcern;
2222
import com.mongodb.client.model.Sorts;
23+
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
2324
import com.mongodb.internal.operation.MapReduceStatistics;
2425
import com.mongodb.internal.operation.MapReduceToCollectionOperation;
2526
import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation;
26-
2727
import com.mongodb.reactivestreams.client.MapReducePublisher;
2828

2929

@@ -53,6 +53,27 @@ public class MapReducePublisherImplTest extends TestHelper {
5353
private static final String REDUCE_FUNCTION = "reduceFunction(){}";
5454
private static final String FINALIZE_FUNCTION = "finalizeFunction(){}";
5555

56+
@DisplayName("Non-inline MapReduce routes through the write-then-read executor path")
57+
@Test
58+
void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() {
59+
configureBatchCursor();
60+
TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor()));
61+
62+
MapReducePublisher<Document> publisher =
63+
new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor),
64+
MAP_FUNCTION, REDUCE_FUNCTION)
65+
.collectionName(NAMESPACE.getCollectionName());
66+
67+
Flux.from(publisher).blockFirst();
68+
69+
AsyncWriteThenReadOperationCursor<?> op = executor.getWriteThenReadOperation();
70+
assertNotNull(op, "expected a write-then-read operation");
71+
assertEquals(NAMESPACE, op.getNamespace());
72+
// Must not fall through to the plain read-operation path.
73+
assertNull(executor.getReadOperation());
74+
}
75+
76+
5677
@DisplayName("Inline MapReduce still routes through the read-operation path")
5778
@Test
5879
void shouldRouteInlineMapReduceThroughReadOperationPath() {

0 commit comments

Comments
 (0)