Home

Awesome

Tarantool Queue

Quality Assurance Scrutinizer Code Quality Code Coverage Mentioned in Awesome PHP Telegram

Tarantool is a NoSQL database running in a Lua application server. It integrates Lua modules, called LuaRocks. This package provides PHP bindings for Tarantool Queue LuaRock.

Table of contents

Installation

The recommended way to install the library is through Composer:

composer require tarantool/queue

Before start

In order to use queue, you first need to make sure that your Tarantool instance is configured, up and running. The minimal required configuration might look like this:

-- queues.lua

box.cfg {listen = 3301}

queue = require('queue')
queue.create_tube('foobar', 'fifottl', {if_not_exists = true})

You can read more about the box configuration in the official Tarantool documentation. More information on queue configuration can be found here.

To start the instance you need to copy (or symlink) queues.lua file into the /etc/tarantool/instances.enabled directory and run the following command:

sudo tarantoolctl start queues

Working with queue

Once you have your instance running, you can start by creating a queue object with the queue (tube) name you defined in the Lua script:

use Tarantool\Queue\Queue;

...

$queue = new Queue($client, 'foobar');

where $client is an instance of Tarantool\Client\Client from the tarantool/client package.

Data types

Under the hood Tarantool uses MessagePack binary format to serialize/deserialize data being stored in a queue. It can handle most of the PHP data types (except resources and closures) without any manual pre- or post-processing:

$queue->put('foo');
$queue->put(true);
$queue->put(42);
$queue->put(4.2);
$queue->put(['foo' => ['bar' => ['baz' => null]]]);
$queue->put(new MyObject());

To learn more about object serialization, please follow this link.

Tasks

Most of the Queue API methods return a Task object containing the following getters:

Task::getId()
Task::getState() // States::READY, States::TAKEN, States::DONE, States::BURY or States::DELAYED
Task::getData()

And some sugar methods:

Task::isReady()
Task::isTaken()
Task::isDone()
Task::isBuried()
Task::isDelayed()

Producer API

As you've already seen, to insert a task into a queue you need to call put() method, which accepts two arguments: the data you want to process and optional array of task options, which this particular queue supports. For example, fifottl queue (which we defined earlier in our Lua config file), supports delay, ttl, ttr and pri options:

use Tarantool\Queue\Options;

$queue->put('foo', [Options::DELAY => 30.0]);
$queue->put('bar', [Options::TTL => 5.0]);
$queue->put('baz', [Options::TTR => 10.0, Options::PRI => 42]);

See the full list of available options here.

Consumer API

To reserve a task for execution, call take() method. It accepts an optional timeout parameter. If a timeout value is supplied the call will wait timeout seconds until a READY task appears in the queue. The method returns either a Task object or null:

$taskOrNull = $queue->take();

// wait 2 seconds
$taskOrNull = $queue->take(2.0);

// wait 100 milliseconds
$taskOrNull = $queue->take(.1);

After successful execution, a task can be marked as acknowledged (that will also delete the task from a queue):

$data = $task->getData();

// process $data

$task = $queue->ack($task->getId());

Or put back into the queue in case it cannot be executed:

$task = $queue->release($task->getId());

// for *ttl queues you can specify a delay
$task = $queue->release($task->getId(), [Options::DELAY => 30.0]);

To look at a task without changing its state, use:

$task = $queue->peek($task->getId());

To bury (disable) a task:

$task = $queue->bury($task->getId());

To reset buried task(s) back to READY state:

$count = $queue->kick(3); // kick 3 buried tasks

To increase TTR and/or TTL of a running task (only for *ttl queues):

$taskOrNull = $queue->touch($takenTask->getId(), 5.0); // increase ttr/ttl to 5 seconds

A task (in any state) can be deleted permanently with delete():

$task = $queue->delete($task->getId());

To delete all tasks in a queue:

$queue->truncate();

For a detailed API documentation, please read the section "Using the queue module" of the queue README.

Statistics

The stats() method provides access to the statistical information accumulated since a queue was created:

$stats = $queue->stats();

The result of this call might look like this:

[
    'tasks' => [
        'taken'   => 1,
        'buried'  => 1,
        'ready'   => 1,
        'done'    => 0,
        'delayed' => 0,
        'total'   => 3,
    ],
    'calls' => [
        'bury' => 1,
        'put'  => 3,
        'take' => 1,
        ...
    ],
]

In addition, you can specify a key to return only a subset of the array:

$calls = $queue->stats('calls');
$total = $queue->stats('tasks.total');

Custom methods

Thanks to flexible nature of the queue Lua module, you can easily create your own queue drivers or extend existing ones with an additional functionality. For example, suppose you added the put_many method to your foobar queue, which inserts multiple tasks atomically:

-- queues.lua

...

queue.tube.foobar.put_many = function(self, items)
    local put = {}

    box.begin()
    for k, item in pairs(items) do
        put[k] = tube:put(unpack(item))
    end
    box.commit()

    return put
end

To invoke this method from php, use Queue::call():

$result = $queue->call('put_many', [
    'foo' => ['foo', [Options::DELAY => 30.0]],
    'bar' => ['bar'],
]);

Testing

The easiest way to run tests is with Docker. First, build an image using the dockerfile.sh generator:

./dockerfile.sh | docker build -t queue -

Then run a Tarantool instance (needed for integration tests):

docker network create tarantool-php
docker run -d --net=tarantool-php -p 3301:3301 --name=tarantool \
    -v $(pwd)/tests/Integration/queues.lua:/queues.lua \
    tarantool/tarantool:2 tarantool /queues.lua

And then run both unit and integration tests:

docker run --rm --net=tarantool-php -v $(pwd):/queue -w /queue queue

The library uses PHPUnit under the hood, and if needed, you can pass additional arguments and options to the phpunit command. For example, to run only unit tests, execute:

docker run --rm --net=tarantool-php -v $(pwd):/queue -w /queue queue \
    vendor/bin/phpunit --testsuite=unit

License

The library is released under the MIT License. See the bundled LICENSE file for details.