Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ interface OptionDescriptor {
}

export const OPTIONS = {
adaptiveRetries: {
enableOverloadRetargeting: {
default: false,
type: 'boolean'
},
Expand Down Expand Up @@ -885,6 +885,10 @@ export const OPTIONS = {
default: 15,
type: 'uint'
},
maxAdaptiveRetries: {
default: 2,
type: 'uint'
},
maxConnecting: {
default: 2,
transform({ name, values: [value] }): number {
Expand Down
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,6 @@ export type {
TimeoutContext,
TimeoutContextOptions
} from './timeout';
export type { MAX_RETRIES, TokenBucket } from './token_bucket';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand Down
9 changes: 6 additions & 3 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
retryReads?: boolean;
/** Enable retryable writes. */
retryWrites?: boolean;
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
adaptiveRetries?: boolean;
/** The maximum number of overload retries as described by the https://mongodb.com/docs/atlas/overload-errors. Set to 0 to disable overload retries. Defaults to 2. */
Comment thread
dariakp marked this conversation as resolved.
Outdated
Comment thread
dariakp marked this conversation as resolved.
Outdated
maxAdaptiveRetries?: number;
/** Whether or not to enable overload retargeting as described by the https://mongodb.com/docs/atlas/overload-errors. Defaults to false. */
Comment thread
dariakp marked this conversation as resolved.
Outdated
enableOverloadRetargeting?: boolean;
Comment thread
dariakp marked this conversation as resolved.
/** Allow a driver to force a Single topology type with a connection string containing one host */
directConnection?: boolean;
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
Expand Down Expand Up @@ -1043,7 +1045,8 @@ export interface MongoOptions
extends Required<
Pick<
MongoClientOptions,
| 'adaptiveRetries'
| 'maxAdaptiveRetries'
| 'enableOverloadRetargeting'
| 'autoEncryption'
| 'connectTimeoutMS'
| 'directConnection'
Expand Down
37 changes: 8 additions & 29 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import {
BASE_BACKOFF_MS,
MAX_BACKOFF_MS,
MAX_RETRIES,
RETRY_COST,
RETRY_TOKEN_RETURN_RATE
} from '../token_bucket';
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
import { AggregateOperation } from './aggregate';
import { AbstractOperation, Aspect } from './operation';
Expand Down Expand Up @@ -176,6 +169,10 @@ type RetryOptions = {
topology: Topology;
timeoutContext: TimeoutContext;
};
/** @internal The base backoff duration in milliseconds */
const BASE_BACKOFF_MS = 100;
/** @internal The maximum backoff duration in milliseconds */
const MAX_BACKOFF_MS = 10_000;

/**
* Executes an operation and retries as appropriate
Expand Down Expand Up @@ -267,13 +264,6 @@ async function executeOperationWithRetries<
try {
try {
const result = await server.command(operation, timeoutContext);
if (topology.s.options.adaptiveRetries) {
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
}
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
Expand All @@ -282,15 +272,6 @@ async function executeOperationWithRetries<
// Should never happen but if it does - propagate the error.
if (!(operationError instanceof MongoError)) throw operationError;

if (
topology.s.options.adaptiveRetries &&
attempt > 0 &&
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
) {
// if a retry attempt fails with a non-overload error, deposit 1 token.
topology.tokenBucket.deposit(RETRY_COST);
}

// Preserve the original error once a write has been performed.
// Only update to the latest error if no writes were performed.
if (error == null) {
Expand All @@ -317,7 +298,8 @@ async function executeOperationWithRetries<
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
maxAttempts = Math.min(MAX_RETRIES + 1, operation.maxAttempts ?? MAX_RETRIES + 1);
const maxOverloadAttempts = topology.s.options.maxAdaptiveRetries + 1;
maxAttempts = Math.min(maxOverloadAttempts, operation.maxAttempts ?? maxOverloadAttempts);
}

if (attempt + 1 >= maxAttempts) {
Expand Down Expand Up @@ -352,16 +334,13 @@ async function executeOperationWithRetries<
throw error;
}

if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

await setTimeout(backoffMS);
}

if (
topology.description.type === TopologyType.Sharded ||
operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
(operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
topology.s.options.enableOverloadRetargeting)
) {
deprioritizedServers.add(server.description);
}
Expand Down
6 changes: 2 additions & 4 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
import { INITIAL_TOKEN_BUCKET_SIZE, TokenBucket } from '../token_bucket';
import type { Transaction } from '../transactions';
import {
addAbortListener,
Expand Down Expand Up @@ -146,7 +145,8 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
hosts: HostAddress[];
retryWrites: boolean;
retryReads: boolean;
adaptiveRetries: boolean;
maxAdaptiveRetries: number;
enableOverloadRetargeting: boolean;
/** How long to block for server selection before throwing an error */
serverSelectionTimeoutMS: number;
/** The name of the replica set to connect to */
Expand Down Expand Up @@ -214,8 +214,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
hello?: Document;
_type?: string;

tokenBucket = new TokenBucket(INITIAL_TOKEN_BUCKET_SIZE);

client!: MongoClient;

private connectionLock?: Promise<Topology>;
Expand Down
5 changes: 2 additions & 3 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import { ReadConcernLevel } from './read_concern';
import { ReadPreference } from './read_preference';
import { _advanceClusterTime, type ClusterTime, TopologyType } from './sdam/common';
import { TimeoutContext } from './timeout';
import { MAX_RETRIES } from './token_bucket';
import {
isTransactionCommand,
Transaction,
Expand Down Expand Up @@ -498,7 +497,7 @@ export class ClientSession
readPreference: ReadPreference.primary,
bypassPinningCheck: true
});
operation.maxAttempts = MAX_RETRIES + 1;
operation.maxAttempts = this.clientOptions.maxAdaptiveRetries + 1;

const timeoutContext =
this.timeoutContext ??
Expand All @@ -517,7 +516,7 @@ export class ClientSession
} catch (firstCommitError) {
this.commitAttempted = true;

const remainingAttempts = MAX_RETRIES + 1 - operation.attemptsMade;
const remainingAttempts = this.clientOptions.maxAdaptiveRetries + 1 - operation.attemptsMade;
if (remainingAttempts <= 0) {
throw firstCommitError;
}
Expand Down
54 changes: 0 additions & 54 deletions src/token_bucket.ts

This file was deleted.

Loading
Loading