Skip to content

Commit

Permalink
Merge pull request #82 from botuniverse/feature/coroutine-support
Browse files Browse the repository at this point in the history
协程支持(并发布 0.5.1)
  • Loading branch information
crazywhalecc authored Dec 30, 2022
2 parents 76f88a3 + a2f0b75 commit cc96914
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/OneBot/Driver/Coroutine/Adaptive.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ class Adaptive
public static function initWithDriver(Driver $driver)
{
if ($driver->getName() === 'swoole') {
self::$coroutine = SwooleCoroutine::getInstance();
self::$coroutine = SwooleCoroutine::getInstance($driver);
} elseif ($driver->getName() === 'workerman' && PHP_VERSION_ID >= 80100) {
// 只有 PHP >= 8.1 才能使用 Fiber 协程接口
self::$coroutine = FiberCoroutine::getInstance();
self::$coroutine = FiberCoroutine::getInstance($driver);
}
}

Expand Down
31 changes: 27 additions & 4 deletions src/OneBot/Driver/Coroutine/CoroutineInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

namespace OneBot\Driver\Coroutine;

use OneBot\Driver\Process\ExecutionResult;

interface CoroutineInterface
{
public static function getInstance(...$arg);
/**
* 返回当前协程实现是否可以使用
*/
public static function isAvailable(): bool;

/**
* 创建一个协程并运行
Expand All @@ -18,6 +23,8 @@ public static function getInstance(...$arg);
*/
public function create(callable $callback, ...$args): int;

public function exists(int $cid): bool;

/**
* 挂起当前协程
*
Expand All @@ -28,14 +35,30 @@ public function suspend();
/**
* 根据提供的协程 ID 恢复一个协程继续运行
*
* @param int $cid 协程 ID
* @param mixed $value 要传给 suspend 返回值的内容
* @return mixed
* @param int $cid 协程 ID
* @param mixed $value 要传给 suspend 返回值的内容
* @return false|int
*/
public function resume(int $cid, $value = null);

/**
* 获取当前协程 ID
*/
public function getCid(): int;

/**
* @param float|int $time 协程 sleep
*/
public function sleep($time);

/**
* 协程执行命令行
* @param string $cmd 命令行
*/
public function exec(string $cmd): ExecutionResult;

/**
* 获取正在运行的协程数量
*/
public function getCount(): int;
}
114 changes: 106 additions & 8 deletions src/OneBot/Driver/Coroutine/FiberCoroutine.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,39 @@

namespace OneBot\Driver\Coroutine;

use OneBot\Driver\Driver;
use OneBot\Driver\Process\ExecutionResult;
use OneBot\Util\Singleton;

class FiberCoroutine implements CoroutineInterface
{
use Singleton;

/** @var \SplStack */
private static $fiber_stacks;
private static ?\SplStack $fiber_stacks = null;

/** @var array<int, \Fiber> */
private static $suspended_fiber_map = [];
private static array $suspended_fiber_map = [];

private Driver $driver;

public function __construct(Driver $driver)
{
$this->driver = $driver;
}

public static function isAvailable(): bool
{
return PHP_VERSION_ID >= 80100;
}

/**
* @throws \Throwable
* @throws \RuntimeException
*/
public function create(callable $callback, ...$args): int
{
if (PHP_VERSION_ID < 80100) {
throw new \Exception('You need PHP >= 8.1 to enable Fiber feature!');
throw new \RuntimeException('You need PHP >= 8.1 to enable Fiber feature!');
}
$fiber = new \Fiber($callback);

Expand All @@ -37,38 +54,119 @@ public function create(callable $callback, ...$args): int
return $id;
}

public function exists(int $cid): bool
{
return isset(self::$suspended_fiber_map[$cid]);
}

/**
* @throws \Throwable
* @throws \RuntimeException
*/
public function suspend()
{
if (PHP_VERSION_ID < 80100) {
throw new \Exception('You need PHP >= 8.1 to enable Fiber feature!');
throw new \RuntimeException('You need PHP >= 8.1 to enable Fiber feature!');
}
return \Fiber::suspend();
}

/**
* @param null|mixed $value
* @throws \RuntimeException
* @throws \Throwable
* @return false|int
*/
public function resume(int $cid, $value = null)
{
if (PHP_VERSION_ID < 80100) {
throw new \Exception('You need PHP >= 8.1 to enable Fiber feature!');
throw new \RuntimeException('You need PHP >= 8.1 to enable Fiber feature!');
}
if (!isset(self::$suspended_fiber_map[$cid])) {
ob_logger()->error('ID ' . $cid . ' Fiber not suspended!');
return;
return false;
}
self::$fiber_stacks->push(self::$suspended_fiber_map[$cid]);
if (self::$suspended_fiber_map[$cid]->isSuspended()) {
echo "(I have been suspended)\n";
} else {
echo "[I am not been suspended]\n";
}
debug_print_backtrace();
self::$suspended_fiber_map[$cid]->resume($value);
self::$fiber_stacks->pop();
if (self::$suspended_fiber_map[$cid]->isTerminated()) {
unset(self::$suspended_fiber_map[$cid]);
}
return $cid;
}

public function getCid(): int
{
try {
$v = self::$fiber_stacks->pop();
self::$fiber_stacks->push($v);
} catch (\RuntimeException $e) {
return -1;
}
return spl_object_id($v);
}

/**
* @param mixed $time
* @throws \Throwable
* @throws \RuntimeException
*/
public function sleep($time)
{
if (($cid = $this->getCid()) !== -1) {
$this->driver->getEventLoop()->addTimer($time * 1000, function () use ($cid) {
$this->resume($cid);
});
$this->suspend();
return;
}

usleep($time * 1000 * 1000);
}

/**
* @throws \Throwable
* @throws \RuntimeException
*/
public function exec(string $cmd): ExecutionResult
{
if (($cid = $this->getCid()) !== -1) {
$descriptorspec = [
0 => ['pipe', 'r'], // 标准输入,子进程从此管道中读取数据
1 => ['pipe', 'w'], // 标准输出,子进程向此管道中写入数据
2 => STDERR, // 标准错误
];
$res = proc_open($cmd, $descriptorspec, $pipes, getcwd());
if (is_resource($res)) {
$this->driver->getEventLoop()->addReadEvent($pipes[1], function ($x) use ($cid, $res, $pipes) {
$stdout = stream_get_contents($x);
$status = proc_get_status($res);
$this->driver->getEventLoop()->delReadEvent($x);
if ($status['exitcode'] !== -1) {
fclose($x);
fclose($pipes[0]);
$out = new ExecutionResult($status['exitcode'], $stdout);
} else {
$out = new ExecutionResult(-1);
}
$this->resume($cid, $out);
});
return $this->suspend();
}
throw new \RuntimeException('Cannot open process with command ' . $cmd);
}

exec($cmd, $output, $code);
return new ExecutionResult($code, $output);
}

public function getCount(): int
{
return self::$fiber_stacks->count();
}
}
32 changes: 29 additions & 3 deletions src/OneBot/Driver/Coroutine/SwooleCoroutine.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@

namespace OneBot\Driver\Coroutine;

use OneBot\Driver\Process\ExecutionResult;
use OneBot\Util\Singleton;
use Swoole\Coroutine;

class SwooleCoroutine implements CoroutineInterface
{
use Singleton;

private static $resume_values = [];
private static array $resume_values = [];

public static function isAvailable(): bool
{
return extension_loaded('swoole') || extension_loaded('openswoole');
}

public function create(callable $callback, ...$args): int
{
Expand All @@ -20,7 +26,7 @@ public function create(callable $callback, ...$args): int

public function suspend()
{
Coroutine::yield();
Coroutine::suspend();
if (isset(self::$resume_values[$this->getCid()])) {
$value = self::$resume_values[$this->getCid()];
unset(self::$resume_values[$this->getCid()]);
Expand All @@ -29,19 +35,39 @@ public function suspend()
return null;
}

public function exists(int $cid): bool
{
return Coroutine::exists($cid);
}

public function resume(int $cid, $value = null)
{
if (Coroutine::exists($cid)) {
self::$resume_values[$cid] = $value;
Coroutine::resume($cid);
return $cid;
}
ob_logger()->error('Swoole coroutine #' . $cid . ' not exists');
return false;
}

public function getCid(): int
{
return Coroutine::getCid();
}

public function sleep($time)
{
Coroutine::sleep($time);
}

public function exec(string $cmd): ExecutionResult
{
$result = Coroutine\System::exec($cmd);
return new ExecutionResult($result['code'], $result['output']);
}

public function getCount(): int
{
return Coroutine::stats()['coroutine_num'];
}
}
8 changes: 7 additions & 1 deletion src/OneBot/Driver/Event/EventDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace OneBot\Driver\Event;

use OneBot\Driver\Coroutine\Adaptive;
use OneBot\Driver\Interfaces\HandledDispatcherInterface;
use OneBot\Exception\ExceptionHandler;

Expand All @@ -13,8 +14,13 @@ class EventDispatcher implements HandledDispatcherInterface
/**
* 分发事件
*/
public function dispatch(object $event): object
public function dispatch(object $event, bool $inside = false): object
{
if (($co = Adaptive::getCoroutine()) !== null && !$inside) {
$co->create([$this, 'dispatch'], $event, true);
return $event;
}
ob_logger()->warning('Dispatching event in fiber: ' . $co->getCid());
foreach (ob_event_provider()->getEventListeners($event->getName()) as $listener) {
try {
// TODO: 允许 Listener 修改 $event
Expand Down
2 changes: 2 additions & 0 deletions src/OneBot/Driver/Swoole/TopEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Choir\Http\HttpFactory;
use Choir\WebSocket\FrameInterface;
use OneBot\Driver\Coroutine\Adaptive;
use OneBot\Driver\Event\Http\HttpRequestEvent;
use OneBot\Driver\Event\Process\ManagerStartEvent;
use OneBot\Driver\Event\Process\ManagerStopEvent;
Expand Down Expand Up @@ -38,6 +39,7 @@ public function onWorkerStart(Server $server)
} else {
ProcessManager::initProcess(ONEBOT_PROCESS_WORKER, $server->worker_id);
}
Adaptive::initWithDriver(SwooleDriver::getInstance());
ob_event_dispatcher()->dispatchWithHandler(new WorkerStartEvent());
}

Expand Down
2 changes: 2 additions & 0 deletions src/OneBot/Driver/Workerman/TopEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Choir\Http\HttpFactory;
use Choir\WebSocket\FrameFactory;
use Choir\WebSocket\FrameInterface;
use OneBot\Driver\Coroutine\Adaptive;
use OneBot\Driver\Event\Http\HttpRequestEvent;
use OneBot\Driver\Event\Process\WorkerStartEvent;
use OneBot\Driver\Event\Process\WorkerStopEvent;
Expand All @@ -32,6 +33,7 @@ class TopEventListener
public function onWorkerStart(Worker $worker)
{
ProcessManager::initProcess(ONEBOT_PROCESS_WORKER, $worker->id);
Adaptive::initWithDriver(WorkermanDriver::getInstance());
ob_event_dispatcher()->dispatchWithHandler(new WorkerStartEvent());
}

Expand Down
Loading

0 comments on commit cc96914

Please sign in to comment.