Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
24daacd
using bson for buffer calls
PavelSafronov Jan 14, 2026
51efd36
removing unnecessary code
PavelSafronov Jan 14, 2026
3d28ae5
lots of changes related to removing Buffer from internal uses:
PavelSafronov Jan 14, 2026
74fc9e3
test fixes
PavelSafronov Jan 14, 2026
732b12e
remove onDemand accessors
PavelSafronov Jan 14, 2026
0ee1a17
update based on isUint8Array signature change
PavelSafronov Jan 14, 2026
5ddfae4
remove unnecessary toLocalBufferType calls
PavelSafronov Jan 14, 2026
45ebf1e
point to bson PR https://github.com/mongodb/js-bson/pull/860/
PavelSafronov Jan 15, 2026
79d0d19
use bson 7.1.0
PavelSafronov Jan 16, 2026
82a34bb
pick up BSON 7.1.1
PavelSafronov Jan 20, 2026
fbb742e
Merge branch 'main' into NODE-7315
PavelSafronov Jan 20, 2026
cbc488c
resolve conflicts
PavelSafronov Jan 20, 2026
e649b6f
Merge branch 'main' into NODE-7315
PavelSafronov Jan 20, 2026
12ff51f
add an easy-to-use copyBuffer method
PavelSafronov Jan 21, 2026
983571b
minor fix: create copy of data
PavelSafronov Jan 21, 2026
e637852
remove unnecessary ByteUtils.toLocalBufferType calls
PavelSafronov Jan 22, 2026
6c9d928
Merge branch 'main' into NODE-7315
PavelSafronov Jan 22, 2026
736ba62
pr feedback
PavelSafronov Jan 22, 2026
722379a
lint fix
PavelSafronov Jan 22, 2026
45c9875
Apply suggestion from @addaleax
PavelSafronov Jan 23, 2026
ecc19f2
pr feedback: undo test changes and remove ByteUtils
PavelSafronov Jan 26, 2026
a04ec7b
Merge branch 'main' into NODE-7315
PavelSafronov Jan 26, 2026
cafd0e4
pr feedback
PavelSafronov Feb 2, 2026
ca016d9
fix bugs
PavelSafronov Feb 3, 2026
615f37f
pick up BSON 7.2.0
PavelSafronov Feb 3, 2026
ea641e6
Merge branch 'main' into NODE-7315
PavelSafronov Feb 3, 2026
4258a42
removed writeInt32LE, use NumberUtils directly
PavelSafronov Feb 4, 2026
5dbbf80
pr feedback:
PavelSafronov Feb 4, 2026
6a5087a
lint fix
PavelSafronov Feb 4, 2026
58739b6
fix bug
PavelSafronov Feb 4, 2026
ad9e2aa
pr feedback
PavelSafronov Feb 4, 2026
ffcdfe0
pr feedback
PavelSafronov Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/bson.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@
/** @internal */
export type BSONElement = BSON.OnDemand['BSONElement'];

/** @internal */
export function toLocalBufferType(this: void, buffer: Buffer | Uint8Array): Buffer {
  // Already a Node Buffer: hand it back unchanged.
  if (Buffer.isBuffer(buffer)) {
    return buffer;
  }
  // Otherwise wrap the same underlying ArrayBuffer — a zero-copy view,
  // honoring the input's byteOffset/byteLength (important for subarrays).
  return Buffer.from(buffer.buffer, buffer.byteOffset, buffer.byteLength);
}

export function parseToElementsToArray(bytes: Uint8Array, offset?: number): BSONElement[] {
const res = BSON.onDemand.parseToElements(bytes, offset);
return Array.isArray(res) ? res : [...res];
Expand All @@ -41,6 +48,34 @@
export const getFloat64LE = BSON.onDemand.NumberUtils.getFloat64LE;
export const getBigInt64LE = BSON.onDemand.NumberUtils.getBigInt64LE;
export const toUTF8 = BSON.onDemand.ByteUtils.toUTF8;
export const writeInt32LE = BSON.onDemand.NumberUtils.setInt32LE;

// Encodes a JS string as UTF-8 bytes, returned as a Node Buffer.
// NOTE(review): CI reports `Property 'fromUTF8' does not exist on type 'ByteUtils'`
// — confirm the pinned bson version actually exposes onDemand.ByteUtils.fromUTF8
// (see js-bson PR #860) before merging.
export const fromUTF8 = (text: string) => toLocalBufferType(BSON.onDemand.ByteUtils.fromUTF8(text));

Check failure on line 53 in src/bson.ts

View workflow job for this annotation

GitHub Actions / build

Property 'fromUTF8' does not exist on type 'ByteUtils'.

Check failure on line 53 in src/bson.ts

View workflow job for this annotation

GitHub Actions / build

Property 'fromUTF8' does not exist on type 'ByteUtils'.

export const concatBuffers = (list: Buffer[] | Uint8Array[]) => {
return toLocalBufferType(BSON.onDemand.ByteUtils.concat(list));

Check failure on line 56 in src/bson.ts

View workflow job for this annotation

GitHub Actions / build

Property 'concat' does not exist on type 'ByteUtils'.

Check failure on line 56 in src/bson.ts

View workflow job for this annotation

GitHub Actions / build

Property 'concat' does not exist on type 'ByteUtils'.
};
// Allocates a zero-filled Buffer of `size` bytes via bson's ByteUtils.
export const allocateBuffer = (size: number) =>
  toLocalBufferType(BSON.onDemand.ByteUtils.allocate(size));
// Allocates a Buffer of `size` bytes; presumably uninitialized (mirrors
// Buffer.allocUnsafe) — callers must fully overwrite contents before use.
export const allocateUnsafeBuffer = (size: number) =>
  toLocalBufferType(BSON.onDemand.ByteUtils.allocateUnsafe(size));

// Re-exported bson byte helpers so driver code can avoid a direct Buffer
// dependency (this PR removes Buffer from internal uses).
export const utf8ByteLength = BSON.onDemand.ByteUtils.utf8ByteLength;
export const toBase64 = BSON.onDemand.ByteUtils.toBase64;
export const encodeUTF8Into = BSON.onDemand.ByteUtils.encodeUTF8Into;

// Throws RangeError unless the window [offset, offset + length) lies
// entirely within `buffer`.
const validateBufferInputs = (buffer: Uint8Array, offset: number, length: number) => {
  const end = offset + length;
  if (offset >= 0 && end <= buffer.length) {
    return;
  }
  throw new RangeError(
    `Attempt to access memory outside buffer bounds: buffer length: ${buffer.length}, offset: ${offset}, length: ${length}`
  );
};

export const readInt32LE = (buffer: Uint8Array, offset: number): number => {
validateBufferInputs(buffer, offset, 4);
Comment thread
baileympearson marked this conversation as resolved.
return getInt32LE(buffer, offset);
};

/**
* BSON Serialization options.
Expand Down
23 changes: 15 additions & 8 deletions src/cmap/auth/scram.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { saslprep } from '@mongodb-js/saslprep';
import * as crypto from 'crypto';

import { Binary, type Document } from '../../bson';
import {
allocateBuffer,
Binary,
concatBuffers,
type Document,
fromUTF8,
toBase64
} from '../../bson';
import {
MongoInvalidArgumentError,
MongoMissingCredentialsError,
Expand Down Expand Up @@ -68,11 +75,11 @@ function cleanUsername(username: string) {
function clientFirstMessageBare(username: string, nonce: Buffer) {
// NOTE: This is done b/c Javascript uses UTF-16, but the server is hashing in UTF-8.
// Since the username is not sasl-prep-d, we need to do this here.
return Buffer.concat([
Buffer.from('n=', 'utf8'),
Buffer.from(username, 'utf8'),
Buffer.from(',r=', 'utf8'),
Buffer.from(nonce.toString('base64'), 'utf8')
return concatBuffers([
fromUTF8('n='),
fromUTF8(username),
fromUTF8(',r='),
fromUTF8(toBase64(nonce))
]);
}

Expand All @@ -91,7 +98,7 @@ function makeFirstMessage(
saslStart: 1,
mechanism,
payload: new Binary(
Buffer.concat([Buffer.from('n,,', 'utf8'), clientFirstMessageBare(username, nonce)])
concatBuffers([Buffer.from('n,,', 'utf8'), clientFirstMessageBare(username, nonce)])
),
autoAuthorize: 1,
options: { skipEmptyExchange: true }
Expand Down Expand Up @@ -199,7 +206,7 @@ async function continueScramConversation(
const retrySaslContinueCmd = {
saslContinue: 1,
conversationId: r.conversationId,
payload: Buffer.alloc(0)
payload: allocateBuffer(0)
};

await connection.command(ns(`${db}.$cmd`), retrySaslContinueCmd, undefined);
Expand Down
85 changes: 48 additions & 37 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import type { BSONSerializeOptions, Document, Long } from '../bson';
import * as BSON from '../bson';
import {
allocateBuffer,
allocateUnsafeBuffer,
BSON,
type BSONSerializeOptions,
concatBuffers,
type Document,
type Long,
readInt32LE,
utf8ByteLength,
writeInt32LE
} from '../bson';
import { MongoInvalidArgumentError, MongoRuntimeError } from '../error';
import { type ReadPreference } from '../read_preference';
import type { ClientSession } from '../sessions';
Expand Down Expand Up @@ -30,7 +40,7 @@ const QUERY_FAILURE = 2;
const SHARD_CONFIG_STALE = 4;
const AWAIT_CAPABLE = 8;

const encodeUTF8Into = BSON.BSON.onDemand.ByteUtils.encodeUTF8Into;
const encodeUTF8Into = BSON.onDemand.ByteUtils.encodeUTF8Into;
Comment thread
tadjik1 marked this conversation as resolved.
Outdated

/** @internal */
export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest;
Expand Down Expand Up @@ -182,10 +192,10 @@ export class OpQueryRequest {
if (this.batchSize !== this.numberToReturn) this.numberToReturn = this.batchSize;

// Allocate write protocol header buffer
const header = Buffer.alloc(
const header = allocateBuffer(
4 * 4 + // Header
4 + // Flags
Buffer.byteLength(this.ns) +
utf8ByteLength(this.ns) +
1 + // namespace
4 + // numberToSkip
4 // numberToReturn
Expand Down Expand Up @@ -256,7 +266,7 @@ export class OpQueryRequest {
index = index + 4;

// Write collection name
index = index + header.write(this.ns, index, 'utf8') + 1;
index = index + encodeUTF8Into(header, this.ns, index) + 1;
header[index - 1] = 0;

// Write header information flags numberToSkip
Expand Down Expand Up @@ -364,10 +374,10 @@ export class OpReply {
this.index = 20;

// Read the message body
this.responseFlags = this.data.readInt32LE(0);
this.cursorId = new BSON.Long(this.data.readInt32LE(4), this.data.readInt32LE(8));
this.startingFrom = this.data.readInt32LE(12);
this.numberReturned = this.data.readInt32LE(16);
this.responseFlags = readInt32LE(this.data, 0);
this.cursorId = new BSON.Long(readInt32LE(this.data, 4), readInt32LE(this.data, 8));
this.startingFrom = readInt32LE(this.data, 12);
this.numberReturned = readInt32LE(this.data, 16);

if (this.numberReturned < 0 || this.numberReturned > 2 ** 32 - 1) {
throw new RangeError(
Expand Down Expand Up @@ -446,7 +456,7 @@ export class DocumentSequence {
this.serializedDocumentsLength = 0;
// Document sequences starts with type 1 at the first byte.
// Field strings must always be UTF-8.
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
const buffer = allocateUnsafeBuffer(1 + 4 + this.field.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${this.field}\0`, 5);
Expand Down Expand Up @@ -482,7 +492,7 @@ export class DocumentSequence {
* @returns The section bytes.
*/
toBin(): Uint8Array {
return Buffer.concat(this.chunks);
return concatBuffers(this.chunks);
}
}

Expand Down Expand Up @@ -547,7 +557,7 @@ export class OpMsgRequest {
flags |= OPTS_EXHAUST_ALLOWED;
}

const header = Buffer.alloc(
const header = allocateBuffer(
4 * 4 + // Header
4 // Flags
);
Expand All @@ -558,11 +568,11 @@ export class OpMsgRequest {
const command = this.command;
totalLength += this.makeSections(buffers, command);

header.writeInt32LE(totalLength, 0); // messageLength
header.writeInt32LE(this.requestId, 4); // requestID
header.writeInt32LE(0, 8); // responseTo
header.writeInt32LE(OP_MSG, 12); // opCode
header.writeUInt32LE(flags, 16); // flags
writeInt32LE(header, 0, totalLength); // messageLength
writeInt32LE(header, 4, this.requestId); // requestID
writeInt32LE(header, 8, 0); // responseTo
writeInt32LE(header, 12, OP_MSG); // opCode
writeInt32LE(header, 16, flags); // flags
return buffers;
}

Expand All @@ -571,7 +581,7 @@ export class OpMsgRequest {
*/
makeSections(buffers: Uint8Array[], document: Document): number {
const sequencesBuffer = this.extractDocumentSequences(document);
const payloadTypeBuffer = Buffer.allocUnsafe(1);
const payloadTypeBuffer = allocateUnsafeBuffer(1);
payloadTypeBuffer[0] = 0;

const documentBuffer = this.serializeBson(document);
Expand Down Expand Up @@ -606,11 +616,11 @@ export class OpMsgRequest {
}
}
if (chunks.length > 0) {
return Buffer.concat(chunks);
return concatBuffers(chunks);
}
// If we have no document sequences we return an empty buffer for nothing to add
// to the payload.
return Buffer.alloc(0);
return allocateBuffer(0);
}

serializeBson(document: Document): Uint8Array {
Expand Down Expand Up @@ -676,7 +686,7 @@ export class OpMsgResponse {
this.fromCompressed = msgHeader.fromCompressed;

// Read response flags
this.responseFlags = msgBody.readInt32LE(0);
this.responseFlags = readInt32LE(msgBody, 0);
this.checksumPresent = (this.responseFlags & OPTS_CHECKSUM_PRESENT) !== 0;
this.moreToCome = (this.responseFlags & OPTS_MORE_TO_COME) !== 0;
this.exhaustAllowed = (this.responseFlags & OPTS_EXHAUST_ALLOWED) !== 0;
Expand All @@ -700,9 +710,9 @@ export class OpMsgResponse {
this.index = 4;

while (this.index < this.data.length) {
const payloadType = this.data.readUInt8(this.index++);
const payloadType = this.data[this.index++];
if (payloadType === 0) {
const bsonSize = this.data.readUInt32LE(this.index);
const bsonSize = readInt32LE(this.data, this.index);
Comment thread
baileympearson marked this conversation as resolved.
const bin = this.data.subarray(this.index, this.index + bsonSize);

this.sections.push(bin);
Expand Down Expand Up @@ -759,30 +769,31 @@ export class OpCompressedRequest {
}

async toBin(): Promise<Buffer[]> {
const concatenatedOriginalCommandBuffer = Buffer.concat(this.command.toBin());
const concatenatedOriginalCommandBuffer = concatBuffers(this.command.toBin());
// otherwise, compress the message
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

// Extract information needed for OP_COMPRESSED from the uncompressed message
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
const originalCommandOpCode = readInt32LE(concatenatedOriginalCommandBuffer, 12);

// Compress the message body
const compressedMessage = await compress(this.options, messageToBeCompressed);
// Create the msgHeader of OP_COMPRESSED
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
msgHeader.writeInt32LE(
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
0
const msgHeader = allocateBuffer(MESSAGE_HEADER_SIZE);
writeInt32LE(
msgHeader,
0,
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length
); // messageLength
msgHeader.writeInt32LE(this.command.requestId, 4); // requestID
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode
writeInt32LE(msgHeader, 4, this.command.requestId); // requestID
writeInt32LE(msgHeader, 8, 0); // responseTo (zero)
writeInt32LE(msgHeader, 12, OP_COMPRESSED); // opCode

// Create the compression details of OP_COMPRESSED
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
compressionDetails.writeUInt8(Compressor[this.options.agreedCompressor], 8); // compressorID
const compressionDetails = allocateBuffer(COMPRESSION_DETAILS_SIZE);
writeInt32LE(compressionDetails, 0, originalCommandOpCode); // originalOpcode
writeInt32LE(compressionDetails, 4, messageToBeCompressed.length); // Size of the uncompressed compressedMessage, excluding the MsgHeader
writeInt32LE(compressionDetails, 8, Compressor[this.options.agreedCompressor]); // compressorID
return [msgHeader, compressionDetails, compressedMessage];
}
}
6 changes: 4 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import { type Readable, Transform, type TransformCallback } from 'stream';
import { clearTimeout, setTimeout } from 'timers';

import {
BSON,
type BSONSerializeOptions,
concatBuffers,
deserialize,
type DeserializeOptions,
type Document,
Expand Down Expand Up @@ -174,7 +176,7 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string {
return HostAddress.fromHostPort(remoteAddress, remotePort).toString();
}

return uuidV4().toString('hex');
return BSON.onDemand.ByteUtils.toHex(uuidV4());
}

/** @internal */
Expand Down Expand Up @@ -696,7 +698,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
});

const buffer = Buffer.concat(await finalCommand.toBin());
const buffer = concatBuffers(await finalCommand.toBin());

if (options.timeoutContext?.csotEnabled()) {
if (
Expand Down
15 changes: 8 additions & 7 deletions src/cmap/wire_protocol/compression.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as zlib from 'zlib';

import { concatBuffers, readInt32LE } from '../../bson';
import { LEGACY_HELLO_COMMAND } from '../../constants';
import { getSnappy, getZstdLibrary, type SnappyLib, type ZStandard } from '../../deps';
import { MongoDecompressionError, MongoInvalidArgumentError } from '../../error';
Expand Down Expand Up @@ -168,7 +169,7 @@ export async function compressCommand(
zlibCompressionLevel: description.zlibCompressionLevel ?? 0
});
const data = await finalCommand.toBin();
return Buffer.concat(data);
return concatBuffers(data);
}

/**
Expand All @@ -180,10 +181,10 @@ export async function compressCommand(
*/
export async function decompressResponse(message: Buffer): Promise<OpMsgResponse | OpReply> {
const messageHeader: MessageHeader = {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
responseTo: message.readInt32LE(8),
opCode: message.readInt32LE(12)
length: readInt32LE(message, 0),
requestId: readInt32LE(message, 4),
responseTo: readInt32LE(message, 8),
opCode: readInt32LE(message, 12)
};

if (messageHeader.opCode !== OP_COMPRESSED) {
Expand All @@ -195,8 +196,8 @@ export async function decompressResponse(message: Buffer): Promise<OpMsgResponse
const header: MessageHeader = {
...messageHeader,
fromCompressed: true,
opCode: message.readInt32LE(MESSAGE_HEADER_SIZE),
length: message.readInt32LE(MESSAGE_HEADER_SIZE + 4)
opCode: readInt32LE(message, MESSAGE_HEADER_SIZE),
length: readInt32LE(message, MESSAGE_HEADER_SIZE + 4)
};
const compressorID = message[MESSAGE_HEADER_SIZE + 8];
const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);
Expand Down
6 changes: 3 additions & 3 deletions src/gridfs/upload.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Writable } from 'stream';

import { type Document, ObjectId } from '../bson';
import { allocateBuffer, type Document, ObjectId } from '../bson';
import type { Collection } from '../collection';
import { CursorTimeoutMode } from '../cursor/abstract_cursor';
import {
Expand Down Expand Up @@ -122,7 +122,7 @@ export class GridFSBucketWriteStream extends Writable {
this.id = options.id ? options.id : new ObjectId();
// properly inherit the default chunksize from parent
this.chunkSizeBytes = options.chunkSizeBytes || this.bucket.s.options.chunkSizeBytes;
this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
this.bufToStore = allocateBuffer(this.chunkSizeBytes);
this.length = 0;
this.n = 0;
this.pos = 0;
Expand Down Expand Up @@ -495,7 +495,7 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback: Callback): void

// Create a new buffer to make sure the buffer isn't bigger than it needs
// to be.
const remnant = Buffer.alloc(stream.pos);
const remnant = allocateBuffer(stream.pos);
stream.bufToStore.copy(remnant, 0, 0, stream.pos);
const doc = createChunkDoc(stream.id, stream.n, remnant);

Expand Down
Loading
Loading