Skip to content

Commit

Permalink
Ensure job processing order using priority queues (#4)
Browse files Browse the repository at this point in the history
* Ensure job processing order using priority queues
  • Loading branch information
joskfg authored Jan 31, 2019
1 parent 01dd179 commit ceea5ba
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 11 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ php artisan migrate
```
* Run a worker to actually send the events
```bash
php artisan queue:work database --queue=domainEvents
php artisan queue:work database --queue=retryDomainEvent,domainEvents
```

The job table is needed because to ensure that a job is dispatched after an action, we need to do a transaction, so the *job must use the database driver*.

There are two queues so the library is able to retry a job without losing order in the jobs.

#### Database middleware

This middleware just store the events in a table in database. It can be useful if you want to expose the events as a REST endpoint or check your events history.
Expand Down
53 changes: 48 additions & 5 deletions src/Jobs/SendDomainEvents.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Softonic\TransactionalEventPublisher\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
Expand All @@ -20,28 +21,70 @@ class SendDomainEvents implements ShouldQueue
*/
private $eventMessage;

/**
* @var integer $retry
*/
private $retry;

/**
* @var Dispatcher $dispatcher
*/
private $dispatcher;

/**
* Create a new job instance.
*
* @param EventMessageContract $eventMessage
*/
public function __construct(EventMessageContract $eventMessage)
public function __construct(EventMessageContract $eventMessage, $retry = 0)
{
$this->eventMessage = $eventMessage;
$this->retry = $retry;

$this->onConnection('database')
->onQueue('domainEvents');
}

/**
* Execute the job.
*
*/
public function handle(AmqpMiddleware $amqpMiddleware, LoggerInterface $logger)
public function handle(AmqpMiddleware $amqpMiddleware, Dispatcher $dispatcher, LoggerInterface $logger)
{
if (!$amqpMiddleware->store($this->eventMessage)) {
$this->dispatcher = $dispatcher;
$this->amqpMiddleware = $amqpMiddleware;
$this->logger = $logger;

try {
$this->sendEvent();
} catch (\Exception $e) {
$this->waitExponentialBackOff();
$this->retry();
}
}

protected function sendEvent(): void
{
if (!$this->amqpMiddleware->store($this->eventMessage)) {
$errorMessage = "The event could't be sent. Retrying message: " . json_encode($this->eventMessage);
$logger->alert($errorMessage);
$this->logger->alert($errorMessage);

throw new \RuntimeException($errorMessage);
}
}

protected function waitExponentialBackOff(): void
{
$timeToWait = $this->retry < 18
? pow(++$this->retry, 2)
: pow($this->retry, 2);
sleep($timeToWait);
}

protected function retry(): void
{
$job = (new static($this->eventMessage, $this->retry))
->onQueue('retryDomainEvent');

$this->dispatcher->dispatch($job);
}
}
59 changes: 54 additions & 5 deletions tests/Jobs/SendDomainEventsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,28 @@

namespace Softonic\TransactionalEventPublisher\Jobs;

use Illuminate\Contracts\Bus\Dispatcher;
use Mockery;
use Psr\Log\LoggerInterface;
use Softonic\TransactionalEventPublisher\Contracts\EventMessageContract;
use Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware;
use Softonic\TransactionalEventPublisher\TestCase;

function sleep($time) {
SendDomainEventsTest::$functions->sleep($time);
}

class SendDomainEventsTest extends TestCase
{
public static $functions;

protected function setUp()
{
parent::setUp();

self::$functions = Mockery::mock();
}

/**
* @test
*/
Expand All @@ -26,12 +40,43 @@ public function whenMessageIsSendItShouldResumeTheJob()
->with($message)
->andReturn(true);

$dispatcher = Mockery::mock(Dispatcher::class);
$dispatcher->shouldNotReceive('dispatch');

$logger = \Mockery::mock(LoggerInterface::class);
$logger->shouldNotReceive('alert');

$sendDomainEvents = new SendDomainEvents($message);
$sendDomainEvents->handle($amqpMiddleware, $dispatcher, $logger);
}

/**
* @test
*/
public function whenMessageIsSendWithExponentialRetryItShouldResumeTheJobWaitingASpecificTime()
{
$message = Mockery::mock(EventMessageContract::class);
$message->shouldReceive('jsonSerialize')
->andReturn('message');

$amqpMiddleware = Mockery::mock(AmqpMiddleware::class);

$amqpMiddleware->shouldReceive('store')
->once()
->with($message)
->andReturn(true);

$dispatcher = Mockery::mock(Dispatcher::class);
$dispatcher->shouldNotReceive('dispatch');

$logger = \Mockery::mock(LoggerInterface::class);
$logger->shouldReceive('alert')
->never();

self::$functions->shouldNotReceive('sleep');

$sendDomainEvents = new SendDomainEvents($message);
$sendDomainEvents->handle($amqpMiddleware, $logger);
$sendDomainEvents->handle($amqpMiddleware, $dispatcher, $logger);
}

/**
Expand All @@ -51,15 +96,19 @@ public function whenMessageCannotBeSendItShouldTryAgainLater()
->with($message)
->andReturn(false);

$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage($warningMessage);
$dispatcher = Mockery::mock(Dispatcher::class);
$dispatcher->shouldReceive('dispatch')
->once();

$logger = \Mockery::mock(LoggerInterface::class);
$logger->shouldReceive('alert')
->once()
->with($warningMessage);

$sendDomainEvents = new SendDomainEvents($message, $logger);
$sendDomainEvents->handle($amqpMiddleware, $logger);
$sendDomainEvents = new SendDomainEvents($message, 2);

self::$functions->shouldReceive('sleep')->with(9)->once();

$sendDomainEvents->handle($amqpMiddleware, $dispatcher, $logger);
}
}

0 comments on commit ceea5ba

Please sign in to comment.