From e2b76face95a6ad62ce9880ef4f050767261b044 Mon Sep 17 00:00:00 2001 From: Gino Heyman Date: Tue, 10 Feb 2026 21:38:04 +0100 Subject: [PATCH 1/6] feat(NODE-7441): add `ChangeStream.bufferedCount` --- src/change_stream.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/change_stream.ts b/src/change_stream.ts index 22330e8595e..a89b3849366 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -705,6 +705,11 @@ export class ChangeStream< return this.cursor?.resumeToken; } + /** Returns the currently buffered documents length of the underlying cursor. */ + get bufferedCount(): number | undefined { + return this.cursor?.bufferedCount(); + } + /** Check if there is any document still available in the Change Stream */ async hasNext(): Promise { this._setIsIterator(); From ff34eb89d96d5bb5da3b64407462fd5fe45f5505 Mon Sep 17 00:00:00 2001 From: Gino Heyman Date: Wed, 4 Mar 2026 10:13:37 +0100 Subject: [PATCH 2/6] feat(NODE-7441): align `bufferedCount` signature with `AbstractCursor` --- src/change_stream.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index a89b3849366..f99a18c5016 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -706,8 +706,8 @@ export class ChangeStream< } /** Returns the currently buffered documents length of the underlying cursor. */ - get bufferedCount(): number | undefined { - return this.cursor?.bufferedCount(); + bufferedCount(): number { + return this.cursor?.bufferedCount() ?? 0; } /** Check if there is any document still available in the Change Stream */ From cc16a749e9356991d9ee8188fd85a8d8b6937d70 Mon Sep 17 00:00:00 2001 From: Gino Heyman Date: Wed, 4 Mar 2026 20:33:17 +0100 Subject: [PATCH 3/6] feat(NODE-7441): add `ChangeStream.bufferedCount` tests --- .../change-streams/change_stream.test.ts | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index c3011a4168c..363bdbbd0d5 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1165,6 +1165,82 @@ describe('Change Streams', function () { ); }); }); + + describe('#bufferedCount()', function () { + it( + 'should return 0 before starting to consume the change stream (empty collection)', + { requires: { topology: 'replicaset' } }, + async function () { + changeStream = collection.watch([]); + expect(changeStream.bufferedCount()).to.equal(0); + await changeStream.close(); + } + ); + + it( + 'should return 0 before starting to consume the change stream (non-empty collection)', + { requires: { topology: 'replicaset' } }, + async function () { + // existing documents + await collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 2 }); + + changeStream = collection.watch([]); + + // docs inserted after the change stream was created + await collection.insertOne({ a: 3 }); + await collection.insertOne({ a: 4 }); + + expect(changeStream.bufferedCount()).to.equal(0); + await changeStream.close(); + } + ); + + it('should return the underlying cursor buffered document count', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 2 }); + + expect(changeStream.bufferedCount()).to.equal(0); + + // hasNext will trigger a batch fetch, buffering the documents + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; + + expect(changeStream.bufferedCount()).to.equal(2); + } + }); + + it(`decreases as buffered documents are consumed`, { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 2 }); + + expect(changeStream.bufferedCount()).to.equal(0); + // `next` triggers a batch fetch, buffering the documents + // and then consumes one document from that buffer + await changeStream.next(); + expect(changeStream.bufferedCount()).to.equal(1); + await changeStream.next(); + expect(changeStream.bufferedCount()).to.equal(0); + + await collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 2 }); + + // `tryNext` also triggers a batch fetch + await changeStream.tryNext(); + expect(changeStream.bufferedCount()).to.equal(1); + await changeStream.tryNext(); + expect(changeStream.bufferedCount()).to.equal(0); + } + }); + }); }); describe('startAfter', function () { From 4bbdf201838f072c79f9c94b12cdc7e37752cf59 Mon Sep 17 00:00:00 2001 From: Gino Heyman Date: Sun, 8 Mar 2026 14:41:29 +0100 Subject: [PATCH 4/6] feat(NODE-7441): use `majority` `wc` for `ChangeStream.bufferedCount` tests --- .../change-streams/change_stream.test.ts | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index aab490f8549..30cbebd5a1c 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1184,14 +1184,14 @@ describe('Change Streams', function () { { requires: { topology: 'replicaset' } }, async function () { // existing documents - await collection.insertOne({ a: 1 }); - await collection.insertOne({ a: 2 }); + await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } }); changeStream = collection.watch([]); // docs inserted after the change stream was created - await collection.insertOne({ a: 3 }); - await collection.insertOne({ a: 4 }); + await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 4 }, { writeConcern: { w: 'majority' } }); expect(changeStream.bufferedCount()).to.equal(0); await changeStream.close(); @@ -1203,8 +1203,8 @@ describe('Change Streams', function () { async test() { changeStream = collection.watch([]); await initIteratorMode(changeStream); - await collection.insertOne({ a: 1 }); - await collection.insertOne({ a: 2 }); + await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } }); expect(changeStream.bufferedCount()).to.equal(0); @@ -1221,25 +1221,28 @@ describe('Change Streams', function () { async test() { changeStream = collection.watch([]); await initIteratorMode(changeStream); - await collection.insertOne({ a: 1 }); - await collection.insertOne({ a: 2 }); + await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } }); expect(changeStream.bufferedCount()).to.equal(0); + // `next` triggers a batch fetch, buffering the documents // and then consumes one document from that buffer - await changeStream.next(); - expect(changeStream.bufferedCount()).to.equal(1); - await changeStream.next(); - expect(changeStream.bufferedCount()).to.equal(0); + for (let i = 2; i >= 0; i--) { + await changeStream.next(); + expect(changeStream.bufferedCount()).to.equal(i); + } - await collection.insertOne({ a: 1 }); - await collection.insertOne({ a: 2 }); + await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } }); + await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } }); // `tryNext` also triggers a batch fetch - await changeStream.tryNext(); - expect(changeStream.bufferedCount()).to.equal(1); - await changeStream.tryNext(); - expect(changeStream.bufferedCount()).to.equal(0); + for (let i = 2; i >= 0; i--) { + await changeStream.tryNext(); + expect(changeStream.bufferedCount()).to.equal(i); + } } }); }); From 7dad46506673ca81e4283a17e9dcf97e70614721 Mon Sep 17 00:00:00 2001 From: Gino Heyman Date: Thu, 12 Mar 2026 09:28:45 +0100 Subject: [PATCH 5/6] feat(NODE-7441): add comment about write concern usage in tests --- test/integration/change-streams/change_stream.test.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 30cbebd5a1c..3c650e6a44d 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1168,6 +1168,12 @@ describe('Change Streams', function () { }); }); + // Note that `insertOne` in these tests is called with write concern `{ w: 'majority' }`. + // This is to avoid eventual consistency issues that make these tests brittle. + // A change stream won't see the new documents until they are persisted in the oplog and + // some MongoDB server versions (4.2-) used for testing default to write concern `{ w: 1 }`. + // Using write concern `{ w: 'majority' }` ensures that the new documents are persisted in + // the oplog when the `insertOne` promise resolves, making these tests more reliable. describe('#bufferedCount()', function () { it( 'should return 0 before starting to consume the change stream (empty collection)', From af2217dd65650d3ad55cb241a1680a3b293de34c Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Sun, 22 Mar 2026 14:49:19 -0400 Subject: [PATCH 6/6] lint: Update test/integration/change-streams/change_stream.test.ts --- test/integration/change-streams/change_stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 3c650e6a44d..ae79cb5f3f1 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1172,7 +1172,7 @@ describe('Change Streams', function () { // This is to avoid eventual consistency issues that make these tests brittle. // A change stream won't see the new documents until they are persisted in the oplog and // some MongoDB server versions (4.2-) used for testing default to write concern `{ w: 1 }`. - // Using write concern `{ w: 'majority' }` ensures that the new documents are persisted in + // Using write concern `{ w: 'majority' }` ensures that the new documents are persisted in // the oplog when the `insertOne` promise resolves, making these tests more reliable. describe('#bufferedCount()', function () { it(