elwt v1.4.0

Worker threads pool manager

elwt

This module provides a fast pool abstraction over the new experimental Node.js worker threads API. I plan to implement more parallel-programming primitives, such as locks, semaphores, serialized handles, etc.

Why elwt?

  • fully customizable
  • asynchronous API & interaction
  • supports correct usage of SharedArrayBuffer
  • can transfer most data types
  • workers are reusable
  • customizable caching mechanism
  • automatic worker respawn
  • minimalistic

Installation

Run npm i -S elwt to install the module locally. After that you're free to import it.

Usage

First, import the module. It exports an object with the following structure:

  • class Pool: extends EventEmitter and is used to manage the pool, tasks, workers, cache, etc.
  • class Storage: extends Map and is used to store workers and tasks
  • sync function templater: used to create the workers' wrapper code (see below for details)
  • class PoolWorker: extends Worker and is used to spawn new workers

Simply import Pool and use its static asynchronous method spawnPool to create and preconfigure a pool. Creating pools by calling new Pool(...) directly is not recommended, because spawnPool computes some necessary values that make the pool workable. See the API docs below for details. See the examples section below for usage with explanation.

API

Below is the default API: classes, their methods, public fields, advanced tools, and the relations between them. Feel free to extend any of the classes to customize pool behavior. There is also a virtual Cache definition, which isn't provided as an exportable class because the cache is disabled by default, but the API defines its format so you can implement your own cache storage if needed. Some descriptions use the worker_threads.* notation as an alias for require('worker_threads').*.

Classes

class Pool, extends EventEmitter: The main class of the lib, used to create and manage pools, to let workers execute tasks, and to manage execution details. It's okay to create multiple pools if needed. Take care with any manipulations of shared memory, since it may be unsafe to keep in sync when working in parallel. Public methods and fields are:

  • spawnPool: static async function - use it to create a new pool. Accepts an optional object containing the following options:
    • size: optional positive number - how many workers to create when the pool is initialized. The pool can change its size dynamically (see below for details). Defaults to require('os').cpus().length
    • queue: optional Storage - used to manage the task queue. Defaults to the export of storage.js
    • free: optional Storage - used to manage free workers. Defaults to the export of storage.js
    • active: optional Storage - used to manage busy workers. Defaults to the export of storage.js
    • cache: optional boolean|Cache - used to manage the cache. If falsy, caching is turned off. Defaults to false
    • PoolWorker: optional PoolWorker - used as the workers' constructor and called explicitly. Defaults to the export of worker.js
    • templater: optional (): string - synchronous function called to generate the workers' body, which must follow the special format explained below. Defaults to the export of templater.js
    • roundRobin: optional (Iterable): Iterable - synchronous function called to convert iterators to round-robin iterators. Defaults to tools.roundRobin
    • unitProps: optional object - arguments passed to the original Worker constructor as is. unitProps.eval is always true regardless of what you pass. Defaults to {}
    • autoRespawn: optional boolean - whether to automatically respawn workers that fire an internal error event (internal here means an error fired by the worker itself, not by the code you passed to the pool). Note that workers keep living after a task is done, so this option may be safely turned off if your implementation handles worker internal errors correctly. Defaults to true
  • constructor: function used seamlessly by spawnPool to create a new pool. Using the constructor explicitly is not recommended: a pool created without the computations made by spawnPool is unusable. Accepts the same object as spawnPool, except that size and roundRobin aren't passed, and template is passed instead of templater. template is the result of calling templater: a string containing JS code that must react to the message event fired by worker_threads.parentPort, considering the following:
    • handler is an asynchronous function and accepts a required object containing the following arguments:
      • action: string - defines the action for the worker to perform. See below for acceptable action types
      • port: worker_threads.MessagePort - used to reply to messages
      • fn: function - asynchronous function to execute. Note that it is executed inside a dedicated context, so import the modules it uses inside it, not outside
      • data: any - data passed to fn. Must be serialized if not of type SharedArrayBuffer. If data is an object, its children of type SharedArrayBuffer will be shared, its remaining children of type ArrayBuffer will be moved (see the Node worker_threads docs to learn more about sharing and moving typed arrays between threads), and the rest must be serialized
      • raw: object - raw data passed to fn. Comprises key: SharedArrayBuffer pairs with the SharedArrayBuffers derived from data, if any
    • handler must deserialize incoming data if it's not a SharedArrayBuffer
    • handler must react to the tools.actions.RUN action by executing fn
    • handler must call port.postMessage, passing an object structured as follows:
      • action: string - defines the type of response to send. See below for acceptable action types
      • result: string - any data to return alongside. Non-string data must be serialized first. ArrayBuffers aren't moved or shared and are disallowed as a value
      • error: optional boolean - must be true if action is tools.actions.ERROR
    • handler must close port explicitly after port.postMessage is called
    • action must be tools.actions.DONE if fn executes successfully
    • action must be tools.actions.ERROR if fn execution throws
  • toSize: async function used to dynamically change the size of the pool. Accepts the following options:
    • size: number - new size to set. If the new size is less than the old one, redundant workers will be safely terminated after finishing their current activities, if any. If the new size is greater than the old one, new workers will be created automatically
  • exec: async function used to enqueue task execution. The pool will automatically choose a worker to execute the task once one is available. Accepts the following options:
    • fn: async function - function to execute in the thread
    • data: optional any - data to pass to fn. If it is an object, the SharedArrayBuffers inside are shared and the remaining ArrayBuffers are moved
    • optional object containing the following additional options:
      • respawn: number - how many times to retry task execution if it has failed
  • size: number - the pool's current size
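
The template contract described above (react to a RUN message, reply on the given port with DONE or ERROR, then close the port) can be illustrated with plain worker_threads. This is only a minimal sketch, not the library's templater output: the action strings, the runOnce helper, passing fn as source text, and JSON as the serialization format are all assumptions made for illustration.

```javascript
const { Worker, MessageChannel } = require('worker_threads');

// Hypothetical action names; the real values live in tools.actions.
const actions = { RUN: 'run', DONE: 'done', ERROR: 'error' };

// Worker body as a string (run with eval: true). Assumption: fn arrives as
// source text and data as JSON; the library may serialize differently.
const template = `
const { parentPort } = require('worker_threads');
const actions = ${JSON.stringify(actions)};
parentPort.on('message', async ({ action, port, fn, data }) => {
    if (action !== actions.RUN) return;
    try {
        const task = eval('(' + fn + ')');
        const result = await task(JSON.parse(data));
        // JSON has no undefined, so undefined results are sent as null.
        const payload = JSON.stringify(result === undefined ? null : result);
        port.postMessage({ action: actions.DONE, result: payload });
    } catch (e) {
        port.postMessage({ action: actions.ERROR, error: true, result: String(e && e.message) });
    } finally {
        port.close(); // the contract asks for an explicit close after replying
    }
});
`;

// Hypothetical helper: spawns one worker, runs one task, then terminates it.
function runOnce(fn, data) {
    return new Promise((resolve, reject) => {
        const worker = new Worker(template, { eval: true });
        const { port1, port2 } = new MessageChannel();
        port1.once('message', (msg) => {
            port1.close();
            worker.terminate();
            if (msg.action === actions.ERROR) reject(new Error(msg.result));
            else resolve(JSON.parse(msg.result));
        });
        worker.once('error', reject);
        // port2 is moved to the worker so it can reply on a dedicated channel.
        worker.postMessage(
            { action: actions.RUN, port: port2, fn: fn.toString(), data: JSON.stringify(data) },
            [port2]
        );
    });
}
```

Unlike this sketch, a real pool keeps its workers alive between tasks and only creates a fresh port per task.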

Private methods and fields are:

  • addUnit: async function - used to create a new worker, to define event listeners for it, and to add it to storage
  • activateUnit: async function - used to move a worker from the storage of free ones to the storage of busy ones
  • loadUnit: async function - used to load a free worker, or to await one and load it once it's free
  • loadTask: async function - used to load a task from the queue if there is one, or to await one if not
  • next: async function - used to prepare the execution environment, to set task-related listeners for the worker, and to send the task to the thread
  • _execCached: async function - used as a replacement for exec when caching is enabled, so that exec avoids the cache-check overhead when caching is disabled

class Storage, extends Map: Used to manage workers and tasks. Separate storages are created for free workers, for busy ones, and for tasks. The default storage provides asynchronous wrappers over the following Map methods: clear, delete, has, set. A class that extends Storage must provide an Iterable with at least that set of methods.
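
As a rough illustration of what "asynchronous wrappers over Map methods" could look like, here is a minimal sketch. SketchStorage and demo are hypothetical names, and this is not the library's storage.js; a custom storage backed by something slower than memory would follow the same shape.

```javascript
// Hypothetical stand-in for the default Storage: each wrapped method
// returns a Promise, so callers can await it uniformly.
class SketchStorage extends Map {
    async set(key, value) { return super.set(key, value); }
    async has(key) { return super.has(key); }
    async delete(key) { return super.delete(key); }
    async clear() { return super.clear(); }
}

// Usage: the storage stays iterable like a Map, but mutations are awaited.
async function demo() {
    const free = new SketchStorage();
    await free.set('worker-1', { busy: false });
    const known = await free.has('worker-1');
    await free.delete('worker-1');
    return { known, size: free.size };
}
```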

class PoolWorker, extends Worker: Used to create workers. The eval option passed to the Worker constructor is always true.
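
A Worker subclass that always forces eval: true might look like the sketch below. EvalWorker and ping are hypothetical names and the real worker.js may differ; the point is only that the caller's eval value is overridden.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical sketch: whatever the caller passes, eval ends up true,
// so the first argument is always treated as source code, not a file path.
class EvalWorker extends Worker {
    constructor(source, options = {}) {
        super(source, { ...options, eval: true });
    }
}

// Usage: eval: false is passed on purpose and gets overridden.
function ping() {
    return new Promise((resolve, reject) => {
        const worker = new EvalWorker(
            "require('worker_threads').parentPort.postMessage('ready')",
            { eval: false }
        );
        worker.once('message', resolve);
        worker.once('error', reject);
    });
}
```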

Examples

First of all, import the Pool class. Then call spawnPool on it to get a new pool. Once it is ready, you're free to asynchronously call .exec(fn, data, { respawn: N }) and wait for the returned Promise. fn will be called by a free worker, or queued if there are none:

// Assumes a context where await is allowed (e.g. inside an async function)
const Pool = require('elwt').Pool;
let swarm = await Pool.spawnPool({ size: require('os').cpus().length });
console.log(await swarm.exec(async (input) => {
    return input * 2;
}, 21));    //  logs 42

Data may be of any type, but note that typed arrays are transferred and shared ones are shared. This means that if you pass a typed array as data or as a child of data, it becomes inaccessible on the emitter side, but the listener can access it immediately. This also means you're free to use SharedArrayBuffer without copying or moving data.

let shared = new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT);
await swarm.exec(async (input) => {
    let view = new Int32Array(input);
    view[0] = 21;
}, shared);
await swarm.exec(async (input) => {
    let view = new Int32Array(input);
    view[1] = view[0] * 2;
}, shared);
let response = await swarm.exec(async (input) => {
    let view = new Int32Array(input);
    return view[1];
}, shared);
console.log(response);  //  logs 42
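
By contrast with the shared buffer above, a regular ArrayBuffer that is moved becomes unusable on the sending side. The sketch below shows this with a plain worker_threads MessageChannel rather than with elwt itself; transferDemo is a hypothetical helper name.

```javascript
const { MessageChannel } = require('worker_threads');

function transferDemo() {
    return new Promise((resolve) => {
        const { port1, port2 } = new MessageChannel();
        const buffer = new ArrayBuffer(8);
        new Int32Array(buffer)[0] = 21;
        port2.once('message', (received) => {
            port1.close(); // closing one side closes the channel
            resolve({
                senderBytes: buffer.byteLength,     // 0: detached after the move
                receiverBytes: received.byteLength, // 8: the receiver owns the memory now
                value: new Int32Array(received)[0]  // 21: contents arrive intact
            });
        });
        // Listing the buffer in the transfer list moves it instead of copying it.
        port1.postMessage(buffer, [buffer]);
    });
}
```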

Size of pool is changeable anytime:

await swarm.toSize(32); //  ok
await swarm.toSize(-5); //  ok, will be set to 0
await swarm.toSize(0); //  ok
await swarm.toSize(Infinity); //  ok, will be set to Number.MAX_SAFE_INTEGER - 1
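
The clamping described in the comments above could be expressed as follows. This is a guess at the behavior based only on those comments, not the library's code, and clampPoolSize is a hypothetical name.

```javascript
// Hypothetical sketch: sizes below 0 become 0, sizes above the cap become the cap.
function clampPoolSize(size) {
    const max = Number.MAX_SAFE_INTEGER - 1;
    return Math.min(Math.max(size, 0), max);
}
```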

One more thing

Feel free to contribute and participate! Feel free to open an issue, fork, or send a PR. Feel free to use this module anywhere, as long as the original LICENSE.md is kept and package.json links to it.

TODO Bnaya/objectbuffer
