piscina - the node.js worker pool

Written in TypeScript.

For Node.js 12.x and higher.

MIT Licensed.

Piscina API

Example

In main.js:

const path = require('path');
const Piscina = require('piscina');

const piscina = new Piscina({
  filename: path.resolve(__dirname, 'worker.js')
});

(async function() {
  const result = await piscina.runTask({ a: 4, b: 6 });
  console.log(result);  // Prints 10
})();

In worker.js:

module.exports = ({ a, b }) => {
  return a + b;
};

The worker may also be an async function or may return a Promise:

const { promisify } = require('util');
const sleep = promisify(setTimeout);

module.exports = async ({ a, b }) => {
  // Fake some async activity
  await sleep(100);
  return a + b;
};

ESM is also supported for both Piscina and workers:

import { Piscina } from 'piscina';

const piscina = new Piscina({
  // The URL must be a file:// URL
  filename: new URL('./worker.mjs', import.meta.url).href
});

(async function () {
  const result = await piscina.runTask({ a: 4, b: 6 });
  console.log(result); // Prints 10
})();

In worker.mjs:

export default ({ a, b }) => {
  return a + b;
};

Cancelable Tasks

Submitted tasks may be canceled using either an AbortController or an EventEmitter:

'use strict';

const Piscina = require('piscina');
const { AbortController } = require('abort-controller');
const { resolve } = require('path');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

(async function() {
  const abortController = new AbortController();
  try {
    const task = piscina.runTask({ a: 4, b: 6 }, abortController.signal);
    abortController.abort();
    await task;
  } catch (err) {
    console.log('The task was canceled');
  }
})();

To use AbortController, you will need to npm i abort-controller (or yarn add abort-controller).

Alternatively, any EventEmitter that emits an 'abort' event may be used as an abort controller:

'use strict';

const Piscina = require('piscina');
const EventEmitter = require('events');
const { resolve } = require('path');

const piscina = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

(async function() {
  const ee = new EventEmitter();
  try {
    const task = piscina.runTask({ a: 4, b: 6 }, ee);
    ee.emit('abort');
    await task;
  } catch (err) {
    console.log('The task was canceled');
  }
})();

Delaying Availability of Workers

A worker thread will not be made available to process tasks until Piscina determines that it is "ready". By default, a worker is ready as soon as Piscina loads it and acquires a reference to the exported handler function.

Sometimes a worker needs more time to initialize resources before it can process tasks. To support this case, the worker module may export a Promise that resolves to the handler function instead of exporting the function directly:

async function initialize() {
  await someAsyncInitializationActivity();
  return ({ a, b }) => a + b;
}

module.exports = initialize();

Piscina will await the resolution of the exported Promise before marking the worker thread available.

Backpressure

When the maxQueue option is set, once the Piscina queue is full, no additional tasks may be submitted until the queue size falls below the limit. The 'drain' event may be used to receive notification when the queue is empty and all tasks have been submitted to workers for processing.

Example: Using a Node.js stream to feed a Piscina worker pool:

'use strict';

const { resolve } = require('path');
const Pool = require('../..');

const pool = new Pool({
  filename: resolve(__dirname, 'worker.js'),
  maxQueue: 'auto'
});

const stream = getStreamSomehow();
stream.setEncoding('utf8');

// Number of chunks submitted so far (used for logging only)
let counter = 0;

pool.on('drain', () => {
  if (stream.isPaused()) {
    console.log('resuming...', counter, pool.queueSize);
    stream.resume();
  }
});

stream
  .on('data', (data) => {
    counter++;
    pool.runTask(data);
    // Stop reading once the queue is full; 'drain' resumes the stream
    if (pool.queueSize === pool.options.maxQueue) {
      console.log('pausing...', counter, pool.queueSize);
      stream.pause();
    }
  })
  .on('error', console.error)
  .on('end', () => {
    console.log('done');
  });

Additional Examples

Additional examples can be found in the GitHub repo at https://github.com/jasnell/piscina/tree/master/examples

Class: Piscina

Piscina works by creating a pool of Node.js Worker Threads to which one or more tasks may be dispatched. Each worker thread executes a single exported function defined in a separate file. Whenever a task is dispatched to a worker, the worker invokes the exported function and reports the return value back to Piscina when the function completes.

This class extends EventEmitter from Node.js.

Constructor: new Piscina([options])

Use caution when setting resource limits. Setting limits that are too low may result in the Piscina worker threads being unusable.

Method: runTask(task[, transferList][, filename][, abortSignal])

Schedules a task to be run on a Worker thread.

This returns a Promise for the return value of the (async) function call made to the function exported from filename. If the (async) function throws an error, the returned Promise will be rejected with that error. If the task is aborted, the returned Promise is rejected with an error as well.

Method: destroy()

Stops all Workers and rejects all Promises for pending tasks.

This returns a Promise that is fulfilled once all threads have stopped.

Event: 'error'

An 'error' event is emitted by instances of this class when:

All other errors are reported by rejecting the Promise returned from runTask(), including rejections reported by the handler function itself.

Event: 'drain'

A 'drain' event is emitted whenever the queueSize reaches 0.

Property: completed (readonly)

The current number of completed tasks.

Property: duration (readonly)

The length of time (in milliseconds) since this Piscina instance was created.

Property: options (readonly)

A copy of the options that are currently being used by this instance. This object has the same properties as the options object passed to the constructor.

Property: runTime (readonly)

A histogram summary object summarizing the collected run times of completed tasks. All values are expressed in milliseconds.

All properties following the pattern p{N} where N is a number (e.g. p1, p99) represent the percentile distributions of run time observations. For example, p99 is the 99th percentile, indicating that 99% of the observed run times were less than or equal to the given value.

{
  average: 1880.25,
  mean: 1880.25,
  stddev: 1.93,
  min: 1877,
  max: 1882.0190887451172,
  p0_001: 1877,
  p0_01: 1877,
  p0_1: 1877,
  p1: 1877,
  p2_5: 1877,
  p10: 1877,
  p25: 1877,
  p50: 1881,
  p75: 1881,
  p90: 1882,
  p97_5: 1882,
  p99: 1882,
  p99_9: 1882,
  p99_99: 1882,
  p99_999: 1882
}
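As a small illustration of reading these values, the sketch below checks one percentile of a summary object against a latency budget. The withinBudget helper and the budget values are hypothetical; only the property names come from the sample above.

```javascript
'use strict';

// Hypothetical helper: true when the given percentile of a histogram
// summary (such as pool.runTime) is at or below a budget in milliseconds.
function withinBudget(summary, percentile, budgetMs) {
  const value = summary[percentile];
  if (typeof value !== 'number') {
    throw new TypeError(`Unknown histogram property: ${percentile}`);
  }
  return value <= budgetMs;
}

// A trimmed copy of the sample summary shown above.
const sample = { mean: 1880.25, p50: 1881, p99: 1882 };

console.log(withinBudget(sample, 'p99', 2000)); // true
console.log(withinBudget(sample, 'p50', 1500)); // false
```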

Property: threads (readonly)

An Array of the Worker instances used by this pool.

Property: queueSize (readonly)

The current number of tasks waiting to be assigned to a Worker thread.

Property: utilization (readonly)

A point-in-time ratio comparing the approximate total mean run time of completed tasks to the total runtime capacity of the pool.

A pool's runtime capacity is determined by multiplying the duration by the options.maxThreads count. This provides an absolute theoretical maximum aggregate compute time that the pool would be capable of.

The approximate total mean run time is determined by multiplying the mean run time of all completed tasks by the total number of completed tasks. This number represents the approximate amount of time the pool has been actively processing tasks.

The utilization is then calculated by dividing the approximate total mean run time by the capacity, yielding a fraction between 0 and 1.
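The calculation described above can be sketched as follows. This is only an illustration of the arithmetic, not Piscina's internal code; the estimateUtilization function and its parameter names are made up for this example.

```javascript
'use strict';

// Sketch of the utilization formula described above. The function name and
// parameter object are illustrative; Piscina computes this value itself.
function estimateUtilization({ completed, meanRunTime, duration, maxThreads }) {
  // Theoretical maximum aggregate compute time available to the pool.
  const capacity = duration * maxThreads;
  if (capacity === 0) return 0;
  // Approximate time the pool has spent actively processing tasks.
  const approxTotalRunTime = meanRunTime * completed;
  return approxTotalRunTime / capacity;
}

// 100 tasks averaging 50 ms each, on a 4-thread pool that is 5000 ms old:
// (50 * 100) / (5000 * 4) = 5000 / 20000 = 0.25
console.log(estimateUtilization({
  completed: 100,
  meanRunTime: 50,
  duration: 5000,
  maxThreads: 4
})); // 0.25
```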

Property: waitTime (readonly)

A histogram summary object summarizing the collected times tasks spent waiting in the queue. All values are expressed in milliseconds.

All properties following the pattern p{N} where N is a number (e.g. p1, p99) represent the percentile distributions of wait time observations. For example, p99 is the 99th percentile, indicating that 99% of the observed wait times were less than or equal to the given value.

{
  average: 1880.25,
  mean: 1880.25,
  stddev: 1.93,
  min: 1877,
  max: 1882.0190887451172,
  p0_001: 1877,
  p0_01: 1877,
  p0_1: 1877,
  p1: 1877,
  p2_5: 1877,
  p10: 1877,
  p25: 1877,
  p50: 1881,
  p75: 1881,
  p90: 1882,
  p97_5: 1882,
  p99: 1882,
  p99_9: 1882,
  p99_99: 1882,
  p99_999: 1882
}

Static property: isWorkerThread (readonly)

Is true if this code runs inside a Piscina threadpool as a Worker.

Static property: version (readonly)

Provides the current version of this library as a semver string.

Static method: move(value)

By default, any value returned by a worker function will be cloned when returned back to the Piscina pool, even if that object is capable of being transferred. The Piscina.move() method can be used to wrap and mark transferable values such that they will be transferred rather than cloned.

The value may be any object supported by Node.js to be transferable (e.g. ArrayBuffer, any TypedArray, or MessagePort), or any object implementing the Transferable interface.

const { move } = require('piscina');

module.exports = () => {
  return move(new ArrayBuffer(10));
}

The move() method will throw if the value is not transferable.

The object returned by the move() method should not be set as a nested value in an object. If it is nested, the move() object itself will be cloned as opposed to transferring the object it wraps.

Interface: Transferable

Objects may implement the Transferable interface to create their own custom transferable objects. This is useful when an object being passed into or from a worker contains a deeply nested transferable object such as an ArrayBuffer or MessagePort.

Transferable objects expose two properties inspected by Piscina to determine how to transfer the object. These properties are named using the special static Piscina.transferableSymbol and Piscina.valueSymbol properties:

Both properties are required.

For example,

const {
  move,
  transferableSymbol,
  valueSymbol
} = require('piscina');

module.exports = () => {
  const obj = {
    a: { b: new Uint8Array(5) },
    c: new Uint8Array(10),

    get [transferableSymbol]() {
      // Transfer the two underlying ArrayBuffers
      return [this.a.b.buffer, this.c.buffer];
    },

    get [valueSymbol]() {
      return { a: { b: this.a.b }, c: this.c };
    }
  };
  return move(obj);
};

Custom Task Queues

By default, Piscina uses a simple array-based first-in-first-out (FIFO) task queue. When a new task is submitted and there are no available workers, tasks are pushed onto the queue until a worker becomes available.

If the default fifo queue is not sufficient, user code may replace the task queue implementation with a custom implementation using the taskQueue option on the Piscina constructor.

Custom task queue objects must implement the TaskQueue interface, described below using TypeScript syntax:

interface Task {
  readonly [Piscina.queueOptionsSymbol] : object | null;
}

interface TaskQueue {
  readonly size : number;
  shift () : Task | null;
  remove (task : Task) : void;
  push (task : Task) : void;
}

An example of a custom task queue that uses a shuffled priority queue is available in examples/task-queue.

The special symbol Piscina.queueOptionsSymbol may be set as a property on tasks submitted to runTask() as a way of passing additional options on to the custom TaskQueue implementation. (Note that because the queue options are set as a property on the task, tasks with queue options cannot be submitted as JavaScript primitives).
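To make the interface more concrete, here is a minimal sketch of a queue that orders tasks by a numeric priority queue option (higher first). The priority option name and the PriorityTaskQueue class are assumptions made for this example, and the symbol is passed in as a constructor argument so that the class stands on its own; in real use it would be Piscina.queueOptionsSymbol.

```javascript
'use strict';

// A sketch of a TaskQueue that orders tasks by a numeric `priority` queue
// option (higher first). The `priority` option name is an assumption made
// for this example. `optionsSymbol` would be Piscina.queueOptionsSymbol in
// real use; it is a parameter here so the class stands on its own.
class PriorityTaskQueue {
  constructor(optionsSymbol) {
    this.optionsSymbol = optionsSymbol;
    this.tasks = [];
  }

  get size() {
    return this.tasks.length;
  }

  // Tasks without queue options, or without a numeric priority, get 0.
  priorityOf(task) {
    const options = task[this.optionsSymbol];
    return options && typeof options.priority === 'number' ? options.priority : 0;
  }

  push(task) {
    // Insert after every task of equal or higher priority so that tasks
    // with the same priority stay in submission order.
    const priority = this.priorityOf(task);
    let index = this.tasks.length;
    while (index > 0 && this.priorityOf(this.tasks[index - 1]) < priority) {
      index--;
    }
    this.tasks.splice(index, 0, task);
  }

  shift() {
    return this.tasks.length > 0 ? this.tasks.shift() : null;
  }

  remove(task) {
    const index = this.tasks.indexOf(task);
    if (index !== -1) this.tasks.splice(index, 1);
  }
}
```

An instance of such a class would be passed as the taskQueue option on the Piscina constructor, and a task would carry its options as a property keyed by Piscina.queueOptionsSymbol, for example { a: 4, b: 6, [Piscina.queueOptionsSymbol]: { priority: 5 } }.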

Current Limitations (Things we're working on / would love help with)

Performance Notes

Workers are generally optimized for offloading synchronous, compute-intensive operations off the main Node.js event loop thread. While it is possible to perform asynchronous operations and I/O within a Worker, the performance advantages of doing so will be minimal.

Specifically, it is worth noting that asynchronous operations within Node.js, including I/O such as file system operations or CPU-bound tasks such as crypto operations or compression algorithms, are already performed in parallel by Node.js and libuv on a per-process level. This means that there will be little performance benefit from moving such async operations into a Piscina worker (see examples/scrypt for example).

Queue Size

Piscina provides the ability to configure the minimum and maximum number of worker threads active in the pool, as well as set limits on the number of tasks that may be queued up waiting for a free worker. It is important to note that setting the maxQueue size too high relative to the number of worker threads can have a detrimental impact on performance and memory usage. Setting the maxQueue size too small can also be problematic as doing so could cause your worker threads to become idle and be shut down. Our testing has shown that a maxQueue size of approximately the square of the maximum number of threads is generally sufficient and performs well for many cases, but this will vary significantly depending on your workload. It will be important to test and benchmark your worker pools to ensure you've effectively balanced queue wait times, memory usage, and worker pool utilization.
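The rule of thumb above can be written down as a tiny helper. The suggestedMaxQueue function is hypothetical and only encodes the "square of the maximum number of threads" starting point; the result should still be validated by benchmarking your own workload.

```javascript
'use strict';

// Hypothetical helper applying the rule of thumb above: a queue limit of
// roughly the square of the maximum thread count. Treat the result as a
// starting point for benchmarking, not a recommendation.
function suggestedMaxQueue(maxThreads) {
  if (!Number.isInteger(maxThreads) || maxThreads < 1) {
    throw new RangeError('maxThreads must be a positive integer');
  }
  return maxThreads * maxThreads;
}

console.log(suggestedMaxQueue(4)); // 16
console.log(suggestedMaxQueue(8)); // 64
```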

Queue Pressure and Idle Threads

The thread pool maintained by Piscina has both a minimum and maximum limit to the number of threads that may be created. When a Piscina instance is created, it will spawn the minimum number of threads immediately, then create additional threads as needed up to the limit set by maxThreads. Whenever a worker completes a task, a check is made to determine if there is additional work for it to perform. If there is no additional work, the thread is marked idle. By default, idle threads are shut down immediately, with Piscina ensuring that the pool always maintains at least the minimum.

When a Piscina pool is processing a stream of tasks (for instance, processing http server requests as in the React server-side rendering example in examples/react-ssr), if the rate at which new tasks are received and queued is not sufficient to keep workers from going idle and terminating, the pool can experience a thrashing effect: workers are created and terminated so often that the result is a net performance loss. There are a few strategies to avoid this churn:

Strategy 1: Ensure that the queue rate of new tasks is sufficient to keep workers from going idle. We refer to this as "queue pressure". If the queue pressure is too low, workers will go idle and terminate. If the queue pressure is too high, tasks will stack up, experience increased wait latency, and consume additional memory.

Strategy 2: Increase the idleTimeout configuration option. By default, idle threads terminate immediately. The idleTimeout option can be used to specify a longer period of time to wait for additional tasks to be submitted before terminating the worker. If the queue pressure is not maintained, this could result in workers sitting idle but those will have less of a performance impact than the thrashing that occurs when threads are repeatedly terminated and recreated.

Strategy 3: Increase the minThreads configuration option. This has the same basic effect as increasing the idleTimeout. If the queue pressure is not high enough, workers may sit idle indefinitely but there will be less of a performance hit.

In applications using Piscina, it will be most effective to use a combination of these three approaches and tune the various configuration parameters to find the optimum combination both for the application workload and the capabilities of the deployment environment. There is no single set of options that will work best in every case.

Thread priority on Linux systems

On Linux systems that support nice(2), Piscina is capable of setting the priority of every worker in the pool. To use this mechanism, an additional optional native addon dependency (nice-napi, npm i nice-napi) is required. Once nice-napi is installed, creating a Piscina instance with the niceIncrement configuration option will set the priority for the pool:

const Piscina = require('piscina');
const pool = new Piscina({
  filename: '/absolute/path/to/worker.js',
  niceIncrement: 20
});

The higher the niceIncrement, the lower the CPU scheduling priority will be for the pooled workers which will generally extend the execution time of CPU-bound tasks but will help prevent those threads from stealing CPU time from the main Node.js event loop thread. Whether this is a good thing or not depends entirely on your application and will require careful profiling to get correct.

The key metrics to pay attention to when tuning the niceIncrement are the sampled run times of the tasks in the worker pool (using the runTime property) and the delay of the Node.js main thread event loop.
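One way to observe the main thread's event loop delay is the monitorEventLoopDelay() function from the Node.js perf_hooks module. The sketch below wraps it in a hypothetical sampleEventLoopDelay helper; the 10 ms resolution and the shape of the returned summary are choices made for this example.

```javascript
'use strict';

const { monitorEventLoopDelay } = require('perf_hooks');

// Hypothetical helper: sample main-thread event loop delay for `durationMs`
// and resolve with a summary in milliseconds (perf_hooks reports
// nanoseconds).
function sampleEventLoopDelay(durationMs) {
  const histogram = monitorEventLoopDelay({ resolution: 10 });
  histogram.enable();
  return new Promise((resolve) => {
    setTimeout(() => {
      histogram.disable();
      resolve({
        mean: histogram.mean / 1e6,
        max: histogram.max / 1e6,
        p99: histogram.percentile(99) / 1e6
      });
    }, durationMs);
  });
}
```

Running this on the main thread while the pool is under load, for example sampleEventLoopDelay(5000).then(console.log), and comparing the result with the pool's runTime summary at different niceIncrement values gives a basis for tuning.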

Multiple Thread Pools and Embedding Piscina as a Dependency

Every Piscina instance creates a separate pool of threads and operates without any awareness of the others. When multiple pools are created in a single application the various threads may contend with one another, and with the Node.js main event loop thread, and may cause an overall reduction in system performance.

Modules that embed Piscina as a dependency should make it clear via documentation that threads are being used. It would be ideal if those would make it possible for users to provide an existing Piscina instance as a configuration option in lieu of always creating their own.
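A module might follow this advice along the lines of the sketch below. The createRenderer factory, its pool option, and the worker.js file name are all hypothetical; the point is only that a caller-supplied pool is reused and a private pool is created solely as a fallback.

```javascript
'use strict';

const path = require('path');

// Hypothetical library factory. The `pool` option name, `createRenderer`,
// and 'worker.js' are illustrative and not part of Piscina.
function createRenderer(options = {}) {
  // Reuse the caller's Piscina instance when one is provided.
  const ownsPool = options.pool === undefined;
  const pool = ownsPool ? createPrivatePool() : options.pool;

  return {
    ownsPool,
    render(task) {
      return pool.runTask(task);
    },
    async close() {
      // Only tear down a pool this module created itself.
      if (ownsPool) await pool.destroy();
    }
  };
}

function createPrivatePool() {
  // Required lazily so that Piscina is only loaded when no pool is supplied.
  const Piscina = require('piscina');
  return new Piscina({ filename: path.resolve(__dirname, 'worker.js') });
}
```

An application that already has a pool would call createRenderer({ pool: existingPool }), which keeps the total number of worker threads under the application's control.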

Release Notes

1.6.1

1.6.0

1.5.1

1.5.0

1.4.0

1.3.0

1.2.0

1.1.0

1.0.0

The Team

Acknowledgements

Piscina development is sponsored by NearForm Research.