Skip to content

Commit

Permalink
added validator for config, callback example added
Browse files Browse the repository at this point in the history
  • Loading branch information
krowinski committed Mar 3, 2016
1 parent 0016ed4 commit b60a417
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 20 deletions.
31 changes: 31 additions & 0 deletions example/dump_events_callback.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php
error_reporting(E_ALL);
ini_set('display_errors', 1);
date_default_timezone_set('UTC');
include __DIR__ . '/../vendor/autoload.php';

use MySQLReplication\Event\DTO\DeleteRowsDTO;
use MySQLReplication\Event\DTO\EventDTO;
use MySQLReplication\Event\DTO\GTIDLogDTO;
use MySQLReplication\Event\DTO\QueryDTO;
use MySQLReplication\Event\DTO\RotateDTO;
use MySQLReplication\Event\DTO\TableMapDTO;
use MySQLReplication\Event\DTO\UpdateRowsDTO;
use MySQLReplication\Event\DTO\WriteRowsDTO;
use MySQLReplication\Event\DTO\XidDTO;
use MySQLReplication\MySQLReplicationFactory;
use MySQLReplication\Config\ConfigService;

$binLogStream = new MySQLReplicationFactory(
(new ConfigService())->makeConfigFromArray([
'user' => 'root',
'ip' => '192.168.1.100',
'password' => 'root'
])
);

$binLogStream->parseBinLogUsingCallback(function($event)
{
/** @var DeleteRowsDTO|EventDTO|GTIDLogDTO|QueryDTO|RotateDTO|TableMapDTO|UpdateRowsDTO|WriteRowsDTO|XidDTO $event */
echo $event;
});
19 changes: 7 additions & 12 deletions src/MySQLReplication/BinLog/BinLogConnect.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php
namespace MySQLReplication\BinLog;

use MySQLReplication\BinLog\Exception\BinLogException;
use MySQLReplication\BinLog\Exception\ConfigException;
use MySQLReplication\Config\Config;
use MySQLReplication\Repository\MySQLRepository;
use MySQLReplication\Definitions\ConstCapabilityFlags;
Expand Down Expand Up @@ -95,21 +95,16 @@ public function getCheckSum()
*/
public function connectToStream()
{
if (false === filter_var($this->config->getIp(), FILTER_VALIDATE_IP))
{
throw new BinLogException('Given parameter "' . $this->config->getIp() . '" is not a valid IP');
}

if (false === ($this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)))
{
throw new BinLogException('Unable to create a socket:' . socket_strerror(socket_last_error()), socket_last_error());
throw new ConfigException('Unable to create a socket:' . socket_strerror(socket_last_error()), socket_last_error());
}
socket_set_block($this->socket);
socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, 1);

if (false === socket_connect($this->socket, $this->config->getIp(), $this->config->getPort()))
{
throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error());
throw new ConfigException(socket_strerror(socket_last_error()), socket_last_error());
}

$this->serverInfo();
Expand Down Expand Up @@ -169,7 +164,7 @@ public function isWriteSuccessful($packet)
$error_msg .= $packet[$i];
}

throw new BinLogException($error_msg, $error_code);
throw new ConfigException($error_msg, $error_code);
}
}

Expand All @@ -182,15 +177,15 @@ private function readFromSocket($length)
{
if ($length == 5)
{
throw new BinLogException('read 5 bytes from mysql server has gone away');
throw new ConfigException('read 5 bytes from mysql server has gone away');
}

if ($length === socket_recv($this->socket, $buf, $length, MSG_WAITALL))
{
return $buf;
}

throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error());
throw new ConfigException(socket_strerror(socket_last_error()), socket_last_error());
}

/**
Expand All @@ -217,7 +212,7 @@ private function writeToSocket($data)
{
if (false === socket_write($this->socket, $data, strlen($data)))
{
throw new BinLogException('Unable to write to socket: ' . socket_strerror(socket_last_error()), socket_last_error());
throw new ConfigException('Unable to write to socket: ' . socket_strerror(socket_last_error()), socket_last_error());
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/MySQLReplication/BinaryDataReader/BinaryDataReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace MySQLReplication\BinaryDataReader;

use MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException;

/**
* Class BinaryDataReader
* @package MySQLReplication\BinaryDataReader
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace MySQLReplication\BinaryDataReader;
namespace MySQLReplication\BinaryDataReader\Exception;

use MySQLReplication\Exception\MySQLReplicationException;

Expand Down
55 changes: 55 additions & 0 deletions src/MySQLReplication/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace MySQLReplication\Config;

use MySQLReplication\Config\Exception\ConfigException;

/**
* Class Config
* @package MySQLReplication\Config
Expand Down Expand Up @@ -114,6 +116,59 @@ public function __construct(
$this->databasesOnly = $databasesOnly;
}

/**
* @throws ConfigException
*/
public function validate()
{
if (!empty($this->user) && false === is_string($this->user))
{
throw new ConfigException(ConfigException::USER_ERROR_MESSAGE, ConfigException::USER_ERROR_CODE);
}
if (!empty($this->ip) && false === filter_var($this->ip, FILTER_VALIDATE_IP))
{
throw new ConfigException(ConfigException::IP_ERROR_MESSAGE, ConfigException::IP_ERROR_CODE);
}
if (!empty($this->port) && false === filter_var($this->port, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]]))
{
throw new ConfigException(ConfigException::PORT_ERROR_MESSAGE, ConfigException::PORT_ERROR_CODE);
}
if (!empty($this->password) && false === is_string($this->password) && false === is_numeric($this->password))
{
throw new ConfigException(ConfigException::PASSWORD_ERROR_MESSAGE, ConfigException::PASSWORD_ERROR_CODE);
}
if (!empty($this->dbName) && false === is_string($this->dbName))
{
throw new ConfigException(ConfigException::DB_NAME_ERROR_MESSAGE, ConfigException::DB_NAME_ERROR_CODE);
}
if (!empty($this->charset) && false === is_string($this->charset))
{
throw new ConfigException(ConfigException::CHARSET_ERROR_MESSAGE, ConfigException::CHARSET_ERROR_CODE);
}
if (!empty($this->gtid) && false === is_string($this->gtid))
{
foreach (explode(',', $this->gtid) as $gtid)
{
if (false === (bool)preg_match('/^([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})((?::[0-9-]+)+)$/', $gtid, $matches))
{
throw new ConfigException(ConfigException::GTID_ERROR_MESSAGE, ConfigException::GTID_ERROR_CODE);
}
}
}
if (!empty($this->slaveId) && false === filter_var($this->slaveId, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]]))
{
throw new ConfigException(ConfigException::SLAVE_ID_ERROR_MESSAGE, ConfigException::SLAVE_ID_ERROR_CODE);
}
if (!empty($this->binLogFileName) && false === is_string($this->binLogFileName))
{
throw new ConfigException(ConfigException::BIN_LOG_FILE_NAME_ERROR_MESSAGE, ConfigException::BIN_LOG_FILE_NAME_ERROR_CODE);
}
if (!empty($this->binLogPosition) && false === filter_var($this->binLogPosition, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]]))
{
throw new ConfigException(ConfigException::BIN_LOG_FILE_POSITION_ERROR_MESSAGE, ConfigException::BIN_LOG_FILE_POSITION_ERROR_CODE);
}
}

/**
* @return string
*/
Expand Down
29 changes: 29 additions & 0 deletions src/MySQLReplication/Config/Exception/ConfigException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace MySQLReplication\Config\Exception;

use MySQLReplication\Exception\MySQLReplicationException;

class ConfigException extends MySQLReplicationException
{
const USER_ERROR_MESSAGE = 'Incorrect user given';
const USER_ERROR_CODE = 1;
const IP_ERROR_MESSAGE = 'Incorrect IP given';
const IP_ERROR_CODE = 2;
const PORT_ERROR_MESSAGE = 'Incorrect port given should be numeric ';
const PORT_ERROR_CODE = 3;
const PASSWORD_ERROR_MESSAGE = 'Incorrect password type';
const PASSWORD_ERROR_CODE = 4;
const DB_NAME_ERROR_MESSAGE = 'Incorrect db name type';
const DB_NAME_ERROR_CODE = 5;
const CHARSET_ERROR_MESSAGE = 'Incorrect charset type';
const CHARSET_ERROR_CODE = 6;
const GTID_ERROR_MESSAGE = 'Incorrect gtid';
const GTID_ERROR_CODE = 7;
const SLAVE_ID_ERROR_MESSAGE = 'Incorrect slave id type';
const SLAVE_ID_ERROR_CODE = 8;
const BIN_LOG_FILE_NAME_ERROR_MESSAGE = 'Incorrect binlog name type';
const BIN_LOG_FILE_NAME_ERROR_CODE = 9;
const BIN_LOG_FILE_POSITION_ERROR_MESSAGE = 'Incorrect binlog position type';
const BIN_LOG_FILE_POSITION_ERROR_CODE = 10;
}
4 changes: 2 additions & 2 deletions src/MySQLReplication/Event/RowEvent/Columns.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace MySQLReplication\Event\RowEvent;

use MySQLReplication\BinaryDataReader\BinaryDataReader;
use MySQLReplication\BinLog\Exception\BinLogException;
use MySQLReplication\BinLog\Exception\ConfigException;
use MySQLReplication\Definitions\ConstFieldType;

/**
Expand Down Expand Up @@ -128,7 +128,7 @@ private static function getFieldSpecialValues($columnSchema)
}
else
{
throw new BinLogException('Type not handled! - ' . self::$field['type']);
throw new ConfigException('Type not handled! - ' . self::$field['type']);
}
}
}
17 changes: 17 additions & 0 deletions src/MySQLReplication/MySQLReplicationFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class MySQLReplicationFactory
*/
public function __construct(Config $config)
{
$config->validate();

$this->connection = DriverManager::getConnection([
'user' => $config->getUser(),
'password' => $config->getPassword(),
Expand Down Expand Up @@ -93,4 +95,19 @@ public function getBinLogEvent()
{
return $this->event->consume();
}

/**
* @param callable $callback
*/
public function parseBinLogUsingCallback(Callable $callback)
{
while (1)
{
$event = $this->event->consume();
if (!is_null($event))
{
call_user_func($callback, $event);
}
}
}
}
10 changes: 5 additions & 5 deletions tests/Unit/BinaryDataReader/BinaryDataReaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public function shouldReadCodedBinary()

/**
* @test
* @expectedException \MySQLReplication\BinaryDataReader\BinaryDataReaderException
* @expectedException \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException
*/
public function shouldThrowErrorOnUnknownCodedBinary()
{
Expand All @@ -71,7 +71,7 @@ public function dataProviderForUInt()
* @param $size
* @param $data
* @param $expected
* @throws \MySQLReplication\BinaryDataReader\BinaryDataReaderException
* @throws \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException
*/
public function shouldReadUIntBySize($size, $data, $expected)
{
Expand All @@ -80,7 +80,7 @@ public function shouldReadUIntBySize($size, $data, $expected)

/**
* @test
* @expectedException \MySQLReplication\BinaryDataReader\BinaryDataReaderException
* @expectedException \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException
*/
public function shouldThrowErrorOnReadUIntBySizeNotSupported()
{
Expand All @@ -104,7 +104,7 @@ public function dataProviderForBeInt()
* @param $size
* @param $data
* @param $expected
* @throws \MySQLReplication\BinaryDataReader\BinaryDataReaderException
* @throws \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException
*/
public function shouldReadIntBeBySize($size, $data, $expected)
{
Expand All @@ -113,7 +113,7 @@ public function shouldReadIntBeBySize($size, $data, $expected)

/**
* @test
* @expectedException \MySQLReplication\BinaryDataReader\BinaryDataReaderException
* @expectedException \MySQLReplication\BinaryDataReader\Exception\BinaryDataReaderException
*/
public function shouldThrowErrorOnReadIntBeBySizeNotSupported()
{
Expand Down

0 comments on commit b60a417

Please sign in to comment.