diff --git a/example/dump_events_callback.php b/example/dump_events_callback.php new file mode 100755 index 0000000..dd1b164 --- /dev/null +++ b/example/dump_events_callback.php @@ -0,0 +1,31 @@ +makeConfigFromArray([ + 'user' => 'root', + 'ip' => '192.168.1.100', + 'password' => 'root' + ]) +); + +$binLogStream->parseBinLogUsingCallback(function($event) +{ + /** @var DeleteRowsDTO|EventDTO|GTIDLogDTO|QueryDTO|RotateDTO|TableMapDTO|UpdateRowsDTO|WriteRowsDTO|XidDTO $event */ + echo $event; +}); \ No newline at end of file diff --git a/src/MySQLReplication/BinLog/BinLogConnect.php b/src/MySQLReplication/BinLog/BinLogConnect.php index c37ff7c..3f2047c 100755 --- a/src/MySQLReplication/BinLog/BinLogConnect.php +++ b/src/MySQLReplication/BinLog/BinLogConnect.php @@ -1,7 +1,7 @@ config->getIp(), FILTER_VALIDATE_IP)) - { - throw new BinLogException('Given parameter "' . $this->config->getIp() . '" is not a valid IP'); - } - if (false === ($this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP))) { - throw new BinLogException('Unable to create a socket:' . socket_strerror(socket_last_error()), socket_last_error()); + throw new ConfigException('Unable to create a socket:' . socket_strerror(socket_last_error()), socket_last_error()); } socket_set_block($this->socket); socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, 1); if (false === socket_connect($this->socket, $this->config->getIp(), $this->config->getPort())) { - throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error()); + throw new ConfigException(socket_strerror(socket_last_error()), socket_last_error()); } $this->serverInfo(); @@ -169,7 +164,7 @@ public function isWriteSuccessful($packet) $error_msg .= $packet[$i]; } - throw new BinLogException($error_msg, $error_code); + throw new ConfigException($error_msg, $error_code); } } @@ -182,7 +177,7 @@ private function readFromSocket($length) { if ($length == 5) { - throw new BinLogException('read 5 bytes from mysql server has gone away'); + throw new ConfigException('read 5 bytes from mysql server has gone away'); } if ($length === socket_recv($this->socket, $buf, $length, MSG_WAITALL)) @@ -190,7 +185,7 @@ private function readFromSocket($length) return $buf; } - throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error()); + throw new ConfigException(socket_strerror(socket_last_error()), socket_last_error()); } /** @@ -217,7 +212,7 @@ private function writeToSocket($data) { if (false === socket_write($this->socket, $data, strlen($data))) { - throw new BinLogException('Unable to write to socket: ' . socket_strerror(socket_last_error()), socket_last_error()); + throw new ConfigException('Unable to write to socket: ' . socket_strerror(socket_last_error()), socket_last_error()); } } diff --git a/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php b/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php index 53c1545..c64a6a6 100755 --- a/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php +++ b/src/MySQLReplication/BinaryDataReader/BinaryDataReader.php @@ -2,6 +2,8 @@ namespace MySQLReplication\BinaryDataReader; +use MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException; + /** * Class BinaryDataReader * @package MySQLReplication\BinaryDataReader diff --git a/src/MySQLReplication/BinaryDataReader/BinaryDataReaderException.php b/src/MySQLReplication/BinaryDataReader/Exception/BinaryDataReaderException.php similarity index 68% rename from src/MySQLReplication/BinaryDataReader/BinaryDataReaderException.php rename to src/MySQLReplication/BinaryDataReader/Exception/BinaryDataReaderException.php index d5cb8fd..9fe9dbf 100755 --- a/src/MySQLReplication/BinaryDataReader/BinaryDataReaderException.php +++ b/src/MySQLReplication/BinaryDataReader/Exception/BinaryDataReaderException.php @@ -1,6 +1,6 @@ databasesOnly = $databasesOnly; } + /** + * @throws ConfigException + */ + public function validate() + { + if (!empty($this->user) && false === is_string($this->user)) + { + throw new ConfigException(ConfigException::USER_ERROR_MESSAGE, ConfigException::USER_ERROR_CODE); + } + if (!empty($this->ip) && false === filter_var($this->ip, FILTER_VALIDATE_IP)) + { + throw new ConfigException(ConfigException::IP_ERROR_MESSAGE, ConfigException::IP_ERROR_CODE); + } + if (!empty($this->port) && false === filter_var($this->port, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]])) + { + throw new ConfigException(ConfigException::PORT_ERROR_MESSAGE, ConfigException::PORT_ERROR_CODE); + } + if (!empty($this->password) && false === is_string($this->password) && false === is_numeric($this->password)) + { + throw new ConfigException(ConfigException::PASSWORD_ERROR_MESSAGE, ConfigException::PASSWORD_ERROR_CODE); + } + if (!empty($this->dbName) && false === is_string($this->dbName)) + { + throw new ConfigException(ConfigException::DB_NAME_ERROR_MESSAGE, ConfigException::DB_NAME_ERROR_CODE); + } + if (!empty($this->charset) && false === is_string($this->charset)) + { + throw new ConfigException(ConfigException::CHARSET_ERROR_MESSAGE, ConfigException::CHARSET_ERROR_CODE); + } + if (!empty($this->gtid) && false === is_string($this->gtid)) + { + foreach (explode(',', $this->gtid) as $gtid) + { + if (false === (bool)preg_match('/^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$/', $gtid, $matches)) + { + throw new ConfigException(ConfigException::GTID_ERROR_MESSAGE, ConfigException::GTID_ERROR_CODE); + } + } + } + if (!empty($this->slaveId) && false === filter_var($this->slaveId, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]])) + { + throw new ConfigException(ConfigException::SLAVE_ID_ERROR_MESSAGE, ConfigException::SLAVE_ID_ERROR_CODE); + } + if (!empty($this->binLogFileName) && false === is_string($this->binLogFileName)) + { + throw new ConfigException(ConfigException::BIN_LOG_FILE_NAME_ERROR_MESSAGE, ConfigException::BIN_LOG_FILE_NAME_ERROR_CODE); + } + if (!empty($this->binLogPosition) && false === filter_var($this->binLogPosition, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]])) + { + throw new ConfigException(ConfigException::BIN_LOG_FILE_POSITION_ERROR_MESSAGE, ConfigException::BIN_LOG_FILE_POSITION_ERROR_CODE); + } + } + /** * @return string */ diff --git a/src/MySQLReplication/Config/Exception/ConfigException.php b/src/MySQLReplication/Config/Exception/ConfigException.php new file mode 100755 index 0000000..dcbc9a1 --- /dev/null +++ b/src/MySQLReplication/Config/Exception/ConfigException.php @@ -0,0 +1,29 @@ +validate(); + $this->connection = DriverManager::getConnection([ 'user' => $config->getUser(), 'password' => $config->getPassword(), @@ -93,4 +95,19 @@ public function getBinLogEvent() { return $this->event->consume(); } + + /** + * @param callable $callback + */ + public function parseBinLogUsingCallback(Callable $callback) + { + while (1) + { + $event = $this->event->consume(); + if (!is_null($event)) + { + call_user_func($callback, $event); + } + } + } } \ No newline at end of file diff --git a/tests/Unit/BinaryDataReader/BinaryDataReaderTest.php b/tests/Unit/BinaryDataReader/BinaryDataReaderTest.php index e8ca3d7..fb9ac2c 100755 --- a/tests/Unit/BinaryDataReader/BinaryDataReaderTest.php +++ b/tests/Unit/BinaryDataReader/BinaryDataReaderTest.php @@ -44,7 +44,7 @@ public function shouldReadCodedBinary() /** * @test - * @expectedException \MySQLReplication\BinaryDataReader\BinaryDataReaderException + * @expectedException \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException */ public function shouldThrowErrorOnUnknownCodedBinary() { @@ -71,7 +71,7 @@ public function dataProviderForUInt() * @param $size * @param $data * @param $expected - * @throws \MySQLReplication\BinaryDataReader\BinaryDataReaderException + * @throws \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException */ public function shouldReadUIntBySize($size, $data, $expected) { @@ -80,7 +80,7 @@ public function shouldReadUIntBySize($size, $data, $expected) /** * @test - * @expectedException \MySQLReplication\BinaryDataReader\BinaryDataReaderException + * @expectedException \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException */ public function shouldThrowErrorOnReadUIntBySizeNotSupported() { @@ -104,7 +104,7 @@ public function dataProviderForBeInt() * @param $size * @param $data * @param $expected - * @throws \MySQLReplication\BinaryDataReader\BinaryDataReaderException + * @throws \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException */ public function shouldReadIntBeBySize($size, $data, $expected) { @@ -113,7 +113,7 @@ public function shouldReadIntBeBySize($size, $data, $expected) /** * @test - * @expectedException \MySQLReplication\BinaryDataReader\BinaryDataReaderException + * @expectedException \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException */ public function shouldThrowErrorOnReadIntBeBySizeNotSupported() {