Home

Awesome

Archer

Php Version Swoole Version Archer License

简介

协程Task弓兵, Swoole人性化组件库之PHP高性能Task队列, 基于Swoole原生协程, 底层提供无额外I/O的高性能解决方案, 让开发者专注于功能开发, 从繁琐的传统Task队列或协程并发旋涡中解放。


<br>

安装

最好的安装方法是通过 Composer 包管理器 :

composer require swlib/archer

或者下载代码,并在autoloader中手动注册Archer:

$loader = include YOUR_BASE_PATH . '/vendor/autoload.php';
$loader->setPsr4('Swlib\\Archer\\', YOUR_PATH . '/src/');
$loader->addClassMap([
    'Swlib\\Archer' => YOUR_PATH . '/src/Archer.php'
]);

依赖


<br>

协程调度

Swoole底层实现协程调度, 业务层无需感知, 开发者可以无感知的用同步的代码编写方式达到异步IO的效果和超高性能,避免了传统异步回调所带来的离散的代码逻辑和陷入多层回调中导致代码无法维护。
Task队列循环与各Task的执行都处于独立的协程中,不会占用用户自己创建的协程。可以将任意协程变为Defer模式,无需手动触发defer()与recv()。
Archer运行于全协程的场景中,禁忌同步阻塞代码的出现,会影响队列的运行。

需要在onRequet, onReceive, onConnect等事件回调函数中使用, 或是使用go关键字包裹 (swoole.use_shortname默认开启).

\Swoole\Runtime::enableCoroutine();
go(function () {
    $callback = function(string $method, ...$param) {
        $redis = new \Redis();
        $redis->connect('127.0.0.1', 6379);
        return $redis->{$method}(...$param);
    };
    $task1 = \Swlib\Archer::taskDefer($callback, ['get', 'some_key']);
    $task2 = \Swlib\Archer::taskDefer($callback, ['hget', 'a', 'b']);
    $task3 = \Swlib\Archer::taskDefer($callback, ['lget', 'k1', 10]);
    var_dump($task1->recv());
    var_dump($task2->recv());
    var_dump($task3->recv());
    
    Archer::taskTimerAfter(1.5, function (string $s1, string $s2) {
        echo "1.5s later:{$s1} {$s2}\n";
    }, ['hello', 'world']);
});

接口

所有模式的Task在执行时所处的协程与原协程不是同一个,所以所有基于Context的变量传递与维护会失效,务必注意这一点。

模式1:Defer模式 (即CSP模型)

获取Task:

/*定义*/ \Swlib\Archer::taskDefer(callable $task_callback, ?array $params = null): \Swlib\Archer\Task\Defer;
$task = \Swlib\Archer::taskDefer($task_callback, ['foo', 'bar']);
返回模式异常处理
返回Task对象若Task抛出了任何异常,Archer会捕获后在执行recv时抛出。

获取执行结果:

/*定义*/ \Swlib\Archer\Task\Defer->recv(?float $timeout = null);
$task->recv(0.5);
返回模式异常处理
若Task已执行完则直接返回结果。否则协程挂起,等待执行完毕后恢复并返回结果。若Task抛出了任何异常,Archer会捕获后会在此处抛出。

模式2:Task集模式

获取容器:

// $max_concurrent表示集内最大并行数量,缺省表示不限制
$container = \Swlib\Archer::getMultiTask(?int $max_concurrent = null);

向队列投递Task并立即返回Task id。

$container->addTask(callable $task_callback, ?array $params = null): int;

两种执行方式:

等待全部结果:等待所有Task全部执行完。返回值为键值对,键为Taskid,值为其对应的返回值
$container->waitForAll(?float $timeout = null): array;
返回模式异常处理
若运行时所有Task已执行完,则会直接以键值对的形式返回所有Task的返回值。否则当前协程挂起。当所有Task执行完成后,会恢复投递的协程,并返回结果。若某个Task抛出了任何异常,不会影响其他Task的执行,但在返回值中不会出现该Task id对应的项,需要通过getError(int $taskid)getErrorMap()方法获取异常对象
先完成先返回:各Task的执行结果会根据其完成的顺序,以键值对的形式yield出来

对于生成器(Generator)的定义:查看

$container->yieldEachOne(?float $timeout = null): \Generator;
返回模式异常处理
若运行时已经有些Task已执行完,则会按执行完毕的顺序将他们先yield出来。若这之后仍存在未执行完的Task,则当前协程将会挂起,每有一个Task执行完,当前协程将恢复且其结果就会以以键值对的方式yield出来,然后协程会挂起等待下一个执行完的Task。若某个Task抛出了任何异常,不会影响其他Task的执行,但这个Task不会被yield出来,需要通过getError(int $taskid)getErrorMap()方法获取异常对象

获取某Task抛出的异常(若Task未抛出异常则返回null)

$container->getError(int $id): ?\Throwable;

获取所有异常Task与他们抛出的异常,返回值为键值对,键为Taskid,值为其抛出的异常

$container->getErrorMap(): array;

模式3:一次性计时器模式

该模式的Task不受队列配置的影响
(该模式与直接使用co::sleep()执行协程代码的区别在于:不直接切换走当前协程;底层经过算法优化,会减少并行sleep()的协程数量,节约内存;可以在执行之前清除掉计时器;运行于不同的协程)

\Swlib\Archer::taskTimerAfter(float $after_time, callable $task_callback, ?array $params = null): int;
返回模式异常处理
返回 TaskidArcher会捕获异常,并产生一个warning

取消执行:

$taskid = \Swlib\Archer::taskTimerAfter(1.5, function() { echo 'aaa'; });
\Swlib\Archer::clearTimerTask($taskid); // 返回true为成功,若已执行则返回false

模式4:持续型计时器模式

该模式的Task不受队列配置的影响
(该模式与直接使用co::sleep()执行协程代码的区别在于:不直接切换走当前协程;底层经过算法优化,会减少并行sleep()的协程数量,节约内存;可以在执行之前清除掉计时器;运行于不同的协程)

\Swlib\Archer::taskTimerTick(float $tick_time, callable $task_callback, ?array $params = null, ?float $first_time_after = null): int;
返回模式异常处理
返回 TaskidArcher会捕获异常,并产生一个warning

取消执行:

$taskid = \Swlib\Archer::taskTimerTick(1.5, function() { echo 'aaa'; });
\Swlib\Archer::clearTimerTask($taskid); // 返回true为成功,若已被清理则返回false

在Task内获取当前的Taskid

\Swlib\Archer\Task::getCurrentTaskId(): ?int;

在Task执行中,调用该方法可以获取当前的Taskid,在其他地方调用会返回null(该方法基于协程uid缓存)

终止事件循环

调用此方法以防止循环使得进程事件循环不能结束

\Swlib\Archer\Queue::stop(): void;
\Swlib\Archer\TimerHeap::stop(): void;

注册一个全局回调函数

swoole>=4.2.9版本推荐在项目使用Context的时候通过Coroutine::defer()注册清理函数,无需在此注册

\Swlib\Archer\Task::registerTaskFinishFunc(callable $func): void;

这里注册的回调函数会在每个Task结束时执行,不论Task是否抛出了异常,不论Task模式,格式如下:

function (int $task_id, $task_return_value, ?\Throwable $e) {
    // $task_id 为\Swlib\Archer::task()或\Swlib\Archer\MultiTask->addTask() 返回的Task id。\Swlib\Archer::taskWait()由于无法获取Taskid,所以可以忽略该项。
    // $task_return_value 为Task闭包 $task_callback 的返回值,若没有返回值或抛出了异常,则该项为null
    // $e为Task闭包 $task_callback 中抛出的异常,正常情况下为null
}

不建议在该方法中执行会引起阻塞或协程切换的操作,因为会影响到Task运行结果的传递效率;也不要在该方法中抛出任何异常,会导致catch不到而使进程退出。
该方法所处的协程与Task所处的协程为同一个,所以可以利用该函数清理执行Task所留下的Context
- Task为伪异步模式时,该方法会在 $finish_callback 之前执行 - Task为协程同步返回模式或集模式时,该方法会在返回或抛出异常给原协程之前调用。

配置

\Swlib\Archer\Queue::setQueueSize(int $size): void;
\Swlib\Archer\Queue::setConcurrent(int $concurrent): void;

异常

Archer会抛出以下几种异常:

例子

*假设所有场景均已处于协程环境之中;场景都是理想化,简易化的;除了例子中使用的闭包,Archer支持所有callable类型

场景:执行某些协程Client(或由Runtime::enableCoroutine()变为协程的传统Client)时,未开启或无法开启Defer特性,但又想使用Defer功能。

$task_redis = \Swlib\Archer::taskDefer(function() {
    $redis = new \Swoole\Coroutine\Redis();
    $redis->connect('127.0.0.1', 6379);
    return $redis->get('key');
});
$task_mysql = \Swlib\Archer::taskDefer(function() {
    $mysql = new \Swoole\Coroutine\MySQL();
    $mysql->connect([
        'host' => '127.0.0.1',
        'user' => 'user',
        'password' => 'pass',
        'database' => 'test',
    ]);
    return $mysql->query('select sleep(1)');
});
$task_http = \Swlib\Archer::taskDefer(function(string $url): string {
    $httpclient = new \Swoole\Coroutine\Http\Client('0.0.0.0', 9599);
    $httpclient->setHeaders(['Host' => "api.mp.qq.com"]);
    $httpclient->set(['timeout' => 1]);
    $httpclient->get('/');
    return $httpclient->body;
}, ['api.mp.qq.com']);
var_dump($task_redis->recv());
var_dump($task_mysql->recv());
var_dump($task_http->recv());

场景:并发20条SQL并一起获取返回值

$container = \Swlib\Archer::getMultiTask();
$task_callback = function(int $id): int {
    $mysql = new Swoole\Coroutine\MySQL();
    $mysql->connect([
        'host' => '127.0.0.1',
        'user' => 'user',
        'password' => 'pass',
        'database' => 'test',
    ]);
    $result = $mysql->query('SELECT COUNT(*) AS `c` FROM `order` WHERE `user`='.id);
    if (empty($result)) return 0;
    return current($result)['c'] ?? 0;
};
$map = [];
$map2 = [];
$results = [];
for ($id=1; $id<=20; ++$id) {// 虽然用 GROUP BY 一条SQL实现,这里只是举个例子
    $taskid = $container->addTask($task_callback, [$id]);
    $map[$taskid] = $id;
    $map2[$id] = $taskid;
}

foreach ($container->waitForAll(10) as $taskid=>$count)
    $results[$map[$taskid]] = $count;
    
for ($id=1; $id<=20; ++$id)
    if (array_key_exists($id, $results))
        echo "id:{$id} count:{$results[$id]}\n";
    else
        echo "id:{$id} error:". $container->getError($map2[$id])->getMessage() ."\n";

场景:并发20条SQL,并将结果发给20个用户,每条运行完就立刻发送。

$container = \Swlib\Archer::getMultiTask();
$task_callback = function(int $id): int {
    $mysql = new Swoole\Coroutine\MySQL();
    $mysql->connect([
        'host' => '127.0.0.1',
        'user' => 'user',
        'password' => 'pass',
        'database' => 'test',
    ]);
    $result = $mysql->query('SELECT COUNT(*) AS `c` FROM `order` WHERE `user`='.id);
    if (empty($result)) return 0;
    return current($result)['c'] ?? 0;
};
$map = [];
for ($id=1; $id<=20; ++$id) {
    $taskid = $container->addTask($task_callback, [$id]);
    $map[$taskid] = $id;
}

foreach ($container->yieldEachOne(10) as $taskid=>$count) {
    $server->send($map[$taskid], $count); // 假设 fd 和 id 取值一样,这只是一个简化的场景例子,正式应用肯定更复杂
    unset($map[$taskid]);
}

foreach ($map as $taskid => $id)
    $server->send($id, 'Error: ' . $container->getError($taskid)->getMessage());

场景:计时器,2.5秒后开始第一次,之后每5秒执行一次,共8次

\Swlib\Archer::taskTimerTick(5, function(int $limit) {
    static $count = 0;
    ++ $count;
    echo "$count\n";
    if ($count >= $limit)
        \Swlib\Archer::clearTimerTask(\Swlib\Archer\Task::getCurrentTaskId());
}, [8], 2.5);

重中之重

欢迎提交issue和PR.