diff --git a/composer.json b/composer.json index c33bb8d..463e2e5 100644 --- a/composer.json +++ b/composer.json @@ -3,8 +3,8 @@ "description": "Async running mode for Nutgram", "type": "library", "require": { - "nutgram/nutgram": ">=3.1.4", - "spatie/fork": "^1.1" + "nutgram/nutgram": "^4.0", + "ext-pcntl": "*" }, "license": "MIT", "autoload": { @@ -19,7 +19,7 @@ } ], "require-dev": { - "orchestra/testbench": "^7.5" + "orchestra/testbench": "^7.0|^8.0" }, "extra": { "laravel": { diff --git a/src/ExtendsNutgramProvider.php b/src/ExtendsNutgramProvider.php index 33e9859..37d9d5a 100644 --- a/src/ExtendsNutgramProvider.php +++ b/src/ExtendsNutgramProvider.php @@ -16,8 +16,8 @@ public function register() { $this->app->extend(Nutgram::class, function (Nutgram $bot, Application $app) { if (!$app->runningUnitTests() && $app->runningInConsole()) { - $concurrency = $bot->getConfig()['concurrency'] ?? 2; - $bot->setRunningMode(new ParallelPolling($concurrency)); + $mode = new ParallelPolling($bot->getConfig()?->concurrency ?? 2); + $bot->setRunningMode($mode); } return $bot; diff --git a/src/ParallelPolling.php b/src/ParallelPolling.php index 611c1dd..b84a467 100644 --- a/src/ParallelPolling.php +++ b/src/ParallelPolling.php @@ -4,34 +4,19 @@ use SergiX44\Nutgram\Nutgram; use SergiX44\Nutgram\RunningMode\Polling; -use Spatie\Async\Pool; -use Spatie\Fork\Fork; use Throwable; class ParallelPolling extends Polling { - private Fork $pool; + private ProcessManager $manager; public function __construct(int $concurrency = 2) { - $this->pool = Fork::new()->concurrent($concurrency); + $this->manager = new ProcessManager($concurrency); } - protected function fire(Nutgram $bot, array|null $updates): void + protected function fire(Nutgram $bot, array $updates = []): void { - $tasks = []; - foreach ($updates as $update) { - $tasks[] = static function () use ($bot, $update) { - try { - $bot->processUpdate($update); - } catch (Throwable $e) { - echo "$e\n"; - } finally { - $bot->clearData(); - } - }; - } - - $this->pool->run(...$tasks); + $this->manager->pushUpdates($bot, self::$STDERR, $updates); } } diff --git a/src/ProcessManager.php b/src/ProcessManager.php new file mode 100644 index 0000000..bd652f2 --- /dev/null +++ b/src/ProcessManager.php @@ -0,0 +1,73 @@ +runners[$pid] = $pid; + + // remove the stopped runners + foreach ($this->runners as $pid) { + if (pcntl_waitpid($pid, $status, WNOHANG | WUNTRACED)) { + if (pcntl_wifexited($status)) { + unset($this->runners[$pid]); + } + } + } + } else { + $this->forkRunners($bot, $stderr, $updates); + } + } + + public function forkRunners(Nutgram $bot, mixed $stderr, array $updates) + { + $updateWorkers = []; + foreach ($updates as $update) { + if (count($updateWorkers) >= $this->maxWorkers) { + $pid = pcntl_wait($status); + unset($updateWorkers[$pid]); + } + + $pid = pcntl_fork(); + + if ($pid == -1) { + throw new RuntimeException('Cannot fork!'); + } elseif ($pid) { + $updateWorkers[$pid] = $pid; + } else { + try { + $bot->processUpdate($update); + } catch (Throwable $e) { + fwrite($stderr, "$e\n"); + } finally { + exit(0); + } + } + } + + foreach ($updateWorkers as $pid) { + pcntl_waitpid($pid, $status); + } + exit(0); + } + +} \ No newline at end of file