diff --git a/src/index.ts b/src/index.ts index 7157a057..59f3fdd8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -258,10 +258,14 @@ class ThreadPool { workerInfo.markAsReady(); // We need to emit the event in the next microtask, so that the user can // attach event listeners before the event is emitted. - queueMicrotask(() => this.publicInterface.emit('workerCreate', workerInfo.interface)); + queueMicrotask(() => { + this.publicInterface.emit('workerCreate', workerInfo.interface); + this._onWorkerReady(workerInfo); + }); } else { workerInfo.onReady(() => { this.publicInterface.emit('workerCreate', workerInfo.interface); + this._onWorkerReady(workerInfo); }); } @@ -425,8 +429,6 @@ class ThreadPool { // return; // } - if (this.closingUp) return; - let workers: PiscinaWorker[] | null = null; while ((this.taskQueue.size > 0 || this.skipQueue.length > 0)) { // The skipQueue will have tasks that we previously shifted off @@ -614,6 +616,7 @@ class ThreadPool { // this._maybeDrain(); // return ret; // } + if (this.taskQueue.size > 0) { const totalCapacity = this.options.maxQueue + this.pendingCapacity(); if (this.taskQueue.size >= totalCapacity) { @@ -773,7 +776,7 @@ class ThreadPool { for (const workerInfo of this.workers) { checkIfWorkerIsDone(workerInfo); - workerInfo.port.on('message', () => checkIfWorkerIsDone(workerInfo)); + this.workers.onTaskDone(checkIfWorkerIsDone); } }); diff --git a/src/worker_pool/base.ts b/src/worker_pool/base.ts index 59e2dfd4..3b322087 100644 --- a/src/worker_pool/base.ts +++ b/src/worker_pool/base.ts @@ -42,6 +42,10 @@ export abstract class AsynchronouslyCreatedResource { } } + isDestroyed () { + return this.ondestroyListeners === null; + } + abstract currentUsage() : number; } diff --git a/src/worker_pool/index.ts b/src/worker_pool/index.ts index 045b2866..1650528e 100644 --- a/src/worker_pool/index.ts +++ b/src/worker_pool/index.ts @@ -54,6 +54,10 @@ export class WorkerInfo extends AsynchronouslyCreatedResource { this.histogram = enableHistogram ? createHistogram() : null; } + get id (): number { + return this.worker.threadId; + } + destroy () : void { if (this.terminating || this.destroyed) return; diff --git a/test/histogram.ts b/test/histogram.ts index 49625b0c..e14d6f29 100644 --- a/test/histogram.ts +++ b/test/histogram.ts @@ -115,7 +115,7 @@ test('workers does not have histogram if disabled', async t => { let index = 0; // After each task the balancer is called to distribute the next task // The first task is distributed, the second is enqueued, once the first is done, the second is distributed and normalizes - t.plan((10 * 2) - 1); + t.plan(10 * 2); const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/eval.js'), maxThreads: 1, diff --git a/test/thread-count.ts b/test/thread-count.ts index 96d3bcf9..550db7aa 100644 --- a/test/thread-count.ts +++ b/test/thread-count.ts @@ -1,26 +1,43 @@ +import { resolve } from 'node:path'; +import { cpus } from 'node:os'; +import { once } from 'node:events'; import Piscina from '..'; -import { cpus } from 'os'; import { test } from 'tap'; -import { resolve } from 'path'; -test('will start with minThreads and max out at maxThreads', async ({ equal, rejects }) => { +test('will start with minThreads and max out at maxThreads', { only: true }, async ({ equal, rejects }) => { const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/eval.js'), minThreads: 2, - maxThreads: 4 + maxThreads: 4, + concurrentTasksPerWorker: 1 + }); + let counter = 0; + + pool.on('workerCreate', () => { + counter++; }); + equal(pool.threads.length, 2); + rejects(pool.run('while(true) {}')); - equal(pool.threads.length, 2); rejects(pool.run('while(true) {}')); - equal(pool.threads.length, 2); + + // #3 rejects(pool.run('while(true) {}')); - equal(pool.threads.length, 3); + await once(pool, 'workerCreate'); + + // #4 rejects(pool.run('while(true) {}')); - equal(pool.threads.length, 4); + await once(pool, 'workerCreate'); + + // #4 - as spawn does not happen synchronously anymore, we wait for the signal once more rejects(pool.run('while(true) {}')); + await once(pool, 'workerCreate'); + equal(pool.threads.length, 4); await pool.destroy(); + equal(pool.threads.length, 0); + equal(counter, 4); }); test('low maxThreads sets minThreads', async ({ equal }) => {