From e657934efde29124069863d963df729207f79b28 Mon Sep 17 00:00:00 2001 From: Lotykun Date: Fri, 11 Aug 2023 13:47:37 +0200 Subject: [PATCH] first version (#32) * first version * fixing php cs * fixing comments * updating README --------- Co-authored-by: Juan Lotito --- README.md | 1 - .../factories/DomainEventsCursorFactory.php | 28 -- ...2703_remove_domain_events_cursor_table.php | 27 ++ src/Console/Commands/EmitEvents.php | 63 +--- src/Models/DomainEventsCursor.php | 54 ---- tests/Console/Commands/EmitEventsTest.php | 298 ++---------------- 6 files changed, 49 insertions(+), 422 deletions(-) delete mode 100644 database/factories/DomainEventsCursorFactory.php create mode 100644 database/migrations/2023_08_11_122703_remove_domain_events_cursor_table.php delete mode 100644 src/Models/DomainEventsCursor.php diff --git a/README.md b/README.md index 010dff4..5ed9be9 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,6 @@ You just need to create a job that will run indefinitely with the command `php a #### Sending all the events stored in database -Sometimes you will need to send all the events stored in the database. To do it, you can run the previous command with the option `--allEvents`. You also have the option to use a [MySQL unbuffered connection](https://dev.mysql.com/doc/apis-php/en/apis-php-mysqlinfo.concepts.buffering.html) with the option `--dbConnection` to retrieve a large amount of events without consuming all the memory. Unbuffered connection example from `config/database.php` ```php diff --git a/database/factories/DomainEventsCursorFactory.php b/database/factories/DomainEventsCursorFactory.php deleted file mode 100644 index dab41b8..0000000 --- a/database/factories/DomainEventsCursorFactory.php +++ /dev/null @@ -1,28 +0,0 @@ - $this->faker->randomNumber(), - ]; - } -} diff --git a/database/migrations/2023_08_11_122703_remove_domain_events_cursor_table.php b/database/migrations/2023_08_11_122703_remove_domain_events_cursor_table.php new file mode 100644 index 0000000..5d55137 --- /dev/null +++ b/database/migrations/2023_08_11_122703_remove_domain_events_cursor_table.php @@ -0,0 +1,27 @@ +unsignedInteger('last_id') + ->comment('ID from the last domain event emitted') + ->primary(); + }); + } +}; diff --git a/src/Console/Commands/EmitEvents.php b/src/Console/Commands/EmitEvents.php index 6f8a44f..92b1745 100644 --- a/src/Console/Commands/EmitEvents.php +++ b/src/Console/Commands/EmitEvents.php @@ -11,7 +11,6 @@ use RuntimeException; use Softonic\TransactionalEventPublisher\Contracts\EventStoreMiddlewareContract; use Softonic\TransactionalEventPublisher\Models\DomainEvent; -use Softonic\TransactionalEventPublisher\Models\DomainEventsCursor; class EmitEvents extends Command { @@ -32,15 +31,12 @@ class EmitEvents extends Command protected $signature = 'event-sourcing:emit {--dbConnection=mysql : Indicate the database connection to use (MySQL unbuffered for better performance when large amount of events)} - {--batchSize=100 : Indicate the amount of events to be sent per publish. Increase for higher throughput} - {--allEvents : Option to send all the events from the beginning by resetting the cursor}'; + {--batchSize=100 : Indicate the amount of events to be sent per publish. Increase for higher throughput}'; protected $description = 'Continuously emits domain events in batches'; public EventStoreMiddlewareContract $eventPublisherMiddleware; - public DomainEventsCursor $cursor; - public string $databaseConnection; public int $batchSize; @@ -57,31 +53,10 @@ public function handle(EventStoreMiddlewareContract $eventPublisherMiddleware): $this->databaseConnection = $this->option('dbConnection'); $this->batchSize = (int)$this->option('batchSize'); - $resetCursor = $this->option('allEvents'); - - $this->cursor = $this->getInitialCursor($resetCursor); $this->sendBatches(); } - private function getInitialCursor(bool $resetCursor): DomainEventsCursor - { - $cursor = DomainEventsCursor::first(); - - if (empty($cursor)) { - $cursor = new DomainEventsCursor(['last_id' => 0]); - $cursor->save(); - - return $cursor; - } - - if ($resetCursor) { - $cursor->update(['last_id' => 0]); - } - - return $cursor; - } - protected function sendBatches(): void { while (true) { @@ -92,10 +67,9 @@ protected function sendBatches(): void public function sendBatch(): void { $this->eventsProcessed = false; - $lastId = $this->cursor->last_id; try { - $events = DomainEvent::on($this->databaseConnection)->where('id', '>', $lastId)->cursor(); + $events = DomainEvent::on($this->databaseConnection)->cursor(); } catch (Exception $e) { $this->waitExponentialBackOffForErrors(); @@ -123,10 +97,6 @@ private function sendEvents(LazyCollection $events): void $lastId = $events->max('id'); $eventMessagesCount = count($eventMessages); - if ($eventMessagesCount !== $this->batchSize) { - $this->checkCursorConsistencyWithEvents($eventMessagesCount, $lastId); - } - if (!$this->eventPublisherMiddleware->store(...$eventMessages)) { $errorMessage = "The events couldn't be sent. Retrying..."; Log::alert($errorMessage, ['eventMessages' => $eventMessages->toArray()]); @@ -134,38 +104,13 @@ private function sendEvents(LazyCollection $events): void throw new RuntimeException($errorMessage); } - try { - $this->cursor->update(['last_id' => $lastId]); - } catch (Exception $e) { - $this->cursor->discardChanges(); - - throw $e; - } - Log::info("Published {$eventMessagesCount} events, last event ID published: {$lastId}"); $this->eventsProcessed = true; $this->attemptForErrors = $this->attemptForNoEvents = 1; - } - - protected function checkCursorConsistencyWithEvents(int $eventMessagesCount, int $lastId): void - { - $previousLastId = $this->cursor->last_id; - if (!$this->isCursorConsistentWithMessages($previousLastId, $eventMessagesCount, $lastId)) { - $errorMessage = 'Mismatch in the events to send. Retrying...'; - Log::warning( - $errorMessage, - compact('previousLastId', 'eventMessagesCount', 'lastId') - ); - - throw new RuntimeException($errorMessage); - } - } - - protected function isCursorConsistentWithMessages(int $previousLastId, int $eventMessagesCount, int $lastId): bool - { - return $previousLastId + $eventMessagesCount === $lastId; + $events->each->delete(); + Log::debug("Deleted {$eventMessagesCount} events, last event ID deleted: {$lastId}"); } private function waitExponentialBackOffForErrors(): void diff --git a/src/Models/DomainEventsCursor.php b/src/Models/DomainEventsCursor.php deleted file mode 100644 index 59d8312..0000000 --- a/src/Models/DomainEventsCursor.php +++ /dev/null @@ -1,54 +0,0 @@ -emitEvents->batchSize = 2; } - /** - * @test - */ - public function whenThereIsNoCursorAndEventsItShouldInitializeItAndCallTheSendBatchesMethod(): void - { - $emitEvents = Mockery::mock(EmitEvents::class)->makePartial(); - $emitEvents->__construct(); - $emitEvents->shouldAllowMockingProtectedMethods(); - $emitEvents->shouldReceive('sendBatches')->once(); - $this->app->instance(EmitEvents::class, $emitEvents); - - $this->artisan('event-sourcing:emit'); - - self::assertDatabaseHas(DomainEventsCursor::class, ['last_id' => 0]); - } - - /** - * @test - */ - public function whenThereIsACursorButNoEventsItShouldCallTheSendBatchesMethod(): void - { - $cursor = DomainEventsCursor::factory()->create(); - - $emitEvents = Mockery::mock(EmitEvents::class)->makePartial(); - $emitEvents->__construct(); - $emitEvents->shouldAllowMockingProtectedMethods(); - $emitEvents->shouldReceive('sendBatches')->once(); - $this->app->instance(EmitEvents::class, $emitEvents); - - $this->artisan('event-sourcing:emit'); - - self::assertDatabaseHas(DomainEventsCursor::class, ['last_id' => $cursor->last_id]); - } - - /** - * @test - */ - public function whenTheAllEventsOptionIsReceivedItShouldResetTheCursorAndCallTheSendBatchesMethod(): void - { - DomainEventsCursor::factory()->create(); - - $emitEvents = Mockery::mock(EmitEvents::class)->makePartial(); - $emitEvents->__construct(); - $emitEvents->shouldAllowMockingProtectedMethods(); - $emitEvents->shouldReceive('sendBatches')->once(); - $this->app->instance(EmitEvents::class, $emitEvents); - - $this->artisan('event-sourcing:emit --allEvents'); - - self::assertDatabaseHas(DomainEventsCursor::class, ['last_id' => 0]); - } - /** * @test */ public function whenSendingABatchButThereAreNoEventsItShouldWaitAndDoNothing(): void { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - PHPMockery::mock(__NAMESPACE__, 'usleep') ->once() ->with(1000); - $this->emitEvents->cursor = $cursor; $this->emitEvents->sendBatch(); - $this->checkFinalCursor(0); self::assertEquals(1, $this->emitEvents->attemptForErrors); self::assertEquals(2, $this->emitEvents->attemptForNoEvents); } @@ -119,16 +61,12 @@ public function whenSendingABatchButThereAreNoEventsForFifthTimeItShouldWaitAndD { $this->emitEvents->attemptForNoEvents = 5; - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - PHPMockery::mock(__NAMESPACE__, 'usleep') ->once() ->with(16000); - $this->emitEvents->cursor = $cursor; $this->emitEvents->sendBatch(); - $this->checkFinalCursor(0); self::assertEquals(1, $this->emitEvents->attemptForErrors); self::assertEquals(6, $this->emitEvents->attemptForNoEvents); } @@ -136,141 +74,93 @@ public function whenSendingABatchButThereAreNoEventsForFifthTimeItShouldWaitAndD /** * @test */ - public function whenSendingABatchAndThereIsOneEventItShouldPublishIt(): void + public function whenSendingABatchAndThereIsOneEventItShouldPublishItAndDeletedIt(): void { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); $event = DomainEvent::factory()->create(); $this->whenEventsArePublished(fn (...$eventMessages) => $event['message']->toArray() === $eventMessages[0]->toArray()); - - $this->emitEvents->cursor = $cursor; $this->emitEvents->sendBatch(); - $this->checkFinalCursor(1); self::assertEquals(1, $this->emitEvents->attemptForErrors); self::assertEquals(1, $this->emitEvents->attemptForNoEvents); + self::assertDatabaseMissing(DomainEvent::class, ['id' => $event['id']]); } /** * @test */ - public function whenSendingABatchAfterThreeAttemptsWithErrorsItShouldPublishItAndResetAttempts(): void + public function whenSendingABatchAfterThreeAttemptsWithErrorsItShouldPublishItDeleteItAndResetAttempts(): void { $this->emitEvents->attemptForErrors = 4; - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); $event = DomainEvent::factory()->create(); $this->whenEventsArePublished(fn (...$eventMessages) => $event['message']->toArray() === $eventMessages[0]->toArray()); - - $this->emitEvents->cursor = $cursor; $this->emitEvents->sendBatch(); - $this->checkFinalCursor(1); self::assertEquals(1, $this->emitEvents->attemptForErrors); self::assertEquals(1, $this->emitEvents->attemptForNoEvents); + self::assertDatabaseMissing(DomainEvent::class, ['id' => $event['id']]); } /** * @test */ - public function whenSendingABatchAfterFiveAttemptsWithNoEventsItShouldPublishItAndResetAttempts(): void + public function whenSendingABatchAfterFiveAttemptsWithNoEventsItShouldPublishItDeleteItAndResetAttempts(): void { $this->emitEvents->attemptForNoEvents = 6; - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); $event = DomainEvent::factory()->create(); $this->whenEventsArePublished(fn (...$eventMessages) => $event['message']->toArray() === $eventMessages[0]->toArray()); - - $this->emitEvents->cursor = $cursor; $this->emitEvents->sendBatch(); - $this->checkFinalCursor(1); self::assertEquals(1, $this->emitEvents->attemptForErrors); self::assertEquals(1, $this->emitEvents->attemptForNoEvents); + self::assertDatabaseMissing(DomainEvent::class, ['id' => $event['id']]); } /** * @test */ - public function whenSendingABatchAndThereAreSameEventsThanBatchSizeItShouldPublishThem(): void + public function whenSendingABatchAndThereAreSameEventsThanBatchSizeItShouldPublishThemAndDeleteThem(): void { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - DomainEvent::factory(2)->create(); + $events = DomainEvent::factory(2)->create(); $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 2); - - $this->emitEvents->cursor = $cursor; $this->emitEvents->sendBatch(); - $this->checkFinalCursor(2); self::assertEquals(1, $this->emitEvents->attemptForErrors); self::assertEquals(1, $this->emitEvents->attemptForNoEvents); + $events->each(function ($event) { + self::assertDatabaseMissing(DomainEvent::class, ['id' => $event['id']]); + }); } /** * @test */ - public function whenSendingABatchAndThereAreMoreEventsThanBatchSizeItShouldPublishOnlyTheBatchSizeAmount(): void + public function whenSendingABatchAndThereAreMoreEventsThanBatchSizeItShouldPublishAndDeleteOnlyTheBatchSizeAmount(): void { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - DomainEvent::factory(3)->create(); + $events = DomainEvent::factory(3)->create(); $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 2); $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 1); - $this->emitEvents->cursor = $cursor; $this->emitEvents->sendBatch(); - $this->checkFinalCursor(3); self::assertEquals(1, $this->emitEvents->attemptForErrors); self::assertEquals(1, $this->emitEvents->attemptForNoEvents); + $events->each(function ($event) { + self::assertDatabaseMissing(DomainEvent::class, ['id' => $event['id']]); + }); } /** * @test */ - public function whenSendingABatchAndThereAreMorePendingEventsThanBatchSizeAndCursorIsNotAtStartItShouldPublishOnlyTheBatchSizeAmount(): void + public function whenSendingABatchButThereIsAnErrorPublishingTheEventsItShouldLogAnAlertAndWaitAndDoNotDelete(): void { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 2]); - DomainEvent::factory(5)->create(); - - $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 2); - $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 1); - - $this->emitEvents->cursor = $cursor; - $this->emitEvents->sendBatch(); - - $this->checkFinalCursor(5); - self::assertEquals(1, $this->emitEvents->attemptForErrors); - self::assertEquals(1, $this->emitEvents->attemptForNoEvents); - } - - /** - * @test - */ - public function whenSendingABatchAndThereIsOnePendingEventsThanBatchSizeAndCursorIsNotAtStartItShouldPublishOnlyThatEvent(): void - { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 2]); - DomainEvent::factory(3)->create(); - - $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 1); - - $this->emitEvents->cursor = $cursor; - $this->emitEvents->sendBatch(); - - $this->checkFinalCursor(3); - self::assertEquals(1, $this->emitEvents->attemptForErrors); - self::assertEquals(1, $this->emitEvents->attemptForNoEvents); - } - - /** - * @test - */ - public function whenSendingABatchButThereIsAnErrorPublishingTheEventsItShouldLogAnAlertAndWaitAndDoNotChangeCursor(): void - { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); $event = DomainEvent::factory()->create(); $this->eventPublisherMiddleware->shouldReceive('store') @@ -285,157 +175,11 @@ public function whenSendingABatchButThereIsAnErrorPublishingTheEventsItShouldLog ->once() ->with(1); - $this->emitEvents->cursor = $cursor; $this->emitEvents->sendBatch(); - $this->checkFinalCursor(0); self::assertEquals(2, $this->emitEvents->attemptForErrors); self::assertEquals(1, $this->emitEvents->attemptForNoEvents); - } - - /** - * @test - */ - public function whenSendingABatchButTheCursorAndTheNumberOfEventsIsNotConsistentItShouldLogAnErrorAndWaitAndDoNotChangeCursor(): void - { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - DomainEvent::factory()->create(); - - $this->eventPublisherMiddleware->shouldNotReceive('store'); - - Log::shouldReceive('warning') - ->once() - ->with( - 'Mismatch in the events to send. Retrying...', - [ - 'previousLastId' => 0, - 'eventMessagesCount' => 1, - 'lastId' => 1, - ] - ); - - PHPMockery::mock(__NAMESPACE__, 'sleep') - ->once() - ->with(1); - - $emitEvents = Mockery::mock(EmitEvents::class)->makePartial(); - $emitEvents->__construct(); - $emitEvents->shouldAllowMockingProtectedMethods(); - $emitEvents->shouldReceive('isCursorConsistentWithMessages')->once()->andReturnFalse(); - - $emitEvents->eventPublisherMiddleware = $this->eventPublisherMiddleware; - $emitEvents->databaseConnection = 'testing'; - $emitEvents->batchSize = 2; - $emitEvents->cursor = $cursor; - $emitEvents->sendBatch(); - - self::assertEquals(0, $emitEvents->cursor->last_id); - self::assertDatabaseHas(DomainEventsCursor::class, ['last_id' => 0]); - self::assertEquals(2, $emitEvents->attemptForErrors); - self::assertEquals(1, $emitEvents->attemptForNoEvents); - } - - /** - * @test - */ - public function whenSendingABatchWithTheMaxBatchSizeMessagesItShouldNotCallTheCheckCursorConsistencyWithEventsMethodAndPublishTheEvents(): void - { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - DomainEvent::factory(2)->create(); - - $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 2); - - $emitEvents = Mockery::mock(EmitEvents::class)->makePartial(); - $emitEvents->__construct(); - $emitEvents->shouldAllowMockingProtectedMethods(); - $emitEvents->shouldNotReceive('checkCursorConsistencyWithEvents'); - - $emitEvents->eventPublisherMiddleware = $this->eventPublisherMiddleware; - $emitEvents->databaseConnection = 'testing'; - $emitEvents->batchSize = 2; - $emitEvents->cursor = $cursor; - $emitEvents->sendBatch(); - - self::assertEquals(2, $emitEvents->cursor->last_id); - self::assertDatabaseHas(DomainEventsCursor::class, ['last_id' => 2]); - self::assertEquals(1, $emitEvents->attemptForErrors); - self::assertEquals(1, $emitEvents->attemptForNoEvents); - } - - /** - * @test - */ - public function whenSendingABatchAndThereIsAnErrorSavingTheCursorItShouldWaitAndRestoreTheCursor(): void - { - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - DomainEvent::factory()->create(); - - $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 1); - - DomainEventsCursor::saving(fn () => throw new Exception()); - - PHPMockery::mock(__NAMESPACE__, 'sleep') - ->once() - ->with(1); - - $this->emitEvents->cursor = $cursor; - $this->emitEvents->sendBatch(); - - $this->checkFinalCursor(0); - self::assertEquals(2, $this->emitEvents->attemptForErrors); - self::assertEquals(1, $this->emitEvents->attemptForNoEvents); - } - - /** - * @test - */ - public function whenSendingABatchAndThereIsAnErrorSavingTheCursorForThirdTimeItShouldWaitAndRestoreTheCursor(): void - { - $this->emitEvents->attemptForErrors = 3; - - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - DomainEvent::factory()->create(); - - $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 1); - - DomainEventsCursor::saving(fn () => throw new Exception()); - - PHPMockery::mock(__NAMESPACE__, 'sleep') - ->once() - ->with(4); - - $this->emitEvents->cursor = $cursor; - $this->emitEvents->sendBatch(); - - $this->checkFinalCursor(0); - self::assertEquals(4, $this->emitEvents->attemptForErrors); - self::assertEquals(1, $this->emitEvents->attemptForNoEvents); - } - - /** - * @test - */ - public function whenSendingABatchAndThereIsAnErrorSavingTheCursorForSeventhTimeItShouldWaitTheMaxSecondsAndRestoreTheCursor(): void - { - $this->emitEvents->attemptForErrors = 7; - - $cursor = DomainEventsCursor::factory()->create(['last_id' => 0]); - DomainEvent::factory()->create(); - - $this->whenEventsArePublished(fn (...$eventMessages) => count($eventMessages) === 1); - - DomainEventsCursor::saving(fn () => throw new Exception()); - - PHPMockery::mock(__NAMESPACE__, 'sleep') - ->once() - ->with(60); - - $this->emitEvents->cursor = $cursor; - $this->emitEvents->sendBatch(); - - $this->checkFinalCursor(0); - self::assertEquals(8, $this->emitEvents->attemptForErrors); - self::assertEquals(1, $this->emitEvents->attemptForNoEvents); + self::assertDatabaseHas(DomainEvent::class, ['id' => $event['id']]); } private function whenEventsArePublished(Closure $closure): void @@ -445,10 +189,4 @@ private function whenEventsArePublished(Closure $closure): void ->withArgs($closure) ->andReturnTrue(); } - - private function checkFinalCursor(int $lastId): void - { - self::assertEquals($lastId, $this->emitEvents->cursor->last_id); - self::assertDatabaseHas(DomainEventsCursor::class, ['last_id' => $lastId]); - } }