Skip to content

Commit

Permalink
feat: binlog replication from upstream MySQL instance
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 authored Aug 28, 2024
2 parents 7932eb5 + 77fda60 commit b51b076
Show file tree
Hide file tree
Showing 14 changed files with 621 additions and 247 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
myduckserver
*.db
*.wal
*.bin
*.log
.replica/
5 changes: 1 addition & 4 deletions binlogreplication/binlog_json_serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ const maxOffsetSize = uint32(65_535)
// And a third-party description is here:
// https://lafengnan.gitbooks.io/blog/content/mysql/chapter2.html
func encodeJsonDoc(jsonDoc sql.JSONWrapper) (buffer []byte, err error) {
val, err := jsonDoc.ToInterface()
if err != nil {
return nil, err
}
val := jsonDoc.ToInterface()
typeId, encodedValue, err := encodeJsonValue(val)
if err != nil {
return nil, err
Expand Down
120 changes: 59 additions & 61 deletions binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
gms "github.com/dolthub/go-mysql-server"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
"github.com/dolthub/go-mysql-server/sql/mysql_db"
"github.com/dolthub/go-mysql-server/sql/planbuilder"
"github.com/dolthub/go-mysql-server/sql/rowexec"
"github.com/dolthub/go-mysql-server/sql/types"
Expand Down Expand Up @@ -57,6 +56,7 @@ type binlogReplicaApplier struct {
filters *filterConfiguration
running atomic.Bool
engine *gms.Engine
tableWriterProvider TableWriterProvider
}

func newBinlogReplicaApplier(filters *filterConfiguration) *binlogReplicaApplier {
Expand Down Expand Up @@ -86,7 +86,7 @@ func (a *binlogReplicaApplier) Go(ctx *sql.Context) {
a.running.Store(false)
if err != nil {
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
MyBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
}
}()
}
Expand All @@ -101,7 +101,7 @@ func (a *binlogReplicaApplier) IsRunning() bool {
func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Context) (*mysql.Conn, error) {
var maxConnectionAttempts uint64
var connectRetryDelay uint32
DoltBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
MyBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.ReplicaIoRunning = binlogreplication.ReplicaIoConnecting
status.ReplicaSqlRunning = binlogreplication.ReplicaSqlRunning
maxConnectionAttempts = status.SourceRetryCount
Expand All @@ -115,17 +115,17 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co

if replicaSourceInfo == nil {
err = ErrServerNotConfiguredAsReplica
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, err.Error())
MyBinlogReplicaController.setIoError(ERFatalReplicaError, err.Error())
return nil, err
} else if replicaSourceInfo.Uuid != "" {
a.replicationSourceUuid = replicaSourceInfo.Uuid
}

if replicaSourceInfo.Host == "" {
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, ErrEmptyHostname.Error())
MyBinlogReplicaController.setIoError(ERFatalReplicaError, ErrEmptyHostname.Error())
return nil, ErrEmptyHostname
} else if replicaSourceInfo.User == "" {
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, ErrEmptyUsername.Error())
MyBinlogReplicaController.setIoError(ERFatalReplicaError, ErrEmptyUsername.Error())
return nil, ErrEmptyUsername
}

Expand Down Expand Up @@ -169,7 +169,7 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
return nil, err
}

DoltBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
MyBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.ReplicaIoRunning = binlogreplication.ReplicaIoRunning
})

Expand Down Expand Up @@ -278,15 +278,15 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
err := a.processBinlogEvent(ctx, engine, event)
if err != nil {
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
MyBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
}

case err := <-eventProducer.ErrorChan():
if sqlError, isSqlError := err.(*mysql.SQLError); isSqlError {
badConnection := sqlError.Message == io.EOF.Error() ||
strings.HasPrefix(sqlError.Message, io.ErrUnexpectedEOF.Error())
if badConnection {
DoltBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
MyBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.LastIoError = sqlError.Message
status.LastIoErrNumber = ERNetReadError
currentTime := time.Now()
Expand All @@ -300,7 +300,7 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
} else {
// otherwise, log the error if it's something we don't expect and continue
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setIoError(mysql.ERUnknownError, err.Error())
MyBinlogReplicaController.setIoError(mysql.ERUnknownError, err.Error())
}

case <-a.stopReplicationChan:
Expand Down Expand Up @@ -330,7 +330,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if err != nil {
msg := fmt.Sprintf("unable to strip checksum from binlog event: '%v'", err.Error())
ctx.GetLogger().Error(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
MyBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
}

Expand Down Expand Up @@ -363,7 +363,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
"query": query.SQL,
"options": fmt.Sprintf("0x%x", query.Options),
"sql_mode": fmt.Sprintf("0x%x", query.SqlMode),
}).Trace("Received binlog event: Query")
}).Infoln("Received binlog event: Query")

// When executing SQL statements sent from the primary, we can't be sure what database was modified unless we
// look closely at the statement. For example, we could be connected to db01, but executed
Expand Down Expand Up @@ -405,9 +405,14 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
ctx.SetSessionVariable(ctx, "unique_checks", 1)
}

ctx.SetCurrentDatabase(query.Database)
executeQueryWithEngine(ctx, engine, query.SQL)
createCommit = strings.ToLower(query.SQL) != "begin"
// TODO(fan): Disable the transaction for now.
if createCommit {
if !(query.Database == "mysql" && strings.HasPrefix(query.SQL, "TRUNCATE TABLE")) {
ctx.SetCurrentDatabase(query.Database)
executeQueryWithEngine(ctx, engine, query.SQL)
}
}

case event.IsRotate():
// When a binary log file exceeds the configured size limit, a ROTATE_EVENT is written at the end of the file,
Expand Down Expand Up @@ -502,7 +507,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: TableMap event with unsupported flags '%x'", flags)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
MyBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
a.tableMapsById[tableId] = tableMap
}
Expand Down Expand Up @@ -531,14 +536,16 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}

if createCommit {
var databasesToCommit []string
if commitToAllDatabases {
databasesToCommit = getAllUserDatabaseNames(ctx, engine)
for _, database := range databasesToCommit {
executeQueryWithEngine(ctx, engine, "use `"+database+"`;")
executeQueryWithEngine(ctx, engine, "commit;")
}
}
// TODO(fan): Disable transaction commit for now
_ = commitToAllDatabases
// var databasesToCommit []string
// if commitToAllDatabases {
// databasesToCommit = getAllUserDatabaseNames(ctx, engine)
// for _, database := range databasesToCommit {
// executeQueryWithEngine(ctx, engine, "use `"+database+"`;")
// executeQueryWithEngine(ctx, engine, "commit;")
// }
// }

// Record the last GTID processed after the commit
a.currentPosition.GTIDSet = a.currentPosition.GTIDSet.AddGTID(a.currentGtid)
Expand All @@ -550,14 +557,6 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if err != nil {
return fmt.Errorf("unable to store GTID executed metadata to disk: %s", err.Error())
}

// For now, create a Dolt commit from every data update. Eventually, we'll want to make this configurable.
ctx.GetLogger().Trace("Creating Dolt commit(s)")
for _, database := range databasesToCommit {
executeQueryWithEngine(ctx, engine, "use `"+database+"`;")
executeQueryWithEngine(ctx, engine,
fmt.Sprintf("call dolt_commit('-Am', 'Dolt binlog replica commit: GTID %s');", a.currentGtid))
}
}

return nil
Expand Down Expand Up @@ -585,6 +584,11 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
}

// Skip processing of MySQL system tables
if tableMap.Database == "mysql" {
return nil
}

if a.filters.isTableFilteredOut(ctx, tableMap) {
return nil
}
Expand All @@ -611,23 +615,34 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: row event with unsupported flags '%x'", flags)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
MyBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
schema, tableName, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
if err != nil {
return err
}

var typ EventType
switch {
case event.IsDeleteRows():
typ = DeleteEvent
ctx.GetLogger().Tracef(" - Deleted Rows (table: %s)", tableMap.Name)
case event.IsUpdateRows():
typ = UpdateEvent
ctx.GetLogger().Tracef(" - Updated Rows (table: %s)", tableMap.Name)
case event.IsWriteRows():
typ = InsertEvent
ctx.GetLogger().Tracef(" - Inserted Rows (table: %s)", tableMap.Name)
}

writeSession, tableWriter, err := getTableWriter(ctx, engine, tableName, tableMap.Database, foreignKeyChecksDisabled)
tableWriter, err := a.tableWriterProvider.GetTableWriter(
ctx, engine,
tableMap.Database, tableName,
schema, len(tableMap.Types),
rows.IdentifyColumns, rows.DataColumns,
typ,
foreignKeyChecksDisabled,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -661,27 +676,22 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
if err != nil {
return err
}

}

err = closeWriteSession(ctx, engine, tableMap.Database, writeSession)
if err != nil {
return err
}
ctx.GetLogger().WithFields(logrus.Fields{
"db": tableMap.Database,
"table": tableName,
"event": eventType,
"rows": len(rows.Rows),
}).Infoln("processRowEvent")

return nil
return tableWriter.Close()
}

//
// Helper functions
//

// closeWriteSession flushes and closes the specified |writeSession| and returns an error if anything failed.
func closeWriteSession(ctx *sql.Context, engine *gms.Engine, databaseName string, writeSession WriteSession) error {
// TO BE IMPLEMENTED
return nil
}

// getTableSchema returns a sql.Schema for the case-insensitive |tableName| in the database named
// |databaseName|, along with the exact, case-sensitive table name.
func getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string) (sql.Schema, string, error) {
Expand All @@ -700,18 +710,6 @@ func getTableSchema(ctx *sql.Context, engine *gms.Engine, tableName, databaseNam
return table.Schema(), table.Name(), nil
}

// getTableWriter returns a WriteSession and a TableWriter for writing to the specified |table| in the specified |database|.
func getTableWriter(ctx *sql.Context, engine *gms.Engine, tableName, databaseName string, foreignKeyChecksDisabled bool) (WriteSession, TableWriter, error) {
database, err := engine.Analyzer.Catalog.Database(ctx, databaseName)
if err != nil {
return nil, nil, err
}
if privDatabase, ok := database.(mysql_db.PrivilegedDatabase); ok {
database = privDatabase.Unwrap()
}
return nil, nil, nil
}

// parseRow parses the binary row data from a MySQL binlog event and converts it into a go-mysql-server Row using the
// |schema| information provided. |columnsPresentBitmap| indicates which column values are present in |data| and
// |nullValuesBitmap| indicates which columns have null values and are NOT present in |data|.
Expand Down Expand Up @@ -882,7 +880,7 @@ func loadReplicaServerId() (uint32, error) {

func executeQueryWithEngine(ctx *sql.Context, engine *gms.Engine, query string) {
// Create a sub-context when running queries against the engine, so that we get an accurate query start time.
queryCtx := sql.NewContext(ctx, sql.WithSession(ctx.Session))
queryCtx := sql.NewContext(ctx, sql.WithSession(ctx.Session)).WithQuery(query)

if queryCtx.GetCurrentDatabase() == "" {
ctx.GetLogger().WithFields(logrus.Fields{
Expand All @@ -897,9 +895,9 @@ func executeQueryWithEngine(ctx *sql.Context, engine *gms.Engine, query string)
queryCtx.GetLogger().WithFields(logrus.Fields{
"error": err.Error(),
"query": query,
}).Errorf("Error executing query")
msg := fmt.Sprintf("Error executing query: %v", err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}).Errorf("Applying query failed")
msg := fmt.Sprintf("Applying query failed: %v", err.Error())
MyBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
return
}
Expand Down
Loading

0 comments on commit b51b076

Please sign in to comment.