About

Each is a single elegant function to iterate over values in sequential, parallel, or concurrent mode. It is a powerful and mature library.

Main functionalities include:

Getting started

Installation

Use your favorite package manager to install the each package:

npm install each

With ESM:

import each from "each";

With CommonJS:

const each = require("each");

Simple example

In its simplest form, Each is used as a single function, a bit like Promise.all or Promise.allSettled but, arguably, with more flexibility and easier to read.

This example defines a list of 3 items to process, along with the concurrency level and a handler function as its last argument:

const stack = [];
const result = await each(
  [
    { message: "Is", timeout: 30 },
    { message: "Gollum", timeout: 20 },
    { message: "Around", timeout: 10 },
  ],
  { concurrency: true },
  ({ message, timeout }) =>
    new Promise((resolve) =>
      setTimeout(() => stack.push(message) && resolve(message), timeout),
    ),
);

assert.equal(result.join(" "), "Is Gollum Around");
assert.equal(stack.join(" "), "Around Gollum Is");

It is equivalent to passing items as functions without a function handler and with the concurrency level defined as true.

const stack = [];
const result = await each(
  [
    () =>
      new Promise((resolve) => {
        setTimeout(() => stack.push("Is") && resolve("Is"), 30);
      }),
    () =>
      new Promise((resolve) => {
        setTimeout(() => stack.push("Gollum") && resolve("Gollum"), 20);
      }),
    () =>
      new Promise((resolve) => {
        setTimeout(() => stack.push("Around") && resolve("Around"), 10);
      }),
  ],
  true,
);

assert.equal(result.join(" "), "Is Gollum Around");
assert.equal(stack.join(" "), "Around Gollum Is");

Advanced usage

In its advanced form, Each is a scheduler with advanced functionalities to control the execution process.

const scheduler = each({ concurrency: true });
const result = await Promise.all([
  scheduler.call([
    () => new Promise((resolve) => resolve(1)),
    () => new Promise((resolve) => resolve(2)),
  ]),
  scheduler.call([
    () => new Promise((resolve) => resolve(3)),
    () => new Promise((resolve) => resolve(4)),
  ]),
]);
assert.deepStrictEqual(result, [
  [1, 2],
  [3, 4],
]);

Usage

Initialisation

Signature is each(...[items|options|concurrency|handler]).

All arguments are optional and can be defined in any order.

Multiple items (arrays) are merged. Multiple options (objects) are merged as well.
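
The flexible signature can be pictured as a type-based dispatch over the arguments. The following is a minimal sketch in plain JavaScript, not Each's actual implementation. The `normalizeArguments` helper and the shape of its return value are hypothetical. It assumes that arrays are items, plain objects are options, a bare number or boolean is the concurrency level, a function is the handler, and that later option keys override earlier ones.

```javascript
// Hypothetical helper: classify variadic arguments by their type.
// It mirrors the documented signature, not the library's source code.
function normalizeArguments(...args) {
  const normalized = { items: [], options: {}, handler: undefined };
  for (const arg of args) {
    if (Array.isArray(arg)) {
      // Multiple arrays are merged into a single list of items
      normalized.items.push(...arg);
    } else if (typeof arg === "function") {
      normalized.handler = arg;
    } else if (typeof arg === "number" || typeof arg === "boolean") {
      // A bare number or boolean is read as the concurrency level
      normalized.options.concurrency = arg;
    } else if (arg !== null && typeof arg === "object") {
      // Multiple option objects are merged (assumption: later keys win)
      Object.assign(normalized.options, arg);
    }
  }
  return normalized;
}

const handler = (item) => item;
const normalized = normalizeArguments(["a"], { pause: true }, handler, ["b"], 2);
// normalized.items is ["a", "b"]
// normalized.options is { pause: true, concurrency: 2 }
```

Because each argument is recognized by its type, the order in which arguments are passed does not matter in this sketch.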

Options

Functions

Iteration

Resolution order

Output order is consistent with input order. The value returned by a function or resolved by a promise is always returned in the same position as it was originally defined.
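
One way to obtain this guarantee is to write each result at the index of the task that produced it. This is a sketch in plain JavaScript under that assumption, not Each's implementation. The `runInOrder` and `delay` helpers are hypothetical names introduced for illustration.

```javascript
// Sketch: store each result at the index of its task, whatever the completion order.
async function runInOrder(tasks) {
  const results = new Array(tasks.length);
  const completed = []; // Indexes in the order the tasks actually finished
  await Promise.all(
    tasks.map(async (task, index) => {
      results[index] = await task();
      completed.push(index);
    }),
  );
  return { results, completed };
}

// Resolve `value` after `ms` milliseconds
const delay = (value, ms) =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

const demo = runInOrder([() => delay("slow", 30), () => delay("fast", 5)]);
// demo resolves to { results: ["slow", "fast"], completed: [1, 0] }
```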

Iteration with any type of values

Each iterates over any type of item. If no handler is defined, functions and promises get special treatment: functions are executed and may return a promise, and promises are resolved. Any other type is returned as is unless a handler function is defined.

Here is a quick example:

const result = await each([
  // Item is a value
  "a",
  // Item is a function
  () => new Promise((resolve) => resolve("b")),
  // Item is a promise
  new Promise((resolve) => resolve("c")),
]);

assert.deepStrictEqual(result, ["a", "b", "c"]);

Note, in the majority of cases, items (arrays) which do not contain functions and promises are handled with a handler function.
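
The treatment of each item type can be summarized as a small decision function. This is a hedged sketch, not the library's code. The `resolveItem` helper is hypothetical, and it assumes that when a handler is defined every item is passed to it, whatever its type.

```javascript
// Hypothetical helper: decide what to do with a single item.
async function resolveItem(item, index, handler) {
  if (handler) {
    // Assumption: with a handler, the item is passed to it as is
    return handler(item, index);
  }
  if (typeof item === "function") {
    // Functions are executed; a returned promise is adopted by the async function
    return item();
  }
  // Promises are resolved by the async function; other values pass through
  return item;
}

const demo = Promise.all([
  resolveItem("a", 0),
  resolveItem(() => "b", 1),
  resolveItem(Promise.resolve("c"), 2),
  resolveItem("d", 3, (item, index) => `${item}@${index}`),
]);
// demo resolves to ["a", "b", "c", "d@3"]
```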

Iteration over a list of functions

Functions are executed. Each handles both synchronous and asynchronous functions. In the latter case, functions return a Promise and Each waits for their resolution.

Here are various ways to declare functions:

const result = await each([
  // Synchronous function
  function () {
    return "a";
  },
  // Synchronous function with the fat arrow syntax
  () => "b",
  // Asynchronous function
  () => new Promise((resolve) => resolve("c")),
  // Asynchronous function which resolves after some delay
  () => new Promise((resolve) => setTimeout(() => resolve("d"), 100)),
]);

assert.deepStrictEqual(result, ["a", "b", "c", "d"]);

Iteration over a list of promises

Each waits for all promises to be resolved before returning their results. Just like with Promise.all, the order of the results matches the order of registration.

const result = await each([
  // Instant resolution
  new Promise((resolve) => resolve("a")),
  // Delayed resolution
  new Promise((resolve) => setTimeout(() => resolve("b"), 100)),
  // Instant resolution
  new Promise((resolve) => resolve("c")),
]);

assert.deepStrictEqual(result, ["a", "b", "c"]);

Synchronous and asynchronous functions

A function can be an item to iterate or defined with the handler option. In both cases, the behavior is the same.

A function defined as an item:

console.info(await each([() => 1]));

A function handling an item:

console.info(await each([1], (item) => item));

Handlers are called with the item as the first argument and the index number as the second argument.

Synchronous functions return a value. Asynchronous functions return a Promise.

Here is a synchronous handler function:

const result = await each(
  [{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
  (item, index) => `${item.id}@${index}`,
);

assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);

Here is an asynchronous handler function:

const result = await each(
  [{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
  (item, index) =>
    new Promise((resolve) =>
      setTimeout(() => resolve(`${item.id}@${index}`), 100),
    ),
);

assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);

Concurrency modes

Sequential mode (default)

When the concurrency option is undefined, false, or 1, items are executed in order one after the other.

let running = 0;
const result = await each(
  [{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
  function (item, index) {
    running++;
    if (running !== 1) {
      throw Error("Invalid execution");
    }
    return new Promise((resolve) =>
      setTimeout(() => {
        if (running !== 1) {
          throw Error("Invalid execution");
        }
        running--;
        resolve(`${item.id}@${index}`);
      }, 100),
    );
  },
);

assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);

Parallel mode

When the concurrency option is true or -1, items are all scheduled at the same time and run in parallel.

let running = 0;
const result = await each(
  [{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
  true,
  function (item, index) {
    if (running !== index) {
      throw Error("Invalid execution");
    }
    running++;
    return new Promise((resolve) =>
      setTimeout(() => {
        if (running !== 4 - index) {
          throw Error("Invalid execution");
        }
        running--;
        resolve(`${item.id}@${index}`);
      }, 100),
    );
  },
);

assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
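
The values accepted by the sequential and parallel modes, together with the bounded values of the concurrent mode, can be folded into a single maximum number of running items. The `toLimit` helper below is hypothetical and only restates the documented values. Throwing on any other value is an assumption of this sketch, not documented behavior.

```javascript
// Hypothetical helper: map the documented option values to a maximum number of running items.
function toLimit(concurrency) {
  if (concurrency === undefined || concurrency === false || concurrency === 1) {
    return 1; // Sequential mode
  }
  if (concurrency === true || concurrency === -1) {
    return Infinity; // Parallel mode, no upper bound
  }
  if (Number.isInteger(concurrency) && concurrency > 1) {
    return concurrency; // Concurrent mode, bounded
  }
  // Assumption: anything else is rejected in this sketch
  throw new TypeError(`Unsupported concurrency level: ${concurrency}`);
}

const limits = [undefined, false, 1, true, -1, 4].map((level) => toLimit(level));
// limits is [1, 1, 1, Infinity, Infinity, 4]
```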

Concurrent mode

When the concurrency option is a value above 1, the number of items running simultaneously is bounded by that value.

let running = 0;
const result = await each(
  [{ id: "a" }, { id: "b" }, { id: "c" }, { id: "d" }],
  2,
  function (item, index) {
    running++;
    if (running > 2) {
      throw Error("At most 2 running tasks");
    }
    return new Promise((resolve, reject) =>
      setTimeout(() => {
        running--;
        if (running > 2) {
          reject(Error("At most 2 running tasks"));
        } else {
          resolve(`${item.id}@${index}`);
        }
      }, 100),
    );
  },
);

assert.deepStrictEqual(result, ["a@0", "b@1", "c@2", "d@3"]);
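
Bounded concurrency is commonly built from a fixed number of workers that pull items from a shared cursor. The sketch below shows that general technique in plain JavaScript. It is not Each's implementation, `mapWithLimit` is a hypothetical name, and error handling is left out.

```javascript
// Sketch: run `handler` over `items` with at most `limit` calls in flight.
async function mapWithLimit(items, limit, handler) {
  const results = new Array(items.length);
  let next = 0; // Index of the next item to schedule
  // Each worker pulls the next unprocessed item until none is left
  const worker = async () => {
    while (next < items.length) {
      const index = next++;
      results[index] = await handler(items[index], index);
    }
  };
  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

let running = 0;
let peak = 0; // Highest number of handlers observed running at once
const demo = mapWithLimit(["a", "b", "c", "d"], 2, async (item, index) => {
  running++;
  peak = Math.max(peak, running);
  await new Promise((resolve) => setTimeout(resolve, 10));
  running--;
  return `${item}@${index}`;
});
// demo resolves to ["a@0", "b@1", "c@2", "d@3"] and peak ends at 2
```

A limit of 1 makes the same function sequential, and a limit of Infinity starts one worker per item.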

Manual throttling

Use pause and resume functions to throttle the iteration.

The pause option defines the initial status. Its value defaults to false.

On pause, functions that are already running continue their execution, and no further function is scheduled for execution.

While the iteration is paused, newly scheduled items do not resolve the returned promise until the iteration is resumed.

let state = "paused";
const scheduler = each({ pause: true });
scheduler.then(() => assert.deepStrictEqual(state, "resumed"));
setTimeout(() => {
  state = "resumed";
  scheduler.resume();
}, 100);

The resume and end methods return a promise that resolves once all elements have finished executing. This is an example using the resume function.

const stack = [];
const scheduler = each({ pause: true });
scheduler.call(
  () =>
    new Promise((resolve) => {
      stack.push(1);
      resolve();
    }),
);
scheduler.call(
  () =>
    new Promise((resolve) => {
      stack.push(2);
      resolve();
    }),
);
setTimeout(async () => {
  // Before resume, no processing occurs
  assert.deepStrictEqual(stack, []);
  // Resume and wait for execution
  await scheduler.resume();
  // After resume, every element was processed
  assert.deepStrictEqual(stack, [1, 2]);
}, 100);
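
Pausing can be modeled as a gate that every scheduled item must pass before it starts. This is a minimal sketch of that idea in plain JavaScript. It makes no claim about how Each implements pause and resume, and the `createGate` helper is hypothetical.

```javascript
// Hypothetical helper: a gate that queued work must pass before it starts.
function createGate(paused = false) {
  let waiters = []; // Resolvers of the callers blocked while the gate is paused
  return {
    pause() {
      paused = true;
    },
    resume() {
      paused = false;
      // Release every caller blocked on the gate
      for (const release of waiters) release();
      waiters = [];
    },
    // Resolves immediately when the gate is open, otherwise on the next resume
    wait() {
      if (!paused) return Promise.resolve();
      return new Promise((resolve) => waiters.push(resolve));
    },
  };
}

const stack = [];
const gate = createGate(true);
// The task waits at the gate, so nothing runs while the gate is paused
const task = gate.wait().then(() => stack.push(1));
// Calling gate.resume() later releases the task, and stack becomes [1]
```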

Dealing with errors

Iterations are stopped on error.

With synchronous functions or when the concurrency mode is sequential, it behaves like Promise.all. On error, no additional function is scheduled for execution and the returned promise is rejected.

With asynchronous functions executed concurrently, no additional functions are scheduled. Functions that are already running still resolve or reject their promise, but the result is discarded.

Whether the items array is provided at initialization or with the call function, the behavior is the same:

try {
  await each(2).call([
    () => new Promise((resolve) => setImmediate(() => resolve("ok"))),
    () =>
      new Promise((resolve, reject) =>
        setImmediate(() => reject(Error("Catchme"))),
      ),
    () => new Promise((resolve) => setImmediate(() => resolve("ok"))),
  ]);
} catch (error) {
  assert.equal(error.message, "Catchme");
}
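
The sequential case can be reproduced with a plain loop, which makes the "no additional function is scheduled" rule easy to observe. This is an illustrative sketch, not the library's code, and `runUntilError` is a hypothetical name.

```javascript
// Sketch: sequential run that stops scheduling at the first failure.
async function runUntilError(tasks) {
  const results = [];
  for (const task of tasks) {
    // A rejection propagates from here, so the remaining tasks are never called
    results.push(await task());
  }
  return results;
}

const executed = [];
const outcome = runUntilError([
  async () => {
    executed.push("first");
    return "ok";
  },
  async () => {
    executed.push("second");
    throw Error("Catchme");
  },
  async () => {
    executed.push("third");
    return "ok";
  },
]).catch((error) => error.message);
// outcome resolves to "Catchme" and executed is ["first", "second"]
```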

API concurrency

concurrency([level])

It defines the number of items to be executed in parallel. The new level takes effect only for items scheduled after the call. Previously scheduled items are unaffected.

This example changes the concurrency level. The first 2 items are executed in parallel and the next 2 items are executed sequentially.

import assert from "assert";
import each from "each";

const history = [];
const handler = (id) => {
  history.push(`${id}:start`);
  return new Promise((resolve) =>
    setTimeout(() => {
      history.push(`${id}:end`);
      resolve();
    }, 20),
  );
};

const scheduler = each(-1);
// Schedule parallel execution
scheduler.call(() => handler(1));
scheduler.call(() => handler(2));
// Change the concurrency level
scheduler.concurrency(1);
// Schedule sequential execution
scheduler.call(() => handler(4));
scheduler.call(() => handler(5));
// Wait for completion
await scheduler.end();

assert.deepStrictEqual(history, [
  // Parallel execution
  "1:start",
  "2:start",
  "1:end",
  "2:end",
  // Sequential execution
  "4:start",
  "4:end",
  "5:start",
  "5:end",
]);

API end

end([error|options])

Close the scheduler. The returned promise waits for all previously scheduled items to resolve.

No further items are allowed to register with call. In that case, the returned promise is rejected. When end is called and the scheduler is in a paused state, all paused items are resolved with undefined or an error if any.

This example waits for the completion of two scheduled items.

import assert from "assert";
import each from "each";

const history = [];
const handler = (id) => {
  return new Promise((resolve) =>
    setTimeout(() => {
      history.push(`${id}:end`);
      resolve();
    }, 20),
  );
};

const scheduler = each(-1);
// Schedule parallel execution
scheduler.call(() => handler(1));
scheduler.call(() => handler(2));
// Wait for completion
await scheduler.end();

assert.deepStrictEqual(history, ["1:end", "2:end"]);
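
The closing behavior can be approximated by tracking pending promises and refusing new work once closed. The miniature scheduler below is a hypothetical sketch in plain JavaScript. The `createClosableScheduler` name and the "Scheduler is closed" message are invented for illustration and are not taken from Each.

```javascript
// Hypothetical miniature scheduler: tracks scheduled work and can be closed.
function createClosableScheduler() {
  const pending = new Set();
  let closed = false;
  return {
    call(task) {
      if (closed) {
        // Assumption: the message is made up for this sketch
        return Promise.reject(Error("Scheduler is closed"));
      }
      const promise = Promise.resolve().then(task);
      pending.add(promise);
      // Forget the promise once settled; the empty catch silences this derived promise only
      promise.finally(() => pending.delete(promise)).catch(() => {});
      return promise;
    },
    async end() {
      closed = true;
      // Wait for everything scheduled before the close
      await Promise.all(pending);
    },
  };
}

const history = [];
const scheduler = createClosableScheduler();
scheduler.call(() => history.push("1:end"));
scheduler.call(() => history.push("2:end"));
const closing = scheduler
  .end()
  .then(() => scheduler.call(() => history.push("late")))
  .catch((error) => error.message);
// closing resolves to "Scheduler is closed" and history stays ["1:end", "2:end"]
```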

Option fluent

The fluent option applies when using the each().call function. By default, it is enabled. The API is designed to allow multiple calls to be chained where the value of the last call is returned:

const result = await each()
  .call(() => new Promise((resolve) => resolve(1)))
  .call(() => new Promise((resolve) => resolve(2)))
  .call(() => new Promise((resolve) => resolve(3)));

assert.strictEqual(result, 3);

The returned promise is enriched with the same functions as the promise returned by each(), thus exposing the each API.

Set the fluent option to false to not overload the returned promise with the each API:

const promise = each({ fluent: false }).call(
  () => new Promise((resolve) => resolve(1)),
);

assert.strictEqual(promise.call, undefined);
assert.strictEqual(promise.options, undefined);
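
Enriching a promise with extra methods is a generic JavaScript pattern. The sketch below shows one way to do it with Object.assign. It only illustrates the idea of a chainable promise, the `fluent` helper is hypothetical, and Each may implement its fluent API differently.

```javascript
// Sketch: attach a `call` method to a promise so that calls can be chained.
function fluent(promise) {
  return Object.assign(promise, {
    // Run `task` once the current promise is fulfilled and keep the result chainable
    call: (task) => fluent(promise.then(() => task())),
  });
}

const result = fluent(Promise.resolve())
  .call(() => 1)
  .call(() => 2)
  .call(() => 3);
// result is still a promise; it resolves to 3, the value of the last call
```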

Option pause

The pause option sets the initial state of the scheduler. It is false by default. Starting the scheduler in a paused state implies calling resume to start the execution.

Option relax

When the relax option is active, the internal scheduler permits the registration of new items with the call function even after an error.

It doesn't affect the processing of an items list. An error while handling one of the items prevents additional execution and rejects the items' promise. However, it provides the ability to register and execute new items with call.

This is an example with the default behavior:

const scheduler = each();
const prom1 = scheduler.call(() => new Promise((resolve) => resolve(1)));
const prom2 = scheduler.call(() => new Promise((resolve, reject) => reject(2)));
const prom3 = scheduler.call(() => new Promise((resolve) => resolve(3)));

const result = await Promise.allSettled([prom1, prom2, prom3]);
assert.deepStrictEqual(result, [
  { status: "fulfilled", value: 1 },
  { status: "rejected", reason: 2 },
  { status: "rejected", reason: 2 },
]);

This is an example with the relax option in action:

const scheduler = each({ relax: true });
const prom1 = scheduler.call(() => new Promise((resolve) => resolve(1)));
const prom2 = scheduler.call(() => new Promise((resolve, reject) => reject(2)));
const prom3 = scheduler.call(() => new Promise((resolve) => resolve(3)));

const result = await Promise.allSettled([prom1, prom2, prom3]);
assert.deepStrictEqual(result, [
  { status: "fulfilled", value: 1 },
  { status: "rejected", reason: 2 },
  { status: "fulfilled", value: 3 },
]);
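
The difference between the two examples can be reproduced with a small sequential queue that remembers its first error unless a relax flag is set. This is a hypothetical model written in plain JavaScript to illustrate the behavior described above. `createQueue` is an invented name and the code is not Each's implementation.

```javascript
// Hypothetical miniature queue: sequential, remembers the first error unless relaxed.
function createQueue({ relax = false } = {}) {
  let tail = Promise.resolve(); // End of the sequential queue
  let failed = false;
  let failure; // First error seen, recorded only when relax is false
  return {
    call(task) {
      const promise = tail.then(() => {
        // Once failed, later items are rejected with the same error
        if (failed) throw failure;
        return task();
      });
      tail = promise.catch((error) => {
        if (!relax && !failed) {
          failed = true;
          failure = error;
        }
      });
      return promise;
    },
  };
}

// Schedule three items, the second one fails
const settle = (queue) =>
  Promise.allSettled([
    queue.call(() => 1),
    queue.call(() => Promise.reject(2)),
    queue.call(() => 3),
  ]);

const strict = settle(createQueue());
const relaxed = settle(createQueue({ relax: true }));
// strict ends with { status: "rejected", reason: 2 }
// relaxed ends with { status: "fulfilled", value: 3 }
```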

Developers

Tests are executed with Mocha. To install the mocha package and its dependencies, run npm install.

npm run test
# Or
yarn run test

To automatically generate a new version and publish it:

yarn run release

Package publication is handled by the CI/CD with GitHub Actions.

History