Skip to content

Commit

Permalink
go: binlogreplication: Add Session{{Begin,End}Command,End} lifecycle …
Browse files Browse the repository at this point in the history
…callbacks to the replica controller execution context session.

This makes small cleanups to the lifecycle of the replica applier and to the
ownership and lifecycle of the mysql.Conn read connection.

This PR also includes some changes to slightly improve the performance and
reliability of the tests when running them locally. In particular, some of the
changes include:

1) Since `go run ./cmd/dolt` takes about four seconds to validate the existing
   cached build on my laptop, we just go ahead and use a cached build everywhere.

2) We use t.Log{f,} instead of fmt.Print* to improve the ergonomics of running
   tests and, in particular, of getting output from a failure.

3) We try to minimize global process state changes like unnecessary `os.Chdir`
   calls, since it would be nice to parallelize these tests eventually.

4) We get rid of the unused and seemingly unnecessary --socket= argument to
   Dolt, which forced us to use a directory other than $TMPDIR, for example,
   because the maximum length of a sun_path on macOS is 104 characters.
  • Loading branch information
reltuk committed Jan 28, 2025
1 parent b830fbc commit 003e2d1
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func persistReplicaRunningState(ctx *sql.Context, state replicaRunningState) err

// loadReplicationConfiguration loads the replication configuration for the default channel ("") from
// the "mysql" database, |mysqlDb|.
func loadReplicationConfiguration(_ *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) {
func loadReplicationConfiguration(ctx *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
rd := mysqlDb.Reader()
defer rd.Close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func TestBinlogPrimary_ReplicaRestart(t *testing.T) {
// Restart the MySQL replica and reconnect to the Dolt primary
prevPrimaryDatabase := primaryDatabase
var err error
mySqlPort, mySqlProcess, err = startMySqlServer(testDir)
mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir)
require.NoError(t, err)
replicaDatabase = primaryDatabase
primaryDatabase = prevPrimaryDatabase
Expand Down Expand Up @@ -1042,7 +1042,7 @@ func outputReplicaApplierStatus(t *testing.T) {
newRows, err := replicaDatabase.Queryx("select * from performance_schema.replication_applier_status_by_worker;")
require.NoError(t, err)
allNewRows := readAllRowsIntoMaps(t, newRows)
fmt.Printf("\n\nreplication_applier_status_by_worker: %v\n", allNewRows)
t.Logf("\n\nreplication_applier_status_by_worker: %v\n", allNewRows)
}

// outputShowReplicaStatus prints out replica status information. This is useful for debugging
Expand All @@ -1052,7 +1052,7 @@ func outputShowReplicaStatus(t *testing.T) {
newRows, err := replicaDatabase.Queryx("show replica status;")
require.NoError(t, err)
allNewRows := readAllRowsIntoMaps(t, newRows)
fmt.Printf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows)
t.Logf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows)
}

// copyMap returns a copy of the specified map |m|.
Expand Down Expand Up @@ -1098,7 +1098,7 @@ func waitForReplicaToReconnect(t *testing.T) {
func mustRestartDoltPrimaryServer(t *testing.T) {
var err error
prevReplicaDatabase := replicaDatabase
doltPort, doltProcess, err = startDoltSqlServer(testDir, nil)
doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil)
require.NoError(t, err)
primaryDatabase = replicaDatabase
replicaDatabase = prevReplicaDatabase
Expand All @@ -1109,7 +1109,7 @@ func mustRestartDoltPrimaryServer(t *testing.T) {
func mustRestartMySqlReplicaServer(t *testing.T) {
var err error
prevPrimaryDatabase := primaryDatabase
mySqlPort, mySqlProcess, err = startMySqlServer(testDir)
mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir)
require.NoError(t, err)
replicaDatabase = primaryDatabase
primaryDatabase = prevPrimaryDatabase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -63,6 +64,7 @@ type binlogReplicaApplier struct {
currentPosition *mysql.Position // successfully executed GTIDs
filters *filterConfiguration
running atomic.Bool
handlerWg sync.WaitGroup
engine *gms.Engine
dbsWithUncommittedChanges map[string]struct{}
}
Expand All @@ -88,10 +90,14 @@ const rowFlag_rowsAreComplete = 0x0008

// Go spawns a new goroutine to run the applier's binlog event handler.
func (a *binlogReplicaApplier) Go(ctx *sql.Context) {
if !a.running.CompareAndSwap(false, true) {
panic("attempt to start binlogReplicaApplier while it is already running")
}
a.handlerWg.Add(1)
go func() {
a.running.Store(true)
defer a.handlerWg.Done()
defer a.running.Store(false)
err := a.replicaBinlogEventHandler(ctx)
a.running.Store(false)
if err != nil {
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
Expand All @@ -104,6 +110,27 @@ func (a *binlogReplicaApplier) IsRunning() bool {
return a.running.Load()
}

// Stop will shutdown the replication thread if it is running. This is not safe to call concurrently with |Go|.
// This is used by the controller when implementing STOP REPLICA, but it is also used on shutdown when the
// replication thread should be shutdown cleanly in the event that it is still running.
//
// When the applier is running, Stop does not return until |handlerWg| has been released, i.e. until the
// goroutine started by |Go| has finished. When the applier is not running, Stop is a no-op.
func (a *binlogReplicaApplier) Stop() {
if a.IsRunning() {
// We jump through some hoops here. It is not the case that the replication thread
// is guaranteed to read from |stopReplicationChan|. Instead, it can exit on its
// own with an error, for example, after exceeding connection retry attempts.
done := make(chan struct{})
// Helper goroutine: closes |done| once the handler goroutine tracked by |handlerWg| has exited.
go func() {
defer close(done)
a.handlerWg.Wait()
}()
// Either the stop signal is handed off on |stopReplicationChan|, or the handler has
// already exited (|done| is closed) and there is nothing left to signal. Selecting on both
// keeps the send from blocking forever when the handler is gone.
select {
case a.stopReplicationChan <- struct{}{}:
case <-done:
}
// In the send case the handler may still be winding down, so wait for it to finish
// before returning.
a.handlerWg.Wait()
}
}

// connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing
// and retrying if errors are encountered.
func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Context) (*mysql.Conn, error) {
Expand Down Expand Up @@ -263,25 +290,21 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error {
engine := a.engine

var conn *mysql.Conn
var eventProducer *binlogEventProducer

// Process binlog events
for {
if conn == nil {
if eventProducer == nil {
ctx.GetLogger().Debug("no binlog connection to source, attempting to establish one")
if eventProducer != nil {
eventProducer.Stop()
}

var err error
if conn, err = a.connectAndStartReplicationEventStream(ctx); err == ErrReplicationStopped {
if conn, err := a.connectAndStartReplicationEventStream(ctx); err == ErrReplicationStopped {
return nil
} else if err != nil {
return err
} else {
eventProducer = newBinlogEventProducer(conn)
eventProducer.Go(ctx)
}
eventProducer = newBinlogEventProducer(conn)
eventProducer.Go(ctx)
}

select {
Expand All @@ -305,8 +328,6 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
})
eventProducer.Stop()
eventProducer = nil
conn.Close()
conn = nil
}
} else {
// otherwise, log the error if it's something we don't expect and continue
Expand All @@ -317,6 +338,7 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
case <-a.stopReplicationChan:
ctx.GetLogger().Trace("received stop replication signal")
eventProducer.Stop()
eventProducer = nil
return nil
}
}
Expand All @@ -325,6 +347,8 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
// processBinlogEvent processes a single binlog event message and returns an error if there were any problems
// processing it.
func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.Engine, event mysql.BinlogEvent) error {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
var err error
createCommit := false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error {
// changes and execute DDL statements on the running server. If the account doesn't exist, it will be
// created and locked to disable log ins, and if it does exist, but is missing super privs or is not
// locked, it will be given superuser privs and locked.
func (d *doltBinlogReplicaController) configureReplicationUser(_ *sql.Context) {
func (d *doltBinlogReplicaController) configureReplicationUser(ctx *sql.Context) {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
mySQLDb := d.engine.Analyzer.Catalog.MySQLDb
ed := mySQLDb.Editor()
defer ed.Close()
Expand All @@ -180,12 +182,15 @@ func (d *doltBinlogReplicaController) SetEngine(engine *sqle.Engine) {

// StopReplica implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) StopReplica(ctx *sql.Context) error {
d.operationMutex.Lock()
defer d.operationMutex.Unlock()

if d.applier.IsRunning() == false {
ctx.Warn(3084, "Replication thread(s) for channel '' are already stopped.")
return nil
}

d.applier.stopReplicationChan <- struct{}{}
d.applier.Stop()

d.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.ReplicaIoRunning = binlogreplication.ReplicaIoNotRunning
Expand Down Expand Up @@ -428,6 +433,17 @@ func (d *doltBinlogReplicaController) AutoStart(_ context.Context) error {
return d.StartReplica(d.ctx)
}

// Close releases all resources, such as replication threads, associated with the replication.
// This can only be done once in the lifecycle of the instance. Because DoltBinlogReplicaController
// is currently a global singleton, this should only be done once in the lifecycle of the
// application.
func (d *doltBinlogReplicaController) Close() {
// Shut down the applier first; this is a no-op if it is not running.
d.applier.Stop()
// Only end the session when a context has been set on the controller.
if d.ctx != nil {
sql.SessionEnd(d.ctx.Session)
}
}

//
// Helper functions
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package binlogreplication

import (
"sync"
"sync/atomic"

"github.com/dolthub/go-mysql-server/sql"
Expand All @@ -30,19 +31,24 @@ type binlogEventProducer struct {
conn *mysql.Conn
errorChan chan error
eventChan chan mysql.BinlogEvent
closeChan chan struct{}
wg sync.WaitGroup
running atomic.Bool
}

// newBinlogEventProducer creates a new binlog event producer that reads from the specified, established MySQL
// connection |conn|. The returned binlogEventProducer owns the communication channels
// and is responsible for closing them when the binlogEventProducer is stopped.
//
// The BinlogEventProducer will take ownership of the supplied |*Conn| instance and
// will |Close| it when the producer itself exits.
func newBinlogEventProducer(conn *mysql.Conn) *binlogEventProducer {
producer := &binlogEventProducer{
conn: conn,
eventChan: make(chan mysql.BinlogEvent),
errorChan: make(chan error),
closeChan: make(chan struct{}),
}
producer.running.Store(true)
return producer
}

Expand All @@ -61,7 +67,14 @@ func (p *binlogEventProducer) ErrorChan() <-chan error {
// Go starts this binlogEventProducer in a new goroutine. Right before this routine exits, it will close the
// two communication channels it owns.
func (p *binlogEventProducer) Go(_ *sql.Context) {
if !p.running.CompareAndSwap(false, true) {
panic("attempt to start binlogEventProducer more than once.")
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer close(p.errorChan)
defer close(p.eventChan)
for p.IsRunning() {
// ReadBinlogEvent blocks until a binlog event can be read and returned, so this has to be done on a
// separate thread, otherwise the applier would be blocked and wouldn't be able to handle the STOP
Expand All @@ -75,13 +88,19 @@ func (p *binlogEventProducer) Go(_ *sql.Context) {
}

if err != nil {
p.errorChan <- err
select {
case p.errorChan <- err:
case <-p.closeChan:
return
}
} else {
p.eventChan <- event
select {
case p.eventChan <- event:
case <-p.closeChan:
return
}
}
}
close(p.errorChan)
close(p.eventChan)
}()
}

Expand All @@ -92,5 +111,9 @@ func (p *binlogEventProducer) IsRunning() bool {

// Stop requests for this binlogEventProducer to stop processing events as soon as possible.
func (p *binlogEventProducer) Stop() {
p.running.Store(false)
if p.running.CompareAndSwap(true, false) {
p.conn.Close()
close(p.closeChan)
}
p.wg.Wait()
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestBinlogReplicationServerRestart(t *testing.T) {
time.Sleep(1000 * time.Millisecond)

var err error
doltPort, doltProcess, err = startDoltSqlServer(testDir, nil)
doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil)
require.NoError(t, err)

// Check replication status on the replica and assert configuration persisted
Expand Down
Loading

0 comments on commit 003e2d1

Please sign in to comment.