Skip to content

Commit

Permalink
prune incomplete sql before send (#21202)
Browse files Browse the repository at this point in the history
prune incomplete sql before send

Approved by: @daviszhen
  • Loading branch information
ck89119 authored Jan 13, 2025
1 parent dba711f commit 8210c78
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/cdc/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (s *mysqlSinker) Sink(ctx context.Context, data *DecoderOutput) {

// output the left sql
if s.preSqlBufLen > sqlBufReserved {
s.sqlBufSendCh <- s.sqlBuf
s.sqlBufSendCh <- s.sqlBuf[:s.preSqlBufLen]
s.curBufIdx ^= 1
s.sqlBuf = s.sqlBufs[s.curBufIdx]
}
Expand Down Expand Up @@ -568,12 +568,14 @@ func (s *mysqlSinker) appendSqlBuf(rowType RowType) (err error) {
// complete sql statement
if rowType == InsertRow {
s.sqlBuf = appendString(s.sqlBuf, ";")
s.preSqlBufLen = len(s.sqlBuf)
} else {
s.sqlBuf = appendString(s.sqlBuf, ");")
s.preSqlBufLen = len(s.sqlBuf)
}

// send it to downstream
s.sqlBufSendCh <- s.sqlBuf
s.sqlBufSendCh <- s.sqlBuf[:s.preSqlBufLen]
s.curBufIdx ^= 1
s.sqlBuf = s.sqlBufs[s.curBufIdx]

Expand Down

0 comments on commit 8210c78

Please sign in to comment.