Tarantool Queue
Tarantool is a NoSQL database running in a Lua application server. It integrates Lua modules, called LuaRocks. This package provides PHP bindings for the Tarantool Queue LuaRock.
Installation
The recommended way to install the library is through Composer:
composer require tarantool/queue
Before you start
To use a queue, you first need to make sure that your Tarantool instance is configured and running. A minimal configuration might look like this:
-- queues.lua
box.cfg {listen = 3301}
queue = require('queue')
queue.create_tube('foobar', 'fifottl', {if_not_exists = true})
You can read more about the box configuration in the official Tarantool documentation. More information on queue configuration can be found here.
To start the instance, copy (or symlink) the queues.lua file into the /etc/tarantool/instances.enabled directory and run the following command:
sudo tarantoolctl start queues
Working with queue
Once you have your instance running, you can start by creating a queue object with the queue (tube) name you defined in the Lua script:
use Tarantool\Queue\Queue;
...
$queue = new Queue($client, 'foobar');
where $client is an instance of Tarantool\Client\Client from the tarantool/client package.
Data types
Under the hood, Tarantool uses the MessagePack binary format to serialize and deserialize the data stored in a queue. It can handle most PHP data types (except resources and closures) without any manual pre- or post-processing:
$queue->put('foo');
$queue->put(true);
$queue->put(42);
$queue->put(4.2);
$queue->put(['foo' => ['bar' => ['baz' => null]]]);
$queue->put(new MyObject());
To learn more about object serialization, please follow this link.
Tasks
Most of the Queue API methods return a Task object containing the following getters:
Task::getId()
Task::getState() // States::READY, States::TAKEN, States::DONE, States::BURY or States::DELAYED
Task::getData()
And some sugar methods:
Task::isReady()
Task::isTaken()
Task::isDone()
Task::isBuried()
Task::isDelayed()
Producer API
As you've already seen, to insert a task into a queue you call the put() method, which accepts two arguments: the data you want to process and an optional array of task options supported by the particular queue. For example, the fifottl queue (which we defined earlier in our Lua config file) supports the delay, ttl, ttr and pri options:
use Tarantool\Queue\Options;
$queue->put('foo', [Options::DELAY => 30.0]);
$queue->put('bar', [Options::TTL => 5.0]);
$queue->put('baz', [Options::TTR => 10.0, Options::PRI => 42]);
See the full list of available options here.
Consumer API
To reserve a task for execution, call the take() method. It accepts an optional timeout parameter. If a timeout value is supplied, the call waits up to timeout seconds for a READY task to appear in the queue. The method returns either a Task object or null:
$taskOrNull = $queue->take();
// wait 2 seconds
$taskOrNull = $queue->take(2.0);
// wait 100 milliseconds
$taskOrNull = $queue->take(.1);
After successful execution, a task can be marked as acknowledged (this also deletes the task from the queue):
$data = $task->getData();
// process $data
$task = $queue->ack($task->getId());
Or put back into the queue in case it cannot be executed:
$task = $queue->release($task->getId());
// for *ttl queues you can specify a delay
$task = $queue->release($task->getId(), [Options::DELAY => 30.0]);
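Putting take(), ack() and release() together, a single worker step might look like the following sketch. The processNextTask() function, its string return values and the $handler callable are hypothetical names introduced for illustration. The $queue parameter is left untyped only so the snippet runs without the library installed; in real code it would be a Tarantool\Queue\Queue instance.

```php
<?php

/**
 * Takes at most one task from the queue and processes it with $handler.
 *
 * Returns 'idle' if no task became READY within $timeout seconds,
 * 'ack' if the handler succeeded, or 'release' if the handler threw.
 * (These return values are illustrative, not part of the library.)
 */
function processNextTask($queue, callable $handler, float $timeout = 2.0): string
{
    $task = $queue->take($timeout);

    if (null === $task) {
        return 'idle';
    }

    try {
        $handler($task->getData());
    } catch (\Throwable $e) {
        // The task could not be executed: put it back into the queue.
        $queue->release($task->getId());

        return 'release';
    }

    // Successful execution: acknowledge (and thereby delete) the task.
    $queue->ack($task->getId());

    return 'ack';
}
```

A long-running worker could call processNextTask($queue, $handler) in a loop. Whether a failed task should be released, buried or deleted depends on the application, so treat the catch branch as one possible policy.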
To look at a task without changing its state, use:
$task = $queue->peek($task->getId());
To bury (disable) a task:
$task = $queue->bury($task->getId());
To reset buried task(s) back to the READY state:
$count = $queue->kick(3); // kick 3 buried tasks
To increase TTR and/or TTL of a running task (only for *ttl queues):
$taskOrNull = $queue->touch($takenTask->getId(), 5.0); // increase ttr/ttl by 5 seconds
A task (in any state) can be deleted permanently with delete():
$task = $queue->delete($task->getId());
To delete all tasks in a queue:
$queue->truncate();
For detailed API documentation, please read the section "Using the queue module" of the queue README.
Statistics
The stats() method provides access to the statistical information accumulated since a queue was created:
$stats = $queue->stats();
The result of this call might look like this:
[
    'tasks' => [
        'taken' => 1,
        'buried' => 1,
        'ready' => 1,
        'done' => 0,
        'delayed' => 0,
        'total' => 3,
    ],
    'calls' => [
        'bury' => 1,
        'put' => 3,
        'take' => 1,
        ...
    ],
]
In addition, you can specify a key to return only a subset of the array:
$calls = $queue->stats('calls');
$total = $queue->stats('tasks.total');
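To make the dotted-key notation concrete, the sketch below resolves such a key against a stats array like the one shown above. It is not the library's implementation: pickStat() is a hypothetical helper, and throwing on an unknown key is an assumption made here for illustration.

```php
<?php

/**
 * Resolves a dotted key such as 'tasks.total' against a nested array.
 * Hypothetical helper, shown only to illustrate the key notation.
 */
function pickStat(array $stats, string $path)
{
    $value = $stats;

    foreach (explode('.', $path) as $key) {
        if (!is_array($value) || !array_key_exists($key, $value)) {
            // Assumption: unknown keys are treated as an error in this sketch.
            throw new \InvalidArgumentException(sprintf('Unknown stats key "%s".', $path));
        }
        $value = $value[$key];
    }

    return $value;
}

$stats = [
    'tasks' => ['taken' => 1, 'buried' => 1, 'ready' => 1, 'done' => 0, 'delayed' => 0, 'total' => 3],
    'calls' => ['bury' => 1, 'put' => 3, 'take' => 1],
];

$calls = pickStat($stats, 'calls');       // the whole 'calls' sub-array
$total = pickStat($stats, 'tasks.total'); // 3
```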
Custom methods
Thanks to the flexible nature of the queue Lua module, you can easily create your own queue drivers or extend existing ones with additional functionality. For example, suppose you added the put_many method to your foobar queue, which inserts multiple tasks atomically:
-- queues.lua
...
queue.tube.foobar.put_many = function(self, items)
    local put = {}

    -- insert all items in a single transaction
    box.begin()
    for k, item in pairs(items) do
        -- self is the tube the method is called on
        put[k] = self:put(unpack(item))
    end
    box.commit()

    return put
end
To invoke this method from PHP, use Queue::call():
$result = $queue->call('put_many', [
    'foo' => ['foo', [Options::DELAY => 30.0]],
    'bar' => ['bar'],
]);
Testing
The easiest way to run tests is with Docker. First, build an image using the dockerfile.sh generator:
./dockerfile.sh | docker build -t queue -
Then run a Tarantool instance (needed for integration tests):
docker network create tarantool-php
docker run -d --net=tarantool-php -p 3301:3301 --name=tarantool \
    -v $(pwd)/tests/Integration/queues.lua:/queues.lua \
    tarantool/tarantool:2 tarantool /queues.lua
And then run both unit and integration tests:
docker run --rm --net=tarantool-php -v $(pwd):/queue -w /queue queue
The library uses PHPUnit under the hood, and if needed, you can pass additional arguments and options to the phpunit command.
For example, to run only unit tests, execute:
docker run --rm --net=tarantool-php -v $(pwd):/queue -w /queue queue \
    vendor/bin/phpunit --testsuite=unit
License
The library is released under the MIT License. See the bundled LICENSE file for details.