Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.operation;

import com.mongodb.MongoNamespace;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.binding.ReadWriteBinding;
import com.mongodb.internal.connection.OperationContext;

/**
* A {@link WriteThenReadOperationCursor} that performs a {@link WriteOperation} returning
* {@code Void} followed by a {@link ReadOperationCursor}, using a single read-write
* binding for both phases.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public final class VoidWriteOperationThenCursorReadOperation<T> implements WriteThenReadOperationCursor<T> {
private final WriteOperation<Void> writeOperation;
private final ReadOperationCursor<T> cursorReadOperation;

public VoidWriteOperationThenCursorReadOperation(final WriteOperation<Void> writeOperation,
final ReadOperationCursor<T> cursorReadOperation) {
this.writeOperation = writeOperation;
this.cursorReadOperation = cursorReadOperation;
}

public WriteOperation<Void> getWriteOperation() {
return writeOperation;
}

public ReadOperationCursor<T> getCursorReadOperation() {
return cursorReadOperation;
}

@Override
public String getCommandName() {
return writeOperation.getCommandName();
}

@Override
public MongoNamespace getNamespace() {
return writeOperation.getNamespace();
}

@Override
public BatchCursor<T> execute(final ReadWriteBinding binding, final OperationContext operationContext) {
writeOperation.execute(binding, operationContext);
return cursorReadOperation.execute(binding, operationContext);
}

@Override
public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext,
final SingleResultCallback<AsyncBatchCursor<T>> callback) {
writeOperation.executeAsync(binding, operationContext, (result, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
cursorReadOperation.executeAsync(binding, operationContext, callback);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.operation;

import com.mongodb.MongoNamespace;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.binding.ReadWriteBinding;
import com.mongodb.internal.connection.OperationContext;

/**
* An operation that performs a write followed by a read that returns a cursor, using a
* single read-write binding for both phases.
*
* <p>Having a dedicated interface lets the executor hand the operation a binding that is
* both a {@link ReadWriteBinding} and an {@link AsyncReadWriteBinding}, avoiding the
* narrowing cast from a read-only binding that would otherwise be required.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface WriteThenReadOperationCursor<T> {

/**
* @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate")
*/
String getCommandName();

/**
* @return the namespace the write phase targets
*/
MongoNamespace getNamespace();

/**
* Executes the write phase followed by the read phase, returning a {@link BatchCursor}
* over the results of the read.
*
* @param binding the read-write binding used by both phases
* @param operationContext the operation context to use
* @return the batch cursor produced by the read phase
*/
BatchCursor<T> execute(ReadWriteBinding binding, OperationContext operationContext);

/**
* Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor}
* over the results of the read.
*
* @param binding the read-write binding used by both phases
* @param operationContext the operation context to use
* @param callback receives the {@link AsyncBatchCursor} on success, or the failure of either phase
*/
void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext,
SingleResultCallback<AsyncBatchCursor<T>> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteConcernHelper;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
import com.mongodb.internal.session.BaseClientSessionImpl;
import com.mongodb.internal.session.ServerSessionPool;
import com.mongodb.lang.Nullable;
Expand Down Expand Up @@ -88,7 +89,8 @@ public boolean notifyMessageSent() {

@Override
public void notifyOperationInitiated(final Object operation) {
assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation);
assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation
|| operation instanceof WriteThenReadOperationCursor);
if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) {
assertTrue(getPinnedServerAddress() == null
|| (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import com.mongodb.internal.operation.Operations;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.ReadOperationCursor;
import com.mongodb.internal.operation.VoidWriteOperationThenCursorReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -201,10 +203,46 @@ public ReadOperationCursor<T> asReadOperation(final int initialBatchSize) {
if (inline) {
// initialBatchSize is ignored for map reduce operations.
return createMapReduceInlineOperation();
} else {
return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
createFindOperation(initialBatchSize));
}
throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; "
+ "asReadOperation must not be called.");
}

@Override
public Mono<BatchCursor<T>> batchCursor(final int initialBatchSize) {
if (inline) {
return super.batchCursor(initialBatchSize);
}
return writeThenReadBatchCursor(initialBatchSize);
}
Comment on lines +212 to +217
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a unit test here


@Override
public Publisher<T> first() {
if (inline) {
return super.first();
}
return writeThenReadBatchCursor(1)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this number 1 comes from BatchCursorPublisher

.flatMap(batchCursor -> {
batchCursor.setBatchSize(1);
return Mono.from(batchCursor.next())
.doOnTerminate(batchCursor::close)
.flatMap(results -> {
if (results == null || results.isEmpty()) {
return Mono.empty();
}
return Mono.fromCallable(() -> results.get(0));
});
});
}

private Mono<BatchCursor<T>> writeThenReadBatchCursor(final int initialBatchSize) {
return getMongoOperationPublisher()
.createWriteThenReadOperationMono(
operations -> operations.createTimeoutSettings(maxTimeMS),
() -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
createFindOperation(initialBatchSize)),
getClientSession())
.map(BatchCursor::new);
}

private WrappedMapReduceReadOperation<T> createMapReduceInlineOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
import com.mongodb.internal.operation.IndexHelper;
import com.mongodb.internal.operation.Operations;
import com.mongodb.internal.operation.ReadOperation;
Expand Down Expand Up @@ -513,6 +515,15 @@ <T> Mono<T> createReadOperationMono(final Supplier<TimeoutSettings> timeoutSetti
.execute(readOperation, readPreference, getReadConcern(), clientSession);
}

<R> Mono<AsyncBatchCursor<R>> createWriteThenReadOperationMono(
final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
final Supplier<WriteThenReadOperationCursor<R>> operationSupplier,
@Nullable final ClientSession clientSession) {
WriteThenReadOperationCursor<R> operation = operationSupplier.get();
return getExecutor(timeoutSettingsFunction.apply(operations))
.execute(operation, getReadConcern(), clientSession);
}

<R> Mono<R> createWriteOperationMono(final Function<Operations<?>, TimeoutSettings> timeoutSettingsFunction,
final Supplier<WriteOperation<R>> operationSupplier, @Nullable final ClientSession clientSession) {
return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -54,6 +56,20 @@ <T> Mono<T> execute(ReadOperation<?, T> operation, ReadPreference readPreference
*/
<T> Mono<T> execute(WriteOperation<T> operation, ReadConcern readConcern, @Nullable ClientSession session);

/**
* Execute an operation that writes and then reads a cursor within a single read-write binding.
*
* <p>The binding is acquired once and used for both phases, avoiding the need to narrow an
* {@code AsyncReadBinding} to an {@code AsyncWriteBinding}.
*
* @param operation the write-then-read operation.
* @param readConcern the read concern
* @param session the session to associate this operation with
* @param <T> the document type returned by the cursor.
*/
<T> Mono<AsyncBatchCursor<T>> execute(WriteThenReadOperationCursor<T> operation,
ReadConcern readConcern, @Nullable ClientSession session);

/**
* Create a new OperationExecutor with a specific timeout settings
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
import com.mongodb.internal.observability.micrometer.Span;
import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.operation.OperationHelper;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.internal.operation.WriteThenReadOperationCursor;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.ReactiveContextProvider;
Expand Down Expand Up @@ -179,6 +181,51 @@ public <T> Mono<T> execute(final WriteOperation<T> operation, final ReadConcern
);
}

@Override
public <T> Mono<AsyncBatchCursor<T>> execute(final WriteThenReadOperationCursor<T> operation, final ReadConcern readConcern,
@Nullable final ClientSession session) {
isTrue("open", !mongoClient.getCluster().isClosed());
notNull("operation", operation);
notNull("readConcern", readConcern);

Comment on lines +184 to +190
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added notifyOperationInitiated call , also implementationf of notifyOperationInitiated now also checked instance of AsyncWriteThenRead

if (session != null) {
session.notifyOperationInitiated(operation);
}

return Mono.from(subscriber ->
clientSessionHelper.withClientSession(session, this)
.flatMap(actualClientSession -> {
AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null);
RequestContext requestContext = getContext(subscriber);
OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName())
.withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession,
isImplicitSession(session), readConcern));
Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(),
operationContext, operation.getCommandName(), operation.getNamespace());

return Mono.<AsyncBatchCursor<T>>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> {
try {
binding.release();
} finally {
if (t != null) {
Throwable exceptionToHandle = t instanceof MongoException
? OperationHelper.unwrap((MongoException) t) : t;
labelException(session, exceptionToHandle);
unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
if (span != null) {
span.error(t);
}
}
if (span != null) {
span.end();
}
sinkToCallback(sink).onResult(result, t);
}
}));
}).subscribe(subscriber)
);
}

@Override
public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) {
if (Objects.equals(timeoutSettings, newTimeoutSettings)) {
Expand Down
Loading