JoSk

"JoSk" is a Node.js task manager for horizontally scaled apps and apps that would need to scale horizontally quickly at some point of growth.

"JoSk" mimics the native API of setTimeout and setInterval and supports CRON expressions. All queued tasks are synced between all running application instances via Redis, MongoDB, or a custom adapter.

The "JoSk" package is made for a variety of horizontally scaled apps, such as clusters, multi-server setups, and multi-threaded Node.js instances, running on the same machine, on different machines, or even in different data centers. "JoSk" ensures that each task is executed only once across all running instances of the application.
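
The single-execution guarantee can be pictured with a small in-memory simulation. This is only a sketch of the general idea and not JoSk's actual implementation: `sharedStore`, `claimTask`, and `instanceTick` are hypothetical names, and a plain `Map` stands in for the shared Redis or MongoDB storage.

```javascript
// Hypothetical in-memory stand-in for shared storage (Redis or MongoDB in JoSk).
// This is NOT JoSk's implementation; it only illustrates the idea.
const sharedStore = new Map(); // uid -> { nextRunAt }

// Claim a due task by pushing its next run time forward.
// Returns true only for the first caller that sees the task as due.
const claimTask = (uid, now, delay) => {
  const task = sharedStore.get(uid);
  if (!task || task.nextRunAt > now) {
    return false; // unknown task, not due yet, or already claimed
  }
  task.nextRunAt = now + delay;
  return true;
};

const executions = [];
const instanceTick = (instanceName, now) => {
  if (claimTask('task1h', now, 60 * 60000)) {
    executions.push(instanceName); // this instance runs the task
  }
};

sharedStore.set('task1h', { nextRunAt: 0 });

// Three simulated instances check the same storage at the same moment
instanceTick('instance-A', 1000);
instanceTick('instance-B', 1000);
instanceTick('instance-C', 1000);

console.log(executions); // only one instance claimed and executed the task
```

In a real deployment the claim step has to be atomic on the storage side, which is the role the storage adapter plays.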

"JoSk" is not just for multi-instance apps. It seamlessly integrates with single-instance applications as well, showcasing its versatility and adaptability.

Note: JoSk is a server-only package.

ToC

Main features

Prerequisites

Older releases compatibility

Install:

npm install josk --save
// ES Module Style
import { JoSk, RedisAdapter, MongoAdapter } from 'josk';

// CommonJS
const { JoSk, RedisAdapter, MongoAdapter } = require('josk');

API:

Constructor options for JoSk, MongoAdapter, and RedisAdapter

new JoSk(opts)

new RedisAdapter(opts)

Since v5.0.0

new MongoAdapter(opts)

Since v5.0.0

Initialization

JoSk is storage-agnostic (since v4.0.0). It ships with Redis and MongoDB "adapters" out of the box, with the option to extend its capabilities by creating and passing a custom adapter.

Redis Adapter

JoSk has no dependencies, so make sure the redis NPM package is installed in order to use the Redis storage adapter. RedisAdapter uses a basic set of commands: SET, GET, DEL, EXISTS, HSET, HGETALL, and SCAN. RedisAdapter is compatible with all Redis-like databases and has been well tested with Redis and KeyDB.

import { JoSk, RedisAdapter } from 'josk';
import { createClient } from 'redis';

const redisClient = await createClient({
  url: 'redis://127.0.0.1:6379'
}).connect();

const jobs = new JoSk({
  adapter: new RedisAdapter({
    client: redisClient,
    prefix: 'app-scheduler',
  }),
  onError(reason, details) {
    // Use onError hook to catch runtime exceptions
    // thrown inside scheduled tasks
    console.log(reason, details.error);
  }
});

MongoDB Adapter

JoSk has no dependencies, so make sure the mongodb NPM package is installed in order to use the MongoDB storage adapter. Note: this package adds two new MongoDB collections for each new JoSk(): one collection for tasks and a second one, with the .lock suffix, for "Read Locking".

import { JoSk, MongoAdapter } from 'josk';
import { MongoClient } from 'mongodb';

const client = new MongoClient('mongodb://127.0.0.1:27017');
// To avoid "DB locks" — it's a good idea to use separate DB from the "main" DB
const mongoDb = client.db('joskdb');
const jobs = new JoSk({
  adapter: new MongoAdapter({
    db: mongoDb,
    prefix: 'cluster-scheduler',
  }),
  onError(reason, details) {
    // Use onError hook to catch runtime exceptions
    // thrown inside scheduled tasks
    console.log(reason, details.error);
  }
});

Create the first task

After JoSk is initialized, call JoSk#setInterval to create a recurring task:

const jobs = new JoSk({ /*...*/ });

jobs.setInterval((ready) => {
  /* ...code here... */
  ready();
}, 60 * 60000, 'task1h'); // every hour

jobs.setInterval((ready) => {
  /* ...code here... */
  asyncCall(() => {
    /* ...more code here...*/
    ready();
  });
}, 15 * 60000, 'asyncTask15m'); // every 15 mins

/**
 * no need to call ready() inside async function
 */
jobs.setInterval(async () => {
  try {
    await asyncMethod();
  } catch (err) {
    console.log(err)
  }
}, 30 * 60000, 'asyncAwaitTask30m'); // every 30 mins

/**
 * no need to call ready() when call returns Promise
 */
jobs.setInterval(() => {
  return asyncMethod(); // <-- returns Promise
}, 2 * 60 * 60000, 'asyncAwaitTask2h'); // every two hours

Note: This library relies on the job ID. Always use a different uid for each scheduled job, even when the task function is the same:

const task = function (ready) {
  //... code here
  ready();
};

jobs.setInterval(task, 60000, 'task-1m'); // every minute
jobs.setInterval(task, 2 * 60000, 'task-2m'); // every two minutes
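
One way to catch accidental uid reuse early is a small guard around the uid strings. The sketch below is a hypothetical helper and not part of JoSk; `createUidGuard` and `uniqueUid` are names introduced here for illustration, and the guard only tracks uids registered within the current process.

```javascript
// Hypothetical helper (not part of JoSk): tracks uids registered by this
// process and throws when the same uid is registered twice.
const createUidGuard = () => {
  const seen = new Set();
  return (uid) => {
    if (typeof uid !== 'string' || uid.length === 0) {
      throw new TypeError('Task uid must be a non-empty string');
    }
    if (seen.has(uid)) {
      throw new Error(`Task uid "${uid}" is already registered`);
    }
    seen.add(uid);
    return uid;
  };
};

const uniqueUid = createUidGuard();

// Usage with JoSk (assuming `jobs` and `task` from the example above):
// jobs.setInterval(task, 60000, uniqueUid('task-1m'));
// jobs.setInterval(task, 2 * 60000, uniqueUid('task-2m'));
```

Because the guard returns the uid unchanged, it can be dropped into existing calls without altering what gets passed to JoSk.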

setInterval(func, delay, uid)

Schedule a task in an interval execution loop. The ready() callback is passed as the first argument to the task function.

In the example below, the next task will not be scheduled until the current is ready:

jobs.setInterval(function (ready) {
  /* ...run sync code... */
  ready();
}, 60 * 60000, 'syncTask1h'); // will execute every hour + time to execute the task

jobs.setInterval(async function () {
  try {
    await asyncMethod();
  } catch (err) {
    console.log(err)
  }
}, 60 * 60000, 'asyncAwaitTask1h'); // will execute every hour + time to execute the task

In the example below, the next task will not wait for the current task to finish:

jobs.setInterval(function (ready) {
  ready();
  /* ...run sync code... */
}, 60 * 60000, 'syncTask1h'); // will execute every hour

jobs.setInterval(async function () {
  /* ...task re-scheduled instantly here... */
  process.nextTick(async () => {
    await asyncMethod();
  });
}, 60 * 60000, 'asyncAwaitTask1h'); // will execute every hour
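
The difference between the two patterns above comes down to simple arithmetic. The sketch below does not call JoSk; it only models the timing described in the comments, with hypothetical function names, and it ignores any extra latency added by the scheduler itself.

```javascript
// Illustrative arithmetic only; this does not call JoSk.
// startedAt: when the task began (ms)
// duration:  how long the task body took (ms)
// delay:     the interval passed to setInterval (ms)

// ready() called at the end: the delay starts after the task body finishes
const nextRunWhenReadyLast = (startedAt, duration, delay) => startedAt + duration + delay;

// ready() called first: the delay starts right away, duration is not added
const nextRunWhenReadyFirst = (startedAt, duration, delay) => startedAt + delay;

const HOUR = 60 * 60000;
const startedAt = 0;
const duration = 5 * 60000; // the task body takes 5 minutes

console.log(nextRunWhenReadyLast(startedAt, duration, HOUR) / 60000); // 65
console.log(nextRunWhenReadyFirst(startedAt, duration, HOUR) / 60000); // 60
```

With a one-hour interval and a five-minute task, calling ready() last spaces runs roughly 65 minutes apart, while calling it first keeps them roughly 60 minutes apart.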

In the next example, a long running task is executed in a loop without delay after the full execution:

jobs.setInterval(function (ready) {
  asyncCall((error, result) => {
    if (error) {
      ready(); // <-- Always run `ready()`, even if call was unsuccessful
    } else {
      anotherCall(result.data, ['param'], (error, response) => {
        if (error) {
          ready(); // <-- Always run `ready()`, even if call was unsuccessful
          return;
        }

        waitForSomethingElse(response, () => {
          ready(); // <-- End of the full execution
        });
      });
    }
  });
}, 0, 'longRunningAsyncTask'); // run in a loop as soon as previous run is finished

The same task, combining async/await and callbacks:

jobs.setInterval(function (ready) {
  process.nextTick(async () => {
    try {
      const result = await asyncCall();
      const response = await anotherCall(result.data, ['param']);

      waitForSomethingElse(response, () => {
        ready(); // <-- End of the full execution
      });
    } catch (err) {
      console.log(err)
      ready(); // <-- Always run `ready()`, even if call was unsuccessful
    }
  });
}, 0, 'longRunningAsyncTask'); // run in a loop as soon as previous run is finished

setTimeout(func, delay, uid)

Run a task after a delay in ms. setTimeout is useful in a cluster when you need to make sure a task is executed only once. The ready() callback is passed as the first argument to the task function.

jobs.setTimeout(function (ready) {
  /* ...run sync code... */
  ready();
}, 60000, 'syncTaskIn1m'); // will run only once across the cluster in a minute

jobs.setTimeout(function (ready) {
  asyncCall(function () {
    /* ...run async code... */
    ready();
  });
}, 60000, 'asyncTaskIn1m'); // will run only once across the cluster in a minute

jobs.setTimeout(async function () {
  try {
    /* ...code here... */
    await asyncMethod();
    /* ...more code here...*/
  } catch (err) {
    console.log(err)
  }
}, 60000, 'asyncAwaitTaskIn1m'); // will run only once across the cluster in a minute

setImmediate(func, uid)

Execute the function immediately, and only once. setImmediate is useful in a cluster when you need to execute a function immediately and only once across all servers. The ready() callback is passed as the first argument to the task function.

jobs.setImmediate(function (ready) {
  //...run sync code
  ready();
}, 'syncTask'); // will run immediately and only once across the cluster

jobs.setImmediate(function (ready) {
  asyncCall(function () {
    //...run more async code
    ready();
  });
}, 'asyncTask'); // will run immediately and only once across the cluster

jobs.setImmediate(async function () {
  try {
    /* ...code here... */
    await asyncMethod();
  } catch (err) {
    console.log(err)
  }
}, 'asyncAwaitTask'); // will run immediately and only once across the cluster

clearInterval(timerId)

Cancel current interval timer.

const timer = await jobs.setInterval(func, 34789, 'unique-taskid');
await jobs.clearInterval(timer);

clearTimeout(timerId)

Cancel current timeout timer.

const timer = await jobs.setTimeout(func, 34789, 'unique-taskid');
await jobs.clearTimeout(timer);

destroy()

Destroy the JoSk instance and stop its internal interval timer. This method shouldn't be called under normal circumstances. After JoSk is destroyed, calling public methods results in an error that is logged to stdout, or passed to the onError hook if one was provided to JoSk. The only permitted methods are clearTimeout and clearInterval.

// EXAMPLE: DESTROY JoSk INSTANCE UPON SERVER PROCESS TERMINATION
const jobs = new JoSk({ /* ... */ });

const cleanUpBeforeTermination = function () {
  /* ...CLEAN UP AND STOP OTHER THINGS HERE... */
  jobs.destroy();
  process.exit(1);
};

process.stdin.resume();
process.on('uncaughtException', cleanUpBeforeTermination);
process.on('exit', cleanUpBeforeTermination);
process.on('SIGHUP', cleanUpBeforeTermination);

ping()

Ping the JoSk instance. Checks scheduler readiness and its connection to the "storage adapter".

const jobs = new JoSk({ /* ... */ });

const pingResult = await jobs.ping();
console.log(pingResult)
/**
In case of the successful response
{
  status: 'OK',
  code: 200,
  statusCode: 200,
}

Failed response
{
  status: 'Error reason',
  code: 500,
  statusCode: 500,
  error: ErrorObject
}
*/
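
The ping result can be reduced to a boolean for a health check. The helper below is a hypothetical sketch that assumes ping() resolves with the object shapes shown above; `isSchedulerHealthy` is a name introduced here and not part of JoSk.

```javascript
// Hypothetical helper: resolves to true only when ping() reports status "OK".
// Assumes ping() resolves with the object shapes shown above.
const isSchedulerHealthy = async (jobs) => {
  try {
    const result = await jobs.ping();
    return result.status === 'OK' && result.code === 200;
  } catch (err) {
    // Treat a rejected ping as unhealthy
    return false;
  }
};

// Usage (assuming `jobs` is an initialized JoSk instance):
// if (!(await isSchedulerHealthy(jobs))) {
//   /* ...alert, restart, or fail the health check... */
// }
```

Catching a rejected promise is a defensive choice in this sketch; the failed response shown above is returned as a regular object.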

Examples

Use cases and usage examples

CRON

Use JoSk to invoke synchronized tasks on a CRON schedule, with the cron-parser package to parse CRON expressions. To simplify CRON scheduling, use the setCron function below:

import { JoSk, RedisAdapter } from 'josk';
import { createClient } from 'redis';
import parser from 'cron-parser';

const jobsCron = new JoSk({
  adapter: new RedisAdapter({
    client: await createClient({ url: 'redis://127.0.0.1:6379' }).connect(),
    prefix: 'cron-scheduler'
  }),
  minRevolvingDelay: 512, // Adjust revolving delays to higher values
  maxRevolvingDelay: 1000, // as CRON schedule defined to seconds
});

// CRON HELPER FUNCTION
const setCron = async (uniqueName, cronTask, task) => {
  const nextTimestamp = +parser.parseExpression(cronTask).next().toDate();

  return await jobsCron.setInterval(function (ready) {
    ready(parser.parseExpression(cronTask).next().toDate());
    task();
  }, nextTimestamp - Date.now(), uniqueName);
};

setCron('Run every two seconds cron', '*/2 * * * * *', function () {
  console.log(new Date);
});

Pass arguments

Arguments can be passed to a task via a wrapper function:

const jobs = new JoSk({ /* ... */ });
const myVar = { key: 'value' };
let myLet = 'Some top level or env.variable (can get changed during runtime)';

const task = function (arg1, arg2, ready) {
  //... code here
  ready();
};

jobs.setInterval((ready) => {
  task(myVar, myLet, ready);
}, 60 * 60000, 'taskA');

jobs.setInterval((ready) => {
  task({ otherKey: 'Another Value' }, 'Some other string', ready);
}, 60 * 60000, 'taskB');
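
When many tasks need wrapping, the wrapper can be factored into a small higher-order function. This is a sketch with a hypothetical helper name, `withArgs`, which is not part of JoSk; it assumes the task takes `ready` as its last argument, as in the example above.

```javascript
// Hypothetical helper (not part of JoSk): binds leading arguments
// and leaves `ready` as the last argument of the task.
const withArgs = (task, ...args) => (ready) => task(...args, ready);

const calls = [];
const task = (arg1, arg2, ready) => {
  calls.push([arg1, arg2]);
  ready();
};

// Call the wrapped task with a stand-in for the `ready` callback
const wrapped = withArgs(task, { key: 'value' }, 'some string');
wrapped(() => calls.push('ready called'));

console.log(calls.length); // 2

// Usage with JoSk (assuming `jobs` is an initialized JoSk instance):
// jobs.setInterval(withArgs(task, { key: 'value' }, 'some string'), 60 * 60000, 'taskC');
```

Note that `withArgs` captures argument values at the moment it is called. For a variable that changes at runtime, an inline wrapper that reads the variable on each run is the better fit.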

Async/Await with ready() callback

For long-running async tasks, or when working with callback-based APIs, it may be necessary to call ready() explicitly. Wrap the task's body in process.nextTick to combine async/await with classic callback-based APIs:

jobs.setInterval((ready) => {
  process.nextTick(async () => {
    try {
      const result = await asyncCall();
      waitForSomethingElse(async (error, data) => {
        if (error) {
          ready(); // <-- Always run `ready()`, even if call was unsuccessful
          return;
        }

        await saveCollectedData(result, [data]);
        ready(); // <-- End of the full execution
      });
    } catch (err) {
      console.log(err)
      ready(); // <-- Always run `ready()`, even if call was unsuccessful
    }
  });
}, 60 * 60000, 'longRunningTask1h'); // once every hour
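
An alternative for callback-based APIs is to wrap them in a Promise, so the task becomes a plain async function and, as noted earlier, needs no explicit ready() call. The sketch below is illustrative only: `waitForSomethingElse` is a hypothetical stand-in for a callback API, and `waitForSomethingElseAsync`, `collected`, and `longRunningTask` are names introduced here.

```javascript
// Hypothetical callback-style API used only for this sketch
const waitForSomethingElse = (callback) => {
  setTimeout(() => callback(null, 'data'), 10);
};

// Wrap the callback API into a Promise
const waitForSomethingElseAsync = () => new Promise((resolve, reject) => {
  waitForSomethingElse((error, data) => {
    if (error) {
      reject(error);
    } else {
      resolve(data);
    }
  });
});

const collected = [];

// The task is a plain async function; it settles when all work is done
const longRunningTask = async () => {
  try {
    const data = await waitForSomethingElseAsync();
    collected.push(data);
  } catch (err) {
    console.log(err);
  }
};

// Usage (assuming `jobs` is an initialized JoSk instance):
// jobs.setInterval(longRunningTask, 60 * 60000, 'longRunningTaskPromise1h');
```

Whether this or the process.nextTick pattern reads better depends on how many callback-based calls the task mixes in.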

Clean up old tasks

During development and testing you may want to clean up the adapter's storage.

Clean up Redis

To clean up old tasks via the Redis CLI, use the following pattern:

redis-cli --no-auth-warning KEYS "josk:default:*" | xargs redis-cli --raw --no-auth-warning DEL

# If you're using multiple JoSk instances with prefix:
redis-cli --no-auth-warning KEYS "josk:prefix:*" | xargs redis-cli --raw --no-auth-warning DEL

Clean up MongoDB

To clean up old tasks in MongoDB, use the following queries:

// Run directly in MongoDB console:
db.getCollection('__JobTasks__').remove({});
// If you're using multiple JoSk instances with prefix:
db.getCollection('__JobTasks__PrefixHere').remove({});

MongoDB connection fine tuning

import { JoSk, MongoAdapter } from 'josk';
import { MongoClient } from 'mongodb';

// Recommended MongoDB connection options
// When used with ReplicaSet
const options = {
  writeConcern: {
    j: true,
    w: 'majority',
    wtimeout: 30000
  },
  readConcern: {
    level: 'majority'
  },
  readPreference: 'primary'
};

// Promise-based connect; callback-style connect is not supported by recent mongodb drivers
const client = await MongoClient.connect('mongodb://url', options);
// To avoid "DB locks", it's a good idea to use a DB separate from the "main" application DB
const db = client.db('dbName');
const jobs = new JoSk({
  adapter: new MongoAdapter({
    db: db,
  })
});

Notes

Running Tests

  1. Clone this package
  2. In Terminal (Console) go to directory where package is cloned
  3. Then run:
# Before running tests make sure NODE_ENV === development
# Install NPM dependencies
npm install --save-dev

# Before running tests you need
# to have access to MongoDB and Redis servers
REDIS_URL="redis://127.0.0.1:6379" MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm test

# If previous run has errors — add "debug" to output extra logs
DEBUG=true REDIS_URL="redis://127.0.0.1:6379" MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm test

# Be patient, tests are taking around 6 mins

Run Redis tests only

Run Redis-related tests only

# Before running Redis tests you need to have Redis server installed and running
REDIS_URL="redis://127.0.0.1:6379" npm run test-redis

# Be patient, tests are taking around 3 mins

Run MongoDB tests only

Run MongoDB-related tests only

# Before running Mongo tests you need to have MongoDB server installed and running
MONGO_URL="mongodb://127.0.0.1:27017/npm-josk-test-001" npm run test-mongo

# Be patient, tests are taking around 3 mins

Why JoSk?

JoSk stands for Job-Task. The name was randomly generated by the "uniq" project.

Support our open source contribution: