diff --git a/Makefile b/Makefile index 2e916b3..fc9e814 100644 --- a/Makefile +++ b/Makefile @@ -85,6 +85,11 @@ binaries/proxysql_binlog_reader-ubuntu18: docker create --name ubuntu18_build proxysql/packaging:build-ubuntu18 bash -c "while : ; do sleep 10 ; done" docker start ubuntu18_build docker exec ubuntu18_build bash -c "cd /opt; git clone https://github.com/sysown/proxysql_mysqlbinlog.git && cd /opt/proxysql_mysqlbinlog/libslave/ && cmake . && make slave_a && cd /opt/proxysql_mysqlbinlog && make" + # Enable for allowing other replication formats (STATEMENT, MIXED) for debugging purposes + # # + # docker cp patches/slave_allow_rep_formats.patch ubuntu18_build:/opt/proxysql_mysqlbinlog + # docker exec ubuntu18_build bash -c "cd /opt; cd /opt/proxysql_mysqlbinlog; patch -p0 < patches/slave_allow_rep_formats.patch; cd /opt/proxysql_mysqlbinlog/libslave/ && cmake . && make slave_a && cd /opt/proxysql_mysqlbinlog && rm proxysql_binlog_reader && make" + # # sleep 2 docker cp ubuntu18_build:/opt/proxysql_mysqlbinlog/proxysql_binlog_reader ./binaries/proxysql_binlog_reader-ubuntu18 docker exec ubuntu18_build bash -c "apt-get update && apt-get -y install ruby rubygems ruby-dev && gem install fpm && fpm -s dir -t deb -v1.0 --license GPLv3 --category 'Development/Tools' --description 'ProxySQL is a high performance, high availability, protocol aware proxy for MySQL and forks (like Percona Server and MariaDB). All the while getting the unlimited freedom that comes with a GPL license. Its development is driven by the lack of open source proxies that provide high performance.' --url 'https://proxysql.com' --vendor 'ProxySQL LLC' --debug-workspace --workdir /tmp/ --package=/opt/proxysql_mysqlbinlog/ -n proxysql-mysqlbinlog /opt/proxysql_mysqlbinlog/proxysql_binlog_reader/=/bin/" diff --git a/libslave/Slave.cpp b/libslave/Slave.cpp index 065ffb0..8bfa4a4 100644 --- a/libslave/Slave.cpp +++ b/libslave/Slave.cpp @@ -407,7 +407,7 @@ struct raii_mysql_connector } if(was_error) - LOG_INFO(log, "Successfully connected to " << sConnOptions.mysql_host << ":" << m_master_info.mysql_port); + LOG_INFO(log, "Successfully connected to " << sConnOptions.mysql_host << ":" << m_master_info.conn_options.mysql_port); mysql->reconnect = 1; @@ -462,10 +462,22 @@ void Slave::get_remote_binlog(const std::function& _interruptFlag) LOG_INFO(log, "Starting from binlog_pos: " << m_master_info.position); request_dump(m_master_info.position, &mysql); + bool request_dump_again = false; while (!_interruptFlag()) { try { + if (request_dump_again) { + m_master_info.position = getLastBinlogPos(); + if (m_master_info.position.gtid_executed.empty() == false) { + ext_state.setMasterPosition(m_master_info.position); + ext_state.saveMasterPosition(); + + request_dump(m_master_info.position, &mysql); + } + + request_dump_again = false; + } LOG_TRACE(log, "-- reading event --"); @@ -490,6 +502,8 @@ void Slave::get_remote_binlog(const std::function& _interruptFlag) break; case ER_MASTER_FATAL_ERROR_READING_BINLOG: // Error -- unknown binlog file. LOG_ERROR(log, "Myslave: fatal error reading binlog. " << mysql_error(&mysql) ); + request_dump_again = true; + usleep(1000 * 1000); break; case 2013: // Processing error 'Lost connection to MySQL' LOG_WARNING(log, "Myslave: Error from MySQL: " << mysql_error(&mysql) ); @@ -500,6 +514,13 @@ void Slave::get_remote_binlog(const std::function& _interruptFlag) continue; } break; + case ER_MALFORMED_GTID_SET_ENCODING: + LOG_ERROR(log, "Myslave: Error reading packet from server: " << mysql_error(&mysql) + << "; mysql_errno: " << mysql_errno(&mysql)); + LOG_ERROR(log, "Requesting GTID dump again..."); + request_dump_again = true; + usleep(1000 * 1000); + break; default: LOG_ERROR(log, "Myslave: Error reading packet from server: " << mysql_error(&mysql) << "; mysql_error: " << mysql_errno(&mysql)); diff --git a/patches/slave_allow_rep_formats.patch b/patches/slave_allow_rep_formats.patch new file mode 100644 index 0000000..e0a5d14 --- /dev/null +++ b/patches/slave_allow_rep_formats.patch @@ -0,0 +1,14 @@ +diff --git libslave/Slave.cpp libslave/Slave.cpp +index 8bfa4a4..c53348f 100644 +--- libslave/Slave.cpp ++++ libslave/Slave.cpp +@@ -752,7 +752,8 @@ void Slave::check_master_binlog_format() + return; + + } else { +- throw std::runtime_error("Slave::check_binlog_format(): got invalid binlog format: " + tmp); ++ return; ++ // throw std::runtime_error("Slave::check_binlog_format(): got invalid binlog format: " + tmp); + } + } + diff --git a/proxysql_binlog_reader.cpp b/proxysql_binlog_reader.cpp index 68dd0a0..9239eb9 100644 --- a/proxysql_binlog_reader.cpp +++ b/proxysql_binlog_reader.cpp @@ -27,6 +27,8 @@ #include "Slave.h" #include "DefaultExtState.h" +#define BINLOG_VERSION "1.0" + #define ioctl_FIONBIO(fd, mode) \ { \ int ioctl_mode=mode; \ @@ -580,7 +582,7 @@ bool isStopping() { } std::string gtid_executed_to_string(slave::Position &curpos) { - std::string gtid_set; + std::string gtid_set { "" }; for (auto it=curpos.gtid_executed.begin(); it!=curpos.gtid_executed.end(); ++it) { std::string s = it->first; s.insert(8,"-"); @@ -597,7 +599,9 @@ std::string gtid_executed_to_string(slave::Position &curpos) { gtid_set = gtid_set + s2; } } - gtid_set.pop_back(); + if (gtid_set.empty() == false) { + gtid_set.pop_back(); + } return gtid_set; } @@ -627,7 +631,7 @@ int main(int argc, char** argv) { int c; - while (-1 != (c = ::getopt(argc, argv, "fh:u:p:P:l:L:"))) { + while (-1 != (c = ::getopt(argc, argv, "vfh:u:p:P:l:L:"))) { switch (c) { case 'f': foreground=true; break; case 'h': host = optarg; break; @@ -639,6 +643,9 @@ int main(int argc, char** argv) { case 'P': port = std::stoi(optarg); break; case 'l': listen_port = std::stoi(optarg); break; case 'L' : errorstr = optarg; break; + case 'v': + std::cout << "proxysql_binlog_reader version " << BINLOG_VERSION << std::endl; + return 1; default: usage(argv[0]); return 1; @@ -735,6 +742,7 @@ int main(int argc, char** argv) { masterinfo.conn_options.mysql_pass = password; try { + proxy_info("proxysql_binlog_reader version %s\n", BINLOG_VERSION); slave::DefaultExtState sDefExtState; slave::Slave slave(masterinfo, sDefExtState); @@ -750,7 +758,16 @@ int main(int argc, char** argv) { curpos = slave.getLastBinlogPos(); std::string s1 = gtid_executed_to_string(curpos); - std::cout << s1 << std::endl; + + // Wait until a valid 'GTID' has been executed for requesting binlog + while (s1.empty() && !isStopping()) { + proxy_info("'Executed_Gtid_Set' found empty, retrying...\n"); + usleep(1000 * 1000); + + curpos = slave.getLastBinlogPos(); + s1 = gtid_executed_to_string(curpos); + } + proxy_info("Last executed GTID: '%s'\n", s1.c_str()); sDefExtState.setMasterPosition(curpos);