Skip to content

Commit bb59533

Browse files
committed
fix(NODE-4763): cache resume token in ChangeStream#tryNext() method
1 parent cd4f6c0 commit bb59533

3 files changed

Lines changed: 102 additions & 12 deletions

File tree

src/change_stream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,11 @@ export class ChangeStream<
809809
while (true) {
810810
try {
811811
const change = await this.cursor.tryNext();
812-
return change ?? null;
812+
if (!change) {
813+
return null;
814+
}
815+
const processedChange = this._processChange(change);
816+
return processedChange;
813817
} catch (error) {
814818
try {
815819
await this._processErrorIteratorMode(error, this.cursor.id != null);

test/integration/change-streams/change_stream.test.ts

Lines changed: 91 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ const pipeline = [
4545
{ $addFields: { comment: 'The documentKey field has been projected out of this document.' } }
4646
];
4747

48-
describe.only('Change Streams', function () {
48+
describe('Change Streams', function () {
4949
let client: MongoClient;
5050
let collection: Collection;
5151
let changeStream: ChangeStream;
@@ -377,10 +377,7 @@ describe.only('Change Streams', function () {
377377

378378
async test() {
379379
await initIteratorMode(changeStream);
380-
collection.insertOne({ a: 1 });
381-
382-
const hasNext = await changeStream.hasNext();
383-
expect(hasNext).to.be.true;
380+
await collection.insertOne({ a: 1 });
384381

385382
const change = await changeStream.next();
386383
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
@@ -392,10 +389,7 @@ describe.only('Change Streams', function () {
392389

393390
async test() {
394391
await initIteratorMode(changeStream);
395-
collection.insertOne({ a: 1 });
396-
397-
const hasNext = await changeStream.hasNext();
398-
expect(hasNext).to.be.true;
392+
await collection.insertOne({ a: 1 });
399393

400394
const change = await changeStream.tryNext();
401395
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
@@ -408,7 +402,7 @@ describe.only('Change Streams', function () {
408402
async test() {
409403
const willBeChange = once(changeStream, 'change');
410404
await once(changeStream.cursor, 'init');
411-
collection.insertOne({ a: 1 });
405+
await collection.insertOne({ a: 1 });
412406

413407
const [change] = await willBeChange;
414408
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
@@ -2753,3 +2747,90 @@ describe('ChangeStream resumability', function () {
27532747
}
27542748
);
27552749
});
2750+
2751+
describe('NODE-4763 - handle resume without duplicates', function () {
2752+
let client: MongoClient;
2753+
let db: Db;
2754+
let collection: Collection;
2755+
let changeStream: ChangeStream;
2756+
const resumableError = { code: 6, message: 'host unreachable' };
2757+
2758+
beforeEach(async function () {
2759+
client = this.configuration.newClient();
2760+
await client.connect();
2761+
2762+
await client.db('admin').command({
2763+
configureFailPoint: is4_2Server(this.configuration.version)
2764+
? 'failCommand'
2765+
: 'failGetMoreAfterCursorCheckout',
2766+
mode: { skip: 1 },
2767+
data: {
2768+
failCommands: ['getMore'],
2769+
errorCode: resumableError.code,
2770+
errmsg: resumableError.message
2771+
}
2772+
} as FailPoint);
2773+
2774+
db = client.db('NODE-4763');
2775+
collection = await db.createCollection('test');
2776+
changeStream = collection.watch();
2777+
});
2778+
2779+
afterEach(async function () {
2780+
await client.db('admin').command({
2781+
configureFailPoint: is4_2Server(this.configuration.version)
2782+
? 'failCommand'
2783+
: 'failGetMoreAfterCursorCheckout',
2784+
mode: 'off'
2785+
} as FailPoint);
2786+
2787+
await changeStream.close();
2788+
await db.dropCollection('test');
2789+
await client?.close();
2790+
});
2791+
2792+
describe('when using iterator form', function () {
2793+
it('#next', { requires: { topology: 'replicaset' } }, async function test() {
2794+
await initIteratorMode(changeStream);
2795+
2796+
await collection.insertOne({ a: 1 });
2797+
const change = await changeStream.next();
2798+
expect(change).to.have.property('operationType', 'insert');
2799+
expect(change).to.have.nested.property('fullDocument.a', 1);
2800+
2801+
await collection.insertOne({ a: 2 });
2802+
const change2 = await changeStream.next();
2803+
expect(change2).to.have.property('operationType', 'insert');
2804+
expect(change2).to.have.nested.property('fullDocument.a', 2);
2805+
});
2806+
2807+
it('#tryNext', { requires: { topology: 'replicaset' } }, async function test() {
2808+
await initIteratorMode(changeStream);
2809+
2810+
await collection.insertOne({ a: 1 });
2811+
const change = await changeStream.tryNext();
2812+
expect(change).to.have.property('operationType', 'insert');
2813+
expect(change).to.have.nested.property('fullDocument.a', 1);
2814+
2815+
await collection.insertOne({ a: 2 });
2816+
const change2 = await changeStream.tryNext();
2817+
expect(change2).to.have.property('operationType', 'insert');
2818+
expect(change2).to.have.nested.property('fullDocument.a', 2);
2819+
});
2820+
});
2821+
2822+
it('in an event listener form', { requires: { topology: 'replicaset' } }, async function () {
2823+
const willBeChange = on(changeStream, 'change');
2824+
await once(changeStream.cursor, 'init');
2825+
2826+
await collection.insertOne({ a: 1 });
2827+
const change = await willBeChange.next();
2828+
expect(change.value[0]).to.have.property('operationType', 'insert');
2829+
expect(change.value[0]).to.have.nested.property('fullDocument.a', 1);
2830+
2831+
await collection.insertOne({ a: 2 });
2832+
const change2 = await willBeChange.next();
2833+
expect(change2.value[0]).to.have.property('operationType', 'insert');
2834+
expect(change2.value[0]).to.have.nested.property('fullDocument.a', 2);
2835+
});
2836+
});

test/tools/utils.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,12 @@ export function extractAuthFromConnectionString(connectionString: string | any[]
207207

208208
export interface FailPoint {
209209
configureFailPoint: 'failCommand' | 'failGetMoreAfterCursorCheckout' | 'maxTimeNeverTimeOut';
210-
mode: { activationProbability: number } | { times: number } | 'alwaysOn' | 'off';
210+
mode:
211+
| { activationProbability: number }
212+
| { times: number }
213+
| { skip: number }
214+
| 'alwaysOn'
215+
| 'off';
211216
data: {
212217
failCommands: string[];
213218
errorCode?: number;

0 commit comments

Comments
 (0)