Skip to content

Commit

Permalink
Merge pull request #8794 from dolthub/aaron/sql-replication-session-l…
Browse files Browse the repository at this point in the history
…ifecycle

[no-release-notes] go: binlogreplication: Add Session{{Begin,End}Command,End} lifecycle callbacks to the replica controller execution context session.
  • Loading branch information
reltuk authored Jan 28, 2025
2 parents 78e9a8a + 89eb2af commit c401029
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 170 deletions.
3 changes: 3 additions & 0 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ func (se *SqlEngine) GetUnderlyingEngine() *gms.Engine {

func (se *SqlEngine) Close() error {
if se.engine != nil {
if se.engine.Analyzer.Catalog.BinlogReplicaController != nil {
dblr.DoltBinlogReplicaController.Close()
}
return se.engine.Close()
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func persistReplicaRunningState(ctx *sql.Context, state replicaRunningState) err

// loadReplicationConfiguration loads the replication configuration for default channel ("") from
// the "mysql" database, |mysqlDb|.
func loadReplicationConfiguration(_ *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) {
func loadReplicationConfiguration(ctx *sql.Context, mysqlDb *mysql_db.MySQLDb) (*mysql_db.ReplicaSourceInfo, error) {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
rd := mysqlDb.Reader()
defer rd.Close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func TestBinlogPrimary_ReplicaRestart(t *testing.T) {
// Restart the MySQL replica and reconnect to the Dolt primary
prevPrimaryDatabase := primaryDatabase
var err error
mySqlPort, mySqlProcess, err = startMySqlServer(testDir)
mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir)
require.NoError(t, err)
replicaDatabase = primaryDatabase
primaryDatabase = prevPrimaryDatabase
Expand Down Expand Up @@ -1042,7 +1042,7 @@ func outputReplicaApplierStatus(t *testing.T) {
newRows, err := replicaDatabase.Queryx("select * from performance_schema.replication_applier_status_by_worker;")
require.NoError(t, err)
allNewRows := readAllRowsIntoMaps(t, newRows)
fmt.Printf("\n\nreplication_applier_status_by_worker: %v\n", allNewRows)
t.Logf("\n\nreplication_applier_status_by_worker: %v\n", allNewRows)
}

// outputShowReplicaStatus prints out replica status information. This is useful for debugging
Expand All @@ -1052,7 +1052,7 @@ func outputShowReplicaStatus(t *testing.T) {
newRows, err := replicaDatabase.Queryx("show replica status;")
require.NoError(t, err)
allNewRows := readAllRowsIntoMaps(t, newRows)
fmt.Printf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows)
t.Logf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows)
}

// copyMap returns a copy of the specified map |m|.
Expand Down Expand Up @@ -1098,7 +1098,7 @@ func waitForReplicaToReconnect(t *testing.T) {
func mustRestartDoltPrimaryServer(t *testing.T) {
var err error
prevReplicaDatabase := replicaDatabase
doltPort, doltProcess, err = startDoltSqlServer(testDir, nil)
doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil)
require.NoError(t, err)
primaryDatabase = replicaDatabase
replicaDatabase = prevReplicaDatabase
Expand All @@ -1109,7 +1109,7 @@ func mustRestartDoltPrimaryServer(t *testing.T) {
func mustRestartMySqlReplicaServer(t *testing.T) {
var err error
prevPrimaryDatabase := primaryDatabase
mySqlPort, mySqlProcess, err = startMySqlServer(testDir)
mySqlPort, mySqlProcess, err = startMySqlServer(t, testDir)
require.NoError(t, err)
replicaDatabase = primaryDatabase
primaryDatabase = prevPrimaryDatabase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -63,6 +64,7 @@ type binlogReplicaApplier struct {
currentPosition *mysql.Position // successfully executed GTIDs
filters *filterConfiguration
running atomic.Bool
handlerWg sync.WaitGroup
engine *gms.Engine
dbsWithUncommittedChanges map[string]struct{}
}
Expand All @@ -88,10 +90,14 @@ const rowFlag_rowsAreComplete = 0x0008

// Go spawns a new goroutine to run the applier's binlog event handler.
func (a *binlogReplicaApplier) Go(ctx *sql.Context) {
if !a.running.CompareAndSwap(false, true) {
panic("attempt to start binlogReplicaApplier while it is already running")
}
a.handlerWg.Add(1)
go func() {
a.running.Store(true)
defer a.handlerWg.Done()
defer a.running.Store(false)
err := a.replicaBinlogEventHandler(ctx)
a.running.Store(false)
if err != nil {
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
Expand All @@ -104,6 +110,27 @@ func (a *binlogReplicaApplier) IsRunning() bool {
return a.running.Load()
}

// Stop will shutdown the replication thread if it is running. This is not safe to call concurrently |Go|.
// This is used by the controller when implementing STOP REPLICA, but it is also used on shutdown when the
// replication thread should be shutdown cleanly in the event that it is still running.
func (a *binlogReplicaApplier) Stop() {
if a.IsRunning() {
// We jump through some hoops here. It is not the case that the replication thread
// is guaranteed to read from |stopReplicationChan|. Instead, it can exit on its
// own with an error, for example, after exceeding connection retry attempts.
done := make(chan struct{})
go func() {
defer close(done)
a.handlerWg.Wait()
}()
select {
case a.stopReplicationChan <- struct{}{}:
case <-done:
}
a.handlerWg.Wait()
}
}

// connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing
// and retrying if errors are encountered.
func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Context) (*mysql.Conn, error) {
Expand Down Expand Up @@ -263,25 +290,21 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error {
engine := a.engine

var conn *mysql.Conn
var eventProducer *binlogEventProducer

// Process binlog events
for {
if conn == nil {
if eventProducer == nil {
ctx.GetLogger().Debug("no binlog connection to source, attempting to establish one")
if eventProducer != nil {
eventProducer.Stop()
}

var err error
if conn, err = a.connectAndStartReplicationEventStream(ctx); err == ErrReplicationStopped {
if conn, err := a.connectAndStartReplicationEventStream(ctx); err == ErrReplicationStopped {
return nil
} else if err != nil {
return err
} else {
eventProducer = newBinlogEventProducer(conn)
eventProducer.Go(ctx)
}
eventProducer = newBinlogEventProducer(conn)
eventProducer.Go(ctx)
}

select {
Expand All @@ -305,8 +328,6 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
})
eventProducer.Stop()
eventProducer = nil
conn.Close()
conn = nil
}
} else {
// otherwise, log the error if it's something we don't expect and continue
Expand All @@ -317,6 +338,7 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
case <-a.stopReplicationChan:
ctx.GetLogger().Trace("received stop replication signal")
eventProducer.Stop()
eventProducer = nil
return nil
}
}
Expand All @@ -325,6 +347,8 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
// processBinlogEvent processes a single binlog event message and returns an error if there were any problems
// processing it.
func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.Engine, event mysql.BinlogEvent) error {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
var err error
createCommit := false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error {
// changes and execute DDL statements on the running server. If the account doesn't exist, it will be
// created and locked to disable log ins, and if it does exist, but is missing super privs or is not
// locked, it will be given superuser privs and locked.
func (d *doltBinlogReplicaController) configureReplicationUser(_ *sql.Context) {
func (d *doltBinlogReplicaController) configureReplicationUser(ctx *sql.Context) {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
mySQLDb := d.engine.Analyzer.Catalog.MySQLDb
ed := mySQLDb.Editor()
defer ed.Close()
Expand All @@ -180,12 +182,15 @@ func (d *doltBinlogReplicaController) SetEngine(engine *sqle.Engine) {

// StopReplica implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) StopReplica(ctx *sql.Context) error {
d.operationMutex.Lock()
defer d.operationMutex.Unlock()

if d.applier.IsRunning() == false {
ctx.Warn(3084, "Replication thread(s) for channel '' are already stopped.")
return nil
}

d.applier.stopReplicationChan <- struct{}{}
d.applier.Stop()

d.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.ReplicaIoRunning = binlogreplication.ReplicaIoNotRunning
Expand Down Expand Up @@ -428,6 +433,17 @@ func (d *doltBinlogReplicaController) AutoStart(_ context.Context) error {
return d.StartReplica(d.ctx)
}

// Release all resources, such as replication threads, associated with the replication.
// This can only be done once in the lifecycle of the instance. Because DoltBinlogReplicaController
// is currently a global singleton, this should only be done once in the lifecycle of the
// application.
func (d *doltBinlogReplicaController) Close() {
d.applier.Stop()
if d.ctx != nil {
sql.SessionEnd(d.ctx.Session)
}
}

//
// Helper functions
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package binlogreplication

import (
"sync"
"sync/atomic"

"github.com/dolthub/go-mysql-server/sql"
Expand All @@ -30,19 +31,24 @@ type binlogEventProducer struct {
conn *mysql.Conn
errorChan chan error
eventChan chan mysql.BinlogEvent
closeChan chan struct{}
wg sync.WaitGroup
running atomic.Bool
}

// newBinlogEventProducer creates a new binlog event producer that reads from the specified, established MySQL
// connection |conn|. The returned binlogEventProducer owns the communication channels
// and is responsible for closing them when the binlogEventProducer is stopped.
//
// The BinlogEventProducer will take ownership of the supplied |*Conn| instance and
// will |Close| it when the producer itself exits.
func newBinlogEventProducer(conn *mysql.Conn) *binlogEventProducer {
producer := &binlogEventProducer{
conn: conn,
eventChan: make(chan mysql.BinlogEvent),
errorChan: make(chan error),
closeChan: make(chan struct{}),
}
producer.running.Store(true)
return producer
}

Expand All @@ -61,7 +67,14 @@ func (p *binlogEventProducer) ErrorChan() <-chan error {
// Go starts this binlogEventProducer in a new goroutine. Right before this routine exits, it will close the
// two communication channels it owns.
func (p *binlogEventProducer) Go(_ *sql.Context) {
if !p.running.CompareAndSwap(false, true) {
panic("attempt to start binlogEventProducer more than once.")
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer close(p.errorChan)
defer close(p.eventChan)
for p.IsRunning() {
// ReadBinlogEvent blocks until a binlog event can be read and returned, so this has to be done on a
// separate thread, otherwise the applier would be blocked and wouldn't be able to handle the STOP
Expand All @@ -75,13 +88,19 @@ func (p *binlogEventProducer) Go(_ *sql.Context) {
}

if err != nil {
p.errorChan <- err
select {
case p.errorChan <- err:
case <-p.closeChan:
return
}
} else {
p.eventChan <- event
select {
case p.eventChan <- event:
case <-p.closeChan:
return
}
}
}
close(p.errorChan)
close(p.eventChan)
}()
}

Expand All @@ -92,5 +111,9 @@ func (p *binlogEventProducer) IsRunning() bool {

// Stop requests for this binlogEventProducer to stop processing events as soon as possible.
func (p *binlogEventProducer) Stop() {
p.running.Store(false)
if p.running.CompareAndSwap(true, false) {
p.conn.Close()
close(p.closeChan)
}
p.wg.Wait()
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestBinlogReplicationServerRestart(t *testing.T) {
time.Sleep(1000 * time.Millisecond)

var err error
doltPort, doltProcess, err = startDoltSqlServer(testDir, nil)
doltPort, doltProcess, err = startDoltSqlServer(t, testDir, nil)
require.NoError(t, err)

// Check replication status on the replica and assert configuration persisted
Expand Down
Loading

0 comments on commit c401029

Please sign in to comment.