Skip to content

Commit aa78855

Browse files
committed
pr feedback
1 parent ca7749f commit aa78855

2 files changed

Lines changed: 157 additions & 156 deletions

File tree

test/integration/crud/misc_cursors.test.ts

Lines changed: 1 addition & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { expect } from 'chai';
2-
import { on, once } from 'events';
2+
import { once } from 'events';
33

44
import { MongoClientClosedError } from '../../../src/error';
55
import { type MongoClient } from '../../../src/mongo_client';
@@ -726,89 +726,6 @@ describe('Cursor', function () {
726726
}
727727
});
728728

729-
it('does not auto destroy streams', async function () {
730-
const docs = [];
731-
732-
for (let i = 0; i < 10; i++) {
733-
docs.push({ a: i + 1 });
734-
}
735-
736-
const configuration = this.configuration;
737-
await client.connect();
738-
739-
const db = client.db(configuration.db);
740-
const collection = await db.createCollection('does_not_autodestroy_streams');
741-
742-
await collection.insertMany(docs, configuration.writeConcernMax());
743-
744-
const cursor = collection.find();
745-
const stream = cursor.stream();
746-
stream.on('close', () => {
747-
expect.fail('extra close event must not be called');
748-
});
749-
stream.on('end', () => {
750-
client.close();
751-
});
752-
stream.on('data', doc => {
753-
expect(doc).to.exist;
754-
});
755-
stream.resume();
756-
await once(stream, 'end');
757-
});
758-
759-
it('immediately destroying a stream prevents the query from executing', {
760-
// Add a tag that our runner can trigger on
761-
// in this case we are setting that node needs to be higher than 0.10.X to run
762-
metadata: {
763-
requires: { topology: ['single', 'replicaset', 'sharded'] }
764-
},
765-
766-
test: async function () {
767-
const docs = [{ b: 2 }, { b: 3 }];
768-
let i = 0,
769-
doneCalled = 0;
770-
771-
const configuration = this.configuration;
772-
await client.connect();
773-
774-
const db = client.db(configuration.db);
775-
const collection = await db.createCollection(
776-
'immediately_destroying_a_stream_prevents_the_query_from_executing'
777-
);
778-
779-
// insert all docs
780-
await collection.insertMany(docs, configuration.writeConcernMax());
781-
782-
const cursor = collection.find();
783-
const stream = cursor.stream();
784-
785-
stream.on('data', function () {
786-
i++;
787-
});
788-
789-
function testDone() {
790-
return err => {
791-
++doneCalled;
792-
793-
if (doneCalled === 1) {
794-
expect(err).to.not.exist;
795-
test.strictEqual(0, i);
796-
test.strictEqual(true, cursor.closed);
797-
}
798-
};
799-
}
800-
801-
cursor.once('close', testDone('close'));
802-
stream.once('error', testDone('error'));
803-
const promise = once(cursor, 'close');
804-
805-
stream.destroy();
806-
807-
await cursor.close();
808-
await promise;
809-
}
810-
});
811-
812729
it('removes session when cloning an find cursor', async function () {
813730
const collection = await client.db().collection('test');
814731

@@ -833,77 +750,6 @@ describe('Cursor', function () {
833750
expect(clonedCursor).to.have.property('session').to.be.null;
834751
});
835752

836-
it('destroying a stream stops it', async function () {
837-
const db = client.db();
838-
await db.dropCollection('destroying_a_stream_stops_it');
839-
const collection = await db.createCollection('destroying_a_stream_stops_it');
840-
841-
const docs = Array.from({ length: 10 }, (_, i) => ({ b: i + 1 }));
842-
843-
await collection.insertMany(docs);
844-
845-
const cursor = collection.find();
846-
const stream = cursor.stream();
847-
848-
expect(cursor).property('closed', false);
849-
850-
const willClose = once(cursor, 'close');
851-
852-
const dataEvents = on(stream, 'data');
853-
854-
for (let i = 0; i < 5; i++) {
855-
const {
856-
value: [doc]
857-
} = await dataEvents.next();
858-
expect(doc).property('b', i + 1);
859-
}
860-
861-
// After 5 successful data events, destroy stream
862-
stream.destroy();
863-
864-
// We should get a close event on the stream and a close event on the cursor
865-
// We should **not** get an 'error' or an 'end' event,
866-
// the following will throw if either stream or cursor emitted an 'error' event
867-
await Promise.race([
868-
willClose,
869-
sleep(100).then(() => Promise.reject(new Error('close event never emitted')))
870-
]);
871-
});
872-
873-
it('Should not emit any events after close event emitted due to cursor killed', {
874-
// Add a tag that our runner can trigger on
875-
// in this case we are setting that node needs to be higher than 0.10.X to run
876-
metadata: { requires: { topology: ['single', 'replicaset'] } },
877-
878-
test: async function () {
879-
const configuration = this.configuration;
880-
await client.connect();
881-
882-
const db = client.db(configuration.db);
883-
const collection = db.collection('cursor_limit_skip_correctly');
884-
885-
// Insert x number of docs
886-
const ordered = collection.initializeUnorderedBulkOp();
887-
888-
for (let i = 0; i < 100; i++) {
889-
ordered.insert({ a: i });
890-
}
891-
892-
await ordered.execute({ writeConcern: { w: 1 } });
893-
894-
// Let's attempt to skip and limit
895-
const cursor = collection.find({}).batchSize(10);
896-
const stream = cursor.stream();
897-
stream.on('data', function () {
898-
stream.destroy();
899-
});
900-
901-
const onClose = once(cursor, 'close');
902-
await cursor.close();
903-
await onClose;
904-
}
905-
});
906-
907753
// NOTE: skipped for use of topology manager
908754
// it.skip('cursor stream errors', {
909755
// // Add a tag that our runner can trigger on

test/integration/node-specific/cursor_stream.test.ts

Lines changed: 156 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import { once } from 'node:events';
1+
import { strictEqual } from 'node:assert';
2+
import { on, once } from 'node:events';
23

34
import { expect } from 'chai';
45

56
import { Binary, type Collection, type Db, type MongoClient, MongoServerError } from '../../../src';
7+
import { sleep } from '../../tools/utils';
68

79
describe('Cursor Streams', function () {
810
let client: MongoClient;
@@ -78,4 +80,157 @@ describe('Cursor Streams', function () {
7880
expect(error).to.be.instanceof(MongoServerError);
7981
expect(error.message).to.include('unknown operator');
8082
});
83+
84+
it('does not auto destroy streams', async function () {
85+
const docs = [];
86+
87+
for (let i = 0; i < 10; i++) {
88+
docs.push({ a: i + 1 });
89+
}
90+
91+
const configuration = this.configuration;
92+
await client.connect();
93+
94+
const db = client.db(configuration.db);
95+
const collection = await db.createCollection('does_not_autodestroy_streams');
96+
97+
await collection.insertMany(docs, configuration.writeConcernMax());
98+
99+
const cursor = collection.find();
100+
const stream = cursor.stream();
101+
stream.on('close', () => {
102+
expect.fail('extra close event must not be called');
103+
});
104+
stream.on('data', doc => {
105+
expect(doc).to.exist;
106+
});
107+
stream.resume();
108+
await once(stream, 'end');
109+
110+
await cursor.close();
111+
});
112+
113+
it('immediately destroying a stream prevents the query from executing', {
114+
// Add a tag that our runner can trigger on
115+
// in this case we are setting that node needs to be higher than 0.10.X to run
116+
metadata: {
117+
requires: { topology: ['single', 'replicaset', 'sharded'] }
118+
},
119+
120+
test: async function () {
121+
const docs = [{ b: 2 }, { b: 3 }];
122+
let i = 0,
123+
doneCalled = 0;
124+
125+
const configuration = this.configuration;
126+
await client.connect();
127+
128+
const db = client.db(configuration.db);
129+
const collection = await db.createCollection(
130+
'immediately_destroying_a_stream_prevents_the_query_from_executing'
131+
);
132+
133+
// insert all docs
134+
await collection.insertMany(docs, configuration.writeConcernMax());
135+
136+
const cursor = collection.find();
137+
const stream = cursor.stream();
138+
139+
stream.on('data', function () {
140+
i++;
141+
});
142+
143+
function testDone() {
144+
return err => {
145+
++doneCalled;
146+
147+
if (doneCalled === 1) {
148+
expect(err).to.not.exist;
149+
strictEqual(0, i);
150+
strictEqual(true, cursor.closed);
151+
}
152+
};
153+
}
154+
155+
cursor.once('close', testDone('close'));
156+
stream.once('error', testDone('error'));
157+
const promise = once(cursor, 'close');
158+
159+
stream.destroy();
160+
161+
await cursor.close();
162+
await promise;
163+
}
164+
});
165+
166+
it('destroying a stream stops it', async function () {
167+
const db = client.db();
168+
await db.dropCollection('destroying_a_stream_stops_it');
169+
const collection = await db.createCollection('destroying_a_stream_stops_it');
170+
171+
const docs = Array.from({ length: 10 }, (_, i) => ({ b: i + 1 }));
172+
173+
await collection.insertMany(docs);
174+
175+
const cursor = collection.find();
176+
const stream = cursor.stream();
177+
178+
expect(cursor).property('closed', false);
179+
180+
const willClose = once(cursor, 'close');
181+
182+
const dataEvents = on(stream, 'data');
183+
184+
for (let i = 0; i < 5; i++) {
185+
const {
186+
value: [doc]
187+
} = await dataEvents.next();
188+
expect(doc).property('b', i + 1);
189+
}
190+
191+
// After 5 successful data events, destroy stream
192+
stream.destroy();
193+
194+
// We should get a close event on the stream and a close event on the cursor
195+
// We should **not** get an 'error' or an 'end' event,
196+
// the following will throw if either stream or cursor emitted an 'error' event
197+
await Promise.race([
198+
willClose,
199+
sleep(100).then(() => Promise.reject(new Error('close event never emitted')))
200+
]);
201+
});
202+
203+
it('Should not emit any events after close event emitted due to cursor killed', {
204+
// Add a tag that our runner can trigger on
205+
// in this case we are setting that node needs to be higher than 0.10.X to run
206+
metadata: { requires: { topology: ['single', 'replicaset'] } },
207+
208+
test: async function () {
209+
const configuration = this.configuration;
210+
await client.connect();
211+
212+
const db = client.db(configuration.db);
213+
const collection = db.collection('cursor_limit_skip_correctly');
214+
215+
// Insert x number of docs
216+
const ordered = collection.initializeUnorderedBulkOp();
217+
218+
for (let i = 0; i < 100; i++) {
219+
ordered.insert({ a: i });
220+
}
221+
222+
await ordered.execute({ writeConcern: { w: 1 } });
223+
224+
// Let's attempt to skip and limit
225+
const cursor = collection.find({}).batchSize(10);
226+
const stream = cursor.stream();
227+
stream.on('data', function () {
228+
stream.destroy();
229+
});
230+
231+
const onClose = once(cursor, 'close');
232+
await cursor.close();
233+
await onClose;
234+
}
235+
});
81236
});

0 commit comments

Comments (0)