1+ import { setTimeout } from 'timers/promises' ;
2+
13import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants' ;
24import {
35 isRetryableReadError ,
@@ -10,6 +12,7 @@ import {
1012 MongoInvalidArgumentError ,
1113 MongoNetworkError ,
1214 MongoNotConnectedError ,
15+ MongoOperationTimeoutError ,
1316 MongoRuntimeError ,
1417 MongoServerError ,
1518 MongoTransactionError ,
@@ -26,9 +29,16 @@ import {
2629import type { Topology } from '../sdam/topology' ;
2730import type { ClientSession } from '../sessions' ;
2831import { TimeoutContext } from '../timeout' ;
29- import { abortable , maxWireVersion , supportsRetryableWrites } from '../utils' ;
32+ import { RETRY_COST , TOKEN_REFRESH_RATE } from '../token_bucket' ;
33+ import {
34+ abortable ,
35+ ExponentialBackoffProvider ,
36+ maxWireVersion ,
37+ supportsRetryableWrites
38+ } from '../utils' ;
3039import { AggregateOperation } from './aggregate' ;
3140import { AbstractOperation , Aspect } from './operation' ;
41+ import { RunCommandOperation } from './run_command' ;
3242
3343const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES . IllegalOperation ;
3444const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -50,7 +60,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
5060 * The expectation is that this function:
5161 * - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
5262 * - Creates a session if none is provided and cleans up the session it creates
53- * - Tries an operation and retries under certain conditions, see {@link tryOperation }
63+ * - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries }
5464 *
5565 * @typeParam T - The operation's type
5666 * @typeParam TResult - The type of the operation's result, calculated from T
@@ -120,7 +130,7 @@ export async function executeOperation<
120130 } ) ;
121131
122132 try {
123- return await tryOperation ( operation , {
133+ return await executeOperationWithRetries ( operation , {
124134 topology,
125135 timeoutContext,
126136 session,
@@ -183,8 +193,12 @@ type RetryOptions = {
183193 * @typeParam TResult - The type of the operation's result, calculated from T
184194 *
185195 * @param operation - The operation to execute
186- * */
187- async function tryOperation < T extends AbstractOperation , TResult = ResultTypeFromOperation < T > > (
196+ */
197+ // Note for Sergey: The rename here could be reverted - I just thought this was more descriptive.
198+ async function executeOperationWithRetries <
199+ T extends AbstractOperation ,
200+ TResult = ResultTypeFromOperation < T >
201+ > (
188202 operation : T ,
189203 { topology, timeoutContext, session, readPreference } : RetryOptions
190204) : Promise < TResult > {
@@ -233,33 +247,118 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
233247 session . incrementTransactionNumber ( ) ;
234248 }
235249
236- const maxTries = willRetry ? ( timeoutContext . csotEnabled ( ) ? Infinity : 2 ) : 1 ;
237- let previousOperationError : MongoError | undefined ;
238250 const deprioritizedServers = new DeprioritizedServers ( ) ;
251+ const backoffDelayProvider = new ExponentialBackoffProvider (
252+ 10_000 , // MAX_BACKOFF
253+ 100 , // base backoff
254+ 2 // backoff rate
255+ ) ;
256+
257+ let maxAttempts =
258+ typeof operation . maxAttempts === 'number'
259+ ? operation . maxAttempts
260+ : willRetry
261+ ? timeoutContext . csotEnabled ( )
262+ ? Infinity
263+ : 2
264+ : 1 ;
265+
266+ const shouldRetry =
267+ ( operation . hasAspect ( Aspect . READ_OPERATION ) && topology . s . options . retryReads ) ||
268+ ( ( operation . hasAspect ( Aspect . WRITE_OPERATION ) || operation instanceof RunCommandOperation ) &&
269+ topology . s . options . retryWrites ) ;
270+
271+ let error : MongoError | null = null ;
239272
240- for ( let tries = 0 ; tries < maxTries ; tries ++ ) {
241- if ( previousOperationError ) {
242- if ( hasWriteAspect && previousOperationError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
273+ for ( let attempt = 0 ; attempt < maxAttempts ; attempt ++ ) {
274+ operation . server = server ;
275+
276+ try {
277+ try {
278+ const result = await server . command ( operation , timeoutContext ) ;
279+ topology . tokenBucket . deposit (
280+ attempt > 0
281+ ? // on successful retry, deposit the retry cost + the refresh rate.
282+ TOKEN_REFRESH_RATE + RETRY_COST
283+ : // otherwise, just deposit the refresh rate.
284+ TOKEN_REFRESH_RATE
285+ ) ;
286+ return operation . handleOk ( result ) ;
287+ } catch ( error ) {
288+ return operation . handleError ( error ) ;
289+ }
290+
291+ // Note for Sergey: This ended up being a larger refactor than I anticipated. But it made more sense to me to put the error handling
292+ // that previously lived in the try-block into the catch-block.
293+ // The primary motivator for this change was to make error tracking simpler. The post failure but pre-retry logic needs access to both the
294+ // most recently encountered operation error _and_ the error we're storing to potentially throw, if we need to (because in some circumstances
295+ // the error raised from here is _not_ the most recently encountered operation error). If we chose to keep the previously existing structure,
296+ // this would force us to keep track of two errors outside of the for loop. By moving the pre-retry logic into the catch block, it gains
297+ // access to `operationError`, and still only requires keeping track of one error outside the loop.
298+ } catch ( operationError ) {
299+ // Should never happen but if it does - propagate the error.
300+ if ( ! ( operationError instanceof MongoError ) ) throw operationError ;
301+
302+ if ( ! operationError . hasErrorLabel ( MongoErrorLabel . SystemOverloadedError ) ) {
303+ // if an operation fails with an error that does not contain the SystemOverloadError, deposit 1 token.
304+ topology . tokenBucket . deposit ( RETRY_COST ) ;
305+ }
306+
307+ if ( error == null ) {
308+ error = operationError ;
309+ } else {
310+ if ( ! operationError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed ) ) {
311+ error = operationError ;
312+ }
313+ }
314+
315+ // Reset timeouts
316+ timeoutContext . clear ( ) ;
317+
318+ if ( hasWriteAspect && operationError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
243319 throw new MongoServerError ( {
244320 message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
245321 errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
246- originalError : previousOperationError
322+ originalError : operationError
247323 } ) ;
248324 }
249325
250- if ( operation . hasAspect ( Aspect . COMMAND_BATCHING ) && ! operation . canRetryWrite ) {
251- throw previousOperationError ;
326+ if ( ! canRetry ( operation , operationError ) ) {
327+ throw error ;
252328 }
253329
254- if ( hasWriteAspect && ! isRetryableWriteError ( previousOperationError ) )
255- throw previousOperationError ;
330+ maxAttempts =
331+ shouldRetry && operationError . hasErrorLabel ( MongoErrorLabel . SystemOverloadedError )
332+ ? 6
333+ : maxAttempts ;
334+
335+ if ( attempt + 1 >= maxAttempts ) {
336+ throw error ;
337+ }
256338
257- if ( hasReadAspect && ! isRetryableReadError ( previousOperationError ) ) {
258- throw previousOperationError ;
339+ if ( operationError . hasErrorLabel ( MongoErrorLabel . SystemOverloadedError ) ) {
340+ if ( ! topology . tokenBucket . consume ( RETRY_COST ) ) {
341+ throw error ;
342+ }
343+
344+ const delayMS = backoffDelayProvider . getNextBackoffDuration ( ) ;
345+
346+ // if the delay would exhaust the CSOT timeout, short-circuit.
347+ if ( timeoutContext . csotEnabled ( ) && delayMS > timeoutContext . remainingTimeMS ) {
348+ // TODO: is this the right error to throw?
349+ throw new MongoOperationTimeoutError (
350+ `MongoDB SystemOverload exponential backoff would exceed timeoutMS deadline: remaining CSOT deadline=${ timeoutContext . remainingTimeMS } , backoff delayMS=${ delayMS } ` ,
351+ {
352+ cause : error
353+ }
354+ ) ;
355+ }
356+
357+ await setTimeout ( delayMS ) ;
259358 }
260359
261360 if (
262- previousOperationError instanceof MongoNetworkError &&
361+ operationError instanceof MongoNetworkError &&
263362 operation . hasAspect ( Aspect . CURSOR_CREATING ) &&
264363 session != null &&
265364 session . isPinned &&
@@ -268,52 +367,62 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
268367 session . unpin ( { force : true , forceClear : true } ) ;
269368 }
270369
370+ deprioritizedServers . add ( server . description ) ;
371+
271372 server = await topology . selectServer ( selector , {
272373 session,
273374 operationName : operation . commandName ,
274375 deprioritizedServers,
275376 signal : operation . options . signal
276377 } ) ;
277378
278- if ( hasWriteAspect && ! supportsRetryableWrites ( server ) ) {
379+ if (
380+ hasWriteAspect &&
381+ ! supportsRetryableWrites ( server ) &&
382+ ! operationError . hasErrorLabel ( MongoErrorLabel . SystemOverloadedError )
383+ ) {
279384 throw new MongoUnexpectedServerResponseError (
280385 'Selected server does not support retryable writes'
281386 ) ;
282387 }
283- }
284388
285- operation . server = server ;
286-
287- try {
288- // If tries > 0 and we are command batching we need to reset the batch.
289- if ( tries > 0 && operation . hasAspect ( Aspect . COMMAND_BATCHING ) ) {
389+ // Batched operations must reset the batch before retry,
390+ // otherwise building a command will build the _next_ batch, not the current batch.
391+ if ( operation . hasAspect ( Aspect . COMMAND_BATCHING ) ) {
290392 operation . resetBatch ( ) ;
291393 }
292-
293- try {
294- const result = await server . command ( operation , timeoutContext ) ;
295- return operation . handleOk ( result ) ;
296- } catch ( error ) {
297- return operation . handleError ( error ) ;
298- }
299- } catch ( operationError ) {
300- if ( ! ( operationError instanceof MongoError ) ) throw operationError ;
301- if (
302- previousOperationError != null &&
303- operationError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
304- ) {
305- throw previousOperationError ;
306- }
307- deprioritizedServers . add ( server . description ) ;
308- previousOperationError = operationError ;
309-
310- // Reset timeouts
311- timeoutContext . clear ( ) ;
312394 }
313395 }
314396
315397 throw (
316- previousOperationError ??
317- new MongoRuntimeError ( 'Tried to propagate retryability error, but no error was found.' )
398+ error ??
399+ new MongoRuntimeError (
400+ 'Should never happen: operation execution loop terminated but no error was recorded.'
401+ )
318402 ) ;
403+
404+ function canRetry ( operation : AbstractOperation , error : MongoError ) {
405+ // always retryable
406+ if (
407+ error . hasErrorLabel ( MongoErrorLabel . SystemOverloadedError ) &&
408+ error . hasErrorLabel ( MongoErrorLabel . RetryableError )
409+ ) {
410+ return true ;
411+ }
412+
413+ // run command is only retryable if we get retryable overload errors
414+ if ( operation instanceof RunCommandOperation ) {
415+ return false ;
416+ }
417+
418+ // batch operations are only retryable if the batch is retryable
419+ if ( operation . hasAspect ( Aspect . COMMAND_BATCHING ) ) {
420+ return operation . canRetryWrite ;
421+ }
422+
423+ return (
424+ ( hasWriteAspect && willRetryWrite && isRetryableWriteError ( error ) ) ||
425+ ( hasReadAspect && willRetryRead && isRetryableReadError ( error ) )
426+ ) ;
427+ }
319428}
0 commit comments