Skip to content

Commit 0297ff1

Browse files
Merge branch 'main' into DRIVERS-3344
2 parents 8fa05c6 + 4cb2b87 commit 0297ff1

31 files changed

Lines changed: 907 additions & 448 deletions

src/cmap/connect.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ import {
3535
/** @public */
3636
export type Stream = Socket | TLSSocket;
3737

38+
function applyBackpressureLabels(error: MongoError) {
39+
error.addErrorLabel(MongoErrorLabel.SystemOverloadedError);
40+
error.addErrorLabel(MongoErrorLabel.RetryableError);
41+
}
42+
3843
export async function connect(options: ConnectionOptions): Promise<Connection> {
3944
let connection: Connection | null = null;
4045
try {
@@ -103,6 +108,8 @@ export async function performInitialHandshake(
103108
const authContext = new AuthContext(conn, credentials, options);
104109
conn.authContext = authContext;
105110

111+
// If we encounter an error preparing the handshake document, do NOT apply backpressure labels. Errors
112+
// encountered building the handshake document are all client-side, and do not indicate an overloaded server.
106113
const handshakeDoc = await prepareHandshakeDocument(authContext);
107114

108115
// @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options
@@ -163,12 +170,15 @@ export async function performInitialHandshake(
163170
try {
164171
await provider.auth(authContext);
165172
} catch (error) {
173+
// NOTE: If we encounter an error authenticating a connection, do NOT apply backpressure labels.
174+
166175
if (error instanceof MongoError) {
167176
error.addErrorLabel(MongoErrorLabel.HandshakeError);
168177
if (needsRetryableWriteLabel(error, response.maxWireVersion, conn.description.type)) {
169178
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
170179
}
171180
}
181+
172182
throw error;
173183
}
174184
}
@@ -189,6 +199,11 @@ export async function performInitialHandshake(
189199
if (error instanceof MongoError) {
190200
error.addErrorLabel(MongoErrorLabel.HandshakeError);
191201
}
202+
// If we encounter a network error executing the initial handshake, apply backpressure labels.
203+
if (error instanceof MongoNetworkError) {
204+
applyBackpressureLabels(error);
205+
}
206+
192207
throw error;
193208
}
194209
}
@@ -424,6 +439,10 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
424439
socket = await connectedSocket;
425440
return socket;
426441
} catch (error) {
442+
// If we encounter an error while establishing a socket, apply the backpressure labels to it. We cannot
443+
// differentiate between DNS errors, TLS errors, and network errors without refactoring our connection establishment to
444+
// handle all three steps separately.
445+
applyBackpressureLabels(error);
427446
socket.destroy();
428447
throw error;
429448
} finally {

src/cmap/connection_pool.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
231231
this.mongoLogger = this.server.topology.client?.mongoLogger;
232232
this.component = 'connection';
233233

234-
process.nextTick(() => {
234+
queueMicrotask(() => {
235235
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
236236
});
237237
}
@@ -342,7 +342,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
342342
});
343343

344344
this.waitQueue.push(waitQueueMember);
345-
process.nextTick(() => this.processWaitQueue());
345+
queueMicrotask(() => this.processWaitQueue());
346346

347347
try {
348348
timeout?.throwIfExpired();
@@ -405,7 +405,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
405405
this.destroyConnection(connection, reason);
406406
}
407407

408-
process.nextTick(() => this.processWaitQueue());
408+
queueMicrotask(() => this.processWaitQueue());
409409
}
410410

411411
/**
@@ -461,7 +461,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
461461
}
462462

463463
if (interruptInUseConnections) {
464-
process.nextTick(() => this.interruptInUseConnections(oldGeneration));
464+
queueMicrotask(() => this.interruptInUseConnections(oldGeneration));
465465
}
466466

467467
this.processWaitQueue();
@@ -702,7 +702,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
702702
this.createConnection((err, connection) => {
703703
if (!err && connection) {
704704
this.connections.push(connection);
705-
process.nextTick(() => this.processWaitQueue());
705+
queueMicrotask(() => this.processWaitQueue());
706706
}
707707
if (this.poolState === PoolState.ready) {
708708
clearTimeout(this.minPoolSizeTimer);
@@ -809,7 +809,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
809809
waitQueueMember.resolve(connection);
810810
}
811811
}
812-
process.nextTick(() => this.processWaitQueue());
812+
queueMicrotask(() => this.processWaitQueue());
813813
});
814814
}
815815
this.processingWaitQueue = false;

src/error.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ export const MongoErrorLabel = Object.freeze({
9999
ResetPool: 'ResetPool',
100100
PoolRequestedRetry: 'PoolRequestedRetry',
101101
InterruptInUseConnections: 'InterruptInUseConnections',
102-
NoWritesPerformed: 'NoWritesPerformed'
102+
NoWritesPerformed: 'NoWritesPerformed',
103+
RetryableError: 'RetryableError',
104+
SystemOverloadedError: 'SystemOverloadedError'
103105
} as const);
104106

105107
/** @public */

src/gridfs/upload.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ export class GridFSBucketWriteStream extends Writable {
165165
}
166166
);
167167
} else {
168-
return process.nextTick(callback);
168+
return queueMicrotask(callback);
169169
}
170170
}
171171

@@ -188,7 +188,7 @@ export class GridFSBucketWriteStream extends Writable {
188188
/** @internal */
189189
override _final(callback: (error?: Error | null) => void): void {
190190
if (this.state.streamEnd) {
191-
return process.nextTick(callback);
191+
return queueMicrotask(callback);
192192
}
193193
this.state.streamEnd = true;
194194
writeRemnant(this, callback);
@@ -220,11 +220,11 @@ export class GridFSBucketWriteStream extends Writable {
220220

221221
function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void {
222222
if (stream.state.errored) {
223-
process.nextTick(callback);
223+
queueMicrotask(callback);
224224
return;
225225
}
226226
stream.state.errored = true;
227-
process.nextTick(callback, error);
227+
queueMicrotask(() => callback(error));
228228
}
229229

230230
function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk {
@@ -283,7 +283,7 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>
283283

284284
function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
285285
if (stream.done) {
286-
return process.nextTick(callback);
286+
return queueMicrotask(callback);
287287
}
288288

289289
if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
@@ -327,7 +327,7 @@ function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
327327
return;
328328
}
329329

330-
process.nextTick(callback);
330+
queueMicrotask(callback);
331331
}
332332

333333
async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
@@ -425,7 +425,7 @@ function doWrite(
425425
if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
426426
inputBuf.copy(stream.bufToStore, stream.pos);
427427
stream.pos += inputBuf.length;
428-
process.nextTick(callback);
428+
queueMicrotask(callback);
429429
return;
430430
}
431431

@@ -530,7 +530,7 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void
530530

531531
function isAborted(stream: GridFSBucketWriteStream, callback: Callback<void>): boolean {
532532
if (stream.state.aborted) {
533-
process.nextTick(callback, new MongoAPIError('Stream has been aborted'));
533+
queueMicrotask(() => callback(new MongoAPIError('Stream has been aborted')));
534534
return true;
535535
}
536536
return false;

src/sdam/monitor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
426426
function monitorServer(monitor: Monitor) {
427427
return (callback: Callback) => {
428428
if (monitor.s.state === STATE_MONITORING) {
429-
process.nextTick(callback);
429+
queueMicrotask(callback);
430430
return;
431431
}
432432
stateTransition(monitor, STATE_MONITORING);

src/sdam/server.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
400400
error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError);
401401
const isNetworkTimeoutBeforeHandshakeError =
402402
error instanceof MongoNetworkError && error.beforeHandshake;
403-
const isAuthHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
403+
const isAuthOrEstablishmentHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
404+
const isSystemOverloadError = error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError);
404405

405406
// Perhaps questionable and divergent from the spec, but considering MongoParseErrors like state change errors was legacy behavior.
406407
if (isStateChangeError(error) || error instanceof MongoParseError) {
@@ -414,7 +415,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
414415
error.addErrorLabel(MongoErrorLabel.ResetPool);
415416
}
416417
markServerUnknown(this, error);
417-
process.nextTick(() => this.requestCheck());
418+
queueMicrotask(() => this.requestCheck());
418419
return;
419420
}
420421

@@ -424,8 +425,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
424425
} else if (
425426
isNetworkNonTimeoutError ||
426427
isNetworkTimeoutBeforeHandshakeError ||
427-
isAuthHandshakeError
428+
isAuthOrEstablishmentHandshakeError
428429
) {
430+
// Do NOT clear the pool if we encounter a system overloaded error.
431+
if (isSystemOverloadError) {
432+
return;
433+
}
429434
// from the SDAM spec: The driver MUST synchronize clearing the pool with updating the topology.
430435
// In load balanced mode: there is no monitoring, so there is no topology to update. We simply clear the pool.
431436
// For other topologies: the `ResetPool` label instructs the topology to clear the server's pool in `updateServer()`.

src/sdam/topology.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1079,7 +1079,7 @@ function processWaitQueue(topology: Topology) {
10791079
if (topology.waitQueue.length > 0) {
10801080
// ensure all server monitors attempt monitoring soon
10811081
for (const [, server] of topology.s.servers) {
1082-
process.nextTick(function scheduleServerCheck() {
1082+
queueMicrotask(function scheduleServerCheck() {
10831083
return server.requestCheck();
10841084
});
10851085
}

test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
import { expect } from 'chai';
22
import { once } from 'events';
33

4-
import { type MongoClient } from '../../../src';
4+
import {
5+
type ConnectionCheckOutFailedEvent,
6+
type ConnectionPoolClearedEvent,
7+
type MongoClient
8+
} from '../../../src';
59
import {
610
CONNECTION_POOL_CLEARED,
711
CONNECTION_POOL_READY,
812
SERVER_HEARTBEAT_FAILED,
913
SERVER_HEARTBEAT_SUCCEEDED
1014
} from '../../../src/constants';
15+
import { sleep } from '../../tools/utils';
1116

1217
describe('Server Discovery and Monitoring Prose Tests', function () {
1318
context('Monitors sleep at least minHeartbeatFrequencyMS between checks', function () {
@@ -187,4 +192,93 @@ describe('Server Discovery and Monitoring Prose Tests', function () {
187192
}
188193
});
189194
});
195+
196+
context('Connection Pool Backpressure', function () {
197+
let client: MongoClient;
198+
const checkoutFailedEvents: Array<ConnectionCheckOutFailedEvent> = [];
199+
const poolClearedEvents: Array<ConnectionPoolClearedEvent> = [];
200+
201+
beforeEach(async function () {
202+
// 1. Create a test client that listens to CMAP events, with maxConnecting=100.
203+
client = this.configuration.newClient({}, { maxConnecting: 100 });
204+
205+
client.on('connectionCheckOutFailed', e => checkoutFailedEvents.push(e));
206+
client.on('connectionPoolCleared', e => poolClearedEvents.push(e));
207+
208+
await client.connect();
209+
210+
// 2. Run the following commands to set up the rate limiter.
211+
// ```python
212+
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=True)
213+
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentRatePerSec=20)
214+
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentBurstCapacitySecs=1)
215+
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentMaxQueueDepth=1)
216+
// ```
217+
const admin = client.db('admin').admin();
218+
await admin.command({
219+
setParameter: 1,
220+
ingressConnectionEstablishmentRateLimiterEnabled: true
221+
});
222+
await admin.command({
223+
setParameter: 1,
224+
ingressConnectionEstablishmentRatePerSec: 20
225+
});
226+
await admin.command({
227+
setParameter: 1,
228+
ingressConnectionEstablishmentBurstCapacitySecs: 1
229+
});
230+
await admin.command({
231+
setParameter: 1,
232+
ingressConnectionEstablishmentMaxQueueDepth: 1
233+
});
234+
235+
// 3. Add a document to the test collection so that the sleep operations will actually block:
236+
// `client.test.test.insert_one({})`.
237+
await client.db('test').collection('test').insertOne({});
238+
});
239+
240+
afterEach(async function () {
241+
// 7. Sleep for 1 second to clear the rate limiter.
242+
await sleep(1000);
243+
244+
// 8. Ensure that the following command runs at test teardown even if the test fails.
245+
// `client.admin.command("setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=False)`.
246+
const admin = client.db('admin').admin();
247+
await admin.command({
248+
setParameter: 1,
249+
ingressConnectionEstablishmentRateLimiterEnabled: false
250+
});
251+
252+
await client.close();
253+
});
254+
255+
it(
256+
'does not clear the pool when connections are closed due to connection storms',
257+
{
258+
requires: {
259+
// This test requires MongoDB 7.0+.
260+
mongodb: '>=7.0' // rate limiting added in 7.0
261+
}
262+
},
263+
async function () {
264+
// 4. Run the following find command on the collection in 100 parallel threads/coroutines. Run these commands concurrently
265+
// but block on their completion, and ignore errors raised by the command.
266+
// `client.test.test.find_one({"$where": "function() { sleep(2000); return true; }})`
267+
await Promise.allSettled(
268+
Array.from({ length: 100 }).map(() =>
269+
client
270+
.db('test')
271+
.collection('test')
272+
.findOne({ $where: 'function() { sleep(2000); return true; }' })
273+
)
274+
);
275+
276+
// 5. Assert that at least 10 `ConnectionCheckOutFailedEvent` occurred.
277+
expect(checkoutFailedEvents.length).to.be.at.least(10);
278+
279+
// 6. Assert that 0 `PoolClearedEvent` occurred.
280+
expect(poolClearedEvents).to.be.empty;
281+
}
282+
);
283+
});
190284
});

test/integration/sessions/sessions.prose.test.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,18 @@ describe('Sessions Prose Tests', () => {
107107
expect(allResults).to.have.lengthOf(operations.length);
108108
expect(events).to.have.lengthOf(operations.length);
109109

110-
// This is a guarantee in node, unless you are performing a transaction (which is not being done in this test)
111-
expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex')))).to.have.lengthOf(1);
110+
// Previous version of this test was too strict: we were expecting that only one session be used for this scenario.
111+
// That was possible at the time because the operations were simple enough and the server fast enough that the operations would complete serially.
112+
//
113+
// However, with a more complex operation bulkWrite (like `Array.from({ length: 100_000 }).map(() => ({ insertOne: { document: { a: 1 } } })),`),
114+
// it's entirely possible and expected that bulkWrite would introduce a second session due to the time it takes to process all the inserts.
115+
//
116+
// The important bit of the test is that the number of sessions is less than the number of concurrent operations, so now instead of expecting exactly 1 session,
117+
// we just expect fewer than operations.length - 1 sessions.
118+
//
119+
const uniqueSessionIds = new Set(events.map(ev => ev.command.lsid.id.toString('hex')));
120+
const expectedMaxSessions = operations.length - 1;
121+
expect(uniqueSessionIds).to.have.length.lessThan(expectedMaxSessions);
112122
});
113123
});
114124

0 commit comments

Comments
 (0)