Skip to content

Commit c0f9627

Browse files
authored
Refactoring for backpressure (#1952)
Refactoring - naming, log messages, internal docs - remove unused `binding` parameters from `MixedBulkWriteOperation` -`ClientBulkWriteOperation`: -- Document the `doWithRetriesDisabled*` methods, leave `TODO-JAVA-5956` to fix them. -- Add `ClientBulkWriteCommandOkResponse`, use it instead of a generic `BsonDocument`. -- Naming of callbacks. -- Code formatting. - remove unused parameters from `OperationHelper.canRetryWrite`/`canRetryRead` - remove the broken `shouldAttemptToRetryWriteAndAddRetryableLabel` method - remove the unused `RetryState.attempts` method, update docs JAVA-5956, JAVA-6117, JAVA-6113, JAVA-6119, JAVA-6141
1 parent edea711 commit c0f9627

13 files changed

Lines changed: 245 additions & 255 deletions

driver-core/src/main/com/mongodb/internal/async/function/RetryState.java

Lines changed: 22 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,19 @@ public final class RetryState {
5656
private Throwable previouslyChosenException;
5757

5858
/**
59-
* Creates a {@code RetryState} with a positive number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as
60-
* being unlimited.
59+
* Creates a {@code RetryState} with a positive number of allowed retry attempts.
60+
* {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
6161
* <p>
62-
* If a timeout is not specified in the {@link TimeoutContext#hasTimeoutMS()}, the specified {@code retries} param acts as a fallback
62+
* If a timeout is not specified in the {@link TimeoutContext#hasTimeoutMS()}, the specified {@code retries} argument acts as a fallback
6363
* bound. Otherwise, retries are unbounded until the timeout is reached.
6464
* <p>
6565
* It is possible to provide an additional {@code retryPredicate} in the {@link #doAdvanceOrThrow} method,
66-
* which can be used to stop retrying based on a custom condition additionally to {@code retires} and {@link TimeoutContext}.
66+
* which can be used to stop retrying based on a custom condition additionally to {@code retries} and {@link TimeoutContext}.
6767
* </p>
6868
*
69-
* @param retries A positive number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
70-
* @param retryUntilTimeoutThrowsException If {@code true}, then if a {@link MongoOperationTimeoutException} is throws then retrying stops.
71-
* @see #attempts()
69+
* @param retries A positive number of allowed retry attempts.
70+
* {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
71+
* @param retryUntilTimeoutThrowsException If {@code true}, then if a {@link MongoOperationTimeoutException} is thrown then retrying stops.
7272
*/
7373
public static RetryState withRetryableState(final int retries, final boolean retryUntilTimeoutThrowsException) {
7474
assertTrue(retries > 0);
@@ -80,24 +80,22 @@ public static RetryState withNonRetryableState() {
8080
}
8181

8282
/**
83-
* Creates a {@link RetryState} that does not limit the number of retries.
83+
* Creates a {@link RetryState} that does not limit the number of attempts.
8484
* The number of attempts is limited iff {@link TimeoutContext#hasTimeoutMS()} is true and timeout has expired.
8585
* <p>
8686
* It is possible to provide an additional {@code retryPredicate} in the {@link #doAdvanceOrThrow} method,
87-
* which can be used to stop retrying based on a custom condition additionally to {@code retires} and {@link TimeoutContext}.
87+
* which can be used to stop retrying based on a custom condition additionally to {@link TimeoutContext}.
8888
* </p>
8989
*
9090
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
91-
* @see #attempts()
9291
*/
9392
public RetryState(final TimeoutContext timeoutContext) {
9493
this(INFINITE_ATTEMPTS, timeoutContext.hasTimeoutMS());
9594
}
9695

9796
/**
98-
* @param retries A non-negative number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
99-
* @param retryUntilTimeoutThrowsException
100-
* @see #attempts()
97+
* @param retries A non-negative number of allowed retry attempts.
98+
* {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
10199
*/
102100
private RetryState(final int retries, final boolean retryUntilTimeoutThrowsException) {
103101
assertTrue(retries >= 0);
@@ -108,7 +106,7 @@ private RetryState(final int retries, final boolean retryUntilTimeoutThrowsExcep
108106

109107
/**
110108
* Advances this {@link RetryState} such that it represents the state of a new attempt.
111-
* If there is at least one more {@linkplain #attempts() attempt} left, it is consumed by this method.
109+
* If there is at least one more attempt left, it is consumed by this method.
112110
* Must not be called before the {@linkplain #isFirstAttempt() first attempt}, must be called before each subsequent attempt.
113111
* <p>
114112
* This method is intended to be used by code that generally does not handle {@link Error}s explicitly,
@@ -147,7 +145,7 @@ private RetryState(final int retries, final boolean retryUntilTimeoutThrowsExcep
147145
* <li>the {@code retryPredicate} is {@code false}.</li>
148146
* </ul>
149147
* The exception thrown represents the failed result of the associated retryable activity,
150-
* i.e., the caller must not do any more attempts.
148+
* i.e., the caller must not make any more attempts.
151149
* @see #advanceOrThrow(Throwable, BinaryOperator, BiPredicate)
152150
*/
153151
void advanceOrThrow(final RuntimeException attemptException, final BinaryOperator<Throwable> onAttemptFailureOperator,
@@ -354,58 +352,43 @@ public void markAsLastAttempt() {
354352
}
355353

356354
/**
357-
* Returns {@code true} iff the current attempt is the first one, i.e., no retries have been made.
355+
* Returns {@code true} iff the current attempt is the first one, i.e., no retry attempts have been made.
358356
*
359-
* @see #attempts()
357+
* @see #attempt()
360358
*/
361359
public boolean isFirstAttempt() {
362360
return loopState.isFirstIteration();
363361
}
364362

365363
/**
366-
* Returns {@code true} iff the current attempt is known to be the last one, i.e., it is known that no more retries will be made.
364+
* Returns {@code true} iff the current attempt is known to be the last one, i.e., it is known that no more attempts will be made.
367365
* An attempt is known to be the last one iff any of the following applies:
368366
* <ul>
369367
* <li>{@link #breakAndThrowIfRetryAnd(Supplier)} / {@link #breakAndCompleteIfRetryAnd(Supplier, SingleResultCallback)} / {@link #markAsLastAttempt()} was called.</li>
370368
* <li>A timeout is set and has been reached.</li>
371-
* <li>No timeout is set, and the number of {@linkplain #attempts() attempts} is limited, and the current attempt is the last one.</li>
369+
* <li>No timeout is set, and the number of attempts is limited, and the current attempt is the last one.</li>
372370
* </ul>
373371
*
374-
* @see #attempts()
372+
* @see #attempt()
375373
*/
376374
public boolean isLastAttempt() {
377-
if (loopState.isLastIteration()){
375+
if (loopState.isLastIteration()) {
378376
return true;
379377
}
380378
if (retryUntilTimeoutThrowsException) {
381-
return false;
379+
return false;
382380
}
383381
return attempt() == attempts - 1;
384382
}
385383

386384
/**
387385
* A 0-based attempt number.
388386
*
389-
* @see #attempts()
390-
*/
391-
public int attempt() {
392-
return loopState.iteration();
393-
}
394-
395-
/**
396-
* Returns a positive maximum number of attempts:
397-
* <ul>
398-
* <li>0 if the number of retries is {@linkplain #RetryState(TimeoutContext) unlimited};</li>
399-
* <li>1 if no retries are allowed;</li>
400-
* <li>{@link #RetryState(int, boolean) retries} + 1 otherwise.</li>
401-
* </ul>
402-
*
403-
* @see #attempt()
404387
* @see #isFirstAttempt()
405388
* @see #isLastAttempt()
406389
*/
407-
public int attempts() {
408-
return attempts == INFINITE_ATTEMPTS ? 0 : attempts;
390+
public int attempt() {
391+
return loopState.iteration();
409392
}
410393

411394
/**

driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@
5353
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
5454
import static com.mongodb.internal.operation.CommandOperationHelper.addRetryableWriteErrorLabel;
5555
import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState;
56-
import static com.mongodb.internal.operation.CommandOperationHelper.isRetryWritesEnabled;
57-
import static com.mongodb.internal.operation.CommandOperationHelper.logRetryExecute;
56+
import static com.mongodb.internal.operation.CommandOperationHelper.isRetryableWriteCommand;
57+
import static com.mongodb.internal.operation.CommandOperationHelper.logRetryCommand;
5858
import static com.mongodb.internal.operation.CommandOperationHelper.onRetryableReadAttemptFailure;
5959
import static com.mongodb.internal.operation.CommandOperationHelper.onRetryableWriteAttemptFailure;
6060
import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException;
@@ -199,9 +199,7 @@ static <D, T> void executeRetryableReadAsync(
199199
withAsyncSourceAndConnection(sourceAsyncFunction, false, operationContext, funcCallback,
200200
(source, connection, operationContextWithMinRtt, releasingCallback) -> {
201201
if (retryState.breakAndCompleteIfRetryAnd(
202-
() -> !OperationHelper.canRetryRead(source.getServerDescription(),
203-
operationContextWithMinRtt),
204-
releasingCallback)) {
202+
() -> !OperationHelper.canRetryRead(operationContextWithMinRtt), releasingCallback)) {
205203
return;
206204
}
207205
createReadCommandAndExecuteAsync(retryState, operationContextWithMinRtt, source, database,
@@ -278,8 +276,7 @@ static <T, R> void executeRetryableWriteAsync(
278276
? releasingCallback
279277
: addingRetryableLabelCallback(releasingCallback, maxWireVersion);
280278
if (retryState.breakAndCompleteIfRetryAnd(() ->
281-
!OperationHelper.canRetryWrite(connection.getDescription(), operationContextWithMinRtt.getSessionContext()),
282-
addingRetryableLabelCallback)) {
279+
!OperationHelper.canRetryWrite(connection.getDescription()), addingRetryableLabelCallback)) {
283280
return;
284281
}
285282
BsonDocument command;
@@ -292,9 +289,9 @@ static <T, R> void executeRetryableWriteAsync(
292289
operationContextWithMinRtt,
293290
source.getServerDescription(),
294291
connection.getDescription()));
295-
// attach `maxWireVersion`, `retryableCommandFlag` ASAP because they are used to check whether we should retry
292+
// attach `maxWireVersion`, `retryableWriteCommandFlag` ASAP because they are used to check whether we should retry
296293
retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true)
297-
.attach(AttachmentKeys.retryableCommandFlag(), isRetryWritesEnabled(command), true)
294+
.attach(AttachmentKeys.retryableWriteCommandFlag(), isRetryableWriteCommand(command), true)
298295
.attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false)
299296
.attach(AttachmentKeys.command(), command, false);
300297
} catch (Throwable t) {
@@ -335,8 +332,8 @@ static <D, T> void createReadCommandAndExecuteAsync(
335332
static <R> AsyncCallbackSupplier<R> decorateReadWithRetriesAsync(final RetryState retryState, final OperationContext operationContext,
336333
final AsyncCallbackSupplier<R> asyncReadFunction) {
337334
return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableReadAttemptFailure(operationContext),
338-
CommandOperationHelper::shouldAttemptToRetryRead, callback -> {
339-
logRetryExecute(retryState, operationContext);
335+
CommandOperationHelper::loggingShouldAttemptToRetryRead, callback -> {
336+
logRetryCommand(retryState, operationContext);
340337
asyncReadFunction.get(callback);
341338
});
342339
}
@@ -345,7 +342,7 @@ static <R> AsyncCallbackSupplier<R> decorateWriteWithRetriesAsync(final RetrySta
345342
final AsyncCallbackSupplier<R> asyncWriteFunction) {
346343
return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext),
347344
CommandOperationHelper::loggingShouldAttemptToRetryWriteAndAddRetryableLabel, callback -> {
348-
logRetryExecute(retryState, operationContext);
345+
logRetryCommand(retryState, operationContext);
349346
asyncWriteFunction.get(callback);
350347
});
351348
}

0 commit comments

Comments
 (0)