Lowkiq
Ordered background job processing
- Rationale
- Description
- Sidekiq comparison
- Queue
- Install
- Api
- Ring app
- Configuration
- Performance
- Execution
- Shutdown
- Debug
- Development
- Exceptions
- Rails integration
- Splitter
- Scheduler
- Recommendations on configuration
- Changing of worker's shards amount
- Extended error info
Rationale
We faced several problems using Sidekiq while processing messages from an external system. For instance, a message is the data of an order at a particular time. The external system sends new order data on every change. Orders are updated frequently, so a queue contains closely spaced messages for the same order.
Sidekiq doesn't guarantee strict message order, because a queue is processed by multiple threads. For example, we've received 2 messages: M1 and M2. Sidekiq handlers begin to process them in parallel, so M2 can be processed before M1.
Parallel processing of such messages can result in:
- deadlocks
- overwriting new data with old data
Lowkiq was created to eliminate such problems by avoiding parallel processing of tasks that belong to one entity.
Description
Lowkiq's queues are reliable, i.e., Lowkiq saves information about a job being processed and returns uncompleted jobs to the queue on startup.
Jobs in queues are ordered by their preassigned execution time, so the queues are not FIFO.
Every job has an identifier. Lowkiq guarantees that jobs with equal ids are processed by the same thread.
Every queue is divided into a fixed set of shards. A job is placed into a particular shard based on its id, so jobs with the same id are always placed into the same shard. All jobs of a shard are always processed by the same thread. This guarantees sequential processing of jobs with the same id and removes the need for locking.
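The mapping from a job id to a shard can be illustrated with a minimal sketch. The helper name shard_index and the use of Zlib.crc32 are assumptions made for illustration only; the hash function Lowkiq uses internally is not described here.

```ruby
require 'zlib'

# Hypothetical helper: maps a job id to a shard index.
# Zlib.crc32 is a stand-in for whatever hash Lowkiq uses internally.
def shard_index(id, shards_count)
  Zlib.crc32(id.to_s) % shards_count
end

shards_count = 5
ids = %w[order-1 order-2 order-1 order-3 order-2]

# Equal ids always print the same shard number.
ids.each do |id|
  puts "#{id} -> shard #{shard_index(id, shards_count)}"
end
```

Because the shard depends only on the id and the number of shards, two messages of the same order can never end up on different threads.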
Besides the id, every job has a payload. Payloads of jobs with the same id are accumulated, so all accumulated payloads are processed together. This is useful when you need to process only the last message and drop all previous ones.
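A minimal sketch of the "process only the last message" case. LastPayloadOnly is a hypothetical plain module, written without the gem so that it runs on its own; it only assumes that payloads arrive sorted by score from old to new.

```ruby
# Hypothetical worker body written as a plain module so it runs without the gem.
module LastPayloadOnly
  # payloads_by_id maps a job id to its payloads, sorted by score from old to new.
  def self.perform(payloads_by_id)
    # Keep only the newest payload of every id and drop the older ones.
    payloads_by_id.transform_values(&:last)
  end
end

result = LastPayloadOnly.perform("42" => [{ status: "new" }, { status: "paid" }])
p result
```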
A worker corresponds to a queue and contains the job processing logic.
A fixed number of threads is used to process all jobs of all queues. Adding or removing queues or their shards won't affect the number of threads.
Sidekiq comparison
If Sidekiq is good for your tasks, you should use it. But if you use plugins like sidekiq-grouping, sidekiq-unique-jobs, sidekiq-merger or implement your own lock system, you should look at Lowkiq.
For example, sidekiq-grouping accumulates a batch of jobs, then enqueues it and accumulates the next batch. With this approach, a queue can contain two batches with data of the same order. These batches are processed in parallel by different threads, so we come back to the initial problem.
Lowkiq was designed to avoid any type of locking.
Furthermore, Lowkiq's queues are reliable. Only Sidekiq Pro or plugins can add such functionality.
This benchmark shows the overhead of Redis usage. These are the results for 5 threads, 100,000 blank jobs:
- lowkiq: 155 sec or 1.55 ms per job
- lowkiq +hiredis: 80 sec or 0.80 ms per job
- sidekiq: 15 sec or 0.15 ms per job
This difference is related to the different queue structures. Sidekiq uses one list for all workers and fetches a whole job in O(1). Lowkiq uses several data structures, including sorted sets for keeping job ids, so fetching only the id of a job takes O(log(N)).
Queue
Please look at the presentation.
Every job has the following attributes:
- id is a job identifier with string type.
- payloads is a sorted set of payloads ordered by score. A payload is an object. A score is a real number.
- perform_in is the planned execution time. It's a Unix timestamp with a real number type.
- retry_count is the number of retries. It's a real number.
For example, id can be an identifier of a replicated entity.
payloads is a sorted set ordered by payload score; it results from grouping the payloads of jobs by their id.
payload can be a Ruby object because it is serialized with Marshal.dump.
score can be the payload's creation date (Unix timestamp) or an incremental version number.
By default, score and perform_in are the current Unix timestamp.
retry_count is -1 for a new unprocessed job and 0 for a job that has failed once, so the planned retries are counted, not the performed ones.
Job execution can be unsuccessful. In this case, its retry_count is incremented, the new perform_in is calculated with the retry_in formula, and the job moves back to the queue.
When retry_count becomes >= max_retry_count, the element of payloads with the lowest (oldest) score is moved to a morgue, and the remaining elements are moved back to the queue with retry_count and perform_in reset to -1 and now() respectively.
Calculation algorithm for retry_count and perform_in:
- a job has been executed and failed
- retry_count++
- perform_in = now + retry_in(retry_count)
- if retry_count >= max_retry_count, the job will be moved to a morgue.
type | retry_count | perform_in |
---|---|---|
new, not executed yet | -1 | set or now() |
new failed | 0 | now() + retry_in(0) |
retry failed | 1 | now() + retry_in(1) |
If max_retry_count = 1, retries stop.
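The bookkeeping described above can be modelled with a short sketch. It is not Lowkiq's implementation: after_failure is a hypothetical helper, a job is a plain Hash, payloads are [payload, score] pairs, and returning nil when no payloads remain for the queue is an assumption.

```ruby
# Hypothetical model of the bookkeeping after a failed execution.
# A job is a plain Hash here; payloads is an array of [payload, score] pairs.
def after_failure(job, now:, max_retry_count:, retry_in:)
  retry_count = job[:retry_count] + 1

  if retry_count >= max_retry_count
    # Retries are exhausted: the payload with the lowest score goes to the morgue,
    # the remaining payloads return to the queue with reset counters.
    oldest, *rest = job[:payloads].sort_by { |_payload, score| score }
    morgue = { id: job[:id], payloads: [oldest] }
    # Assumption: nothing is re-queued when no payloads remain.
    queued = rest.empty? ? nil : { id: job[:id], payloads: rest, retry_count: -1, perform_in: now }
    { queue: queued, morgue: morgue }
  else
    queued = job.merge(retry_count: retry_count, perform_in: now + retry_in.call(retry_count))
    { queue: queued, morgue: nil }
  end
end

retry_in = ->(count) { 10 * (count + 1) }
job = { id: "1", payloads: [["v1", 1], ["v2", 2]], retry_count: -1, perform_in: 100 }

first = after_failure(job, now: 100, max_retry_count: 1, retry_in: retry_in)
second = after_failure(first[:queue], now: 110, max_retry_count: 1, retry_in: retry_in)
p first
p second
```

With max_retry_count = 1, the first failure schedules a retry and the second one sends the oldest payload to the morgue, which matches the table rows.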
Job merging rules
They are applied when:
- a job is already in a queue and a new one with the same id is added
- a job has failed, but a new one with the same id has been added
- a job from a morgue is moved back to a queue that already has a job with the same id
Algorithm:
- payloads are merged, the minimal score is chosen for equal payloads
- if a new job and a queued job are merged, perform_in and retry_count are taken from the job in the queue
- if a failed job and a queued job are merged, perform_in and retry_count are taken from the failed one
- if a morgue job and a queued job are merged, perform_in = now(), retry_count = -1
Example:
# v1 is the first version and v2 is the second
# #{"v1": 1} is a sorted set of a single element, the payload is "v1", the score is 1
# a job is in a queue
{ id: "1", payloads: #{"v1": 1, "v2": 2}, retry_count: 0, perform_in: 1536323288 }
# a job which is being added
{ id: "1", payloads: #{"v2": 3, "v3": 4}, retry_count: -1, perform_in: 1536323290 }
# a resulted job in the queue
{ id: "1", payloads: #{"v1": 1, "v2": 3, "v3": 4}, retry_count: 0, perform_in: 1536323288 }
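The rules for perform_in and retry_count can be written down as a small sketch. merged_attributes and its kind argument are hypothetical names used only for illustration, and payload merging is deliberately left out.

```ruby
# Hypothetical helper; kind is one of :new, :failed, :morgue and describes the job
# that is merged with the one already sitting in the queue.
def merged_attributes(queued, other, kind:, now:)
  case kind
  when :new    then { perform_in: queued[:perform_in], retry_count: queued[:retry_count] }
  when :failed then { perform_in: other[:perform_in], retry_count: other[:retry_count] }
  when :morgue then { perform_in: now, retry_count: -1 }
  else raise ArgumentError, "unknown kind: #{kind.inspect}"
  end
end

queued = { id: "1", retry_count: 0, perform_in: 1536323288 }
added  = { id: "1", retry_count: -1, perform_in: 1536323290 }

# A new job merged with a queued one keeps the attributes of the queued job.
p merged_attributes(queued, added, kind: :new, now: 1536323300)
```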
A morgue is a part of a queue. Jobs in a morgue are not processed. A job in a morgue has the following attributes:
- id is the job identifier
- payloads
A job from a morgue can be moved back to the queue; retry_count = 0 and perform_in = now() are set.
Install
# Gemfile
gem 'lowkiq'
Redis >= 3.2
Api
module ATestWorker
  extend Lowkiq::Worker

  self.shards_count = 24
  self.batch_size = 10
  self.max_retry_count = 5

  def self.retry_in(count)
    10 * (count + 1) # (i.e. 10, 20, 30, 40, 50)
  end

  def self.retries_exhausted(batch)
    batch.each do |job|
      Rails.logger.info "retries exhausted for #{name} with error #{job[:error]}"
    end
  end

  def self.perform(payloads_by_id)
    # payloads_by_id is a hash map
    payloads_by_id.each do |id, payloads|
      # payloads are sorted by score, from old to new (min to max)
      payloads.each do |payload|
        do_some_work(id, payload)
      end
    end
  end
end
And then you have to add it to Lowkiq in your initializer file due to problems with autoloading:
Lowkiq.workers = [ ATestWorker ]
Default values:
self.shards_count = 5
self.batch_size = 1
self.max_retry_count = 25
self.queue_name = self.name
# i.e. 15, 16, 31, 96, 271, ... seconds + a random amount of time
def retry_in(retry_count)
  (retry_count ** 4) + 15 + (rand(30) * (retry_count + 1))
end
ATestWorker.perform_async [
  { id: 0 },
  { id: 1, payload: { attr: 'v1' } },
  { id: 2, payload: { attr: 'v1' }, score: Time.now.to_f, perform_in: Time.now.to_f },
]
# payload defaults to ""
# score and perform_in default to Time.now.to_f
It is possible to redefine perform_async and calculate id, score, and perform_in in the worker code:
module ATestWorker
  extend Lowkiq::Worker

  def self.perform_async(jobs)
    jobs.each do |job|
      job.merge! id: job[:payload][:id]
    end
    super
  end

  def self.perform(payloads_by_id)
    #...
  end
end
ATestWorker.perform_async 1000.times.map { |id| { payload: {id: id} } }
Ring app
Lowkiq::Web - a ring app.
- / - a dashboard
- /api/v1/stats - queue length, morgue length, lag for each worker and total result
Configuration
Options and their default values are:
- Lowkiq.workers = [] - list of workers to use. Since 1.1.0.
- Lowkiq.poll_interval = 1 - delay in seconds between queue polling for new jobs. Used only if a queue was empty in a previous cycle or an error occurred.
- Lowkiq.threads_per_node = 5 - threads per node.
- Lowkiq.redis = ->() { Redis.new url: ENV.fetch('REDIS_URL') } - redis connection options
- Lowkiq.client_pool_size = 5 - redis pool size for queueing jobs
- Lowkiq.pool_timeout = 5 - client and server redis pool timeout
- Lowkiq.server_middlewares = [] - a middleware list, used when a job is processed
- Lowkiq.client_middlewares = [] - a middleware list, used when a job is enqueued
- Lowkiq.on_server_init = ->() {} - a lambda executed when the server initializes
- Lowkiq.build_scheduler = ->() { Lowkiq.build_lag_scheduler } - a scheduler
- Lowkiq.build_splitter = ->() { Lowkiq.build_default_splitter } - a splitter
- Lowkiq.last_words = ->(ex) {} - a handler for StandardError descendants that caused the process to stop
- Lowkiq.dump_payload = Marshal.method :dump
- Lowkiq.load_payload = Marshal.method :load
- Lowkiq.format_error = -> (error) { error.message } - can be used to add an error backtrace. Please see Extended error info
- Lowkiq.dump_error = -> (msg) { msg } - can be used to implement custom compression logic for errors. Recommended when using Lowkiq.format_error.
- Lowkiq.load_error = -> (msg) { msg } - can be used to implement custom decompression logic for errors.
$logger = Logger.new(STDOUT)
Lowkiq.server_middlewares << -> (worker, batch, &block) do
  $logger.info "Started job for #{worker} #{batch}"
  block.call
  $logger.info "Finished job for #{worker} #{batch}"
end

Lowkiq.server_middlewares << -> (worker, batch, &block) do
  begin
    block.call
  rescue => e
    $logger.error "#{e.message} #{worker} #{batch}"
    raise e
  end
end

Lowkiq.client_middlewares << -> (worker, batch, &block) do
  $logger.info "Enqueueing job for #{worker} #{batch}"
  block.call
  $logger.info "Enqueued job for #{worker} #{batch}"
end
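How a list of such lambdas can wrap a job is sketched below. run_with_middlewares is a hypothetical runner, and the choice to make the first middleware in the list the outermost one is an assumption; the order Lowkiq applies internally is not stated here.

```ruby
# Hypothetical runner: wraps the job so that the first middleware in the list
# is the outermost one. The real ordering inside Lowkiq is not confirmed here.
def run_with_middlewares(middlewares, worker, batch, &job)
  chain = middlewares.reverse.reduce(job) do |inner, middleware|
    -> { middleware.call(worker, batch, &inner) }
  end
  chain.call
end

log = []
middlewares = [
  ->(worker, batch, &block) { log << "outer before"; block.call; log << "outer after" },
  ->(worker, batch, &block) { log << "inner before"; block.call; log << "inner after" }
]

run_with_middlewares(middlewares, "ATestWorker", { "1" => ["v1"] }) { log << "perform" }
puts log
```

Every middleware receives the worker, the batch and a block, and it decides when (or whether) to call the block, which is why logging, error reporting and reloaders can be stacked independently.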
Performance
Use hiredis for better performance.
# Gemfile
gem "hiredis"
# config
Lowkiq.redis = ->() { Redis.new url: ENV.fetch('REDIS_URL'), driver: :hiredis }
Execution
lowkiq -r ./path_to_app
path_to_app.rb must load the app. Example.
Lazy loading of worker modules is not supported. To load modules in advance, use require, or require_dependency for Ruby on Rails.
Shutdown
Send a TERM or INT signal to the process (Ctrl-C). The process will wait for the running jobs to finish.
Note that if a queue is empty, the process sleeps for poll_interval seconds, so it will not stop until poll_interval seconds have passed.
Debug
To get a trace of all threads of an app:
kill -TTIN <pid>
cat /tmp/lowkiq_ttin.txt
Development
docker-compose run --rm --service-port app bash
bundle
rspec
cd examples/dummy ; bundle exec ../../exe/lowkiq -r ./lib/app.rb
# open localhost:8080
docker-compose run --rm --service-port frontend bash
npm run dumb
# open localhost:8081
# npm run build
# npm run web-api
Exceptions
StandardError exceptions thrown by a worker are handled with middleware. Such exceptions don't stop the process.
All other exceptions cause the process to stop. Lowkiq will wait for the jobs executed by other threads to finish.
StandardError exceptions thrown outside of a worker are passed to Lowkiq.last_words. For example, this can happen when the Redis connection is lost or when Lowkiq's code has a bug.
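The distinction between StandardError and other exceptions is plain Ruby behaviour and can be seen without the gem. In the sketch below, run_job is a hypothetical stand-in for a middleware with a bare rescue.

```ruby
# Stand-in for a worker-level middleware: a bare rescue only catches StandardError.
def run_job
  yield
  :ok
rescue => e
  "handled #{e.class}"
end

# ArgumentError is a StandardError, so the bare rescue handles it.
puts run_job { raise ArgumentError, "bad payload" }

# NotImplementedError is a ScriptError, not a StandardError, so it escapes.
begin
  run_job { raise NotImplementedError, "not a StandardError" }
rescue Exception => e
  puts "escaped: #{e.class}"
end
```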
Rails integration
# config/routes.rb
Rails.application.routes.draw do
  # ...
  mount Lowkiq::Web => '/lowkiq'
  # ...
end
# config/initializers/lowkiq.rb
# configuration:
# Lowkiq.redis = -> { Redis.new url: ENV.fetch('LOWKIQ_REDIS_URL') }
# Lowkiq.threads_per_node = ENV.fetch('LOWKIQ_THREADS_PER_NODE').to_i
# Lowkiq.client_pool_size = ENV.fetch('LOWKIQ_CLIENT_POOL_SIZE').to_i
# ...
# since 1.1.0
Lowkiq.workers = [
  ATestWorker,
  OtherCoolWorker
]
Lowkiq.server_middlewares << -> (worker, batch, &block) do
  logger = Rails.logger
  tag = "#{worker}-#{Thread.current.object_id}"
  logger.tagged(tag) do
    time_start = Time.now
    logger.info "#{time_start} Started job, batch: #{batch}"
    begin
      block.call
    rescue => e
      logger.error e.message
      raise e
    ensure
      time_end = Time.now
      logger.info "#{time_end} Finished job, duration: #{time_end - time_start} sec"
    end
  end
end
# Sentry integration
Lowkiq.server_middlewares << -> (worker, batch, &block) do
  opts = {
    extra: {
      lowkiq: {
        worker: worker.name,
        batch: batch,
      }
    }
  }
  Raven.capture opts do
    block.call
  end
end
# NewRelic integration
if defined? NewRelic
  class NewRelicLowkiqMiddleware
    include NewRelic::Agent::Instrumentation::ControllerInstrumentation

    def call(worker, batch, &block)
      opts = {
        category: 'OtherTransaction/LowkiqJob',
        class_name: worker.name,
        name: :perform,
      }
      perform_action_with_newrelic_trace opts do
        block.call
      end
    end
  end

  Lowkiq.server_middlewares << NewRelicLowkiqMiddleware.new
end
# Rails reloader, responsible for cleaning of ActiveRecord connections
Lowkiq.server_middlewares << -> (worker, batch, &block) do
  Rails.application.reloader.wrap do
    block.call
  end
end
Lowkiq.on_server_init = ->() do
  [[ActiveRecord::Base, ActiveRecord::Base.configurations[Rails.env]]].each do |(klass, init_config)|
    klass.connection_pool.disconnect!
    config = init_config.merge 'pool' => Lowkiq.threads_per_node
    klass.establish_connection(config)
  end
end
Note: In Rails 7, the worker files aren't loaded by default in the initializers since they are managed by the main autoloader. To solve this, wrap the worker assignment in a to_prepare block.
Rails.application.config.to_prepare do
  Lowkiq.workers = [
    ATestWorker,
    OtherCoolWorker
  ]
end
Execution: bundle exec lowkiq -r ./config/environment.rb
Splitter
Each worker has several shards:
# worker: shard ids
worker A: 0, 1, 2
worker B: 0, 1, 2, 3
worker C: 0
worker D: 0, 1
Lowkiq uses a fixed number of threads for job processing, so shards have to be distributed between threads. The splitter does this.
To define the set of shards processed by each thread, let's put all shards into one list:
A0, A1, A2, B0, B1, B2, B3, C0, D0, D1
The default splitter distributes shards evenly between the threads of a single node.
If threads_per_node is set to 3, the distribution will be:
# thread id: shards
t0: A0, B0, B3, D1
t1: A1, B1, C0
t2: A2, B2, D0
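The distribution above is what a round-robin deal over the flat shard list produces. The sketch below reproduces it; split_shards is a hypothetical helper and not the splitter's actual code.

```ruby
# Hypothetical re-implementation of an even split: shard i goes to thread i % threads.
def split_shards(shards, threads_per_node)
  shards.each_with_index
        .group_by { |_shard, index| index % threads_per_node }
        .transform_values { |pairs| pairs.map(&:first) }
end

shards = %w[A0 A1 A2 B0 B1 B2 B3 C0 D0 D1]
split_shards(shards, 3).each do |thread, assigned|
  puts "t#{thread}: #{assigned.join(', ')}"
end
```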
Besides the default splitter, Lowkiq has the ByNode splitter. It allows dividing the load between several processes (nodes).
Lowkiq.build_splitter = -> () do
  Lowkiq.build_by_node_splitter(
    ENV.fetch('LOWKIQ_NUMBER_OF_NODES').to_i,
    ENV.fetch('LOWKIQ_NODE_NUMBER').to_i
  )
end
So, instead of a single process, you need to run several and set the environment variables:
# process 0
LOWKIQ_NUMBER_OF_NODES=2 LOWKIQ_NODE_NUMBER=0 bundle exec lowkiq -r ./lib/app.rb
# process 1
LOWKIQ_NUMBER_OF_NODES=2 LOWKIQ_NODE_NUMBER=1 bundle exec lowkiq -r ./lib/app.rb
The total number of threads is equal to the product of ENV.fetch('LOWKIQ_NUMBER_OF_NODES') and Lowkiq.threads_per_node.
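One way to picture a split across nodes is to number the threads of all nodes globally and deal shards out round-robin. This is only a model under that assumption: shards_for_node is a hypothetical helper, and the real ByNode splitter may assign shards differently.

```ruby
# Hypothetical model: threads of all nodes are numbered globally and shards are
# dealt out round-robin. The real ByNode splitter may distribute differently.
def shards_for_node(shards, number_of_nodes, node_number, threads_per_node)
  total_threads = number_of_nodes * threads_per_node
  (0...threads_per_node).to_h do |thread|
    global_thread = node_number * threads_per_node + thread
    assigned = shards.select.with_index { |_shard, index| index % total_threads == global_thread }
    [thread, assigned]
  end
end

shards = %w[A0 A1 A2 B0 B1 B2 B3 C0 D0 D1]
2.times do |node|
  puts "node #{node}: #{shards_for_node(shards, 2, node, 2)}"
end
```

Whatever the exact assignment, the important property is that every shard belongs to exactly one thread of exactly one node.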
You can also write your own splitter if your app needs an extra distribution of shards between threads or nodes.
Scheduler
Every thread processes a set of shards. The scheduler selects a shard for processing. Every thread has its own scheduler instance.
Lowkiq has 2 schedulers to choose from.
Seq sequentially looks over shards.
Lag chooses the shard with the oldest job, minimizing the lag. It's used by default.
The scheduler can be set up through settings:
Lowkiq.build_scheduler = ->() { Lowkiq.build_seq_scheduler }
# or
Lowkiq.build_scheduler = ->() { Lowkiq.build_lag_scheduler }
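The difference between the two strategies can be shown on an in-memory model. seq_scheduler and lag_pick are hypothetical helpers, and representing a shard as an array of perform_in timestamps is an assumption made for illustration.

```ruby
# Hypothetical in-memory model: each shard is an array of perform_in timestamps,
# sorted ascending, so the first element is the oldest job of that shard.

# Seq: visit shards one after another, wrapping around.
def seq_scheduler(shard_names)
  shard_names.cycle
end

# Lag: pick the non-empty shard whose oldest job has waited the longest.
def lag_pick(shards)
  candidate = shards.reject { |_name, jobs| jobs.empty? }
                    .min_by { |_name, jobs| jobs.first }
  candidate && candidate.first
end

shards = { "A0" => [105.0, 120.0], "A1" => [], "B0" => [101.5] }

p seq_scheduler(shards.keys).first(4)
p lag_pick(shards)
```

Seq gives every shard the same attention, while Lag always goes to the shard that is furthest behind.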
Recommendations on configuration
SomeWorker.shards_count
The sum of shards_count of all workers shouldn't be less than Lowkiq.threads_per_node, otherwise threads will stay idle.
The sum of shards_count of all workers can be equal to Lowkiq.threads_per_node. In this case, a thread processes a single shard. This makes sense only with a uniform queue load.
The sum of shards_count of all workers can be more than Lowkiq.threads_per_node. In this case, shards_count can be treated as a priority. The larger it is, the more often the tasks of this queue will be processed.
There is no reason to set the shards_count of one worker higher than Lowkiq.threads_per_node, because a thread will then handle more than one shard from this queue, which increases the overhead.
SomeWorker.max_retry_count
From retry_in and max_retry_count, you can calculate the approximate time that a payload of a job will stay in a queue.
After max_retry_count is reached, the payload with the minimal score will be moved to a morgue.
For the default retry_in, we get the following table.
def retry_in(retry_count)
  (retry_count ** 4) + 15 + (rand(30) * (retry_count + 1))
end
max_retry_count | amount of days of job's life |
---|---|
14 | 1 |
16 | 2 |
18 | 3 |
19 | 5 |
20 | 6 |
21 | 8 |
22 | 10 |
23 | 13 |
24 | 16 |
25 | 20 |
(0...25).map{ |c| retry_in c }.sum / 60 / 60 / 24
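Since the default formula contains rand(30), the one-liner above gives a slightly different result on every run. A deterministic estimate can be sketched by replacing the random term with its average of 14.5 (rand(30) returns an integer from 0 to 29). The helper names expected_retry_in and expected_days are hypothetical.

```ruby
# Expected value of the default formula: rand(30) averages 14.5.
def expected_retry_in(retry_count)
  (retry_count**4) + 15 + (14.5 * (retry_count + 1))
end

# Sum of all planned delays, converted from seconds to days.
def expected_days(max_retry_count)
  seconds = (0...max_retry_count).sum { |count| expected_retry_in(count) }
  seconds / 86_400.0
end

[14, 20, 25].each do |max|
  puts format("max_retry_count=%d -> about %.1f days", max, expected_days(max))
end
```

Rounded down, these estimates agree with the table rows for 14, 20 and 25.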
Changing of worker's shards amount
Try to choose the number of shards right away and don't change it later.
If you can stop adding new jobs, do so, wait for the queues to become empty, and deploy the new version of the code with the changed number of shards.
If you can't, follow these steps:
A worker example:
module ATestWorker
  extend Lowkiq::Worker

  self.shards_count = 5

  def self.perform(payloads_by_id)
    some_code
  end
end
Set the new number of shards and a new queue name:
module ATestWorker
  extend Lowkiq::Worker

  self.shards_count = 10
  self.queue_name = "#{self.name}_V2"

  def self.perform(payloads_by_id)
    some_code
  end
end
Add a worker moving jobs from the old queue to the new one:
module ATestMigrationWorker
  extend Lowkiq::Worker

  self.shards_count = 5
  self.queue_name = "ATestWorker"

  def self.perform(payloads_by_id)
    jobs = payloads_by_id.each_with_object([]) do |(id, payloads), acc|
      payloads.each do |payload|
        acc << { id: id, payload: payload }
      end
    end

    ATestWorker.perform_async jobs
  end
end
Extended error info
For failed jobs, Lowkiq stores only error.message by default. This can be changed with the Lowkiq.format_error setting.
Lowkiq.dump_error and Lowkiq.load_error can be used to compress and decompress the error messages respectively.
Example:
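require 'zlib'
require 'base64'

# Store the full message with the backtrace instead of only error.message
Lowkiq.format_error = -> (error) { error.full_message(highlight: false) }

# Compress the formatted error before it is saved
Lowkiq.dump_error = Proc.new do |msg|
  compressed = Zlib::Deflate.deflate(msg.to_s)
  Base64.encode64(compressed)
end

# Decompress a stored error; fall back to the raw value if it can't be inflated
Lowkiq.load_error = Proc.new do |input|
  decoded = Base64.decode64(input)
  Zlib::Inflate.inflate(decoded)
rescue
  input
end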