From 175d7915d9d4737adb3d6f7eceadd611331653ea Mon Sep 17 00:00:00 2001 From: bailey Date: Tue, 29 Jul 2025 14:28:08 -0600 Subject: [PATCH 1/4] remove insertmany operation --- src/collection.ts | 35 ++++++++--- src/operations/insert.ts | 59 +------------------ .../crud/abstract_operation.test.ts | 5 -- 3 files changed, 28 insertions(+), 71 deletions(-) diff --git a/src/collection.ts b/src/collection.ts index 1bdc89b2628..9660fb3409b 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -61,7 +61,6 @@ import { type ListIndexesOptions } from './operations/indexes'; import { - InsertManyOperation, type InsertManyResult, InsertOneOperation, type InsertOneOptions, @@ -305,14 +304,34 @@ export class Collection { docs: ReadonlyArray>, options?: BulkWriteOptions ): Promise> { - return await executeOperation( - this.client, - new InsertManyOperation( - this as TODO_NODE_3286, - docs, - resolveOptions(this, options ?? { ordered: true }) - ) as TODO_NODE_3286 + if (!Array.isArray(docs)) { + throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents'); + } + + const writeConcern = WriteConcern.fromOptions(options); + const bulkWriteOperation = new BulkWriteOperation( + this as unknown as Collection, + docs.map(document => ({ + insertOne: { document } + })), + options ?? {} ); + + try { + const res = await executeOperation(this.client, bulkWriteOperation); + return { + acknowledged: writeConcern?.w !== 0, + insertedCount: res.insertedCount, + insertedIds: res.insertedIds + }; + } catch (err) { + if (err && err.message === 'Operation must be an object with an operation key') { + throw new MongoInvalidArgumentError( + 'Collection.insertMany() cannot be called with an array that has null/undefined values' + ); + } + throw err; + } } /** diff --git a/src/operations/insert.ts b/src/operations/insert.ts index 1a40763e313..330648400d7 100644 --- a/src/operations/insert.ts +++ b/src/operations/insert.ts @@ -105,62 +105,5 @@ export interface InsertManyResult { insertedIds: { [key: number]: InferIdType }; } -/** @internal */ -export class InsertManyOperation extends AbstractOperation { - override options: BulkWriteOptions; - collection: Collection; - docs: ReadonlyArray; - - constructor(collection: Collection, docs: ReadonlyArray, options: BulkWriteOptions) { - super(options); - - if (!Array.isArray(docs)) { - throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents'); - } - - this.options = options; - this.collection = collection; - this.docs = docs; - } - - override get commandName() { - return 'insert' as const; - } - - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const coll = this.collection; - const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference }; - const writeConcern = WriteConcern.fromOptions(options); - const bulkWriteOperation = new BulkWriteOperation( - coll, - this.docs.map(document => ({ - insertOne: { document } - })), - options - ); - - try { - const res = await bulkWriteOperation.execute(server, session, timeoutContext); - return { - acknowledged: writeConcern?.w !== 0, - insertedCount: res.insertedCount, - insertedIds: res.insertedIds - }; - } catch (err) { - if (err && err.message === 'Operation must be an object with an operation key') { - throw new MongoInvalidArgumentError( - 'Collection.insertMany() cannot be called with an array that has null/undefined values' - ); - } - throw err; - } - } -} - defineAspects(InsertOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]); -defineAspects(InsertOneOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]); -defineAspects(InsertManyOperation, [Aspect.WRITE_OPERATION]); +defineAspects(InsertOneOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]); \ No newline at end of file diff --git a/test/integration/crud/abstract_operation.test.ts b/test/integration/crud/abstract_operation.test.ts index a30c185db6b..82af49be7d2 100644 --- a/test/integration/crud/abstract_operation.test.ts +++ b/test/integration/crud/abstract_operation.test.ts @@ -161,11 +161,6 @@ describe('abstract operation', function () { subclassType: mongodb.InsertOneOperation, correctCommandName: 'insert' }, - { - subclassCreator: () => new mongodb.InsertManyOperation(collection, [{ a: 1 }], {}), - subclassType: mongodb.InsertManyOperation, - correctCommandName: 'insert' - }, { subclassCreator: () => new mongodb.IsCappedOperation(collection, {}), subclassType: mongodb.IsCappedOperation, From e75c6a06dd96df19ed41e777917d86141a095ccf Mon Sep 17 00:00:00 2001 From: bailey Date: Tue, 29 Jul 2025 14:40:00 -0600 Subject: [PATCH 2/4] remove bulk write operation --- src/collection.ts | 48 ++++++++------ src/operations/bulk_write.ts | 64 ------------------- src/operations/insert.ts | 8 +-- .../crud/abstract_operation.test.ts | 6 -- 4 files changed, 31 insertions(+), 95 deletions(-) delete mode 100644 src/operations/bulk_write.ts diff --git a/src/collection.ts b/src/collection.ts index 9660fb3409b..e66358442ee 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -1,5 +1,10 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from './bson'; -import type { AnyBulkWriteOperation, BulkWriteOptions, BulkWriteResult } from './bulk/common'; +import type { + AnyBulkWriteOperation, + BulkOperationBase, + BulkWriteOptions, + BulkWriteResult +} from './bulk/common'; import { OrderedBulkOperation } from './bulk/ordered'; import { UnorderedBulkOperation } from './bulk/unordered'; import { ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions } from './change_stream'; @@ -24,7 +29,6 @@ import type { WithoutId } from './mongo_types'; import type { AggregateOptions } from './operations/aggregate'; -import { BulkWriteOperation } from './operations/bulk_write'; import { CountOperation, type CountOptions } from './operations/count'; import { DeleteManyOperation, @@ -307,20 +311,17 @@ export class Collection { if (!Array.isArray(docs)) { throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents'); } + options = options ?? {}; - const writeConcern = WriteConcern.fromOptions(options); - const bulkWriteOperation = new BulkWriteOperation( - this as unknown as Collection, - docs.map(document => ({ - insertOne: { document } - })), - options ?? {} - ); + const acknowledged = WriteConcern.fromOptions(options)?.w !== 0; try { - const res = await executeOperation(this.client, bulkWriteOperation); + const res = await this.bulkWrite( + docs.map(doc => ({ insertOne: { document: doc } })), + options + ); return { - acknowledged: writeConcern?.w !== 0, + acknowledged, insertedCount: res.insertedCount, insertedIds: res.insertedIds }; @@ -361,14 +362,21 @@ export class Collection { throw new MongoInvalidArgumentError('Argument "operations" must be an array of documents'); } - return await executeOperation( - this.client, - new BulkWriteOperation( - this as TODO_NODE_3286, - operations, - resolveOptions(this, options ?? { ordered: true }) - ) - ); + options = options ?? {}; + + // Create the bulk operation + const bulk: BulkOperationBase = + options.ordered === false + ? this.initializeUnorderedBulkOp(options) + : this.initializeOrderedBulkOp(options); + + // for each op go through and add to the bulk + for (let i = 0; i < operations.length; i++) { + bulk.raw(operations[i]); + } + + // Execute the bulk + return await bulk.execute({ ...options }); } /** diff --git a/src/operations/bulk_write.ts b/src/operations/bulk_write.ts deleted file mode 100644 index 55b61ef73b0..00000000000 --- a/src/operations/bulk_write.ts +++ /dev/null @@ -1,64 +0,0 @@ -import type { - AnyBulkWriteOperation, - BulkOperationBase, - BulkWriteOptions, - BulkWriteResult -} from '../bulk/common'; -import type { Collection } from '../collection'; -import type { Server } from '../sdam/server'; -import type { ClientSession } from '../sessions'; -import { type TimeoutContext } from '../timeout'; -import { AbstractOperation, Aspect, defineAspects } from './operation'; - -/** @internal */ -export class BulkWriteOperation extends AbstractOperation { - override options: BulkWriteOptions; - collection: Collection; - operations: ReadonlyArray; - - constructor( - collection: Collection, - operations: ReadonlyArray, - options: BulkWriteOptions - ) { - super(options); - this.options = options; - this.collection = collection; - this.operations = operations; - } - - override get commandName() { - return 'bulkWrite' as const; - } - - override async execute( - server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - const coll = this.collection; - const operations = this.operations; - const options = { - ...this.options, - ...this.bsonOptions, - readPreference: this.readPreference, - timeoutContext - }; - - // Create the bulk operation - const bulk: BulkOperationBase = - options.ordered === false - ? coll.initializeUnorderedBulkOp(options) - : coll.initializeOrderedBulkOp(options); - - // for each op go through and add to the bulk - for (let i = 0; i < operations.length; i++) { - bulk.raw(operations[i]); - } - - // Execute the bulk - return await bulk.execute({ ...options, session }); - } -} - -defineAspects(BulkWriteOperation, [Aspect.WRITE_OPERATION]); diff --git a/src/operations/insert.ts b/src/operations/insert.ts index 330648400d7..04beb087e96 100644 --- a/src/operations/insert.ts +++ b/src/operations/insert.ts @@ -1,16 +1,14 @@ import type { Document } from '../bson'; import type { BulkWriteOptions } from '../bulk/common'; import type { Collection } from '../collection'; -import { MongoInvalidArgumentError, MongoServerError } from '../error'; +import { MongoServerError } from '../error'; import type { InferIdType } from '../mongo_types'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; import { maybeAddIdToDocuments, type MongoDBNamespace } from '../utils'; -import { WriteConcern } from '../write_concern'; -import { BulkWriteOperation } from './bulk_write'; import { CommandOperation, type CommandOperationOptions } from './command'; -import { AbstractOperation, Aspect, defineAspects } from './operation'; +import { Aspect, defineAspects } from './operation'; /** @internal */ export class InsertOperation extends CommandOperation { @@ -106,4 +104,4 @@ export interface InsertManyResult { } defineAspects(InsertOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]); -defineAspects(InsertOneOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]); \ No newline at end of file +defineAspects(InsertOneOperation, [Aspect.RETRYABLE, Aspect.WRITE_OPERATION]); diff --git a/test/integration/crud/abstract_operation.test.ts b/test/integration/crud/abstract_operation.test.ts index 82af49be7d2..0c26dd01cc0 100644 --- a/test/integration/crud/abstract_operation.test.ts +++ b/test/integration/crud/abstract_operation.test.ts @@ -37,12 +37,6 @@ describe('abstract operation', function () { subclassType: mongodb.AggregateOperation, correctCommandName: 'aggregate' }, - { - subclassCreator: () => - new mongodb.BulkWriteOperation(collection, [{ insertOne: { document: { a: 1 } } }], {}), - subclassType: mongodb.BulkWriteOperation, - correctCommandName: 'bulkWrite' - }, { subclassCreator: () => new mongodb.CollectionsOperation(db, {}), subclassType: mongodb.CollectionsOperation, From 18d6c8fa6b31e28480b5814ba5000b68e6e3e967 Mon Sep 17 00:00:00 2001 From: bailey Date: Tue, 29 Jul 2025 16:04:55 -0600 Subject: [PATCH 3/4] All aworking --- src/bulk/common.ts | 62 ++++++++++------------------- src/collection.ts | 17 +++++--- src/operations/execute_operation.ts | 2 +- test/integration/crud/bulk.test.ts | 45 +++++++++------------ test/mongodb.ts | 1 - 5 files changed, 53 insertions(+), 74 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 50283c94f1e..4b1578903ea 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -14,13 +14,11 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete'; import { executeOperation } from '../operations/execute_operation'; import { InsertOperation } from '../operations/insert'; -import { AbstractOperation, type Hint } from '../operations/operation'; +import { type Hint } from '../operations/operation'; import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update'; -import type { Server } from '../sdam/server'; import type { Topology } from '../sdam/topology'; -import type { ClientSession } from '../sessions'; import { type Sort } from '../sort'; -import { type TimeoutContext } from '../timeout'; +import { TimeoutContext } from '../timeout'; import { applyRetryableWrites, getTopology, @@ -854,40 +852,6 @@ export interface BulkWriteOptions extends CommandOperationOptions { timeoutContext?: TimeoutContext; } -/** - * TODO(NODE-4063) - * BulkWrites merge complexity is implemented in executeCommands - * This provides a vehicle to treat bulkOperations like any other operation (hence "shim") - * We would like this logic to simply live inside the BulkWriteOperation class - * @internal - */ -export class BulkWriteShimOperation extends AbstractOperation { - bulkOperation: BulkOperationBase; - constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) { - super(options); - this.bulkOperation = bulkOperation; - } - - get commandName(): string { - return 'bulkWrite' as const; - } - - async execute( - _server: Server, - session: ClientSession | undefined, - timeoutContext: TimeoutContext - ): Promise { - if (this.options.session == null) { - // An implicit session could have been created by 'executeOperation' - // So if we stick it on finalOptions here, each bulk operation - // will use this same session, it'll be passed in the same way - // an explicit session would be - this.options.session = session; - } - return await executeCommands(this.bulkOperation, { ...this.options, timeoutContext }); - } -} - /** @public */ export abstract class BulkOperationBase { isOrdered: boolean; @@ -1208,10 +1172,26 @@ export abstract class BulkOperationBase { } this.s.executed = true; - const finalOptions = { ...this.s.options, ...options }; - const operation = new BulkWriteShimOperation(this, finalOptions); + const finalOptions = resolveOptions(this.collection, { ...this.s.options, ...options }); + + // if there is no timeoutContext provided, create a timeoutContext and use it for + // all batches in the bulk operation + finalOptions.timeoutContext ??= TimeoutContext.create({ + session: finalOptions.session, + timeoutMS: finalOptions.timeoutMS, + serverSelectionTimeoutMS: this.collection.client.s.options.serverSelectionTimeoutMS, + waitQueueTimeoutMS: this.collection.client.s.options.waitQueueTimeoutMS + }); + + if (finalOptions.session == null) { + // if there is not an explicit session provided to `execute()`, create + // an implicit session and use that for all batches in the bulk operation + return await this.collection.client.withSession({ explicit: false }, async session => { + return await executeCommands(this, { ...finalOptions, session }); + }); + } - return await executeOperation(this.s.collection.client, operation, finalOptions.timeoutContext); + return await executeCommands(this, { ...finalOptions }); } /** diff --git a/src/collection.ts b/src/collection.ts index e66358442ee..5ef6ed8f2e5 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -42,7 +42,7 @@ import { EstimatedDocumentCountOperation, type EstimatedDocumentCountOptions } from './operations/estimated_document_count'; -import { executeOperation } from './operations/execute_operation'; +import { autoConnect, executeOperation } from './operations/execute_operation'; import type { FindOptions } from './operations/find'; import { FindOneAndDeleteOperation, @@ -311,7 +311,7 @@ export class Collection { if (!Array.isArray(docs)) { throw new MongoInvalidArgumentError('Argument "docs" must be an array of documents'); } - options = options ?? {}; + options = resolveOptions(this, options ?? {}); const acknowledged = WriteConcern.fromOptions(options)?.w !== 0; @@ -362,7 +362,14 @@ export class Collection { throw new MongoInvalidArgumentError('Argument "operations" must be an array of documents'); } - options = options ?? {}; + options = resolveOptions(this, options ?? {}); + + // TODO(NODE-7071): remove once the client doesn't need to be connected to construct + // bulk operations + const isConnected = this.client.topology != null; + if (!isConnected) { + await autoConnect(this.client); + } // Create the bulk operation const bulk: BulkOperationBase = @@ -371,8 +378,8 @@ export class Collection { : this.initializeOrderedBulkOp(options); // for each op go through and add to the bulk - for (let i = 0; i < operations.length; i++) { - bulk.raw(operations[i]); + for (const operation of operations) { + bulk.raw(operation); } // Execute the bulk diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index ed713999991..454f56daaa9 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -129,7 +129,7 @@ export async function executeOperation< * Connects a client if it has not yet been connected * @internal */ -async function autoConnect(client: MongoClient): Promise { +export async function autoConnect(client: MongoClient): Promise { if (client.topology == null) { if (client.s.hasBeenClosed) { throw new MongoNotConnectedError('Client must be connected before running operations'); diff --git a/test/integration/crud/bulk.test.ts b/test/integration/crud/bulk.test.ts index c7a80ffa182..bdae64ca5f5 100644 --- a/test/integration/crud/bulk.test.ts +++ b/test/integration/crud/bulk.test.ts @@ -925,7 +925,7 @@ describe('Bulk', function () { try { batch.insert({ string: hugeString }); test.ok(false); - } catch (err) {} // eslint-disable-line + } catch (err) { } // eslint-disable-line // Finish up test client.close(done); @@ -1216,34 +1216,27 @@ describe('Bulk', function () { } }); - it('should correctly execute unordered batch using w:0', { - metadata: { requires: { topology: ['single', 'replicaset', 'ssl', 'heap', 'wiredtiger'] } }, + it('should correctly execute unordered batch using w:0', async function () { + await client.connect(); + const db = client.db(); + const col = db.collection('batch_write_ordered_ops_9'); + const bulk = col.initializeUnorderedBulkOp(); + for (let i = 0; i < 100; i++) { + bulk.insert({ a: 1 }); + } - test: function (done) { - client.connect((err, client) => { - const db = client.db(); - const col = db.collection('batch_write_ordered_ops_9'); - const bulk = col.initializeUnorderedBulkOp(); - for (let i = 0; i < 100; i++) { - bulk.insert({ a: 1 }); - } + bulk.find({ b: 1 }).upsert().update({ b: 1 }); + bulk.find({ c: 1 }).delete(); - bulk.find({ b: 1 }).upsert().update({ b: 1 }); - bulk.find({ c: 1 }).delete(); + const result = await bulk.execute({ writeConcern: { w: 0 } }); + test.equal(0, result.upsertedCount); + test.equal(0, result.insertedCount); + test.equal(0, result.matchedCount); + test.ok(0 === result.modifiedCount || result.modifiedCount == null); + test.equal(0, result.deletedCount); + test.equal(false, result.hasWriteErrors()); - bulk.execute({ writeConcern: { w: 0 } }, function (err, result) { - expect(err).to.not.exist; - test.equal(0, result.upsertedCount); - test.equal(0, result.insertedCount); - test.equal(0, result.matchedCount); - test.ok(0 === result.modifiedCount || result.modifiedCount == null); - test.equal(0, result.deletedCount); - test.equal(false, result.hasWriteErrors()); - - client.close(done); - }); - }); - } + await client.close(); }); it('should provide an accessor for operations on ordered bulk ops', function (done) { diff --git a/test/mongodb.ts b/test/mongodb.ts index 45e6a6679c7..e9d019d0b12 100644 --- a/test/mongodb.ts +++ b/test/mongodb.ts @@ -163,7 +163,6 @@ export * from '../src/mongo_client'; export * from '../src/mongo_logger'; export * from '../src/mongo_types'; export * from '../src/operations/aggregate'; -export * from '../src/operations/bulk_write'; export * from '../src/operations/client_bulk_write/command_builder'; export * from '../src/operations/client_bulk_write/common'; export * from '../src/operations/client_bulk_write/results_merger'; From f613ca7712017b96fbfb458c8c466b808e566732 Mon Sep 17 00:00:00 2001 From: bailey Date: Wed, 30 Jul 2025 07:26:03 -0600 Subject: [PATCH 4/4] remove array overloads of maybeAddObjectId --- src/operations/insert.ts | 2 +- src/utils.ts | 35 ++++++++++------------------------- 2 files changed, 11 insertions(+), 26 deletions(-) diff --git a/src/operations/insert.ts b/src/operations/insert.ts index 04beb087e96..588468f3134 100644 --- a/src/operations/insert.ts +++ b/src/operations/insert.ts @@ -71,7 +71,7 @@ export interface InsertOneResult { export class InsertOneOperation extends InsertOperation { constructor(collection: Collection, doc: Document, options: InsertOneOptions) { - super(collection.s.namespace, maybeAddIdToDocuments(collection, [doc], options), options); + super(collection.s.namespace, [maybeAddIdToDocuments(collection, doc, options)], options); } override async execute( diff --git a/src/utils.ts b/src/utils.ts index 22cda4092ba..c4a68dcd524 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1357,38 +1357,23 @@ export async function once(ee: EventEmitter, name: string, options?: Abortabl } export function maybeAddIdToDocuments( - coll: Collection, - docs: Document[], + collection: Collection, + document: Document, options: { forceServerObjectId?: boolean } -): Document[]; -export function maybeAddIdToDocuments( - coll: Collection, - docs: Document, - options: { forceServerObjectId?: boolean } -): Document; -export function maybeAddIdToDocuments( - coll: Collection, - docOrDocs: Document[] | Document, - options: { forceServerObjectId?: boolean } -): Document[] | Document { +): Document { const forceServerObjectId = - typeof options.forceServerObjectId === 'boolean' - ? options.forceServerObjectId - : coll.s.db.options?.forceServerObjectId; + options.forceServerObjectId ?? collection.s.db.options?.forceServerObjectId ?? false; // no need to modify the docs if server sets the ObjectId - if (forceServerObjectId === true) { - return docOrDocs; + if (forceServerObjectId) { + return document; } - const transform = (doc: Document): Document => { - if (doc._id == null) { - doc._id = coll.s.pkFactory.createPk(); - } + if (document._id == null) { + document._id = collection.s.pkFactory.createPk(); + } - return doc; - }; - return Array.isArray(docOrDocs) ? docOrDocs.map(transform) : transform(docOrDocs); + return document; } export async function fileIsAccessible(fileName: string, mode?: number) {