Rate controls for your ActiveJobs, powered by Suo, a distributed semaphore library backed by Redis or Memcached.


Add this line to your application's Gemfile:

gem 'activejob-traffic_control'

And then execute:

$ bundle

Or install it yourself as:

$ gem install activejob-traffic_control


ActiveJob::TrafficControl adds three modules you can mix in to your job classes as needed, or to ApplicationJob if you are using ActiveJob 5+ (or have created a base job class yourself).

# Initialize the locking client (Redis or Memcached):
ActiveJob::TrafficControl.client = ConnectionPool.new(size: 5, timeout: 5) { Redis.new } # set pool size and timeout as needed
# or: ActiveJob::TrafficControl.client = ConnectionPool.new(size: 5, timeout: 5) { Dalli::Client.new }
# or, if not multithreaded: ActiveJob::TrafficControl.client = Redis.new


class CanThrottleJob < ActiveJob::Base
  include ActiveJob::TrafficControl::Throttle

  throttle threshold: 2, period: 1.second

  def perform
    # No more than two `CanThrottleJob` instances will run per second.
    # Any additional jobs that attempt to run are re-enqueued after a random delay
    # of 1-5x the period (so, 1-5 seconds in this case).
  end
end

If the job does not need to be re-enqueued (because it is scheduled to run again anyway, or because dropping it has no ill effect), you can specify drop: true instead. The drop: true flag also applies to Concurrency, below.

class CanThrottleAndDropJob < ActiveJob::Base
  include ActiveJob::TrafficControl::Throttle

  throttle threshold: 2, period: 1.second, drop: true

  def perform
    # No more than two `CanThrottleAndDropJob` instances will run per second.
    # Any additional jobs that attempt to run are dropped.
  end
end
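
To make the two overflow behaviours concrete, here is a minimal sketch in plain Ruby of what happens to a job that arrives after the threshold is reached. The helper names `reenqueue_delay` and `overflow_action` are hypothetical and exist only for illustration; this is not the gem's implementation, only a model of the behaviour described above (a random delay of 1-5x the period, or a drop).

```ruby
# Illustrative only: these helpers are hypothetical and are not part of
# activejob-traffic_control.

# Pick a re-enqueue delay between 1x and 5x the throttle period (in seconds).
def reenqueue_delay(period, rng: Random.new)
  period * rng.rand(1.0..5.0)
end

# Decide what happens to a job that arrives after the threshold is reached.
def overflow_action(period:, drop: false, rng: Random.new)
  return { action: :drop } if drop

  { action: :reenqueue, wait: reenqueue_delay(period, rng: rng) }
end

overflow_action(period: 1, drop: true) # the job is discarded
overflow_action(period: 1)             # the job is retried 1-5 seconds later
```

Spreading retries over a random interval, rather than retrying exactly one period later, keeps throttled jobs from all returning at the same moment.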


class ConcurrencyTestJob < ActiveJob::Base
  include ActiveJob::TrafficControl::Concurrency

  concurrency 5, drop: false

  def perform
    # At most five `ConcurrencyTestJob` instances will ever run simultaneously.
  end
end
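
The idea behind a concurrency limit can be sketched with a counting semaphore. The example below is a single-process stand-in written in plain Ruby: `LocalSemaphore` and `with_slot` are hypothetical names, and the real gem relies on Suo to hold the lock in Redis or Memcached so that the limit applies across all workers.

```ruby
# Illustrative only: LocalSemaphore is a hypothetical, single-process stand-in
# for a distributed semaphore. It is not the gem's implementation.
class LocalSemaphore
  def initialize(limit)
    @limit = limit
    @held = 0
    @mutex = Mutex.new
  end

  # Runs the block and returns its value if a slot is free; otherwise
  # returns :rejected without running the block.
  def with_slot
    acquired = @mutex.synchronize do
      next false if @held >= @limit

      @held += 1
      true
    end
    return :rejected unless acquired

    begin
      yield
    ensure
      # Always release the slot, even if the block raises.
      @mutex.synchronize { @held -= 1 }
    end
  end
end

semaphore = LocalSemaphore.new(1)
# The inner call finds the only slot taken, so it is rejected.
result = semaphore.with_slot { semaphore.with_slot { :inner_ran } }
```

A rejected job corresponds to the case the gem handles by re-enqueueing (drop: false) or discarding (drop: true).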


For Disable, you also need to configure the cache client:

ActiveJob::TrafficControl.cache_client = Rails.cache.dalli # if using :dalli_store
# or: ActiveJob::TrafficControl.cache_client = ActiveSupport::Cache.lookup_store(:dalli_store, "localhost:11211")

class CanDisableJob < ActiveJob::Base
  include ActiveJob::TrafficControl::Disable

  def perform
    # Pause this job by calling `CanDisableJob.disable!` (jobs will be re-enqueued),
    # or have jobs dropped entirely via `CanDisableJob.disable!(drop: true)`.
    # Enable it again via `CanDisableJob.enable!`.
  end
end
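
A shared cache is needed here because the "disabled" flag must be visible to every worker process. The following plain-Ruby sketch models that flag with a Hash standing in for the cache; `DisableFlag` and `action_for` are hypothetical names used only for illustration and do not reflect the gem's internals.

```ruby
# Illustrative only: DisableFlag and its Hash-backed store are hypothetical
# stand-ins for a shared cache; they are not the gem's implementation.
class DisableFlag
  def initialize(store = {})
    @store = store
  end

  # Mark a job class as disabled; jobs are either re-enqueued or dropped.
  def disable!(job_name, drop: false)
    @store[job_name] = drop ? :drop : :reenqueue
  end

  # Clear the flag so the job runs normally again.
  def enable!(job_name)
    @store.delete(job_name)
  end

  # Returns :run, :reenqueue, or :drop for the named job.
  def action_for(job_name)
    @store.fetch(job_name, :run)
  end
end

flags = DisableFlag.new
flags.disable!("CanDisableJob")             # jobs would be re-enqueued
flags.disable!("CanDisableJob", drop: true) # jobs would be dropped
flags.enable!("CanDisableJob")              # jobs run again
```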


