Toniq

Simple and reliable background job processing library for Elixir.

If anything is unclear about how this library works or what an error message means, that is considered a bug. Please file an issue (or a pull request)!

--

Status: Relatively new. Used in quite a lot of apps since 1.0 (November 2015). If you like, ping @joakimk about how you use toniq and for what size/type of app.

Installation

Add as a dependency in your mix.exs file:

defp deps do
  [
    {:exredis, ">= 0.2.4"},
    {:toniq, "~> 1.0"}
  ]
end

And run:

mix deps.get

Then add :toniq to the list of applications in mix.exs.

And configure toniq in different environments:

config :toniq, redis_url: "redis://localhost:6379/0"
# config :toniq, redis_url: System.get_env("REDIS_PROVIDER")

If you have multiple apps using the same redis server, then don't forget to also configure redis_key_prefix.

Dynamic redis url

If you need to configure redis dynamically after the application starts, you can use redis_url_provider to block until a redis_url is available.

config :toniq, redis_url_provider: fn -> wait_for_redis_url_to_be_available end
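
As a rough illustration of what a blocking provider could look like, here is a minimal sketch that polls an environment variable until it is set. The module name RedisUrlWaiter, the function name, the default variable name "REDIS_URL" and the polling interval are all hypothetical and not part of toniq; adapt them to wherever your redis URL actually comes from.

```elixir
defmodule RedisUrlWaiter do
  @moduledoc "Hypothetical helper: blocks until an environment variable holds a redis URL."

  # Checks the environment every `interval_ms` milliseconds and returns the
  # value as soon as the variable is set. The variable name is an assumption.
  def wait_for_redis_url(env_var \\ "REDIS_URL", interval_ms \\ 100) do
    case System.get_env(env_var) do
      nil ->
        Process.sleep(interval_ms)
        wait_for_redis_url(env_var, interval_ms)

      url ->
        url
    end
  end
end

# It could then be used in the same way as the config line above:
# config :toniq, redis_url_provider: fn -> RedisUrlWaiter.wait_for_redis_url() end
```

Note that this sketch waits forever if the variable never appears; you may want to add a timeout in a real application.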

Usage

Define a worker:

defmodule SendEmailWorker do
  use Toniq.Worker

  def perform(to: to, subject: subject, body: body) do
    # do work
  end
end

Enqueue jobs somewhere in your app code:

Toniq.enqueue(SendEmailWorker, to: "info@example.com", subject: "Hello", body: "Hello, there!")

Pipelines

You can also enqueue jobs using |> like this:

email = [to: "info@example.com", subject: "Hello", body: "Hello, there!"]

email
|> Toniq.enqueue_to(SendEmailWorker)

Delayed jobs

You can also delay jobs:

email = [to: "info@example.com", subject: "Hello", body: "Hello, there!"]

# Using enqueue_to:
email
|> Toniq.enqueue_to(SendEmailWorker, delay_for: 1000)

# Using enqueue_with_delay:
Toniq.enqueue_with_delay(SendEmailWorker, email, delay_for: 1000)

Pattern matching

You can pattern match in workers. This can be used to clean up the code, or to handle data from previous versions of the same worker!

defmodule SendMessageWorker do
  use Toniq.Worker

  def perform(message: "", recipient: _recipient) do
    # don't send empty messages
  end

  def perform(message: message, recipient: recipient) do
    SomeMessageService.send(message, recipient)
  end
end
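
One thing to keep in mind when matching on keyword lists like this: in Elixir, a keyword list pattern only matches when the keys appear in the same order (and no extra keys are present). The sketch below shows this with plain Elixir and no toniq dependency; KeywordMatchDemo and its keys are made up for illustration.

```elixir
defmodule KeywordMatchDemo do
  # Matches only a keyword list with exactly these keys, in this order.
  def perform(body: body, to: to), do: {:ok, body, to}

  # Catch-all for any other shape (different order, extra or missing keys).
  def perform(other), do: {:no_match, other}
end

# Same key order as the first clause, so it matches.
KeywordMatchDemo.perform(body: "hi", to: "bob")

# Keys swapped, so the first clause does not match and the catch-all runs.
KeywordMatchDemo.perform(to: "bob", body: "hi")
```

In practice this means the arguments you enqueue should list their keys in the same order as the worker's perform clauses.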

Limiting concurrency

For some workers you might want to limit the number of jobs that run at the same time. For example, if you call out to an API, you most likely don't want more than 3-10 connections at once.

You can set this by specifying the max_concurrency option on a worker.

defmodule RegisterInvoiceWorker do
  use Toniq.Worker, max_concurrency: 10

  def perform(attributes) do
    # do work
  end
end
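
To get a feel for what a concurrency cap means, here is a small standalone sketch using Elixir's Task.async_stream with its max_concurrency option. This is not how toniq implements the limit (that is not described here); it only illustrates the general idea of running at most N pieces of work at once. ConcurrencyCapDemo is a hypothetical module.

```elixir
defmodule ConcurrencyCapDemo do
  # Runs `fun` for each item with at most `limit` tasks running at once
  # and returns the results in input order.
  def run(items, limit, fun) do
    items
    |> Task.async_stream(fun, max_concurrency: limit, timeout: 5_000)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Six items, but never more than three processed at the same time.
ConcurrencyCapDemo.run(1..6, 3, fn n -> n * 2 end)
```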

Retrying failed jobs

An admin web UI is planned, but until then (and after that) you can use the console.

Retrying all failed jobs:

iex -S mix
iex> Toniq.failed_jobs |> Enum.each(&Toniq.retry/1)

Retrying one at a time:

iex> job = Toniq.failed_jobs |> hd
iex> Toniq.retry(job)

Or delete the failed job:

iex> job = Toniq.failed_jobs |> hd
iex> Toniq.delete(job)

Automatic retries

Jobs will be retried automatically when they fail. This can be customized, or even disabled by configuring a retry strategy for toniq (keep in mind that a system crash will still cause a job to be run more than once in some cases even if retries are disabled).

The default strategy is Toniq.RetryWithIncreasingDelayStrategy, which will retry a job 5 times after the initial run with increasing delay between each. Delays are approximately: 250 ms, 1 second, 20 seconds, 1 minute and 2.5 minutes. In total about 4 minutes (+ 6 x job run time) before the job is marked as failed.
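
The "about 4 minutes" figure can be checked by adding up the approximate delays listed above. The module below is only arithmetic on those numbers; RetryDelayMath is a hypothetical name and does not read anything from toniq itself.

```elixir
defmodule RetryDelayMath do
  # Approximate delays between attempts, in milliseconds, as listed above.
  @delays_ms [250, 1_000, 20_000, 60_000, 150_000]

  def delays_ms, do: @delays_ms

  # Sum of all delays: 231_250 ms.
  def total_ms, do: Enum.sum(@delays_ms)

  # Roughly 3.85 minutes, not counting the run time of the 6 attempts.
  def total_minutes, do: total_ms() / 60_000
end

RetryDelayMath.total_minutes()
```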

An alternative is Toniq.RetryWithoutDelayStrategy which just retries twice without delay (this is used in toniq tests).

config :toniq, retry_strategy: Toniq.RetryWithoutDelayStrategy
# config :toniq, retry_strategy: YourCustomStrategy

Load balancing

As toniq only runs jobs within the VM that enqueued them, it's up to you to enqueue jobs in different VMs if you want to run more of them concurrently than a single Erlang VM can handle.

This could be as simple as web requests to load balanced web servers enqueuing jobs within each web server, or as complex as a custom redis pub-sub system.

Alternatively, you can use Toniq.JobImporter to pass jobs to a random VM. There is a small delay because it is a polling system.

identifier = Toniq.KeepalivePersistence.registered_vms |> Enum.shuffle |> hd
Toniq.JobPersistence.store_incoming_job(Toniq.TestWorker, [], identifier)

Designed to avoid complexity

Instead of using redis as a messaging queue, toniq uses it for backup.

Jobs are run within the VM where they are enqueued. If a VM is stopped or crashes, unprocessed jobs are recovered from redis once another VM is running.

By running jobs within the same VM that enqueues them we avoid having to use any locks in redis. Locking is a complex subject and very hard to get right. Toniq should be simple and reliable, so let's avoid locking!

Request for feedback

I would like to know how using this tool works out for you. Any problems? Anything that was hard to understand from the docs? What scale do you run jobs on? Works great? Better than something you've used before? Missing some feature you're used to?

Ping @joakimk or open an issue.

FAQ

Why have a job queue at all?

Will jobs be run in order?

This is a first-in-first-out queue, but due to retries and concurrency, ordering cannot be guaranteed.

How are jobs serialized when stored in redis?

Jobs are serialized using Erlang serialization. It's the same format that is used when distributed nodes communicate. This means you can pass almost anything to jobs.
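
As a sketch of what this kind of serialization looks like, the example below round-trips job arguments through :erlang.term_to_binary/1 and :erlang.binary_to_term/1, the standard functions for the Erlang external term format. Whether toniq calls exactly these functions internally is an assumption; TermRoundTrip is a hypothetical module used only for illustration.

```elixir
defmodule TermRoundTrip do
  # Encode any Erlang/Elixir term to a binary in the external term format.
  def dump(term), do: :erlang.term_to_binary(term)

  # Decode a binary produced by dump/1 back into the original term.
  def load(binary), do: :erlang.binary_to_term(binary)
end

args = [to: "info@example.com", subject: "Hello", body: "Hello, there!"]

restored =
  args
  |> TermRoundTrip.dump()
  |> TermRoundTrip.load()

# The decoded term is equal to the original arguments.
restored == args
```

Plain data such as lists, maps, tuples, strings and numbers round-trips cleanly. Values tied to a running VM, such as pids, can be encoded too but may not be meaningful after a restart.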

What happens if the serialization format changes?

There is code in place to automatically migrate old versions of jobs.

If an Erlang VM stops with unprocessed jobs in its queue, how are those jobs handled?

As soon as another Erlang VM is running, it will find the jobs in redis, move them into its own queue and run them. It may take a little while before this happens (10-15 seconds or so), so that the original VM has a chance to report in and retain its jobs.

Why will jobs be run more than once in rare cases?

If something really unexpected happens and a job can't be marked as finished after being run, this library prefers to run it twice (or more) rather than not at all.

Unexpected things include something killing the Erlang VM, an unexpected crash within the job runner, or problems with the redis connection at the wrong time.

You can solve this in two ways:

I tend to prefer the first alternative whenever possible.

How do I run scheduled or recurring jobs?

There is no built-in support yet, but you can use tools like https://github.com/c-rack/quantum-elixir to schedule toniq jobs.

config :quantum, cron: [
  # Every 15 minutes
  "*/15 * * * *": fn -> Toniq.enqueue(SomeWorker) end
]

Notes

This project uses mix format to format the code. Ensure that you run that when you make changes. One easy way is to have an editor plugin run it for you when you save.

Versioning

This library uses semver for versioning. The API won't change in incompatible ways within the same major version, etc. The version is specified in mix.exs.

Credits

Presentations featuring toniq

Contributing

Development

mix deps.get
mix test

While developing you can use mix test.watch --stale to run tests as you save files.

You can also try toniq in dev using Toniq.TestWorker.

iex -S mix
iex> Toniq.enqueue(Toniq.TestWorker)
iex> Toniq.enqueue(Toniq.TestWorker, :fail_once)

TODO and ideas for after 1.0

License

Copyright (c) 2015-2017 Joakim Kolsjö

MIT License

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.