Skip to content

sinclairzx81/threadbox

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

77 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

ThreadBox

Recursive Worker Threads in NodeJS

npm version Build Status

Example

The following replicates the above worker graph.

import { Thread, Sender, Receiver } from '@sinclair/threadbox'

const WorkerC = Thread.Worker(class {
  run() {
    return Math.random()
  }
})

const WorkerB = Thread.Worker(class {
  async run(sender: Sender) {
    const c_0 = Thread.Spawn(WorkerC)
    const c_1 = Thread.Spawn(WorkerC)
    const c_2 = Thread.Spawn(WorkerC)
    const c_3 = Thread.Spawn(WorkerC)
    const [a, b, c, d] = await Promise.all([
      c_0.run(),
      c_1.run(),
      c_2.run(),
      c_3.run(),
    ])
    await sender.send([a, b, c, d])
    await sender.end()
    await c_0.dispose()
    await c_1.dispose()
    await c_2.dispose()
    await c_3.dispose()
  }
})
const WorkerA = Thread.Worker(class {
  async run(receiver: Receiver) {
    for await(const [a, b, c, d] of receiver) { }
  }
})

// start here ...
Thread.Main(() => {
  const [sender, receiver] = Thread.Channel()
  const a = Thread.Spawn(WorkerA)
  const b = Thread.Spawn(WorkerB)
  await Promise.all([
    a.run(receiver),
    b.run(sender) 
  ])
  await a.dispose()
  await b.dispose()
})

Overview

ThreadBox is a threading library for JavaScript built on top of NodeJS worker_threads. It is written to allow for compute intensive and potentially blocking JavaScript routines to be easily executed in remote worker threads. ThreadBox uses a recursive threading model, where spawned threads are created by re-running the applications entry module (typically app.js). This approach allows for ergonomic threading, but requires code executed in the global scope to be moved into functions and classes.

This project is written as a research project to explore the potential for recursive threading in Node. It is offered to anyone who may find it of use.

Licence MIT

Install

$ npm install @sinclair/threadbox --save

Contents

Main

Use Thread.Main(...) to define the application entry point. This function will only be called once when the process starts, and ignored for subsequent threads.

import { Thread } from '@sinclair/threadbox'

Thread.Main(() => {
  
  console.log('Hello World')
  
})

Worker

Use Thread.Worker(...) to denote a class as threadable. This enables the class to be spawned via Thread.Spawn(...). The return type of this function returns the inner constructor that can be instanced in the current thread.

import { Thread } from '@sinclair/threadbox'

const Basic = Thread.Worker(class {
    add(a: number, b: number) {
        return a + b
    }
    dispose() { 
        console.log('disposed!')
    }
})

Thread.Main(async () => {
    // instance as thread
    const thread = Thread.Spawn(Basic)
    console.log(await thread.add(10, 20))
    await thread.dispose()

    // instance as local
    const local = new Basic()
    console.log(local.add(10, 20))
})

Spawn

The Thread.Spawn(...) to spawn a new constructor in a remote worker thread. This function takes the threadable constructor as it's first argument followed by any parameters defined for the constructor.

import { Thread } from '@sinclair/threadbox'

const Runner = Thread.Worker(class {
  constructor(private taskName: string) {
    console.log(`Runner: ${taskName}`)
  }
  process() {
    console.log(`Runner: execute: ${taskName}`)
  }
  dispose() {
    console.log(`Runner: dispose ${taskName}`)
  }
})

Thread.Main(async () => {
  const runner = Thread.Spawn(Runner, 'Name of Runner')
  await runner.process()
  await runner.dispose()
})

Channel

Use Thread.Channel<T>() to create a messaging channel to communicate between threads.

import { Thread, Sender, Receiver } from '@sinclair/threadbox'

const Numbers = Thread.Worker(class {
  start(sender: Sender<number>) {
    for(let i = 0; i < 1024; i++) {
        sender.send(i)
    }
  }
})

Thread.Main(async () => {
  const thread = Thread.Spawn(Numbers)
  const [sender, receiver] = Thread.Channel<number>()
  thread.start(sender)
  
  // await values on receiver
  for await(const value of receiver) {
    console.log(value)
  }

  await thread.dispose()
})

Marshal

Use Thread.Marshal(...) to denote a constructor should be marshalled across threads. This enables class instances to be transferred to remote threads for remote invocation.

import { Thread } from '@sinclair/threadbox'

const Transferrable = Thread.Marshal({
    method() {
        console.log('Hello World')
    }
})

const Worker = Thread.Worker({
    execute(transferable: Transferrable) {
        transferable.method() // callable
    }
}

Thread.Main(() => {
  const thread = spawn(Worker)
  const transferable = new Transferrable()
  await thread.execute(transferable)
  await thread.dispose()
})

Note: There is a serialization cost to marshaling. For performance, only Marshal when you need to dynamically move logic in and out of threads.

Mutex

Use Thread.Mutex(...) to create a lock on critical sections. This should only be used when two threads reference the same SharedArrayBuffer.

import { Thread, Mutex } from '@sinclair/threadbox'

const Worker = Thread.Worker(class {
  constructor(private readaonly mutex: Mutex) {}

  execute(data: Uint8Array, value: number) {
    this.mutex.lock()
    data[0] = value
    data[1] = value
    data[2] = value
    data[3] = value
    this.mutex.unlock()
  }
})

Thread.Main(async () => {

  const mutex = Thread.Mutex()

  const threads = [
    Thread.Spawn(Worker, mutex),
    Thread.Spawn(Worker, mutex),
    Thread.Spawn(Worker, mutex),
    Thread.Spawn(Worker, mutex)
  ]

  const shared = new Uint8Array(new SharedArrayBuffer(4 * Float32Array.BYTES_PER_ELEMENT))

  await Promise.all([
    threads[0].execute(shared)
    threads[1].execute(shared)
    threads[2].execute(shared)
    threads[3].execute(shared)
  ])

  await Promise.all([
    threads[0].dispose()
    threads[1].dispose()
    threads[2].dispose()
    threads[3].dispose()
  ])
})