Skip to content

Commit

Permalink
Merge pull request #8501 from dolthub/fulghum/binlog-fixes
Browse files Browse the repository at this point in the history
Bug fixes for replication
  • Loading branch information
fulghum authored Oct 28, 2024
2 parents 6364199 + e197477 commit 1d97bd2
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (streamer *binlogStreamer) startStream(ctx *sql.Context, conn *mysql.Conn,

case <-streamer.ticker.C:
logrus.Debug("sending binlog heartbeat")
if err := sendHeartbeat(conn, binlogFormat, *binlogEventMeta); err != nil {
currentLogFilename := filepath.Base(streamer.currentLogFile.Name())
if err := sendHeartbeat(conn, binlogFormat, *binlogEventMeta, currentLogFilename); err != nil {
return err
}
if err := conn.FlushBuffer(); err != nil {
Expand Down Expand Up @@ -254,10 +255,14 @@ func (m *binlogStreamerManager) removeStreamer(streamer *binlogStreamer) {
}
}

func sendHeartbeat(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error {
// sendHeartbeat sends a heartbeat event over |conn| using the specified |binlogFormat| and |binlogEventMeta| as well
// as |currentLogFilename| to create the event payload.
func sendHeartbeat(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata, currentLogFilename string) error {
binlogEventMeta.Timestamp = uint32(0) // Timestamp is zero for a heartbeat event
logrus.WithField("log_position", binlogEventMeta.NextLogPosition).Tracef("sending heartbeat")

binlogEvent := mysql.NewHeartbeatEvent(*binlogFormat, binlogEventMeta)
// MySQL 8.4 requires that we pass the binlog filename in the heartbeat; previous versions accepted
// heartbeat events without a filename, but those cause crashes on MySQL 8.4.
binlogEvent := mysql.NewHeartbeatEventWithLogFile(*binlogFormat, binlogEventMeta, currentLogFilename)
return conn.WriteBinlogEvent(binlogEvent, false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var doltReplicationPrimarySystemVars = map[string]string{
"log_bin": "1",
"enforce_gtid_consistency": "ON",
"gtid_mode": "ON",
"server_id": "42",
}

// TestBinlogPrimary_BinlogNotEnabled tests that when binary logging is NOT enabled, primary commands such as
Expand Down Expand Up @@ -1024,9 +1025,6 @@ func setupForDoltToMySqlReplication() {
primaryDatabase = replicaDatabase
replicaDatabase = tempDatabase

// On the Primary, make sure we have a unique, non-zero SERVER_ID set
primaryDatabase.MustExec("set GLOBAL SERVER_ID=42;")

// Set the session's timezone to UTC, to avoid TIMESTAMP test values changing
// when they are converted to UTC for storage.
replicaDatabase.MustExec("SET @@time_zone = '+0:00';")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
)

// getServerId returns the @@server_id global system variable value. If the value of @@server_id is 0 or is not a
Expand All @@ -29,7 +30,14 @@ func getServerId() (uint32, error) {
return 0, fmt.Errorf("global variable 'server_id' not found")
}

if i, ok := value.(uint32); ok {
// Attempt to convert the server_id value into a UINT32, in case it has been loaded as a string
// through global JSON configuration.
convertedValue, _, err := types.Uint32.Convert(value)
if err != nil {
return 0, err
}

if i, ok := convertedValue.(uint32); ok {
if i == 0 {
return 0, fmt.Errorf("@@server_id is zero – must be set to a non-zero value")
}
Expand Down

0 comments on commit 1d97bd2

Please sign in to comment.