Skip to content

Commit

Permalink
Merge pull request #8303 from dolthub/fulghum/replication-bugfix
Browse files Browse the repository at this point in the history
Perf: Optimize SQL transaction commits in binlog replication applier
  • Loading branch information
fulghum authored Aug 29, 2024
2 parents 9d85831 + 4c4e9bc commit e871e9c
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,16 @@ const (
// This type is NOT used concurrently – there is currently only one single applier process running to process binlog
// events, so the state in this type is NOT protected with a mutex.
type binlogReplicaApplier struct {
format *mysql.BinlogFormat
tableMapsById map[uint64]*mysql.TableMap
stopReplicationChan chan struct{}
currentGtid mysql.GTID
replicationSourceUuid string
currentPosition *mysql.Position // successfully executed GTIDs
filters *filterConfiguration
running atomic.Bool
engine *gms.Engine
format *mysql.BinlogFormat
tableMapsById map[uint64]*mysql.TableMap
stopReplicationChan chan struct{}
currentGtid mysql.GTID
replicationSourceUuid string
currentPosition *mysql.Position // successfully executed GTIDs
filters *filterConfiguration
running atomic.Bool
engine *gms.Engine
dbsWithUncommittedChanges map[string]struct{}
}

func newBinlogReplicaApplier(filters *filterConfiguration) *binlogReplicaApplier {
Expand Down Expand Up @@ -326,7 +327,6 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.Engine, event mysql.BinlogEvent) error {
var err error
createCommit := false
commitToAllDatabases := false

// We don't support checksum validation, so we MUST strip off any checksum bytes if present, otherwise it gets
// interpreted as part of the payload and corrupts the data. Future checksum sizes, are not guaranteed to be the
Expand Down Expand Up @@ -356,7 +356,6 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
// XA-capable storage engine. For more details, see: https://mariadb.com/kb/en/xid_event/
ctx.GetLogger().Trace("Received binlog event: XID")
createCommit = true
commitToAllDatabases = true

case event.IsQuery():
// A Query event represents a statement executed on the source server that should be executed on the
Expand All @@ -375,13 +374,6 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
"sql_mode": fmt.Sprintf("0x%x", query.SqlMode),
}).Trace("Received binlog event: Query")

// When executing SQL statements sent from the primary, we can't be sure what database was modified unless we
// look closely at the statement. For example, we could be connected to db01, but executed
// "create table db02.t (...);" – i.e., looking at query.Database is NOT enough to always determine the correct
// database that was modified, so instead, we commit to all databases when we see a Query binlog event to
// avoid issues with correctness, at the cost of being slightly less efficient
commitToAllDatabases = true

if query.Options&mysql.QFlagOptionAutoIsNull > 0 {
ctx.GetLogger().Tracef("Setting sql_auto_is_null ON")
ctx.SetSessionVariable(ctx, "sql_auto_is_null", 1)
Expand Down Expand Up @@ -541,13 +533,10 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}

if createCommit {
var databasesToCommit []string
if commitToAllDatabases {
databasesToCommit = getAllUserDatabaseNames(ctx, engine)
for _, database := range databasesToCommit {
executeQueryWithEngine(ctx, engine, "use `"+database+"`;")
executeQueryWithEngine(ctx, engine, "commit;")
}
doltSession := dsess.DSessFromSess(ctx.Session)
databasesToCommit := doltSession.DirtyDatabases()
if err = doltSession.CommitTransaction(ctx, doltSession.GetTransaction()); err != nil {
return err
}

// Record the last GTID processed after the commit
Expand All @@ -562,17 +551,46 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}

// For now, create a Dolt commit from every data update. Eventually, we'll want to make this configurable.
ctx.GetLogger().Trace("Creating Dolt commit(s)")
for _, database := range databasesToCommit {
// We commit to every database that we saw had a dirty session – these identify the databases where we have
// run DML commands through the engine. We also commit to every database that was modified through a RowEvent,
// which is all tracked through the applier's dbsWithUncommittedChanges field – these don't show up
// as dirty in our session, since we used TableWriter to update them.
a.addDatabasesWithUncommittedChanges(databasesToCommit...)
for _, database := range a.databasesWithUncommittedChanges() {
executeQueryWithEngine(ctx, engine, "use `"+database+"`;")
executeQueryWithEngine(ctx, engine,
fmt.Sprintf("call dolt_commit('-Am', 'Dolt binlog replica commit: GTID %s');", a.currentGtid))
}
a.dbsWithUncommittedChanges = nil
}

return nil
}

// addDatabasesWithUncommittedChanges records the specified |dbNames| as databases with uncommitted changes so that
// the replica applier knows which databases need to have Dolt commits created.
func (a *binlogReplicaApplier) addDatabasesWithUncommittedChanges(dbNames ...string) {
	// Lazily initialize the set on first use; the zero value of the applier leaves it nil.
	if a.dbsWithUncommittedChanges == nil {
		a.dbsWithUncommittedChanges = make(map[string]struct{})
	}
	for _, name := range dbNames {
		a.dbsWithUncommittedChanges[name] = struct{}{}
	}
}

// databasesWithUncommittedChanges returns a slice of database names indicating which databases have uncommitted
// changes and need a Dolt commit created. Returns nil when no databases have been marked as changed. The order of
// the returned names is unspecified, since they are collected from a map.
func (a *binlogReplicaApplier) databasesWithUncommittedChanges() []string {
	// len() on a nil map is 0, so this covers both "never initialized" and "empty".
	if len(a.dbsWithUncommittedChanges) == 0 {
		return nil
	}
	dbNames := make([]string, 0, len(a.dbsWithUncommittedChanges))
	for dbName := range a.dbsWithUncommittedChanges {
		dbNames = append(dbNames, dbName)
	}
	return dbNames
}

// processRowEvent processes a WriteRows, DeleteRows, or UpdateRows binlog event and returns an error if any problems
// were encountered.
func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.BinlogEvent, engine *gms.Engine) error {
Expand All @@ -599,6 +617,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
return nil
}

a.addDatabasesWithUncommittedChanges(tableMap.Database)
rows, err := event.Rows(*a.format, tableMap)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/dolthub/go-mysql-server/sql/binlogreplication"
Expand Down Expand Up @@ -123,6 +124,47 @@ func TestBinlogReplicationSanityCheck(t *testing.T) {
requireReplicaResults(t, "select * from db01.tableT", [][]any{{"300"}})
}

// TestBinlogReplicationWithHundredsOfDatabases asserts that we can efficiently replicate the creation of hundreds of databases.
func TestBinlogReplicationWithHundredsOfDatabases(t *testing.T) {
	defer teardown(t)
	startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
	startReplicationAndCreateTestDb(t, mySqlPort)

	// Create a table on the primary and verify on the replica
	primaryDatabase.MustExec("create table tableT (pk int primary key)")
	waitForReplicaToCatchUp(t)
	assertCreateTableStatement(t, replicaDatabase, "tableT",
		"CREATE TABLE tableT ( pk int NOT NULL, PRIMARY KEY (pk)) "+
			"ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin")
	assertRepoStateFileExists(t, "db01")

	// Create a few hundred databases on the primary and let them replicate to the replica
	dbCount := 300
	startTime := time.Now()
	for i := range dbCount {
		dbName := fmt.Sprintf("db%03d", i)
		primaryDatabase.MustExec(fmt.Sprintf("create database %s", dbName))
	}
	waitForReplicaToCatchUp(t)
	endTime := time.Now()
	logrus.Infof("Time to replicate %d databases: %v", dbCount, endTime.Sub(startTime))

	// Spot check the presence of a database on the replica
	assertRepoStateFileExists(t, "db042")

	// Insert some data in one database
	startTime = time.Now()
	primaryDatabase.MustExec("use db042;")
	primaryDatabase.MustExec("create table t (pk int primary key);")
	primaryDatabase.MustExec("insert into t values (100), (101), (102);")

	// Verify the results on the replica
	waitForReplicaToCatchUp(t)
	requireReplicaResults(t, "select * from db042.t;", [][]any{{"100"}, {"101"}, {"102"}})
	endTime = time.Now()
	// NOTE: the arguments here were previously swapped – the duration was fed to %d and the
	// database count to %v, producing a garbled log line. Count goes with %d, duration with %v.
	logrus.Infof("Time to replicate inserts to 1 database (out of %d): %v", dbCount, endTime.Sub(startTime))
}

// TestAutoRestartReplica tests that a Dolt replica automatically starts up replication if
// replication was running when the replica was shut down.
func TestAutoRestartReplica(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions go/libraries/doltcore/sqle/dsess/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,21 @@ func (d *DoltSession) dirtyWorkingSets() []*branchState {
return dirtyStates
}

// DirtyDatabases returns the names of databases who have outstanding changes in this session and need to be committed
// in a SQL transaction before they are visible to other sessions.
func (d *DoltSession) DirtyDatabases() []string {
	var dirty []string
	for _, dbState := range d.dbStates {
		// A database is dirty if any one of its branch heads is dirty; one match is enough.
		for _, head := range dbState.heads {
			if !head.dirty {
				continue
			}
			dirty = append(dirty, dbState.dbName)
			break
		}
	}
	return dirty
}

// CommitWorkingSet commits the working set for the transaction given, without creating a new dolt commit.
// Clients should typically use CommitTransaction, which performs additional checks, instead of this method.
func (d *DoltSession) CommitWorkingSet(ctx *sql.Context, dbName string, tx sql.Transaction) error {
Expand Down

0 comments on commit e871e9c

Please sign in to comment.