From e197477ba25aec2e4ac17b2315cf53380738fe8d Mon Sep 17 00:00:00 2001 From: Jason Fulghum Date: Mon, 28 Oct 2024 11:18:53 -0700 Subject: [PATCH] Bug fixes for replication: converting server_id to uint32 when it gets loaded as a string from config.json, sending binary logfile name in server heartbeats, now required in MySQL 8.4 --- .../sqle/binlogreplication/binlog_primary_streamer.go | 11 ++++++++--- .../sqle/binlogreplication/binlog_primary_test.go | 4 +--- .../sqle/binlogreplication/system_variable_utils.go | 10 +++++++++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_streamer.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_streamer.go index 5a0b30afd14..40221fbad28 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_streamer.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_streamer.go @@ -81,7 +81,8 @@ func (streamer *binlogStreamer) startStream(ctx *sql.Context, conn *mysql.Conn, case <-streamer.ticker.C: logrus.Debug("sending binlog heartbeat") - if err := sendHeartbeat(conn, binlogFormat, *binlogEventMeta); err != nil { + currentLogFilename := filepath.Base(streamer.currentLogFile.Name()) + if err := sendHeartbeat(conn, binlogFormat, *binlogEventMeta, currentLogFilename); err != nil { return err } if err := conn.FlushBuffer(); err != nil { @@ -254,10 +255,14 @@ func (m *binlogStreamerManager) removeStreamer(streamer *binlogStreamer) { } } -func sendHeartbeat(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error { +// sendHeartbeat sends a heartbeat event over |conn| using the specified |binlogFormat| and |binlogEventMeta| as well +// as |currentLogFilename| to create the event payload. +func sendHeartbeat(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata, currentLogFilename string) error { binlogEventMeta.Timestamp = uint32(0) // Timestamp is zero for a heartbeat event logrus.WithField("log_position", binlogEventMeta.NextLogPosition).Tracef("sending heartbeat") - binlogEvent := mysql.NewHeartbeatEvent(*binlogFormat, binlogEventMeta) + // MySQL 8.4 requires that we pass the binlog filename in the heartbeat; previous versions accepted + // heartbeat events without a filename, but those cause crashes on MySQL 8.4. + binlogEvent := mysql.NewHeartbeatEventWithLogFile(*binlogFormat, binlogEventMeta, currentLogFilename) return conn.WriteBinlogEvent(binlogEvent, false) } diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go index f8d8f9bf138..02cb6a0a77e 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go @@ -33,6 +33,7 @@ var doltReplicationPrimarySystemVars = map[string]string{ "log_bin": "1", "enforce_gtid_consistency": "ON", "gtid_mode": "ON", + "server_id": "42", } // TestBinlogPrimary_BinlogNotEnabled tests that when binary logging is NOT enabled, primary commands such as @@ -1024,9 +1025,6 @@ func setupForDoltToMySqlReplication() { primaryDatabase = replicaDatabase replicaDatabase = tempDatabase - // On the Primary, make sure we have a unique, non-zero SERVER_ID set - primaryDatabase.MustExec("set GLOBAL SERVER_ID=42;") - // Set the session's timezone to UTC, to avoid TIMESTAMP test values changing // when they are converted to UTC for storage. replicaDatabase.MustExec("SET @@time_zone = '+0:00';") diff --git a/go/libraries/doltcore/sqle/binlogreplication/system_variable_utils.go b/go/libraries/doltcore/sqle/binlogreplication/system_variable_utils.go index 9bda996e41a..37f38e6217e 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/system_variable_utils.go +++ b/go/libraries/doltcore/sqle/binlogreplication/system_variable_utils.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/go-mysql-server/sql/types" ) // getServerId returns the @@server_id global system variable value. If the value of @@server_id is 0 or is not a @@ -29,7 +30,14 @@ func getServerId() (uint32, error) { return 0, fmt.Errorf("global variable 'server_id' not found") } - if i, ok := value.(uint32); ok { + // Attempt to convert the server_id value into a UINT32, in case it has been loaded as a string + // through global JSON configuration. + convertedValue, _, err := types.Uint32.Convert(value) + if err != nil { + return 0, err + } + + if i, ok := convertedValue.(uint32); ok { if i == 0 { return 0, fmt.Errorf("@@server_id is zero – must be set to a non-zero value") }