Backlite: Type-safe, persistent, embedded task queues and background job runner w/ SQLite

Go Report Card Test License: MIT Go Reference GoT

<p align="center"><img alt="Logo" src="https://raw.githubusercontent.com/mikestefanello/readmeimages/main/backlite/logo.png" /></p>

Introduction

Overview

Backlite provides type-safe, persistent and embedded task queues meant to run within your application as opposed to an external message broker. A task can be of any type and each type declares its own queue along with the configuration for the queue. Tasks are automatically executed in the background via a configurable worker pool.

Origin

This project started shortly after migrating Pagoda to SQLite from Postgres and Redis. Redis was previously used to handle task queues and I wanted to leverage SQLite instead. Originally, goqite was chosen, which is a great library and one that I took inspiration from, but I had a lot of different ideas for the overall approach and it lacked a number of features that I felt were needed.

River, an excellent, similar library built for Postgres, was also a major source of inspiration and ideas.

Screenshots

<img src="https://raw.githubusercontent.com/mikestefanello/readmeimages/main/backlite/failed.png" alt="Failed tasks"/> <img src="https://raw.githubusercontent.com/mikestefanello/readmeimages/main/backlite/task.png" alt="Task details"/> <img src="https://raw.githubusercontent.com/mikestefanello/readmeimages/main/backlite/task-failed.png" alt="Task failed"/>

More to come as the web UI is under active development.

Status

This project is under active development and is not yet recommended for production use, though all features outlined below are available.

Installation

Install by simply running: go get github.com/mikestefanello/backlite

Features

Type-safety

No need to deal with serialization and byte slices. By leveraging generics, tasks and queues are completely type-safe, which means that you pass your task type into a queue and your task processor callbacks will only ever receive that type.
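
To illustrate the idea, here is a minimal, standard-library-only sketch of how a generic wrapper can keep raw bytes away from a processor callback. It is not Backlite's implementation: `Processor`, `typedQueue`, `handle` and `roundTrip` are hypothetical names, and JSON is assumed as the encoding purely for the sake of the example.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Processor is a hypothetical callback type that receives a concrete task type.
type Processor[T any] func(ctx context.Context, task T) error

// typedQueue is a hypothetical wrapper that hides serialization from callers.
type typedQueue[T any] struct {
	process Processor[T]
}

// handle decodes the stored payload into T before invoking the processor,
// so the callback never sees raw bytes. JSON is an assumption of this sketch.
func (q typedQueue[T]) handle(ctx context.Context, payload []byte) error {
	var task T
	if err := json.Unmarshal(payload, &task); err != nil {
		return fmt.Errorf("decode task: %w", err)
	}
	return q.process(ctx, task)
}

type NewOrderEmailTask struct {
	OrderID      string
	EmailAddress string
}

// roundTrip encodes a task, passes it through the queue and returns
// exactly what the processor received.
func roundTrip(task NewOrderEmailTask) (NewOrderEmailTask, error) {
	var got NewOrderEmailTask
	q := typedQueue[NewOrderEmailTask]{
		process: func(_ context.Context, t NewOrderEmailTask) error {
			got = t
			return nil
		},
	}
	payload, err := json.Marshal(task)
	if err != nil {
		return got, err
	}
	err = q.handle(context.Background(), payload)
	return got, err
}

func main() {
	got, err := roundTrip(NewOrderEmailTask{OrderID: "123", EmailAddress: "a@example.com"})
	fmt.Println(got.OrderID, got.EmailAddress, err) // 123 a@example.com <nil>
}
```

Because the type parameter is fixed when the queue is created, passing a processor written for a different task type fails at compile time rather than at runtime.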

Persistence with SQLite

When tasks are added to a queue, they are inserted into a SQLite database table to ensure persistence.

Optional retention

Each queue can have completed tasks retained in a separate table for archiving, auditing, monitoring, etc. Options exist to retain all completed tasks or only those that failed all attempts. An option also exists to retain the task data for all tasks or only those that failed.

Retry & Backoff

Each queue can be configured to retry tasks a certain number of times and to back off for a given amount of time between attempts.
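
The retry semantics can be sketched in memory as follows. This is only an illustration of "max attempts plus a fixed backoff"; Backlite persists tasks in SQLite and its dispatcher decides when to retry, so it does not necessarily sleep inside a loop like this. `runWithRetry` and `attemptsNeeded` are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithRetry calls fn up to maxAttempts times, waiting for backoff between
// failed attempts. It returns the number of attempts made and the last error.
func runWithRetry(ctx context.Context, maxAttempts int, backoff time.Duration, fn func(context.Context) error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(ctx); err == nil {
			return attempt, nil
		}
		if attempt == maxAttempts {
			break
		}
		// Wait before the next attempt, but give up early if ctx is cancelled.
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return attempt, ctx.Err()
		}
	}
	return maxAttempts, err
}

// attemptsNeeded runs a function that fails `failures` times before succeeding
// and reports how many attempts were made and whether it finally succeeded.
func attemptsNeeded(maxAttempts, failures int) (int, bool) {
	calls := 0
	fn := func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}
	n, err := runWithRetry(context.Background(), maxAttempts, 5*time.Millisecond, fn)
	return n, err == nil
}

func main() {
	fmt.Println(attemptsNeeded(5, 2)) // 3 true
	fmt.Println(attemptsNeeded(2, 5)) // 2 false
}
```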

Scheduled execution

When adding a task to a queue, you can specify a duration or specific time to wait until executing the task.

Logging

Optionally log queue operations with a logger of your choice, as long as it implements the simple Logger interface, which log/slog does.

2024/07/21 14:08:13 INFO task processed id=0190d67a-d8da-76d4-8fb8-ded870d69151 queue=example duration=85.101µs attempt=1

Nested tasks

While processing a given task, it's easy to create another task in the same or a different queue, which allows you to nest tasks to create workflows. Use FromContext() with the provided context to get your initialized client from within the task processor, and add one or many tasks.

Graceful shutdown

The task dispatcher, which handles sending tasks to the worker pool for execution, can be shut down gracefully by calling Stop() on the client. That will wait for all workers to finish for as long as the passed-in context is not cancelled. To hard-stop the dispatcher, cancel the context passed in when calling Start(). See usage below.

Transactions

Task creation can be added to a given database transaction. If you are using SQLite as your primary database, this provides a simple, robust way to ensure data integrity. For example, in an eCommerce app, when inserting a new order into your database, the same transaction can be used to add a task that sends an order notification email, so that either both succeed or both fail. Use the chained method Tx() to provide your transaction when adding one or multiple tasks.

No database polling

Since SQLite only supports one writer, no continuous database polling is required. The task dispatcher is able to remain aware of new tasks and keep track of when future tasks are scheduled for, and thus only queries the database when it needs to.
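
One way to picture this is a dispatcher that sleeps until it is either told about a new task or a timer for the next scheduled task fires. The sketch below shows only that wake-up logic and is an assumption about the general approach, not Backlite's actual code; `waitForWork` and `wakeReason` are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// waitForWork blocks until a new task is announced on notify or the timer
// for the next scheduled task fires, and reports which one happened.
// No database query is needed while it is blocked.
func waitForWork(notify <-chan struct{}, untilNext time.Duration) string {
	timer := time.NewTimer(untilNext)
	defer timer.Stop()
	select {
	case <-notify:
		return "notified"
	case <-timer.C:
		return "timer"
	}
}

// wakeReason announces a task after notifyAfterMs while the next scheduled
// task is due in nextDueMs, and reports what woke the dispatcher first.
func wakeReason(notifyAfterMs, nextDueMs int) string {
	notify := make(chan struct{}, 1) // buffered so the sender never blocks
	go func() {
		time.Sleep(time.Duration(notifyAfterMs) * time.Millisecond)
		notify <- struct{}{}
	}()
	return waitForWork(notify, time.Duration(nextDueMs)*time.Millisecond)
}

func main() {
	fmt.Println(wakeReason(10, 2000)) // notified
	fmt.Println(wakeReason(2000, 10)) // timer
}
```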

Driver flexibility

Use any SQLite driver that you'd like. This library only includes go-sqlite3 since it is used in tests.

Bulk inserts

Insert one or many tasks across one or many queues in a single operation.

Execution timeout

Each queue can be configured with an execution timeout for processing a given task. The provided context will cancel after the time elapses. If you want to respect that timeout, your processor code will have to listen for the context cancellation.
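
A processor that respects the timeout typically selects on `ctx.Done()` alongside its own work. The following standard-library sketch shows that pattern; `slowWork` and `hitsDeadline` are hypothetical helpers, and the timeout is applied here with `context.WithTimeout` to stand in for whatever the queue does internally.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowWork simulates a task that needs `need` time to finish but stops
// early when the context is cancelled or its deadline passes.
func slowWork(ctx context.Context, need time.Duration) error {
	select {
	case <-time.After(need):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// hitsDeadline runs slowWork under a timeout and reports whether the
// work was cut short by the deadline.
func hitsDeadline(timeoutMs, needMs int) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	err := slowWork(ctx, time.Duration(needMs)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(hitsDeadline(20, 2000)) // true
	fmt.Println(hitsDeadline(2000, 1))  // false
}
```

Code that ignores `ctx.Done()` will simply keep running after the deadline, which is why the processor has to cooperate.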

Background worker pool

When creating a client, you can specify the number of goroutines to use to build a worker pool. This pool is created and shut down via the dispatcher by calling Start() and Stop() on the client. The worker pool is the only way to process tasks; they cannot be pulled manually.
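
For readers unfamiliar with the pattern, a fixed-size worker pool in Go usually looks something like the sketch below: a set number of goroutines read jobs from a shared channel until it is closed. This is a generic illustration with a hypothetical `runPool` function and a trivial job (squaring integers), not the library's dispatcher.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool processes jobs with numWorkers goroutines and returns the sum of
// the squared inputs. At least one worker is always started.
func runPool(numWorkers int, jobs []int) int {
	if numWorkers < 1 {
		numWorkers = 1
	}
	ch := make(chan int)
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		sum int
	)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls jobs until the channel is closed.
			for j := range ch {
				mu.Lock()
				sum += j * j
				mu.Unlock()
			}
		}()
	}
	for _, j := range jobs {
		ch <- j
	}
	close(ch) // signal that no more jobs are coming
	wg.Wait() // wait for in-flight jobs to finish
	return sum
}

func main() {
	fmt.Println(runPool(3, []int{1, 2, 3, 4})) // 30
}
```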

Web UI

A simple web UI to monitor running, upcoming, and completed tasks is provided, but it is still under active development.

To run, pass your *sql.DB to ui.NewHandler() and provide that to an HTTP server, for example:

err := http.ListenAndServe(":9000", ui.NewHandler(db))

Then visit the given port and/or domain in your browser (e.g., localhost:9000).

The web CSS and JS are provided by tabler.

Usage

Client initialization

First, open a connection to your SQLite database using the driver of your choice:

db, err := sql.Open("sqlite3", "data.db?_journal=WAL&_timeout=5000")

Second, initialize a client:

client, err := backlite.NewClient(backlite.ClientConfig{
    DB:              db,
    Logger:          slog.Default(),
    ReleaseAfter:    10 * time.Minute,
    NumWorkers:      10,
    CleanupInterval: time.Hour,
})

The configuration options are:

Schema installation

Until a more robust system is provided, to install the database schema, call client.Install(). This must be done prior to using the client. It is safe to call this if the schema was previously installed. The schema is currently defined in internal/query/schema.sql.

Declaring a Task type

Any type can be a task as long as it implements the Task interface, which requires only the Config() QueueConfig method, used to provide information about the queue that these tasks will be added to. As an example, this is a task used to send new order email notifications:

type NewOrderEmailTask struct {
    OrderID      string
    EmailAddress string
}

Then implement the Task interface by providing the queue configuration:

func (t NewOrderEmailTask) Config() backlite.QueueConfig {
    return backlite.QueueConfig{
        Name:        "NewOrderEmail",
        MaxAttempts: 5,
        Backoff:     5 * time.Second,
        Timeout:     10 * time.Second,
        Retention: &backlite.Retention{
            Duration:   6 * time.Hour,
            OnlyFailed: false,
            Data: &backlite.RetainData{
                OnlyFailed: true,
            },
        },
    }
}

The configuration options are:

Queue processor

The easiest way to implement a queue and define the processor is to use backlite.NewQueue(). This leverages generics in order to provide type-safety with a given task type. Using the example above:

processor := func(ctx context.Context, task NewOrderEmailTask) error {
    return email.Send(ctx, task.EmailAddress, fmt.Sprintf("Order %s received", task.OrderID))
}

queue := backlite.NewQueue[NewOrderEmailTask](processor)

The parameter is the processor callback which is what will be called by the dispatcher worker pool to execute the task. If no error is returned, the task is considered successfully executed. If the task fails all attempts and the queue has retention enabled, the value of the error will be stored in the database.

The provided context will be set to timeout at the duration set in the queue settings, if provided. To get the client from the context, you can call client := backlite.FromContext(ctx).

Registering a queue

You must register all queues with the client by calling client.Register(queue). This will panic if duplicate queue names are registered.
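
Since registration panics on duplicates, it is usually done once at startup. The sketch below mimics that behaviour with a hypothetical `registry` type (it is not Backlite's client) and uses `recover` only to make the panic observable in the example.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for the part of a client that
// tracks registered queue names.
type registry struct {
	queues map[string]bool
}

// register records a queue name and panics if it was already registered.
func (r *registry) register(name string) {
	if r.queues == nil {
		r.queues = make(map[string]bool)
	}
	if r.queues[name] {
		panic(fmt.Sprintf("queue already registered: %s", name))
	}
	r.queues[name] = true
}

// registerAll registers the given names on a fresh registry and reports
// whether any registration panicked.
func registerAll(names ...string) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	r := &registry{}
	for _, n := range names {
		r.register(n)
	}
	return false
}

func main() {
	fmt.Println(registerAll("NewOrderEmail", "Invoice"))       // false
	fmt.Println(registerAll("NewOrderEmail", "NewOrderEmail")) // true
}
```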

Adding tasks

To add a task to the queue, simply pass one or many into client.Add(). You can provide tasks of different types. This returns a chainable operation with several options, which can be used as follows:

err := client.
    Add(task1, task2).
    Ctx(ctx).
    Tx(tx).
    At(time.Date(2024, time.January, 5, 12, 30, 0, 0, time.UTC)).
    Wait(15 * time.Minute).
    Save()

Only Add() and Save() are required. Don't use At() and Wait() together as they override each other.

The options are:

Starting the dispatcher

To start the dispatcher, which will spin up the worker pool and begin executing tasks in the background, call client.Start(). The context you pass in must persist for as long as you want the dispatcher to continue working. If that context is ever cancelled, the dispatcher will shut down. See the next section for more details.

Shutting down the dispatcher

To gracefully shut down the dispatcher, which will wait until all tasks currently being executed are finished, call client.Stop(). You can provide a context with a timeout to limit how long the graceful shutdown is allowed to take. For example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client.Stop(ctx)

This will wait up to 5 seconds for all workers to complete the task they are currently working on.
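
The "wait for workers, but only as long as the context allows" behaviour can be sketched with a `sync.WaitGroup` and a `select`. The `pool` type and its `run` and `stop` methods are hypothetical, and returning a `bool` from `stop` is a choice made for this example rather than a statement about the client's signature.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pool is a hypothetical worker pool that tracks in-flight work.
type pool struct{ wg sync.WaitGroup }

// run starts a piece of work that takes d to complete.
func (p *pool) run(d time.Duration) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		time.Sleep(d)
	}()
}

// stop waits for in-flight work to finish. It returns true if all work
// completed before ctx was done, or false if the wait was abandoned.
func (p *pool) stop(ctx context.Context) bool {
	done := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-ctx.Done():
		return false
	}
}

// stopWithin runs one task lasting workMs and allows timeoutMs for shutdown.
func stopWithin(workMs, timeoutMs int) bool {
	p := &pool{}
	p.run(time.Duration(workMs) * time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return p.stop(ctx)
}

func main() {
	fmt.Println(stopWithin(10, 2000)) // true
	fmt.Println(stopWithin(2000, 10)) // false
}
```

Note that abandoning the wait does not stop the work itself; in this sketch the goroutine keeps running until it finishes on its own.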

If you want to hard-stop the dispatcher, cancel the context that was provided when calling client.Start().

Roadmap