Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@ export class ChangeStream<
return this.cursor?.resumeToken;
}

/** Returns the currently buffered documents length of the underlying cursor. */
bufferedCount(): number {
return this.cursor?.bufferedCount() ?? 0;
}
Comment thread
nbbeeken marked this conversation as resolved.

/** Check if there is any document still available in the Change Stream */
async hasNext(): Promise<boolean> {
this._setIsIterator();
Expand Down
85 changes: 85 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,91 @@ describe('Change Streams', function () {
);
});
});

// Note that `insertOne` in these tests is called with write concern `{ w: 'majority' }`.
// This is to avoid eventual consistency issues that make these tests brittle.
// A change stream won't see the new documents until they are persisted in the oplog and
// some MongoDB server versions (4.2-) used for testing default to write concern `{ w: 1 }`.
// Using write concern `{ w: 'majority' }` ensures that the new documents are persisted in
Comment thread
dariakp marked this conversation as resolved.
Outdated
// the oplog when the `insertOne` promise resolves, making these tests more reliable.
describe('#bufferedCount()', function () {
it(
Comment thread
dariakp marked this conversation as resolved.
'should return 0 before starting to consume the change stream (empty collection)',
{ requires: { topology: 'replicaset' } },
async function () {
changeStream = collection.watch([]);
expect(changeStream.bufferedCount()).to.equal(0);
await changeStream.close();
}
);

it(
'should return 0 before starting to consume the change stream (non-empty collection)',
{ requires: { topology: 'replicaset' } },
async function () {
// existing documents
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });

changeStream = collection.watch([]);

// docs inserted after the change stream was created
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 4 }, { writeConcern: { w: 'majority' } });

expect(changeStream.bufferedCount()).to.equal(0);
await changeStream.close();
}
);

it('should return the underlying cursor buffered document count', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });

expect(changeStream.bufferedCount()).to.equal(0);

// hasNext will trigger a batch fetch, buffering the documents
const hasNext = await changeStream.hasNext();
expect(hasNext).to.be.true;

expect(changeStream.bufferedCount()).to.equal(2);
}
});

it(`decreases as buffered documents are consumed`, {
metadata: { requires: { topology: 'replicaset' } },
async test() {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });

expect(changeStream.bufferedCount()).to.equal(0);

// `next` triggers a batch fetch, buffering the documents
// and then consumes one document from that buffer
for (let i = 2; i >= 0; i--) {
await changeStream.next();
expect(changeStream.bufferedCount()).to.equal(i);
}

await collection.insertOne({ a: 1 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 2 }, { writeConcern: { w: 'majority' } });
await collection.insertOne({ a: 3 }, { writeConcern: { w: 'majority' } });

// `tryNext` also triggers a batch fetch
for (let i = 2; i >= 0; i--) {
await changeStream.tryNext();
expect(changeStream.bufferedCount()).to.equal(i);
}
}
});
});
});

describe('startAfter', function () {
Expand Down
Loading