Skip to content

Commit f2b9750

Browse files
POC
1 parent cfb0bbd commit f2b9750

33 files changed

Lines changed: 12785 additions & 64 deletions

src/cmap/connect.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ export interface HandshakeDocument extends Document {
224224
compression: string[];
225225
saslSupportedMechs?: string;
226226
loadBalanced?: boolean;
227+
backpressure: true;
227228
}
228229

229230
/**
@@ -241,6 +242,7 @@ export async function prepareHandshakeDocument(
241242

242243
const handshakeDoc: HandshakeDocument = {
243244
[serverApi?.version || options.loadBalanced === true ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
245+
backpressure: true,
244246
helloOk: true,
245247
client: clientMetadata,
246248
compression: compressors

src/cmap/connection.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
586586
this.throwIfAborted();
587587
}
588588
} catch (error) {
589+
// Note for Sergey: when retrying a command with `startTransaction`, the spec says drivers must only mark the transaction
590+
// as in progress _after_ the server responds, regardless of the command's outcome.
591+
// Server errors are thrown from the try-block above _after_ updateSessionFromResponse is called. So, server errors already correctly update the
592+
// session's state.
593+
// Network errors, however, are thrown from the try-block from `sendWire()`, and as such bypass the call to `updateSessionFromResponse` above. So, we have to
594+
// handle updating sessions for non-server errors here.
595+
if (options.session != null && !(error instanceof MongoServerError)) {
596+
updateSessionFromResponse(options.session, MongoDBResponse.empty);
597+
}
589598
if (this.shouldEmitAndLogCommand) {
590599
this.emitAndLogCommand(
591600
this.monitorCommands,

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ export {
8787
MongoWriteConcernError,
8888
WriteConcernErrorResult
8989
} from './error';
90+
export { TokenBucket } from './token_bucket';
9091
export {
9192
AbstractCursor,
9293
// Actual driver classes exported

src/operations/execute_operation.ts

Lines changed: 156 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { setTimeout } from 'timers/promises';
2+
13
import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
24
import {
35
isRetryableReadError,
@@ -10,6 +12,7 @@ import {
1012
MongoInvalidArgumentError,
1113
MongoNetworkError,
1214
MongoNotConnectedError,
15+
MongoOperationTimeoutError,
1316
MongoRuntimeError,
1417
MongoServerError,
1518
MongoTransactionError,
@@ -26,9 +29,16 @@ import {
2629
import type { Topology } from '../sdam/topology';
2730
import type { ClientSession } from '../sessions';
2831
import { TimeoutContext } from '../timeout';
29-
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
32+
import { RETRY_COST, TOKEN_REFRESH_RATE } from '../token_bucket';
33+
import {
34+
abortable,
35+
ExponentialBackoffProvider,
36+
maxWireVersion,
37+
supportsRetryableWrites
38+
} from '../utils';
3039
import { AggregateOperation } from './aggregate';
3140
import { AbstractOperation, Aspect } from './operation';
41+
import { RunCommandOperation } from './run_command';
3242

3343
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
3444
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -50,7 +60,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
5060
* The expectation is that this function:
5161
* - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
5262
* - Creates a session if none is provided and cleans up the session it creates
53-
* - Tries an operation and retries under certain conditions, see {@link tryOperation}
63+
* - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries}
5464
*
5565
* @typeParam T - The operation's type
5666
* @typeParam TResult - The type of the operation's result, calculated from T
@@ -120,7 +130,7 @@ export async function executeOperation<
120130
});
121131

122132
try {
123-
return await tryOperation(operation, {
133+
return await executeOperationWithRetries(operation, {
124134
topology,
125135
timeoutContext,
126136
session,
@@ -183,8 +193,12 @@ type RetryOptions = {
183193
* @typeParam TResult - The type of the operation's result, calculated from T
184194
*
185195
* @param operation - The operation to execute
186-
* */
187-
async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFromOperation<T>>(
196+
*/
197+
// Note for Sergey: The rename here could be reverted - I just thought this was more descriptive.
198+
async function executeOperationWithRetries<
199+
T extends AbstractOperation,
200+
TResult = ResultTypeFromOperation<T>
201+
>(
188202
operation: T,
189203
{ topology, timeoutContext, session, readPreference }: RetryOptions
190204
): Promise<TResult> {
@@ -233,33 +247,118 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
233247
session.incrementTransactionNumber();
234248
}
235249

236-
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
237-
let previousOperationError: MongoError | undefined;
238250
const deprioritizedServers = new DeprioritizedServers();
251+
const backoffDelayProvider = new ExponentialBackoffProvider(
252+
10_000, // MAX_BACKOFF
253+
100, // base backoff
254+
2 // backoff rate
255+
);
256+
257+
let maxAttempts =
258+
typeof operation.maxAttempts === 'number'
259+
? operation.maxAttempts
260+
: willRetry
261+
? timeoutContext.csotEnabled()
262+
? Infinity
263+
: 2
264+
: 1;
265+
266+
const shouldRetry =
267+
(operation.hasAspect(Aspect.READ_OPERATION) && topology.s.options.retryReads) ||
268+
((operation.hasAspect(Aspect.WRITE_OPERATION) || operation instanceof RunCommandOperation) &&
269+
topology.s.options.retryWrites);
270+
271+
let error: MongoError | null = null;
239272

240-
for (let tries = 0; tries < maxTries; tries++) {
241-
if (previousOperationError) {
242-
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
273+
for (let attempt = 0; attempt < maxAttempts; attempt++) {
274+
operation.server = server;
275+
276+
try {
277+
try {
278+
const result = await server.command(operation, timeoutContext);
279+
topology.tokenBucket.deposit(
280+
attempt > 0
281+
? // on successful retry, deposit the retry cost + the refresh rate.
282+
TOKEN_REFRESH_RATE + RETRY_COST
283+
: // otherwise, just deposit the refresh rate.
284+
TOKEN_REFRESH_RATE
285+
);
286+
return operation.handleOk(result);
287+
} catch (error) {
288+
return operation.handleError(error);
289+
}
290+
291+
// Note for Sergey: This ended up being a larger refactor than I anticipated. But it made more sense to me to put the error handling
292+
// that previously lived in the try-block into the catch-block.
293+
// The primary motivator for this change was to make error tracking simpler. The post failure but pre-retry logic needs access to both the
294+
// most recently encountered operation error _and_ the error we're storing to potentially throw, if we need to (because in some circumstances
295+
// the error raised from here is _not_ the most recently encountered operation error). If we chose to keep the previously existing structure,
296+
// this would force us to keep track of two errors outside of the for loop. By moving the pre-retry logic into the catch block, it gains
297+
// access to `operationError`, and still only requires keeping track of one error outside the loop.
298+
} catch (operationError) {
299+
// Should never happen but if it does - propagate the error.
300+
if (!(operationError instanceof MongoError)) throw operationError;
301+
302+
if (!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
303+
// if an operation fails with an error that does not contain the SystemOverloadError, deposit 1 token.
304+
topology.tokenBucket.deposit(RETRY_COST);
305+
}
306+
307+
if (error == null) {
308+
error = operationError;
309+
} else {
310+
if (!operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)) {
311+
error = operationError;
312+
}
313+
}
314+
315+
// Reset timeouts
316+
timeoutContext.clear();
317+
318+
if (hasWriteAspect && operationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
243319
throw new MongoServerError({
244320
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
245321
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
246-
originalError: previousOperationError
322+
originalError: operationError
247323
});
248324
}
249325

250-
if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
251-
throw previousOperationError;
326+
if (!canRetry(operation, operationError)) {
327+
throw error;
252328
}
253329

254-
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
255-
throw previousOperationError;
330+
maxAttempts =
331+
shouldRetry && operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
332+
? 6
333+
: maxAttempts;
334+
335+
if (attempt + 1 >= maxAttempts) {
336+
throw error;
337+
}
256338

257-
if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
258-
throw previousOperationError;
339+
if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
340+
if (!topology.tokenBucket.consume(RETRY_COST)) {
341+
throw error;
342+
}
343+
344+
const delayMS = backoffDelayProvider.getNextBackoffDuration();
345+
346+
// if the delay would exhaust the CSOT timeout, short-circuit.
347+
if (timeoutContext.csotEnabled() && delayMS > timeoutContext.remainingTimeMS) {
348+
// TODO: is this the right error to throw?
349+
throw new MongoOperationTimeoutError(
350+
`MongoDB SystemOverload exponential backoff would exceed timeoutMS deadline: remaining CSOT deadline=${timeoutContext.remainingTimeMS}, backoff delayMS=${delayMS}`,
351+
{
352+
cause: error
353+
}
354+
);
355+
}
356+
357+
await setTimeout(delayMS);
259358
}
260359

261360
if (
262-
previousOperationError instanceof MongoNetworkError &&
361+
operationError instanceof MongoNetworkError &&
263362
operation.hasAspect(Aspect.CURSOR_CREATING) &&
264363
session != null &&
265364
session.isPinned &&
@@ -268,52 +367,62 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
268367
session.unpin({ force: true, forceClear: true });
269368
}
270369

370+
deprioritizedServers.add(server.description);
371+
271372
server = await topology.selectServer(selector, {
272373
session,
273374
operationName: operation.commandName,
274375
deprioritizedServers,
275376
signal: operation.options.signal
276377
});
277378

278-
if (hasWriteAspect && !supportsRetryableWrites(server)) {
379+
if (
380+
hasWriteAspect &&
381+
!supportsRetryableWrites(server) &&
382+
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
383+
) {
279384
throw new MongoUnexpectedServerResponseError(
280385
'Selected server does not support retryable writes'
281386
);
282387
}
283-
}
284388

285-
operation.server = server;
286-
287-
try {
288-
// If tries > 0 and we are command batching we need to reset the batch.
289-
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
389+
// Batched operations must reset the batch before retry,
390+
// otherwise building a command will build the _next_ batch, not the current batch.
391+
if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
290392
operation.resetBatch();
291393
}
292-
293-
try {
294-
const result = await server.command(operation, timeoutContext);
295-
return operation.handleOk(result);
296-
} catch (error) {
297-
return operation.handleError(error);
298-
}
299-
} catch (operationError) {
300-
if (!(operationError instanceof MongoError)) throw operationError;
301-
if (
302-
previousOperationError != null &&
303-
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
304-
) {
305-
throw previousOperationError;
306-
}
307-
deprioritizedServers.add(server.description);
308-
previousOperationError = operationError;
309-
310-
// Reset timeouts
311-
timeoutContext.clear();
312394
}
313395
}
314396

315397
throw (
316-
previousOperationError ??
317-
new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
398+
error ??
399+
new MongoRuntimeError(
400+
'Should never happen: operation execution loop terminated but no error was recorded.'
401+
)
318402
);
403+
404+
function canRetry(operation: AbstractOperation, error: MongoError) {
405+
// always retryable
406+
if (
407+
error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
408+
error.hasErrorLabel(MongoErrorLabel.RetryableError)
409+
) {
410+
return true;
411+
}
412+
413+
// run command is only retryable if we get retryable overload errors
414+
if (operation instanceof RunCommandOperation) {
415+
return false;
416+
}
417+
418+
// batch operations are only retryable if the batch is retryable
419+
if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
420+
return operation.canRetryWrite;
421+
}
422+
423+
return (
424+
(hasWriteAspect && willRetryWrite && isRetryableWriteError(error)) ||
425+
(hasReadAspect && willRetryRead && isRetryableReadError(error))
426+
);
427+
}
319428
}

src/operations/operation.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ export abstract class AbstractOperation<TResult = any> {
6666
/** Specifies the time an operation will run until it throws a timeout error. */
6767
timeoutMS?: number;
6868

69+
// Note for Sergey: sort of a hack, one that I planned to revisit before officially opening the PR for review.
70+
// For every operation _except_ transactions, all backpressure retries occur inside one `executeOperation` call. `commitTransaction`, however,
71+
// currently hard-codes two retry attempts, each with a `RunCommandOperation`. Neither retry attempt is retryable by itself.
72+
// Because we use two separate RunCommand operations, it's possible to retry up to 2 * MAX_BACKPRESSURE_RETRIES times, if the server is overloaded.
73+
// I hacked around it here by storing the max attempts for an operation on the operation, and sharing the value between both operations.
74+
// I don't have a solid idea of a better way to do this right now though.
75+
maxAttempts?: number;
76+
6977
private _session: ClientSession | undefined;
7078

7179
static aspects?: Set<symbol>;

src/sdam/topology.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import type { ClientSession } from '../sessions';
3737
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
38+
import { TokenBucket } from '../token_bucket';
3839
import type { Transaction } from '../transactions';
3940
import {
4041
addAbortListener,
@@ -207,18 +208,16 @@ export type TopologyEvents = {
207208
* @internal
208209
*/
209210
export class Topology extends TypedEventEmitter<TopologyEvents> {
210-
/** @internal */
211211
s: TopologyPrivate;
212-
/** @internal */
213212
waitQueue: List<ServerSelectionRequest>;
214-
/** @internal */
215213
hello?: Document;
216-
/** @internal */
217214
_type?: string;
218215

216+
// Note for Sergey: expect this to change, and instead be stored on the server class.
217+
tokenBucket = new TokenBucket(1000);
218+
219219
client!: MongoClient;
220220

221-
/** @internal */
222221
private connectionLock?: Promise<Topology>;
223222

224223
/** @event */

0 commit comments

Comments
 (0)