From f13378b8c1f8bd44c40bbd8ea256d8ac6f3f4164 Mon Sep 17 00:00:00 2001 From: Denis Zunke Date: Wed, 13 Mar 2024 08:17:02 +0100 Subject: [PATCH] Implement rows query event from mysql --- .github/workflows/tests.yml | 2 +- docker-compose.yml | 3 +- .../Definitions/ConstEventsNames.php | 1 + .../Event/DTO/RowsQueryDTO.php | 40 +++++++++++++++++++ src/MySQLReplication/Event/Event.php | 5 +++ .../Event/EventSubscribers.php | 7 ++++ src/MySQLReplication/Event/RowsQueryEvent.php | 25 ++++++++++++ tests/Integration/BaseCase.php | 10 ++++- tests/Integration/RowsQueryTest.php | 36 +++++++++++++++++ 9 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 src/MySQLReplication/Event/DTO/RowsQueryDTO.php create mode 100644 src/MySQLReplication/Event/RowsQueryEvent.php create mode 100644 tests/Integration/RowsQueryTest.php diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index fc04054..c88419e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -24,7 +24,7 @@ jobs: - name: Start mysql service run: | - echo -e "\n[mysqld]\nserver-id=1\nbinlog_format=row\nlog_bin=/var/log/mysql/mysql-bin.log" | sudo tee -a /etc/mysql/my.cnf + echo -e "\n[mysqld]\nserver-id=1\nbinlog_format=row\nlog_bin=/var/log/mysql/mysql-bin.log\nbinlog_rows_query_log_events=ON" | sudo tee -a /etc/mysql/my.cnf sudo /etc/init.d/mysql start mysql_tzinfo_to_sql /usr/share/zoneinfo | mysql -u root mysql -proot diff --git a/docker-compose.yml b/docker-compose.yml index 2bc7764..44350ce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,7 +12,8 @@ services: '--log_bin=binlog', '--max_binlog_size=8M', '--binlog_format=row', - '--server-id=1' + '--server-id=1', + '--binlog_rows_query_log_events=ON' ] environment: - MYSQL_ROOT_PASSWORD=root diff --git a/src/MySQLReplication/Definitions/ConstEventsNames.php b/src/MySQLReplication/Definitions/ConstEventsNames.php index 75c3ad3..bf172b4 100644 --- a/src/MySQLReplication/Definitions/ConstEventsNames.php +++ b/src/MySQLReplication/Definitions/ConstEventsNames.php @@ -17,4 +17,5 @@ enum ConstEventsNames: string case TABLE_MAP = 'tableMap'; case WRITE = 'write'; case FORMAT_DESCRIPTION = 'format description'; + case ROWS_QUERY = 'rows_query'; } diff --git a/src/MySQLReplication/Event/DTO/RowsQueryDTO.php b/src/MySQLReplication/Event/DTO/RowsQueryDTO.php new file mode 100644 index 0000000..5140e7d --- /dev/null +++ b/src/MySQLReplication/Event/DTO/RowsQueryDTO.php @@ -0,0 +1,40 @@ +getType() . ' === ' . PHP_EOL . + 'Date: ' . $this->eventInfo->getDateTime() . PHP_EOL . + 'Log position: ' . $this->eventInfo->pos . PHP_EOL . + 'Event size: ' . $this->eventInfo->size . PHP_EOL . + 'Query: ' . $this->query . PHP_EOL; + } + + public function getType(): string + { + return $this->type->value; + } + + public function jsonSerialize(): array + { + return get_object_vars($this); + } +} diff --git a/src/MySQLReplication/Event/Event.php b/src/MySQLReplication/Event/Event.php index 4af0caf..54ef751 100644 --- a/src/MySQLReplication/Event/Event.php +++ b/src/MySQLReplication/Event/Event.php @@ -120,6 +120,11 @@ private function makeEvent(BinaryDataReader $binaryDataReader): ?EventDTO ); } + // The Rows Query Log Event will be triggered with enabled MySQL Config `binlog_rows_query_log_events` + if ($eventInfo->type === ConstEventType::ROWS_QUERY_LOG_EVENT->value) { + return (new RowsQueryEvent($eventInfo, $binaryDataReader, $this->binLogServerInfo))->makeRowsQueryDTO(); + } + if ($eventInfo->type === ConstEventType::FORMAT_DESCRIPTION_EVENT->value) { return new FormatDescriptionEventDTO($eventInfo); } diff --git a/src/MySQLReplication/Event/EventSubscribers.php b/src/MySQLReplication/Event/EventSubscribers.php index 1bb30c2..eb0c405 100644 --- a/src/MySQLReplication/Event/EventSubscribers.php +++ b/src/MySQLReplication/Event/EventSubscribers.php @@ -13,6 +13,7 @@ use MySQLReplication\Event\DTO\MariaDbGtidLogDTO; use MySQLReplication\Event\DTO\QueryDTO; use MySQLReplication\Event\DTO\RotateDTO; +use MySQLReplication\Event\DTO\RowsQueryDTO; use MySQLReplication\Event\DTO\TableMapDTO; use MySQLReplication\Event\DTO\UpdateRowsDTO; use MySQLReplication\Event\DTO\WriteRowsDTO; @@ -35,6 +36,7 @@ public static function getSubscribedEvents(): array ConstEventsNames::MARIADB_GTID->value => 'onMariaDbGtid', ConstEventsNames::FORMAT_DESCRIPTION->value => 'onFormatDescription', ConstEventsNames::HEARTBEAT->value => 'onHeartbeat', + ConstEventsNames::ROWS_QUERY->value => 'onRowsQuery', ]; } @@ -93,6 +95,11 @@ public function onHeartbeat(HeartbeatDTO $event): void $this->allEvents($event); } + public function onRowsQuery(RowsQueryDTO $event): void + { + $this->allEvents($event); + } + protected function allEvents(EventDTO $event): void { } diff --git a/src/MySQLReplication/Event/RowsQueryEvent.php b/src/MySQLReplication/Event/RowsQueryEvent.php new file mode 100644 index 0000000..3ddc611 --- /dev/null +++ b/src/MySQLReplication/Event/RowsQueryEvent.php @@ -0,0 +1,25 @@ +binaryDataReader->advance(1); + return new RowsQueryDTO( + $this->eventInfo, + $this->binaryDataReader->read($this->binaryDataReader->readInt8()), + ); + } +} diff --git a/tests/Integration/BaseCase.php b/tests/Integration/BaseCase.php index a4c11ea..05edeb3 100644 --- a/tests/Integration/BaseCase.php +++ b/tests/Integration/BaseCase.php @@ -41,7 +41,7 @@ protected function setUp(): void ->withHost('0.0.0.0') ->withPassword('root') ->withPort(3306) - ->withEventsIgnore([ConstEventType::GTID_LOG_EVENT->value]); + ->withEventsIgnore($this->getIgnoredEvents()); $this->connect(); @@ -83,6 +83,14 @@ public function connect(): void $this->connection->executeStatement('SET SESSION sql_mode = \'\';'); } + protected function getIgnoredEvents(): array + { + return [ + ConstEventType::GTID_LOG_EVENT->value, // Generally in here + ConstEventType::ROWS_QUERY_LOG_EVENT->value, // Just debugging, there is a special test for it + ]; + } + protected function getEvent(): EventDTO { if ($this->mySQLReplicationFactory === null) { diff --git a/tests/Integration/RowsQueryTest.php b/tests/Integration/RowsQueryTest.php new file mode 100644 index 0000000..129a35f --- /dev/null +++ b/tests/Integration/RowsQueryTest.php @@ -0,0 +1,36 @@ +connection->executeStatement( + 'CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))' + ); + + $insertQuery = 'INSERT INTO test (data) VALUES(\'Hello\') /* Foo:Bar; */'; + $this->connection->executeStatement($insertQuery); + + // The Create Table Query ... irrelevant content for this test + self::assertInstanceOf(QueryDTO::class, $this->getEvent()); + // The BEGIN Query ... irrelevant content for this test + self::assertInstanceOf(QueryDTO::class, $this->getEvent()); + + $rowsQueryEvent = $this->getEvent(); + self::assertInstanceOf(RowsQueryDTO::class, $rowsQueryEvent); + self::assertSame($insertQuery, $rowsQueryEvent->query); + } + + protected function getIgnoredEvents(): array + { + return [ConstEventType::GTID_LOG_EVENT->value]; + } +}