feat: add worker events to pool (#625)
Co-authored-by: Robert Nagy <[email protected]>
metcoder95 and ronag authored Oct 2, 2024
1 parent f329b81 commit 0dfc689
Showing 8 changed files with 85 additions and 25 deletions.
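
In short, this commit moves the pool's internal bookkeeping onto explicit worker events (worker ready, task done) and keeps emitting `workerCreate` on the public interface, now deferred so listeners attached after construction still see it. A minimal sketch of how a consumer might observe worker creation — the fixture path and options are borrowed from the tests below, and `piscina` as the installed package name is an assumption:

import { once } from 'node:events';
import { resolve } from 'node:path';
import Piscina from 'piscina';

async function main () {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/eval.js'),
    minThreads: 2,
    maxThreads: 4
  });

  // Worker spawning is asynchronous (see the updated thread-count test),
  // so observe it through the event rather than by reading
  // pool.threads.length immediately after run().
  let created = 0;
  pool.on('workerCreate', () => { created++; });

  await once(pool, 'workerCreate'); // resolves once the first worker is up
  console.log(`workers created so far: ${created}`);

  await pool.destroy();
}

main().catch(console.error);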
src/index.ts — 19 additions & 6 deletions
@@ -214,7 +214,7 @@ class ThreadPool {
     this.balancer = this.options.loadBalancer ?? ResourceBasedBalancer({ maximumUsage: this.options.concurrentTasksPerWorker });
     this.workers = new AsynchronouslyCreatedResourcePool<WorkerInfo>(
       this.options.concurrentTasksPerWorker);
-    this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w));
+    this.workers.onTaskDone((w : WorkerInfo) => this._onWorkerTaskDone(w));
     this.maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker;

     this.startingUp = true;
@@ -258,10 +258,14 @@ class ThreadPool {
       workerInfo.markAsReady();
       // We need to emit the event in the next microtask, so that the user can
       // attach event listeners before the event is emitted.
-      queueMicrotask(() => this.publicInterface.emit('workerCreate', workerInfo.interface));
+      queueMicrotask(() => {
+        this.publicInterface.emit('workerCreate', workerInfo.interface);
+        this._onWorkerReady(workerInfo);
+      });
     } else {
       workerInfo.onReady(() => {
         this.publicInterface.emit('workerCreate', workerInfo.interface);
+        this._onWorkerReady(workerInfo);
       });
     }

@@ -283,7 +287,9 @@ class ThreadPool {
     const taskInfo = workerInfo.taskInfos.get(taskId);
     workerInfo.taskInfos.delete(taskId);

-    pool.workers.maybeAvailable(workerInfo);
+    // TODO: we can abstract the task info handling
+    // right into the pool.workers.taskDone method
+    pool.workers.taskDone(workerInfo);

     /* istanbul ignore if */
     if (taskInfo === undefined) {
@@ -393,6 +399,14 @@ class ThreadPool {
     this.workers.delete(workerInfo);
   }

+  _onWorkerReady (workerInfo : WorkerInfo) : void {
+    this._onWorkerAvailable(workerInfo);
+  }
+
+  _onWorkerTaskDone (workerInfo: WorkerInfo) : void {
+    this._onWorkerAvailable(workerInfo);
+  }
+
   _onWorkerAvailable (workerInfo : WorkerInfo) : void {
     // while ((this.taskQueue.size > 0 || this.skipQueue.length > 0) &&
     //        workerInfo.currentUsage() < this.options.concurrentTasksPerWorker) {
@@ -415,8 +429,6 @@ class ThreadPool {
     //     return;
     //   }

-    if (this.closingUp) return;
-
     let workers: PiscinaWorker[] | null = null;
     while ((this.taskQueue.size > 0 || this.skipQueue.length > 0)) {
       // The skipQueue will have tasks that we previously shifted off
@@ -604,6 +616,7 @@ class ThreadPool {
     //   this._maybeDrain();
     //   return ret;
     // }
+
     if (this.taskQueue.size > 0) {
       const totalCapacity = this.options.maxQueue + this.pendingCapacity();
       if (this.taskQueue.size >= totalCapacity) {
@@ -763,7 +776,7 @@ class ThreadPool {
     for (const workerInfo of this.workers) {
       checkIfWorkerIsDone(workerInfo);

-      workerInfo.port.on('message', () => checkIfWorkerIsDone(workerInfo));
+      this.workers.onTaskDone(checkIfWorkerIsDone);
     }
   });

src/worker_pool/base.ts — 16 additions & 0 deletions
@@ -42,6 +42,10 @@ export abstract class AsynchronouslyCreatedResource {
     }
   }

+  isDestroyed () {
+    return this.ondestroyListeners === null;
+  }
+
   abstract currentUsage() : number;
 }

@@ -51,10 +55,12 @@ export class AsynchronouslyCreatedResourcePool<
   readyItems = new Set<T>();
   maximumUsage : number;
   onAvailableListeners : ((item : T) => void)[];
+  onTaskDoneListeners : ((item : T) => void)[];

   constructor (maximumUsage : number) {
     this.maximumUsage = maximumUsage;
     this.onAvailableListeners = [];
+    this.onTaskDoneListeners = [];
   }

   add (item : T) {
@@ -111,6 +117,16 @@ export class AsynchronouslyCreatedResourcePool<
     this.onAvailableListeners.push(fn);
   }

+  taskDone (item : T) {
+    for (let i = 0; i < this.onTaskDoneListeners.length; i++) {
+      this.onTaskDoneListeners[i](item);
+    }
+  }
+
+  onTaskDone (fn : (item : T) => void) {
+    this.onTaskDoneListeners.push(fn);
+  }
+
   getCurrentUsage (): number {
     let inFlight = 0;
     for (const worker of this.readyItems) {
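The new taskDone/onTaskDone pair mirrors the existing maybeAvailable/onAvailable machinery: a plain array of callbacks invoked synchronously with the worker that just finished, with no EventEmitter involved. A stripped-down sketch of the pattern in isolation — the class and names here are illustrative, not part of Piscina's API:

// Generic synchronous listener registry, as used for onAvailable/onTaskDone.
class ListenerRegistry<T> {
  private listeners: Array<(item: T) => void> = [];

  // Register a callback; mirrors onTaskDone(fn) above.
  on (fn: (item: T) => void): void {
    this.listeners.push(fn);
  }

  // Invoke every callback in registration order; mirrors taskDone(item).
  emit (item: T): void {
    for (let i = 0; i < this.listeners.length; i++) {
      this.listeners[i](item);
    }
  }
}

// Usage sketch: the pool registers once, then fires per finished task.
const taskDone = new ListenerRegistry<{ id: number }>();
taskDone.on((worker) => console.log(`worker ${worker.id} finished a task`));
taskDone.emit({ id: 1 });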
src/worker_pool/index.ts — 4 additions & 0 deletions
@@ -54,6 +54,10 @@ export class WorkerInfo extends AsynchronouslyCreatedResource {
     this.histogram = enableHistogram ? createHistogram() : null;
   }

+  get id (): number {
+    return this.worker.threadId;
+  }
+
   destroy () : void {
     if (this.terminating || this.destroyed) return;

test/fixtures/wait-for-notify.js — 5 additions & 0 deletions (new file)
@@ -0,0 +1,5 @@
+module.exports = function (i32array) {
+  Atomics.wait(i32array, 0, 0);
+  Atomics.store(i32array, 0, -1);
+  Atomics.notify(i32array, 0, Infinity);
+};
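This fixture parks the worker in Atomics.wait until slot 0 leaves 0, then writes −1 and wakes every remaining waiter — which is what lets the task-queue test below have tasks sharing one worker unblock each other. A hedged sketch of the driving side, since the test body itself is not shown in this diff (the single-task shape and options here are illustrative, not the actual test):

import { resolve } from 'node:path';
import Piscina from 'piscina';

async function main () {
  const pool = new Piscina({
    filename: resolve(__dirname, 'fixtures/wait-for-notify.js'),
    maxThreads: 1
  });

  // One shared 32-bit slot, initialized to 0. The typed array is
  // structured-cloned to the worker, but SharedArrayBuffer memory is shared.
  const i32array = new Int32Array(new SharedArrayBuffer(4));

  // The task blocks in Atomics.wait(i32array, 0, 0) inside the worker.
  const task = pool.run(i32array);

  // Flip the slot and wake the waiter. If the worker has not reached its
  // Atomics.wait yet, the wait simply returns 'not-equal' immediately.
  Atomics.store(i32array, 0, 1);
  Atomics.notify(i32array, 0, 1);

  await task; // the worker stored -1 and notified any remaining waiters
  await pool.destroy();
}

main().catch(console.error);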
test/histogram.ts — 14 additions & 10 deletions
@@ -1,6 +1,7 @@
 import Piscina from '..';
 import { test } from 'tap';
 import { resolve } from 'path';
+import { PiscinaWorker } from '../dist/worker_pool';

 test('pool will maintain run and wait time histograms by default', async ({ equal, ok }) => {
   const pool = new Piscina({
@@ -67,9 +68,10 @@ test('pool does not maintain run and wait time histograms when recordTiming is f

 test('workers has histogram', async t => {
   let index = 0;
+  let list: PiscinaWorker[];
   // Its expected to have one task get balanced twice due to the load balancer distribution
   // first task enters, its distributed; second is enqueued, once first is done, second is distributed and normalizes
-  t.plan(44);
+  t.plan(4);
   const pool = new Piscina({
     filename: resolve(__dirname, 'fixtures/eval.js'),
     maxThreads: 1,
@@ -78,12 +80,10 @@ test('workers has histogram', async t => {
     loadBalancer (_task, workers) {
       // Verify distribution to properly test this feature
       const candidate = workers[index++ % workers.length];
-      const histogram = candidate.histogram;

-      t.type(histogram?.average, 'number');
-      t.type(histogram?.max, 'number');
-      t.type(histogram?.mean, 'number');
-      t.type(histogram?.min, 'number');
+      // We assign it everytime is called to check the histogram
+      // and that the list remains the same
+      list = workers;

       if (candidate.currentUsage !== 0) {
         return {
@@ -104,13 +104,18 @@ test('workers has histogram', async t => {
     tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 500))'));
   }
   await Promise.all(tasks);
+  const histogram = list[0].histogram;
+  t.type(histogram?.average, 'number');
+  t.type(histogram?.max, 'number');
+  t.type(histogram?.mean, 'number');
+  t.type(histogram?.min, 'number');
 });

 test('workers does not have histogram if disabled', async t => {
   let index = 0;
-  // Its expected to have one task get balanced twice due to the load balancer distribution
-  // first task enters, its distributed; second is enqueued, once first is done, second is distributed and normalizes
-  t.plan(11);
+  // After each task the balancer is called to distribute the next task
+  // The first task is distributed, the second is enqueued, once the first is done, the second is distributed and normalizes
+  t.plan(10 * 2);
   const pool = new Piscina({
     filename: resolve(__dirname, 'fixtures/eval.js'),
     maxThreads: 1,
@@ -120,7 +125,6 @@ test('workers does not have histogram if disabled', async t => {
       // Verify distribution to properly test this feature
       const candidate = workers[index++ % workers.length];
       const histogram = candidate.histogram;
-
       t.notOk(histogram);

       if (candidate.currentUsage !== 0) {
test/task-queue.ts — 1 addition & 1 deletion
@@ -138,7 +138,7 @@ test('tasks can share a Worker if requested (both tests blocking)', async ({ equ

 test('tasks can share a Worker if requested (one test finishes)', async ({ equal, rejects }) => {
   const pool = new Piscina({
-    filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
+    filename: resolve(__dirname, 'fixtures/wait-for-notify.js'),
     minThreads: 0,
     maxThreads: 1,
     maxQueue: 0,
test/thread-count.ts — 25 additions & 8 deletions
@@ -1,26 +1,43 @@
+import { resolve } from 'node:path';
+import { cpus } from 'node:os';
+import { once } from 'node:events';
 import Piscina from '..';
-import { cpus } from 'os';
 import { test } from 'tap';
-import { resolve } from 'path';

-test('will start with minThreads and max out at maxThreads', async ({ equal, rejects }) => {
+test('will start with minThreads and max out at maxThreads', { only: true }, async ({ equal, rejects }) => {
   const pool = new Piscina({
     filename: resolve(__dirname, 'fixtures/eval.js'),
     minThreads: 2,
-    maxThreads: 4
+    maxThreads: 4,
+    concurrentTasksPerWorker: 1
   });
+  let counter = 0;

+  pool.on('workerCreate', () => {
+    counter++;
+  });
+
   equal(pool.threads.length, 2);

   rejects(pool.run('while(true) {}'));
   equal(pool.threads.length, 2);
   rejects(pool.run('while(true) {}'));
   equal(pool.threads.length, 2);
+
+  // #3
   rejects(pool.run('while(true) {}'));
-  equal(pool.threads.length, 3);
+  await once(pool, 'workerCreate');
+
+  // #4
   rejects(pool.run('while(true) {}'));
-  equal(pool.threads.length, 4);
+  await once(pool, 'workerCreate');
+
+  // #4 - as spawn does not happen synchronously anymore, we wait for the signal once more
+  rejects(pool.run('while(true) {}'));
+  await once(pool, 'workerCreate');
+
+  equal(pool.threads.length, 4);
   await pool.destroy();
   equal(pool.threads.length, 0);
+  equal(counter, 4);
 });

 test('low maxThreads sets minThreads', async ({ equal }) => {
test/workers.ts — 1 addition & 0 deletions
@@ -13,6 +13,7 @@ test('workers are marked as destroyed if destroyed', async t => {
   let workersSecondRound = [];
   const pool = new Piscina({
     filename: resolve(__dirname, 'fixtures/eval.js'),
+    minThreads: 2,
     maxThreads: 2,
     concurrentTasksPerWorker: 1,
     loadBalancer (_task, workers) {
