Skip to content

Commit be73c20

Browse files
Merge branch 'main' into NODE-7306
2 parents 67e9d63 + b1b6e81 commit be73c20

11 files changed

Lines changed: 36 additions & 26 deletions

File tree

src/cmap/connection_pool.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
233233
this.mongoLogger = this.server.topology.client?.mongoLogger;
234234
this.component = 'connection';
235235

236-
process.nextTick(() => {
236+
queueMicrotask(() => {
237237
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
238238
});
239239
}
@@ -344,7 +344,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
344344
});
345345

346346
this.waitQueue.push(waitQueueMember);
347-
process.nextTick(() => this.processWaitQueue());
347+
queueMicrotask(() => this.processWaitQueue());
348348

349349
try {
350350
timeout?.throwIfExpired();
@@ -407,7 +407,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
407407
this.destroyConnection(connection, reason);
408408
}
409409

410-
process.nextTick(() => this.processWaitQueue());
410+
queueMicrotask(() => this.processWaitQueue());
411411
}
412412

413413
/**
@@ -463,7 +463,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
463463
}
464464

465465
if (interruptInUseConnections) {
466-
process.nextTick(() => this.interruptInUseConnections(oldGeneration));
466+
queueMicrotask(() => this.interruptInUseConnections(oldGeneration));
467467
}
468468

469469
this.processWaitQueue();
@@ -704,7 +704,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
704704
this.createConnection((err, connection) => {
705705
if (!err && connection) {
706706
this.connections.push(connection);
707-
process.nextTick(() => this.processWaitQueue());
707+
queueMicrotask(() => this.processWaitQueue());
708708
}
709709
if (this.poolState === PoolState.ready) {
710710
clearTimeout(this.minPoolSizeTimer);
@@ -811,7 +811,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
811811
waitQueueMember.resolve(connection);
812812
}
813813
}
814-
process.nextTick(() => this.processWaitQueue());
814+
queueMicrotask(() => this.processWaitQueue());
815815
});
816816
}
817817
this.processingWaitQueue = false;

src/gridfs/upload.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ export class GridFSBucketWriteStream extends Writable {
167167
}
168168
);
169169
} else {
170-
return process.nextTick(callback);
170+
return queueMicrotask(callback);
171171
}
172172
}
173173

@@ -190,7 +190,7 @@ export class GridFSBucketWriteStream extends Writable {
190190
/** @internal */
191191
override _final(callback: (error?: Error | null) => void): void {
192192
if (this.state.streamEnd) {
193-
return process.nextTick(callback);
193+
return queueMicrotask(callback);
194194
}
195195
this.state.streamEnd = true;
196196
writeRemnant(this, callback);
@@ -222,11 +222,11 @@ export class GridFSBucketWriteStream extends Writable {
222222

223223
function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void {
224224
if (stream.state.errored) {
225-
process.nextTick(callback);
225+
queueMicrotask(callback);
226226
return;
227227
}
228228
stream.state.errored = true;
229-
process.nextTick(callback, error);
229+
queueMicrotask(() => callback(error));
230230
}
231231

232232
function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk {
@@ -285,7 +285,7 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>
285285

286286
function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
287287
if (stream.done) {
288-
return process.nextTick(callback);
288+
return queueMicrotask(callback);
289289
}
290290

291291
if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
@@ -329,7 +329,7 @@ function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
329329
return;
330330
}
331331

332-
process.nextTick(callback);
332+
queueMicrotask(callback);
333333
}
334334

335335
async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
@@ -427,7 +427,7 @@ function doWrite(
427427
if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
428428
inputBuf.copy(stream.bufToStore, stream.pos);
429429
stream.pos += inputBuf.length;
430-
process.nextTick(callback);
430+
queueMicrotask(callback);
431431
return;
432432
}
433433

@@ -532,7 +532,7 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void
532532

533533
function isAborted(stream: GridFSBucketWriteStream, callback: Callback<void>): boolean {
534534
if (stream.state.aborted) {
535-
process.nextTick(callback, new MongoAPIError('Stream has been aborted'));
535+
queueMicrotask(() => callback(new MongoAPIError('Stream has been aborted')));
536536
return true;
537537
}
538538
return false;

src/sdam/monitor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
428428
function monitorServer(monitor: Monitor) {
429429
return (callback: Callback) => {
430430
if (monitor.s.state === STATE_MONITORING) {
431-
process.nextTick(callback);
431+
queueMicrotask(callback);
432432
return;
433433
}
434434
stateTransition(monitor, STATE_MONITORING);

src/sdam/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
416416
error.addErrorLabel(MongoErrorLabel.ResetPool);
417417
}
418418
markServerUnknown(this, error);
419-
process.nextTick(() => this.requestCheck());
419+
queueMicrotask(() => this.requestCheck());
420420
return;
421421
}
422422

src/sdam/topology.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,7 @@ function processWaitQueue(topology: Topology) {
10781078
if (topology.waitQueue.length > 0) {
10791079
// ensure all server monitors attempt monitoring soon
10801080
for (const [, server] of topology.s.servers) {
1081-
process.nextTick(function scheduleServerCheck() {
1081+
queueMicrotask(function scheduleServerCheck() {
10821082
return server.requestCheck();
10831083
});
10841084
}

test/integration/sessions/sessions.prose.test.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,18 @@ describe('Sessions Prose Tests', () => {
107107
expect(allResults).to.have.lengthOf(operations.length);
108108
expect(events).to.have.lengthOf(operations.length);
109109

110-
// This is a guarantee in node, unless you are performing a transaction (which is not being done in this test)
111-
expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex')))).to.have.lengthOf(1);
110+
// Previous version of this test was too strict: we were expecting that only one session be used for this scenario.
111+
// That was possible at the time because the operations were simple enough and the server fast enough that the operations would complete serially.
112+
//
113+
// However, with a more complex operation bulkWrite (like `Array.from({ length: 100_000 }).map(() => ({ insertOne: { document: { a: 1 } } })),`),
114+
// it's entirely possible and expected that bulkWrite would introduce a second session due to the time it takes to process all the inserts.
115+
//
116+
// The important bit of the test is that the number of sessions is less than the number of concurrent operations, so now instead of expecting exactly 1 session,
117+
// we just expect less than operations.length - 1 sessions.
118+
//
119+
const uniqueSessionIds = new Set(events.map(ev => ev.command.lsid.id.toString('hex')));
120+
const expectedMaxSessions = operations.length - 1;
121+
expect(uniqueSessionIds).to.have.length.lessThan(expectedMaxSessions);
112122
});
113123
});
114124

test/tools/runner/ee_checker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ events.EventEmitter = class RequireErrorListenerEventEmitter extends EventEmitte
1111
super(...args);
1212
const ctorCallSite = new Error('EventEmitter must add an error listener synchronously');
1313
ctorCallSite.stack;
14-
process.nextTick(() => {
14+
queueMicrotask(() => {
1515
const isChangeStream = this.constructor.name
1616
.toLowerCase()
1717
.includes('ChangeStream'.toLowerCase());

test/tools/utils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ export const sleep = promisify(setTimeout);
165165

166166
/**
167167
* If you are using sinon fake timers, it can end up blocking queued IO from running
168-
* awaiting a nextTick call will allow the event loop to process Networking/FS callbacks
168+
* awaiting a setTimeout call will allow the event loop to process Networking/FS callbacks
169169
*/
170-
export const processTick = () => new Promise(resolve => process.nextTick(resolve));
170+
export const processTick = () => new Promise(resolve => setTimeout(resolve, 0));
171171

172172
export function getIndicesOfAuthInUrl(connectionString: string | string[]) {
173173
const doubleSlashIndex = connectionString.indexOf('//');

test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {
132132
expect(topology.description).to.have.property('type', TopologyType.Sharded);
133133
const servers = Array.from(topology.description.servers.keys());
134134
expect(servers).to.deep.equal(srvAddresses(recordSets[0]));
135-
process.nextTick(() => srvPoller.trigger(recordSets[1]));
135+
queueMicrotask(() => srvPoller.trigger(recordSets[1]));
136136

137137
await once(topology, 'topologyDescriptionChanged');
138138

@@ -298,7 +298,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {
298298
const callback = args[args.length - 1] as (err: null, address: string, family: 4) => void;
299299

300300
if (hostname.includes('test.mock.test.build.10gen.cc')) {
301-
return process.nextTick(callback, null, '127.0.0.1', 4);
301+
return queueMicrotask(() => callback(null, '127.0.0.1', 4));
302302
}
303303

304304
const { wrappedMethod: lookup } = lookupStub;

0 commit comments

Comments
 (0)