gocraft/work

gocraft/work lets you enqueue and process background jobs in Go. Jobs are durable and backed by Redis. Very similar to Sidekiq, but for Go.

Enqueue new jobs

To enqueue jobs, you need to make an Enqueuer with a redis namespace and a redigo pool. Each enqueued job has a name and can take optional arguments. Arguments are k/v pairs (serialized as JSON internally).

package main

import (
	"log"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

// Make a redis pool
var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle: 5,
	Wait: true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
	// Enqueue a job named "send_email" with the specified parameters.
	_, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4})
	if err != nil {
		log.Fatal(err)
	}
}
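
Note that Enqueue also returns the created *work.Job as its first value, which is useful for logging or tracing. A minimal sketch:

// The returned job carries an ID assigned by gocraft/work.
job, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com"})
if err != nil {
	log.Fatal(err)
}
log.Println("enqueued job with id", job.ID)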


Process jobs

In order to process jobs, you'll need to make a WorkerPool. Add middleware and jobs to the pool, and start the pool.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

// Make a redis pool
var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle: 5,
	Wait: true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

type Context struct {
	customerID int64
}

func main() {
	// Make a new pool. Arguments:
	// Context{} is a struct that will be the context for the request.
	// 10 is the max concurrency
	// "my_app_namespace" is the Redis namespace
	// redisPool is a Redis pool
	pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

	// Add middleware that will be executed for each job
	pool.Middleware((*Context).Log)
	pool.Middleware((*Context).FindCustomer)

	// Map the name of jobs to handler functions
	pool.Job("send_email", (*Context).SendEmail)

	// Customize options:
	pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)

	// Start processing jobs
	pool.Start()

	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) // os.Kill cannot be trapped; listen for SIGTERM instead
	<-signalChan

	// Stop the pool
	pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
	fmt.Println("Starting job: ", job.Name)
	return next()
}

func (c *Context) FindCustomer(job *work.Job, next work.NextMiddlewareFunc) error {
	// If there's a customer_id param, set it in the context for future middleware and handlers to use.
	if _, ok := job.Args["customer_id"]; ok {
		c.customerID = job.ArgInt64("customer_id")
		if err := job.ArgError(); err != nil {
			return err
		}
	}

	return next()
}

func (c *Context) SendEmail(job *work.Job) error {
	// Extract arguments:
	addr := job.ArgString("address")
	subject := job.ArgString("subject")
	if err := job.ArgError(); err != nil {
		return err
	}

	// Go ahead and send the email...
	// sendEmailTo(addr, subject)
	fmt.Println("sending email to", addr, "with subject", subject) // placeholder so addr/subject are used

	return nil
}

func (c *Context) Export(job *work.Job) error {
	return nil
}

Redis Cluster

If you're attempting to use gocraft/work on a Redis Cluster deployment, you may encounter a "CROSSSLOT Keys in request don't hash to the same slot" error during execution of the various Lua scripts used to manage job data (see Issue 93). The current workaround is to force all of the keys for a given worker pool's namespace onto a single node in the cluster using Redis Hash Tags. Using the example above:

func main() {
	// Make a new pool. Arguments:
	// Context{} is a struct that will be the context for the request.
	// 10 is the max concurrency
	// "{my_app_namespace}" is the Redis namespace; the {} chars force all of the keys onto a single node
	// redisPool is a Redis pool
	pool := work.NewWorkerPool(Context{}, 10, "{my_app_namespace}", redisPool)

	// ... register jobs and start the pool as before
}
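
Since the enqueuer must share the worker pool's namespace for its jobs to be picked up, it needs the same hash-tagged form. A minimal sketch:

// The {} hash tag must match the worker pool's namespace so that
// enqueued job keys hash to the same cluster slot.
var enqueuer = work.NewEnqueuer("{my_app_namespace}", redisPool)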

Note this is not an issue for Redis Sentinel deployments.

Special Features

Contexts

Just like in gocraft/web, gocraft/work lets you use your own contexts. Your context can be empty or it can have various fields in it. The fields can be whatever you want - it's your type! When a new job is processed by a worker, we'll allocate an instance of this struct and pass it to your middleware and handlers. This allows you to pass information from one middleware function to the next, and onto your handlers.

Custom contexts aren't really needed for trivial example applications, but are very important for production apps. For instance, one field in your context can be your tagged logger. Your tagged logger augments your log statements with a job-id. This lets you filter your logs by that job-id.
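
As a concrete illustration, here is a sketch of a context carrying a job-tagged logger. The TagLogger middleware name and the use of the standard library log package are illustrative, not part of gocraft/work:

type Context struct {
	logger *log.Logger // populated per job by the middleware below
}

// TagLogger (hypothetical) builds a logger whose prefix includes the job's ID,
// so every log line can be filtered by that job.
func (c *Context) TagLogger(job *work.Job, next work.NextMiddlewareFunc) error {
	c.logger = log.New(os.Stdout, "job="+job.ID+" ", log.LstdFlags)
	return next()
}

func (c *Context) SendEmail(job *work.Job) error {
	c.logger.Println("sending email") // prints: job=<id> sending email
	return nil
}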

Check-ins

Since this is a background job processing library, it's fairly common to have jobs that take a long time to execute. Imagine you have a job that takes an hour to run. It can often be frustrating not to know whether it's hung, about to finish, or has 30 more minutes to go.

To solve this, you can instrument your jobs to "check in" every so often with a string message. This check-in status will show up in the web UI. For instance, your job could look like this:

func (c *Context) Export(job *work.Job) error {
	rowsToExport := getRows()
	for i, row := range rowsToExport {
		exportRow(row)
		if i%1000 == 0 {
			job.Checkin("i=" + fmt.Sprint(i)) // Here's the magic! This tells gocraft/work our status
		}
	}
	return nil
}

Then in the web UI, you'll see the status of the worker:

Name    Arguments            Started At           Check-in At          Check-in
export  {"account_id": 123}  2016/07/09 04:16:51  2016/07/09 05:03:13  i=335000

Scheduled Jobs

You can schedule jobs to be executed in the future. To do so, make a new Enqueuer and call its EnqueueIn method:

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
secondsInTheFuture := int64(300) // EnqueueIn takes the delay as an int64 number of seconds
_, err := enqueuer.EnqueueIn("send_welcome_email", secondsInTheFuture, work.Q{"address": "test@example.com"})
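
EnqueueIn takes a relative delay in seconds; if you want to run a job at an absolute time, you can compute the offset yourself. A minimal sketch (the runAt value is illustrative):

// Convert an absolute target time into the seconds-from-now delay EnqueueIn expects.
runAt := time.Now().Add(4 * time.Hour)
_, err = enqueuer.EnqueueIn("send_welcome_email", int64(time.Until(runAt).Seconds()), work.Q{"address": "test@example.com"})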

Unique Jobs

You can enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once. For instance, you might have a worker that expires the cache of an object. It doesn't make sense for multiple such jobs to exist at once. Also note that unique jobs are supported for normal enqueues as well as scheduled enqueues.

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
job, err := enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // job returned
job, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // job == nil -- this duplicate job isn't enqueued.
job, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "789"}) // job != nil (different args, so a new unique job is scheduled)

Alternatively, you can provide your own key for making a job unique. When another job is enqueued with the same key as a job already in the queue, it will simply update the arguments.

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
job, err := enqueuer.EnqueueUniqueByKey("clear_cache", work.Q{"object_id_": "123"}, map[string]interface{}{"my_key": "586"})
job, err = enqueuer.EnqueueUniqueInByKey("clear_cache", 300, work.Q{"object_id_": "789"}, map[string]interface{}{"my_key": "586"})

For information on how this map is serialized to form the unique key, see https://golang.org/pkg/encoding/json/#Marshal.

Periodic Enqueueing (Cron)

You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The scheduling specification uses a cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and day of the week, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.

pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)
pool.PeriodicallyEnqueue("0 0 * * * *", "calculate_caches") // This will enqueue a "calculate_caches" job every hour
pool.Job("calculate_caches", (*Context).CalculateCaches) // Still need to register a handler for this job separately

Job concurrency

You can control job concurrency using JobOptions{MaxConcurrency: <num>}. Unlike the WorkerPool concurrency, this limits the number of jobs of a given type that can be active at one time within a single Redis instance. This works by putting a precondition on the enqueuing function, meaning a new job will not be scheduled if we are at or over a job's MaxConcurrency limit. A Redis key (see redis.go::redisKeyJobsLock) is used as a counting semaphore to track job concurrency per job type. The default value is 0, which means "no limit on job concurrency".

Note: if you want to run jobs "single threaded", you can set MaxConcurrency accordingly:

pool.JobWithOptions(jobName, work.JobOptions{MaxConcurrency: 1}, (*Context).WorkFxn)

Run the Web UI

The web UI lets you view the state of your gocraft/work cluster, inspect queued jobs, and retry or delete dead jobs.

Build and install the binary:

go get github.com/gocraft/work/cmd/workwebui
go install github.com/gocraft/work/cmd/workwebui

Then, you can run it:

workwebui -redis="redis:6379" -ns="work" -listen=":5040"

Navigate to http://localhost:5040/.

You'll see a view that looks like this:

Web UI Screenshot

Design and concepts

Enqueueing jobs
Scheduling algorithm
Processing a job
Workers and WorkerPools
Retry job, scheduled jobs, and the requeuer
Dead jobs
The reaper
Unique jobs
Periodic jobs
Paused jobs

Terminology reference

Benchmarks

The benches folder contains various benchmark code. In each case, we enqueue 100k jobs across 5 queues. The jobs are almost no-op jobs: they simply increment an atomic counter. We then measure the rate of change of the counter to obtain our measurement.
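
For reference, a sketch of roughly what such a near-no-op benchmark job looks like (the handler name and counter variable are illustrative):

var counter int64 // incremented by every job; throughput is its rate of change

func (c *Context) BenchJob(job *work.Job) error {
	atomic.AddInt64(&counter, 1) // requires "sync/atomic"
	return nil
}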

Library                 Speed
gocraft/work            20944 jobs/s
jrallison/go-workers    19945 jobs/s
benmanns/goworker       10328.5 jobs/s
albrow/jobs             40 jobs/s

gocraft

gocraft offers a toolkit for building web apps. Currently these packages are available:

gocraft/web - Router and middleware with your own contexts
gocraft/dbr - Additions to Go's database/sql for super fast performance and convenience
gocraft/health - Instrument your web apps with logging and metrics
gocraft/work - Process background jobs in Go (this library)

These packages were developed by the engineering team at UserVoice and currently power much of its infrastructure and tech stack.

Authors