From afde926a729868f5d6f64f378836123c3b247dea Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Fri, 23 Mar 2018 22:26:12 +0100 Subject: [PATCH] Handle child process unexpectedly dying, emit exceptions when that happens, and also when the connection with the child process gets cut off. Run overflow example on CI's to ensure that behavior Closes #14 --- .travis.yml | 1 + appveyor.yml | 1 + examples/ExamplesChildProcess.php | 8 ++++++ examples/overflow/overflow.php | 25 +++++++++++++++++++ ...ationWithProcessUnexpectedEndException.php | 13 ++++++++++ src/Factory.php | 17 ++++++++++++- src/Messenger.php | 21 ++++++++++++++++ src/OutstandingCalls.php | 8 ++++++ src/ProcessUnexpectedEndException.php | 16 ++++++++++++ tests/MessengerTest.php | 20 +++++++++++++++ tests/OutstandingCallsTest.php | 1 + 11 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 examples/overflow/overflow.php create mode 100644 src/CommunicationWithProcessUnexpectedEndException.php create mode 100644 src/ProcessUnexpectedEndException.php diff --git a/.travis.yml b/.travis.yml index 43bf750..6cc0851 100644 --- a/.travis.yml +++ b/.travis.yml @@ -80,6 +80,7 @@ script: - if [ "$qaExtended" = "true" ]; then make ci-extended; fi; - php benchmark/memory.php - php examples/error/error.php + - php examples/overflow/overflow.php ## Gather coverage and set it to coverage servers after_script: if [ "$qaExtended" = "true" ]; then make ci-coverage; fi; diff --git a/appveyor.yml b/appveyor.yml index a9d63a8..5275776 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -74,3 +74,4 @@ test_script: - vendor/bin/phpunit -c phpunit.xml.dist - php benchmark/memory.php - php example/error/error.php + - php example/overflow/overflow.php diff --git a/examples/ExamplesChildProcess.php b/examples/ExamplesChildProcess.php index 1c28281..62fc3c1 100644 --- a/examples/ExamplesChildProcess.php +++ b/examples/ExamplesChildProcess.php @@ -29,6 +29,14 @@ public static function create(Messenger $messenger, LoopInterface $loop) 'formattedTime' => (new DateTime('@' . $payload['unixTime']))->format('c'), ]); }); + $messenger->registerRpc('overflow', function () { + ini_set('memory_limit', '20M'); + + $string = ''; + while (true) { + $string .= '0123456789'; + } + }); } public static function isPrime($n) diff --git a/examples/overflow/overflow.php b/examples/overflow/overflow.php new file mode 100644 index 0000000..1d60b82 --- /dev/null +++ b/examples/overflow/overflow.php @@ -0,0 +1,25 @@ +then(function (Messenger $messenger) { + return $messenger->rpc( + MessageFactory::rpc('overflow') + )->always(function () use ($messenger) { + $messenger->softTerminate(); + }); +})->done(function (Payload $result) { + throw new Exception('Should never reach this!'); +}, function ($et) { + echo (string)$et; +}); + +$loop->run(); diff --git a/src/CommunicationWithProcessUnexpectedEndException.php b/src/CommunicationWithProcessUnexpectedEndException.php new file mode 100644 index 0000000..18e34df --- /dev/null +++ b/src/CommunicationWithProcessUnexpectedEndException.php @@ -0,0 +1,13 @@ +on( 'connection', function (ConnectionInterface $connection) use ($server, $resolve, $reject, $options) { @@ -162,6 +162,21 @@ function (ConnectionInterface $connection) use ($server, $resolve, $reject, $opt }); $process->start($loop); + }))->then(function (Messenger $messenger) use ($loop, $process) { + $loop->addPeriodicTimer(self::INTERVAL, function ($timer) use ($messenger, $loop, $process) { + if (!$process->isRunning()) { + $loop->cancelTimer($timer); + + $exitCode = $process->getExitCode(); + if ($exitCode === 0) { + return; + } + + $messenger->crashed($exitCode); + } + }); + + return $messenger; }); } } diff --git a/src/Messenger.php b/src/Messenger.php index a0a06d6..33184b4 100644 --- a/src/Messenger.php +++ b/src/Messenger.php @@ -69,6 +69,18 @@ public function __construct( $this->emit('data', [$data]); $this->handleData(); }); + $this->connection->on('close', function () { + $calls = $this->outstandingRpcCalls->getCalls(); + if (count($calls) === 0) { + return; + } + $error = new CommunicationWithProcessUnexpectedEndException(); + $this->emit('error', [$error, $this]); + /** @var OutstandingCall $call */ + foreach ($calls as $call) { + $call->reject($error); + } + }); } /** @@ -184,6 +196,15 @@ public function write($line) $this->connection->write($line); } + /** + * @param int|null $exitCode + * @internal + */ + public function crashed($exitCode) + { + $this->emit('error', [new ProcessUnexpectedEndException($exitCode), $this]); + } + private function handleData() { if (strpos($this->buffer, LineInterface::EOL) === false) { diff --git a/src/OutstandingCalls.php b/src/OutstandingCalls.php index d49d97c..1df94e7 100644 --- a/src/OutstandingCalls.php +++ b/src/OutstandingCalls.php @@ -34,6 +34,14 @@ public function getCall($uniqid) return $this->calls[$uniqid]; } + /** + * @return array + */ + public function getCalls() + { + return array_values($this->calls); + } + /** * @return string */ diff --git a/src/ProcessUnexpectedEndException.php b/src/ProcessUnexpectedEndException.php new file mode 100644 index 0000000..4054b80 --- /dev/null +++ b/src/ProcessUnexpectedEndException.php @@ -0,0 +1,16 @@ +prophesize('React\Socket\ConnectionInterface'); $connection->on('data', Argument::type('callable'))->shouldBeCalled(); + $connection->on('close', Argument::type('callable'))->shouldBeCalled(); $messenger = new Messenger($connection->reveal()); $payload = [ @@ -42,6 +44,7 @@ public function testMessage() { $connection = $this->prophesize('React\Socket\ConnectionInterface'); $connection->on('data', Argument::type('callable'))->shouldBeCalled(); + $connection->on('close', Argument::type('callable'))->shouldBeCalled(); $connection->write(Argument::type('string'))->shouldBeCalled(); $messenger = new Messenger($connection->reveal()); @@ -55,6 +58,7 @@ public function testError() { $connection = $this->prophesize('React\Socket\ConnectionInterface'); $connection->on('data', Argument::type('callable'))->shouldBeCalled(); + $connection->on('close', Argument::type('callable'))->shouldBeCalled(); $connection->write(Argument::type('string'))->shouldBeCalled(); $messenger = new Messenger($connection->reveal()); @@ -68,6 +72,7 @@ public function testRpc() { $connection = $this->prophesize('React\Socket\ConnectionInterface'); $connection->on('data', Argument::type('callable'))->shouldBeCalled(); + $connection->on('close', Argument::type('callable'))->shouldBeCalled(); $connection->write(Argument::type('string'))->shouldBeCalled(); $messenger = new Messenger($connection->reveal()); @@ -91,4 +96,19 @@ public function testEmitOnData() $this->assertTrue($cbCalled); } + + public function testCrashed() + { + $this->setExpectedException('WyriHaximus\React\ChildProcess\Messenger\ProcessUnexpectedEndException'); + + $loop = Factory::create(); + $connection = new ConnectionStub(); + + $messenger = new Messenger($connection); + $loop->futureTick(function () use ($messenger) { + $messenger->crashed(123); + }); + + throw \Clue\React\Block\await(\React\Promise\Stream\first($messenger, 'error'), $loop); + } } diff --git a/tests/OutstandingCallsTest.php b/tests/OutstandingCallsTest.php index 9170523..516ef74 100644 --- a/tests/OutstandingCallsTest.php +++ b/tests/OutstandingCallsTest.php @@ -12,6 +12,7 @@ public function testBasic() $oc = new OutstandingCalls(); $call = $oc->newCall(); $this->assertInstanceOf('WyriHaximus\React\ChildProcess\Messenger\OutstandingCall', $call); + $this->assertEquals([$call], $oc->getCalls()); $this->assertEquals($call, $oc->getCall($call->getUniqid())); } }