-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathcommand_builder.ts
More file actions
478 lines (444 loc) · 14.5 KB
/
command_builder.ts
File metadata and controls
478 lines (444 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
import { BSON, type Document } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import { MongoAPIError, MongoInvalidArgumentError } from '../../error';
import { type PkFactory } from '../../mongo_client';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import { formatSort, type SortForCmd } from '../../sort';
import { DEFAULT_PK_FACTORY, hasAtomicOperators } from '../../utils';
import { type CollationOptions } from '../command';
import { type Hint } from '../operation';
import type {
AnyClientBulkWriteModel,
ClientBulkWriteOptions,
ClientDeleteManyModel,
ClientDeleteOneModel,
ClientInsertOneModel,
ClientReplaceOneModel,
ClientUpdateManyModel,
ClientUpdateOneModel
} from './common';
/**
 * Shape of the `bulkWrite` command document that ClientBulkWriteCommandBuilder
 * produces for a single batch.
 * @internal
 */
export interface ClientBulkWriteCommand {
/** The command name field; always the literal 1. */
bulkWrite: 1;
/** Taken from the builder's errorsOnly getter (the inverse of the verboseResults option). */
errorsOnly: boolean;
/** Taken from the ordered option; true when the option is not provided. */
ordered: boolean;
/** Document sequence named 'ops' holding one operation document per model in the batch. */
ops: DocumentSequence;
/** Document sequence named 'nsInfo' holding one `{ ns }` document per distinct namespace in the batch. */
nsInfo: DocumentSequence;
/** Only set when the bypassDocumentValidation option is not null or undefined. */
bypassDocumentValidation?: boolean;
/** Only set when the let option is truthy. */
let?: Document;
/** Only set when the comment option is not undefined, so falsy comments are kept. */
comment?: any;
}
/**
 * The bytes of overhead reserved for the extra fields that are added to the
 * message after the command has been generated. buildBatch adds this value to
 * the document sequence lengths when it tracks the running command size.
 */
const MESSAGE_OVERHEAD_BYTES = 1000;
/**
 * Turns a list of client bulk write models into `bulkWrite` commands, one
 * batch per call to buildBatch, keeping every batch within the message size,
 * write batch size and BSON object size limits passed to that call.
 * @internal
 */
export class ClientBulkWriteCommandBuilder {
  /** The write models to build commands from, in the order they were provided. */
  models: ReadonlyArray<AnyClientBulkWriteModel<Document>>;
  /** The bulk write options; read when creating the base command of each batch. */
  options: ClientBulkWriteOptions;
  /** The factory handed to buildOperation for generating missing _id values on inserts. */
  pkFactory: PkFactory;
  /** The current index in the models array that is being processed. */
  currentModelIndex: number;
  /** The model index that the builder was on when it finished the previous batch. Used for resets when retrying. */
  previousModelIndex: number;
  /** The last array of operations that were created. Used by the results merger for indexing results. */
  lastOperations: Document[];
  /** True if the batch most recently built contains no deleteMany or updateMany operations. */
  isBatchRetryable: boolean;

  /**
   * Create the command builder.
   * @param models - The client write models.
   * @param options - The client bulk write options.
   * @param pkFactory - Optional primary key factory; DEFAULT_PK_FACTORY is used when omitted.
   */
  constructor(
    models: ReadonlyArray<AnyClientBulkWriteModel<Document>>,
    options: ClientBulkWriteOptions,
    pkFactory?: PkFactory
  ) {
    this.models = models;
    this.options = options;
    this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY;
    this.currentModelIndex = 0;
    this.previousModelIndex = 0;
    this.lastOperations = [];
    this.isBatchRetryable = true;
  }

  /**
   * Gets the errorsOnly value for the command, which is the inverse of the
   * user provided verboseResults option. Defaults to true.
   */
  get errorsOnly(): boolean {
    if ('verboseResults' in this.options) {
      return !this.options.verboseResults;
    }
    return true;
  }

  /**
   * Determines if there is another batch to process.
   * @returns True if not all batches have been built.
   */
  hasNextBatch(): boolean {
    return this.currentModelIndex < this.models.length;
  }

  /**
   * When we need to retry a command we need to set the current
   * model index back to its previous value.
   * @returns Always true.
   */
  resetBatch(): boolean {
    this.currentModelIndex = this.previousModelIndex;
    return true;
  }

  /**
   * Build a single batch of a client bulk write command, starting at the
   * current model index and consuming models until one no longer fits.
   * @param maxMessageSizeBytes - The max message size in bytes.
   * @param maxWriteBatchSize - The max write batch size.
   * @param maxBsonObjectSize - The max size in bytes of a single serialized ops or nsInfo document.
   * @returns The client bulk write command.
   * @throws MongoInvalidArgumentError if a document cannot be serialized to BSON
   * or its serialized form is larger than maxBsonObjectSize.
   */
  buildBatch(
    maxMessageSizeBytes: number,
    maxWriteBatchSize: number,
    maxBsonObjectSize: number
  ): ClientBulkWriteCommand {
    // We start by assuming the batch has no multi-updates, so it is retryable
    // until one is actually added to the batch.
    this.isBatchRetryable = true;
    let commandLength = 0;
    // Byte length of the nsInfo document sequence as reported by its last push.
    // It has to be carried between iterations because operations that reuse a
    // namespace only push to ops, yet the nsInfo bytes are still in the message.
    let nsInfoLength = 0;
    let currentNamespaceIndex = 0;
    const command: ClientBulkWriteCommand = this.baseCommand();
    const namespaces = new Map<string, number>();
    // In the case of retries we need to mark where we started this batch.
    this.previousModelIndex = this.currentModelIndex;

    while (this.currentModelIndex < this.models.length) {
      const model = this.models[this.currentModelIndex];
      const ns = model.namespace;
      const nsIndex = namespaces.get(ns);
      // Multi writes are not retryable. The flag is only applied below, once
      // the operation is known to be part of this batch.
      const isMultiWrite = model.name === 'deleteMany' || model.name === 'updateMany';

      if (nsIndex != null) {
        // Build the operation and serialize it to get the bytes buffer.
        const operation = buildOperation(model, nsIndex, this.pkFactory);
        let operationBuffer;
        try {
          operationBuffer = BSON.serialize(operation);
        } catch (cause) {
          throw new MongoInvalidArgumentError(`Could not serialize operation to BSON`, { cause });
        }

        validateBufferSize('ops', operationBuffer, maxBsonObjectSize);

        // Check if the operation buffer can fit in the command. If it can,
        // then add the operation to the document sequence and increment the
        // current length as long as the ops don't exceed the maxWriteBatchSize.
        if (
          commandLength + operationBuffer.length < maxMessageSizeBytes &&
          command.ops.documents.length < maxWriteBatchSize
        ) {
          // Pushing to the ops document sequence returns the total byte length of the
          // document sequence; the nsInfo sequence length is added so that the running
          // total keeps covering both sequences.
          commandLength =
            MESSAGE_OVERHEAD_BYTES + nsInfoLength + command.ops.push(operation, operationBuffer);
          // Increment the builder's current model index.
          this.currentModelIndex++;
        } else {
          // The operation cannot fit in the current command and will need to
          // go in the next batch. Exit the loop.
          break;
        }
      } else {
        // The namespace is not already in the nsInfo so we will set it in the map, and
        // construct our nsInfo and ops documents and buffers.
        namespaces.set(ns, currentNamespaceIndex);
        const nsInfo = { ns: ns };
        const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
        let nsInfoBuffer;
        let operationBuffer;
        try {
          nsInfoBuffer = BSON.serialize(nsInfo);
          operationBuffer = BSON.serialize(operation);
        } catch (cause) {
          throw new MongoInvalidArgumentError(`Could not serialize ns info to BSON`, { cause });
        }

        validateBufferSize('nsInfo', nsInfoBuffer, maxBsonObjectSize);
        validateBufferSize('ops', operationBuffer, maxBsonObjectSize);

        // Check if the operation and nsInfo buffers can fit in the command. If they
        // can, then add the operation and nsInfo to their respective document
        // sequences and increment the current length as long as the ops don't exceed
        // the maxWriteBatchSize.
        if (
          commandLength + nsInfoBuffer.length + operationBuffer.length < maxMessageSizeBytes &&
          command.ops.documents.length < maxWriteBatchSize
        ) {
          // Pushing to a document sequence returns the total byte length of that sequence.
          nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer);
          commandLength =
            MESSAGE_OVERHEAD_BYTES + nsInfoLength + command.ops.push(operation, operationBuffer);
          // We've added a new namespace, increment the namespace index.
          currentNamespaceIndex++;
          // Increment the builder's current model index.
          this.currentModelIndex++;
        } else {
          // The operation cannot fit in the current command and will need to
          // go in the next batch. Exit the loop.
          break;
        }
      }

      // Only reached when the operation was added to this batch (both "does
      // not fit" paths break out of the loop above).
      if (isMultiWrite) {
        this.isBatchRetryable = false;
      }
    }

    // Set the last operations and return the command.
    this.lastOperations = command.ops.documents;
    return command;
  }

  /**
   * Creates the command document for a new batch with empty ops and nsInfo
   * document sequences and the command-level options applied.
   * @returns The base command.
   */
  private baseCommand(): ClientBulkWriteCommand {
    const command: ClientBulkWriteCommand = {
      bulkWrite: 1,
      errorsOnly: this.errorsOnly,
      ordered: this.options.ordered ?? true,
      ops: new DocumentSequence('ops'),
      nsInfo: new DocumentSequence('nsInfo')
    };
    // Add bypassDocumentValidation if it was present in the options.
    if (this.options.bypassDocumentValidation != null) {
      command.bypassDocumentValidation = this.options.bypassDocumentValidation;
    }
    // Add let if it was present in the options.
    if (this.options.let) {
      command.let = this.options.let;
    }
    // we check for undefined specifically here to allow falsy values
    // eslint-disable-next-line no-restricted-syntax
    if (this.options.comment !== undefined) {
      command.comment = this.options.comment;
    }
    return command;
  }
}
/**
 * Ensures a serialized document does not exceed the max BSON object size.
 * @param name - The label used in the error message for the kind of document.
 * @param buffer - The serialized document bytes.
 * @param maxBsonObjectSize - The largest allowed buffer length in bytes.
 * @throws MongoInvalidArgumentError if the buffer is longer than maxBsonObjectSize.
 */
function validateBufferSize(name: string, buffer: Uint8Array, maxBsonObjectSize: number) {
  const size = buffer.length;
  // Written as a negated "greater than" so that a non-comparable limit (NaN)
  // is accepted rather than rejected.
  if (!(size > maxBsonObjectSize)) {
    return;
  }
  throw new MongoInvalidArgumentError(
    `Client bulk write operation ${name} of length ${size} exceeds the max bson object size of ${maxBsonObjectSize}`
  );
}
/**
 * Shape of an insert entry in the ops document sequence, as returned by
 * buildInsertOneOperation.
 * @internal
 */
interface ClientInsertOperation {
/** The namespace index the insert applies to. */
insert: number;
/** The document to insert; buildInsertOneOperation assigns its _id before returning. */
document: OptionalId<Document>;
}
/**
 * Build the insert one operation.
 *
 * The model's document is used as-is (not copied): its _id is assigned in
 * place, keeping an existing non-nullish _id and otherwise taking a new one
 * from the pk factory.
 * @param model - The insert one model.
 * @param index - The namespace index.
 * @param pkFactory - The factory used to create an _id when the document has none.
 * @returns the operation.
 */
export const buildInsertOneOperation = (
  model: ClientInsertOneModel<Document>,
  index: number,
  pkFactory: PkFactory
): ClientInsertOperation => {
  const toInsert = model.document;
  toInsert._id = toInsert._id ?? pkFactory.createPk();
  const operation: ClientInsertOperation = { insert: index, document: toInsert };
  return operation;
};
/**
 * Shape of a delete entry in the ops document sequence, as returned by
 * createDeleteOperation.
 * @internal
 */
export interface ClientDeleteOperation {
/** The namespace index the delete applies to. */
delete: number;
/** False for operations built from deleteOne models, true for deleteMany models. */
multi: boolean;
/** The filter taken from the model. */
filter: Filter<Document>;
/** Only set when the model's hint is truthy. */
hint?: Hint;
/** Only set when the model's collation is truthy. */
collation?: CollationOptions;
}
/**
 * Build the delete one operation (a delete with multi set to false).
 * @param model - The delete one model.
 * @param index - The namespace index.
 * @returns the operation.
 */
export const buildDeleteOneOperation = (
  model: ClientDeleteOneModel<Document>,
  index: number
): Document => createDeleteOperation(model, index, false);
/**
 * Build the delete many operation (a delete with multi set to true).
 * @param model - The delete many model.
 * @param index - The namespace index.
 * @returns the operation.
 */
export const buildDeleteManyOperation = (
  model: ClientDeleteManyModel<Document>,
  index: number
): Document => createDeleteOperation(model, index, true);
/**
 * Creates a delete operation based on the parameters.
 * @param model - The delete one or delete many model.
 * @param index - The namespace index.
 * @param multi - Whether the operation is a multi delete.
 * @returns the operation; hint and collation are only included when truthy on the model.
 */
function createDeleteOperation(
  model: ClientDeleteOneModel<Document> | ClientDeleteManyModel<Document>,
  index: number,
  multi: boolean
): ClientDeleteOperation {
  const { filter, hint, collation } = model;
  const operation: ClientDeleteOperation = { delete: index, multi, filter };
  if (hint) {
    operation.hint = hint;
  }
  if (collation) {
    operation.collation = collation;
  }
  return operation;
}
/**
 * Shape of an update entry in the ops document sequence, as returned by
 * createUpdateOperation.
 * @internal
 */
export interface ClientUpdateOperation {
/** The namespace index the update applies to. */
update: number;
/** False for operations built from updateOne models, true for updateMany models. */
multi: boolean;
/** The filter taken from the model. */
filter: Filter<Document>;
/** The model's update, which has passed the atomic operator validation. */
updateMods: UpdateFilter<Document> | Document[];
/** Only set when the model's hint is truthy. */
hint?: Hint;
/** Only set when the model's upsert is truthy. */
upsert?: boolean;
/** Only set when the model's arrayFilters is truthy. */
arrayFilters?: Document[];
/** Only set when the model's collation is truthy. */
collation?: CollationOptions;
/** The formatted sort; only set for non-multi updates whose model has a non-nullish sort. */
sort?: SortForCmd;
}
/**
 * Build the update one operation (an update with multi set to false).
 * @param model - The update one model.
 * @param index - The namespace index.
 * @returns the operation.
 */
export const buildUpdateOneOperation = (
  model: ClientUpdateOneModel<Document>,
  index: number
): ClientUpdateOperation => createUpdateOperation(model, index, false);
/**
 * Build the update many operation (an update with multi set to true).
 * @param model - The update many model.
 * @param index - The namespace index.
 * @returns the operation.
 */
export const buildUpdateManyOperation = (
  model: ClientUpdateManyModel<Document>,
  index: number
): ClientUpdateOperation => createUpdateOperation(model, index, true);
/**
 * Validate the update document: it is accepted only when hasAtomicOperators
 * reports that it contains atomic operators.
 * @param update - The update document.
 * @throws MongoAPIError if the update fails the atomic operator check.
 */
function validateUpdate(update: Document) {
  const isAtomicUpdate = hasAtomicOperators(update);
  if (isAtomicUpdate) {
    return;
  }
  throw new MongoAPIError(
    'Client bulk write update models must only contain atomic modifiers (start with $) and must not be empty.'
  );
}
/**
 * Creates an update operation based on the parameters.
 * @param model - The update one or update many model.
 * @param index - The namespace index.
 * @param multi - Whether the operation is a multi update.
 * @returns the operation; hint, upsert, arrayFilters and collation are only
 * included when truthy on the model, and sort only for non-multi updates.
 * @throws MongoAPIError if the model's update fails validation.
 */
function createUpdateOperation(
  model: ClientUpdateOneModel<Document> | ClientUpdateManyModel<Document>,
  index: number,
  multi: boolean
): ClientUpdateOperation {
  // Update documents provided in UpdateOne and UpdateMany write models are
  // required only to contain atomic modifiers (i.e. keys that start with "$").
  // Drivers MUST throw an error if an update document is empty or if the
  // document's first key does not start with "$".
  validateUpdate(model.update);

  const operation: ClientUpdateOperation = {
    update: index,
    multi,
    filter: model.filter,
    updateMods: model.update
  };

  const { hint, upsert, arrayFilters, collation } = model;
  if (hint) {
    operation.hint = hint;
  }
  if (upsert) {
    operation.upsert = upsert;
  }
  if (arrayFilters) {
    operation.arrayFilters = arrayFilters;
  }
  if (collation) {
    operation.collation = collation;
  }

  // A sort is never attached to a multi update.
  if (multi) {
    return operation;
  }
  if ('sort' in model && model.sort != null) {
    operation.sort = formatSort(model.sort);
  }
  return operation;
}
/**
 * Shape of a replace entry in the ops document sequence, as returned by
 * buildReplaceOneOperation. It uses the same `update` key as update operations.
 * @internal
 */
export interface ClientReplaceOneOperation {
/** The namespace index the replace applies to. */
update: number;
/** Always false in operations created by buildReplaceOneOperation. */
multi: boolean;
/** The filter taken from the model. */
filter: Filter<Document>;
/** The model's replacement document. */
updateMods: WithoutId<Document>;
/** Only set when the model's hint is truthy. */
hint?: Hint;
/** Only set when the model's upsert is truthy. */
upsert?: boolean;
/** Only set when the model's collation is truthy. */
collation?: CollationOptions;
/** The formatted sort; only set when the model's sort is not null or undefined. */
sort?: SortForCmd;
}
/**
 * Build the replace one operation.
 * @param model - The replace one model.
 * @param index - The namespace index.
 * @returns the operation; hint, upsert and collation are only included when
 * truthy on the model, and sort only when it is not null or undefined.
 * @throws MongoAPIError if hasAtomicOperators reports atomic operators in the replacement.
 */
export const buildReplaceOneOperation = (
  model: ClientReplaceOneModel<Document>,
  index: number
): ClientReplaceOneOperation => {
  const { replacement } = model;
  if (hasAtomicOperators(replacement)) {
    throw new MongoAPIError(
      'Client bulk write replace models must not contain atomic modifiers (start with $) and must not be empty.'
    );
  }

  const operation: ClientReplaceOneOperation = {
    update: index,
    multi: false,
    filter: model.filter,
    updateMods: replacement
  };

  const { hint, upsert, collation, sort } = model;
  if (hint) {
    operation.hint = hint;
  }
  if (upsert) {
    operation.upsert = upsert;
  }
  if (collation) {
    operation.collation = collation;
  }
  if (sort != null) {
    operation.sort = formatSort(sort);
  }
  return operation;
};
/**
 * Builds the operation document for a write model by dispatching on the
 * model's name to the matching build function.
 *
 * NOTE(review): there is no default branch, so a model whose name is not one
 * of the six handled values falls out of the switch and yields undefined at
 * runtime - confirm callers only pass validated models.
 * @param model - The write model.
 * @param index - The namespace index.
 * @param pkFactory - The primary key factory; only passed on for insertOne models.
 * @returns the operation.
 * @internal
 */
export function buildOperation(
  model: AnyClientBulkWriteModel<Document>,
  index: number,
  pkFactory: PkFactory
): Document {
  switch (model.name) {
    // Inserts are the only operations that need the pk factory.
    case 'insertOne': {
      return buildInsertOneOperation(model, index, pkFactory);
    }
    // Updates and replacements.
    case 'updateOne': {
      return buildUpdateOneOperation(model, index);
    }
    case 'updateMany': {
      return buildUpdateManyOperation(model, index);
    }
    case 'replaceOne': {
      return buildReplaceOneOperation(model, index);
    }
    // Deletes.
    case 'deleteOne': {
      return buildDeleteOneOperation(model, index);
    }
    case 'deleteMany': {
      return buildDeleteManyOperation(model, index);
    }
  }
}