diff --git a/.travis.yml b/.travis.yml index 43bf750..6cc0851 100644 --- a/.travis.yml +++ b/.travis.yml @@ -80,6 +80,7 @@ script: - if [ "$qaExtended" = "true" ]; then make ci-extended; fi; - php benchmark/memory.php - php examples/error/error.php + - php examples/overflow/overflow.php ## Gather coverage and set it to coverage servers after_script: if [ "$qaExtended" = "true" ]; then make ci-coverage; fi; diff --git a/appveyor.yml b/appveyor.yml index a9d63a8..5275776 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -74,3 +74,4 @@ test_script: - vendor/bin/phpunit -c phpunit.xml.dist - php benchmark/memory.php - php example/error/error.php + - php example/overflow/overflow.php diff --git a/examples/ExamplesChildProcess.php b/examples/ExamplesChildProcess.php index 1c28281..62fc3c1 100644 --- a/examples/ExamplesChildProcess.php +++ b/examples/ExamplesChildProcess.php @@ -29,6 +29,14 @@ public static function create(Messenger $messenger, LoopInterface $loop) 'formattedTime' => (new DateTime('@' . $payload['unixTime']))->format('c'), ]); }); + $messenger->registerRpc('overflow', function () { + ini_set('memory_limit', '20M'); + + $string = ''; + while (true) { + $string .= '0123456789'; + } + }); } public static function isPrime($n) diff --git a/examples/overflow/overflow.php b/examples/overflow/overflow.php new file mode 100644 index 0000000..1d60b82 --- /dev/null +++ b/examples/overflow/overflow.php @@ -0,0 +1,25 @@ +then(function (Messenger $messenger) { + return $messenger->rpc( + MessageFactory::rpc('overflow') + )->always(function () use ($messenger) { + $messenger->softTerminate(); + }); +})->done(function (Payload $result) { + throw new Exception('Should never reach this!'); +}, function ($et) { + echo (string)$et; +}); + +$loop->run(); diff --git a/src/CommunicationWithProcessUnexpectedEndException.php b/src/CommunicationWithProcessUnexpectedEndException.php new file mode 100644 index 0000000..18e34df --- /dev/null +++ b/src/CommunicationWithProcessUnexpectedEndException.php @@ -0,0 +1,13 @@ +on( 'connection', function (ConnectionInterface $connection) use ($server, $resolve, $reject, $options) { @@ -162,6 +162,21 @@ function (ConnectionInterface $connection) use ($server, $resolve, $reject, $opt }); $process->start($loop); + }))->then(function (Messenger $messenger) use ($loop, $process) { + $loop->addPeriodicTimer(self::INTERVAL, function ($timer) use ($messenger, $loop, $process) { + if (!$process->isRunning()) { + $loop->cancelTimer($timer); + + $exitCode = $process->getExitCode(); + if ($exitCode === 0) { + return; + } + + $messenger->crashed($exitCode); + } + }); + + return $messenger; }); } } diff --git a/src/Messenger.php b/src/Messenger.php index a0a06d6..33184b4 100644 --- a/src/Messenger.php +++ b/src/Messenger.php @@ -69,6 +69,18 @@ public function __construct( $this->emit('data', [$data]); $this->handleData(); }); + $this->connection->on('close', function () { + $calls = $this->outstandingRpcCalls->getCalls(); + if (count($calls) === 0) { + return; + } + $error = new CommunicationWithProcessUnexpectedEndException(); + $this->emit('error', [$error, $this]); + /** @var OutstandingCall $call */ + foreach ($calls as $call) { + $call->reject($error); + } + }); } /** @@ -184,6 +196,15 @@ public function write($line) $this->connection->write($line); } + /** + * @param int|null $exitCode + * @internal + */ + public function crashed($exitCode) + { + $this->emit('error', [new ProcessUnexpectedEndException($exitCode), $this]); + } + private function handleData() { if (strpos($this->buffer, LineInterface::EOL) === false) { diff --git a/src/OutstandingCalls.php b/src/OutstandingCalls.php index d49d97c..1df94e7 100644 --- a/src/OutstandingCalls.php +++ b/src/OutstandingCalls.php @@ -34,6 +34,14 @@ public function getCall($uniqid) return $this->calls[$uniqid]; } + /** + * @return array + */ + public function getCalls() + { + return array_values($this->calls); + } + /** * @return string */ diff --git a/src/ProcessUnexpectedEndException.php b/src/ProcessUnexpectedEndException.php new file mode 100644 index 0000000..4054b80 --- /dev/null +++ b/src/ProcessUnexpectedEndException.php @@ -0,0 +1,16 @@ +prophesize('React\Socket\ConnectionInterface'); $connection->on('data', Argument::type('callable'))->shouldBeCalled(); + $connection->on('close', Argument::type('callable'))->shouldBeCalled(); $messenger = new Messenger($connection->reveal()); $payload = [ @@ -42,6 +44,7 @@ public function testMessage() { $connection = $this->prophesize('React\Socket\ConnectionInterface'); $connection->on('data', Argument::type('callable'))->shouldBeCalled(); + $connection->on('close', Argument::type('callable'))->shouldBeCalled(); $connection->write(Argument::type('string'))->shouldBeCalled(); $messenger = new Messenger($connection->reveal()); @@ -55,6 +58,7 @@ public function testError() { $connection = $this->prophesize('React\Socket\ConnectionInterface'); $connection->on('data', Argument::type('callable'))->shouldBeCalled(); + $connection->on('close', Argument::type('callable'))->shouldBeCalled(); $connection->write(Argument::type('string'))->shouldBeCalled(); $messenger = new Messenger($connection->reveal()); @@ -68,6 +72,7 @@ public function testRpc() { $connection = $this->prophesize('React\Socket\ConnectionInterface'); $connection->on('data', Argument::type('callable'))->shouldBeCalled(); + $connection->on('close', Argument::type('callable'))->shouldBeCalled(); $connection->write(Argument::type('string'))->shouldBeCalled(); $messenger = new Messenger($connection->reveal()); @@ -91,4 +96,19 @@ public function testEmitOnData() $this->assertTrue($cbCalled); } + + public function testCrashed() + { + $this->setExpectedException('WyriHaximus\React\ChildProcess\Messenger\ProcessUnexpectedEndException'); + + $loop = Factory::create(); + $connection = new ConnectionStub(); + + $messenger = new Messenger($connection); + $loop->futureTick(function () use ($messenger) { + $messenger->crashed(123); + }); + + throw \Clue\React\Block\await(\React\Promise\Stream\first($messenger, 'error'), $loop); + } } diff --git a/tests/OutstandingCallsTest.php b/tests/OutstandingCallsTest.php index 9170523..516ef74 100644 --- a/tests/OutstandingCallsTest.php +++ b/tests/OutstandingCallsTest.php @@ -12,6 +12,7 @@ public function testBasic() $oc = new OutstandingCalls(); $call = $oc->newCall(); $this->assertInstanceOf('WyriHaximus\React\ChildProcess\Messenger\OutstandingCall', $call); + $this->assertEquals([$call], $oc->getCalls()); $this->assertEquals($call, $oc->getCall($call->getUniqid())); } }