Skip to content

Commit f7e03be

Browse files
committed
Properly wait for worker to be ready before sending tasks
1 parent 43885a3 commit f7e03be

4 files changed

Lines changed: 86 additions & 60 deletions

File tree

packages/engines/src/lib/orchestrator/pdf-engine.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
210210
this.workerQueue = new WorkerTaskQueue({
211211
concurrency: 1,
212212
autoStart: true,
213+
logger: this.logger,
213214
});
214215

215216
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, 'PdfEngine orchestrator created');

packages/engines/src/lib/orchestrator/remote-executor.ts

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,11 @@ type MessageType =
110110
* - Progress tracking
111111
*/
112112
export class RemoteExecutor implements IPdfExecutor {
113+
private static READY_TASK_ID = '0';
113114
private pendingRequests = new Map<string, Task<any, any>>();
114115
private requestCounter = 0;
115116
private logger: Logger;
116-
private initialized = false;
117+
private readyTask: Task<boolean, PdfErrorReason>;
117118

118119
constructor(
119120
private worker: Worker,
@@ -122,9 +123,13 @@ export class RemoteExecutor implements IPdfExecutor {
122123
this.logger = options.logger ?? new NoopLogger();
123124
this.worker.addEventListener('message', this.handleMessage);
124125

126+
// Create ready task - will be resolved when worker sends 'ready'
127+
this.readyTask = new Task<boolean, PdfErrorReason>();
128+
this.pendingRequests.set(RemoteExecutor.READY_TASK_ID, this.readyTask);
129+
125130
// Send initialization message with WASM URL
126131
this.worker.postMessage({
127-
id: '0',
132+
id: RemoteExecutor.READY_TASK_ID,
128133
type: 'wasmInit',
129134
wasmUrl: options.wasmUrl,
130135
logger: options.logger ? serializeLogger(options.logger) : undefined,
@@ -142,22 +147,39 @@ export class RemoteExecutor implements IPdfExecutor {
142147

143148
/**
144149
* Send a message to the worker and return a Task
150+
* Waits for worker to be ready before sending
145151
*/
146152
private send<T, P = unknown>(method: MessageType, args: any[]): Task<T, PdfErrorReason, P> {
147153
const id = this.generateId();
148154
const task = new Task<T, PdfErrorReason, P>();
149155

150-
this.pendingRequests.set(id, task);
151-
152156
const request: WorkerRequest = {
153157
id,
154158
type: 'execute',
155159
method,
156160
args,
157161
};
158162

159-
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, `Sending ${method} request:`, id);
160-
this.worker.postMessage(request);
163+
// Wait for worker to be ready before sending
164+
this.readyTask.wait(
165+
() => {
166+
this.pendingRequests.set(id, task);
167+
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, `Sending ${method} request:`, id);
168+
this.worker.postMessage(request);
169+
},
170+
(error) => {
171+
this.logger.error(
172+
LOG_SOURCE,
173+
LOG_CATEGORY,
174+
`Worker init failed, rejecting ${method}:`,
175+
error,
176+
);
177+
task.reject({
178+
code: PdfErrorCode.Initialization,
179+
message: 'Worker initialization failed',
180+
});
181+
},
182+
);
161183

162184
return task;
163185
}
@@ -168,10 +190,10 @@ export class RemoteExecutor implements IPdfExecutor {
168190
private handleMessage = (event: MessageEvent<WorkerResponse>) => {
169191
const response = event.data;
170192

171-
// Handle ready response separately
193+
// Handle ready response - resolve the readyTask
172194
if (response.type === 'ready') {
173-
this.initialized = true;
174195
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, 'Worker is ready');
196+
this.readyTask.resolve(true);
175197
return;
176198
}
177199

@@ -221,10 +243,12 @@ export class RemoteExecutor implements IPdfExecutor {
221243
destroy(): void {
222244
this.worker.removeEventListener('message', this.handleMessage);
223245

224-
// Reject all pending requests
246+
// Reject all pending requests (except readyTask)
225247
this.pendingRequests.forEach((task, id) => {
226-
task.abort('Worker destroyed');
227-
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, `Aborted pending request: ${id}`);
248+
if (id !== RemoteExecutor.READY_TASK_ID) {
249+
task.abort('Worker destroyed');
250+
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, `Aborted pending request: ${id}`);
251+
}
228252
});
229253
this.pendingRequests.clear();
230254

@@ -235,10 +259,7 @@ export class RemoteExecutor implements IPdfExecutor {
235259
// ========== IPdfExecutor Implementation ==========
236260

237261
initialize(): void {
238-
if (this.initialized) return;
239-
// Initialization is handled by worker creation
240-
// We just mark it as initialized here
241-
this.initialized = true;
262+
// Initialization is handled by worker creation via readyTask
242263
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, 'RemoteExecutor initialized');
243264
}
244265

packages/engines/src/lib/orchestrator/task-queue.ts

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
import { Task, TaskError } from '@embedpdf/models';
1+
import { Task, TaskError, Logger, NoopLogger } from '@embedpdf/models';
2+
3+
const LOG_SOURCE = 'TaskQueue';
4+
const LOG_CATEGORY = 'Queue';
25

36
export enum Priority {
47
CRITICAL = 3,
@@ -54,6 +57,7 @@ export interface WorkerTaskQueueOptions {
5457
onIdle?: () => void;
5558
maxQueueSize?: number;
5659
autoStart?: boolean;
60+
logger?: Logger;
5761
}
5862

5963
// ============================================================================
@@ -65,13 +69,23 @@ export class WorkerTaskQueue {
6569
private running = 0;
6670
private resultTasks = new Map<string, Task<any, any, any>>();
6771
private visiblePages = new Map<number, number>();
68-
private opts: Required<Omit<WorkerTaskQueueOptions, 'comparator' | 'ranker'>> & {
72+
private logger: Logger;
73+
private opts: Required<Omit<WorkerTaskQueueOptions, 'comparator' | 'ranker' | 'logger'>> & {
6974
comparator?: TaskComparator;
7075
ranker?: TaskRanker;
7176
};
7277

7378
constructor(options: WorkerTaskQueueOptions = {}) {
74-
const { concurrency = 1, comparator, ranker, onIdle, maxQueueSize, autoStart = true } = options;
79+
const {
80+
concurrency = 1,
81+
comparator,
82+
ranker,
83+
onIdle,
84+
maxQueueSize,
85+
autoStart = true,
86+
logger,
87+
} = options;
88+
this.logger = logger ?? new NoopLogger();
7589
this.opts = {
7690
concurrency: Math.max(1, concurrency),
7791
comparator,
@@ -179,22 +193,17 @@ export class WorkerTaskQueue {
179193

180194
this.queue.push(queuedTask);
181195

182-
console.log(
183-
'[TaskQueue] Task enqueued:',
184-
id,
185-
'| Priority:',
186-
priority,
187-
'| Running:',
188-
this.running,
189-
'| Queued:',
190-
this.queue.length,
196+
this.logger.debug(
197+
LOG_SOURCE,
198+
LOG_CATEGORY,
199+
`Task enqueued: ${id} | Priority: ${priority} | Running: ${this.running} | Queued: ${this.queue.length}`,
191200
);
192201

193202
// Set up automatic abort handling
194203
// When result task is aborted externally, remove from queue
195204
const originalAbort = resultTask.abort.bind(resultTask);
196205
resultTask.abort = (reason: any) => {
197-
console.log('[TaskQueue] Task aborted:', id);
206+
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, `Task aborted: ${id}`);
198207
this.cancel(id);
199208
originalAbort(reason);
200209
};
@@ -220,7 +229,7 @@ export class WorkerTaskQueue {
220229
this.resultTasks.delete(taskId);
221230

222231
if (before !== this.queue.length) {
223-
console.log('[TaskQueue] Task cancelled and removed:', taskId);
232+
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, `Task cancelled and removed: ${taskId}`);
224233
this.kick();
225234
}
226235
}
@@ -230,28 +239,24 @@ export class WorkerTaskQueue {
230239
}
231240

232241
private async process(fifo = false): Promise<void> {
233-
console.log(
234-
'[TaskQueue] process() called | Running:',
235-
this.running,
236-
'| Concurrency:',
237-
this.opts.concurrency,
238-
'| Queued:',
239-
this.queue.length,
242+
this.logger.debug(
243+
LOG_SOURCE,
244+
LOG_CATEGORY,
245+
`process() called | Running: ${this.running} | Concurrency: ${this.opts.concurrency} | Queued: ${this.queue.length}`,
240246
);
241247

242248
while (this.running < this.opts.concurrency && this.queue.length > 0) {
243-
console.log(
244-
'[TaskQueue] Starting new task | Running:',
245-
this.running,
246-
'| Queued:',
247-
this.queue.length,
249+
this.logger.debug(
250+
LOG_SOURCE,
251+
LOG_CATEGORY,
252+
`Starting new task | Running: ${this.running} | Queued: ${this.queue.length}`,
248253
);
249254

250255
if (!fifo) this.sortQueue();
251256

252257
const queuedTask = this.queue.shift()!;
253258
if (queuedTask.cancelled) {
254-
console.log('[TaskQueue] Skipping cancelled task:', queuedTask.id);
259+
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, `Skipping cancelled task: ${queuedTask.id}`);
255260
continue;
256261
}
257262

@@ -307,13 +312,10 @@ export class WorkerTaskQueue {
307312
this.resultTasks.delete(queuedTask.id);
308313
this.running--;
309314

310-
console.log(
311-
'[TaskQueue] Task completed:',
312-
queuedTask.id,
313-
'| Running:',
314-
this.running,
315-
'| Queued:',
316-
this.queue.length,
315+
this.logger.debug(
316+
LOG_SOURCE,
317+
LOG_CATEGORY,
318+
`Task completed: ${queuedTask.id} | Running: ${this.running} | Queued: ${this.queue.length}`,
317319
);
318320

319321
if (this.isIdle()) {
@@ -323,7 +325,12 @@ export class WorkerTaskQueue {
323325
}
324326
}
325327
})().catch((error) => {
326-
console.error('[TaskQueue] Unhandled error in task execution wrapper:', error);
328+
this.logger.error(
329+
LOG_SOURCE,
330+
LOG_CATEGORY,
331+
'Unhandled error in task execution wrapper:',
332+
error,
333+
);
327334
this.running = Math.max(0, this.running - 1);
328335
if (this.isIdle()) {
329336
this.notifyIdle();

packages/engines/src/lib/pdfium/worker.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
import { deserializeLogger } from '@embedpdf/models';
22
import { PdfiumEngineRunner } from './runner';
3-
import { PdfiumNativeRunner } from '../orchestrator';
43

5-
let runner: PdfiumNativeRunner | null = null;
6-
7-
console.log('worker');
4+
let runner: PdfiumEngineRunner | null = null;
85

96
// Listen for initialization message
10-
self.onmessage = async (event) => {
7+
self.onmessage = async (event: MessageEvent) => {
118
const { type, wasmUrl, logger: serializedLogger } = event.data;
129

13-
console.log('wasmInit', type, wasmUrl, serializedLogger);
14-
1510
if (type === 'wasmInit' && wasmUrl && !runner) {
1611
try {
1712
const response = await fetch(wasmUrl);
@@ -20,9 +15,11 @@ self.onmessage = async (event) => {
2015
// Deserialize the logger if provided
2116
const logger = serializedLogger ? deserializeLogger(serializedLogger) : undefined;
2217

23-
const nativeRunner = new PdfiumEngineRunner(wasmBinary, logger);
24-
25-
await nativeRunner.prepare();
18+
runner = new PdfiumEngineRunner(wasmBinary, logger);
19+
await runner.prepare();
20+
// runner.prepare() calls ready() which:
21+
// 1. Sets self.onmessage to runner.handle()
22+
// 2. Sends 'ready' message to main thread
2623
} catch (error) {
2724
const message = error instanceof Error ? error.message : String(error);
2825
self.postMessage({ type: 'wasmError', error: message });

0 commit comments

Comments
 (0)