Skip to content

Commit cc16a74

Browse files
committed
feat(NODE-7441): add ChangeStream.bufferedCount tests
1 parent ff34eb8 commit cc16a74

1 file changed

Lines changed: 76 additions & 0 deletions

File tree

test/integration/change-streams/change_stream.test.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,82 @@ describe('Change Streams', function () {
11651165
);
11661166
});
11671167
});
1168+
1169+
describe('#bufferedCount()', function () {
1170+
it(
1171+
'should return 0 before starting to consume the change stream (empty collection)',
1172+
{ requires: { topology: 'replicaset' } },
1173+
async function () {
1174+
changeStream = collection.watch([]);
1175+
expect(changeStream.bufferedCount()).to.equal(0);
1176+
await changeStream.close();
1177+
}
1178+
);
1179+
1180+
it(
1181+
'should return 0 before starting to consume the change stream (non-empty collection)',
1182+
{ requires: { topology: 'replicaset' } },
1183+
async function () {
1184+
// existing documents
1185+
await collection.insertOne({ a: 1 });
1186+
await collection.insertOne({ a: 2 });
1187+
1188+
changeStream = collection.watch([]);
1189+
1190+
// docs inserted after the change stream was created
1191+
await collection.insertOne({ a: 3 });
1192+
await collection.insertOne({ a: 4 });
1193+
1194+
expect(changeStream.bufferedCount()).to.equal(0);
1195+
await changeStream.close();
1196+
}
1197+
);
1198+
1199+
it('should return the underlying cursor buffered document count', {
1200+
metadata: { requires: { topology: 'replicaset' } },
1201+
async test() {
1202+
changeStream = collection.watch([]);
1203+
await initIteratorMode(changeStream);
1204+
await collection.insertOne({ a: 1 });
1205+
await collection.insertOne({ a: 2 });
1206+
1207+
expect(changeStream.bufferedCount()).to.equal(0);
1208+
1209+
// hasNext will trigger a batch fetch, buffering the documents
1210+
const hasNext = await changeStream.hasNext();
1211+
expect(hasNext).to.be.true;
1212+
1213+
expect(changeStream.bufferedCount()).to.equal(2);
1214+
}
1215+
});
1216+
1217+
it(`decreases as buffered documents are consumed`, {
1218+
metadata: { requires: { topology: 'replicaset' } },
1219+
async test() {
1220+
changeStream = collection.watch([]);
1221+
await initIteratorMode(changeStream);
1222+
await collection.insertOne({ a: 1 });
1223+
await collection.insertOne({ a: 2 });
1224+
1225+
expect(changeStream.bufferedCount()).to.equal(0);
1226+
// `next` triggers a batch fetch, buffering the documents
1227+
// and then consumes one document from that buffer
1228+
await changeStream.next();
1229+
expect(changeStream.bufferedCount()).to.equal(1);
1230+
await changeStream.next();
1231+
expect(changeStream.bufferedCount()).to.equal(0);
1232+
1233+
await collection.insertOne({ a: 1 });
1234+
await collection.insertOne({ a: 2 });
1235+
1236+
// `tryNext` also triggers a batch fetch
1237+
await changeStream.tryNext();
1238+
expect(changeStream.bufferedCount()).to.equal(1);
1239+
await changeStream.tryNext();
1240+
expect(changeStream.bufferedCount()).to.equal(0);
1241+
}
1242+
});
1243+
});
11681244
});
11691245

11701246
describe('startAfter', function () {

0 commit comments

Comments
 (0)