From eafe094bbdc57aebaf005bb169c0d64a1ff72a59 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 13 Sep 2024 11:33:51 +0800 Subject: [PATCH 01/12] init Signed-off-by: qupeng --- cdc/model/sink.go | 11 ++++++ cdc/sink/dmlsink/txn/event.go | 12 ++++--- cdc/sink/dmlsink/txn/mysql/dml.go | 56 ++++++++++++++++------------- cdc/sink/dmlsink/txn/mysql/mysql.go | 10 +++--- 4 files changed, 55 insertions(+), 34 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index d3e0d525bf7..825768c6fb8 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1292,3 +1292,14 @@ type TopicPartitionKey struct { PartitionKey string TotalPartition int32 } + +// GetColumnFlag returns column flag. +func GetColumnFlag(column *ColumnData, tb *TableInfo) ColumnFlagType { + return *tb.ColumnsFlag[column.ColumnID] +} + +// GetColumnInfo returns column info. +func GetColumnInfo(column *ColumnData, tb *TableInfo) *model.ColumnInfo { + offset := tb.columnsOffset[column.ColumnID] + return tb.Columns[offset] +} diff --git a/cdc/sink/dmlsink/txn/event.go b/cdc/sink/dmlsink/txn/event.go index 1632bce8823..532556394ad 100644 --- a/cdc/sink/dmlsink/txn/event.go +++ b/cdc/sink/dmlsink/txn/event.go @@ -73,7 +73,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { var keys [][]byte if len(row.Columns) != 0 { for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset { - key := genKeyList(row.GetColumns(), iIdx, idxCol, row.GetTableID()) + key := genKeyList(row.Columns, row.TableInfo, iIdx, idxCol, row.GetTableID()) if len(key) == 0 { continue } @@ -82,7 +82,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { } if len(row.PreColumns) != 0 { for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset { - key := genKeyList(row.GetPreColumns(), iIdx, idxCol, row.GetTableID()) + key := genKeyList(row.PreColumns, row.TableInfo, iIdx, idxCol, row.GetTableID()) if len(key) == 0 { continue } @@ -101,19 +101,21 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { } func genKeyList( - columns []*model.Column, iIdx int, colIdx []int, tableID int64, + columns []*model.ColumnData, tb *model.TableInfo, iIdx int, colIdx []int, tableID int64, ) []byte { var key []byte for _, i := range colIdx { + colInfo := model.GetColumnInfo(columns[i], tb) + colFlag := model.GetColumnFlag(columns[i], tb) // if a column value is null, we can ignore this index // If the index contain generated column, we can't use this key to detect conflict with other DML, // Because such as insert can't specify the generated value. - if columns[i] == nil || columns[i].Value == nil || columns[i].Flag.IsGeneratedColumn() { + if columns[i] == nil || columns[i].Value == nil || colFlag.IsGeneratedColumn() { return nil } val := model.ColumnValueString(columns[i].Value) - if columnNeeds2LowerCase(columns[i].Type, columns[i].Collation) { + if columnNeeds2LowerCase(colInfo.GetType(), colInfo.GetCollate()) { val = strings.ToLower(val) } diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index da29618908f..115acf7ce7a 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -17,6 +17,7 @@ import ( "strings" "github.com/pingcap/tidb/pkg/parser/charset" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/quotes" ) @@ -24,18 +25,20 @@ import ( // prepareUpdate builds a parametrics UPDATE statement as following // sql: `UPDATE `test`.`t` SET {} = ?, {} = ? WHERE {} = ?, {} = {} LIMIT 1` // `WHERE` conditions come from `preCols` and SET clause targets come from `cols`. -func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplicate bool) (string, []interface{}) { +func prepareUpdate(quoteTable string, preCols, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) { var builder strings.Builder builder.WriteString("UPDATE " + quoteTable + " SET ") columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)+len(preCols)) for _, col := range cols { - if col == nil || col.Flag.IsGeneratedColumn() { + colInfo := model.GetColumnInfo(col, tb) + colFlag := model.GetColumnFlag(col, tb) + if col == nil || colFlag.IsGeneratedColumn() { continue } - columnNames = append(columnNames, col.Name) - args = appendQueryArgs(args, col) + columnNames = append(columnNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) } if len(args) == 0 { return "", nil @@ -49,7 +52,7 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic } builder.WriteString(" WHERE ") - colNames, wargs := whereSlice(preCols, forceReplicate) + colNames, wargs := whereSlice(preCols, tb, forceReplicate) if len(wargs) == 0 { return "", nil } @@ -73,7 +76,8 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic // sql: `REPLACE INTO `test`.`t` VALUES (?,?,?)` func prepareReplace( quoteTable string, - cols []*model.Column, + cols []*model.ColumnData, + tb *model.TableInfo, appendPlaceHolder bool, translateToInsert bool, ) (string, []interface{}) { @@ -81,11 +85,13 @@ func prepareReplace( columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)) for _, col := range cols { - if col == nil || col.Flag.IsGeneratedColumn() { + colInfo := model.GetColumnInfo(col, tb) + colFlag := model.GetColumnFlag(col, tb) + if col == nil || colFlag.IsGeneratedColumn() { continue } - columnNames = append(columnNames, col.Name) - args = appendQueryArgs(args, col) + columnNames = append(columnNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) } if len(args) == 0 { return "", nil @@ -108,8 +114,9 @@ func prepareReplace( // representation. Because if we use the byte array respresentation, the go-sql-driver // will automatically set `_binary` charset for that column, which is not expected. // See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267 -func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { - if col.Charset != "" && col.Charset != charset.CharsetBin { +func appendQueryArgs(args []interface{}, col *model.ColumnData, colInfo *pmodel.ColumnInfo) []interface{} { + cst := colInfo.GetCharset() + if cst != "" && cst != charset.CharsetBin { colValBytes, ok := col.Value.([]byte) if ok { args = append(args, string(colValBytes)) @@ -125,11 +132,11 @@ func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { // prepareDelete builds a parametric DELETE statement as following // sql: `DELETE FROM `test`.`t` WHERE x = ? AND y >= ? LIMIT 1` -func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) (string, []interface{}) { +func prepareDelete(quoteTable string, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) { var builder strings.Builder builder.WriteString("DELETE FROM " + quoteTable + " WHERE ") - colNames, wargs := whereSlice(cols, forceReplicate) + colNames, wargs := whereSlice(cols, tb, forceReplicate) if len(wargs) == 0 { return "", nil } @@ -152,23 +159,22 @@ func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) // whereSlice builds a parametric WHERE clause as following // sql: `WHERE {} = ? AND {} > ?` -func whereSlice(cols []*model.Column, forceReplicate bool) (colNames []string, args []interface{}) { - // Try to use unique key values when available - for _, col := range cols { - if col == nil || !col.Flag.IsHandleKey() { - continue - } - colNames = append(colNames, col.Name) - args = appendQueryArgs(args, col) - } +func whereSlice(cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (colNames []string, args []interface{}) { // if no explicit row id but force replicate, use all key-values in where condition if len(colNames) == 0 && forceReplicate { colNames = make([]string, 0, len(cols)) args = make([]interface{}, 0, len(cols)) - for _, col := range cols { - colNames = append(colNames, col.Name) - args = appendQueryArgs(args, col) + } + + // Try to use unique key values when available + for _, col := range cols { + colFlag := model.GetColumnFlag(col, tb) + colInfo := model.GetColumnInfo(col, tb) + if col == nil || !colFlag.IsHandleKey() { + continue } + colNames = append(colNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) } return } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 27fddeeafa7..710f87ba09f 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -571,8 +571,9 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { if len(row.PreColumns) != 0 && len(row.Columns) != 0 { query, args = prepareUpdate( quoteTable, - row.GetPreColumns(), - row.GetColumns(), + row.PreColumns, + row.Columns, + row.TableInfo, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) @@ -584,7 +585,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { // Delete Event if len(row.PreColumns) != 0 { - query, args = prepareDelete(quoteTable, row.GetPreColumns(), s.cfg.ForceReplicate) + query, args = prepareDelete(quoteTable, row.PreColumns, row.TableInfo, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) values = append(values, args) @@ -598,7 +599,8 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { if len(row.Columns) != 0 { query, args = prepareReplace( quoteTable, - row.GetColumns(), + row.Columns, + row.TableInfo, true, /* appendPlaceHolder */ translateToInsert) if query != "" { From edf2f2c99725a78ebe76d984e1d1f79c7a4821ea Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 13 Sep 2024 12:09:58 +0800 Subject: [PATCH 02/12] fix Signed-off-by: qupeng --- cdc/model/sink.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 825768c6fb8..41a81a0e6f5 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1294,8 +1294,8 @@ type TopicPartitionKey struct { } // GetColumnFlag returns column flag. -func GetColumnFlag(column *ColumnData, tb *TableInfo) ColumnFlagType { - return *tb.ColumnsFlag[column.ColumnID] +func GetColumnFlag(column *ColumnData, tb *TableInfo) *ColumnFlagType { + return tb.ColumnsFlag[column.ColumnID] } // GetColumnInfo returns column info. From 3f7854d8036742c21b13054acb2ae24629287656 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 13 Sep 2024 12:19:05 +0800 Subject: [PATCH 03/12] more changes Signed-off-by: qupeng --- cdc/sink/dmlsink/txn/mysql/dml.go | 32 ++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index 115acf7ce7a..cb91912756f 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -160,21 +160,31 @@ func prepareDelete(quoteTable string, cols []*model.ColumnData, tb *model.TableI // whereSlice builds a parametric WHERE clause as following // sql: `WHERE {} = ? AND {} > ?` func whereSlice(cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (colNames []string, args []interface{}) { - // if no explicit row id but force replicate, use all key-values in where condition + // If no explicit row id but force replicate, use all key-values in where condition. if len(colNames) == 0 && forceReplicate { colNames = make([]string, 0, len(cols)) args = make([]interface{}, 0, len(cols)) - } - - // Try to use unique key values when available - for _, col := range cols { - colFlag := model.GetColumnFlag(col, tb) - colInfo := model.GetColumnInfo(col, tb) - if col == nil || !colFlag.IsHandleKey() { - continue + for _, col := range cols { + if col == nil { + continue + } + colInfo := model.GetColumnInfo(col, tb) + colNames = append(colNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) + } + } else { // Try to use unique key values when available. + for _, col := range cols { + if col == nil { + continue + } + colFlag := model.GetColumnFlag(col, tb) + colInfo := model.GetColumnInfo(col, tb) + if !colFlag.IsHandleKey() { + continue + } + colNames = append(colNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) } - colNames = append(colNames, colInfo.Name.O) - args = appendQueryArgs(args, col, colInfo) } return } From c86f5ae84fac2ecfa432a991e15ed6646f76ee11 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 19 Sep 2024 13:23:28 +0800 Subject: [PATCH 04/12] stage Signed-off-by: qupeng --- .../canal/canal_json_row_event_encoder.go | 26 ++++++++++++------- pkg/sink/codec/internal/column.go | 9 ++++--- pkg/sink/codec/open/open_protocol_message.go | 18 +++++++------ 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index 76f1f5e52d3..042c7c520ab 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/pkg/sink/codec" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/utils" @@ -32,7 +33,8 @@ import ( ) func fillColumns( - columns []*model.Column, + columns []*model.ColumnData, + tb *model.TableInfo, onlyOutputUpdatedColumn bool, onlyHandleKeyColumn bool, newColumnMap map[string]*model.Column, @@ -227,17 +229,19 @@ func newJSONMessageForDML( out.RawString(",\"old\":null") out.RawString(",\"data\":") if err := fillColumns( - e.GetColumns(), + e.Columns, + e.TableInfo, false, onlyHandleKey, nil, out, builder, ); err != nil { return nil, err } } else if e.IsUpdate() { - var newColsMap map[string]*model.Column + var newColsMap map[string]*model.ColumnData if config.OnlyOutputUpdatedColumns { - newColsMap = make(map[string]*model.Column, len(e.Columns)) - for _, col := range e.GetColumns() { - newColsMap[col.Name] = col + newColsMap = make(map[string]*model.ColumnData, len(e.Columns)) + for _, col := range e.Columns { + name := model.GetColumnInfo(col, tb).Name.O + newColsMap[name] = col } } out.RawString(",\"old\":") @@ -549,13 +553,15 @@ func (b *jsonRowEventEncoderBuilder) Build() codec.RowEventEncoder { return newJSONRowEventEncoder(b.config, b.claimCheck) } -func shouldIgnoreColumn(col *model.Column, - newColumnMap map[string]*model.Column, +func shouldIgnoreColumn( + col *model.ColumnData, + info *pmodel.ColumnInfo, + newColumnMap map[string]*model.ColumnData, ) bool { - newCol, ok := newColumnMap[col.Name] + newCol, ok := newColumnMap[info.Name.O] if ok && newCol != nil { // sql type is not equal - if newCol.Type != col.Type { + if newCol.Type != info.GetType() { return false } // value equal diff --git a/pkg/sink/codec/internal/column.go b/pkg/sink/codec/internal/column.go index 5aefe9d9a16..3e9a4eefcc8 100644 --- a/pkg/sink/codec/internal/column.go +++ b/pkg/sink/codec/internal/column.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/parser/mysql" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" "go.uber.org/zap" ) @@ -34,9 +35,9 @@ type Column struct { } // FromRowChangeColumn converts from a row changed column to a codec column. -func (c *Column) FromRowChangeColumn(col *model.Column) { - c.Type = col.Type - c.Flag = col.Flag +func (c *Column) FromRowChangeColumn(col *model.ColumnData, flag *model.ColumnFlagType, info *pmodel.ColumnInfo) { + c.Type = info.GetType() + c.Flag = *flag if c.Flag.IsHandleKey() { whereHandle := true c.WhereHandle = &whereHandle @@ -45,7 +46,7 @@ func (c *Column) FromRowChangeColumn(col *model.Column) { c.Value = nil return } - switch col.Type { + switch c.Type { case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: var str string switch col.Value.(type) { diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go index 77829e69935..2f5ac425875 100644 --- a/pkg/sink/codec/open/open_protocol_message.go +++ b/pkg/sink/codec/open/open_protocol_message.go @@ -140,14 +140,14 @@ func rowChangeToMsg( value := &messageRow{} if e.IsDelete() { onlyHandleKeyColumns := config.DeleteOnlyHandleKeyColumns || largeMessageOnlyHandleKeyColumns - value.Delete = rowChangeColumns2CodecColumns(e.GetPreColumns(), onlyHandleKeyColumns) + value.Delete = rowChangeColumns2CodecColumns(e.Columns, e.TableInfo, onlyHandleKeyColumns) if onlyHandleKeyColumns && len(value.Delete) == 0 { return nil, nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found handle key columns for the delete event") } } else if e.IsUpdate() { - value.Update = rowChangeColumns2CodecColumns(e.GetColumns(), largeMessageOnlyHandleKeyColumns) + value.Update = rowChangeColumns2CodecColumns(e.Columns, e.TableInfo, largeMessageOnlyHandleKeyColumns) if config.OpenOutputOldValue { - value.PreColumns = rowChangeColumns2CodecColumns(e.GetPreColumns(), largeMessageOnlyHandleKeyColumns) + value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns, e.TableInfo, largeMessageOnlyHandleKeyColumns) } if largeMessageOnlyHandleKeyColumns && (len(value.Update) == 0 || (len(value.PreColumns) == 0 && config.OpenOutputOldValue)) { @@ -157,7 +157,7 @@ func rowChangeToMsg( value.dropNotUpdatedColumns() } } else { - value.Update = rowChangeColumns2CodecColumns(e.GetColumns(), largeMessageOnlyHandleKeyColumns) + value.Update = rowChangeColumns2CodecColumns(e.Columns, e.TableInfo, largeMessageOnlyHandleKeyColumns) if largeMessageOnlyHandleKeyColumns && len(value.Update) == 0 { return nil, nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found handle key columns for the insert event") } @@ -198,18 +198,20 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang return e } -func rowChangeColumns2CodecColumns(cols []*model.Column, onlyHandleKeyColumns bool) map[string]internal.Column { +func rowChangeColumns2CodecColumns(cols []*model.ColumnData, tb *model.TableInfo, onlyHandleKeyColumns bool) map[string]internal.Column { jsonCols := make(map[string]internal.Column, len(cols)) for _, col := range cols { if col == nil { continue } - if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { + flag := model.GetColumnFlag(col, tb) + if onlyHandleKeyColumns && !flag.IsHandleKey() { continue } + colInfo := model.GetColumnInfo(col, tb) c := internal.Column{} - c.FromRowChangeColumn(col) - jsonCols[col.Name] = c + c.FromRowChangeColumn(col, flag, colInfo) + jsonCols[colInfo.Name.O] = c } if len(jsonCols) == 0 { return nil From 7dfcba68613ac43544db50d823c6403a10ce6b75 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 20 Sep 2024 11:38:21 +0800 Subject: [PATCH 05/12] init Signed-off-by: qupeng --- cdc/model/sink.go | 65 +++++++++++++++---- cdc/sink/dmlsink/txn/event.go | 16 ++--- cdc/sink/dmlsink/txn/mysql/dml.go | 60 ++++++++++------- cdc/sink/dmlsink/txn/mysql/mysql.go | 13 ++-- pkg/sink/codec/avro/avro.go | 2 +- pkg/sink/codec/canal/canal_entry.go | 43 +++++------- .../canal/canal_json_row_event_encoder.go | 55 +++++++--------- pkg/sink/codec/internal/column.go | 13 ++-- pkg/sink/codec/open/open_protocol_message.go | 24 ++++--- pkg/util/bitflag.go | 8 +-- 10 files changed, 162 insertions(+), 137 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index d4b6876124f..535f9e10f09 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -94,8 +94,8 @@ func (b *ColumnFlagType) UnsetIsBinary() { } // IsBinary shows whether BinaryFlag is set -func (b *ColumnFlagType) IsBinary() bool { - return (*util.Flag)(b).HasAll(util.Flag(BinaryFlag)) +func (b ColumnFlagType) IsBinary() bool { + return (util.Flag)(b).HasAll(util.Flag(BinaryFlag)) } // SetIsHandleKey sets HandleKey @@ -109,8 +109,8 @@ func (b *ColumnFlagType) UnsetIsHandleKey() { } // IsHandleKey shows whether HandleKey is set -func (b *ColumnFlagType) IsHandleKey() bool { - return (*util.Flag)(b).HasAll(util.Flag(HandleKeyFlag)) +func (b ColumnFlagType) IsHandleKey() bool { + return (util.Flag)(b).HasAll(util.Flag(HandleKeyFlag)) } // SetIsGeneratedColumn sets GeneratedColumn @@ -124,8 +124,8 @@ func (b *ColumnFlagType) UnsetIsGeneratedColumn() { } // IsGeneratedColumn shows whether GeneratedColumn is set -func (b *ColumnFlagType) IsGeneratedColumn() bool { - return (*util.Flag)(b).HasAll(util.Flag(GeneratedColumnFlag)) +func (b ColumnFlagType) IsGeneratedColumn() bool { + return (util.Flag)(b).HasAll(util.Flag(GeneratedColumnFlag)) } // SetIsPrimaryKey sets PrimaryKeyFlag @@ -139,8 +139,8 @@ func (b *ColumnFlagType) UnsetIsPrimaryKey() { } // IsPrimaryKey shows whether PrimaryKeyFlag is set -func (b *ColumnFlagType) IsPrimaryKey() bool { - return (*util.Flag)(b).HasAll(util.Flag(PrimaryKeyFlag)) +func (b ColumnFlagType) IsPrimaryKey() bool { + return (util.Flag)(b).HasAll(util.Flag(PrimaryKeyFlag)) } // SetIsUniqueKey sets UniqueKeyFlag @@ -555,8 +555,8 @@ func (r *RowChangedEvent) GetHandleKeyColumnValues() []string { } // HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s) -func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) { - pkeyCols := make([]*Column, 0) +func (r *RowChangedEvent) HandleKeyColInfos() ([]ColumnDataX, []rowcodec.ColInfo) { + pkeyCols := make([]ColumnDataX, 0) pkeyColInfos := make([]rowcodec.ColInfo, 0) var cols []*ColumnData @@ -570,7 +570,7 @@ func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) { colInfos := tableInfo.GetColInfosForRowChangedEvent() for i, col := range cols { if col != nil && tableInfo.ForceGetColumnFlagType(col.ColumnID).IsHandleKey() { - pkeyCols = append(pkeyCols, columnData2Column(col, tableInfo)) + pkeyCols = append(pkeyCols, GetColumnDataX(col, tableInfo)) pkeyColInfos = append(pkeyColInfos, colInfos[i]) } } @@ -1294,3 +1294,46 @@ type TopicPartitionKey struct { PartitionKey string TotalPartition int32 } + +type ColumnDataX struct { + *ColumnData + flag *ColumnFlagType + info *model.ColumnInfo +} + +func GetColumnDataX(col *ColumnData, tb *TableInfo) ColumnDataX { + x := ColumnDataX{ColumnData: col} + if x.ColumnData != nil { + x.flag = tb.ColumnsFlag[col.ColumnID] + x.info = tb.Columns[tb.columnsOffset[col.ColumnID]] + } + return x +} + +func (x ColumnDataX) GetName() string { + return x.info.Name.O +} + +func (x ColumnDataX) GetType() byte { + return x.info.GetType() +} + +func (x ColumnDataX) GetCharset() string { + return x.info.GetCharset() +} + +func (x ColumnDataX) GetCollation() string { + return x.info.GetCollate() +} + +func (x ColumnDataX) GetFlag() ColumnFlagType { + return *x.flag +} + +func (x ColumnDataX) GetDefaultValue() interface{} { + return GetColumnDefaultValue(x.info) +} + +func (x ColumnDataX) GetColumnInfo() *model.ColumnInfo { + return x.info +} diff --git a/cdc/sink/dmlsink/txn/event.go b/cdc/sink/dmlsink/txn/event.go index 1632bce8823..f4e90e8595f 100644 --- a/cdc/sink/dmlsink/txn/event.go +++ b/cdc/sink/dmlsink/txn/event.go @@ -73,7 +73,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { var keys [][]byte if len(row.Columns) != 0 { for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset { - key := genKeyList(row.GetColumns(), iIdx, idxCol, row.GetTableID()) + key := genKeyList(row.Columns, row.TableInfo, iIdx, idxCol, row.GetTableID()) if len(key) == 0 { continue } @@ -82,7 +82,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { } if len(row.PreColumns) != 0 { for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset { - key := genKeyList(row.GetPreColumns(), iIdx, idxCol, row.GetTableID()) + key := genKeyList(row.PreColumns, row.TableInfo, iIdx, idxCol, row.GetTableID()) if len(key) == 0 { continue } @@ -100,20 +100,20 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { return keys } -func genKeyList( - columns []*model.Column, iIdx int, colIdx []int, tableID int64, -) []byte { +func genKeyList(columns []*model.ColumnData, tb *model.TableInfo, iIdx int, colIdx []int, tableID int64) []byte { var key []byte for _, i := range colIdx { + col := model.GetColumnDataX(columns[i], tb) + // if a column value is null, we can ignore this index // If the index contain generated column, we can't use this key to detect conflict with other DML, // Because such as insert can't specify the generated value. - if columns[i] == nil || columns[i].Value == nil || columns[i].Flag.IsGeneratedColumn() { + if col.ColumnData == nil || col.Value == nil || col.GetFlag().IsGeneratedColumn() { return nil } - val := model.ColumnValueString(columns[i].Value) - if columnNeeds2LowerCase(columns[i].Type, columns[i].Collation) { + val := model.ColumnValueString(col.Value) + if columnNeeds2LowerCase(col.GetType(), col.GetCollation()) { val = strings.ToLower(val) } diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index da29618908f..abbe1067874 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -24,18 +24,19 @@ import ( // prepareUpdate builds a parametrics UPDATE statement as following // sql: `UPDATE `test`.`t` SET {} = ?, {} = ? WHERE {} = ?, {} = {} LIMIT 1` // `WHERE` conditions come from `preCols` and SET clause targets come from `cols`. -func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplicate bool) (string, []interface{}) { +func prepareUpdate(quoteTable string, preCols, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) { var builder strings.Builder builder.WriteString("UPDATE " + quoteTable + " SET ") columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)+len(preCols)) for _, col := range cols { - if col == nil || col.Flag.IsGeneratedColumn() { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || colx.GetFlag().IsGeneratedColumn() { continue } - columnNames = append(columnNames, col.Name) - args = appendQueryArgs(args, col) + columnNames = append(columnNames, colx.GetName()) + args = appendQueryArgs(args, colx) } if len(args) == 0 { return "", nil @@ -49,7 +50,7 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic } builder.WriteString(" WHERE ") - colNames, wargs := whereSlice(preCols, forceReplicate) + colNames, wargs := whereSlice(preCols, tb, forceReplicate) if len(wargs) == 0 { return "", nil } @@ -73,7 +74,8 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic // sql: `REPLACE INTO `test`.`t` VALUES (?,?,?)` func prepareReplace( quoteTable string, - cols []*model.Column, + cols []*model.ColumnData, + tb *model.TableInfo, appendPlaceHolder bool, translateToInsert bool, ) (string, []interface{}) { @@ -81,11 +83,12 @@ func prepareReplace( columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)) for _, col := range cols { - if col == nil || col.Flag.IsGeneratedColumn() { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || colx.GetFlag().IsGeneratedColumn() { continue } - columnNames = append(columnNames, col.Name) - args = appendQueryArgs(args, col) + columnNames = append(columnNames, colx.GetName()) + args = appendQueryArgs(args, colx) } if len(args) == 0 { return "", nil @@ -108,8 +111,9 @@ func prepareReplace( // representation. Because if we use the byte array respresentation, the go-sql-driver // will automatically set `_binary` charset for that column, which is not expected. // See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267 -func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { - if col.Charset != "" && col.Charset != charset.CharsetBin { +func appendQueryArgs(args []interface{}, col model.ColumnDataX) []interface{} { + cst := col.GetCharset() + if cst != "" && cst != charset.CharsetBin { colValBytes, ok := col.Value.([]byte) if ok { args = append(args, string(colValBytes)) @@ -125,11 +129,11 @@ func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { // prepareDelete builds a parametric DELETE statement as following // sql: `DELETE FROM `test`.`t` WHERE x = ? AND y >= ? LIMIT 1` -func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) (string, []interface{}) { +func prepareDelete(quoteTable string, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) { var builder strings.Builder builder.WriteString("DELETE FROM " + quoteTable + " WHERE ") - colNames, wargs := whereSlice(cols, forceReplicate) + colNames, wargs := whereSlice(cols, tb, forceReplicate) if len(wargs) == 0 { return "", nil } @@ -152,24 +156,30 @@ func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) // whereSlice builds a parametric WHERE clause as following // sql: `WHERE {} = ? AND {} > ?` -func whereSlice(cols []*model.Column, forceReplicate bool) (colNames []string, args []interface{}) { - // Try to use unique key values when available - for _, col := range cols { - if col == nil || !col.Flag.IsHandleKey() { - continue - } - colNames = append(colNames, col.Name) - args = appendQueryArgs(args, col) - } - // if no explicit row id but force replicate, use all key-values in where condition +func whereSlice(cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (colNames []string, args []interface{}) { + // If no explicit row id but force replicate, use all key-values in where condition. if len(colNames) == 0 && forceReplicate { colNames = make([]string, 0, len(cols)) args = make([]interface{}, 0, len(cols)) for _, col := range cols { - colNames = append(colNames, col.Name) - args = appendQueryArgs(args, col) + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil { + continue + } + colNames = append(colNames, colx.GetName()) + args = appendQueryArgs(args, colx) + } + } else { // Try to use unique key values when available. + for _, col := range cols { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || !colx.GetFlag().IsHandleKey() { + continue + } + colNames = append(colNames, colx.GetName()) + args = appendQueryArgs(args, colx) } } + return } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 27fddeeafa7..39397e93688 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -571,8 +571,9 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { if len(row.PreColumns) != 0 && len(row.Columns) != 0 { query, args = prepareUpdate( quoteTable, - row.GetPreColumns(), - row.GetColumns(), + row.PreColumns, + row.Columns, + row.TableInfo, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) @@ -584,7 +585,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { // Delete Event if len(row.PreColumns) != 0 { - query, args = prepareDelete(quoteTable, row.GetPreColumns(), s.cfg.ForceReplicate) + query, args = prepareDelete(quoteTable, row.PreColumns, row.TableInfo, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) values = append(values, args) @@ -596,11 +597,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { // INSERT(not in safe mode) // or REPLACE(in safe mode) SQL. if len(row.Columns) != 0 { - query, args = prepareReplace( - quoteTable, - row.GetColumns(), - true, /* appendPlaceHolder */ - translateToInsert) + query, args = prepareReplace(quoteTable, row.Columns, row.TableInfo, true, translateToInsert) if query != "" { sqls = append(sqls, query) values = append(values, args) diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index 10222296a06..fddb939b304 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -49,7 +49,7 @@ type BatchEncoder struct { } type avroEncodeInput struct { - columns []*model.Column + columns []model.ColumnDataX colInfos []rowcodec.ColInfo } diff --git a/pkg/sink/codec/canal/canal_entry.go b/pkg/sink/codec/canal/canal_entry.go index e4724a525c1..212a970e88f 100644 --- a/pkg/sink/codec/canal/canal_entry.go +++ b/pkg/sink/codec/canal/canal_entry.go @@ -22,7 +22,6 @@ import ( "github.com/golang/protobuf/proto" // nolint:staticcheck "github.com/pingcap/errors" mm "github.com/pingcap/tidb/pkg/parser/model" - timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -117,22 +116,22 @@ func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (resul // build the Column in the canal RowData // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872 -func (b *canalEntryBuilder) buildColumn(c *model.Column, columnInfo *timodel.ColumnInfo, updated bool) (*canal.Column, error) { - mysqlType := utils.GetMySQLType(columnInfo, b.config.ContentCompatible) - javaType, err := getJavaSQLType(c.Value, c.Type, c.Flag) +func (b *canalEntryBuilder) buildColumn(c model.ColumnDataX, updated bool) (*canal.Column, error) { + mysqlType := utils.GetMySQLType(c.GetColumnInfo(), b.config.ContentCompatible) + javaType, err := getJavaSQLType(c.Value, c.GetType(), c.GetFlag()) if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } - value, err := b.formatValue(c.Value, c.Flag.IsBinary()) + value, err := b.formatValue(c.Value, c.GetFlag().IsBinary()) if err != nil { return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } canalColumn := &canal.Column{ SqlType: int32(javaType), - Name: c.Name, - IsKey: c.Flag.IsPrimaryKey(), + Name: c.GetName(), + IsKey: c.GetFlag().IsPrimaryKey(), Updated: updated, IsNullPresent: &canal.Column_IsNull{IsNull: c.Value == nil}, Value: value, @@ -144,17 +143,13 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, columnInfo *timodel.Col // build the RowData of a canal entry func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*canal.RowData, error) { var columns []*canal.Column - colInfos := e.TableInfo.GetColInfosForRowChangedEvent() - for idx, column := range e.GetColumns() { - if column == nil { + for _, col := range e.Columns { + column := model.GetColumnDataX(col, e.TableInfo) + if column.ColumnData == nil { continue } - columnInfo, ok := e.TableInfo.GetColumnInfo(colInfos[idx].ID) - if !ok { - return nil, cerror.ErrCanalEncodeFailed.GenWithStack( - "column info not found for column id: %d", colInfos[idx].ID) - } - c, err := b.buildColumn(column, columnInfo, !e.IsDelete()) + + c, err := b.buildColumn(column, !e.IsDelete()) if err != nil { return nil, errors.Trace(err) } @@ -163,19 +158,13 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKey onlyHandleKeyColumns = onlyHandleKeyColumns && e.IsDelete() var preColumns []*canal.Column - for idx, column := range e.GetPreColumns() { - if column == nil { - continue - } - if onlyHandleKeyColumns && !column.Flag.IsHandleKey() { + for _, col := range e.PreColumns { + column := model.GetColumnDataX(col, e.TableInfo) + if column.ColumnData == nil || onlyHandleKeyColumns && !column.GetFlag().IsHandleKey() { continue } - columnInfo, ok := e.TableInfo.GetColumnInfo(colInfos[idx].ID) - if !ok { - return nil, cerror.ErrCanalEncodeFailed.GenWithStack( - "column info not found for column id: %d", colInfos[idx].ID) - } - c, err := b.buildColumn(column, columnInfo, !e.IsDelete()) + + c, err := b.buildColumn(column, !e.IsDelete()) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index 76f1f5e52d3..e1efefb3f32 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -32,10 +32,11 @@ import ( ) func fillColumns( - columns []*model.Column, + columns []*model.ColumnData, + tb *model.TableInfo, onlyOutputUpdatedColumn bool, onlyHandleKeyColumn bool, - newColumnMap map[string]*model.Column, + newColumnMap map[string]model.ColumnDataX, out *jwriter.Writer, builder *canalEntryBuilder, ) error { @@ -47,12 +48,13 @@ func fillColumns( out.RawByte('{') isFirst := true for _, col := range columns { - if col != nil { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData != nil { // column equal, do not output it - if onlyOutputUpdatedColumn && shouldIgnoreColumn(col, newColumnMap) { + if onlyOutputUpdatedColumn && shouldIgnoreColumn(colx, newColumnMap) { continue } - if onlyHandleKeyColumn && !col.Flag.IsHandleKey() { + if onlyHandleKeyColumn && !colx.GetFlag().IsHandleKey() { continue } if isFirst { @@ -60,11 +62,11 @@ func fillColumns( } else { out.RawByte(',') } - value, err := builder.formatValue(col.Value, col.Flag.IsBinary()) + value, err := builder.formatValue(colx.Value, colx.GetFlag().IsBinary()) if err != nil { return cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } - out.String(col.Name) + out.String(colx.GetName()) out.RawByte(':') if col.Value == nil { out.RawString("null") @@ -217,41 +219,30 @@ func newJSONMessageForDML( if e.IsDelete() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := fillColumns( - e.GetPreColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(e.PreColumns, e.TableInfo, false, onlyHandleKey, nil, out, builder); err != nil { return nil, err } } else if e.IsInsert() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := fillColumns( - e.GetColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(e.Columns, e.TableInfo, false, onlyHandleKey, nil, out, builder); err != nil { return nil, err } } else if e.IsUpdate() { - var newColsMap map[string]*model.Column + var newColsMap map[string]model.ColumnDataX if config.OnlyOutputUpdatedColumns { - newColsMap = make(map[string]*model.Column, len(e.Columns)) - for _, col := range e.GetColumns() { - newColsMap[col.Name] = col + newColsMap = make(map[string]model.ColumnDataX, len(e.Columns)) + for _, col := range e.Columns { + colx := model.GetColumnDataX(col, e.TableInfo) + newColsMap[colx.GetName()] = colx } } out.RawString(",\"old\":") - if err := fillColumns( - e.GetPreColumns(), - config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, out, builder, - ); err != nil { + if err := fillColumns(e.PreColumns, e.TableInfo, config.OnlyOutputUpdatedColumns, onlyHandleKey, newColsMap, out, builder); err != nil { return nil, err } out.RawString(",\"data\":") - if err := fillColumns( - e.GetColumns(), - false, onlyHandleKey, nil, out, builder, - ); err != nil { + if err := fillColumns(e.Columns, e.TableInfo, false, onlyHandleKey, nil, out, builder); err != nil { return nil, err } } else { @@ -549,13 +540,11 @@ func (b *jsonRowEventEncoderBuilder) Build() codec.RowEventEncoder { return newJSONRowEventEncoder(b.config, b.claimCheck) } -func shouldIgnoreColumn(col *model.Column, - newColumnMap map[string]*model.Column, -) bool { - newCol, ok := newColumnMap[col.Name] - if ok && newCol != nil { +func shouldIgnoreColumn(col model.ColumnDataX, newColumnMap map[string]model.ColumnDataX) bool { + newCol, ok := newColumnMap[col.GetName()] + if ok && newCol.ColumnData != nil { // sql type is not equal - if newCol.Type != col.Type { + if newCol.GetType() != col.GetType() { return false } // value equal diff --git a/pkg/sink/codec/internal/column.go b/pkg/sink/codec/internal/column.go index 5aefe9d9a16..41401cb0a77 100644 --- a/pkg/sink/codec/internal/column.go +++ b/pkg/sink/codec/internal/column.go @@ -28,24 +28,23 @@ import ( type Column struct { Type byte `json:"t"` // Deprecated: please use Flag instead. - WhereHandle *bool `json:"h,omitempty"` + WhereHandle bool `json:"h,omitempty"` Flag model.ColumnFlagType `json:"f"` Value any `json:"v"` } // FromRowChangeColumn converts from a row changed column to a codec column. -func (c *Column) FromRowChangeColumn(col *model.Column) { - c.Type = col.Type - c.Flag = col.Flag +func (c *Column) FromRowChangeColumn(col model.ColumnDataX) { + c.Type = col.GetType() + c.Flag = col.GetFlag() if c.Flag.IsHandleKey() { - whereHandle := true - c.WhereHandle = &whereHandle + c.WhereHandle = true } if col.Value == nil { c.Value = nil return } - switch col.Type { + switch c.Type { case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: var str string switch col.Value.(type) { diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go index 77829e69935..42604451bb9 100644 --- a/pkg/sink/codec/open/open_protocol_message.go +++ b/pkg/sink/codec/open/open_protocol_message.go @@ -140,14 +140,14 @@ func rowChangeToMsg( value := &messageRow{} if e.IsDelete() { onlyHandleKeyColumns := config.DeleteOnlyHandleKeyColumns || largeMessageOnlyHandleKeyColumns - value.Delete = rowChangeColumns2CodecColumns(e.GetPreColumns(), onlyHandleKeyColumns) + value.Delete = rowChangeColumns2CodecColumns(e.PreColumns, e.TableInfo, onlyHandleKeyColumns) if onlyHandleKeyColumns && len(value.Delete) == 0 { return nil, nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found handle key columns for the delete event") } } else if e.IsUpdate() { - value.Update = rowChangeColumns2CodecColumns(e.GetColumns(), largeMessageOnlyHandleKeyColumns) + value.Update = rowChangeColumns2CodecColumns(e.Columns, e.TableInfo, largeMessageOnlyHandleKeyColumns) if config.OpenOutputOldValue { - value.PreColumns = rowChangeColumns2CodecColumns(e.GetPreColumns(), largeMessageOnlyHandleKeyColumns) + value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns, e.TableInfo, largeMessageOnlyHandleKeyColumns) } if largeMessageOnlyHandleKeyColumns && (len(value.Update) == 0 || (len(value.PreColumns) == 0 && config.OpenOutputOldValue)) { @@ -157,7 +157,7 @@ func rowChangeToMsg( value.dropNotUpdatedColumns() } } else { - value.Update = rowChangeColumns2CodecColumns(e.GetColumns(), largeMessageOnlyHandleKeyColumns) + value.Update = rowChangeColumns2CodecColumns(e.Columns, e.TableInfo, largeMessageOnlyHandleKeyColumns) if largeMessageOnlyHandleKeyColumns && len(value.Update) == 0 { return nil, nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found handle key columns for the insert event") } @@ -198,18 +198,16 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang return e } -func rowChangeColumns2CodecColumns(cols []*model.Column, onlyHandleKeyColumns bool) map[string]internal.Column { +func rowChangeColumns2CodecColumns(cols []*model.ColumnData, tb *model.TableInfo, onlyHandleKeyColumns bool) map[string]internal.Column { jsonCols := make(map[string]internal.Column, len(cols)) for _, col := range cols { - if col == nil { - continue - } - if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { - continue - } + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || onlyHandleKeyColumns && !colx.GetFlag().IsHandleKey() { + continue + } c := internal.Column{} - c.FromRowChangeColumn(col) - jsonCols[col.Name] = c + c.FromRowChangeColumn(colx) + jsonCols[colx.GetName()] = c } if len(jsonCols) == 0 { return nil diff --git a/pkg/util/bitflag.go b/pkg/util/bitflag.go index 58c446794ca..089e521ec05 100644 --- a/pkg/util/bitflag.go +++ b/pkg/util/bitflag.go @@ -17,9 +17,9 @@ package util type Flag uint64 // HasAll means has all flags -func (f *Flag) HasAll(flags ...Flag) bool { +func (f Flag) HasAll(flags ...Flag) bool { for _, flag := range flags { - if flag&*f == 0 { + if flag&f == 0 { return false } } @@ -27,9 +27,9 @@ func (f *Flag) HasAll(flags ...Flag) bool { } // HasOne means has one of the flags -func (f *Flag) HasOne(flags ...Flag) bool { +func (f Flag) HasOne(flags ...Flag) bool { for _, flag := range flags { - if flag&*f != 0 { + if flag&f != 0 { return true } } From a6f79375ec0da3b062af3a9376ced15a79b45fa9 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 24 Sep 2024 15:10:53 +0800 Subject: [PATCH 06/12] fix Signed-off-by: qupeng --- cdc/model/sink.go | 30 ++++--- pkg/sink/codec/avro/avro.go | 2 +- pkg/sink/codec/craft/model.go | 24 ++--- pkg/sink/codec/csv/csv_message.go | 18 ++-- pkg/sink/codec/debezium/codec.go | 142 +++++++++++++++--------------- 5 files changed, 113 insertions(+), 103 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 535f9e10f09..55d5db4e128 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -154,13 +154,13 @@ func (b *ColumnFlagType) UnsetIsUniqueKey() { } // IsUniqueKey shows whether UniqueKeyFlag is set -func (b *ColumnFlagType) IsUniqueKey() bool { - return (*util.Flag)(b).HasAll(util.Flag(UniqueKeyFlag)) +func (b ColumnFlagType) IsUniqueKey() bool { + return (util.Flag)(b).HasAll(util.Flag(UniqueKeyFlag)) } // IsMultipleKey shows whether MultipleKeyFlag is set -func (b *ColumnFlagType) IsMultipleKey() bool { - return (*util.Flag)(b).HasAll(util.Flag(MultipleKeyFlag)) +func (b ColumnFlagType) IsMultipleKey() bool { + return (util.Flag)(b).HasAll(util.Flag(MultipleKeyFlag)) } // SetIsMultipleKey sets MultipleKeyFlag @@ -174,8 +174,8 @@ func (b *ColumnFlagType) UnsetIsMultipleKey() { } // IsNullable shows whether NullableFlag is set -func (b *ColumnFlagType) IsNullable() bool { - return (*util.Flag)(b).HasAll(util.Flag(NullableFlag)) +func (b ColumnFlagType) IsNullable() bool { + return (util.Flag)(b).HasAll(util.Flag(NullableFlag)) } // SetIsNullable sets NullableFlag @@ -189,8 +189,8 @@ func (b *ColumnFlagType) UnsetIsNullable() { } // IsUnsigned shows whether UnsignedFlag is set -func (b *ColumnFlagType) IsUnsigned() bool { - return (*util.Flag)(b).HasAll(util.Flag(UnsignedFlag)) +func (b ColumnFlagType) IsUnsigned() bool { + return (util.Flag)(b).HasAll(util.Flag(UnsignedFlag)) } // SetIsUnsigned sets UnsignedFlag @@ -555,8 +555,8 @@ func (r *RowChangedEvent) GetHandleKeyColumnValues() []string { } // HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s) -func (r *RowChangedEvent) HandleKeyColInfos() ([]ColumnDataX, []rowcodec.ColInfo) { - pkeyCols := make([]ColumnDataX, 0) +func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) { + pkeyCols := make([]*Column, 0) pkeyColInfos := make([]rowcodec.ColInfo, 0) var cols []*ColumnData @@ -570,7 +570,7 @@ func (r *RowChangedEvent) HandleKeyColInfos() ([]ColumnDataX, []rowcodec.ColInfo colInfos := tableInfo.GetColInfosForRowChangedEvent() for i, col := range cols { if col != nil && tableInfo.ForceGetColumnFlagType(col.ColumnID).IsHandleKey() { - pkeyCols = append(pkeyCols, GetColumnDataX(col, tableInfo)) + pkeyCols = append(pkeyCols, columnData2Column(col, tableInfo)) pkeyColInfos = append(pkeyColInfos, colInfos[i]) } } @@ -1310,6 +1310,14 @@ func GetColumnDataX(col *ColumnData, tb *TableInfo) ColumnDataX { return x } +func TransColumnDataX(cols []*ColumnData, tb *TableInfo) []ColumnDataX { + colxs := make([]ColumnDataX, 0, len(cols)) + for _, col := range cols { + colxs = append(colxs, GetColumnDataX(col, tb)) + } + return colxs +} + func (x ColumnDataX) GetName() string { return x.info.Name.O } diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index fddb939b304..10222296a06 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -49,7 +49,7 @@ type BatchEncoder struct { } type avroEncodeInput struct { - columns []model.ColumnDataX + columns []*model.Column colInfos []rowcodec.ColInfo } diff --git a/pkg/sink/codec/craft/model.go b/pkg/sink/codec/craft/model.go index b6a90cb3c61..cbccf444cad 100644 --- a/pkg/sink/codec/craft/model.go +++ b/pkg/sink/codec/craft/model.go @@ -366,7 +366,7 @@ func decodeColumnGroup(bits []byte, allocator *SliceAllocator, dict *termDiction }, nil } -func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column, onlyHandleKeyColumns bool) (int, *columnGroup) { +func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.ColumnData, tb *model.TableInfo, onlyHandleKeyColumns bool) (int, *columnGroup) { l := len(columns) if l == 0 { return 0, nil @@ -378,18 +378,16 @@ func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column, estimatedSize := 0 idx := 0 for _, col := range columns { - if col == nil { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || onlyHandleKeyColumns && !colx.GetFlag().IsHandleKey() { continue } - if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { - continue - } - names[idx] = col.Name - types[idx] = uint64(col.Type) - flags[idx] = uint64(col.Flag) - value := EncodeTiDBType(allocator, col.Type, col.Flag, col.Value) + names[idx] = colx.GetName() + types[idx] = uint64(colx.GetType()) + flags[idx] = uint64(colx.GetFlag()) + value := EncodeTiDBType(allocator, colx.GetType(), colx.GetFlag(), colx.Value) values[idx] = value - estimatedSize += len(col.Name) + len(value) + 16 /* two 64-bits integers */ + estimatedSize += len(colx.GetName()) + len(value) + 16 /* two 64-bits integers */ idx++ } if idx > 0 { @@ -421,7 +419,8 @@ func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent, if size, group := newColumnGroup( allocator, columnGroupTypeNew, - ev.GetColumns(), + ev.Columns, + ev.TableInfo, false); group != nil { groups[idx] = group idx++ @@ -431,7 +430,8 @@ func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent, if size, group := newColumnGroup( allocator, columnGroupTypeOld, - ev.GetPreColumns(), + ev.PreColumns, + ev.TableInfo, onlyHandleKeyColumns); group != nil { groups[idx] = group estimatedSize += size diff --git a/pkg/sink/codec/csv/csv_message.go b/pkg/sink/codec/csv/csv_message.go index 32486b2fe82..99a51042e9f 100644 --- a/pkg/sink/codec/csv/csv_message.go +++ b/pkg/sink/codec/csv/csv_message.go @@ -317,15 +317,15 @@ func fromCsvValToColValue(csvConfig *common.Config, csvVal any, ft types.FieldTy } // fromColValToCsvVal converts column from TiDB type to csv type. -func fromColValToCsvVal(csvConfig *common.Config, col *model.Column, ft *types.FieldType) (any, error) { +func fromColValToCsvVal(csvConfig *common.Config, col model.ColumnDataX, ft *types.FieldType) (any, error) { if col.Value == nil { return nil, nil } - switch col.Type { + switch col.GetType() { case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: - if col.Flag.IsBinary() { + if col.GetFlag().IsBinary() { if v, ok := col.Value.([]byte); ok { switch csvConfig.BinaryEncodingMethod { case config.BinaryEncodingBase64: @@ -385,7 +385,7 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) if e.IsDelete() { csvMsg.opType = operationDelete - csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.GetPreColumns(), e.TableInfo) + csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.PreColumns, e.TableInfo) if err != nil { return nil, err } @@ -393,7 +393,7 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) if e.PreColumns == nil { // This is a insert operation. csvMsg.opType = operationInsert - csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.GetColumns(), e.TableInfo) + csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.Columns, e.TableInfo) if err != nil { return nil, err } @@ -406,12 +406,12 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent) fmt.Errorf("the column length of preColumns %d doesn't equal to that of columns %d", len(e.PreColumns), len(e.Columns))) } - csvMsg.preColumns, err = rowChangeColumns2CSVColumns(csvConfig, e.GetPreColumns(), e.TableInfo) + csvMsg.preColumns, err = rowChangeColumns2CSVColumns(csvConfig, e.PreColumns, e.TableInfo) if err != nil { return nil, err } } - csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.GetColumns(), e.TableInfo) + csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.Columns, e.TableInfo) if err != nil { return nil, err } @@ -444,7 +444,7 @@ func csvMsg2RowChangedEvent(csvConfig *common.Config, csvMsg *csvMessage, tableI return e, nil } -func rowChangeColumns2CSVColumns(csvConfig *common.Config, cols []*model.Column, tableInfo *model.TableInfo) ([]any, error) { +func rowChangeColumns2CSVColumns(csvConfig *common.Config, cols []*model.ColumnData, tableInfo *model.TableInfo) ([]any, error) { var csvColumns []any colInfos := tableInfo.GetColInfosForRowChangedEvent() for i, column := range cols { @@ -454,7 +454,7 @@ func rowChangeColumns2CSVColumns(csvConfig *common.Config, cols []*model.Column, continue } - converted, err := fromColValToCsvVal(csvConfig, column, colInfos[i].Ft) + converted, err := fromColValToCsvVal(csvConfig, model.GetColumnDataX(column, tableInfo), colInfos[i].Ft) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index 22a9bca7940..8b219b1aefc 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -43,14 +43,15 @@ type dbzCodec struct { func (c *dbzCodec) writeDebeziumFieldValues( writer *util.JSONWriter, fieldName string, - cols []*model.Column, + cols []*model.ColumnData, tableInfo *model.TableInfo, ) error { var err error colInfos := tableInfo.GetColInfosForRowChangedEvent() writer.WriteObjectField(fieldName, func() { for i, col := range cols { - err = c.writeDebeziumFieldValue(writer, col, colInfos[i].Ft) + colx := model.GetColumnDataX(col, tableInfo) + err = c.writeDebeziumFieldValue(writer, colx, colInfos[i].Ft) if err != nil { break } @@ -61,17 +62,17 @@ func (c *dbzCodec) writeDebeziumFieldValues( func (c *dbzCodec) writeDebeziumFieldSchema( writer *util.JSONWriter, - col *model.Column, + col model.ColumnDataX, ft *types.FieldType, ) { - switch col.Type { + switch col.GetType() { case mysql.TypeBit: n := ft.GetFlen() if n == 1 { writer.WriteObjectElement(func() { writer.WriteStringField("type", "boolean") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) } else { writer.WriteObjectElement(func() { @@ -82,7 +83,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteObjectField("parameters", func() { writer.WriteStringField("length", fmt.Sprintf("%d", n)) }) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) } @@ -91,7 +92,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteObjectElement(func() { writer.WriteStringField("type", "string") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeEnum: @@ -103,7 +104,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteObjectField("parameters", func() { writer.WriteStringField("allowed", strings.Join(ft.GetElems(), ",")) }) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeSet: @@ -115,14 +116,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteObjectField("parameters", func() { writer.WriteStringField("allowed", strings.Join(ft.GetElems(), ",")) }) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeNewDecimal: writer.WriteObjectElement(func() { writer.WriteStringField("type", "double") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeDate, mysql.TypeNewDate: @@ -131,7 +132,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.time.Date") writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeDatetime: @@ -144,7 +145,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("name", "io.debezium.time.MicroTimestamp") } writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeTimestamp: @@ -153,7 +154,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.time.ZonedTimestamp") writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeDuration: @@ -162,7 +163,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.time.MicroTime") writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeJSON: @@ -171,14 +172,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.data.Json") writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeTiny: // TINYINT writer.WriteObjectElement(func() { writer.WriteStringField("type", "int16") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeShort: // SMALLINT @@ -189,14 +190,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("type", "int16") } writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeInt24: // MEDIUMINT writer.WriteObjectElement(func() { writer.WriteStringField("type", "int32") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeLong: // INT @@ -207,28 +208,28 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("type", "int32") } writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeLonglong: // BIGINT writer.WriteObjectElement(func() { writer.WriteStringField("type", "int64") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeFloat: writer.WriteObjectElement(func() { writer.WriteStringField("type", "float") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeDouble: writer.WriteObjectElement(func() { writer.WriteStringField("type", "double") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) case mysql.TypeYear: @@ -237,14 +238,14 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.time.Year") writer.WriteIntField("version", 1) - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) default: log.Warn( "meet unsupported field type", - zap.Any("fieldType", col.Type), - zap.Any("column", col.Name), + zap.Any("fieldType", col.GetType()), + zap.Any("column", col.GetName()), ) } } @@ -254,21 +255,21 @@ func (c *dbzCodec) writeDebeziumFieldSchema( //revive:disable indent-error-flow func (c *dbzCodec) writeDebeziumFieldValue( writer *util.JSONWriter, - col *model.Column, + col model.ColumnDataX, ft *types.FieldType, ) error { if col.Value == nil { - writer.WriteNullField(col.Name) + writer.WriteNullField(col.GetName()) return nil } - switch col.Type { + switch col.GetType() { case mysql.TypeBit: v, ok := col.Value.(uint64) if !ok { return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for bit column %s", col.Value, - col.Name) + col.GetName()) } // Debezium behavior: @@ -277,7 +278,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( // contain the specified number of bits. n := ft.GetFlen() if n == 1 { - writer.WriteBoolField(col.Name, v != 0) + writer.WriteBoolField(col.GetName(), v != 0) return nil } else { var buf [8]byte @@ -286,7 +287,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( if n%8 != 0 { numBytes += 1 } - c.writeBinaryField(writer, col.Name, buf[:numBytes]) + c.writeBinaryField(writer, col.GetName(), buf[:numBytes]) return nil } @@ -297,14 +298,14 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for string column %s", col.Value, - col.Name) + col.GetName()) } - if col.Flag.IsBinary() { - c.writeBinaryField(writer, col.Name, v) + if col.GetFlag().IsBinary() { + c.writeBinaryField(writer, col.GetName(), v) return nil } else { - writer.WriteStringField(col.Name, string(hack.String(v))) + writer.WriteStringField(col.GetName(), string(hack.String(v))) return nil } @@ -314,17 +315,17 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for enum column %s", col.Value, - col.Name) + col.GetName()) } enumVar, err := types.ParseEnumValue(ft.GetElems(), v) if err != nil { // Invalid enum value inserted in non-strict mode. - writer.WriteStringField(col.Name, "") + writer.WriteStringField(col.GetName(), "") return nil } - writer.WriteStringField(col.Name, enumVar.Name) + writer.WriteStringField(col.GetName(), enumVar.Name) return nil case mysql.TypeSet: @@ -333,17 +334,17 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for set column %s", col.Value, - col.Name) + col.GetName()) } setVar, err := types.ParseSetValue(ft.GetElems(), v) if err != nil { // Invalid enum value inserted in non-strict mode. - writer.WriteStringField(col.Name, "") + writer.WriteStringField(col.GetName(), "") return nil } - writer.WriteStringField(col.Name, setVar.Name) + writer.WriteStringField(col.GetName(), setVar.Name) return nil case mysql.TypeNewDecimal: @@ -352,7 +353,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for decimal column %s", col.Value, - col.Name) + col.GetName()) } floatV, err := strconv.ParseFloat(v, 64) @@ -362,7 +363,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( err) } - writer.WriteFloat64Field(col.Name, floatV) + writer.WriteFloat64Field(col.GetName(), floatV) return nil case mysql.TypeDate, mysql.TypeNewDate: @@ -371,7 +372,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for date column %s", col.Value, - col.Name) + col.GetName()) } t, err := time.Parse("2006-01-02", v) @@ -379,15 +380,15 @@ func (c *dbzCodec) writeDebeziumFieldValue( // For example, time may be invalid like 1000-00-00 // return nil, nil if mysql.HasNotNullFlag(ft.GetFlag()) { - writer.WriteInt64Field(col.Name, 0) + writer.WriteInt64Field(col.GetName(), 0) return nil } else { - writer.WriteNullField(col.Name) + writer.WriteNullField(col.GetName()) return nil } } - writer.WriteInt64Field(col.Name, t.Unix()/60/60/24) + writer.WriteInt64Field(col.GetName(), t.Unix()/60/60/24) return nil case mysql.TypeDatetime: @@ -401,26 +402,26 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for datetime column %s", col.Value, - col.Name) + col.GetName()) } t, err := time.Parse("2006-01-02 15:04:05.999999", v) if err != nil { // For example, time may be 1000-00-00 if mysql.HasNotNullFlag(ft.GetFlag()) { - writer.WriteInt64Field(col.Name, 0) + writer.WriteInt64Field(col.GetName(), 0) return nil } else { - writer.WriteNullField(col.Name) + writer.WriteNullField(col.GetName()) return nil } } if ft.GetDecimal() <= 3 { - writer.WriteInt64Field(col.Name, t.UnixMilli()) + writer.WriteInt64Field(col.GetName(), t.UnixMilli()) return nil } else { - writer.WriteInt64Field(col.Name, t.UnixMicro()) + writer.WriteInt64Field(col.GetName(), t.UnixMicro()) return nil } @@ -439,7 +440,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for timestamp column %s", col.Value, - col.Name) + col.GetName()) } t, err := time.ParseInLocation("2006-01-02 15:04:05.999999", v, c.config.TimeZone) @@ -448,7 +449,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( if mysql.HasNotNullFlag(ft.GetFlag()) { t = time.Unix(0, 0) } else { - writer.WriteNullField(col.Name) + writer.WriteNullField(col.GetName()) return nil } } @@ -461,7 +462,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( } str += "Z" - writer.WriteStringField(col.Name, str) + writer.WriteStringField(col.GetName(), str) return nil case mysql.TypeDuration: @@ -473,7 +474,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for time column %s", col.Value, - col.Name) + col.GetName()) } d, _, _, err := types.StrToDuration(types.DefaultStmtNoWarningContext, v, ft.GetDecimal()) @@ -483,11 +484,11 @@ func (c *dbzCodec) writeDebeziumFieldValue( err) } - writer.WriteInt64Field(col.Name, d.Microseconds()) + writer.WriteInt64Field(col.GetName(), d.Microseconds()) return nil case mysql.TypeLonglong: - if col.Flag.IsUnsigned() { + if col.GetFlag().IsUnsigned() { // Handle with BIGINT UNSIGNED. // Debezium always produce INT64 instead of UINT64 for BIGINT. v, ok := col.Value.(uint64) @@ -495,10 +496,10 @@ func (c *dbzCodec) writeDebeziumFieldValue( return cerror.ErrDebeziumEncodeFailed.GenWithStack( "unexpected column value type %T for unsigned bigint column %s", col.Value, - col.Name) + col.GetName()) } - writer.WriteInt64Field(col.Name, int64(v)) + writer.WriteInt64Field(col.GetName(), int64(v)) return nil } @@ -507,7 +508,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( // So we only handle with TypeLonglong here. } - writer.WriteAnyField(col.Name, col.Value) + writer.WriteAnyField(col.GetName(), col.Value) return nil } @@ -577,18 +578,18 @@ func (c *dbzCodec) EncodeRowChangedEvent( // after: An optional field that specifies the state of the row after the event occurred. // Optional field that specifies the state of the row after the event occurred. // In a delete event value, the after field is null, signifying that the row no longer exists. - err = c.writeDebeziumFieldValues(jWriter, "after", e.GetColumns(), e.TableInfo) + err = c.writeDebeziumFieldValues(jWriter, "after", e.Columns, e.TableInfo) } else if e.IsDelete() { jWriter.WriteStringField("op", "d") jWriter.WriteNullField("after") - err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreColumns(), e.TableInfo) + err = c.writeDebeziumFieldValues(jWriter, "before", e.PreColumns, e.TableInfo) } else if e.IsUpdate() { jWriter.WriteStringField("op", "u") if c.config.DebeziumOutputOldValue { - err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreColumns(), e.TableInfo) + err = c.writeDebeziumFieldValues(jWriter, "before", e.PreColumns, e.TableInfo) } if err == nil { - err = c.writeDebeziumFieldValues(jWriter, "after", e.GetColumns(), e.TableInfo) + err = c.writeDebeziumFieldValues(jWriter, "after", e.Columns, e.TableInfo) } } }) @@ -609,17 +610,18 @@ func (c *dbzCodec) EncodeRowChangedEvent( { fieldsBuf := &bytes.Buffer{} fieldsWriter := util.BorrowJSONWriter(fieldsBuf) - var validCols []*model.Column + var validCols []*model.ColumnData if e.IsInsert() { - validCols = e.GetColumns() + validCols = e.Columns } else if e.IsDelete() { - validCols = e.GetPreColumns() + validCols = e.PreColumns } else if e.IsUpdate() { - validCols = e.GetColumns() + validCols = e.Columns } colInfos := e.TableInfo.GetColInfosForRowChangedEvent() for i, col := range validCols { - c.writeDebeziumFieldSchema(fieldsWriter, col, colInfos[i].Ft) + colx := model.GetColumnDataX(col, e.TableInfo) + c.writeDebeziumFieldSchema(fieldsWriter, colx, colInfos[i].Ft) } util.ReturnJSONWriter(fieldsWriter) fieldsJSON = fieldsBuf.String() From d068ae1ff1064f8660be6267f4acf20259849fa9 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 24 Sep 2024 15:14:16 +0800 Subject: [PATCH 07/12] fix Signed-off-by: qupeng --- cdc/model/sink.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 55d5db4e128..756fcf5481a 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1295,12 +1295,14 @@ type TopicPartitionKey struct { TotalPartition int32 } +// ColumnDataX is like ColumnData, but contains more informations. type ColumnDataX struct { *ColumnData flag *ColumnFlagType info *model.ColumnInfo } +// GetColumnDataX encapsures ColumnData to ColumnDataX. func GetColumnDataX(col *ColumnData, tb *TableInfo) ColumnDataX { x := ColumnDataX{ColumnData: col} if x.ColumnData != nil { @@ -1310,38 +1312,37 @@ func GetColumnDataX(col *ColumnData, tb *TableInfo) ColumnDataX { return x } -func TransColumnDataX(cols []*ColumnData, tb *TableInfo) []ColumnDataX { - colxs := make([]ColumnDataX, 0, len(cols)) - for _, col := range cols { - colxs = append(colxs, GetColumnDataX(col, tb)) - } - return colxs -} - +// GetName returns name. func (x ColumnDataX) GetName() string { return x.info.Name.O } +// GetType returns type. func (x ColumnDataX) GetType() byte { return x.info.GetType() } +// GetCharset returns charset. func (x ColumnDataX) GetCharset() string { return x.info.GetCharset() } +// GetCollation returns collation. func (x ColumnDataX) GetCollation() string { return x.info.GetCollate() } +// GetFlag returns flag. func (x ColumnDataX) GetFlag() ColumnFlagType { return *x.flag } +// GetDefaultValue return default value. func (x ColumnDataX) GetDefaultValue() interface{} { return GetColumnDefaultValue(x.info) } +// GetColumnInfo returns column info. func (x ColumnDataX) GetColumnInfo() *model.ColumnInfo { return x.info } From 6310bce31468a3854024117296fefa65bf30662c Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 24 Sep 2024 17:44:33 +0800 Subject: [PATCH 08/12] fix tests Signed-off-by: qupeng --- cdc/model/sink.go | 31 ++++++++++++++++++++++++++ cdc/sink/dmlsink/txn/event_test.go | 12 +++++----- cdc/sink/dmlsink/txn/mysql/dml.go | 24 +++++++++----------- cdc/sink/dmlsink/txn/mysql/dml_test.go | 17 +++++++++----- 4 files changed, 58 insertions(+), 26 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 019728f40fd..3632072bfbd 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1347,3 +1347,34 @@ func (x ColumnDataX) GetDefaultValue() interface{} { func (x ColumnDataX) GetColumnInfo() *model.ColumnInfo { return x.info } + +// Column2ColumnDataForTest is for tests. +func Column2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) { + info := &TableInfo{ + TableInfo: &model.TableInfo{ + Columns: make([]*model.ColumnInfo, len(columns)), + }, + ColumnsFlag: make(map[int64]*ColumnFlagType, len(columns)), + columnsOffset: make(map[int64]int), + } + colDatas := make([]*ColumnData, 0, len(columns)) + + for i, column := range columns { + var columnID int64 = int64(i) + info.columnsOffset[columnID] = i + + info.Columns[i] = &model.ColumnInfo{} + info.Columns[i].Name.O = column.Name + info.Columns[i].SetType(column.Type) + info.Columns[i].SetCharset(column.Charset) + info.Columns[i].SetCollate(column.Collation) + info.Columns[i].DefaultValue = column.Default + + info.ColumnsFlag[columnID] = new(ColumnFlagType) + *info.ColumnsFlag[columnID] = column.Flag + + colDatas = append(colDatas, &ColumnData{ColumnID: columnID, Value: column.Value}) + } + + return colDatas, info +} diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go index bdbb5cd7db9..31e2e05bf16 100644 --- a/cdc/sink/dmlsink/txn/event_test.go +++ b/cdc/sink/dmlsink/txn/event_test.go @@ -25,24 +25,24 @@ import ( func TestGenKeyListCaseInSensitive(t *testing.T) { t.Parallel() - columns := []*model.Column{ + columns, tb := model.Column2ColumnDataForTest([]*model.Column{ { Value: "XyZ", Type: mysql.TypeVarchar, Collation: "utf8_unicode_ci", }, - } + }) - first := genKeyList(columns, 0, []int{0}, 1) + first := genKeyList(columns, tb, 0, []int{0}, 1) - columns = []*model.Column{ + columns, tb = model.Column2ColumnDataForTest([]*model.Column{ { Value: "xYZ", Type: mysql.TypeVarchar, Collation: "utf8_unicode_ci", }, - } - second := genKeyList(columns, 0, []int{0}, 1) + }) + second := genKeyList(columns, tb, 0, []int{0}, 1) require.Equal(t, first, second) } diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index abbe1067874..92802c9ad41 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -157,29 +157,25 @@ func prepareDelete(quoteTable string, cols []*model.ColumnData, tb *model.TableI // whereSlice builds a parametric WHERE clause as following // sql: `WHERE {} = ? AND {} > ?` func whereSlice(cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (colNames []string, args []interface{}) { - // If no explicit row id but force replicate, use all key-values in where condition. + // Try to use unique key values when available + for _, col := range cols { + colx := model.GetColumnDataX(col, tb) + if colx.ColumnData == nil || !colx.GetFlag().IsHandleKey() { + continue + } + colNames = append(colNames, colx.GetName()) + args = appendQueryArgs(args, colx) + } + // if no explicit row id but force replicate, use all key-values in where condition if len(colNames) == 0 && forceReplicate { colNames = make([]string, 0, len(cols)) args = make([]interface{}, 0, len(cols)) for _, col := range cols { colx := model.GetColumnDataX(col, tb) - if colx.ColumnData == nil { - continue - } - colNames = append(colNames, colx.GetName()) - args = appendQueryArgs(args, colx) - } - } else { // Try to use unique key values when available. - for _, col := range cols { - colx := model.GetColumnDataX(col, tb) - if colx.ColumnData == nil || !colx.GetFlag().IsHandleKey() { - continue - } colNames = append(colNames, colx.GetName()) args = appendQueryArgs(args, colx) } } - return } diff --git a/cdc/sink/dmlsink/txn/mysql/dml_test.go b/cdc/sink/dmlsink/txn/mysql/dml_test.go index dab7c4b104f..20f22d63c2b 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml_test.go +++ b/cdc/sink/dmlsink/txn/mysql/dml_test.go @@ -250,7 +250,9 @@ func TestPrepareUpdate(t *testing.T) { }, } for _, tc := range testCases { - query, args := prepareUpdate(tc.quoteTable, tc.preCols, tc.cols, false) + preDatas, info := model.Column2ColumnDataForTest(tc.preCols) + datas, _ := model.Column2ColumnDataForTest(tc.cols) + query, args := prepareUpdate(tc.quoteTable, preDatas, datas, info, false) require.Equal(t, tc.expectedSQL, query) require.Equal(t, tc.expectedArgs, args) } @@ -392,7 +394,8 @@ func TestPrepareDelete(t *testing.T) { }, } for _, tc := range testCases { - query, args := prepareDelete(tc.quoteTable, tc.preCols, false) + preDatas, info := model.Column2ColumnDataForTest(tc.preCols) + query, args := prepareDelete(tc.quoteTable, preDatas, info, false) require.Equal(t, tc.expectedSQL, query) require.Equal(t, tc.expectedArgs, args) } @@ -601,9 +604,10 @@ func TestWhereSlice(t *testing.T) { expectedArgs: []interface{}{1, "你好", 100}, }, } - for _, tc := range testCases { - colNames, args := whereSlice(tc.cols, tc.forceReplicate) - require.Equal(t, tc.expectedColNames, colNames) + for i, tc := range testCases { + datas, info := model.Column2ColumnDataForTest(tc.cols) + colNames, args := whereSlice(datas, info, tc.forceReplicate) + require.Equal(t, tc.expectedColNames, colNames, "case %d fails", i) require.Equal(t, tc.expectedArgs, args) } } @@ -713,7 +717,8 @@ func TestMapReplace(t *testing.T) { for _, tc := range testCases { // multiple times to verify the stability of column sequence in query string for i := 0; i < 10; i++ { - query, args := prepareReplace(tc.quoteTable, tc.cols, false, false) + datas, info := model.Column2ColumnDataForTest(tc.cols) + query, args := prepareReplace(tc.quoteTable, datas, info, false, false) require.Equal(t, tc.expectedQuery, query) require.Equal(t, tc.expectedArgs, args) } From e0f723e3034799510a0dde43efde8bb42a587b48 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 24 Sep 2024 18:00:39 +0800 Subject: [PATCH 09/12] fix Signed-off-by: qupeng --- cdc/model/sink.go | 10 ++++++++-- cdc/sink/dmlsink/txn/event_test.go | 4 ++-- cdc/sink/dmlsink/txn/mysql/dml_test.go | 10 +++++----- pkg/sink/codec/csv/csv_message_test.go | 6 +++--- pkg/sink/codec/open/open_protocol_message_test.go | 4 ++-- 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 3632072bfbd..29b78305a74 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1348,8 +1348,8 @@ func (x ColumnDataX) GetColumnInfo() *model.ColumnInfo { return x.info } -// Column2ColumnDataForTest is for tests. -func Column2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) { +// Columns2ColumnDataForTest is for tests. +func Columns2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) { info := &TableInfo{ TableInfo: &model.TableInfo{ Columns: make([]*model.ColumnInfo, len(columns)), @@ -1378,3 +1378,9 @@ func Column2ColumnDataForTest(columns []*Column) ([]*ColumnData, *TableInfo) { return colDatas, info } + +// Column2ColumnDataXForTest is for tests. +func Column2ColumnDataXForTest(column *Column) ColumnDataX { + datas, info := Columns2ColumnDataForTest([]*Column{column}) + return GetColumnDataX(datas[0], info) +} diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go index 31e2e05bf16..29d27e13759 100644 --- a/cdc/sink/dmlsink/txn/event_test.go +++ b/cdc/sink/dmlsink/txn/event_test.go @@ -25,7 +25,7 @@ import ( func TestGenKeyListCaseInSensitive(t *testing.T) { t.Parallel() - columns, tb := model.Column2ColumnDataForTest([]*model.Column{ + columns, tb := model.Columns2ColumnDataForTest([]*model.Column{ { Value: "XyZ", Type: mysql.TypeVarchar, @@ -35,7 +35,7 @@ func TestGenKeyListCaseInSensitive(t *testing.T) { first := genKeyList(columns, tb, 0, []int{0}, 1) - columns, tb = model.Column2ColumnDataForTest([]*model.Column{ + columns, tb = model.Columns2ColumnDataForTest([]*model.Column{ { Value: "xYZ", Type: mysql.TypeVarchar, diff --git a/cdc/sink/dmlsink/txn/mysql/dml_test.go b/cdc/sink/dmlsink/txn/mysql/dml_test.go index 20f22d63c2b..dfea5e3f2df 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml_test.go +++ b/cdc/sink/dmlsink/txn/mysql/dml_test.go @@ -250,8 +250,8 @@ func TestPrepareUpdate(t *testing.T) { }, } for _, tc := range testCases { - preDatas, info := model.Column2ColumnDataForTest(tc.preCols) - datas, _ := model.Column2ColumnDataForTest(tc.cols) + preDatas, info := model.Columns2ColumnDataForTest(tc.preCols) + datas, _ := model.Columns2ColumnDataForTest(tc.cols) query, args := prepareUpdate(tc.quoteTable, preDatas, datas, info, false) require.Equal(t, tc.expectedSQL, query) require.Equal(t, tc.expectedArgs, args) @@ -394,7 +394,7 @@ func TestPrepareDelete(t *testing.T) { }, } for _, tc := range testCases { - preDatas, info := model.Column2ColumnDataForTest(tc.preCols) + preDatas, info := model.Columns2ColumnDataForTest(tc.preCols) query, args := prepareDelete(tc.quoteTable, preDatas, info, false) require.Equal(t, tc.expectedSQL, query) require.Equal(t, tc.expectedArgs, args) @@ -605,7 +605,7 @@ func TestWhereSlice(t *testing.T) { }, } for i, tc := range testCases { - datas, info := model.Column2ColumnDataForTest(tc.cols) + datas, info := model.Columns2ColumnDataForTest(tc.cols) colNames, args := whereSlice(datas, info, tc.forceReplicate) require.Equal(t, tc.expectedColNames, colNames, "case %d fails", i) require.Equal(t, tc.expectedArgs, args) @@ -717,7 +717,7 @@ func TestMapReplace(t *testing.T) { for _, tc := range testCases { // multiple times to verify the stability of column sequence in query string for i := 0; i < 10; i++ { - datas, info := model.Column2ColumnDataForTest(tc.cols) + datas, info := model.Columns2ColumnDataForTest(tc.cols) query, args := prepareReplace(tc.quoteTable, datas, info, false, false) require.Equal(t, tc.expectedQuery, query) require.Equal(t, tc.expectedArgs, args) diff --git a/pkg/sink/codec/csv/csv_message_test.go b/pkg/sink/codec/csv/csv_message_test.go index 57fe5f96010..10de218fd0b 100644 --- a/pkg/sink/codec/csv/csv_message_test.go +++ b/pkg/sink/codec/csv/csv_message_test.go @@ -959,9 +959,9 @@ func TestCSVMessageEncode(t *testing.T) { func TestConvertToCSVType(t *testing.T) { for _, group := range csvTestColumnsGroup { for _, c := range group { - val, _ := fromColValToCsvVal(&common.Config{ - BinaryEncodingMethod: c.BinaryEncodingMethod, - }, &c.col, c.colInfo.Ft) + cfg := &common.Config{BinaryEncodingMethod: c.BinaryEncodingMethod} + col := model.Column2ColumnDataXForTest(&c.col) + val, _ := fromColValToCsvVal(cfg, col, c.colInfo.Ft) require.Equal(t, c.want, val, c.col.Name) } } diff --git a/pkg/sink/codec/open/open_protocol_message_test.go b/pkg/sink/codec/open/open_protocol_message_test.go index 51ba13d5439..94744353fb4 100644 --- a/pkg/sink/codec/open/open_protocol_message_test.go +++ b/pkg/sink/codec/open/open_protocol_message_test.go @@ -60,7 +60,7 @@ func TestNonBinaryStringCol(t *testing.T) { Value: "value", } mqCol := internal.Column{} - mqCol.FromRowChangeColumn(col) + mqCol.FromRowChangeColumn(model.Column2ColumnDataXForTest(col)) row := &messageRow{Update: map[string]internal.Column{"test": mqCol}} rowEncode, err := row.encode() require.NoError(t, err) @@ -83,7 +83,7 @@ func TestVarBinaryCol(t *testing.T) { Value: []byte{0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A}, } mqCol := internal.Column{} - mqCol.FromRowChangeColumn(col) + mqCol.FromRowChangeColumn(model.Column2ColumnDataXForTest(col)) row := &messageRow{Update: map[string]internal.Column{"test": mqCol}} rowEncode, err := row.encode() require.NoError(t, err) From 6a5bce5b3e7ceeee81f6e5f74d97d0b00a48047a Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 25 Sep 2024 11:00:06 +0800 Subject: [PATCH 10/12] make fast check Signed-off-by: qupeng --- cdc/model/sink.go | 2 ++ docs/swagger/docs.go | 15 ++++++++++++++- docs/swagger/swagger.json | 15 ++++++++++++++- docs/swagger/swagger.yaml | 11 +++++++++++ tools/check/go.sum | 1 + 5 files changed, 42 insertions(+), 2 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 29b78305a74..dd6396dace1 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1297,6 +1297,8 @@ type TopicPartitionKey struct { } // ColumnDataX is like ColumnData, but contains more informations. +// +//msgp:ignore RowChangedEvent type ColumnDataX struct { *ColumnData flag *ColumnFlagType diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index ad976303486..7843c1f9d8b 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1684,6 +1684,9 @@ var doc = `{ "config.LargeMessageHandleConfig": { "type": "object", "properties": { + "claim-check-raw-value": { + "type": "boolean" + }, "claim-check-storage-uri": { "type": "string" }, @@ -1860,7 +1863,7 @@ var doc = `{ "type": "object", "properties": { "advance-timeout-in-sec": { - "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", + "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.\nDeprecated since v8.1.1", "type": "integer" }, "cloud-storage-config": { @@ -1948,6 +1951,10 @@ var doc = `{ "description": "SchemaRegistry is only available when the downstream is MQ using avro protocol.", "type": "string" }, + "send-all-bootstrap-at-start": { + "description": "SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start.", + "type": "boolean" + }, "send-bootstrap-in-msg-count": { "description": "SendBootstrapInMsgCount means bootstrap messages are being sent every SendBootstrapInMsgCount row change messages.", "type": "integer" @@ -2815,6 +2822,9 @@ var doc = `{ "v2.LargeMessageHandleConfig": { "type": "object", "properties": { + "claim_check_raw_value": { + "type": "boolean" + }, "claim_check_storage_uri": { "type": "string" }, @@ -3218,6 +3228,9 @@ var doc = `{ "schema_registry": { "type": "string" }, + "send-all-bootstrap-at-start": { + "type": "boolean" + }, "send_bootstrap_in_msg_count": { "type": "integer" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 8d4f5d1907a..747d9a6da53 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1665,6 +1665,9 @@ "config.LargeMessageHandleConfig": { "type": "object", "properties": { + "claim-check-raw-value": { + "type": "boolean" + }, "claim-check-storage-uri": { "type": "string" }, @@ -1841,7 +1844,7 @@ "type": "object", "properties": { "advance-timeout-in-sec": { - "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", + "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.\nDeprecated since v8.1.1", "type": "integer" }, "cloud-storage-config": { @@ -1929,6 +1932,10 @@ "description": "SchemaRegistry is only available when the downstream is MQ using avro protocol.", "type": "string" }, + "send-all-bootstrap-at-start": { + "description": "SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start.", + "type": "boolean" + }, "send-bootstrap-in-msg-count": { "description": "SendBootstrapInMsgCount means bootstrap messages are being sent every SendBootstrapInMsgCount row change messages.", "type": "integer" @@ -2796,6 +2803,9 @@ "v2.LargeMessageHandleConfig": { "type": "object", "properties": { + "claim_check_raw_value": { + "type": "boolean" + }, "claim_check_storage_uri": { "type": "string" }, @@ -3199,6 +3209,9 @@ "schema_registry": { "type": "string" }, + "send-all-bootstrap-at-start": { + "type": "boolean" + }, "send_bootstrap_in_msg_count": { "type": "integer" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 9cb8b63d7de..eb3f89a2b6d 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -204,6 +204,8 @@ definitions: type: object config.LargeMessageHandleConfig: properties: + claim-check-raw-value: + type: boolean claim-check-storage-uri: type: string large-message-handle-compression: @@ -348,6 +350,7 @@ definitions: description: |- AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been advanced for this given duration, the sink will be canceled and re-established. + Deprecated since v8.1.1 type: integer cloud-storage-config: $ref: '#/definitions/config.CloudStorageConfig' @@ -417,6 +420,10 @@ definitions: description: SchemaRegistry is only available when the downstream is MQ using avro protocol. type: string + send-all-bootstrap-at-start: + description: SendAllBootstrapAtStart determines whether to send all tables + bootstrap message at changefeed start. + type: boolean send-bootstrap-in-msg-count: description: SendBootstrapInMsgCount means bootstrap messages are being sent every SendBootstrapInMsgCount row change messages. @@ -1008,6 +1015,8 @@ definitions: type: object v2.LargeMessageHandleConfig: properties: + claim_check_raw_value: + type: boolean claim_check_storage_uri: type: string large_message_handle_compression: @@ -1273,6 +1282,8 @@ definitions: type: boolean schema_registry: type: string + send-all-bootstrap-at-start: + type: boolean send_bootstrap_in_msg_count: type: integer send_bootstrap_interval_in_sec: diff --git a/tools/check/go.sum b/tools/check/go.sum index 3f344cd5950..58d14e98667 100644 --- a/tools/check/go.sum +++ b/tools/check/go.sum @@ -879,6 +879,7 @@ golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= From 84f1173a03bd44b9d7518cee262b493a5fc20230 Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 25 Sep 2024 12:37:35 +0800 Subject: [PATCH 11/12] fix Signed-off-by: qupeng --- pkg/sink/codec/debezium/codec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index 639c62b2c1b..7c62c66388e 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -246,7 +246,7 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("type", "string") writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) writer.WriteStringField("name", "io.debezium.data.TiDBVectorFloat32") - writer.WriteStringField("field", col.Name) + writer.WriteStringField("field", col.GetName()) }) default: log.Warn( @@ -512,7 +512,7 @@ func (c *dbzCodec) writeDebeziumFieldValue( case mysql.TypeTiDBVectorFloat32: v := col.Value.(types.VectorFloat32).String() - writer.WriteStringField(col.Name, v) + writer.WriteStringField(col.GetName(), v) return nil // Note: Although Debezium's doc claims to use INT32 for INT, but it From eb539d1e487f5c78c10ab9a05c10cd3a650a59f1 Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 25 Sep 2024 18:40:56 +0800 Subject: [PATCH 12/12] fix Signed-off-by: qupeng --- cdc/model/sink_gen.go | 197 +++++++++++++++++++++++++++++++++++++ cdc/model/sink_gen_test.go | 113 +++++++++++++++++++++ 2 files changed, 310 insertions(+) diff --git a/cdc/model/sink_gen.go b/cdc/model/sink_gen.go index f431446021d..9483e055f2d 100644 --- a/cdc/model/sink_gen.go +++ b/cdc/model/sink_gen.go @@ -287,6 +287,203 @@ func (z ColumnData) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *ColumnDataX) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ColumnData": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + z.ColumnData = nil + } else { + if z.ColumnData == nil { + z.ColumnData = new(ColumnData) + } + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + for zb0002 > 0 { + zb0002-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + switch msgp.UnsafeString(field) { + case "column_id": + z.ColumnData.ColumnID, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "ColumnData", "ColumnID") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ColumnDataX) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "ColumnData" + err = en.Append(0x81, 0xaa, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x44, 0x61, 0x74, 0x61) + if err != nil { + return + } + if z.ColumnData == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + // map header, size 1 + // write "column_id" + err = en.Append(0x81, 0xa9, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteInt64(z.ColumnData.ColumnID) + if err != nil { + err = msgp.WrapError(err, "ColumnData", "ColumnID") + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ColumnDataX) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "ColumnData" + o = append(o, 0x81, 0xaa, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x44, 0x61, 0x74, 0x61) + if z.ColumnData == nil { + o = msgp.AppendNil(o) + } else { + // map header, size 1 + // string "column_id" + o = append(o, 0x81, 0xa9, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x5f, 0x69, 0x64) + o = msgp.AppendInt64(o, z.ColumnData.ColumnID) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ColumnDataX) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ColumnData": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.ColumnData = nil + } else { + if z.ColumnData == nil { + z.ColumnData = new(ColumnData) + } + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + for zb0002 > 0 { + zb0002-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + switch msgp.UnsafeString(field) { + case "column_id": + z.ColumnData.ColumnID, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ColumnData", "ColumnID") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "ColumnData") + return + } + } + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ColumnDataX) Msgsize() (s int) { + s = 1 + 11 + if z.ColumnData == nil { + s += msgp.NilSize + } else { + s += 1 + 10 + msgp.Int64Size + } + return +} + // DecodeMsg implements msgp.Decodable func (z *DDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cdc/model/sink_gen_test.go b/cdc/model/sink_gen_test.go index a43663539d1..dbe154f0e79 100644 --- a/cdc/model/sink_gen_test.go +++ b/cdc/model/sink_gen_test.go @@ -235,6 +235,119 @@ func BenchmarkDecodeColumnData(b *testing.B) { } } +func TestMarshalUnmarshalColumnDataX(t *testing.T) { + v := ColumnDataX{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgColumnDataX(b *testing.B) { + v := ColumnDataX{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgColumnDataX(b *testing.B) { + v := ColumnDataX{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalColumnDataX(b *testing.B) { + v := ColumnDataX{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeColumnDataX(t *testing.T) { + v := ColumnDataX{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeColumnDataX Msgsize() is inaccurate") + } + + vn := ColumnDataX{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeColumnDataX(b *testing.B) { + v := ColumnDataX{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeColumnDataX(b *testing.B) { + v := ColumnDataX{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalDDLEvent(t *testing.T) { v := DDLEvent{} bts, err := v.MarshalMsg(nil)