delayqueue-php
PHP client for the delay queue
Requirements
- PHP 5.4+
- ext-pcntl
Installation
composer require start-point/delayqueue-php
Usage
<?php
use DelayQueue\Job;
use DelayQueue\Util\Time;
use DelayQueue\DelayQueue;

require 'vendor/autoload.php';

$job = new Job();
$job->setTopic('order');
$job->setId('15702398321');
$job->setDelay(1 * Time::MINUTE);
$job->setTtr(20 * Time::SECOND);
$job->setBody([
    'uid' => 10829378,
    'created' => 1498657365,
]);

$delayQueue = new DelayQueue('http://127.0.0.1:9277');
// Class name of the handler that processes the Job
$className = 'Demo\\Handler\\OrderHandler';

try {
    // Add a Job to the queue
    $delayQueue->push($className, $job);
    // Delete the Job from the queue
    $delayQueue->delete('15702398321');
} catch (Exception $exception) {
    echo $exception->getMessage();
}
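The example above passes a relative delay to setDelay(). When a Job should run at an absolute time instead, the delay can be derived from a timestamp. The following is a minimal sketch, not part of the library: delayUntil() is a hypothetical helper, and it assumes that setDelay() expects a delay in seconds (which the Time::SECOND and Time::MINUTE constants suggest but this README does not state).

```php
<?php
// Hypothetical helper (not part of delayqueue-php): seconds remaining until
// $runAt, a Unix timestamp, clamped so the result is never negative.
// Assumption: setDelay() takes a delay expressed in seconds.
function delayUntil($runAt, $now = null)
{
    if ($now === null) {
        $now = time();
    }
    return max(0, (int) $runAt - (int) $now);
}

// Example: schedule 30 minutes after the 'created' timestamp used above,
// measured from that same moment.
$created = 1498657365;
$delay = delayUntil($created + 30 * 60, $created);
echo $delay, "\n"; // prints 1800
```

Under that assumption the result could be passed straight through, for example $job->setDelay(delayUntil($runAt)).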
Running Workers to process Jobs
Single Worker
php vendor/bin/delayqueue-php -c /path/to/config.ini
Multiple Workers in PHP
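#!/usr/bin/env php
<?php
$workerNum = 10;
$command = '/path/to/vendor/bin/delayqueue-php';
$args = ['-c', '/path/to/config.ini'];

for ($i = 0; $i < $workerNum; $i++) {
    $pid = pcntl_fork();
    if ($pid < 0) {
        // fork failed: report it and try the next worker
        fwrite(STDERR, "Failed to fork worker {$i}\n");
    } elseif ($pid === 0) {
        // Child process: replace it with the worker program
        pcntl_exec($command, $args);
        // Only reached if pcntl_exec() fails; exit so the child does not keep forking
        exit(1);
    }
}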
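The script above forks the workers and then lets the parent exit. If the parent should instead stay alive until every worker has finished, it can collect the children with pcntl_wait(). The following is a minimal sketch under that assumption: runWorkers() and its $startWorker callable are hypothetical names introduced for illustration, and only standard ext-pcntl functions are used.

```php
<?php
// Hypothetical helper: fork $workerNum children, run $startWorker in each,
// and block in the parent until all of them have exited.
// Returns an array of exit codes keyed by worker index (-1 if killed by a signal).
function runWorkers($workerNum, callable $startWorker)
{
    $children = [];
    for ($i = 0; $i < $workerNum; $i++) {
        $pid = pcntl_fork();
        if ($pid < 0) {
            fwrite(STDERR, "Failed to fork worker {$i}\n");
            continue;
        }
        if ($pid === 0) {
            // Child: run the worker; exit afterwards so it never re-enters the loop.
            $startWorker($i);
            exit(1); // only reached if the worker returns, e.g. pcntl_exec() failed
        }
        $children[$pid] = $i;
    }

    // Parent: reap children one by one until none are left.
    $exitCodes = [];
    while (count($children) > 0) {
        $pid = pcntl_wait($status);
        if ($pid <= 0) {
            break; // wait failed or no children remain
        }
        if (isset($children[$pid])) {
            $index = $children[$pid];
            $exitCodes[$index] = pcntl_wifexited($status) ? pcntl_wexitstatus($status) : -1;
            unset($children[$pid]);
        }
    }
    ksort($exitCodes);
    return $exitCodes;
}

// Possible usage with the variables from the script above:
// runWorkers($workerNum, function () use ($command, $args) {
//     pcntl_exec($command, $args);
// });
```

Keeping the parent alive makes worker exits visible, but it does not restart anything by itself; automatic restarts are what a process manager such as Supervisor provides.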
Configuring multiple Workers with Supervisor
[program:delayqueue-php]
command=/path/to/vendor/bin/delayqueue-php -c /path/to/config.ini
process_name=%(program_name)s_%(process_num)s
numprocs=10
numprocs_start=1
autostart=true
autorestart=true
startretries=3
redirect_stderr=true
stdout_logfile=/var/log/supervisor/delayqueue-php/out.log
Creating a Job handler class
<?php
namespace Demo\Handler;

use DelayQueue\Handler\AbstractHandler;

// Must extend DelayQueue\Handler\AbstractHandler and implement the perform method
class OrderHandler extends AbstractHandler
{
    protected function perform()
    {
        // If no exception is thrown, the Job is treated as successful and will be deleted

        // Unique identifier of the Job
        // $this->id;

        // Custom content of the Job
        // $this->body;
    }
}
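Since a Job counts as successful only when perform() returns without throwing, a handler can signal failure by throwing an exception. The following is a minimal sketch of that contract. So that it runs without the library, it uses StandInHandler, a hypothetical stand-in for DelayQueue\Handler\AbstractHandler; the real base class may be constructed and invoked differently. It also assumes that the body arrives in the handler as the same array that was passed to setBody().

```php
<?php
// Hypothetical stand-in for DelayQueue\Handler\AbstractHandler, included only
// to make this sketch runnable on its own. The real class may differ.
abstract class StandInHandler
{
    protected $id;
    protected $body;

    public function __construct($id, array $body)
    {
        $this->id = $id;
        $this->body = $body;
    }

    // Returns true when perform() completes, false when it throws.
    public function run()
    {
        try {
            $this->perform();
            return true;
        } catch (Exception $e) {
            return false;
        }
    }

    abstract protected function perform();
}

// Sketch of a handler that validates its input before doing any work.
class OrderHandlerSketch extends StandInHandler
{
    protected function perform()
    {
        // Assumption: body is the array given to setBody() when the Job was pushed.
        if (!isset($this->body['uid'])) {
            // Throwing marks the Job as failed instead of successful.
            throw new InvalidArgumentException("Job {$this->id}: missing uid");
        }
        // Application logic for the order would go here.
    }
}

$ok = new OrderHandlerSketch('15702398321', ['uid' => 10829378, 'created' => 1498657365]);
$bad = new OrderHandlerSketch('15702398322', []);
var_dump($ok->run());  // bool(true)
var_dump($bad->run()); // bool(false)
```

Validating at the top of perform() keeps malformed Jobs from being silently treated as successful and deleted.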