Skip to content

Commit

Permalink
Merge pull request #17 from softonic/feature/CAT-2198-Set-events-crea…
Browse files Browse the repository at this point in the history
…tedAt-to-the-time-domain-event-is-sent

Allow to configure middleware to publish the events when using AsyncMiddleware
  • Loading branch information
xaviapa authored Feb 24, 2021
2 parents b1ab29c + b1b72ab commit b0de298
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 146 deletions.
4 changes: 1 addition & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ sudo: false

matrix:
include:
- php: 7.2
env: COLLECT_COVERAGE=true VALIDATE_CODING_STYLE=true
- php: 7.3
- php: 7.4
env: COLLECT_COVERAGE=true VALIDATE_CODING_STYLE=true
- php: master
env: COLLECT_COVERAGE=true VALIDATE_CODING_STYLE=false
Expand Down
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,25 @@ php artisan vendor:publish --provider="Softonic\TransactionalEventPublisher\Serv

We provide `Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware`,
`Softonic\TransactionalEventPublisher\EventStoreMiddlewares\DatabaseMiddleware`
and `Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AsyncAmqpMiddleware` middlewares to send events.
and `Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AsyncMiddleware` middlewares to send events.

#### Sync AMQP middleware

To use the sync AMQP you just need to configure the AMQP connection using the configuration file or environmental variables.
As you could see, in the configuration you won't be able to define a queue. This is because the library just publishes the message to an exchange and is the events collector responsability to declare the needed queues with the needed bindings.

#### Async AMQP middleware
#### Async middleware

You need to do the Sync AMQP middleware steps and continue with these:
The async middleware is just responsible to delay the action, so you need to configure the real middleware that is who will publish the messages. For example:
```php
/*
|--------------------------------------------------------------------------
| Middleware that publishes the events when using AsyncMiddleware.
|--------------------------------------------------------------------------
*/
'event_publisher_middleware' => \Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware::class,
```
If you want to use the AMQP middleware in async, remember to do the Sync AMQP middleware steps and continue with these:

* Create the job table if you don't have it in the project
```bash
Expand Down
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
"issues": "https://github.com/softonic/laravel-transactional-event-publisher/issues"
},
"require": {
"php": ">=7.3",
"php": ">=7.4",
"softonic/laravel-amqp": "2.1.0",
"laravel/framework": "^7.0 || ^8.0"
},
"require-dev": {
"phpunit/phpunit": "^9.0",
"friendsofphp/php-cs-fixer": "^2.16",
"laravel/legacy-factories": "^1.0.4",
"mockery/mockery": "^1.2",
"orchestra/testbench": "^6.0",
"orchestra/database": "^5.0",
"orchestra/database": "6.x-dev",
"phpunit/phpunit": "^9.0",
"php-mock/php-mock-mockery": "^1.3"
},
"autoload": {
Expand Down
9 changes: 9 additions & 0 deletions config/transactional-event-publisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
*/
'middleware' => \Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware::class,

/*
|--------------------------------------------------------------------------
| Middleware that publishes the events when using AsyncMiddleware.
|--------------------------------------------------------------------------
*/
'event_publisher_middleware' => null,

//'event_publisher_middleware' => \Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware::class,

/*
|--------------------------------------------------------------------------
| Event Message class.
Expand Down
12 changes: 8 additions & 4 deletions src/Console/Commands/EmitAllEvents.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Softonic\TransactionalEventPublisher\Console\Commands;

use Illuminate\Console\Command;
use Softonic\TransactionalEventPublisher\Contracts\EventStoreMiddlewareContract;
use Softonic\TransactionalEventPublisher\Jobs\SendDomainEvents;
use Softonic\TransactionalEventPublisher\Model\DomainEvent;

Expand Down Expand Up @@ -30,7 +31,7 @@ class EmitAllEvents extends Command
*
* @return mixed
*/
public function handle()
public function handle(EventStoreMiddlewareContract $eventPublisherMiddleware)
{
$queueConnection = $this->argument('queueConnection');
$databaseConnection = $this->option('unbufferedConnection');
Expand All @@ -45,9 +46,12 @@ public function handle()
DomainEvent::on($databaseConnection)->cursor()
->chunk($batchSize)
->each(
function ($domainEvents) use ($queueConnection, $bar, $batchSize) {
SendDomainEvents::dispatch(SendDomainEvents::NO_RETRIES, ...$domainEvents->pluck('message'))
->onConnection($queueConnection);
function ($domainEvents) use ($eventPublisherMiddleware, $queueConnection, $bar, $batchSize) {
SendDomainEvents::dispatch(
$eventPublisherMiddleware,
SendDomainEvents::NO_RETRIES,
...$domainEvents->pluck('message')
)->onConnection($queueConnection);
$bar->advance($batchSize);
}
);
Expand Down
2 changes: 1 addition & 1 deletion src/EventStoreMiddlewares/AmqpMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace Softonic\TransactionalEventPublisher\EventStoreMiddlewares;

use Softonic\Amqp\Amqp;
use Exception;
use Psr\Log\LoggerInterface;
use Softonic\Amqp\Amqp;
use Softonic\TransactionalEventPublisher\Contracts\EventMessageContract;
use Softonic\TransactionalEventPublisher\Contracts\EventStoreMiddlewareContract;
use Softonic\TransactionalEventPublisher\Factories\AmqpMessageFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
use Softonic\TransactionalEventPublisher\Contracts\EventStoreMiddlewareContract;
use Softonic\TransactionalEventPublisher\Jobs\SendDomainEvents;

class AsyncAmqpMiddleware implements EventStoreMiddlewareContract
class AsyncMiddleware implements EventStoreMiddlewareContract
{
/**
* @var Dispatcher
*/
private $dispatcher;
private EventStoreMiddlewareContract $eventPublisherMiddleware;

private Dispatcher $dispatcher;

public function __construct(Dispatcher $dispatcher)
public function __construct(EventStoreMiddlewareContract $eventPublisherMiddleware, Dispatcher $dispatcher)
{
$this->dispatcher = $dispatcher;
$this->eventPublisherMiddleware = $eventPublisherMiddleware;
$this->dispatcher = $dispatcher;
}

/**
Expand All @@ -30,7 +30,7 @@ public function __construct(Dispatcher $dispatcher)
public function store(EventMessageContract ...$messages)
{
try {
$job = new SendDomainEvents(0, ...$messages);
$job = new SendDomainEvents($this->eventPublisherMiddleware, 0, ...$messages);

$this->dispatcher->dispatch($job);

Expand Down
41 changes: 17 additions & 24 deletions src/Jobs/SendDomainEvents.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use Psr\Log\LoggerInterface;
use RuntimeException;
use Softonic\TransactionalEventPublisher\Contracts\EventMessageContract;
use Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware;
use Softonic\TransactionalEventPublisher\Contracts\EventStoreMiddlewareContract;

class SendDomainEvents implements ShouldQueue
{
Expand All @@ -27,36 +27,29 @@ class SendDomainEvents implements ShouldQueue

private int $retry;

private Dispatcher $dispatcher;
private EventStoreMiddlewareContract $eventPublisherMiddleware;

private AmqpMiddleware $amqpMiddleware;
private Dispatcher $dispatcher;

private LoggerInterface $logger;


/**
* Create a new job instance.
*
* @param int $retry
* @param EventMessageContract $eventMessages
*/
public function __construct(int $retry, EventMessageContract ...$eventMessages)
{
$this->eventMessages = $eventMessages;
$this->retry = $retry;
public function __construct(
EventStoreMiddlewareContract $eventPublisherMiddleware,
int $retry,
EventMessageContract ...$eventMessages
) {
$this->eventPublisherMiddleware = $eventPublisherMiddleware;
$this->eventMessages = $eventMessages;
$this->retry = $retry;

$this->onConnection('database')
->onQueue('domainEvents');
}

/**
* Execute the job.
*/
public function handle(AmqpMiddleware $amqpMiddleware, Dispatcher $dispatcher, LoggerInterface $logger): void
public function handle(Dispatcher $dispatcher, LoggerInterface $logger): void
{
$this->dispatcher = $dispatcher;
$this->amqpMiddleware = $amqpMiddleware;
$this->logger = $logger;
$this->dispatcher = $dispatcher;
$this->logger = $logger;

try {
$this->sendEvent();
Expand All @@ -68,8 +61,8 @@ public function handle(AmqpMiddleware $amqpMiddleware, Dispatcher $dispatcher, L

protected function sendEvent(): void
{
if (!$this->amqpMiddleware->store(...$this->eventMessages)) {
$errorMessage = "The event could't be sent. Retrying message: " . json_encode($this->eventMessages);
if (!$this->eventPublisherMiddleware->store(...$this->eventMessages)) {
$errorMessage = "The event couldn't be sent. Retrying message: " . json_encode($this->eventMessages);
$this->logger->alert($errorMessage);

throw new RuntimeException($errorMessage);
Expand All @@ -86,7 +79,7 @@ protected function waitExponentialBackOff(): void

protected function retry(): void
{
$job = (new static($this->retry, ...$this->eventMessages))
$job = (new static($this->eventPublisherMiddleware, $this->retry, ...$this->eventMessages))
->onQueue('retryDomainEvent');

$this->dispatcher->dispatch($job);
Expand Down
20 changes: 19 additions & 1 deletion src/ServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

namespace Softonic\TransactionalEventPublisher;

use Softonic\Amqp\Amqp;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Log\Logger;
use Illuminate\Support\ServiceProvider as LaravelServiceProvider;
use Softonic\Amqp\Amqp;
use Softonic\TransactionalEventPublisher\Console\Commands\EmitAllEvents;
use Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware;
use Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AsyncMiddleware;
use Softonic\TransactionalEventPublisher\Factories\AmqpMessageFactory;
use Softonic\TransactionalEventPublisher\Observers\ModelObserver;

Expand Down Expand Up @@ -51,6 +53,13 @@ public function register()
{
$this->mergeConfigFrom(__DIR__ . '/../config/' . $this->packageName . '.php', $this->packageName);

$this->app->bind(AsyncMiddleware::class, function () {
return new AsyncMiddleware(
resolve(config('transactional-event-publisher.event_publisher_middleware')),
resolve(Dispatcher::class)
);
});

$this->app->bind(AmqpMiddleware::class, function () {
return new AmqpMiddleware(
new AmqpMessageFactory(),
Expand All @@ -77,6 +86,15 @@ public function register()
);
});

$this->app->bindMethod(
'Softonic\TransactionalEventPublisher\Console\Commands\EmitAllEvents@handle',
function ($job) {
return $job->handle(
resolve(config('transactional-event-publisher.event_publisher_middleware'))
);
}
);

$this->commands([EmitAllEvents::class]);
}
}
8 changes: 6 additions & 2 deletions tests/Console/Commands/EmitAllEventsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Softonic\TransactionalEventPublisher\Tests\Console\Commands;

use Illuminate\Foundation\Testing\DatabaseTransactions;
use Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware;
use Softonic\TransactionalEventPublisher\Jobs\SendDomainEvents;
use Softonic\TransactionalEventPublisher\Model\DomainEvent;
use Softonic\TransactionalEventPublisher\TestCase;
Expand All @@ -17,7 +18,10 @@ class EmitAllEventsTest extends TestCase
public function setUp(): void
{
parent::setUp();

$this->loadMigrationsFrom(__DIR__ . '/../../../database/migrations');

config()->set('transactional-event-publisher.event_publisher_middleware', AmqpMiddleware::class);
}

/**
Expand All @@ -30,7 +34,7 @@ public function whenRunCommandItShouldResendAllTheCurrentDomainEvents(): void
$this->app->register('Softonic\TransactionalEventPublisher\ServiceProvider');
$this->artisan('event-sourcing:emit-all')->run();

$this->assertCount(4, $this->dispatchedJobs);
self::assertCount(4, $this->dispatchedJobs);
}

/**
Expand All @@ -43,6 +47,6 @@ public function whenRunCommandWithBatchSizeItShouldResendAllTheCurrentDomainEven
$this->app->register('Softonic\TransactionalEventPublisher\ServiceProvider');
$this->artisan('event-sourcing:emit-all --batchSize=2')->run();

$this->assertCount(2, $this->dispatchedJobs);
self::assertCount(2, $this->dispatchedJobs);
}
}
20 changes: 10 additions & 10 deletions tests/EventStoreMiddlewares/AmqpMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

namespace Softonic\TransactionalEventPublisher\Tests\EventStoreMiddlewares;

use Softonic\Amqp\Amqp;
use Mockery;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Softonic\Amqp\Amqp;
use Softonic\TransactionalEventPublisher\Contracts\EventMessageContract;
use Softonic\TransactionalEventPublisher\EventStoreMiddlewares\AmqpMiddleware;
use Softonic\TransactionalEventPublisher\Factories\AmqpMessageFactory;
Expand Down Expand Up @@ -106,7 +106,7 @@ private function getTwoMessages(): array

return [
$this->getOneMessage(),
$message
$message,
];
}

Expand Down Expand Up @@ -136,10 +136,10 @@ public function testWhenStoringAMessageShouldReturnTrue()

public function testWhenStoringMultipleMessagesShouldReturnTrue()
{
$messages = $this->getTwoMessages();
$properties = ['AMQP properties'];
$logger = Mockery::mock(LoggerInterface::class);
$firstAmqpMessage = new AMQPMessage();
$messages = $this->getTwoMessages();
$properties = ['AMQP properties'];
$logger = Mockery::mock(LoggerInterface::class);
$firstAmqpMessage = new AMQPMessage();
$secondAmqpMessage = new AMQPMessage();

$amqpMock = Mockery::mock(Amqp::class);
Expand Down Expand Up @@ -199,12 +199,12 @@ public function testConfigurableRoutingKey()

public function testConfigurableRoutingKeyForMultipleMessages()
{
$messages = $this->getTwoMessages();
$properties = [
$messages = $this->getTwoMessages();
$properties = [
'routing_key_fields' => ['site', 'service', 'eventType', 'modelName'],
];
$logger = Mockery::mock(LoggerInterface::class);
$firstAmqpMessage = new AMQPMessage();
$logger = Mockery::mock(LoggerInterface::class);
$firstAmqpMessage = new AMQPMessage();
$secondAmqpMessage = new AMQPMessage();

$amqpMock = Mockery::mock(Amqp::class);
Expand Down
Loading

0 comments on commit b0de298

Please sign in to comment.