Skip to content

Commit 811bc69

Browse files
committed
add chunking for annotations and search
1 parent 9b75e89 commit 811bc69

5 files changed

Lines changed: 312 additions & 44 deletions

File tree

packages/engines/src/lib/orchestrator/pdf-engine.ts

Lines changed: 90 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
BatchProgress,
23
Logger,
34
NoopLogger,
45
PdfEngine as IPdfEngine,
@@ -50,7 +51,7 @@ import type { ImageDataConverter } from '../converters/types';
5051

5152
// Re-export for convenience
5253
export type { ImageDataConverter } from '../converters/types';
53-
export type { ImageDataLike, IPdfiumExecutor } from '@embedpdf/models';
54+
export type { ImageDataLike, IPdfiumExecutor, BatchProgress } from '@embedpdf/models';
5455

5556
const LOG_SOURCE = 'PdfEngine';
5657
const LOG_CATEGORY = 'Orchestrator';
@@ -107,6 +108,17 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
107108
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, 'PdfEngine orchestrator created');
108109
}
109110

111+
/**
 * Split an array into consecutive chunks of at most `chunkSize` items.
 *
 * Chunks preserve the original element order. The final chunk may hold fewer
 * than `chunkSize` items, and an empty input produces an empty result.
 *
 * @param items - Array to split; it is read with `slice` and never mutated
 * @param chunkSize - Maximum number of items per chunk; must be a positive integer
 * @returns The list of chunks, in input order
 * @throws RangeError when `chunkSize` is not a positive integer
 */
private chunkArray<U>(items: U[], chunkSize: number): U[][] {
  // A zero or negative step never moves the index past items.length, so the
  // loop below would never terminate; fractional or NaN sizes would yield
  // malformed chunks. Reject them up front.
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
  }

  const chunks: U[][] = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    chunks.push(items.slice(start, start + chunkSize));
  }
  return chunks;
}
121+
110122
/**
111123
* Update visible pages for visibility-based task ranking
112124
*/
@@ -453,21 +465,52 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
453465

454466
/**
455467
* Get all annotations across all pages
456-
* Orchestrates LOW priority tasks for each page
468+
* Uses batched operations to reduce queue overhead
457469
*/
458470
getAllAnnotations(
459471
doc: PdfDocumentObject,
460472
): CompoundTask<Record<number, PdfAnnotationObject[]>, PdfErrorReason, PdfAnnotationsProgress> {
461-
const tasks = doc.pages.map((page, index) =>
462-
this.workerQueue.enqueue(
473+
// Chunk pages for batched processing
474+
const chunks = this.chunkArray(doc.pages, 500);
475+
476+
this.logger.debug(
477+
LOG_SOURCE,
478+
LOG_CATEGORY,
479+
`getAllAnnotations: ${doc.pages.length} pages in ${chunks.length} chunks`,
480+
);
481+
482+
// Create compound task for result aggregation
483+
const compound = new CompoundTask<
484+
Record<number, PdfAnnotationObject[]>,
485+
PdfErrorReason,
486+
PdfAnnotationsProgress
487+
>({
488+
aggregate: (results) => Object.assign({}, ...results),
489+
});
490+
491+
// Create one task per chunk and wire up progress forwarding
492+
chunks.forEach((chunkPages, chunkIndex) => {
493+
const batchTask = this.workerQueue.enqueue(
463494
{
464-
execute: () => this.executor.getPageAnnotationsRaw(doc, page),
465-
meta: { docId: doc.id, pageIndex: index, operation: 'getAnnotations' },
495+
execute: () => this.executor.getAnnotationsBatch(doc, chunkPages),
496+
meta: { docId: doc.id, operation: 'getAnnotationsBatch', chunkSize: chunkPages.length },
466497
},
467498
{ priority: Priority.LOW },
468-
),
469-
);
470-
return CompoundTask.gatherIndexed(tasks);
499+
);
500+
501+
// Forward batch progress (per-page) to compound task
502+
batchTask.onProgress((batchProgress: BatchProgress<PdfAnnotationObject[]>) => {
503+
compound.progress({
504+
page: batchProgress.pageIndex,
505+
result: batchProgress.result,
506+
});
507+
});
508+
509+
compound.addChild(batchTask, chunkIndex);
510+
});
511+
512+
compound.finalize();
513+
return compound;
471514
}
472515

473516
getPageTextRects(doc: PdfDocumentObject, page: PdfPageObject): PdfTask<PdfTextRectObject[]> {
@@ -486,7 +529,7 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
486529

487530
/**
488531
* Search across all pages
489-
* Orchestrates LOW priority tasks for each page
532+
* Uses batched operations to reduce queue overhead
490533
*/
491534
searchAllPages(
492535
doc: PdfDocumentObject,
@@ -497,26 +540,49 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
497540
? options.flags.reduce((acc, flag) => acc | flag, 0)
498541
: (options?.flags ?? 0);
499542

500-
const tasks = doc.pages.map((page, index) =>
501-
this.workerQueue.enqueue(
502-
{
503-
execute: () => this.executor.searchInPage(doc, page, keyword, flags),
504-
meta: { docId: doc.id, pageIndex: index, operation: 'search' },
505-
},
506-
{ priority: Priority.LOW },
507-
),
543+
// Chunk pages for batched processing
544+
const chunks = this.chunkArray(doc.pages, 25);
545+
546+
this.logger.debug(
547+
LOG_SOURCE,
548+
LOG_CATEGORY,
549+
`searchAllPages: ${doc.pages.length} pages in ${chunks.length} chunks`,
508550
);
509551

510-
return CompoundTask.gatherFrom(tasks, {
552+
// Create compound task for result aggregation
553+
const compound = new CompoundTask<SearchAllPagesResult, PdfErrorReason, PdfPageSearchProgress>({
511554
aggregate: (results) => {
512-
const allResults = results.flat();
555+
// Merge all batch results into a flat array
556+
const allResults = results.flatMap((batchResult: Record<number, SearchResult[]>) =>
557+
Object.values(batchResult).flat(),
558+
);
513559
return { results: allResults, total: allResults.length };
514560
},
515-
onChildComplete: (_completed, _total, results, index) => ({
516-
page: index,
517-
results,
518-
}),
519561
});
562+
563+
// Create one task per chunk and wire up progress forwarding
564+
chunks.forEach((chunkPages, chunkIndex) => {
565+
const batchTask = this.workerQueue.enqueue(
566+
{
567+
execute: () => this.executor.searchBatch(doc, chunkPages, keyword, flags),
568+
meta: { docId: doc.id, operation: 'searchBatch', chunkSize: chunkPages.length },
569+
},
570+
{ priority: Priority.LOW },
571+
);
572+
573+
// Forward batch progress (per-page) to compound task
574+
batchTask.onProgress((batchProgress: BatchProgress<SearchResult[]>) => {
575+
compound.progress({
576+
page: batchProgress.pageIndex,
577+
results: batchProgress.result,
578+
});
579+
});
580+
581+
compound.addChild(batchTask, chunkIndex);
582+
});
583+
584+
compound.finalize();
585+
return compound;
520586
}
521587

522588
// ========== Attachments ==========

packages/engines/src/lib/orchestrator/remote-executor.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
BatchProgress,
23
Logger,
34
NoopLogger,
45
PdfDocumentObject,
@@ -82,6 +83,8 @@ type MessageType =
8283
| 'removePageAnnotation'
8384
| 'getPageTextRects'
8485
| 'searchInPage'
86+
| 'getAnnotationsBatch'
87+
| 'searchBatch'
8588
| 'getAttachments'
8689
| 'addAttachment'
8790
| 'removeAttachment'
@@ -387,6 +390,30 @@ export class RemoteExecutor implements IPdfiumExecutor {
387390
return this.send<SearchResult[]>('searchInPage', [doc, page, keyword, flags]);
388391
}
389392

393+
/**
 * Request annotations for a batch of pages in one call.
 *
 * Forwards a 'getAnnotationsBatch' message carrying `[doc, pages]` through
 * `this.send` and returns the resulting task unchanged.
 *
 * @param doc - Document the pages belong to
 * @param pages - Pages included in this batch
 * @returns Task resolving to a number-keyed record of annotation arrays
 *   (presumably keyed by page index; confirm against the worker-side
 *   handler), with `BatchProgress` updates
 */
getAnnotationsBatch(
  doc: PdfDocumentObject,
  pages: PdfPageObject[],
): PdfTask<Record<number, PdfAnnotationObject[]>, BatchProgress<PdfAnnotationObject[]>> {
  type AnnotationsByPage = Record<number, PdfAnnotationObject[]>;
  type AnnotationsProgress = BatchProgress<PdfAnnotationObject[]>;

  return this.send<AnnotationsByPage, AnnotationsProgress>('getAnnotationsBatch', [doc, pages]);
}
402+
403+
/**
 * Run a keyword search over a batch of pages in one call.
 *
 * Forwards a 'searchBatch' message carrying `[doc, pages, keyword, flags]`
 * through `this.send` and returns the resulting task unchanged.
 *
 * @param doc - Document the pages belong to
 * @param pages - Pages included in this batch
 * @param keyword - Text to search for
 * @param flags - Numeric search flags, passed through as-is
 * @returns Task resolving to a number-keyed record of search-result arrays
 *   (presumably keyed by page index; confirm against the worker-side
 *   handler), with `BatchProgress` updates
 */
searchBatch(
  doc: PdfDocumentObject,
  pages: PdfPageObject[],
  keyword: string,
  flags: number,
): PdfTask<Record<number, SearchResult[]>, BatchProgress<SearchResult[]>> {
  type ResultsByPage = Record<number, SearchResult[]>;
  type SearchProgress = BatchProgress<SearchResult[]>;

  return this.send<ResultsByPage, SearchProgress>('searchBatch', [doc, pages, keyword, flags]);
}
416+
390417
/**
 * Request the attachments of a document.
 *
 * Forwards a 'getAttachments' message carrying `[doc]` through `this.send`.
 *
 * @param doc - Document whose attachments are requested
 * @returns Task resolving to the document's attachment objects
 */
getAttachments(doc: PdfDocumentObject): PdfTask<PdfAttachmentObject[]> {
  const pending = this.send<PdfAttachmentObject[]>('getAttachments', [doc]);
  return pending;
}

0 commit comments

Comments
 (0)