Skip to content

Commit 7ba7426

Browse files
refactor + retry writes error logic
1 parent 0363694 commit 7ba7426

2 files changed

Lines changed: 165 additions & 61 deletions

File tree

src/operations/execute_operation.ts

Lines changed: 67 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,7 @@ async function executeOperationWithRetries<
246246
session.incrementTransactionNumber();
247247
}
248248

249-
let previousOperationError: MongoError | undefined;
250249
const deprioritizedServers = new DeprioritizedServers();
251-
252250
const backoffDelayProvider = new ExponentialBackoffProvider(
253251
10_000, // MAX_BACKOFF
254252
100, // base backoff
@@ -257,40 +255,86 @@ async function executeOperationWithRetries<
257255

258256
let maxAttempts =
259257
(operation.maxAttempts ?? willRetry) ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
258+
260259
const shouldRetry = operation.hasAspect(Aspect.READ_OPERATION) && topology.s.options.retryReads || (operation.hasAspect(Aspect.WRITE_OPERATION) || operation instanceof RunCommandOperation) && topology.s.options.retryWrites;
261260

261+
let error: MongoError | null = null;
262+
262263
for (
263264
let attempt = 0;
264265
attempt < maxAttempts;
265-
attempt++,
266-
maxAttempts = shouldRetry && previousOperationError?.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
267-
? 6
268-
: maxAttempts
266+
attempt++
269267
) {
270-
if (previousOperationError) {
271-
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
268+
269+
operation.server = server;
270+
271+
try {
272+
const isRetry = attempt > 0;
273+
274+
try {
275+
const result = await server.command(operation, timeoutContext);
276+
topology.tokenBucket.deposit(
277+
isRetry
278+
? // on successful retry, deposit the retry cost + the refresh rate.
279+
TOKEN_REFRESH_RATE + RETRY_COST
280+
: // otherwise, just deposit the refresh rate.
281+
TOKEN_REFRESH_RATE
282+
);
283+
return operation.handleOk(result);
284+
} catch (error) {
285+
return operation.handleError(error);
286+
}
287+
} catch (operationError) {
288+
// Should never happen, but if it does, propagate the error.
289+
if (!(operationError instanceof MongoError)) throw operationError;
290+
291+
if (!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
292+
// if an operation fails with an error that does not carry the SystemOverloadedError label, deposit 1 token.
293+
topology.tokenBucket.deposit(RETRY_COST);
294+
}
295+
296+
if (error == null) {
297+
error = operationError;
298+
} else {
299+
if (!operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)) {
300+
error = operationError;
301+
}
302+
}
303+
304+
if (hasWriteAspect && operationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
272305
throw new MongoServerError({
273306
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
274307
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
275-
originalError: previousOperationError
308+
originalError: operationError
276309
});
277310
}
278311

312+
// prepare for retry
279313
const isRetryable =
280314
// bulk write commands are retryable if all operations in the batch are retryable
281315
(operation.hasAspect(Aspect.COMMAND_BATCHING) && operation.canRetryWrite) ||
282316
// if we have a retryable read or write operation, we can retry
283-
(hasWriteAspect && willRetryWrite && isRetryableWriteError(previousOperationError)) ||
284-
(hasReadAspect && willRetryRead && isRetryableReadError(previousOperationError)) ||
317+
(!operation.hasAspect(Aspect.COMMAND_BATCHING) && hasWriteAspect && willRetryWrite && isRetryableWriteError(operationError)) ||
318+
(hasReadAspect && willRetryRead && isRetryableReadError(operationError)) ||
285319
// if we have a retryable, system overloaded error, we can retry
286-
(previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
287-
previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError));
320+
(operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
321+
operationError.hasErrorLabel(MongoErrorLabel.RetryableError));
322+
323+
if (!isRetryable) throw error;
288324

289-
if (!isRetryable) {
290-
throw previousOperationError;
325+
maxAttempts = shouldRetry && operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
326+
? 6
327+
: maxAttempts
328+
if (attempt >= maxAttempts) {
329+
throw error;
291330
}
292331

293-
if (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
332+
// safe to retry - reset timeout context, apply backoff if necessary and re-run server selection
333+
334+
// Reset timeouts
335+
timeoutContext.clear();
336+
337+
if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
294338
const delayMS = backoffDelayProvider.getNextBackoffDuration();
295339

296340
// if the delay would exhaust the CSOT timeout, short-circuit.
@@ -299,20 +343,20 @@ async function executeOperationWithRetries<
299343
throw new MongoOperationTimeoutError(
300344
`MongoDB SystemOverload exponential backoff would exceed timeoutMS deadline: remaining CSOT deadline=${timeoutContext.remainingTimeMS}, backoff delayMS=${delayMS}`,
301345
{
302-
cause: previousOperationError
346+
cause: error
303347
}
304348
);
305349
}
306350

307351
if (!topology.tokenBucket.consume(RETRY_COST)) {
308-
throw previousOperationError;
352+
throw error;
309353
}
310354

311355
await setTimeout(delayMS);
312356
}
313357

314358
if (
315-
previousOperationError instanceof MongoNetworkError &&
359+
operationError instanceof MongoNetworkError &&
316360
operation.hasAspect(Aspect.CURSOR_CREATING) &&
317361
session != null &&
318362
session.isPinned &&
@@ -321,6 +365,8 @@ async function executeOperationWithRetries<
321365
session.unpin({ force: true, forceClear: true });
322366
}
323367

368+
deprioritizedServers.add(server.description);
369+
324370
server = await topology.selectServer(selector, {
325371
session,
326372
operationName: operation.commandName,
@@ -333,52 +379,13 @@ async function executeOperationWithRetries<
333379
'Selected server does not support retryable writes'
334380
);
335381
}
336-
}
337-
338-
operation.server = server;
339-
340-
try {
341-
const isRetry = attempt > 0;
342382

343383
// If attempt > 0 and we are command batching we need to reset the batch.
344-
if (isRetry && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
384+
if (operation.hasAspect(Aspect.COMMAND_BATCHING)) {
345385
operation.resetBatch();
346386
}
347-
348-
try {
349-
const result = await server.command(operation, timeoutContext);
350-
topology.tokenBucket.deposit(
351-
isRetry
352-
? // on successful retry, deposit the retry cost + the refresh rate.
353-
TOKEN_REFRESH_RATE + RETRY_COST
354-
: // otherwise, just deposit the refresh rate.
355-
TOKEN_REFRESH_RATE
356-
);
357-
return operation.handleOk(result);
358-
} catch (error) {
359-
return operation.handleError(error);
360-
}
361-
} catch (operationError) {
362-
if (!(operationError instanceof MongoError)) throw operationError;
363-
364-
if (!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
365-
// if an operation fails with an error that does not contain the SystemOverloadError, deposit 1 token.
366-
topology.tokenBucket.deposit(RETRY_COST);
367-
}
368-
369-
if (
370-
previousOperationError != null &&
371-
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
372-
) {
373-
throw previousOperationError;
374-
}
375-
deprioritizedServers.add(server.description);
376-
previousOperationError = operationError;
377-
378-
// Reset timeouts
379-
timeoutContext.clear();
380387
}
381388
}
382389

383-
throw previousOperationError ?? new MongoRuntimeError('ahh');
390+
throw error ?? new MongoRuntimeError('ahh');
384391
}

test/integration/retryable-writes/retryable_writes.spec.prose.test.ts

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import {
88
type Collection,
99
type MongoClient,
1010
MongoError,
11+
MongoErrorLabel,
1112
MongoServerError,
12-
MongoWriteConcernError
13+
MongoWriteConcernError,
1314
} from '../../../src';
1415
import { Server } from '../../../src/sdam/server';
1516
import { sleep } from '../../tools/utils';
@@ -344,4 +345,100 @@ describe('Retryable Writes Spec Prose', () => {
344345
}
345346
);
346347
});
348+
349+
describe('6. Test that drivers return the original error after encountering multiple WriteConcernErrors with a RetryableWriteError label', () => {
350+
let client: MongoClient;
351+
let collection: Collection<{ _id: 1 }>;
352+
353+
beforeEach(async function () {
354+
// This test MUST:
355+
// - be implemented by any driver that implements the Command Monitoring specification,
356+
// - only run against replica sets as mongos does not propagate the NoWritesPerformed label to the drivers.
357+
// - be run against server versions 6.0 and above.
358+
// - be implemented by any driver that has implemented the Client Backpressure specification.
359+
360+
// Additionally, this test requires drivers to set a fail point after an `insertOne` operation but before the subsequent
361+
// retry. Drivers that are unable to set a failCommand after the CommandFailedEvent SHOULD use mocking or write a unit test
362+
// to cover the same sequence of events.
363+
364+
// 1. Create a client with `retryWrites=true`.
365+
client = this.configuration.newClient({ monitorCommands: true, retryWrites: true });
366+
await client
367+
.db()
368+
.collection('retryReturnsOriginal')
369+
.drop()
370+
.catch(() => null);
371+
collection = client.db().collection('retryReturnsOriginal');
372+
});
373+
374+
afterEach(async function () {
375+
// 5. Disable the fail point:
376+
// ```javascript
377+
// {
378+
// configureFailPoint: "failCommand",
379+
// mode: "off"
380+
// }
381+
// ```
382+
383+
// (we don't use a failPoint, so we use sinon.restore instead)
384+
sinon.restore();
385+
await client.close();
386+
});
387+
388+
it(
389+
'when a retry attempt fails with an error labeled NoWritesPerformed, drivers MUST return the original error',
390+
{ requires: { topology: 'replicaset', mongodb: '>=4.2.9' } },
391+
async () => {
392+
// 2. Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and
393+
// `SystemOverloadedError` error labels:
394+
395+
// ```javascript
396+
// {
397+
// configureFailPoint: "failCommand",
398+
// mode: {times: 1},
399+
// data: {
400+
// failCommands: ["insert"],
401+
// errorLabels: ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"],
402+
// errorCode: 91
403+
// }
404+
// }
405+
// ```
406+
407+
// 3. Via the command monitoring CommandFailedEvent, configure a fail point with error code `10107` (NotWritablePrimary)
408+
// and a NoWritesPerformed label:
409+
410+
// ```javascript
411+
// {
412+
// configureFailPoint: "failCommand",
413+
// mode: "alwaysOn",
414+
// data: {
415+
// failCommands: ["insert"],
416+
// errorCode: 10107,
417+
//     errorLabels: ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"]
418+
// }
419+
// }
420+
// ```
421+
422+
// Drivers SHOULD only configure the `10107` fail point command if the failed event is for the `91` error configured
423+
// in step 2.
424+
const serverCommandStub = sinon.stub(Server.prototype, 'command').callsFake(async function () {
425+
throw new MongoServerError({
426+
message: 'Server Error',
427+
errorLabels: [MongoErrorLabel.RetryableError, MongoErrorLabel.SystemOverloadedError, MongoErrorLabel.NoWritesPerformed],
428+
code: serverCommandStub.callCount === 1 ? 91 : 10107,
429+
ok: 0
430+
});
431+
});
432+
433+
const insertResult = await collection.insertOne({ _id: 1 }).catch(error => error);
434+
435+
expect(serverCommandStub.callCount).to.equal(6);
436+
437+
// 4. Attempt an `insertOne` operation on any record for any database and collection. Expect the `insertOne` to fail with a
438+
// server error. Assert that the error code of the server error is 91.
439+
expect(insertResult).to.be.instanceOf(MongoServerError);
440+
expect(insertResult).to.have.property('code', 91);
441+
}
442+
);
443+
})
347444
});

0 commit comments

Comments
 (0)