Home

Awesome

delayqueue-php

延迟队列PHP客户端

Build Status Latest Stable Version Total Downloads Scrutinizer Code Quality Latest Unstable Version License composer.lock available

依赖

安装

composer require start-point/delayqueue-php

使用

<?php

use DelayQueue\Job;
use DelayQueue\Util\Time;
use DelayQueue\DelayQueue;

require 'vendor/autoload.php';

$job = new Job();
$job->setTopic('order');
$job->setId('15702398321');
$job->setDelay(1 * Time::MINUTE);
$job->setTtr(20 * Time::SECOND);
$job->setBody([
   'uid' => 10829378,
  'created' => 1498657365,
]);


$delayQueue = new DelayQueue('http://127.0.0.1:9277');
// 处理Job的类名
$className = 'Demo\\Handler\\OrderHandler';
try {
    // 添加一个Job到队列
    $delayQueue->push($className, $job);

    // 从队列中删除Job
    $delayQueue->delete('15702398321');
} catch (Exception $exception) {
    echo $exception->getMessage();
}

运行Worker处理Job

单个Worker

php vendor/bin/delayqueue-php -c /path/to/config.ini

PHP实现多个Worker

#!/usr/bin/env php
<?php

$workerNum = 10;
$command = '/path/to/vendor/bin/delayqueue-php';
$args = ['-c', '/path/to/config.ini'];
for ($i = 0; $i < $workerNum; $i++) {
    $pid = pcntl_fork();
    if ($pid < 0) {
        // fork失败
    } else if ($pid === 0) {
        // 子进程
        pcntl_exec($command, $args);
        break;
    }
}

Supervisor配置多个Worker

[program:delayqueue-php]
command=/path/to/vendor/bin/delayqueue-php -c /path/to/config.ini
process_name=%(program_name)s_%(process_num)s
numprocs = 10
numprocs_start = 1
autostart=true                           
autorestart=true                        
startretries=3                       
redirect_stderr = true
stdout_logfile=/var/log/supervisor/delayqueue-php/out.log

创建Job处理类

<?php

namespace Demo\Handler;

use DelayQueue\Handler\AbstractHandler;

// 必须继承DelayQueue\Handler\AbstractHandler, 并实现方法perform
class OrderHandler extends AbstractHandler
{
    protected function perform()
    {
        // 未抛出异常视为成功, Job将会被删除
        // Job唯一标识
        // $this->id;
        
        // Job自定义内容
        // $this->body;
    }
}