Skip to content

Commit 24daacd

Browse files
committed
using bson for buffer calls
1 parent 0f46db8 commit 24daacd

8 files changed

Lines changed: 133 additions & 72 deletions

File tree

src/bson.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ export {
3232
/** @internal */
3333
export type BSONElement = BSON.OnDemand['BSONElement'];
3434

35+
/** @internal */
36+
export function toLocalBufferType(this: void, buffer: Buffer | Uint8Array): Buffer {
37+
return Buffer.isBuffer(buffer)
38+
? buffer
39+
: Buffer.from(buffer.buffer, buffer.byteOffset, buffer.byteLength);
40+
}
41+
3542
export function parseToElementsToArray(bytes: Uint8Array, offset?: number): BSONElement[] {
3643
const res = BSON.onDemand.parseToElements(bytes, offset);
3744
return Array.isArray(res) ? res : [...res];
@@ -41,6 +48,34 @@ export const getInt32LE = BSON.onDemand.NumberUtils.getInt32LE;
4148
export const getFloat64LE = BSON.onDemand.NumberUtils.getFloat64LE;
4249
export const getBigInt64LE = BSON.onDemand.NumberUtils.getBigInt64LE;
4350
export const toUTF8 = BSON.onDemand.ByteUtils.toUTF8;
51+
export const writeInt32LE = BSON.onDemand.NumberUtils.setInt32LE;
52+
53+
export const fromUTF8 = (text: string) => toLocalBufferType(BSON.onDemand.ByteUtils.fromUTF8(text));
54+
55+
export const concatBuffers = (list: Buffer[] | Uint8Array[]) => {
56+
return toLocalBufferType(BSON.onDemand.ByteUtils.concat(list));
57+
};
58+
export const allocateBuffer = (size: number) =>
59+
toLocalBufferType(BSON.onDemand.ByteUtils.allocate(size));
60+
export const allocateUnsafeBuffer = (size: number) =>
61+
toLocalBufferType(BSON.onDemand.ByteUtils.allocateUnsafe(size));
62+
63+
export const utf8ByteLength = BSON.onDemand.ByteUtils.utf8ByteLength;
64+
export const toBase64 = BSON.onDemand.ByteUtils.toBase64;
65+
export const encodeUTF8Into = BSON.onDemand.ByteUtils.encodeUTF8Into;
66+
67+
const validateBufferInputs = (buffer: Uint8Array, offset: number, length: number) => {
68+
if (offset < 0 || offset + length > buffer.length) {
69+
throw new RangeError(
70+
`Attempt to access memory outside buffer bounds: buffer length: ${buffer.length}, offset: ${offset}, length: ${length}`
71+
);
72+
}
73+
};
74+
75+
export const readInt32LE = (buffer: Uint8Array, offset: number): number => {
76+
validateBufferInputs(buffer, offset, 4);
77+
return getInt32LE(buffer, offset);
78+
};
4479

4580
/**
4681
* BSON Serialization options.

src/cmap/auth/scram.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
import { saslprep } from '@mongodb-js/saslprep';
22
import * as crypto from 'crypto';
33

4-
import { Binary, type Document } from '../../bson';
4+
import {
5+
allocateBuffer,
6+
Binary,
7+
concatBuffers,
8+
type Document,
9+
fromUTF8,
10+
toBase64
11+
} from '../../bson';
512
import {
613
MongoInvalidArgumentError,
714
MongoMissingCredentialsError,
@@ -68,11 +75,11 @@ function cleanUsername(username: string) {
6875
function clientFirstMessageBare(username: string, nonce: Buffer) {
6976
// NOTE: This is done b/c Javascript uses UTF-16, but the server is hashing in UTF-8.
7077
// Since the username is not sasl-prep-d, we need to do this here.
71-
return Buffer.concat([
72-
Buffer.from('n=', 'utf8'),
73-
Buffer.from(username, 'utf8'),
74-
Buffer.from(',r=', 'utf8'),
75-
Buffer.from(nonce.toString('base64'), 'utf8')
78+
return concatBuffers([
79+
fromUTF8('n='),
80+
fromUTF8(username),
81+
fromUTF8(',r='),
82+
fromUTF8(toBase64(nonce))
7683
]);
7784
}
7885

@@ -91,7 +98,7 @@ function makeFirstMessage(
9198
saslStart: 1,
9299
mechanism,
93100
payload: new Binary(
94-
Buffer.concat([Buffer.from('n,,', 'utf8'), clientFirstMessageBare(username, nonce)])
101+
concatBuffers([Buffer.from('n,,', 'utf8'), clientFirstMessageBare(username, nonce)])
95102
),
96103
autoAuthorize: 1,
97104
options: { skipEmptyExchange: true }
@@ -199,7 +206,7 @@ async function continueScramConversation(
199206
const retrySaslContinueCmd = {
200207
saslContinue: 1,
201208
conversationId: r.conversationId,
202-
payload: Buffer.alloc(0)
209+
payload: allocateBuffer(0)
203210
};
204211

205212
await connection.command(ns(`${db}.$cmd`), retrySaslContinueCmd, undefined);

src/cmap/commands.ts

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
1-
import type { BSONSerializeOptions, Document, Long } from '../bson';
2-
import * as BSON from '../bson';
1+
import {
2+
allocateBuffer,
3+
allocateUnsafeBuffer,
4+
BSON,
5+
type BSONSerializeOptions,
6+
concatBuffers,
7+
type Document,
8+
type Long,
9+
readInt32LE,
10+
utf8ByteLength,
11+
writeInt32LE
12+
} from '../bson';
313
import { MongoInvalidArgumentError, MongoRuntimeError } from '../error';
414
import { type ReadPreference } from '../read_preference';
515
import type { ClientSession } from '../sessions';
@@ -30,7 +40,7 @@ const QUERY_FAILURE = 2;
3040
const SHARD_CONFIG_STALE = 4;
3141
const AWAIT_CAPABLE = 8;
3242

33-
const encodeUTF8Into = BSON.BSON.onDemand.ByteUtils.encodeUTF8Into;
43+
const encodeUTF8Into = BSON.onDemand.ByteUtils.encodeUTF8Into;
3444

3545
/** @internal */
3646
export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest;
@@ -182,10 +192,10 @@ export class OpQueryRequest {
182192
if (this.batchSize !== this.numberToReturn) this.numberToReturn = this.batchSize;
183193

184194
// Allocate write protocol header buffer
185-
const header = Buffer.alloc(
195+
const header = allocateBuffer(
186196
4 * 4 + // Header
187197
4 + // Flags
188-
Buffer.byteLength(this.ns) +
198+
utf8ByteLength(this.ns) +
189199
1 + // namespace
190200
4 + // numberToSkip
191201
4 // numberToReturn
@@ -256,7 +266,7 @@ export class OpQueryRequest {
256266
index = index + 4;
257267

258268
// Write collection name
259-
index = index + header.write(this.ns, index, 'utf8') + 1;
269+
index = index + encodeUTF8Into(header, this.ns, index) + 1;
260270
header[index - 1] = 0;
261271

262272
// Write header information flags numberToSkip
@@ -364,10 +374,10 @@ export class OpReply {
364374
this.index = 20;
365375

366376
// Read the message body
367-
this.responseFlags = this.data.readInt32LE(0);
368-
this.cursorId = new BSON.Long(this.data.readInt32LE(4), this.data.readInt32LE(8));
369-
this.startingFrom = this.data.readInt32LE(12);
370-
this.numberReturned = this.data.readInt32LE(16);
377+
this.responseFlags = readInt32LE(this.data, 0);
378+
this.cursorId = new BSON.Long(readInt32LE(this.data, 4), readInt32LE(this.data, 8));
379+
this.startingFrom = readInt32LE(this.data, 12);
380+
this.numberReturned = readInt32LE(this.data, 16);
371381

372382
if (this.numberReturned < 0 || this.numberReturned > 2 ** 32 - 1) {
373383
throw new RangeError(
@@ -446,7 +456,7 @@ export class DocumentSequence {
446456
this.serializedDocumentsLength = 0;
447457
// Document sequences starts with type 1 at the first byte.
448458
// Field strings must always be UTF-8.
449-
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
459+
const buffer = allocateUnsafeBuffer(1 + 4 + this.field.length + 1);
450460
buffer[0] = 1;
451461
// Third part is the field name at offset 5 with trailing null byte.
452462
encodeUTF8Into(buffer, `${this.field}\0`, 5);
@@ -482,7 +492,7 @@ export class DocumentSequence {
482492
* @returns The section bytes.
483493
*/
484494
toBin(): Uint8Array {
485-
return Buffer.concat(this.chunks);
495+
return concatBuffers(this.chunks);
486496
}
487497
}
488498

@@ -547,7 +557,7 @@ export class OpMsgRequest {
547557
flags |= OPTS_EXHAUST_ALLOWED;
548558
}
549559

550-
const header = Buffer.alloc(
560+
const header = allocateBuffer(
551561
4 * 4 + // Header
552562
4 // Flags
553563
);
@@ -558,11 +568,11 @@ export class OpMsgRequest {
558568
const command = this.command;
559569
totalLength += this.makeSections(buffers, command);
560570

561-
header.writeInt32LE(totalLength, 0); // messageLength
562-
header.writeInt32LE(this.requestId, 4); // requestID
563-
header.writeInt32LE(0, 8); // responseTo
564-
header.writeInt32LE(OP_MSG, 12); // opCode
565-
header.writeUInt32LE(flags, 16); // flags
571+
writeInt32LE(header, 0, totalLength); // messageLength
572+
writeInt32LE(header, 4, this.requestId); // requestID
573+
writeInt32LE(header, 8, 0); // responseTo
574+
writeInt32LE(header, 12, OP_MSG); // opCode
575+
writeInt32LE(header, 16, flags); // flags
566576
return buffers;
567577
}
568578

@@ -571,7 +581,7 @@ export class OpMsgRequest {
571581
*/
572582
makeSections(buffers: Uint8Array[], document: Document): number {
573583
const sequencesBuffer = this.extractDocumentSequences(document);
574-
const payloadTypeBuffer = Buffer.allocUnsafe(1);
584+
const payloadTypeBuffer = allocateUnsafeBuffer(1);
575585
payloadTypeBuffer[0] = 0;
576586

577587
const documentBuffer = this.serializeBson(document);
@@ -606,11 +616,11 @@ export class OpMsgRequest {
606616
}
607617
}
608618
if (chunks.length > 0) {
609-
return Buffer.concat(chunks);
619+
return concatBuffers(chunks);
610620
}
611621
// If we have no document sequences we return an empty buffer for nothing to add
612622
// to the payload.
613-
return Buffer.alloc(0);
623+
return allocateBuffer(0);
614624
}
615625

616626
serializeBson(document: Document): Uint8Array {
@@ -676,7 +686,7 @@ export class OpMsgResponse {
676686
this.fromCompressed = msgHeader.fromCompressed;
677687

678688
// Read response flags
679-
this.responseFlags = msgBody.readInt32LE(0);
689+
this.responseFlags = readInt32LE(msgBody, 0);
680690
this.checksumPresent = (this.responseFlags & OPTS_CHECKSUM_PRESENT) !== 0;
681691
this.moreToCome = (this.responseFlags & OPTS_MORE_TO_COME) !== 0;
682692
this.exhaustAllowed = (this.responseFlags & OPTS_EXHAUST_ALLOWED) !== 0;
@@ -700,9 +710,9 @@ export class OpMsgResponse {
700710
this.index = 4;
701711

702712
while (this.index < this.data.length) {
703-
const payloadType = this.data.readUInt8(this.index++);
713+
const payloadType = this.data[this.index++];
704714
if (payloadType === 0) {
705-
const bsonSize = this.data.readUInt32LE(this.index);
715+
const bsonSize = readInt32LE(this.data, this.index);
706716
const bin = this.data.subarray(this.index, this.index + bsonSize);
707717

708718
this.sections.push(bin);
@@ -759,30 +769,31 @@ export class OpCompressedRequest {
759769
}
760770

761771
async toBin(): Promise<Buffer[]> {
762-
const concatenatedOriginalCommandBuffer = Buffer.concat(this.command.toBin());
772+
const concatenatedOriginalCommandBuffer = concatBuffers(this.command.toBin());
763773
// otherwise, compress the message
764774
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
765775

766776
// Extract information needed for OP_COMPRESSED from the uncompressed message
767-
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
777+
const originalCommandOpCode = readInt32LE(concatenatedOriginalCommandBuffer, 12);
768778

769779
// Compress the message body
770780
const compressedMessage = await compress(this.options, messageToBeCompressed);
771781
// Create the msgHeader of OP_COMPRESSED
772-
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
773-
msgHeader.writeInt32LE(
774-
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
775-
0
782+
const msgHeader = allocateBuffer(MESSAGE_HEADER_SIZE);
783+
writeInt32LE(
784+
msgHeader,
785+
0,
786+
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length
776787
); // messageLength
777-
msgHeader.writeInt32LE(this.command.requestId, 4); // requestID
778-
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
779-
msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode
788+
writeInt32LE(msgHeader, 4, this.command.requestId); // requestID
789+
writeInt32LE(msgHeader, 8, 0); // responseTo (zero)
790+
writeInt32LE(msgHeader, 12, OP_COMPRESSED); // opCode
780791

781792
// Create the compression details of OP_COMPRESSED
782-
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
783-
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
784-
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
785-
compressionDetails.writeUInt8(Compressor[this.options.agreedCompressor], 8); // compressorID
793+
const compressionDetails = allocateBuffer(COMPRESSION_DETAILS_SIZE);
794+
writeInt32LE(compressionDetails, 0, originalCommandOpCode); // originalOpcode
795+
writeInt32LE(compressionDetails, 4, messageToBeCompressed.length); // Size of the uncompressed compressedMessage, excluding the MsgHeader
796+
writeInt32LE(compressionDetails, 8, Compressor[this.options.agreedCompressor]); // compressorID
786797
return [msgHeader, compressionDetails, compressedMessage];
787798
}
788799
}

src/cmap/connection.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ import { type Readable, Transform, type TransformCallback } from 'stream';
22
import { clearTimeout, setTimeout } from 'timers';
33

44
import {
5+
BSON,
56
type BSONSerializeOptions,
7+
concatBuffers,
68
deserialize,
79
type DeserializeOptions,
810
type Document,
@@ -174,7 +176,7 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
174176
return HostAddress.fromHostPort(remoteAddress, remotePort).toString();
175177
}
176178

177-
return uuidV4().toString('hex');
179+
return BSON.onDemand.ByteUtils.toHex(uuidV4());
178180
}
179181

180182
/** @internal */
@@ -204,7 +206,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
204206
private lastUseTime: number;
205207
private clusterTime: Document | null = null;
206208
private error: Error | null = null;
207-
private dataEvents: AsyncGenerator<Buffer, void, void> | null = null;
209+
private dataEvents: AsyncGenerator<Uint8Array, void, void> | null = null;
208210

209211
private readonly socketTimeoutMS: number;
210212
private readonly monitorCommands: boolean;
@@ -696,7 +698,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
696698
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
697699
});
698700

699-
const buffer = Buffer.concat(await finalCommand.toBin());
701+
const buffer = concatBuffers(await finalCommand.toBin());
700702

701703
if (options.timeoutContext?.csotEnabled()) {
702704
if (
@@ -794,7 +796,7 @@ export class SizedMessageTransform extends Transform {
794796
this.connection = connection;
795797
}
796798

797-
override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void {
799+
override _transform(chunk: Uint8Array, encoding: unknown, callback: TransformCallback): void {
798800
if (this.connection.delayedTimeoutId != null) {
799801
clearTimeout(this.connection.delayedTimeoutId);
800802
this.connection.delayedTimeoutId = null;

src/cmap/wire_protocol/compression.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import * as zlib from 'zlib';
22

3+
import { concatBuffers, readInt32LE } from '../../bson';
34
import { LEGACY_HELLO_COMMAND } from '../../constants';
45
import { getSnappy, getZstdLibrary, type SnappyLib, type ZStandard } from '../../deps';
56
import { MongoDecompressionError, MongoInvalidArgumentError } from '../../error';
@@ -168,7 +169,7 @@ export async function compressCommand(
168169
zlibCompressionLevel: description.zlibCompressionLevel ?? 0
169170
});
170171
const data = await finalCommand.toBin();
171-
return Buffer.concat(data);
172+
return concatBuffers(data);
172173
}
173174

174175
/**
@@ -180,10 +181,10 @@ export async function compressCommand(
180181
*/
181182
export async function decompressResponse(message: Buffer): Promise<OpMsgResponse | OpReply> {
182183
const messageHeader: MessageHeader = {
183-
length: message.readInt32LE(0),
184-
requestId: message.readInt32LE(4),
185-
responseTo: message.readInt32LE(8),
186-
opCode: message.readInt32LE(12)
184+
length: readInt32LE(message, 0),
185+
requestId: readInt32LE(message, 4),
186+
responseTo: readInt32LE(message, 8),
187+
opCode: readInt32LE(message, 12)
187188
};
188189

189190
if (messageHeader.opCode !== OP_COMPRESSED) {
@@ -195,8 +196,8 @@ export async function decompressResponse(message: Buffer): Promise<OpMsgResponse
195196
const header: MessageHeader = {
196197
...messageHeader,
197198
fromCompressed: true,
198-
opCode: message.readInt32LE(MESSAGE_HEADER_SIZE),
199-
length: message.readInt32LE(MESSAGE_HEADER_SIZE + 4)
199+
opCode: readInt32LE(message, MESSAGE_HEADER_SIZE),
200+
length: readInt32LE(message, MESSAGE_HEADER_SIZE + 4)
200201
};
201202
const compressorID = message[MESSAGE_HEADER_SIZE + 8];
202203
const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);

0 commit comments

Comments
 (0)