PerfectQueue

PerfectQueue is a highly available distributed queue built on top of an RDBMS. It provides an API similar to Amazon SQS, but focuses on reliability and flexible scheduling rather than scalability.

PerfectQueue introduces the following concepts:

All you need to do is implement idempotent worker programs. PerfectQueue handles the remaining problems.
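
Idempotency matters because a queue that retries tasks may hand the same task to a worker more than once. The following is a minimal sketch of an idempotent side effect, not code from PerfectQueue: `LedgerStore` and `credit_once` are hypothetical names, and the in-memory set stands in for whatever durable record a real worker would keep.

```ruby
require 'set'

# Hypothetical in-memory store. A real worker would more likely use a
# database table with a unique constraint on the task key.
class LedgerStore
  attr_reader :balance

  def initialize
    @balance = 0
    @applied = Set.new
  end

  # Applies the credit at most once per task key.
  # Returns true if the credit was applied, false if it was a duplicate.
  def credit_once(task_key, amount)
    return false unless @applied.add?(task_key)
    @balance += amount
    true
  end
end

store  = LedgerStore.new
first  = store.credit_once('task-id', 100)  # first delivery
second = store.credit_once('task-id', 100)  # same task delivered again
```

Keying the check on the task identifier means that a repeated delivery leaves the balance unchanged.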

API overview

# open a queue
PerfectQueue.open(config, &block)  #=> #<Queue>

# submit a task
Queue#submit(task_id, type, data, options={})

# poll a task
# (you don't have to call this method directly; see the following sections)
Queue#poll  #=> #<AcquiredTask>

# get data associated with a task
AcquiredTask#data  #=> #<Hash>

# finish a task
AcquiredTask#finish!

# retry a task
AcquiredTask#retry!

# create a task reference
Queue#[](key)  #=> #<Task>

# check the existence of the task
Task#exists?

# force finish a task
# be aware that worker programs can't detect it
Task#force_finish!

Error classes

TaskError

##
# Workers may get these errors:
#

AlreadyFinishedError < TaskError

PreemptedError < TaskError

ProcessStopError < RuntimeError

ImmediateProcessStopError < ProcessStopError

GracefulProcessStopError < ProcessStopError

##
# Clients or other callers may get these errors:
#

ConfigError < RuntimeError

NotFoundError < TaskError

AlreadyExistsError < TaskError

NotSupportedError < TaskError
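
The hierarchy above determines what a single `rescue` or `case` clause will catch. The sketch below mirrors part of it inside a stand-in module so that it runs without the gem. `ErrorSketch` and `classify` are hypothetical names, and the superclass of `TaskError` is not stated above, so `StandardError` is an assumption.

```ruby
module ErrorSketch
  class TaskError < StandardError; end            # superclass is assumed
  class AlreadyFinishedError < TaskError; end
  class PreemptedError < TaskError; end
  class ProcessStopError < RuntimeError; end
  class GracefulProcessStopError < ProcessStopError; end

  # Groups an error by its base class: task-level errors concern a single
  # task, process-stop errors concern the whole worker process.
  def self.classify(error)
    case error
    when TaskError        then :task_error
    when ProcessStopError then :process_stop
    else                       :other
    end
  end
end

a = ErrorSketch.classify(ErrorSketch::PreemptedError.new)
b = ErrorSketch.classify(ErrorSketch::GracefulProcessStopError.new)
c = ErrorSketch.classify(ArgumentError.new)
```

Matching on the base classes keeps worker code short while still separating errors about one task from requests to stop the process.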

Example

# submit tasks
PerfectQueue.open(config) {|queue|
  data = {'key'=>"value"}
  queue.submit("task-id", "type1", data)
}

Writing a worker application

1. Implement PerfectQueue::Application::Base

class TestHandler < PerfectQueue::Application::Base
  # implement run method
  def run
    # do something ...
    puts "acquired task: #{task.inspect}"

    # call task.finish!, task.retry! or task.release!
    task.finish!
  end
end

2. Implement PerfectQueue::Application::Dispatch

class Dispatch < PerfectQueue::Application::Dispatch
  # describe routing
  route "type1" => TestHandler
  route /^regexp-.*$/ => :TestHandler  # String or Regexp => Class or Symbol
end
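
To illustrate what a String-or-Regexp to Class-or-Symbol routing table implies, here is a small stand-alone sketch. It is not PerfectQueue's implementation: `RouterSketch` and `resolve` are hypothetical names, and first-match-wins ordering is an assumption.

```ruby
class RouterSketch
  def initialize
    @routes = []  # ordered list of [pattern, handler] pairs
  end

  # pattern: String or Regexp; handler: Class, or Symbol naming a class
  def route(pattern, handler)
    @routes << [pattern, handler]
    self
  end

  # Returns the handler class of the first matching route, or nil.
  # String#=== tests equality, Regexp#=== tests for a match.
  def resolve(type)
    _, handler = @routes.find { |pattern, _| pattern === type }
    handler.is_a?(Symbol) ? Object.const_get(handler) : handler
  end
end

class TestHandler; end

router = RouterSketch.new
router.route('type1', TestHandler).route(/^regexp-.*$/, :TestHandler)
```

Resolving a Symbol lazily with `Object.const_get` lets a route refer to a handler class that is defined later in the load order.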

3. Run the worker

In a launcher script or rake file:

system('perfectqueue run -I. -rapp/workers/dispatch Dispatch')

or:

require 'yaml'
require 'perfectqueue'
require 'app/workers/dispatch'

PerfectQueue::Worker.run(Dispatch) {
  # this block is called each time the worker process is restarted
  raw = File.read('config/perfectqueue.yml')
  yml = YAML.load(raw)  # the configuration file is YAML, not JSON
  yml[ENV['RAILS_ENV'] || 'development']
}

Signal handlers

Configuration

Backend types

rdb_compat

additional configuration:

rdb

Not implemented yet.

config/perfectqueue.yml Example

development:
  type: rdb_compat
  url: mysql2://root:@localhost:3306/perfectqueue
  table: queues
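
A configuration file with this shape can be turned into a per-environment hash using only Ruby's standard `yaml` library. The sketch below parses an inline copy of the example rather than reading a file; `load_queue_config` is a hypothetical helper name.

```ruby
require 'yaml'

raw = <<~YML
  development:
    type: rdb_compat
    url: mysql2://root:@localhost:3306/perfectqueue
    table: queues
YML

# Parses the YAML text and returns the section for the given environment.
# Raises KeyError with a descriptive message if the section is missing.
def load_queue_config(raw, env)
  YAML.safe_load(raw).fetch(env) {
    raise KeyError, "no '#{env}' section in configuration"
  }
end

config = load_queue_config(raw, 'development')
```

Failing loudly on a missing environment section surfaces a misspelled environment name at startup instead of handing `nil` to the queue.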

Command line management tool

Usage: perfectqueue [options] <command>

commands:
    list                             Show list of tasks
    submit <key> <type> <data>       Submit a new task
    force_finish <key>               Force finish a task
    run <class>                      Run a worker process
    init                             Initialize a backend database

options:
    -e, --environment ENV            Framework environment (default: development)
    -c, --config PATH.yml            Path to a configuration file (default: config/perfectqueue.yml)

options for submit:
    -u, --user USER                  Set user
    -t, --time UNIXTIME              Set time to run the task

options for run:
    -I, --include PATH               Add $LOAD_PATH directory
    -r, --require PATH               Require files before starting

initializing a database

# assume that the config/perfectqueue.yml exists
$ perfectqueue init

submitting a task

$ perfectqueue submit k1 user_task '{"uid":1}' -u user_1
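
When a script issues this command, the JSON argument has to survive shell quoting. The sketch below builds the argument list with Ruby's standard `json` and `shellwords` libraries, using the key, type, and user from the example above. `submit_command` is a hypothetical helper, and the option placement follows that example.

```ruby
require 'json'
require 'shellwords'

# Builds the argument list for `perfectqueue submit`.
# run_at is optional; when given, it is passed as a Unix timestamp via -t.
def submit_command(key, type, data, user: nil, run_at: nil)
  args = ['perfectqueue', 'submit', key, type, JSON.generate(data)]
  args += ['-u', user] if user
  args += ['-t', run_at.to_i.to_s] if run_at
  args
end

args = submit_command('k1', 'user_task', { 'uid' => 1 }, user: 'user_1')
command_line = Shellwords.join(args)  # quoted for use in a shell
# system(*args) would run the command without involving a shell at all
```

Passing the arguments as an array to `system` avoids quoting problems entirely; `Shellwords.join` is useful when the command has to be logged or embedded in a shell script.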

listing tasks

$ perfectqueue list
                           key            type               user             status                   created_at                      timeout   data
                            k1       user_task             user_1            waiting    2012-05-18 13:05:31 -0700    2012-05-18 14:27:36 -0700   {"uid"=>1, "type"=>"user_task"}
                            k2       user_task             user_2            waiting    2012-05-18 13:35:33 -0700    2012-05-18 14:35:33 -0700   {"uid"=>2, "type"=>"user_task"}
                            k3     system_task                               waiting    2012-05-18 14:04:02 -0700    2012-05-22 15:04:02 -0700   {"task_id"=>32, "type"=>"system_task"}
3 entries.

force finishing a task

$ perfectqueue force_finish k2

running a worker

$ perfectqueue run -I. -Ilib -rconfig/boot.rb -rapps/workers/task_dispatch.rb TaskDispatch

Test

The specs use 'mysql2://root:@localhost/perfectqueue_test' as the connection string. Install a MySQL server on localhost, then create the test database:

$ mysql -h localhost -u root -e 'create database perfectqueue_test;'

Then run the specs:

$ bundle exec rake spec