Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ export interface HandshakeDocument extends Document {
compression: string[];
saslSupportedMechs?: string;
loadBalanced?: boolean;
backpressure: true;
}

/**
Expand All @@ -241,6 +242,7 @@ export async function prepareHandshakeDocument(

const handshakeDoc: HandshakeDocument = {
[serverApi?.version || options.loadBalanced === true ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
backpressure: true,
helloOk: true,
client: clientMetadata,
compression: compressors
Expand Down
3 changes: 3 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.throwIfAborted();
}
} catch (error) {
if (options.session != null && !(error instanceof MongoServerError)) {
updateSessionFromResponse(options.session, MongoDBResponse.empty);
Comment thread
PavelSafronov marked this conversation as resolved.
}
if (this.shouldEmitAndLogCommand) {
this.emitAndLogCommand(
this.monitorCommands,
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ export type {
TimeoutContext,
TimeoutContextOptions
} from './timeout';
export type { TokenBucket } from './token_bucket';
Comment thread
PavelSafronov marked this conversation as resolved.
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand Down
176 changes: 130 additions & 46 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { setTimeout } from 'timers/promises';

import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
import {
isRetryableReadError,
Expand Down Expand Up @@ -26,9 +28,17 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import {
BASE_BACKOFF_MS,
MAX_BACKOFF_MS,
MAX_RETRIES,
RETRY_COST,
RETRY_TOKEN_RETURN_RATE
} from '../token_bucket';
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
import { AggregateOperation } from './aggregate';
import { AbstractOperation, Aspect } from './operation';
import { RunCommandOperation } from './run_command';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
Expand All @@ -50,7 +60,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
* The expectation is that this function:
* - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
* - Creates a session if none is provided and cleans up the session it creates
* - Tries an operation and retries under certain conditions, see {@link tryOperation}
* - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries}
*
* @typeParam T - The operation's type
* @typeParam TResult - The type of the operation's result, calculated from T
Expand Down Expand Up @@ -120,7 +130,7 @@ export async function executeOperation<
});

try {
return await tryOperation(operation, {
return await executeOperationWithRetries(operation, {
topology,
timeoutContext,
session,
Expand Down Expand Up @@ -183,8 +193,11 @@ type RetryOptions = {
* @typeParam TResult - The type of the operation's result, calculated from T
*
* @param operation - The operation to execute
* */
async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFromOperation<T>>(
*/
async function executeOperationWithRetries<
T extends AbstractOperation,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
{ topology, timeoutContext, session, readPreference }: RetryOptions
): Promise<TResult> {
Expand Down Expand Up @@ -233,33 +246,94 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
session.incrementTransactionNumber();
}

const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
let previousOperationError: MongoError | undefined;
const deprioritizedServers = new DeprioritizedServers();

for (let tries = 0; tries < maxTries; tries++) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
let maxAttempts =
typeof operation.maxAttempts === 'number'
? operation.maxAttempts
: willRetry
? timeoutContext.csotEnabled()
? Infinity
: 2
: 1;

let error: MongoError | null = null;

for (let attempt = 0; attempt < maxAttempts; attempt++) {
operation.attemptsMade = attempt + 1;
operation.server = server;

try {
try {
const result = await server.command(operation, timeoutContext);
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
}
} catch (operationError) {
// Should never happen but if it does - propagate the error.
if (!(operationError instanceof MongoError)) throw operationError;

if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
// If a retry attempt fails with a non-overload error, deposit RETRY_COST tokens back into the bucket.
topology.tokenBucket.deposit(RETRY_COST);
}

// Preserve the original error once a write has been performed.
// Only update to the latest error if no writes were performed.
if (error == null) {
error = operationError;
} else {
if (!operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)) {
error = operationError;
}
}
Comment thread
nbbeeken marked this conversation as resolved.

// Reset timeouts
timeoutContext.clear();

if (hasWriteAspect && operationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
originalError: previousOperationError
originalError: operationError
});
}

if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
throw previousOperationError;
if (!canRetry(operation, operationError)) {
throw error;
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
maxAttempts = Math.min(MAX_RETRIES + 1, operation.maxAttempts ?? MAX_RETRIES + 1);
}

if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
throw previousOperationError;
if (attempt + 1 >= maxAttempts) {
throw error;
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
if (!topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);

// if the backoff would exhaust the CSOT timeout, short-circuit.
if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
throw error;
}

if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
throw previousOperationError;
await setTimeout(backoffMS);
}

if (
previousOperationError instanceof MongoNetworkError &&
operationError instanceof MongoNetworkError &&
operation.hasAspect(Aspect.CURSOR_CREATING) &&
session != null &&
session.isPinned &&
Expand All @@ -268,52 +342,62 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
session.unpin({ force: true, forceClear: true });
}

deprioritizedServers.add(server.description);

server = await topology.selectServer(selector, {
session,
operationName: operation.commandName,
deprioritizedServers,
signal: operation.options.signal
});

if (hasWriteAspect && !supportsRetryableWrites(server)) {
if (
hasWriteAspect &&
!supportsRetryableWrites(server) &&
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
) {
throw new MongoUnexpectedServerResponseError(
'Selected server does not support retryable writes'
);
}
}

operation.server = server;

try {
// If tries > 0 and we are command batching we need to reset the batch.
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
// Batched operations must reset the batch before retry,
// otherwise building a command will build the _next_ batch, not the current batch.
if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
operation.resetBatch();
}

try {
const result = await server.command(operation, timeoutContext);
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
}
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;
if (
previousOperationError != null &&
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
) {
throw previousOperationError;
}
deprioritizedServers.add(server.description);
previousOperationError = operationError;

// Reset timeouts
timeoutContext.clear();
}
}

throw (
previousOperationError ??
new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
error ??
new MongoRuntimeError(
'Should never happen: operation execution loop terminated but no error was recorded.'
)
);

/**
 * Decides whether a failed attempt of `operation` may be retried.
 *
 * Rules are checked in order; the first one that applies wins:
 * 1. An error labelled both SystemOverloadedError and RetryableError is always retryable.
 * 2. A RunCommandOperation is not retryable for any other error.
 * 3. A batching operation is retryable only when the batch itself can be retried
 *    and the error is a retryable write error.
 * 4. Otherwise the write/read aspect of the operation, the corresponding
 *    retry setting, and the retryability of the error must all agree.
 *
 * @param operation - The operation whose attempt failed
 * @param error - The error produced by the failed attempt
 */
function canRetry(operation: AbstractOperation, error: MongoError) {
  // Overload errors that the server marked retryable bypass every other rule.
  const isRetryableOverload =
    error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
    error.hasErrorLabel(MongoErrorLabel.RetryableError);
  if (isRetryableOverload) {
    return true;
  }

  // runCommand is only retried for the retryable overload case handled above.
  if (operation instanceof RunCommandOperation) {
    return false;
  }

  // For batched commands the batch decides, together with the error's write retryability.
  if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
    return operation.canRetryWrite && isRetryableWriteError(error);
  }

  // Regular operations: a retryable write, or failing that, a retryable read.
  const writeIsRetryable = hasWriteAspect && willRetryWrite && isRetryableWriteError(error);
  return writeIsRetryable || (hasReadAspect && willRetryRead && isRetryableReadError(error));
}
}
8 changes: 8 additions & 0 deletions src/operations/operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ export abstract class AbstractOperation<TResult = any> {
/** Specifies the time an operation will run until it throws a timeout error. */
timeoutMS?: number;

/** Used by commitTransaction to share the retry budget across two executeOperation calls. */
maxAttempts?: number;

/** Tracks how many attempts were made in the last executeOperation call. */
attemptsMade: number;
Comment thread
PavelSafronov marked this conversation as resolved.

private _session: ClientSession | undefined;

static aspects?: Set<symbol>;
Expand All @@ -82,6 +88,8 @@ export abstract class AbstractOperation<TResult = any> {

this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;

this.attemptsMade = 0;
}

/** Must match the first key of the command object sent to the server.
Expand Down
19 changes: 12 additions & 7 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
import { INITIAL_TOKEN_BUCKET_SIZE, TokenBucket } from '../token_bucket';
import type { Transaction } from '../transactions';
import {
addAbortListener,
Expand Down Expand Up @@ -207,18 +208,16 @@ export type TopologyEvents = {
* @internal
*/
export class Topology extends TypedEventEmitter<TopologyEvents> {
/** @internal */
Comment thread
PavelSafronov marked this conversation as resolved.
s: TopologyPrivate;
/** @internal */
waitQueue: List<ServerSelectionRequest>;
/** @internal */
hello?: Document;
/** @internal */
_type?: string;

/** @internal */
tokenBucket = new TokenBucket(INITIAL_TOKEN_BUCKET_SIZE);

client!: MongoClient;

/** @internal */
private connectionLock?: Promise<Topology>;

/** @event */
Expand Down Expand Up @@ -595,7 +594,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
)
);
}
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();

if (!options.timeoutContext || options.timeoutContext.clearServerSelectionTimeout) {
timeout?.clear();
}
Comment thread
nbbeeken marked this conversation as resolved.

return transaction.server;
}

Expand Down Expand Up @@ -666,7 +669,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
throw error;
} finally {
abortListener?.[kDispose]();
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
if (!options.timeoutContext || options.timeoutContext.clearServerSelectionTimeout) {
timeout?.clear();
}
}
}
/**
Expand Down
Loading