Skip to content

Commit

Permalink
fix: fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 committed Sep 25, 2024
1 parent 2fb79f2 commit 64381da
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 14 deletions.
11 changes: 7 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,14 @@ class ThreadPool {
workerInfo.markAsReady();
// We need to emit the event in the next microtask, so that the user can
// attach event listeners before the event is emitted.
queueMicrotask(() => this.publicInterface.emit('workerCreate', workerInfo.interface));
queueMicrotask(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
this._onWorkerReady(workerInfo);
});
} else {
workerInfo.onReady(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
this._onWorkerReady(workerInfo);
});
}

Expand Down Expand Up @@ -425,8 +429,6 @@ class ThreadPool {
// return;
// }

if (this.closingUp) return;

let workers: PiscinaWorker[] | null = null;
while ((this.taskQueue.size > 0 || this.skipQueue.length > 0)) {
// The skipQueue will have tasks that we previously shifted off
Expand Down Expand Up @@ -614,6 +616,7 @@ class ThreadPool {
// this._maybeDrain();
// return ret;
// }

if (this.taskQueue.size > 0) {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
if (this.taskQueue.size >= totalCapacity) {
Expand Down Expand Up @@ -773,7 +776,7 @@ class ThreadPool {
for (const workerInfo of this.workers) {
checkIfWorkerIsDone(workerInfo);

workerInfo.port.on('message', () => checkIfWorkerIsDone(workerInfo));
this.workers.onTaskDone(checkIfWorkerIsDone);
}
});

Expand Down
4 changes: 4 additions & 0 deletions src/worker_pool/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export abstract class AsynchronouslyCreatedResource {
}
}

isDestroyed () {
return this.ondestroyListeners === null;
}

abstract currentUsage() : number;
}

Expand Down
4 changes: 4 additions & 0 deletions src/worker_pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ export class WorkerInfo extends AsynchronouslyCreatedResource {
this.histogram = enableHistogram ? createHistogram() : null;
}

get id (): number {
return this.worker.threadId;
}

destroy () : void {
if (this.terminating || this.destroyed) return;

Expand Down
5 changes: 5 additions & 0 deletions test/fixtures/wait-for-notify.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = function (i32array) {
Atomics.wait(i32array, 0, 0);
Atomics.store(i32array, 0, -1);
Atomics.notify(i32array, 0, Infinity);
};
2 changes: 1 addition & 1 deletion test/histogram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ test('workers does not have histogram if disabled', async t => {
let index = 0;
// After each task the balancer is called to distribute the next task
// The first task is distributed, the second is enqueued, once the first is done, the second is distributed and normalizes
t.plan((10 * 2) - 1);
t.plan(10 * 2);
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
maxThreads: 1,
Expand Down
2 changes: 1 addition & 1 deletion test/task-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ test('tasks can share a Worker if requested (both tests blocking)', async ({ equ

test('tasks can share a Worker if requested (one test finishes)', async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
filename: resolve(__dirname, 'fixtures/wait-for-notify.js'),
minThreads: 0,
maxThreads: 1,
maxQueue: 0,
Expand Down
33 changes: 25 additions & 8 deletions test/thread-count.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
import { resolve } from 'node:path';
import { cpus } from 'node:os';
import { once } from 'node:events';
import Piscina from '..';
import { cpus } from 'os';
import { test } from 'tap';
import { resolve } from 'path';

test('will start with minThreads and max out at maxThreads', async ({ equal, rejects }) => {
test('will start with minThreads and max out at maxThreads', { only: true }, async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 2,
maxThreads: 4
maxThreads: 4,
concurrentTasksPerWorker: 1
});
let counter = 0;

pool.on('workerCreate', () => {
counter++;
});

equal(pool.threads.length, 2);

rejects(pool.run('while(true) {}'));
equal(pool.threads.length, 2);
rejects(pool.run('while(true) {}'));
equal(pool.threads.length, 2);

// #3
rejects(pool.run('while(true) {}'));
equal(pool.threads.length, 3);
await once(pool, 'workerCreate');

// #4
rejects(pool.run('while(true) {}'));
equal(pool.threads.length, 4);
await once(pool, 'workerCreate');

// #4 - as spawn does not happen synchronously anymore, we wait for the signal once more
rejects(pool.run('while(true) {}'));
await once(pool, 'workerCreate');

equal(pool.threads.length, 4);
await pool.destroy();
equal(pool.threads.length, 0);
equal(counter, 4);
});

test('low maxThreads sets minThreads', async ({ equal }) => {
Expand Down

0 comments on commit 64381da

Please sign in to comment.