Skip to content

Commit 14ee245

Browse files
committed
feat(NODE-7142): make token bucket optional
1 parent 02f2b18 commit 14ee245

6 files changed

Lines changed: 159 additions & 7 deletions

File tree

src/connection_string.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,10 @@ interface OptionDescriptor {
639639
}
640640

641641
export const OPTIONS = {
642+
adaptiveRetries: {
643+
default: false,
644+
type: 'boolean'
645+
},
642646
appName: {
643647
type: 'string'
644648
},

src/mongo_client.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
229229
retryReads?: boolean;
230230
/** Enable retryable writes. */
231231
retryWrites?: boolean;
232+
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
233+
adaptiveRetries?: boolean;
232234
/** Allow a driver to force a Single topology type with a connection string containing one host */
233235
directConnection?: boolean;
234236
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
@@ -1041,6 +1043,7 @@ export interface MongoOptions
10411043
extends Required<
10421044
Pick<
10431045
MongoClientOptions,
1046+
| 'adaptiveRetries'
10441047
| 'autoEncryption'
10451048
| 'connectTimeoutMS'
10461049
| 'directConnection'

src/operations/execute_operation.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,13 @@ async function executeOperationWithRetries<
266266
try {
267267
try {
268268
const result = await server.command(operation, timeoutContext);
269-
topology.tokenBucket.deposit(
270-
attempt > 0
271-
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
272-
: RETRY_TOKEN_RETURN_RATE // otherwise
273-
);
269+
if (topology.s.options.adaptiveRetries) {
270+
topology.tokenBucket.deposit(
271+
attempt > 0
272+
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
273+
: RETRY_TOKEN_RETURN_RATE // otherwise
274+
);
275+
}
274276
return operation.handleOk(result);
275277
} catch (error) {
276278
return operation.handleError(error);
@@ -279,7 +281,11 @@ async function executeOperationWithRetries<
279281
// Should never happen but if it does - propagate the error.
280282
if (!(operationError instanceof MongoError)) throw operationError;
281283

282-
if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
284+
if (
285+
topology.s.options.adaptiveRetries &&
286+
attempt > 0 &&
287+
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
288+
) {
283289
// if a retry attempt fails with a non-overload error, deposit 1 token.
284290
topology.tokenBucket.deposit(RETRY_COST);
285291
}
@@ -318,7 +324,7 @@ async function executeOperationWithRetries<
318324
}
319325

320326
if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
321-
if (!topology.tokenBucket.consume(RETRY_COST)) {
327+
if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
322328
throw error;
323329
}
324330

src/sdam/topology.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
146146
hosts: HostAddress[];
147147
retryWrites: boolean;
148148
retryReads: boolean;
149+
adaptiveRetries: boolean;
149150
/** How long to block for server selection before throwing an error */
150151
serverSelectionTimeoutMS: number;
151152
/** The name of the replica set to connect to */

test/integration/client-backpressure/client-backpressure.prose.test.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import * as sinon from 'sinon';
44
import {
55
type Collection,
66
INITIAL_TOKEN_BUCKET_SIZE,
7+
MAX_RETRIES,
78
type MongoClient,
89
MongoServerError
910
} from '../../mongodb';
1011
import { clearFailPoint, configureFailPoint, measureDuration } from '../../tools/utils';
12+
import { filterForCommands } from '../shared';
1113

1214
describe('Client Backpressure (Prose)', function () {
1315
let client: MongoClient;
@@ -78,4 +80,121 @@ describe('Client Backpressure (Prose)', function () {
7880
// above DEFAULT_RETRY_TOKEN_CAPACITY.
7981
expect(tokenBucket).to.have.property('budget').that.is.at.most(INITIAL_TOKEN_BUCKET_SIZE);
8082
});
83+
84+
it(
85+
'Test 3: Overload Errors are Retried a Maximum of MAX_RETRIES times',
86+
{
87+
requires: {
88+
mongodb: '>=4.4'
89+
}
90+
},
91+
async function () {
92+
// 1. Let `client` be a `MongoClient` with command event monitoring enabled.
93+
const client = this.configuration.newClient({
94+
monitorCommands: true
95+
});
96+
await client.connect();
97+
98+
// 2. Let `coll` be a collection.
99+
const collection = client.db('foo').collection('bar');
100+
const commandsStarted = [];
101+
client.on('commandStarted', filterForCommands(['find'], commandsStarted));
102+
103+
/*
104+
* 3. Configure the following failpoint:
105+
{
106+
configureFailPoint: 'failCommand',
107+
mode: 'alwaysOn',
108+
data: {
109+
failCommands: ['find'],
110+
errorCode: 462, // IngressRequestRateLimitExceeded
111+
errorLabels: ['SystemOverloadedError', 'RetryableError']
112+
}
113+
}
114+
* */
115+
await configureFailPoint(this.configuration, {
116+
configureFailPoint: 'failCommand',
117+
mode: 'alwaysOn',
118+
data: {
119+
failCommands: ['find'],
120+
errorCode: 462,
121+
errorLabels: ['RetryableError', 'SystemOverloadedError']
122+
}
123+
});
124+
125+
// 4. Perform a find operation with `coll` that fails.
126+
const error = await collection.findOne({}).catch(e => e);
127+
128+
// 5. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels.
129+
expect(error).to.be.instanceof(MongoServerError);
130+
expect(error.hasErrorLabel('RetryableError')).to.be.true;
131+
expect(error.hasErrorLabel('SystemOverLoadedError')).to.be.true;
132+
133+
// 6. Assert that the total number of started commands is MAX_RETRIES + 1 (6).
134+
expect(commandsStarted).to.have.length(MAX_RETRIES + 1);
135+
136+
await client.close();
137+
}
138+
);
139+
140+
it(
141+
'Test 4: Adaptive Retries are Limited by Token Bucket Tokens',
142+
{
143+
requires: {
144+
mongodb: '>=4.4'
145+
}
146+
},
147+
async function () {
148+
// 1. Let `client` be a `MongoClient` with `adaptiveRetries=True` and command event monitoring enabled.
149+
const client = this.configuration.newClient({
150+
adaptiveRetries: true,
151+
monitorCommands: true
152+
});
153+
await client.connect();
154+
155+
// 2. Set `client`'s retry token bucket to have 2 tokens.
156+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
157+
client.topology!.tokenBucket['budget'] = 2;
158+
159+
// 3. Let `coll` be a collection.
160+
const collection = client.db('foo').collection('bar');
161+
const commandsStarted = [];
162+
client.on('commandStarted', filterForCommands(['find'], commandsStarted));
163+
164+
/*
165+
* 4. Configure the following failpoint:
166+
{
167+
configureFailPoint: 'failCommand',
168+
mode: {times: 3},
169+
data: {
170+
failCommands: ['find'],
171+
errorCode: 462, // IngressRequestRateLimitExceeded
172+
errorLabels: ['SystemOverloadedError', 'RetryableError']
173+
}
174+
}
175+
* */
176+
await configureFailPoint(this.configuration, {
177+
configureFailPoint: 'failCommand',
178+
mode: { times: 3 },
179+
data: {
180+
failCommands: ['find'],
181+
errorCode: 462,
182+
errorLabels: ['RetryableError', 'SystemOverloadedError']
183+
}
184+
});
185+
186+
// 5. Perform a find operation with `coll` that fails.
187+
const error = await collection.findOne({}).catch(e => e);
188+
189+
// 6. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels.
190+
expect(error).to.be.instanceof(MongoServerError);
191+
expect(error.hasErrorLabel('RetryableError')).to.be.true;
192+
expect(error.hasErrorLabel('SystemOverLoadedError')).to.be.true;
193+
194+
// 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries.
195+
expect(commandsStarted).to.have.length(3);
196+
197+
await client.close();
198+
}
199+
);
81200
});

test/unit/connection_string.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,4 +889,23 @@ describe('Connection String', function () {
889889
}
890890
});
891891
});
892+
893+
context('when adaptiveRetries is set', function () {
894+
it('defaults to false', function () {
895+
const options = parseOptions('mongodb://localhost:27017');
896+
expect(options.adaptiveRetries).to.equal(false);
897+
});
898+
899+
it('can be enabled via connection string', function () {
900+
const options = parseOptions('mongodb://localhost:27017?adaptiveRetries=true');
901+
expect(options.adaptiveRetries).to.equal(true);
902+
});
903+
904+
it('can be enabled via client options', function () {
905+
const options = parseOptions('mongodb://localhost:27017', {
906+
adaptiveRetries: true
907+
});
908+
expect(options.adaptiveRetries).to.equal(true);
909+
});
910+
});
892911
});

0 commit comments

Comments
 (0)