Skip to content

Commit

Permalink
first version (#32)
Browse files Browse the repository at this point in the history
* first version

* fixing php cs

* fixing comments

* updating README

---------

Co-authored-by: Juan Lotito <[email protected]>
  • Loading branch information
Lotykun and Lotykun-Softonic authored Aug 11, 2023
1 parent f221222 commit e657934
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 422 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ You just need to create a job that will run indefinitely with the command `php a

#### Sending all the events stored in database

Sometimes you will need to send all the events stored in the database. To do it, you can run the previous command with the option `--allEvents`.
You also have the option to use a [MySQL unbuffered connection](https://dev.mysql.com/doc/apis-php/en/apis-php-mysqlinfo.concepts.buffering.html) with the option `--dbConnection` to retrieve a large amount of events without consuming all the memory.
Unbuffered connection example from `config/database.php`
```php
Expand Down
28 changes: 0 additions & 28 deletions database/factories/DomainEventsCursorFactory.php

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration {
/**
* Run the migrations.
*/
public function up(): void
{
Schema::dropIfExists('domain_events_cursor');
}

/**
* Reverse the migrations.
*/
public function down()
{
Schema::create('domain_events_cursor', static function (Blueprint $table) {
$table->unsignedInteger('last_id')
->comment('ID from the last domain event emitted')
->primary();
});
}
};
63 changes: 4 additions & 59 deletions src/Console/Commands/EmitEvents.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use RuntimeException;
use Softonic\TransactionalEventPublisher\Contracts\EventStoreMiddlewareContract;
use Softonic\TransactionalEventPublisher\Models\DomainEvent;
use Softonic\TransactionalEventPublisher\Models\DomainEventsCursor;

class EmitEvents extends Command
{
Expand All @@ -32,15 +31,12 @@ class EmitEvents extends Command

protected $signature = 'event-sourcing:emit
{--dbConnection=mysql : Indicate the database connection to use (MySQL unbuffered for better performance when large amount of events)}
{--batchSize=100 : Indicate the amount of events to be sent per publish. Increase for higher throughput}
{--allEvents : Option to send all the events from the beginning by resetting the cursor}';
{--batchSize=100 : Indicate the amount of events to be sent per publish. Increase for higher throughput}';

protected $description = 'Continuously emits domain events in batches';

public EventStoreMiddlewareContract $eventPublisherMiddleware;

public DomainEventsCursor $cursor;

public string $databaseConnection;

public int $batchSize;
Expand All @@ -57,31 +53,10 @@ public function handle(EventStoreMiddlewareContract $eventPublisherMiddleware):

$this->databaseConnection = $this->option('dbConnection');
$this->batchSize = (int)$this->option('batchSize');
$resetCursor = $this->option('allEvents');

$this->cursor = $this->getInitialCursor($resetCursor);

$this->sendBatches();
}

private function getInitialCursor(bool $resetCursor): DomainEventsCursor
{
$cursor = DomainEventsCursor::first();

if (empty($cursor)) {
$cursor = new DomainEventsCursor(['last_id' => 0]);
$cursor->save();

return $cursor;
}

if ($resetCursor) {
$cursor->update(['last_id' => 0]);
}

return $cursor;
}

protected function sendBatches(): void
{
while (true) {
Expand All @@ -92,10 +67,9 @@ protected function sendBatches(): void
public function sendBatch(): void
{
$this->eventsProcessed = false;
$lastId = $this->cursor->last_id;

try {
$events = DomainEvent::on($this->databaseConnection)->where('id', '>', $lastId)->cursor();
$events = DomainEvent::on($this->databaseConnection)->cursor();
} catch (Exception $e) {
$this->waitExponentialBackOffForErrors();

Expand Down Expand Up @@ -123,49 +97,20 @@ private function sendEvents(LazyCollection $events): void
$lastId = $events->max('id');
$eventMessagesCount = count($eventMessages);

if ($eventMessagesCount !== $this->batchSize) {
$this->checkCursorConsistencyWithEvents($eventMessagesCount, $lastId);
}

if (!$this->eventPublisherMiddleware->store(...$eventMessages)) {
$errorMessage = "The events couldn't be sent. Retrying...";
Log::alert($errorMessage, ['eventMessages' => $eventMessages->toArray()]);

throw new RuntimeException($errorMessage);
}

try {
$this->cursor->update(['last_id' => $lastId]);
} catch (Exception $e) {
$this->cursor->discardChanges();

throw $e;
}

Log::info("Published {$eventMessagesCount} events, last event ID published: {$lastId}");

$this->eventsProcessed = true;
$this->attemptForErrors = $this->attemptForNoEvents = 1;
}

protected function checkCursorConsistencyWithEvents(int $eventMessagesCount, int $lastId): void
{
$previousLastId = $this->cursor->last_id;

if (!$this->isCursorConsistentWithMessages($previousLastId, $eventMessagesCount, $lastId)) {
$errorMessage = 'Mismatch in the events to send. Retrying...';
Log::warning(
$errorMessage,
compact('previousLastId', 'eventMessagesCount', 'lastId')
);

throw new RuntimeException($errorMessage);
}
}

protected function isCursorConsistentWithMessages(int $previousLastId, int $eventMessagesCount, int $lastId): bool
{
return $previousLastId + $eventMessagesCount === $lastId;
$events->each->delete();
Log::debug("Deleted {$eventMessagesCount} events, last event ID deleted: {$lastId}");
}

private function waitExponentialBackOffForErrors(): void
Expand Down
54 changes: 0 additions & 54 deletions src/Models/DomainEventsCursor.php

This file was deleted.

Loading

0 comments on commit e657934

Please sign in to comment.