Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,10 @@ interface OptionDescriptor {
}

export const OPTIONS = {
adaptiveRetries: {
default: false,
type: 'boolean'
},
appName: {
type: 'string'
},
Expand Down
3 changes: 3 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
retryReads?: boolean;
/** Enable retryable writes. */
retryWrites?: boolean;
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
adaptiveRetries?: boolean;
/** Allow a driver to force a Single topology type with a connection string containing one host */
directConnection?: boolean;
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
Expand Down Expand Up @@ -1041,6 +1043,7 @@ export interface MongoOptions
extends Required<
Pick<
MongoClientOptions,
| 'adaptiveRetries'
| 'autoEncryption'
| 'connectTimeoutMS'
| 'directConnection'
Expand Down
26 changes: 16 additions & 10 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,13 @@ async function executeOperationWithRetries<
try {
try {
const result = await server.command(operation, timeoutContext);
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
if (topology.s.options.adaptiveRetries) {
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
}
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
Expand All @@ -279,7 +281,11 @@ async function executeOperationWithRetries<
// Should never happen but if it does - propagate the error.
if (!(operationError instanceof MongoError)) throw operationError;

if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
if (
topology.s.options.adaptiveRetries &&
attempt > 0 &&
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
) {
// if a retry attempt fails with a non-overload error, deposit 1 token.
topology.tokenBucket.deposit(RETRY_COST);
}
Expand Down Expand Up @@ -318,17 +324,17 @@ async function executeOperationWithRetries<
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
if (!topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);

// if the backoff would exhaust the CSOT timeout, short-circuit.
if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
throw error;
}

if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

await setTimeout(backoffMS);
}

Expand Down
1 change: 1 addition & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
hosts: HostAddress[];
retryWrites: boolean;
retryReads: boolean;
adaptiveRetries: boolean;
/** How long to block for server selection before throwing an error */
serverSelectionTimeoutMS: number;
/** The name of the replica set to connect to */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import * as sinon from 'sinon';
import {
type Collection,
INITIAL_TOKEN_BUCKET_SIZE,
MAX_RETRIES,
type MongoClient,
MongoServerError
} from '../../mongodb';
import { clearFailPoint, configureFailPoint, measureDuration } from '../../tools/utils';
import { filterForCommands } from '../shared';

describe('Client Backpressure (Prose)', function () {
let client: MongoClient;
Expand Down Expand Up @@ -64,18 +66,141 @@ describe('Client Backpressure (Prose)', function () {
}
);

it('Test 2: Token Bucket capacity is Enforced', async () => {
// 1-2. Assert that the client's retry token bucket is at full capacity and that the capacity
// is DEFAULT_RETRY_TOKEN_CAPACITY.
it('Test 2: Token Bucket capacity is Enforced', async function () {
// 1. Let client be a MongoClient with adaptiveRetries=True.
const client = this.configuration.newClient({
Comment thread
dariakp marked this conversation as resolved.
Outdated
adaptiveRetries: true
});
await client.connect();

Comment thread
nbbeeken marked this conversation as resolved.
// 2. Assert that the client's retry token bucket is at full capacity and that the capacity is DEFAULT_RETRY_TOKEN_CAPACITY.
const tokenBucket = client.topology.tokenBucket;
expect(tokenBucket).to.have.property('budget', INITIAL_TOKEN_BUCKET_SIZE);
expect(tokenBucket).to.have.property('capacity', INITIAL_TOKEN_BUCKET_SIZE);

// 3. Execute a successful ping command.
// 3. Using client, execute a successful ping command.
await client.db('admin').command({ ping: 1 });

// 4. Assert that the successful command did not increase the number of tokens in the bucket
// above DEFAULT_RETRY_TOKEN_CAPACITY.
// 4. Assert that the successful command did not increase the number of tokens in the bucket above DEFAULT_RETRY_TOKEN_CAPACITY.
expect(tokenBucket).to.have.property('budget').that.is.at.most(INITIAL_TOKEN_BUCKET_SIZE);

await client.close();
});

it(
'Test 3: Overload Errors are Retried a Maximum of MAX_RETRIES times',
{
requires: {
mongodb: '>=4.4'
}
},
async function () {
// 1. Let `client` be a `MongoClient` with command event monitoring enabled.
const client = this.configuration.newClient({
monitorCommands: true
});
await client.connect();

Comment thread
nbbeeken marked this conversation as resolved.
// 2. Let `coll` be a collection.
const collection = client.db('foo').collection('bar');
const commandsStarted = [];
client.on('commandStarted', filterForCommands(['find'], commandsStarted));

/*
* 3. Configure the following failpoint:
{
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['find'],
errorCode: 462, // IngressRequestRateLimitExceeded
errorLabels: ['SystemOverloadedError', 'RetryableError']
}
}
* */
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['find'],
errorCode: 462,
errorLabels: ['RetryableError', 'SystemOverloadedError']
}
});

// 4. Perform a find operation with `coll` that fails.
const error = await collection.findOne({}).catch(e => e);

// 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
expect(error).to.be.instanceof(MongoServerError);
expect(error.hasErrorLabel('RetryableError')).to.be.true;
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;

// 6. Assert that the total number of started commands is MAX_RETRIES + 1 (6).
expect(commandsStarted).to.have.length(MAX_RETRIES + 1);

await client.close();
}
);

it(
'Test 4: Adaptive Retries are Limited by Token Bucket Tokens',
{
requires: {
mongodb: '>=4.4'
}
},
async function () {
// 1. Let `client` be a `MongoClient` with `adaptiveRetries=True` and command event monitoring enabled.
const client = this.configuration.newClient({
adaptiveRetries: true,
monitorCommands: true
});
await client.connect();

Comment thread
nbbeeken marked this conversation as resolved.
// 2. Set `client`'s retry token bucket to have 2 tokens.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
client.topology!.tokenBucket['budget'] = 2;
Comment thread
nbbeeken marked this conversation as resolved.

// 3. Let `coll` be a collection.
const collection = client.db('foo').collection('bar');
const commandsStarted = [];
client.on('commandStarted', filterForCommands(['find'], commandsStarted));

/*
* 4. Configure the following failpoint:
{
configureFailPoint: 'failCommand',
mode: {times: 3},
data: {
failCommands: ['find'],
errorCode: 462, // IngressRequestRateLimitExceeded
errorLabels: ['SystemOverloadedError', 'RetryableError']
}
}
* */
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: { times: 3 },
data: {
failCommands: ['find'],
errorCode: 462,
errorLabels: ['RetryableError', 'SystemOverloadedError']
}
});

// 5. Perform a find operation with `coll` that fails.
const error = await collection.findOne({}).catch(e => e);

// 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
expect(error).to.be.instanceof(MongoServerError);
expect(error.hasErrorLabel('RetryableError')).to.be.true;
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;

// 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries.
expect(commandsStarted).to.have.length(3);

await client.close();
}
);
});
35 changes: 35 additions & 0 deletions test/spec/uri-options/client-backpressure-options.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"tests": [
{
"description": "adaptiveRetries=true is parsed correctly",
"uri": "mongodb://example.com/?adaptiveRetries=true",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"adaptiveRetries": true
}
},
{
"description": "adaptiveRetries=false is parsed correctly",
"uri": "mongodb://example.com/?adaptiveRetries=false",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"adaptiveRetries": false
}
},
{
"description": "adaptiveRetries with invalid value causes a warning",
"uri": "mongodb://example.com/?adaptiveRetries=invalid",
"valid": true,
"warning": true,
"hosts": null,
"auth": null,
"options": null
}
]
}
27 changes: 27 additions & 0 deletions test/spec/uri-options/client-backpressure-options.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
tests:
-
description: "adaptiveRetries=true is parsed correctly"
uri: "mongodb://example.com/?adaptiveRetries=true"
valid: true
warning: false
hosts: ~
auth: ~
options:
adaptiveRetries: true
-
description: "adaptiveRetries=false is parsed correctly"
uri: "mongodb://example.com/?adaptiveRetries=false"
valid: true
warning: false
hosts: ~
auth: ~
options:
adaptiveRetries: false
-
description: "adaptiveRetries with invalid value causes a warning"
uri: "mongodb://example.com/?adaptiveRetries=invalid"
valid: true
warning: true
hosts: ~
auth: ~
options: ~
1 change: 1 addition & 0 deletions test/tools/uri_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ export function executeUriValidationTest(
case 'serverSelectionTimeoutMS':
case 'serverMonitoringMode':
case 'socketTimeoutMS':
case 'adaptiveRetries':
case 'retryWrites':
case 'directConnection':
case 'loadBalanced':
Expand Down
19 changes: 19 additions & 0 deletions test/unit/connection_string.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -889,4 +889,23 @@ describe('Connection String', function () {
}
});
});

context('when adaptiveRetries is set', function () {
it('defaults to false', function () {
const options = parseOptions('mongodb://localhost:27017');
expect(options.adaptiveRetries).to.equal(false);
});

it('can be enabled via connection string', function () {
const options = parseOptions('mongodb://localhost:27017?adaptiveRetries=true');
expect(options.adaptiveRetries).to.equal(true);
});

it('can be enabled via client options', function () {
const options = parseOptions('mongodb://localhost:27017', {
adaptiveRetries: true
});
expect(options.adaptiveRetries).to.equal(true);
});
});
});