diff --git a/canal/handler.go b/canal/handler.go index 4ddf03bfb..d51560151 100644 --- a/canal/handler.go +++ b/canal/handler.go @@ -17,6 +17,10 @@ type EventHandler interface { OnGTID(header *replication.EventHeader, gtidEvent mysql.BinlogGTIDEvent) error // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error + // OnRowsQueryEvent is called when binlog_rows_query_log_events=ON for each DML query. + // You'll get the original executed query, with comments if present. + // It will be called before OnRow. + OnRowsQueryEvent(e *replication.RowsQueryEvent) error String() string } @@ -40,6 +44,9 @@ func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.BinlogGTIDEve func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error { return nil } +func (h *DummyEventHandler) OnRowsQueryEvent(*replication.RowsQueryEvent) error { + return nil +} func (h *DummyEventHandler) String() string { return "DummyEventHandler" } diff --git a/canal/sync.go b/canal/sync.go index 5dc2ca4dd..b71be536a 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -128,6 +128,10 @@ func (c *Canal) runSyncBinlog() error { if err := c.eventHandler.OnGTID(ev.Header, e); err != nil { return errors.Trace(err) } + case *replication.RowsQueryEvent: + if err := c.eventHandler.OnRowsQueryEvent(e); err != nil { + return errors.Trace(err) + } case *replication.QueryEvent: stmts, _, err := c.parser.Parse(string(e.Query), "", "") if err != nil {