Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
krowinski committed Feb 28, 2016
1 parent 8bae5bb commit 0016ed4
Show file tree
Hide file tree
Showing 29 changed files with 604 additions and 194 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ composer.phar
composer.lock
.php_cs.cache
.DS_Store
Thumbs.db
Thumbs.db
/example/profiler.php
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"require": {
"php": ">=5.4",
"doctrine/dbal": "^2.5",
"doctrine/collections": "^1.3"
"doctrine/collections": "^1.3",
"ext-sockets": "^0.0.0"
},
"require-dev": {
"phpunit/phpunit": "*"
Expand Down
11 changes: 6 additions & 5 deletions example/benchmark.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
include __DIR__ . '/../vendor/autoload.php';

use Doctrine\DBAL\DriverManager;
use MySQLReplication\BinLogStream;
use MySQLReplication\MySQLReplicationFactory;
use MySQLReplication\Config\ConfigService;
use MySQLReplication\Definitions\ConstEventType;
use MySQLReplication\Event\DTO\UpdateRowsDTO;
Expand Down Expand Up @@ -35,10 +35,10 @@ public function __construct()
$conn->exec("INSERT INTO test2 VALUES(1)");
$conn->exec("RESET MASTER");

$this->binLogStream = new BinLogStream(
$this->binLogStream = new MySQLReplicationFactory(
(new ConfigService())->makeConfigFromArray([
'user' => 'root',
'host' => '192.168.1.100',
'ip' => '192.168.1.100',
'password' => 'root',
'eventsOnly' => [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2],
'slaveId' => 9999
Expand Down Expand Up @@ -96,7 +96,7 @@ private function consume()
$result = $this->binLogStream->getBinLogEvent();
if ($result instanceof UpdateRowsDTO)
{
$i += 1;
++$i;
if (0 === ($i % 1000))
{
echo ((int)($i / (microtime(true) - $start)) . ' event by seconds (' . $i . ' total)') . PHP_EOL;
Expand All @@ -120,7 +120,8 @@ private function produce()
}

$conn->close();
die;
}
}

(new Benchmark())->run();
(new Benchmark())->run();
8 changes: 4 additions & 4 deletions example/dump_events.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
date_default_timezone_set('UTC');
include __DIR__ . '/../vendor/autoload.php';

use MySQLReplication\BinLogStream;
use MySQLReplication\MySQLReplicationFactory;
use MySQLReplication\Config\ConfigService;

$binLogStream = new BinLogStream(
$binLogStream = new MySQLReplicationFactory(
(new ConfigService())->makeConfigFromArray([
'user' => 'root',
'host' => '192.168.1.100',
'ip' => '192.168.1.100',
'password' => 'root',
//'gtid' => '9b1c8d18-2a76-11e5-a26b-000c2976f3f3:1-177592',
])
Expand All @@ -28,4 +28,4 @@

echo 'Memory usage ' . round(memory_get_usage() / 1048576, 2) . ' MB' . PHP_EOL;
}
}
}
45 changes: 3 additions & 42 deletions src/MySQLReplication/BinLog/BinLogAuth.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace MySQLReplication\BinLog;

use MySQLReplication\Exception\BinLogException;

/**
* Class PackAuth
*/
Expand All @@ -13,19 +11,7 @@ class BinLogAuth
* 2^24 - 1 16m
* @var int
*/
public $packageMaxLength = 16777215;

/**
* http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
* @var array
*/
public $packageOkHeader = [0, 254];

/**
* FF
* @var array
*/
public $packageErrorHeader = [255];
private $binaryDataMaxLength = 16777215;

/**
* @param $flag
Expand All @@ -35,10 +21,10 @@ class BinLogAuth
* @return string
* @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
*/
public function createAuthenticationPacket($flag, $user, $pass, $salt)
public function createAuthenticationBinary($flag, $user, $pass, $salt)
{
$data = pack('L', $flag);
$data .= pack('L', $this->packageMaxLength);
$data .= pack('L', $this->binaryDataMaxLength);
$data .= chr(33);
for ($i = 0; $i < 23; $i++)
{
Expand All @@ -53,29 +39,4 @@ public function createAuthenticationPacket($flag, $user, $pass, $salt)

return $data;
}

/**
* @param string $packet
* @return array
* @throws BinLogException
*/
public function isWriteSuccessful($packet)
{
$head = ord($packet[0]);
if (in_array($head, $this->packageOkHeader))
{
return ['status' => true, 'code' => 0, 'msg' => ''];
}
else
{
$error_code = unpack('v', $packet[1] . $packet[2])[1];
$error_msg = '';
for ($i = 9; $i < strlen($packet); $i++)
{
$error_msg .= $packet[$i];
}

throw new BinLogException($error_msg, $error_code);
}
}
}
106 changes: 65 additions & 41 deletions src/MySQLReplication/BinLog/BinLogConnect.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<?php
namespace MySQLReplication\BinLog;

use MySQLReplication\BinLog\Exception\BinLogException;
use MySQLReplication\Config\Config;
use MySQLReplication\Repository\MySQLRepository;
use MySQLReplication\Definitions\ConstCapabilityFlags;
use MySQLReplication\Definitions\ConstCommand;
use MySQLReplication\Exception\BinLogException;
use MySQLReplication\Gtid\GtidCollection;

/**
Expand All @@ -24,7 +24,7 @@ class BinLogConnect
/**
* @var MySQLRepository
*/
private $DBHelper;
private $mySQLRepository;
/**
* @var Config
*/
Expand All @@ -37,25 +37,33 @@ class BinLogConnect
* @var GtidCollection
*/
private $gtidCollection;
/**
* http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
* @var array
*/
private $packageOkHeader = [0, 254];

/**
* @param Config $config
* @param MySQLRepository $DBHelper
* @param MySQLRepository $mySQLRepository
* @param BinLogAuth $packAuth
* @param GtidCollection $gtidCollection
*/
public function __construct(
Config $config,
MySQLRepository $DBHelper,
MySQLRepository $mySQLRepository,
BinLogAuth $packAuth,
GtidCollection $gtidCollection
) {
$this->DBHelper = $DBHelper;
$this->mySQLRepository = $mySQLRepository;
$this->config = $config;
$this->packAuth = $packAuth;
$this->gtidCollection = $gtidCollection;
}

/**
*
*/
public function __destruct()
{
if (true === $this->isConnected())
Expand All @@ -81,20 +89,25 @@ public function getCheckSum()
return $this->checkSum;
}


/**
* @throws BinLogException
* @return self
*/
public function connectToStream()
{
if (false === filter_var($this->config->getIp(), FILTER_VALIDATE_IP))
{
throw new BinLogException('Given parameter "' . $this->config->getIp() . '" is not a valid IP');
}

if (false === ($this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)))
{
throw new BinLogException('Unable to create a socket:' . socket_strerror(socket_last_error()), socket_last_error());
}
socket_set_block($this->socket);
socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, 1);

if (false === socket_connect($this->socket, $this->config->getHost(), $this->config->getPort()))
if (false === socket_connect($this->socket, $this->config->getIp(), $this->config->getPort()))
{
throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error());
}
Expand All @@ -104,6 +117,9 @@ public function connectToStream()
$this->getBinlogStream();
}

/**
*
*/
private function serverInfo()
{
BinLogServerInfo::parsePackage($this->getPacket(false));
Expand All @@ -126,11 +142,37 @@ public function getPacket($checkForOkByte = true)
$result = $this->readFromSocket($dataLength);
if (true === $checkForOkByte)
{
$this->packAuth->isWriteSuccessful($result);
$this->isWriteSuccessful($result);
}
return $result;
}


/**
* @param string $packet
* @return array
* @throws BinLogException
*/
public function isWriteSuccessful($packet)
{
$head = ord($packet[0]);
if (in_array($head, $this->packageOkHeader))
{
return ['status' => true, 'code' => 0, 'msg' => ''];
}
else
{
$error_code = unpack('v', $packet[1] . $packet[2])[1];
$error_msg = '';
for ($i = 9; $i < strlen($packet); $i++)
{
$error_msg .= $packet[$i];
}

throw new BinLogException($error_msg, $error_code);
}
}

/**
* @param $length
* @return string
Expand All @@ -143,43 +185,25 @@ private function readFromSocket($length)
throw new BinLogException('read 5 bytes from mysql server has gone away');
}

try
{
$bytes_read = 0;
$body = '';
while ($bytes_read < $length)
{
$resp = socket_read($this->socket, $length - $bytes_read);
if ($resp === false)
{
throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error());
}

// server kill connection or server gone away
if (strlen($resp) === 0)
{
throw new BinLogException('read less ' . ($length - strlen($body)));
}
$body .= $resp;
$bytes_read += strlen($resp);
}
if (strlen($body) < $length)
{
throw new BinLogException('read less ' . ($length - strlen($body)));
}
return $body;
} catch (\Exception $e)
if ($length === socket_recv($this->socket, $buf, $length, MSG_WAITALL))
{
throw new BinLogException(var_export($e, true));
return $buf;
}

throw new BinLogException(socket_strerror(socket_last_error()), socket_last_error());
}

/**
* @throws BinLogException
*/
private function auth()
{
$data = $this->packAuth->createAuthenticationPacket(ConstCapabilityFlags::getCapabilities(), $this->config->getUser(), $this->config->getPassword(), BinLogServerInfo::getSalt());
$data = $this->packAuth->createAuthenticationBinary(
ConstCapabilityFlags::getCapabilities(),
$this->config->getUser(),
$this->config->getPassword(),
BinLogServerInfo::getSalt()
);

$this->writeToSocket($data);
$this->getPacket();
Expand All @@ -202,7 +226,7 @@ private function writeToSocket($data)
*/
private function getBinlogStream()
{
$this->checkSum = $this->DBHelper->isCheckSum();
$this->checkSum = $this->mySQLRepository->isCheckSum();
if (true === $this->checkSum)
{
$this->execute('SET @master_binlog_checksum=@@global.binlog_checksum');
Expand All @@ -215,7 +239,7 @@ private function getBinlogStream()

if ('' === $binFilePos || '' === $binFileName)
{
$master = $this->DBHelper->getMasterStatus();
$master = $this->mySQLRepository->getMasterStatus();
$binFilePos = $master['Position'];
$binFileName = $master['File'];
}
Expand All @@ -228,7 +252,7 @@ private function getBinlogStream()
}
else
{
$prelude = pack('l', 26 + $this->gtidCollection->getEncodedPacketLength()) . chr(ConstCommand::COM_BINLOG_DUMP_GTID);
$prelude = pack('l', 26 + $this->gtidCollection->getEncodedLength()) . chr(ConstCommand::COM_BINLOG_DUMP_GTID);
$prelude .= pack('S', 0);
$prelude .= pack('I', $this->config->getSlaveId());
$prelude .= pack('I', 3);
Expand All @@ -237,8 +261,8 @@ private function getBinlogStream()
$prelude .= chr(0);
$prelude .= pack('Q', 4);

$prelude .= pack('I', $this->gtidCollection->getEncodedPacketLength());
$prelude .= $this->gtidCollection->getEncodedPacket();
$prelude .= pack('I', $this->gtidCollection->getEncodedLength());
$prelude .= $this->gtidCollection->getEncoded();
}

$this->writeToSocket($prelude);
Expand Down
9 changes: 9 additions & 0 deletions src/MySQLReplication/BinLog/Exception/BinLogException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

namespace MySQLReplication\BinLog\Exception;

use MySQLReplication\Exception\MySQLReplicationException;

class BinLogException extends MySQLReplicationException
{
}
Loading

0 comments on commit 0016ed4

Please sign in to comment.