From 8c7543cf8b8ac5330ce65c0ef827e45d615a4805 Mon Sep 17 00:00:00 2001 From: Sean Wu <111744549+VWagen1989@users.noreply.github.com> Date: Fri, 15 Nov 2024 18:33:35 +0800 Subject: [PATCH] fix: support log file based positioning replication (to #155) (#156) * fix: support log file based positioning replication (to #155) * fix: adopt CR feedback * fix: adopt CR feedback * fix: return INVALID if Source_Log_File is empty * fix: wait for the 'stop_replica' signal if fail to connect to primary * chore: refine the output of unit test in GitHub workflow action * test: add binlog replication tests for log file based positioning --- .github/workflows/go.yml | 23 +++- .github/workflows/mysql-replication.yml | 13 +- binlogreplication/binlog_position_store.go | 5 +- binlogreplication/binlog_replica_applier.go | 113 +++++++++++++++--- .../binlog_replica_controller.go | 24 ++++ .../binlog_replication_reconnect_test.go | 5 +- binlogreplication/binlog_replication_test.go | 113 +++++++++++++++--- devtools/replica-setup/checker.sh | 31 ++--- devtools/replica-setup/replica_setup.sh | 1 + devtools/replica-setup/snapshot.sh | 39 ++++-- devtools/replica-setup/start_replication.sh | 21 ++-- go.mod | 8 +- go.sum | 18 +-- 13 files changed, 317 insertions(+), 97 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 521e41e0..94db9ff7 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -43,10 +43,25 @@ jobs: run: go build -v - name: Test packages - run: go test -v -cover ./charset ./transpiler ./backend ./harness + run: | + go test -v -cover ./charset ./transpiler ./backend ./harness | tee packages.log + cat packages.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}' + cat packages.log | grep -q "FAIL" && exit 1 || exit 0 - name: Test Query Engine - run: go test -v -cover --timeout 600s . + run: | + go test -v -cover --timeout 600s . 
| tee query.log + cat query.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}' + cat query.log | grep -q "FAIL" && exit 1 || exit 0 - - name: Test Binlog Replication - run: go test -v -p 1 --timeout 360s ./binlogreplication + - name: Test Binlog Replication With GTID Enabled + run: | + GTID_ENABLED=true go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log + cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}' + cat replication.log | grep -q "FAIL" && exit 1 || exit 0 + + - name: Test Binlog Replication With GTID Disabled + run: | + GTID_ENABLED=false go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log + cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}' + cat replication.log | grep -q "FAIL" && exit 1 || exit 0 diff --git a/.github/workflows/mysql-replication.yml b/.github/workflows/mysql-replication.yml index dd962eb1..f1c3a0b9 100644 --- a/.github/workflows/mysql-replication.yml +++ b/.github/workflows/mysql-replication.yml @@ -39,5 +39,14 @@ jobs: - name: Build run: go build -v - - name: Test Binlog Replication - run: go test -v -p 1 --timeout 360s ./binlogreplication + - name: Test Binlog Replication With GTID Enabled + run: | + GTID_ENABLED=true go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log + cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. %s\n", count++, $0}' + cat replication.log | grep -q "FAIL" && exit 1 || exit 0 + + - name: Test Binlog Replication With GTID Disabled + run: | + GTID_ENABLED=false go test -v -p 1 --timeout 600s ./binlogreplication | tee replication.log + cat replication.log | grep -e "^--- " | sed 's/--- //g' | awk 'BEGIN {count=1} {printf "%d. 
%s\n", count++, $0}' + cat replication.log | grep -q "FAIL" && exit 1 || exit 0 diff --git a/binlogreplication/binlog_position_store.go b/binlogreplication/binlog_position_store.go index aeaea54b..9692ccc3 100644 --- a/binlogreplication/binlog_position_store.go +++ b/binlogreplication/binlog_position_store.go @@ -32,7 +32,6 @@ import ( ) const binlogPositionDirectory = ".replica" -const mysqlFlavor = "MySQL56" const defaultChannelName = "" // binlogPositionStore manages loading and saving data to the binlog position metadata table. This provides @@ -47,7 +46,7 @@ type binlogPositionStore struct { // Currently only the default binlog channel ("") is supported. // If no position is stored, this method returns a zero mysql.Position and a nil error. // If any errors are encountered, a nil mysql.Position and an error are returned. -func (store *binlogPositionStore) Load(ctx *sql.Context, engine *gms.Engine) (pos replication.Position, err error) { +func (store *binlogPositionStore) Load(flavor string, ctx *sql.Context, engine *gms.Engine) (pos replication.Position, err error) { store.mu.Lock() defer store.mu.Unlock() @@ -62,7 +61,7 @@ func (store *binlogPositionStore) Load(ctx *sql.Context, engine *gms.Engine) (po // Strip off the "MySQL56/" prefix positionString = strings.TrimPrefix(positionString, "MySQL56/") - return replication.ParsePosition(mysqlFlavor, positionString) + return replication.ParsePosition(flavor, positionString) } // Save persists the specified |position| to disk. 
diff --git a/binlogreplication/binlog_replica_applier.go b/binlogreplication/binlog_replica_applier.go index e1557891..e6b74647 100644 --- a/binlogreplication/binlog_replica_applier.go +++ b/binlogreplication/binlog_replica_applier.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "regexp" "strconv" "strings" "sync/atomic" @@ -53,6 +54,9 @@ const ( ERFatalReplicaError = 13117 ) +// Match exactly "ON" (case insensitive) +var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`) + // binlogReplicaApplier represents the process that applies updates from a binlog connection. // // This type is NOT used concurrently – there is currently only one single applier process running to process binlog @@ -116,6 +120,30 @@ func (a *binlogReplicaApplier) IsRunning() bool { return a.running.Load() } +// This function will connect to the MySQL server and check the GTID_MODE. +func connAndCheckGtidModeEnabled(ctx *sql.Context, params mysql.ConnParams) (bool, error) { + conn, err := mysql.Connect(ctx, &params) + if err != nil { + return false, err + } + defer conn.Close() + + var qr *sqltypes.Result + qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true) + if err != nil { + // Maybe it's a MariaDB server, try to get the GTID_STRICT_MODE instead + qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_STRICT_MODE", 1, true) + if err != nil { + return false, fmt.Errorf("error checking GTID_MODE: %v", err) + } + } + if len(qr.Rows) == 0 { + return false, fmt.Errorf("no rows returned when checking GTID_MODE") + } + gtidMode := string(qr.Rows[0][0].Raw()) + return gtidModeIsOnRegex.MatchString(gtidMode), nil +} + // connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing // and retrying if errors are encountered. 
func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Context) (*mysql.Conn, error) { @@ -130,6 +158,8 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co var conn *mysql.Conn var err error + gtidModeEnabled := false + flavorName := "" for connectionAttempts := uint64(0); ; connectionAttempts++ { replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb) @@ -157,6 +187,18 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co ConnectTimeoutMs: 4_000, } + gtidModeEnabled, err = connAndCheckGtidModeEnabled(ctx, connParams) + if err != nil && connectionAttempts >= maxConnectionAttempts { + return nil, err + } + + if !gtidModeEnabled { + flavorName = replication.FilePosFlavorID + } else { + flavorName = replication.Mysql56FlavorID + } + connParams.Flavor = flavorName + conn, err = mysql.Connect(ctx, &connParams) if err != nil { logrus.Warnf("failed connection attempt to source (%s): %s", @@ -184,7 +226,7 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co // Request binlog events to start // TODO: This should also have retry logic - err = a.startReplicationEventStream(ctx, conn) + err = a.startReplicationEventStream(ctx, conn, gtidModeEnabled, flavorName) if err != nil { return nil, err } @@ -196,17 +238,10 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co return conn, nil } -// startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin -// sending binlog events. 
-func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn) error { - serverId, err := loadReplicaServerId() +func (a *binlogReplicaApplier) loadGtidPosition(ctx *sql.Context, positionStore *binlogPositionStore, flavorName string) (replication.Position, error) { + position, err := positionStore.Load(flavorName, ctx, a.engine) if err != nil { - return err - } - - position, err := positionStore.Load(ctx, a.engine) - if err != nil { - return err + return replication.Position{}, err } if position.IsZero() { @@ -227,9 +262,9 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con gtidPurged = gtidPurged[1:] } - purged, err := replication.ParsePosition(mysqlFlavor, gtidPurged) + purged, err := replication.ParsePosition(flavorName, gtidPurged) if err != nil { - return err + return replication.Position{}, err } position = purged } @@ -248,11 +283,57 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con position = replication.Position{GTIDSet: gtid.GTIDSet()} } + return position, nil +} + +// another method like "initializedGtidPosition" to get the current log file based position +func (a *binlogReplicaApplier) loadLogFilePosition(ctx *sql.Context, positionStore *binlogPositionStore, flavorName string) (replication.Position, error) { + position, err := positionStore.Load(flavorName, ctx, a.engine) + if err != nil { + return replication.Position{}, err + } + + if position.IsZero() { + replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb) + if err != nil { + return replication.Position{}, err + } + filePosGtid := replication.FilePosGTID{ + File: replicaSourceInfo.SourceLogFile, + Pos: uint32(replicaSourceInfo.SourceLogPos), + } + position = replication.Position{GTIDSet: filePosGtid} + } + + return position, nil +} + +// startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin +// sending 
binlog events. +func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidModeEnabled bool, flavorName string) error { + serverId, err := loadReplicaServerId() + if err != nil { + return err + } + + var position replication.Position + if gtidModeEnabled { + position, err = a.loadGtidPosition(ctx, positionStore, flavorName) + if err != nil { + return err + } + if err := sql.SystemVariables.AssignValues(map[string]interface{}{"gtid_executed": position.GTIDSet.String()}); err != nil { + ctx.GetLogger().Errorf("unable to set @@GLOBAL.gtid_executed: %s", err.Error()) + } + } else { + position, err = a.loadLogFilePosition(ctx, positionStore, flavorName) + if err != nil { + return err + } + } + a.currentPosition = position a.pendingPosition = position - if err := sql.SystemVariables.AssignValues(map[string]interface{}{"gtid_executed": a.currentPosition.GTIDSet.String()}); err != nil { - ctx.GetLogger().Errorf("unable to set @@GLOBAL.gtid_executed: %s", err.Error()) - } // Clear out the format description in case we're reconnecting, so that we don't use the old format description // to interpret any event messages before we receive the new format description from the new stream. 
diff --git a/binlogreplication/binlog_replica_controller.go b/binlogreplication/binlog_replica_controller.go index c148ca6e..fdb5b621 100644 --- a/binlogreplication/binlog_replica_controller.go +++ b/binlogreplication/binlog_replica_controller.go @@ -280,6 +280,18 @@ func (d *myBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context return err } replicaSourceInfo.ConnectRetryCount = uint64(intValue) + case "SOURCE_LOG_FILE": + value, err := getOptionValueAsString(option) + if err != nil { + return err + } + replicaSourceInfo.SourceLogFile = value + case "SOURCE_LOG_POS": + intValue, err := getOptionValueAsInt(option) + if err != nil { + return err + } + replicaSourceInfo.SourceLogPos = uint64(intValue) case "SOURCE_AUTO_POSITION": intValue, err := getOptionValueAsInt(option) if err != nil { @@ -333,6 +345,14 @@ func (d *myBinlogReplicaController) SetReplicationFilterOptions(_ *sql.Context, return nil } +func changeSourceLogFileToInvalidIfEmpty(status *binlogreplication.ReplicaStatus) { + // As the original design of go-mysql-server, the source log file should be "INVALID" if GTID_MODE is ON. + // An empty string of source log file means GTID_MODE is ON, and we should set it to "INVALID" here. 
+ if status.SourceLogFile == "" { + status.SourceLogFile = "INVALID" + } +} + // GetReplicaStatus implements the BinlogReplicaController interface func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogreplication.ReplicaStatus, error) { replicaSourceInfo, err := loadReplicationConfiguration(ctx, d.engine.Analyzer.Catalog.MySQLDb) @@ -346,12 +366,15 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr var copy = d.status if replicaSourceInfo == nil { + changeSourceLogFileToInvalidIfEmpty(&copy) return &copy, nil } copy.SourceUser = replicaSourceInfo.User copy.SourceHost = replicaSourceInfo.Host copy.SourcePort = uint(replicaSourceInfo.Port) + copy.SourceLogFile = replicaSourceInfo.SourceLogFile + copy.SourceLogPos = replicaSourceInfo.SourceLogPos copy.SourceServerUuid = replicaSourceInfo.Uuid copy.ConnectRetry = replicaSourceInfo.ConnectRetryInterval copy.SourceRetryCount = replicaSourceInfo.ConnectRetryCount @@ -363,6 +386,7 @@ func (d *myBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogr copy.RetrievedGtidSet = copy.ExecutedGtidSet } + changeSourceLogFileToInvalidIfEmpty(&copy) return &copy, nil } diff --git a/binlogreplication/binlog_replication_reconnect_test.go b/binlogreplication/binlog_replication_reconnect_test.go index 5780fc4b..ddd99f8a 100644 --- a/binlogreplication/binlog_replication_reconnect_test.go +++ b/binlogreplication/binlog_replication_reconnect_test.go @@ -119,7 +119,10 @@ func testInitialReplicaStatus(t *testing.T) { require.Equal(t, "Yes", status["Replica_SQL_Running"]) // Unsupported fields - require.Equal(t, "INVALID", status["Source_Log_File"]) + gtidEnabled := getGtidEnabled() + if gtidEnabled { + require.Equal(t, "INVALID", status["Source_Log_File"]) + } require.Equal(t, "Ignored", status["Source_SSL_Allowed"]) require.Equal(t, "None", status["Until_Condition"]) require.Equal(t, "0", status["SQL_Delay"]) diff --git a/binlogreplication/binlog_replication_test.go 
b/binlogreplication/binlog_replication_test.go index d1682dda..79556315 100644 --- a/binlogreplication/binlog_replication_test.go +++ b/binlogreplication/binlog_replication_test.go @@ -233,7 +233,13 @@ func TestFlushLogs(t *testing.T) { assertCreateTableStatement(t, replicaDatabase, "t", expectedStatement) primaryDatabase.MustExec("flush binary logs;") - waitForReplicaToCatchUp(t) + gtidEnabled := getGtidEnabled() + if gtidEnabled { + // The 'FLUSH BINARY LOGS' statement will update the file position, but not the GTID. + // Since the applier will not update the replication progress when it receives a + // FormatDescription event, we should only check the replica status if GTID is enabled. + waitForReplicaToCatchUp(t) + } primaryDatabase.MustExec("insert into t values (1), (2), (3);") waitForReplicaToCatchUp(t) @@ -549,14 +555,24 @@ func assertWarning(t *testing.T, database *sqlx.DB, code int, message string) { } func queryGtid(t *testing.T, database *sqlx.DB) string { - rows, err := database.Queryx("SELECT @@global.gtid_executed as gtid_executed;") - require.NoError(t, err) - defer rows.Close() - row := convertMapScanResultToStrings(readNextRow(t, rows)) - if row["gtid_executed"] == nil { - t.Fatal("no value for @@GLOBAL.gtid_executed") + gtidEnabled := getGtidEnabled() + isPrimary := database == primaryDatabase + if gtidEnabled { + rows, err := database.Queryx("SELECT @@global.gtid_executed as gtid_executed;") + require.NoError(t, err) + defer rows.Close() + row := convertMapScanResultToStrings(readNextRow(t, rows)) + if row["gtid_executed"] == nil { + t.Fatal("no value for @@GLOBAL.gtid_executed") + } + return row["gtid_executed"].(string) + } else if isPrimary { + sourceLogFile, sourceLogPos := getPrimaryLogPosition(t, gtidEnabled) + return fmt.Sprintf("%s:%s", sourceLogFile, sourceLogPos) + } else { + sourceLogFile, sourceLogPos := getReplicaLogPosition(t) + return fmt.Sprintf("%s:%s", sourceLogFile, sourceLogPos) } - return row["gtid_executed"].(string) } func 
readNextRow(t *testing.T, rows *sqlx.Rows) map[string]interface{} { @@ -659,13 +675,56 @@ func stopDuckSqlServer(t *testing.T) { time.Sleep(250 * time.Millisecond) } +func getPrimaryLogPosition(t *testing.T, gtidEnabled bool) (string, string) { + rows, err := primaryDatabase.Queryx("SHOW BINARY LOG STATUS;") + require.NoError(t, err) + primaryStatus := convertMapScanResultToStrings(readNextRow(t, rows)) + sourceLogFile := primaryStatus["File"].(string) + sourceLogPos := primaryStatus["Position"].(string) + require.NoError(t, rows.Close()) + + return sourceLogFile, sourceLogPos +} + +func getReplicaLogPosition(t *testing.T) (string, string) { + rows, err := replicaDatabase.Queryx("SHOW REPLICA STATUS;") + require.NoError(t, err) + replicaStatus := convertMapScanResultToStrings(readNextRow(t, rows)) + executedGtidSet := replicaStatus["Executed_Gtid_Set"].(string) + require.NoError(t, rows.Close()) + + // the executedGtidSet is like the format of "binlog.000002:6757", split it into file and pos + parts := strings.Split(executedGtidSet, ":") + require.Equal(t, 2, len(parts)) + sourceLogFile := parts[0] + sourceLogPos := parts[1] + return sourceLogFile, sourceLogPos +} + // startReplication configures the replication source on the replica and runs the START REPLICA statement. -func startReplication(_ *testing.T, port int) { - replicaDatabase.MustExec( - fmt.Sprintf("change replication source to SOURCE_HOST='localhost', "+ - "SOURCE_USER='replicator', SOURCE_PASSWORD='Zqr8_blrGm1!', "+ - "SOURCE_PORT=%v, SOURCE_AUTO_POSITION=1, SOURCE_CONNECT_RETRY=5;", port)) +func startReplication(t *testing.T, port int) { + gtidEnabled := getGtidEnabled() + + // If GTID is not enabled, we should get the log position by "SHOW MASTER STATUS" first, + // if it failed, try "SHOW BINARY LOG STATUS" instead. Then we extract the log position from + // the result and use it as the source_log_pos when starting the replica. 
+ sourceLogFile := "" + sourceLogPos := "" + if !gtidEnabled { + sourceLogFile, sourceLogPos = getPrimaryLogPosition(t, gtidEnabled) + } + cmdStr := fmt.Sprintf("CHANGE REPLICATION SOURCE TO "+ + "SOURCE_HOST='localhost', "+ + "SOURCE_USER='replicator', "+ + "SOURCE_PASSWORD='Zqr8_blrGm1!', "+ + "SOURCE_PORT=%v, "+ + "SOURCE_AUTO_POSITION=1, "+ + "SOURCE_CONNECT_RETRY=5", port) + if !gtidEnabled { + cmdStr += fmt.Sprintf(", SOURCE_LOG_FILE='%s', SOURCE_LOG_POS=%s", sourceLogFile, sourceLogPos) + } + replicaDatabase.MustExec(cmdStr) replicaDatabase.MustExec("start replica;") } @@ -735,6 +794,17 @@ func findFreePort() int { return freePort } +func getGtidEnabled() bool { + gtidEnabled := strings.ToLower(os.Getenv("GTID_ENABLED")) + if gtidEnabled == "" { + return true + } + if gtidEnabled != "true" && gtidEnabled != "false" { + panic(fmt.Sprintf("GTID_ENABLED environment variable must be 'true' or 'false', got: %s", gtidEnabled)) + } + return gtidEnabled == "true" +} + // startMySqlServer configures a starts a fresh MySQL server instance in a Docker container // and returns the port it is running on. If unable to start up the MySQL server, an error is returned. 
func startMySqlServer(dir string) (int, string, error) { @@ -743,8 +813,11 @@ func startMySqlServer(dir string) (int, string, error) { // Use a random name for the container to avoid conflicts mySqlContainer = "mysql-test-" + strconv.Itoa(rand.Int()) + gtidEnabled := getGtidEnabled() + // Build the Docker command to start the MySQL container - cmd := exec.Command("docker", "run", + cmdArgs := []string{ + "run", "--rm", // Remove the container when it stops "-d", // Run in detached mode "-p", fmt.Sprintf("%d:3306", mySqlPort), // Map the container's port 3306 to the host's mySqlPort @@ -753,9 +826,15 @@ func startMySqlServer(dir string) (int, string, error) { "--name", mySqlContainer, // Give the container a name "mysql:latest", // Use the latest MySQL image "mysqld", - "--gtid_mode=ON", - "--enforce-gtid-consistency=ON", - ) + } + if gtidEnabled { + cmdArgs = append(cmdArgs, "--gtid_mode=ON", "--enforce-gtid-consistency=ON") + } else { + cmdArgs = append(cmdArgs, "--gtid_mode=OFF", "--enforce-gtid-consistency=OFF") + } + + // Build the Docker command to start the MySQL container + cmd := exec.Command("docker", cmdArgs...) 
// Execute the Docker command output, err := cmd.CombinedOutput() diff --git a/devtools/replica-setup/checker.sh b/devtools/replica-setup/checker.sh index b39092be..d1cb91fc 100644 --- a/devtools/replica-setup/checker.sh +++ b/devtools/replica-setup/checker.sh @@ -28,8 +28,8 @@ check_server_params() { # Check for each parameter and validate their values binlog_format=$(echo "$result" | grep -i "binlog_format" | awk '{print $2}') enforce_gtid_consistency=$(echo "$result" | grep -i "enforce_gtid_consistency" | awk '{print $2}') - gtid_mode=$(echo "$result" | grep -i "gtid_mode" | awk '{print $2}') - gtid_strict_mode=$(echo "$result" | grep -i "gtid_strict_mode" | awk '{print $2}') + gtid_mode=$(echo "$result" | grep -i "gtid_mode" | awk '{print $2}' | tr '[:lower:]' '[:upper:]') + gtid_strict_mode=$(echo "$result" | grep -i "gtid_strict_mode" | awk '{print $2}' | tr '[:lower:]' '[:upper:]') log_bin=$(echo "$result" | grep -i "log_bin" | awk '{print $2}') # Validate binlog_format @@ -39,24 +39,15 @@ check_server_params() { fi # MariaDB use gtid_strict_mode instead of gtid_mode - if [[ -z "$gtid_strict_mode" ]]; then - # Validate enforce_gtid_consistency (for MySQL) - if [[ "$enforce_gtid_consistency" != "ON" ]]; then - echo "Error: enforce_gtid_consistency is not set to 'ON', it is set to '$enforce_gtid_consistency'." - return 1 - fi - - # Validate gtid_mode (for MySQL) - if [[ "$gtid_mode" != "ON" ]]; then - echo "Error: gtid_mode is not set to 'ON', it is set to '$gtid_mode'." - return 1 - fi - else - # Validate gtid_strict_mode (for MariaDB) - if [[ "$gtid_strict_mode" != "ON" ]]; then - echo "Error: gtid_strict_mode is not set to 'ON', it is set to '$gtid_strict_mode'." - return 1 - fi + if [[ "$gtid_strict_mode" == "OFF" || (-z "$gtid_strict_mode" && "${gtid_mode}" =~ ^OFF) ]]; then + GTID_MODE="OFF" + echo "GTID_MODE: $GTID_MODE" + fi + + # If gtid_strict_mode is empty, check gtid_mode. 
If it's not OFF, then enforce_gtid_consistency must be ON + if [[ -z "$gtid_strict_mode" && $GTID_MODE == "ON" && "$enforce_gtid_consistency" != "ON" ]]; then + echo "Error: gtid_mode is not set to 'OFF', it is set to '$gtid_mode'. enforce_gtid_consistency must be 'ON'." + return 1 fi # Validate log_bin diff --git a/devtools/replica-setup/replica_setup.sh b/devtools/replica-setup/replica_setup.sh index 41a5ef12..21ede034 100644 --- a/devtools/replica-setup/replica_setup.sh +++ b/devtools/replica-setup/replica_setup.sh @@ -11,6 +11,7 @@ MYDUCK_USER=${MYDUCK_USER:-root} MYDUCK_PASSWORD=${MYDUCK_PASSWORD:-} MYDUCK_SERVER_ID=${MYDUCK_SERVER_ID:-2} MYDUCK_IN_DOCKER=${MYDUCK_IN_DOCKER:-false} +GTID_MODE="ON" while [[ $# -gt 0 ]]; do case $1 in diff --git a/devtools/replica-setup/snapshot.sh b/devtools/replica-setup/snapshot.sh index 85abbd75..2e214dc6 100644 --- a/devtools/replica-setup/snapshot.sh +++ b/devtools/replica-setup/snapshot.sh @@ -36,18 +36,39 @@ echo "Thread count set to: $THREAD_COUNT" echo "Copying data from MySQL to MyDuck..." 
# Run mysqlsh command and capture the output -output=$(mysqlsh -h${MYSQL_HOST} -P${MYSQL_PORT} -u${MYSQL_USER} -p${MYSQL_PASSWORD} -- util copy-instance "mysql://${MYDUCK_USER}:${MYDUCK_PASSWORD}@${MYDUCK_HOST}:${MYDUCK_PORT}" --users false --consistent false --ignore-existing-objects true --handle-grant-errors ignore --threads $THREAD_COUNT --bytesPerChunk 256M --ignore-version true) +output=$(mysqlsh --host=${MYSQL_HOST} --port=${MYSQL_PORT} --user=${MYSQL_USER} --password=${MYSQL_PASSWORD} -- util copy-instance "mysql://${MYDUCK_USER}:${MYDUCK_PASSWORD}@${MYDUCK_HOST}:${MYDUCK_PORT}" --users false --consistent false --ignore-existing-objects true --handle-grant-errors ignore --threads $THREAD_COUNT --bytesPerChunk 256M --ignore-version true) -# Extract the EXECUTED_GTID_SET using grep and awk -EXECUTED_GTID_SET=$(echo "$output" | grep -i "EXECUTED_GTID_SET" | awk '{print $2}') +if [[ $GTID_MODE == "ON" ]]; then + # Extract the EXECUTED_GTID_SET from this output: + # Executed_GTID_set: 369107a6-a0a5-11ef-a255-0242ac110008:1-10 + EXECUTED_GTID_SET=$(echo "$output" | grep -i "EXECUTED_GTID_SET" | awk '{print $2}') -# Check if EXECUTED_GTID_SET is empty -if [ -z "$EXECUTED_GTID_SET" ]; then - echo "EXECUTED_GTID_SET is empty, exiting." - exit 1 + # Check if EXECUTED_GTID_SET is empty + if [ -z "$EXECUTED_GTID_SET" ]; then + echo "EXECUTED_GTID_SET is empty, exiting." + exit 1 + fi + + # If not empty, print the extracted GTID set + echo "EXECUTED_GTID_SET: $EXECUTED_GTID_SET" +else + # Extract the BINLOG_FILE and BINLOG_POS from this output: + # Binlog_file: binlog.000002 + # Binlog_position: 3763 + # Executed_GTID_set: '' + BINLOG_FILE=$(echo "$output" | grep -i "Binlog_file" | awk '{print $2}') + BINLOG_POS=$(echo "$output" | grep -i "Binlog_position" | awk '{print $2}') + + # Check if BINLOG_FILE and BINLOG_POS are empty + if [ -z "$BINLOG_FILE" ] || [ -z "$BINLOG_POS" ]; then + echo "BINLOG_FILE or BINLOG_POS is empty, exiting." 
+ exit 1 + fi + + # If not empty, print the extracted BINLOG_FILE and BINLOG_POS + echo "BINLOG_FILE: $BINLOG_FILE" + echo "BINLOG_POS: $BINLOG_POS" fi -# If not empty, print the extracted GTID set -echo "EXECUTED_GTID_SET: $EXECUTED_GTID_SET" echo "Snapshot completed successfully." \ No newline at end of file diff --git a/devtools/replica-setup/start_replication.sh b/devtools/replica-setup/start_replication.sh index 788e44d7..8e291b88 100644 --- a/devtools/replica-setup/start_replication.sh +++ b/devtools/replica-setup/start_replication.sh @@ -18,20 +18,27 @@ else fi # Use the EXECUTED_GTID_SET variable from the previous steps -if [ ! -z "$EXECUTED_GTID_SET" ]; then +if [ $GTID_MODE == "ON" ] && [ ! -z "$EXECUTED_GTID_SET" ]; then mysqlsh --sql --host=${MYDUCK_HOST} --port=${MYDUCK_PORT} --user=root --no-password < github.com/apecloud/go-mysql-server v0.0.0-20241112031328-30cddba3eea7 - github.com/dolthub/vitess v0.0.0-20241111235433-a20a5ab9d7c9 => github.com/apecloud/dolt-vitess v0.0.0-20241112063127-f62e98a9936a + github.com/dolthub/go-mysql-server v0.18.2-0.20241112002228-81b13e8034f2 => github.com/apecloud/go-mysql-server v0.0.0-20241113072459-9ad423c065f8 + github.com/dolthub/vitess v0.0.0-20241111235433-a20a5ab9d7c9 => github.com/apecloud/dolt-vitess v0.0.0-20241113031931-99ad35228a58 github.com/marcboeker/go-duckdb v1.8.3 => github.com/apecloud/go-duckdb v0.0.0-20241113073916-47619770e595 ) @@ -41,7 +43,6 @@ require ( github.com/biogo/store v0.0.0-20201120204734-aad293a2328f // indirect github.com/blevesearch/snowballstem v0.9.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/cockroachdb/errors v1.9.0 // indirect github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect github.com/cockroachdb/redact v1.1.3 // indirect github.com/dave/dst v0.27.2 // indirect @@ -60,7 +61,6 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect 
github.com/google/go-cmp v0.6.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gorilla/mux v1.8.1 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect diff --git a/go.sum b/go.sum index c7f6b982..e74050c6 100644 --- a/go.sum +++ b/go.sum @@ -49,14 +49,12 @@ github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= -github.com/apecloud/dolt-vitess v0.0.0-20241107081545-d894da3857d8 h1:OKsyuwps5eKiUa4GHn35O8kq8R+Tf2/iUYNo3f3SoCc= -github.com/apecloud/dolt-vitess v0.0.0-20241107081545-d894da3857d8/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM= +github.com/apecloud/dolt-vitess v0.0.0-20241113031931-99ad35228a58 h1:B7QNvHZxwoXvxOO8c48eQYrBG0cjpsFP6FZESe7ncaQ= +github.com/apecloud/dolt-vitess v0.0.0-20241113031931-99ad35228a58/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM= github.com/apecloud/go-duckdb v0.0.0-20241113073916-47619770e595 h1:zAJgtlElXKLbo3HgZmFvfc96vSWGwTqAJphwFarz6Os= github.com/apecloud/go-duckdb v0.0.0-20241113073916-47619770e595/go.mod h1:C9bYRE1dPYb1hhfu/SSomm78B0FXmNgRvv6YBW/Hooc= -github.com/apecloud/dolt-vitess v0.0.0-20241112063127-f62e98a9936a h1:2D9spsdHL5yqHqxghc7FrTfknswMbiUCCJ1Ci3WaIPY= -github.com/apecloud/dolt-vitess v0.0.0-20241112063127-f62e98a9936a/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM= -github.com/apecloud/go-mysql-server v0.0.0-20241112031328-30cddba3eea7 h1:nlBHJDxPrUaDpKkS1xj78C0o/hdU5O3RMwOlBJC+U2k= -github.com/apecloud/go-mysql-server v0.0.0-20241112031328-30cddba3eea7/go.mod h1:SXieGXBXf24LLsXZ8uKu8LaFPfBYgcASfU7wNmWYlUg= +github.com/apecloud/go-mysql-server v0.0.0-20241113072459-9ad423c065f8 
h1:RzzXe1S0g49GrgfQ/AL2a+hrEqcXJM8Bv4NxPpzK+VI= +github.com/apecloud/go-mysql-server v0.0.0-20241113072459-9ad423c065f8/go.mod h1:SXieGXBXf24LLsXZ8uKu8LaFPfBYgcASfU7wNmWYlUg= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -141,14 +139,10 @@ github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2 h1:u3PMzfF8RkKd3lB9pZ2bfn0qEG+1G github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2/go.mod h1:mIEZOHnFx4ZMQeawhw9rhsj+0zwQj7adVsnBX7t+eKY= github.com/dolthub/go-icu-regex v0.0.0-20240916130659-0118adc6b662 h1:aC17hZD6iwzBwwfO5M+3oBT5E5gGRiQPdn+vzpDXqIA= github.com/dolthub/go-icu-regex v0.0.0-20240916130659-0118adc6b662/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168= -github.com/dolthub/go-mysql-server v0.18.2-0.20241112002228-81b13e8034f2 h1:1ax2e+4r9ax5eiowBEIfRX7K/oZLeWxNNtt88CgnO0I= -github.com/dolthub/go-mysql-server v0.18.2-0.20241112002228-81b13e8034f2/go.mod h1:sOMQzWUvHvJECzpcUxjDgV5BR/A7U+hOh596PUO2NPI= github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 h1:bMGS25NWAGTEtT5tOBsCuCrlYnLRKpbJVJkDbrTRhwQ= github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71/go.mod h1:2/2zjLQ/JOOSbbSboojeg+cAwcRV0fDLzIiWch/lhqI= github.com/dolthub/sqllogictest/go v0.0.0-20240618184124-ca47f9354216 h1:JWkKRE4EHUcEVQCMRBej8DYxjYjRz/9MdF/NNQh0o70= github.com/dolthub/sqllogictest/go v0.0.0-20240618184124-ca47f9354216/go.mod h1:e/FIZVvT2IR53HBCAo41NjqgtEnjMJGKca3Y/dAmZaA= -github.com/dolthub/vitess v0.0.0-20241111235433-a20a5ab9d7c9 h1:s36zDuLPuZRWC0nBCJs2Z8joP19eKEtcsIsuE8K9Kx0= -github.com/dolthub/vitess v0.0.0-20241111235433-a20a5ab9d7c9/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= @@ -167,8 +161,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/fanixk/geohash v0.0.0-20150324002647-c1f9b5fa157a h1:Fyfh/dsHFrC6nkX7H7+nFdTd1wROlX/FxEIWVpKYf1U= github.com/fanixk/geohash v0.0.0-20150324002647-c1f9b5fa157a/go.mod h1:UgNw+PTmmGN8rV7RvjvnBMsoTU8ZXXnaT3hYsDTBlgQ= -github.com/fanyang01/go-mysql-server v0.0.0-20241107083514-e7c2ac20ddba h1:o7t/gFGKP2Hns5JUvbGjL+eTyIrDVI7QnGdsVubsPqE= -github.com/fanyang01/go-mysql-server v0.0.0-20241107083514-e7c2ac20ddba/go.mod h1:IvF70qil4jA4trQvcl4tsNdY4AgL94F2vWzoxAM2zFo= github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= @@ -402,8 +394,6 @@ github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-b github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= -github.com/marcboeker/go-duckdb v1.8.3 h1:ZkYwiIZhbYsT6MmJsZ3UPTHrTZccDdM4ztoqSlEMXiQ= -github.com/marcboeker/go-duckdb v1.8.3/go.mod h1:C9bYRE1dPYb1hhfu/SSomm78B0FXmNgRvv6YBW/Hooc= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.8/go.mod 
h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=