Skip to content

Commit

Permalink
feat: pool events for workers (#624)
Browse files Browse the repository at this point in the history
Co-authored-by: Robert Nagy <[email protected]>
  • Loading branch information
metcoder95 and ronag authored Sep 20, 2024
1 parent b2463b7 commit f329b81
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 90 deletions.
14 changes: 13 additions & 1 deletion docs/docs/api-reference/event.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,16 @@ by number of tasks enqueued that are pending of execution.

## Event: `'message'`

A `'message'` event is emitted whenever a message is received from a worker thread.
A `'message'` event is emitted whenever a message is received from a worker thread.

## Event: `'workerCreate'`

Event that is triggered when a new worker is created.

As argument, it receives the worker instance.

## Event: `'workerDestroy'`

Event that is triggered when a worker is destroyed.

As argument, it receives the worker instance that has been destroyed.
125 changes: 38 additions & 87 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,22 @@ class ThreadPool {

const { port1, port2 } = new MessageChannel();
const workerInfo = new WorkerInfo(worker, port1, onMessage, this.options.workerHistogram);

workerInfo.onDestroy(() => {
this.publicInterface.emit('workerDestroy', workerInfo.interface);
});

if (this.startingUp) {
// There is no point in waiting for the initial set of Workers to indicate
// that they are ready, we just mark them as such from the start.
workerInfo.markAsReady();
// We need to emit the event in the next microtask, so that the user can
// attach event listeners before the event is emitted.
queueMicrotask(() => this.publicInterface.emit('workerCreate', workerInfo.interface));
} else {
workerInfo.onReady(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
});
}

const message : StartupMessage = {
Expand Down Expand Up @@ -440,12 +452,13 @@ class ThreadPool {
}

_distributeTask (task: TaskInfo, workers: PiscinaWorker[]): boolean {
const balancerResult = this.balancer(task.interface, workers);
// TODO: we need to verify if the task is aborted already or not
// otherwise we might be distributing aborted tasks to workers
if (task.aborted) return false;
// console.log(task.interface, balancerResult);

const balancerResult = this.balancer(task.interface, workers);

if (balancerResult.candidate != null) {
const now = performance.now();
this.waitTime?.record(toHistogramIntegerNano(now - task.created));
Expand Down
18 changes: 18 additions & 0 deletions src/worker_pool/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import assert from 'node:assert';

export abstract class AsynchronouslyCreatedResource {
onreadyListeners : (() => void)[] | null = [];
ondestroyListeners : (() => void)[] | null = [];

markAsReady () : void {
const listeners = this.onreadyListeners;
Expand All @@ -24,6 +25,23 @@ export abstract class AsynchronouslyCreatedResource {
this.onreadyListeners.push(fn);
}

onDestroy (fn : () => void) {
if (this.ondestroyListeners === null) {
return;
}

this.ondestroyListeners.push(fn);
}

markAsDestroyed () {
const listeners = this.ondestroyListeners;
assert(listeners !== null);
this.ondestroyListeners = null;
for (const listener of listeners) {
listener();
}
}

abstract currentUsage() : number;
}

Expand Down
3 changes: 2 additions & 1 deletion src/worker_pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ export class WorkerInfo extends AsynchronouslyCreatedResource {

this.terminating = false;
this.destroyed = true;
this.markAsDestroyed();
}

clearIdleTimeout () : void {
if (this.idleTimeout !== null) {
if (this.idleTimeout != null) {
clearTimeout(this.idleTimeout);
this.idleTimeout = null;
}
Expand Down
Loading

0 comments on commit f329b81

Please sign in to comment.