From 003e2d1411b550a302be188a767878b9b9094433 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 27 Jan 2025 17:46:22 -0800 Subject: [PATCH 1/2] go: binlogreplication: Add Session{{Begin,End}Command,End} lifecycle callbacks to the replica controller execution context session. This makes small clean ups to the lifecycle around replica applier and the ownership and lifecycle of the mysql.Conn read connection. This PR also includes some changes to slightly improve the performance and reliability of the tests when running them locally. In particular, some of the changes include: 1) Since `go run ./cmd/dolt` takes about four seconds to validate the existing cached build on my laptop, we just go ahead and use a cached build everywhere. 2) We use t.Log{f,} instead of fmt.Prin.. to improve the ergonomics of test running and getting output from a failure in particular. 3) We try to minimize global process state changes like unnecessary `os.Chdir` calls, since it would be nice to parallelize these tests eventually. 4) We get rid of the unused and seemingly unnecessary --socket= argument to Dolt, where we had to use a directory not corresponding to $TMPDIR, for example, because max pathlength on a sun_path on MacOS is 104 characters or whatever. --- .../binlog_metadata_persistence.go | 4 +- .../binlogreplication/binlog_primary_test.go | 10 +- .../binlog_replica_applier.go | 50 +++- .../binlog_replica_controller.go | 20 +- .../binlog_replica_event_producer.go | 35 ++- .../binlog_replication_restart_test.go | 2 +- .../binlog_replication_test.go | 248 ++++++++---------- 7 files changed, 199 insertions(+), 170 deletions(-) diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_metadata_persistence.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_metadata_persistence.go index 01c6b38d0fa..3737498ff1f 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_metadata_persistence.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_metadata_persistence.go @@ -104,7 +104,9 @@ func persistReplicaRunningState(ctx *sql.Context, state replicaRunningState) err // loadReplicationConfiguration loads the replication configuration for default channel ("") from // the "mysql" database, |mysqlDb|. -func loadReplicationConfiguration(_ *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) { +func loadReplicationConfiguration(ctx *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) rd := mysqlDb.Reader() defer rd.Close() diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go index 02cb6a0a77e..9b8770befa1 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go @@ -430,7 +430,7 @@ func TestBinlogPrimary_ReplicaRestart(t *testing.T) { // Restart the MySQL replica and reconnect to the Dolt primary prevPrimaryDatabase := primaryDatabase var err error - mySqlPort, mySqlProcess, err = startMySqlServer(testDir) + mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir) require.NoError(t, err) replicaDatabase = primaryDatabase primaryDatabase = prevPrimaryDatabase @@ -1042,7 +1042,7 @@ func outputReplicaApplierStatus(t *testing.T) { newRows, err := replicaDatabase.Queryx("select * from performance_schema.replication_applier_status_by_worker;") require.NoError(t, err) allNewRows := readAllRowsIntoMaps(t, newRows) - fmt.Printf("\n\nreplication_applier_status_by_worker: %v\n", allNewRows) + t.Logf("\n\nreplication_applier_status_by_worker: %v\n", allNewRows) } // outputShowReplicaStatus prints out replica status information. This is useful for debugging @@ -1052,7 +1052,7 @@ func outputShowReplicaStatus(t *testing.T) { newRows, err := replicaDatabase.Queryx("show replica status;") require.NoError(t, err) allNewRows := readAllRowsIntoMaps(t, newRows) - fmt.Printf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows) + t.Logf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows) } // copyMap returns a copy of the specified map |m|. @@ -1098,7 +1098,7 @@ func waitForReplicaToReconnect(t *testing.T) { func mustRestartDoltPrimaryServer(t *testing.T) { var err error prevReplicaDatabase := replicaDatabase - doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) + doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil) require.NoError(t, err) primaryDatabase = replicaDatabase replicaDatabase = prevReplicaDatabase @@ -1109,7 +1109,7 @@ func mustRestartDoltPrimaryServer(t *testing.T) { func mustRestartMySqlReplicaServer(t *testing.T) { var err error prevPrimaryDatabase := primaryDatabase - mySqlPort, mySqlProcess, err = startMySqlServer(testDir) + mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir) require.NoError(t, err) replicaDatabase = primaryDatabase primaryDatabase = prevPrimaryDatabase diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go index 3833aecc6aa..36bce2c5767 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go @@ -19,6 +19,7 @@ import ( "io" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -63,6 +64,7 @@ type binlogReplicaApplier struct { currentPosition *mysql.Position // successfully executed GTIDs filters *filterConfiguration running atomic.Bool + handlerWg sync.WaitGroup engine *gms.Engine dbsWithUncommittedChanges map[string]struct{} } @@ -88,10 +90,14 @@ const rowFlag_rowsAreComplete = 0x0008 // Go spawns a new goroutine to run the applier's binlog event handler. func (a *binlogReplicaApplier) Go(ctx *sql.Context) { + if !a.running.CompareAndSwap(false, true) { + panic("attempt to start binlogReplicaApplier while it is already running") + } + a.handlerWg.Add(1) go func() { - a.running.Store(true) + defer a.handlerWg.Done() + defer a.running.Store(false) err := a.replicaBinlogEventHandler(ctx) - a.running.Store(false) if err != nil { ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error()) DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error()) @@ -104,6 +110,27 @@ func (a *binlogReplicaApplier) IsRunning() bool { return a.running.Load() } +// Stop will shutdown the replication thread if it is running. This is not safe to call concurrently |Go|. +// This is used by the controller when implementing STOP REPLICA, but it is also used on shutdown when the +// replication thread should be shutdown cleanly in the event that it is still running. +func (a *binlogReplicaApplier) Stop() { + if a.IsRunning() { + // We jump through some hoops here. It is not the case that the replication thread + // is guaranteed to read from |stopReplicationChan|. Instead, it can exit on its + // own with an error, for example, after exceeding connection retry attempts. + done := make(chan struct{}) + go func() { + defer close(done) + a.handlerWg.Wait() + }() + select { + case a.stopReplicationChan <- struct{}{}: + case <-done: + } + a.handlerWg.Wait() + } +} + // connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing // and retrying if errors are encountered. func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Context) (*mysql.Conn, error) { @@ -263,25 +290,21 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error { engine := a.engine - var conn *mysql.Conn var eventProducer *binlogEventProducer // Process binlog events for { - if conn == nil { + if eventProducer == nil { ctx.GetLogger().Debug("no binlog connection to source, attempting to establish one") - if eventProducer != nil { - eventProducer.Stop() - } - var err error - if conn, err = a.connectAndStartReplicationEventStream(ctx); err == ErrReplicationStopped { + if conn, err := a.connectAndStartReplicationEventStream(ctx); err == ErrReplicationStopped { return nil } else if err != nil { return err + } else { + eventProducer = newBinlogEventProducer(conn) + eventProducer.Go(ctx) } - eventProducer = newBinlogEventProducer(conn) - eventProducer.Go(ctx) } select { @@ -305,8 +328,6 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error }) eventProducer.Stop() eventProducer = nil - conn.Close() - conn = nil } } else { // otherwise, log the error if it's something we don't expect and continue @@ -317,6 +338,7 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error case <-a.stopReplicationChan: ctx.GetLogger().Trace("received stop replication signal") eventProducer.Stop() + eventProducer = nil return nil } } @@ -325,6 +347,8 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error // processBinlogEvent processes a single binlog event message and returns an error if there were any problems // processing it. func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.Engine, event mysql.BinlogEvent) error { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) var err error createCommit := false diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_controller.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_controller.go index 8e22a09cd92..5ea8edd9681 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_controller.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_controller.go @@ -157,7 +157,9 @@ func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error { // changes and execute DDL statements on the running server. If the account doesn't exist, it will be // created and locked to disable log ins, and if it does exist, but is missing super privs or is not // locked, it will be given superuser privs and locked. -func (d *doltBinlogReplicaController) configureReplicationUser(_ *sql.Context) { +func (d *doltBinlogReplicaController) configureReplicationUser(ctx *sql.Context) { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) mySQLDb := d.engine.Analyzer.Catalog.MySQLDb ed := mySQLDb.Editor() defer ed.Close() @@ -180,12 +182,15 @@ func (d *doltBinlogReplicaController) SetEngine(engine *sqle.Engine) { // StopReplica implements the BinlogReplicaController interface. func (d *doltBinlogReplicaController) StopReplica(ctx *sql.Context) error { + d.operationMutex.Lock() + defer d.operationMutex.Unlock() + if d.applier.IsRunning() == false { ctx.Warn(3084, "Replication thread(s) for channel '' are already stopped.") return nil } - d.applier.stopReplicationChan <- struct{}{} + d.applier.Stop() d.updateStatus(func(status *binlogreplication.ReplicaStatus) { status.ReplicaIoRunning = binlogreplication.ReplicaIoNotRunning @@ -428,6 +433,17 @@ func (d *doltBinlogReplicaController) AutoStart(_ context.Context) error { return d.StartReplica(d.ctx) } +// Release all resources, such as replication threads, associated with the replication. +// This can only be done once in the lifecycle of the instance. Because DoltBinlogReplicaController +// is currently a global singleton, this should only be done once in the lifecycle of the +// application. +func (d *doltBinlogReplicaController) Close() { + d.applier.Stop() + if d.ctx != nil { + sql.SessionEnd(d.ctx.Session) + } +} + // // Helper functions // diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_event_producer.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_event_producer.go index 34f45eb3abf..872a7ccba31 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_event_producer.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_event_producer.go @@ -15,6 +15,7 @@ package binlogreplication import ( + "sync" "sync/atomic" "github.com/dolthub/go-mysql-server/sql" @@ -30,19 +31,24 @@ type binlogEventProducer struct { conn *mysql.Conn errorChan chan error eventChan chan mysql.BinlogEvent + closeChan chan struct{} + wg sync.WaitGroup running atomic.Bool } // newBinlogEventProducer creates a new binlog event producer that reads from the specified, established MySQL // connection |conn|. The returned binlogEventProducer owns the communication channels // and is responsible for closing them when the binlogEventProducer is stopped. +// +// The BinlogEventProducer will take ownership of the supplied |*Conn| instance and +// will |Close| it when the producer itself exits. func newBinlogEventProducer(conn *mysql.Conn) *binlogEventProducer { producer := &binlogEventProducer{ conn: conn, eventChan: make(chan mysql.BinlogEvent), errorChan: make(chan error), + closeChan: make(chan struct{}), } - producer.running.Store(true) return producer } @@ -61,7 +67,14 @@ func (p *binlogEventProducer) ErrorChan() <-chan error { // Go starts this binlogEventProducer in a new goroutine. Right before this routine exits, it will close the // two communication channels it owns. func (p *binlogEventProducer) Go(_ *sql.Context) { + if !p.running.CompareAndSwap(false, true) { + panic("attempt to start binlogEventProducer more than once.") + } + p.wg.Add(1) go func() { + defer p.wg.Done() + defer close(p.errorChan) + defer close(p.eventChan) for p.IsRunning() { // ReadBinlogEvent blocks until a binlog event can be read and returned, so this has to be done on a // separate thread, otherwise the applier would be blocked and wouldn't be able to handle the STOP @@ -75,13 +88,19 @@ func (p *binlogEventProducer) Go(_ *sql.Context) { } if err != nil { - p.errorChan <- err + select { + case p.errorChan <- err: + case <-p.closeChan: + return + } } else { - p.eventChan <- event + select { + case p.eventChan <- event: + case <-p.closeChan: + return + } } } - close(p.errorChan) - close(p.eventChan) }() } @@ -92,5 +111,9 @@ func (p *binlogEventProducer) IsRunning() bool { // Stop requests for this binlogEventProducer to stop processing events as soon as possible. func (p *binlogEventProducer) Stop() { - p.running.Store(false) + if p.running.CompareAndSwap(true, false) { + p.conn.Close() + close(p.closeChan) + } + p.wg.Wait() } diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_restart_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_restart_test.go index 70810019f7c..1ebb0a046ca 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_restart_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_restart_test.go @@ -49,7 +49,7 @@ func TestBinlogReplicationServerRestart(t *testing.T) { time.Sleep(1000 * time.Millisecond) var err error - doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) + doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil) require.NoError(t, err) // Check replication status on the replica and assert configuration persisted diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_test.go index 9231185dcdc..55eae2086ed 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_test.go @@ -29,6 +29,7 @@ import ( "slices" "strconv" "strings" + "sync" "syscall" "testing" "time" @@ -47,7 +48,6 @@ var mySqlProcess, doltProcess *os.Process var doltLogFilePath, oldDoltLogFilePath, mysqlLogFilePath string var doltLogFile, mysqlLogFile *os.File var testDir string -var originalWorkingDir string // doltReplicaSystemVars are the common system variables that need // to be set on a Dolt replica before replication is turned on. @@ -55,6 +55,48 @@ var doltReplicaSystemVars = map[string]string{ "server_id": "42", } +func TestMain(m *testing.M) { + res := func() int { + defer func() { + cachedDoltDevBuildPathOnce.Do(func() {}) + if cachedDoltDevBuildPath != "" { + os.RemoveAll(filepath.Dir(cachedDoltDevBuildPath)) + } + }() + return m.Run() + }() + os.Exit(res) +} + +var cachedDoltDevBuildPath string +var cachedDoltDevBuildPathOnce sync.Once + +func DoltDevBuildPath() string { + cachedDoltDevBuildPathOnce.Do(func() { + tmp, err := os.MkdirTemp("", "binlog-replication-doltbin-") + if err != nil { + panic(err) + } + fullpath := filepath.Join(tmp, "dolt") + + originalWorkingDir, err := os.Getwd() + if err != nil { + panic(err) + } + + goDirPath := filepath.Join(originalWorkingDir, "..", "..", "..", "..") + + cmd := exec.Command("go", "build", "-o", fullpath, "./cmd/dolt") + cmd.Dir = goDirPath + output, err := cmd.CombinedOutput() + if err != nil { + panic("unable to build dolt for binlog integration tests: " + err.Error() + "\nFull output: " + string(output) + "\n") + } + cachedDoltDevBuildPath = fullpath + }) + return cachedDoltDevBuildPath +} + func teardown(t *testing.T) { if mySqlProcess != nil { stopMySqlServer(t) @@ -72,17 +114,17 @@ func teardown(t *testing.T) { // Output server logs on failure for easier debugging if t.Failed() { if oldDoltLogFilePath != "" { - fmt.Printf("\nDolt server log from %s:\n", oldDoltLogFilePath) - printFile(oldDoltLogFilePath) + t.Logf("\nDolt server log from %s:\n", oldDoltLogFilePath) + printFile(t, oldDoltLogFilePath) } - fmt.Printf("\nDolt server log from %s:\n", doltLogFilePath) - printFile(doltLogFilePath) - fmt.Printf("\nMySQL server log from %s:\n", mysqlLogFilePath) - printFile(mysqlLogFilePath) + t.Logf("\nDolt server log from %s:\n", doltLogFilePath) + printFile(t, doltLogFilePath) + t.Logf("\nMySQL server log from %s:\n", mysqlLogFilePath) + printFile(t, mysqlLogFilePath) mysqlErrorLogFilePath := filepath.Join(filepath.Dir(mysqlLogFilePath), "error_log.err") - fmt.Printf("\nMySQL server error log from %s:\n", mysqlErrorLogFilePath) - printFile(mysqlErrorLogFilePath) + t.Logf("\nMySQL server error log from %s:\n", mysqlErrorLogFilePath) + printFile(t, mysqlErrorLogFilePath) } else { // clean up temp files on clean test runs defer os.RemoveAll(testDir) @@ -194,7 +236,7 @@ func TestAutoRestartReplica(t *testing.T) { // Restart the Dolt replica stopDoltSqlServer(t) var err error - doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) + doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil) require.NoError(t, err) // Assert that some test data replicates correctly @@ -218,7 +260,7 @@ func TestAutoRestartReplica(t *testing.T) { // Restart the Dolt replica stopDoltSqlServer(t) - doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) + doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil) require.NoError(t, err) // SHOW REPLICA STATUS should show that replication is NOT running, with no errors @@ -590,11 +632,13 @@ func TestCharsetsAndCollations(t *testing.T) { // Test Helper Functions // -// waitForReplicaToCatchUp waits (up to 30s) for the replica to catch up with the primary database. The -// lag is measured by checking that gtid_executed is the same on the primary and replica. +// waitForReplicaToCatchUp waits for the replica to catch up with the primary database. The +// lag is measured by checking that gtid_executed is the same on the primary and replica. If +// no progress is made in 30 seconds, this function will fail the test. func waitForReplicaToCatchUp(t *testing.T) { timeLimit := 30 * time.Second + lastReplicaGtid := "" endTime := time.Now().Add(timeLimit) for time.Now().Before(endTime) { replicaGtid := queryGtid(t, replicaDatabase) @@ -602,8 +646,11 @@ func waitForReplicaToCatchUp(t *testing.T) { if primaryGtid == replicaGtid { return + } else if lastReplicaGtid != replicaGtid { + lastReplicaGtid = replicaGtid + endTime = time.Now().Add(timeLimit) } else { - fmt.Printf("primary and replica not in sync yet... (primary: %s, replica: %s)\n", primaryGtid, replicaGtid) + t.Logf("primary and replica not in sync yet... (primary: %s, replica: %s)\n", primaryGtid, replicaGtid) time.Sleep(250 * time.Millisecond) } } @@ -639,7 +686,7 @@ func waitForReplicaToReachGtid(t *testing.T, target int) { } } - fmt.Printf("replica has not reached transaction %d yet; currently at: %s \n", target, replicaGtid) + t.Logf("replica has not reached transaction %d yet; currently at: %s \n", target, replicaGtid) } t.Fatal("replica did not reach target GTID within " + timeLimit.String()) @@ -725,20 +772,13 @@ func startSqlServersWithDoltSystemVars(t *testing.T, doltPersistentSystemVars ma testDir = filepath.Join(os.TempDir(), fmt.Sprintf("%s-%v", t.Name(), time.Now().Unix())) err := os.MkdirAll(testDir, 0777) - - cmd := exec.Command("chmod", "777", testDir) - _, err = cmd.Output() - if err != nil { - panic(err) - } - require.NoError(t, err) - fmt.Printf("temp dir: %v \n", testDir) + t.Logf("temp dir: %v \n", testDir) // Start up primary and replica databases - mySqlPort, mySqlProcess, err = startMySqlServer(testDir) + mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir) require.NoError(t, err) - doltPort, doltProcess, err = startDoltSqlServer(testDir, doltPersistentSystemVars) + doltPort, doltProcess, err = startDoltSqlServer(t, testDir, doltPersistentSystemVars) require.NoError(t, err) } @@ -856,25 +896,9 @@ func findFreePort() int { // startMySqlServer configures a starts a fresh MySQL server instance and returns the port it is running on, // and the os.Process handle. If unable to start up the MySQL server, an error is returned. -func startMySqlServer(dir string) (int, *os.Process, error) { - originalCwd, err := os.Getwd() - if err != nil { - panic(err) - } - - dir = dir + string(os.PathSeparator) + "mysql" + string(os.PathSeparator) - dataDir := dir + "mysql_data" - err = os.MkdirAll(dir, 0777) - if err != nil { - return -1, nil, err - } - cmd := exec.Command("chmod", "777", dir) - output, err := cmd.Output() - if err != nil { - panic(err) - } - - err = os.Chdir(dir) +func startMySqlServer(t *testing.T, dir string) (int, *os.Process, error) { + dir = filepath.Join(dir, "mysql") + err := os.MkdirAll(dir, 0777) if err != nil { return -1, nil, err } @@ -889,28 +913,31 @@ func startMySqlServer(dir string) (int, *os.Process, error) { } username := user.Username if username == "root" { - fmt.Printf("overriding current user (root) to run mysql as 'mysql' user instead\n") + t.Logf("overriding current user (root) to run mysql as 'mysql' user instead\n") username = "mysql" } + dataDir := filepath.Join(dir, "mysql_data") + // Check to see if the MySQL data directory has the "mysql" directory in it, which // tells us whether this MySQL instance has been initialized yet or not. initialized := directoryExists(filepath.Join(dataDir, "mysql")) if !initialized { // Create a fresh MySQL server for the primary - chmodCmd := exec.Command("mysqld", + initCmd := exec.Command("mysqld", "--no-defaults", "--user="+username, "--initialize-insecure", "--datadir="+dataDir, "--default-authentication-plugin=mysql_native_password") - output, err = chmodCmd.CombinedOutput() + initCmd.Dir = dir + output, err := initCmd.CombinedOutput() if err != nil { - return -1, nil, fmt.Errorf("unable to execute command %v: %v – %v", cmd.String(), err.Error(), string(output)) + return -1, nil, fmt.Errorf("unable to execute command %v: %v – %v", initCmd.String(), err.Error(), string(output)) } } - cmd = exec.Command("mysqld", + cmd := exec.Command("mysqld", "--no-defaults", "--user="+username, "--datadir="+dataDir, @@ -920,17 +947,18 @@ func startMySqlServer(dir string) (int, *os.Process, error) { fmt.Sprintf("--port=%v", mySqlPort), "--server-id=11223344", fmt.Sprintf("--socket=mysql-%v.sock", mySqlPort), - "--general_log_file="+dir+"general_log", - "--slow_query_log_file="+dir+"slow_query_log", + "--general_log_file="+filepath.Join(dir, "general_log"), + "--slow_query_log_file="+filepath.Join(dir, "slow_query_log"), "--log-error="+dir+"error_log", - fmt.Sprintf("--pid-file="+dir+"pid-%v.pid", mySqlPort)) + fmt.Sprintf("--pid-file="+filepath.Join(dir, "pid-%v.pid"), mySqlPort)) + cmd.Dir = dir mysqlLogFilePath = filepath.Join(dir, fmt.Sprintf("mysql-%d.out.log", time.Now().Unix())) mysqlLogFile, err = os.Create(mysqlLogFilePath) if err != nil { return -1, nil, err } - fmt.Printf("MySQL server logs at: %s \n", mysqlLogFilePath) + t.Logf("MySQL server logs at: %s \n", mysqlLogFilePath) cmd.Stdout = mysqlLogFile cmd.Stderr = mysqlLogFile err = cmd.Start() @@ -941,7 +969,7 @@ func startMySqlServer(dir string) (int, *os.Process, error) { dsn := fmt.Sprintf("root@tcp(127.0.0.1:%v)/", mySqlPort) primaryDatabase = sqlx.MustOpen("mysql", dsn) - err = waitForSqlServerToStart(primaryDatabase) + err = waitForSqlServerToStart(t, primaryDatabase) if err != nil { return -1, nil, err } @@ -955,8 +983,7 @@ func startMySqlServer(dir string) (int, *os.Process, error) { dsn = fmt.Sprintf("root@tcp(127.0.0.1:%v)/", mySqlPort) primaryDatabase = sqlx.MustOpen("mysql", dsn) - os.Chdir(originalCwd) - fmt.Printf("MySQL server started on port %v \n", mySqlPort) + t.Logf("MySQL server started on port %v \n", mySqlPort) return mySqlPort, cmd.Process, nil } @@ -971,43 +998,10 @@ func directoryExists(path string) bool { return info.IsDir() } -var cachedDoltDevBuildPath = "" - -func initializeDevDoltBuild(dir string, goDirPath string) string { - if cachedDoltDevBuildPath != "" { - return cachedDoltDevBuildPath - } - - // If we're not in a CI environment, don't worry about building a dev build - if os.Getenv("CI") != "true" { - return "" - } - - basedir := filepath.Dir(filepath.Dir(dir)) - fullpath := filepath.Join(basedir, fmt.Sprintf("devDolt-%d", os.Getpid())) - - _, err := os.Stat(fullpath) - if err == nil { - return fullpath - } - - fmt.Printf("building dolt dev build at: %s \n", fullpath) - cmd := exec.Command("go", "build", "-o", fullpath, "./cmd/dolt") - cmd.Dir = goDirPath - - output, err := cmd.CombinedOutput() - if err != nil { - panic("unable to build dolt for binlog integration tests: " + err.Error() + "\nFull output: " + string(output) + "\n") - } - cachedDoltDevBuildPath = fullpath - - return cachedDoltDevBuildPath -} - // startDoltSqlServer starts a Dolt sql-server on a free port from the specified directory |dir|. If // |doltPeristentSystemVars| is populated, then those system variables will be set, persistently, for // the Dolt database, before the Dolt sql-server is started. -func startDoltSqlServer(dir string, doltPersistentSystemVars map[string]string) (int, *os.Process, error) { +func startDoltSqlServer(t *testing.T, dir string, doltPersistentSystemVars map[string]string) (int, *os.Process, error) { dir = filepath.Join(dir, "dolt") err := os.MkdirAll(dir, 0777) if err != nil { @@ -1019,57 +1013,34 @@ func startDoltSqlServer(dir string, doltPersistentSystemVars map[string]string) if doltPort < 1 { doltPort = findFreePort() } - fmt.Printf("Starting Dolt sql-server on port: %d, with data dir %s\n", doltPort, dir) - - // take the CWD and move up four directories to find the go directory - if originalWorkingDir == "" { - var err error - originalWorkingDir, err = os.Getwd() - if err != nil { - panic(err) - } - } - goDirPath := filepath.Join(originalWorkingDir, "..", "..", "..", "..") - err = os.Chdir(goDirPath) - if err != nil { - panic(err) - } - - socketPath := filepath.Join("/tmp", fmt.Sprintf("dolt.%v.sock", doltPort)) + t.Logf("Starting Dolt sql-server on port: %d, with data dir %s\n", doltPort, dir) // use an admin user NOT named "root" to test that we don't require the "root" account adminUser := "admin" if doltPersistentSystemVars != nil && len(doltPersistentSystemVars) > 0 { // Initialize the dolt directory first - err = runDoltCommand(dir, goDirPath, "init", "--name=binlog-test", "--email=binlog@test") + err = runDoltCommand(t, dir, "init", "--name=binlog-test", "--email=binlog@test") if err != nil { return -1, nil, err } for systemVar, value := range doltPersistentSystemVars { query := fmt.Sprintf("SET @@PERSIST.%s=%s;", systemVar, value) - err = runDoltCommand(dir, goDirPath, "sql", fmt.Sprintf("-q=%s", query)) + err = runDoltCommand(t, dir, "sql", fmt.Sprintf("-q=%s", query)) if err != nil { return -1, nil, err } } } - args := []string{"go", "run", "./cmd/dolt", + args := []string{DoltDevBuildPath(), "sql-server", fmt.Sprintf("-u%s", adminUser), "--loglevel=TRACE", fmt.Sprintf("--data-dir=%s", dir), - fmt.Sprintf("--port=%v", doltPort), - fmt.Sprintf("--socket=%s", socketPath)} - - // If we're running in CI, use a precompiled dolt binary instead of go run - devDoltPath := initializeDevDoltBuild(dir, goDirPath) - if devDoltPath != "" { - args[2] = devDoltPath - args = args[2:] - } + fmt.Sprintf("--port=%v", doltPort)} + cmd := exec.Command(args[0], args[1:]...) // Set a unique process group ID so that we can cleanly kill this process, as well as @@ -1094,7 +1065,7 @@ func startDoltSqlServer(dir string, doltPersistentSystemVars map[string]string) if err != nil { return -1, nil, err } - fmt.Printf("dolt sql-server logs at: %s \n", doltLogFilePath) + t.Logf("dolt sql-server logs at: %s \n", doltLogFilePath) cmd.Stdout = doltLogFile cmd.Stderr = doltLogFile err = cmd.Start() @@ -1102,18 +1073,18 @@ func startDoltSqlServer(dir string, doltPersistentSystemVars map[string]string) return -1, nil, fmt.Errorf("unable to execute command %v: %v", cmd.String(), err.Error()) } - fmt.Printf("Dolt CMD: %s\n", cmd.String()) + t.Logf("Dolt CMD: %s\n", cmd.String()) dsn := fmt.Sprintf("%s@tcp(127.0.0.1:%v)/", adminUser, doltPort) replicaDatabase = sqlx.MustOpen("mysql", dsn) - err = waitForSqlServerToStart(replicaDatabase) + err = waitForSqlServerToStart(t, replicaDatabase) if err != nil { return -1, nil, err } mustCreateReplicatorUser(replicaDatabase) - fmt.Printf("Dolt server started on port %v \n", doltPort) + t.Logf("Dolt server started on port %v \n", doltPort) return doltPort, cmd.Process, nil } @@ -1125,24 +1096,17 @@ func mustCreateReplicatorUser(db *sqlx.DB) { } // runDoltCommand runs a short-lived dolt CLI command with the specified arguments from |doltArgs|. The Dolt data -// directory is specified from |doltDataDir| and |goDirPath| is the path to the go directory within the Dolt repo. +// directory is specified from |doltDataDir|. // This function will only return when the Dolt CLI command has completed, so it is not suitable for running // long-lived commands such as "sql-server". If the command fails, an error is returned with the combined output. -func runDoltCommand(doltDataDir string, goDirPath string, doltArgs ...string) error { - // If we're running in CI, use a precompiled dolt binary instead of go run - devDoltPath := initializeDevDoltBuild(doltDataDir, goDirPath) - - args := append([]string{"go", "run", "./cmd/dolt", +func runDoltCommand(t *testing.T, doltDataDir string, doltArgs ...string) error { + args := append([]string{DoltDevBuildPath(), fmt.Sprintf("--data-dir=%s", doltDataDir)}, doltArgs...) - if devDoltPath != "" { - args[2] = devDoltPath - args = args[2:] - } cmd := exec.Command(args[0], args[1:]...) - fmt.Printf("Running Dolt CMD: %s\n", cmd.String()) + t.Logf("Running Dolt CMD: %s\n", cmd.String()) output, err := cmd.CombinedOutput() - fmt.Printf("Dolt CMD output: %s\n", string(output)) + t.Logf("Dolt CMD output: %s\n", string(output)) if err != nil { return fmt.Errorf("%w: %s", err, string(output)) } @@ -1152,13 +1116,13 @@ func runDoltCommand(doltDataDir string, goDirPath string, doltArgs ...string) er // waitForSqlServerToStart polls the specified database to wait for it to become available, pausing // between retry attempts, and returning an error if it is not able to verify that the database is // available. -func waitForSqlServerToStart(database *sqlx.DB) error { - fmt.Printf("Waiting for server to start...\n") +func waitForSqlServerToStart(t *testing.T, database *sqlx.DB) error { + t.Logf("Waiting for server to start...\n") for counter := 0; counter < 30; counter++ { if database.Ping() == nil { return nil } - fmt.Printf("not up yet; waiting...\n") + t.Logf("not up yet; waiting...\n") time.Sleep(500 * time.Millisecond) } @@ -1166,10 +1130,10 @@ func waitForSqlServerToStart(database *sqlx.DB) error { } // printFile opens the specified filepath |path| and outputs the contents of that file to stdout. -func printFile(path string) { +func printFile(t *testing.T, path string) { file, err := os.Open(path) if err != nil { - fmt.Printf("Unable to open file: %s \n", err) + t.Logf("Unable to open file: %s \n", err) return } defer file.Close() @@ -1184,9 +1148,9 @@ func printFile(path string) { panic(err) } } - fmt.Print(s) + t.Log(s) } - fmt.Println() + t.Log() } // assertRepoStateFileExists asserts that the repo_state.json file is present for the specified From 89eb2afb7ccdc184d586e305967e7fbf03a6c555 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 27 Jan 2025 17:57:35 -0800 Subject: [PATCH 2/2] go: cmd/dolt: sqlengine: Actually call DoltBinlogReplicaController.Close when closing the SqlEngine. --- go/cmd/dolt/commands/engine/sqlengine.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index d37f25e212b..af9adb0f50f 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -316,6 +316,9 @@ func (se *SqlEngine) GetUnderlyingEngine() *gms.Engine { func (se *SqlEngine) Close() error { if se.engine != nil { + if se.engine.Analyzer.Catalog.BinlogReplicaController != nil { + dblr.DoltBinlogReplicaController.Close() + } return se.engine.Close() } return nil