Skip to content

Commit 65352ac

Browse files
authored
Merge branch 'main' into NODE-7298
2 parents a65c940 + e70fdc9 commit 65352ac

106 files changed

Lines changed: 3725 additions & 738 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/change_stream.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { MongoClient } from './mongo_client';
1818
import { type InferIdType, TypedEventEmitter } from './mongo_types';
1919
import type { AggregateOptions } from './operations/aggregate';
2020
import type { OperationParent } from './operations/command';
21+
import { DeprioritizedServers } from './sdam/server_selection';
2122
import type { ServerSessionId } from './sessions';
2223
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
2324
import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
@@ -1073,7 +1074,8 @@ export class ChangeStream<
10731074
try {
10741075
await topology.selectServer(this.cursor.readPreference, {
10751076
operationName: 'reconnect topology in change stream',
1076-
timeoutContext: this.timeoutContext
1077+
timeoutContext: this.timeoutContext,
1078+
deprioritizedServers: new DeprioritizedServers()
10771079
});
10781080
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
10791081
} catch {

src/cmap/auth/gssapi.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import * as dns from 'dns';
2+
import * as os from 'os';
23

34
import { getKerberos, type Kerberos, type KerberosClient } from '../../deps';
45
import { MongoInvalidArgumentError, MongoMissingCredentialsError } from '../../error';
@@ -97,7 +98,7 @@ async function makeKerberosClient(authContext: AuthContext): Promise<KerberosCli
9798
}
9899

99100
const spnHost = mechanismProperties.SERVICE_HOST ?? host;
100-
let spn = `${serviceName}${process.platform === 'win32' ? '/' : '@'}${spnHost}`;
101+
let spn = `${serviceName}${os.platform() === 'win32' ? '/' : '@'}${spnHost}`;
101102
if ('SERVICE_REALM' in mechanismProperties) {
102103
spn = `${spn}@${mechanismProperties.SERVICE_REALM}`;
103104
}

src/cmap/connect.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ import {
3535
/** @public */
3636
export type Stream = Socket | TLSSocket;
3737

38+
function applyBackpressureLabels(error: MongoError) {
39+
error.addErrorLabel(MongoErrorLabel.SystemOverloadedError);
40+
error.addErrorLabel(MongoErrorLabel.RetryableError);
41+
}
42+
3843
export async function connect(options: ConnectionOptions): Promise<Connection> {
3944
let connection: Connection | null = null;
4045
try {
@@ -103,6 +108,8 @@ export async function performInitialHandshake(
103108
const authContext = new AuthContext(conn, credentials, options);
104109
conn.authContext = authContext;
105110

111+
// If we encounter an error preparing the handshake document, do NOT apply backpressure labels. Errors
112+
// encountered building the handshake document are all client-side, and do not indicate an overloaded server.
106113
const handshakeDoc = await prepareHandshakeDocument(authContext);
107114

108115
// @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options
@@ -163,12 +170,15 @@ export async function performInitialHandshake(
163170
try {
164171
await provider.auth(authContext);
165172
} catch (error) {
173+
// NOTE: If we encounter an error authenticating a connection, do NOT apply backpressure labels.
174+
166175
if (error instanceof MongoError) {
167176
error.addErrorLabel(MongoErrorLabel.HandshakeError);
168177
if (needsRetryableWriteLabel(error, response.maxWireVersion, conn.description.type)) {
169178
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
170179
}
171180
}
181+
172182
throw error;
173183
}
174184
}
@@ -189,6 +199,11 @@ export async function performInitialHandshake(
189199
if (error instanceof MongoError) {
190200
error.addErrorLabel(MongoErrorLabel.HandshakeError);
191201
}
202+
// If we encounter a network error executing the initial handshake, apply backpressure labels.
203+
if (error instanceof MongoNetworkError) {
204+
applyBackpressureLabels(error);
205+
}
206+
192207
throw error;
193208
}
194209
}
@@ -424,6 +439,10 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
424439
socket = await connectedSocket;
425440
return socket;
426441
} catch (error) {
442+
// If we encounter an error while establishing a socket, apply the backpressure labels to it. We cannot
443+
// differentiate between DNS, TLS errors and network errors without refactoring our connection establishment to
444+
// handle all three steps separately.
445+
applyBackpressureLabels(error);
427446
socket.destroy();
428447
throw error;
429448
} finally {

src/cmap/connection_pool.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
231231
this.mongoLogger = this.server.topology.client?.mongoLogger;
232232
this.component = 'connection';
233233

234-
process.nextTick(() => {
234+
queueMicrotask(() => {
235235
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
236236
});
237237
}
@@ -342,7 +342,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
342342
});
343343

344344
this.waitQueue.push(waitQueueMember);
345-
process.nextTick(() => this.processWaitQueue());
345+
queueMicrotask(() => this.processWaitQueue());
346346

347347
try {
348348
timeout?.throwIfExpired();
@@ -405,7 +405,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
405405
this.destroyConnection(connection, reason);
406406
}
407407

408-
process.nextTick(() => this.processWaitQueue());
408+
queueMicrotask(() => this.processWaitQueue());
409409
}
410410

411411
/**
@@ -461,7 +461,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
461461
}
462462

463463
if (interruptInUseConnections) {
464-
process.nextTick(() => this.interruptInUseConnections(oldGeneration));
464+
queueMicrotask(() => this.interruptInUseConnections(oldGeneration));
465465
}
466466

467467
this.processWaitQueue();
@@ -702,7 +702,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
702702
this.createConnection((err, connection) => {
703703
if (!err && connection) {
704704
this.connections.push(connection);
705-
process.nextTick(() => this.processWaitQueue());
705+
queueMicrotask(() => this.processWaitQueue());
706706
}
707707
if (this.poolState === PoolState.ready) {
708708
clearTimeout(this.minPoolSizeTimer);
@@ -809,7 +809,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
809809
waitQueueMember.resolve(connection);
810810
}
811811
}
812-
process.nextTick(() => this.processWaitQueue());
812+
queueMicrotask(() => this.processWaitQueue());
813813
});
814814
}
815815
this.processingWaitQueue = false;

src/cmap/handshake/client_metadata.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ export async function makeClientMetadata(
157157

158158
// Note: order matters, os.type is last so it will be removed last if we're at maxSize
159159
const osInfo = new Map()
160-
.set('name', process.platform)
161-
.set('architecture', process.arch)
160+
.set('name', os.platform())
161+
.set('architecture', os.arch())
162162
.set('version', os.release())
163163
.set('type', os.type());
164164

src/error.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ export const MongoErrorLabel = Object.freeze({
9999
ResetPool: 'ResetPool',
100100
PoolRequestedRetry: 'PoolRequestedRetry',
101101
InterruptInUseConnections: 'InterruptInUseConnections',
102-
NoWritesPerformed: 'NoWritesPerformed'
102+
NoWritesPerformed: 'NoWritesPerformed',
103+
RetryableError: 'RetryableError',
104+
SystemOverloadedError: 'SystemOverloadedError'
103105
} as const);
104106

105107
/** @public */

src/gridfs/upload.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ export class GridFSBucketWriteStream extends Writable {
165165
}
166166
);
167167
} else {
168-
return process.nextTick(callback);
168+
return queueMicrotask(callback);
169169
}
170170
}
171171

@@ -188,7 +188,7 @@ export class GridFSBucketWriteStream extends Writable {
188188
/** @internal */
189189
override _final(callback: (error?: Error | null) => void): void {
190190
if (this.state.streamEnd) {
191-
return process.nextTick(callback);
191+
return queueMicrotask(callback);
192192
}
193193
this.state.streamEnd = true;
194194
writeRemnant(this, callback);
@@ -220,11 +220,11 @@ export class GridFSBucketWriteStream extends Writable {
220220

221221
function handleError(stream: GridFSBucketWriteStream, error: Error, callback: Callback): void {
222222
if (stream.state.errored) {
223-
process.nextTick(callback);
223+
queueMicrotask(callback);
224224
return;
225225
}
226226
stream.state.errored = true;
227-
process.nextTick(callback, error);
227+
queueMicrotask(() => callback(error));
228228
}
229229

230230
function createChunkDoc(filesId: ObjectId, n: number, data: Buffer): GridFSChunk {
@@ -283,7 +283,7 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>
283283

284284
function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
285285
if (stream.done) {
286-
return process.nextTick(callback);
286+
return queueMicrotask(callback);
287287
}
288288

289289
if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
@@ -327,7 +327,7 @@ function checkDone(stream: GridFSBucketWriteStream, callback: Callback): void {
327327
return;
328328
}
329329

330-
process.nextTick(callback);
330+
queueMicrotask(callback);
331331
}
332332

333333
async function checkIndexes(stream: GridFSBucketWriteStream): Promise<void> {
@@ -425,7 +425,7 @@ function doWrite(
425425
if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
426426
inputBuf.copy(stream.bufToStore, stream.pos);
427427
stream.pos += inputBuf.length;
428-
process.nextTick(callback);
428+
queueMicrotask(callback);
429429
return;
430430
}
431431

@@ -530,7 +530,7 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void
530530

531531
function isAborted(stream: GridFSBucketWriteStream, callback: Callback<void>): boolean {
532532
if (stream.state.aborted) {
533-
process.nextTick(callback, new MongoAPIError('Stream has been aborted'));
533+
queueMicrotask(() => callback(new MongoAPIError('Stream has been aborted')));
534534
return true;
535535
}
536536
return false;

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ export type {
588588
TagSet,
589589
TopologyVersion
590590
} from './sdam/server_description';
591-
export type { ServerSelector } from './sdam/server_selection';
591+
export type { DeprioritizedServers, ServerSelector } from './sdam/server_selection';
592592
export type { SrvPoller, SrvPollerEvents, SrvPollerOptions } from './sdam/srv_polling';
593593
export type {
594594
ConnectOptions,

src/mongo_client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_conc
4848
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
4949
import type { ServerMonitoringMode } from './sdam/monitor';
5050
import type { TagSet } from './sdam/server_description';
51-
import { readPreferenceServerSelector } from './sdam/server_selection';
51+
import { DeprioritizedServers, readPreferenceServerSelector } from './sdam/server_selection';
5252
import type { SrvPoller } from './sdam/srv_polling';
5353
import { Topology, type TopologyEvents } from './sdam/topology';
5454
import { ClientSession, type ClientSessionOptions, ServerSessionPool } from './sessions';
@@ -789,7 +789,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
789789
// to avoid the server selection timeout.
790790
const selector = readPreferenceServerSelector(ReadPreference.primaryPreferred);
791791
const serverDescriptions = Array.from(topologyDescription.servers.values());
792-
const servers = selector(topologyDescription, serverDescriptions);
792+
const servers = selector(topologyDescription, serverDescriptions, new DeprioritizedServers());
793793
if (servers.length !== 0) {
794794
const endSessions = Array.from(client.s.sessionPool.sessions, ({ id }) => id);
795795
if (endSessions.length !== 0) {

0 commit comments

Comments
 (0)