Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.operation;

import com.mongodb.MongoNamespace;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.connection.OperationContext;

/**
* An async-only operation that performs a write followed by a read that returns a cursor.
*
* <p>Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding}
* so that both the write and the read portions can be executed without narrowing casts.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncWriteThenReadOperationCursor<T> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a new interface that will be used by OperationExecutorImpl
It adds a new override to execute that accepts this interface instead of ReadOperation
All methods here were copied from ReadOperation interface


/**
* @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc.
*/
String getCommandName();

/**
* @return the namespace of the operation
*/
MongoNamespace getNamespace();

/**
* General execute which can return anything of type T
*
* @param binding the binding to execute in the context of
* @param operationContext the operation context to use
* @param callback the callback to be called when the operation has been executed
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the javadoc

*/
void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext,
SingleResultCallback<AsyncBatchCursor<T>> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -201,10 +202,46 @@ public ReadOperationCursor<T> asReadOperation(final int initialBatchSize) {
if (inline) {
// initialBatchSize is ignored for map reduce operations.
return createMapReduceInlineOperation();
} else {
return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
createFindOperation(initialBatchSize));
}
throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; "
+ "asReadOperation must not be called.");
}

@Override
public Mono<BatchCursor<T>> batchCursor(final int initialBatchSize) {
if (inline) {
return super.batchCursor(initialBatchSize);
}
return writeThenReadBatchCursor(initialBatchSize);
}
Comment on lines +212 to +217
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a unit test here


@Override
public Publisher<T> first() {
if (inline) {
return super.first();
}
return writeThenReadBatchCursor(1)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this number 1 comes from BatchCursorPublisher

.flatMap(batchCursor -> {
batchCursor.setBatchSize(1);
return Mono.from(batchCursor.next())
.doOnTerminate(batchCursor::close)
.flatMap(results -> {
if (results == null || results.isEmpty()) {
return Mono.empty();
}
return Mono.fromCallable(() -> results.get(0));
});
});
}

private Mono<BatchCursor<T>> writeThenReadBatchCursor(final int initialBatchSize) {
return getMongoOperationPublisher()
.createWriteThenReadOperationMono(
operations -> operations.createTimeoutSettings(maxTimeMS),
() -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
createFindOperation(initialBatchSize)),
getClientSession())
.map(BatchCursor::new);
}

private WrappedMapReduceReadOperation<T> createMapReduceInlineOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.IndexHelper;
import com.mongodb.internal.operation.Operations;
import com.mongodb.internal.operation.ReadOperation;
Expand Down Expand Up @@ -513,6 +515,15 @@ <T> Mono<T> createReadOperationMono(final Supplier<TimeoutSettings> timeoutSetti
.execute(readOperation, readPreference, getReadConcern(), clientSession);
}

<R> Mono<AsyncBatchCursor<R>> createWriteThenReadOperationMono(
final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
final Supplier<AsyncWriteThenReadOperationCursor<R>> operationSupplier,
@Nullable final ClientSession clientSession) {
AsyncWriteThenReadOperationCursor<R> operation = operationSupplier.get();
return getExecutor(timeoutSettingsFunction.apply(operations))
.execute(operation, getReadConcern(), clientSession);
}

<R> Mono<R> createWriteOperationMono(final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
final Supplier<WriteOperation<R>> operationSupplier, @Nullable final ClientSession clientSession) {
return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.Nullable;
Expand Down Expand Up @@ -54,6 +56,20 @@ <T> Mono<T> execute(ReadOperation<?, T> operation, ReadPreference readPreference
*/
<T> Mono<T> execute(WriteOperation<T> operation, ReadConcern readConcern, @Nullable ClientSession session);

/**
* Execute an operation that writes and then reads a cursor within a single read-write binding.
*
* <p>The binding is acquired once and used for both phases, avoiding the need to narrow an
* {@code AsyncReadBinding} to an {@code AsyncWriteBinding}.
*
* @param operation the write-then-read operation.
* @param readConcern the read concern
* @param session the session to associate this operation with
* @param <T> the document type returned by the cursor.
*/
<T> Mono<AsyncBatchCursor<T>> execute(AsyncWriteThenReadOperationCursor<T> operation,
ReadConcern readConcern, @Nullable ClientSession session);

/**
* Create a new OperationExecutor with a specific timeout settings
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
import com.mongodb.internal.observability.micrometer.Span;
import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.OperationHelper;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
Expand Down Expand Up @@ -179,6 +181,47 @@ public <T> Mono<T> execute(final WriteOperation<T> operation, final ReadConcern
);
}

@Override
public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
@Nullable final ClientSession session) {
isTrue("open", !mongoClient.getCluster().isClosed());
notNull("operation", operation);
notNull("readConcern", readConcern);

Comment on lines +184 to +190
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added notifyOperationInitiated call , also implementationf of notifyOperationInitiated now also checked instance of AsyncWriteThenRead

return Mono.from(subscriber ->
clientSessionHelper.withClientSession(session, this)
.flatMap(actualClientSession -> {
AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null);
RequestContext requestContext = getContext(subscriber);
OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName())
.withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession,
isImplicitSession(session), readConcern));
Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(),
operationContext, operation.getCommandName(), operation.getNamespace());

return Mono.<AsyncBatchCursor<T>>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> {
try {
binding.release();
} finally {
if (t != null) {
Throwable exceptionToHandle = t instanceof MongoException
? OperationHelper.unwrap((MongoException) t) : t;
labelException(session, exceptionToHandle);
unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
if (span != null) {
span.error(t);
}
}
if (span != null) {
span.end();
}
sinkToCallback(sink).onResult(result, t);
}
}));
}).subscribe(subscriber)
);
}

@Override
public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) {
if (Objects.equals(timeoutSettings, newTimeoutSettings)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import com.mongodb.MongoNamespace;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperationCursor;
import com.mongodb.internal.operation.WriteOperation;

class VoidWriteOperationThenCursorReadOperation<T> implements ReadOperationCursorAsyncOnly<T> {
class VoidWriteOperationThenCursorReadOperation<T> implements AsyncWriteThenReadOperationCursor<T> {
private final WriteOperation<Void> writeOperation;
private final ReadOperationCursor<T> cursorReadOperation;

Expand All @@ -46,8 +46,9 @@ public MongoNamespace getNamespace() {
}

@Override
public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback<AsyncBatchCursor<T>> callback) {
writeOperation.executeAsync((AsyncWriteBinding) binding, operationContext, (result, t) -> {
public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext,
final SingleResultCallback<AsyncBatchCursor<T>> callback) {
writeOperation.executeAsync(binding, operationContext, (result, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import com.mongodb.internal.operation.MapReduceStatistics;
import com.mongodb.internal.operation.MapReduceToCollectionOperation;
import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation;

import com.mongodb.reactivestreams.client.MapReducePublisher;


Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed empty line

Comment thread
strogiyotec marked this conversation as resolved.
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonJavaScript;
Expand All @@ -39,6 +43,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

@SuppressWarnings({"rawtypes", "deprecation"})
Expand All @@ -48,6 +53,23 @@ public class MapReducePublisherImplTest extends TestHelper {
private static final String REDUCE_FUNCTION = "reduceFunction(){}";
private static final String FINALIZE_FUNCTION = "finalizeFunction(){}";

@DisplayName("Inline MapReduce still routes through the read-operation path")
@Test
void shouldRouteInlineMapReduceThroughReadOperationPath() {
configureBatchCursor();
TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor()));

MapReducePublisher<Document> publisher =
new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor),
MAP_FUNCTION, REDUCE_FUNCTION); // no collectionName -> inline

Flux.from(publisher).blockFirst();

assertNotNull(executor.getReadOperation());
assertNull(executor.getWriteThenReadOperation());
}


@DisplayName("Should build the expected MapReduceWithInlineResultsOperation")
@Test
void shouldBuildTheExpectedMapReduceWithInlineResultsOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.mongodb.internal.bulk.IndexRequest;
import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.internal.client.model.FindOptions;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.NonNull;
Expand Down Expand Up @@ -93,7 +94,10 @@ public class TestHelper {

Mockito.lenient().doAnswer(invocation -> Mono.empty())
.when(executor)
.execute(any(), any(), any());
.execute(any(WriteOperation.class), any(), any());
Mockito.lenient().doAnswer(invocation -> Mono.empty())
.when(executor)
.execute(any(AsyncWriteThenReadOperationCursor.class), any(), any());
Mockito.lenient().doAnswer(invocation -> Mono.empty())
.when(executor)
.execute(any(), any(), any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.Nullable;
Expand All @@ -37,6 +39,7 @@ public class TestOperationExecutor implements OperationExecutor {

private final List<ReadOperation> readOperations = new ArrayList<>();
private final List<WriteOperation> writeOperations = new ArrayList<>();
private final List<AsyncWriteThenReadOperationCursor> writeThenReadOperations = new ArrayList<>();

public TestOperationExecutor(final List<Object> responses) {
this.responses = new ArrayList<>(responses);
Expand All @@ -60,6 +63,14 @@ public <T> Mono<T> execute(final WriteOperation<T> operation, final ReadConcern
return createMono();
}

@Override
public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
@Nullable final ClientSession session) {
clientSessions.add(session);
writeThenReadOperations.add(operation);
return createMono();
}

@Override
public OperationExecutor withTimeoutSettings(final TimeoutSettings timeoutSettings) {
return this;
Expand Down Expand Up @@ -106,4 +117,9 @@ WriteOperation getWriteOperation() {
return writeOperations.isEmpty() ? null : writeOperations.remove(0);
}

@Nullable
AsyncWriteThenReadOperationCursor getWriteThenReadOperation() {
return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0);
}

}