From 749abcc0d7a6e1010e5836c79bbe3cc34d91e6e3 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 4 Sep 2024 22:25:23 +0800 Subject: [PATCH] codec: clean some code in codec (#255) --- downstreamadapter/sink/kafka_sink.go | 9 +-- downstreamadapter/worker/kafka_worker.go | 59 +++++++-------- pkg/sink/codec/avro/arvo.go | 32 ++------- pkg/sink/codec/bootstraper.go | 10 +-- pkg/sink/codec/builder/encoder_builder.go | 71 ------------------- pkg/sink/codec/canal/canal_encoder.go | 27 ++----- pkg/sink/codec/canal/canal_json_decoder.go | 4 +- .../canal/canal_json_row_event_encoder.go | 46 ++++-------- pkg/sink/codec/craft/craft_decoder.go | 6 +- pkg/sink/codec/craft/craft_encoder.go | 23 ++---- pkg/sink/codec/debezium/encoder.go | 25 +------ pkg/sink/codec/{ => decoder}/decoder.go | 2 +- pkg/sink/codec/{ => encoder}/encoder.go | 63 +--------------- pkg/sink/codec/encoder_builder.go | 40 +++++++++++ pkg/sink/codec/encoder_group.go | 51 ++++++++----- pkg/sink/codec/maxwell/maxwell_encoder.go | 47 ++++-------- pkg/sink/codec/open/open_protocol_decoder.go | 9 +-- pkg/sink/codec/open/open_protocol_encoder.go | 42 +++-------- pkg/sink/codec/open/open_protocol_message.go | 4 +- pkg/sink/codec/simple/encoder.go | 65 +++++++---------- 20 files changed, 208 insertions(+), 427 deletions(-) delete mode 100644 pkg/sink/codec/builder/encoder_builder.go rename pkg/sink/codec/{ => decoder}/decoder.go (98%) rename pkg/sink/codec/{ => encoder}/encoder.go (54%) create mode 100644 pkg/sink/codec/encoder_builder.go diff --git a/downstreamadapter/sink/kafka_sink.go b/downstreamadapter/sink/kafka_sink.go index 575acfaf1..daa96ae95 100644 --- a/downstreamadapter/sink/kafka_sink.go +++ b/downstreamadapter/sink/kafka_sink.go @@ -25,7 +25,6 @@ import ( "github.com/flowbehappy/tigate/downstreamadapter/worker/dmlproducer" "github.com/flowbehappy/tigate/pkg/common" "github.com/flowbehappy/tigate/pkg/sink/codec" - "github.com/flowbehappy/tigate/pkg/sink/codec/builder" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -108,10 +107,6 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConf if err != nil { return nil, errors.Trace(err) } - encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err) - } failpointCh := make(chan error, 1) asyncProducer, err := factory.AsyncProducer(ctx, failpointCh) @@ -121,7 +116,7 @@ func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConf metricsCollector := factory.MetricsCollector(utils.RoleProcessor, adminClient) dmlProducer := dmlproducer.NewKafkaDMLProducer(ctx, changefeedID, asyncProducer, metricsCollector) - encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID) + encoderGroup := codec.NewEncoderGroup(ctx, replicaConfig.Sink, encoderConfig, changefeedID) statistics := timetrics.NewStatistics(changefeedID, sink.RowSink) worker := worker.NewKafkaWorker(changefeedID, protocol, dmlProducer, encoderGroup, statistics) @@ -201,7 +196,7 @@ func (s *KafkaSink) AddDMLEvent(event *common.TEvent, tableProgress *types.Table return } - s.worker.GetEventChan() <- common.MQRowEvent{ + s.worker.GetEventChan() <- &common.MQRowEvent{ Key: model.TopicPartitionKey{ Topic: topic, Partition: index, diff --git a/downstreamadapter/worker/kafka_worker.go b/downstreamadapter/worker/kafka_worker.go index d4f232ebf..deb9d3ebb 100644 --- a/downstreamadapter/worker/kafka_worker.go +++ b/downstreamadapter/worker/kafka_worker.go @@ -15,6 +15,7 @@ package worker import ( "context" + "sync" "time" "github.com/flowbehappy/tigate/pkg/common" @@ -29,7 +30,6 @@ import ( "github.com/pingcap/tiflow/pkg/config" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) const ( @@ -48,7 +48,7 @@ type KafkaWorker struct { protocol config.Protocol // msgChan caches the messages to be sent. // It is an unbounded channel. - msgChan *chann.DrainableChann[common.MQRowEvent] + msgChan *chann.DrainableChann[*common.MQRowEvent] // ticker used to force flush the batched messages when the interval is reached. ticker *time.Ticker @@ -65,6 +65,9 @@ type KafkaWorker struct { metricMQWorkerBatchDuration prometheus.Observer // statistics is used to record DML metrics. statistics *metrics.Statistics + + cancel context.CancelFunc + wg sync.WaitGroup } // newWorker creates a new flush worker. @@ -75,10 +78,11 @@ func NewKafkaWorker( encoderGroup codec.EncoderGroup, statistics *metrics.Statistics, ) *KafkaWorker { + ctx, cancel := context.WithCancel(context.Background()) w := &KafkaWorker{ changeFeedID: id, protocol: protocol, - msgChan: chann.NewAutoDrainChann[common.MQRowEvent](), + msgChan: chann.NewAutoDrainChann[*common.MQRowEvent](), ticker: time.NewTicker(batchInterval), encoderGroup: encoderGroup, producer: producer, @@ -86,41 +90,30 @@ func NewKafkaWorker( metricMQWorkerBatchSize: mq.WorkerBatchSize.WithLabelValues(id.Namespace, id.ID), metricMQWorkerBatchDuration: mq.WorkerBatchDuration.WithLabelValues(id.Namespace, id.ID), statistics: statistics, + cancel: cancel, } - return w -} - -func (w *KafkaWorker) GetEventChan() chan<- common.MQRowEvent { - return w.msgChan.In() -} - -// run starts a loop that keeps collecting, sorting and sending messages -// until it encounters an error or is interrupted. -func (w *KafkaWorker) run(ctx context.Context) (retErr error) { - defer func() { - w.ticker.Stop() - log.Info("MQ sink worker exited", zap.Error(retErr), - zap.String("namespace", w.changeFeedID.Namespace), - zap.String("changefeed", w.changeFeedID.ID), - zap.String("protocol", w.protocol.String()), - ) - }() - - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { + w.wg.Add(3) + go func() error { + defer w.wg.Done() return w.encoderGroup.Run(ctx) - }) - g.Go(func() error { + }() + go func() error { + defer w.wg.Done() if w.protocol.IsBatchEncode() { return w.batchEncodeRun(ctx) } return w.nonBatchEncodeRun(ctx) - }) - g.Go(func() error { + }() + go func() error { + defer w.wg.Done() return w.sendMessages(ctx) - }) - return g.Wait() + }() + return w +} + +func (w *KafkaWorker) GetEventChan() chan<- *common.MQRowEvent { + return w.msgChan.In() } // nonBatchEncodeRun add events to the encoder group immediately. @@ -156,7 +149,7 @@ func (w *KafkaWorker) batchEncodeRun(ctx context.Context) (retErr error) { zap.String("protocol", w.protocol.String()), ) - msgsBuf := make([]common.MQRowEvent, batchSize) + msgsBuf := make([]*common.MQRowEvent, batchSize) for { start := time.Now() msgCount, err := w.batch(ctx, msgsBuf, batchInterval) @@ -184,7 +177,7 @@ func (w *KafkaWorker) batchEncodeRun(ctx context.Context) (retErr error) { // batch collects a batch of messages from w.msgChan into buffer. // It returns the number of messages collected. // Note: It will block until at least one message is received. -func (w *KafkaWorker) batch(ctx context.Context, buffer []common.MQRowEvent, flushInterval time.Duration) (int, error) { +func (w *KafkaWorker) batch(ctx context.Context, buffer []*common.MQRowEvent, flushInterval time.Duration) (int, error) { msgCount := 0 maxBatchSize := len(buffer) // We need to receive at least one message or be interrupted, @@ -233,7 +226,7 @@ func (w *KafkaWorker) batch(ctx context.Context, buffer []common.MQRowEvent, flu } // group groups messages by its key. -func (w *KafkaWorker) group(msgs []common.MQRowEvent) map[model.TopicPartitionKey][]*common.RowEvent { +func (w *KafkaWorker) group(msgs []*common.MQRowEvent) map[model.TopicPartitionKey][]*common.RowEvent { groupedMsgs := make(map[model.TopicPartitionKey][]*common.RowEvent) for _, msg := range msgs { if _, ok := groupedMsgs[msg.Key]; !ok { diff --git a/pkg/sink/codec/avro/arvo.go b/pkg/sink/codec/avro/arvo.go index edf801c2a..561595a6f 100644 --- a/pkg/sink/codec/avro/arvo.go +++ b/pkg/sink/codec/avro/arvo.go @@ -24,7 +24,7 @@ import ( "strings" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/linkedin/goavro/v2" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -978,6 +978,8 @@ func (a *BatchEncoder) columnToAvroData( } } +func (a *BatchEncoder) Clean() {} + const ( // avro does not send ddl and checkpoint message, the following 2 field is used to distinguish // TiCDC DDL event and checkpoint event, only used for testing purpose, not for production @@ -1008,10 +1010,8 @@ const ( valueSchemaSuffix = "-value" ) -// NewBatchEncoderBuilder creates an avro batchEncoderBuilder. -func NewBatchEncoderBuilder( - ctx context.Context, config *ticommon.Config, -) (codec.RowEventEncoderBuilder, error) { +// NewAvroEncoder return a avro encoder. +func NewAvroEncoder(ctx context.Context, config *ticommon.Config) (encoder.RowEventEncoder, error) { var schemaM SchemaManager var err error @@ -1030,30 +1030,12 @@ func NewBatchEncoderBuilder( default: return nil, cerror.ErrAvroSchemaAPIError.GenWithStackByArgs(schemaRegistryType) } - - return &batchEncoderBuilder{ - namespace: config.ChangefeedID.Namespace, - config: config, - schemaM: schemaM, - }, nil -} - -// Build an AvroEventBatchEncoder. -func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return NewAvroEncoder(b.namespace, b.schemaM, b.config) -} - -// CleanMetrics is a no-op for AvroEventBatchEncoder. -func (b *batchEncoderBuilder) CleanMetrics() {} - -// NewAvroEncoder return a avro encoder. -func NewAvroEncoder(namespace string, schemaM SchemaManager, config *ticommon.Config) codec.RowEventEncoder { return &BatchEncoder{ - namespace: namespace, + namespace: config.ChangefeedID.Namespace, schemaM: schemaM, result: make([]*ticommon.Message, 0, 1), config: config, - } + }, nil } // // SetupEncoderAndSchemaRegistry4Testing start a local schema registry for testing. diff --git a/pkg/sink/codec/bootstraper.go b/pkg/sink/codec/bootstraper.go index f42542909..498da479c 100644 --- a/pkg/sink/codec/bootstraper.go +++ b/pkg/sink/codec/bootstraper.go @@ -20,6 +20,7 @@ import ( "time" "github.com/flowbehappy/tigate/pkg/common" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -39,7 +40,7 @@ const ( type bootstrapWorker struct { changefeedID model.ChangeFeedID activeTables sync.Map - encoder RowEventEncoder + rowEventEncoder encoder.RowEventEncoder sendBootstrapInterval time.Duration sendBootstrapInMsgCount int32 sendBootstrapToAllPartition bool @@ -52,7 +53,7 @@ type bootstrapWorker struct { func newBootstrapWorker( changefeedID model.ChangeFeedID, outCh chan<- *future, - encoder RowEventEncoder, + rowEventEncoder encoder.RowEventEncoder, sendBootstrapInterval int64, sendBootstrapInMsgCount int32, sendBootstrapToAllPartition bool, @@ -66,7 +67,7 @@ func newBootstrapWorker( return &bootstrapWorker{ changefeedID: changefeedID, outCh: outCh, - encoder: encoder, + rowEventEncoder: rowEventEncoder, activeTables: sync.Map{}, sendBootstrapInterval: time.Duration(sendBootstrapInterval) * time.Second, sendBootstrapInMsgCount: sendBootstrapInMsgCount, @@ -79,6 +80,7 @@ func (b *bootstrapWorker) run(ctx context.Context) error { sendTicker := time.NewTicker(bootstrapWorkerTickerInterval) gcTicker := time.NewTicker(bootstrapWorkerGCInterval) defer func() { + b.rowEventEncoder.Clean() gcTicker.Stop() sendTicker.Stop() }() @@ -156,7 +158,7 @@ func (b *bootstrapWorker) generateEvents( tableInfo *model.TableInfo, ) ([]*future, error) { res := make([]*future, 0, totalPartition) - msg, err := b.encoder.EncodeDDLEvent(model.NewBootstrapDDLEvent(tableInfo)) + msg, err := b.rowEventEncoder.EncodeDDLEvent(model.NewBootstrapDDLEvent(tableInfo)) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sink/codec/builder/encoder_builder.go b/pkg/sink/codec/builder/encoder_builder.go deleted file mode 100644 index d3bfdc6bb..000000000 --- a/pkg/sink/codec/builder/encoder_builder.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder - -import ( - "context" - - "github.com/flowbehappy/tigate/pkg/sink/codec" - "github.com/flowbehappy/tigate/pkg/sink/codec/avro" - "github.com/flowbehappy/tigate/pkg/sink/codec/canal" - "github.com/flowbehappy/tigate/pkg/sink/codec/craft" - "github.com/flowbehappy/tigate/pkg/sink/codec/debezium" - "github.com/flowbehappy/tigate/pkg/sink/codec/maxwell" - "github.com/flowbehappy/tigate/pkg/sink/codec/open" - "github.com/flowbehappy/tigate/pkg/sink/codec/simple" - "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/sink/codec/common" -) - -// NewRowEventEncoderBuilder returns an RowEventEncoderBuilder -func NewRowEventEncoderBuilder( - ctx context.Context, - cfg *common.Config, -) (codec.RowEventEncoderBuilder, error) { - switch cfg.Protocol { - case config.ProtocolDefault, config.ProtocolOpen: - return open.NewBatchEncoderBuilder(ctx, cfg) - case config.ProtocolCanal: - return canal.NewBatchEncoderBuilder(cfg), nil - case config.ProtocolAvro: - return avro.NewBatchEncoderBuilder(ctx, cfg) - case config.ProtocolMaxwell: - return maxwell.NewBatchEncoderBuilder(cfg), nil - case config.ProtocolCanalJSON: - return canal.NewJSONRowEventEncoderBuilder(ctx, cfg) - case config.ProtocolCraft: - return craft.NewBatchEncoderBuilder(cfg), nil - case config.ProtocolDebezium: - return debezium.NewBatchEncoderBuilder(cfg, config.GetGlobalServerConfig().ClusterID), nil - case config.ProtocolSimple: - return simple.NewBuilder(ctx, cfg) - default: - return nil, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(cfg.Protocol) - } -} - -// // NewTxnEventEncoderBuilder returns an TxnEventEncoderBuilder. -// func NewTxnEventEncoderBuilder( -// c *common.Config, -// ) (codec.TxnEventEncoderBuilder, error) { -// switch c.Protocol { -// case config.ProtocolCsv: -// return csv.NewTxnEventEncoderBuilder(c), nil -// case config.ProtocolCanalJSON: -// return canal.NewJSONTxnEventEncoderBuilder(c), nil -// default: -// return nil, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(c.Protocol) -// } -// } diff --git a/pkg/sink/codec/canal/canal_encoder.go b/pkg/sink/codec/canal/canal_encoder.go index 9676f3d3a..b4e62fb83 100644 --- a/pkg/sink/codec/canal/canal_encoder.go +++ b/pkg/sink/codec/canal/canal_encoder.go @@ -17,7 +17,7 @@ import ( "context" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/golang/protobuf/proto" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -158,8 +158,10 @@ func (d *BatchEncoder) resetPacket() { } } +func (d *BatchEncoder) Clean() {} + // newBatchEncoder creates a new canalBatchEncoder. -func newBatchEncoder(config *ticommon.Config) codec.RowEventEncoder { +func NewBatchEncoder(config *ticommon.Config) (encoder.RowEventEncoder, error) { encoder := &BatchEncoder{ messages: &canal.Messages{}, callbackBuf: make([]func(), 0), @@ -169,24 +171,5 @@ func newBatchEncoder(config *ticommon.Config) codec.RowEventEncoder { } encoder.resetPacket() - return encoder -} - -type batchEncoderBuilder struct { - config *ticommon.Config -} - -// Build a `canalBatchEncoder` -func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return newBatchEncoder(b.config) -} - -// CleanMetrics is a no-op for canalBatchEncoder. -func (b *batchEncoderBuilder) CleanMetrics() {} - -// NewBatchEncoderBuilder creates a canal batchEncoderBuilder. -func NewBatchEncoderBuilder(config *ticommon.Config) codec.RowEventEncoderBuilder { - return &batchEncoderBuilder{ - config: config, - } + return encoder, nil } diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 19d3f9bf2..51dcf1e9d 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -23,7 +23,7 @@ import ( "time" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/decoder" "github.com/goccy/go-json" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -54,7 +54,7 @@ type batchDecoder struct { // NewBatchDecoder return a decoder for canal-json func NewBatchDecoder( ctx context.Context, codecConfig *ticommon.Config, db *sql.DB, -) (codec.RowEventDecoder, error) { +) (decoder.RowEventDecoder, error) { var ( externalStorage storage.ExternalStorage err error diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index ac3b2aa92..7a05019b9 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -18,7 +18,7 @@ import ( "time" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/goccy/go-json" "github.com/mailru/easyjson/jwriter" "github.com/pingcap/errors" @@ -313,15 +313,20 @@ type JSONRowEventEncoder struct { } // newJSONRowEventEncoder creates a new JSONRowEventEncoder -func newJSONRowEventEncoder( - config *ticommon.Config, claimCheck *claimcheck.ClaimCheck, -) codec.RowEventEncoder { +func NewJSONRowEventEncoder(ctx context.Context, config *ticommon.Config) (encoder.RowEventEncoder, error) { + claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) + if err != nil { + return nil, errors.Trace(err) + } + if err != nil { + return nil, errors.Trace(err) + } return &JSONRowEventEncoder{ builder: newCanalEntryBuilder(config), messages: make([]*ticommon.Message, 0, 1), config: config, claimCheck: claimCheck, - } + }, nil } func (c *JSONRowEventEncoder) newJSONMessageForDDL(e *model.DDLEvent) canalJSONMessageInterface { @@ -528,27 +533,10 @@ func (c *JSONRowEventEncoder) EncodeDDLEvent(e *model.DDLEvent) (*ticommon.Messa return ticommon.NewDDLMsg(config.ProtocolCanalJSON, nil, value, e), nil } -type jsonRowEventEncoderBuilder struct { - config *ticommon.Config - - claimCheck *claimcheck.ClaimCheck -} - -// NewJSONRowEventEncoderBuilder creates a canal-json batchEncoderBuilder. -func NewJSONRowEventEncoderBuilder(ctx context.Context, config *ticommon.Config) (codec.RowEventEncoderBuilder, error) { - claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) - if err != nil { - return nil, errors.Trace(err) +func (b *JSONRowEventEncoder) Clean() { + if b.claimCheck != nil { + b.claimCheck.CleanMetrics() } - return &jsonRowEventEncoderBuilder{ - config: config, - claimCheck: claimCheck, - }, nil -} - -// Build a `jsonRowEventEncoderBuilder` -func (b *jsonRowEventEncoderBuilder) Build() codec.RowEventEncoder { - return newJSONRowEventEncoder(b.config, b.claimCheck) } func shouldIgnoreColumn(col *common.Column, @@ -561,15 +549,9 @@ func shouldIgnoreColumn(col *common.Column, return false } // value equal - if codec.IsColumnValueEqual(newCol.Value, col.Value) { + if encoder.IsColumnValueEqual(newCol.Value, col.Value) { return true } } return false } - -func (b *jsonRowEventEncoderBuilder) CleanMetrics() { - if b.claimCheck != nil { - b.claimCheck.CleanMetrics() - } -} diff --git a/pkg/sink/codec/craft/craft_decoder.go b/pkg/sink/codec/craft/craft_decoder.go index ab9c91b14..ca30e50c9 100644 --- a/pkg/sink/codec/craft/craft_decoder.go +++ b/pkg/sink/codec/craft/craft_decoder.go @@ -15,7 +15,7 @@ package craft import ( "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/decoder" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -128,7 +128,7 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) { return event, nil } -func newBatchDecoder(_, value []byte) (codec.RowEventDecoder, error) { +func newBatchDecoder(_, value []byte) (decoder.RowEventDecoder, error) { decoder := NewBatchDecoderWithAllocator(NewSliceAllocator(64)) err := decoder.AddKeyValue(nil, value) return decoder, err @@ -137,7 +137,7 @@ func newBatchDecoder(_, value []byte) (codec.RowEventDecoder, error) { // NewBatchDecoderWithAllocator creates a new batchDecoder with given allocator. func NewBatchDecoderWithAllocator( allocator *SliceAllocator, -) codec.RowEventDecoder { +) decoder.RowEventDecoder { return &batchDecoder{ allocator: allocator, } diff --git a/pkg/sink/codec/craft/craft_encoder.go b/pkg/sink/codec/craft/craft_encoder.go index e2b72e1b7..63214c99c 100644 --- a/pkg/sink/codec/craft/craft_encoder.go +++ b/pkg/sink/codec/craft/craft_encoder.go @@ -17,7 +17,7 @@ import ( "context" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -97,32 +97,17 @@ func (e *BatchEncoder) flush() { } // NewBatchEncoder creates a new BatchEncoder. -func NewBatchEncoder(config *ticommon.Config) codec.RowEventEncoder { +func NewBatchEncoder(config *ticommon.Config) encoder.RowEventEncoder { // 64 is a magic number that come up with these assumptions and manual benchmark. // 1. Most table will not have more than 64 columns // 2. It only worth allocating slices in batch for slices that's small enough return NewBatchEncoderWithAllocator(NewSliceAllocator(64), config) } -type batchEncoderBuilder struct { - config *ticommon.Config -} - -// Build a BatchEncoder -func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return NewBatchEncoder(b.config) -} - -// CleanMetrics do nothing -func (b *batchEncoderBuilder) CleanMetrics() {} - -// NewBatchEncoderBuilder creates a craft batchEncoderBuilder. -func NewBatchEncoderBuilder(config *ticommon.Config) codec.RowEventEncoderBuilder { - return &batchEncoderBuilder{config: config} -} +func (e *BatchEncoder) Clean() {} // NewBatchEncoderWithAllocator creates a new BatchEncoder with given allocator. -func NewBatchEncoderWithAllocator(allocator *SliceAllocator, config *ticommon.Config) codec.RowEventEncoder { +func NewBatchEncoderWithAllocator(allocator *SliceAllocator, config *ticommon.Config) encoder.RowEventEncoder { return &BatchEncoder{ allocator: allocator, messageBuf: make([]*ticommon.Message, 0, 2), diff --git a/pkg/sink/codec/debezium/encoder.go b/pkg/sink/codec/debezium/encoder.go index 399fd0561..1bf660473 100644 --- a/pkg/sink/codec/debezium/encoder.go +++ b/pkg/sink/codec/debezium/encoder.go @@ -19,7 +19,7 @@ import ( "time" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" @@ -96,7 +96,7 @@ func (d *BatchEncoder) Build() []*ticommon.Message { } // newBatchEncoder creates a new Debezium BatchEncoder. -func newBatchEncoder(c *ticommon.Config, clusterID string) codec.RowEventEncoder { +func NewBatchEncoder(c *ticommon.Config, clusterID string) encoder.RowEventEncoder { batch := &BatchEncoder{ messages: nil, config: c, @@ -109,23 +109,4 @@ func newBatchEncoder(c *ticommon.Config, clusterID string) codec.RowEventEncoder return batch } -type batchEncoderBuilder struct { - config *ticommon.Config - clusterID string -} - -// NewBatchEncoderBuilder creates a Debezium batchEncoderBuilder. -func NewBatchEncoderBuilder(config *ticommon.Config, clusterID string) codec.RowEventEncoderBuilder { - return &batchEncoderBuilder{ - config: config, - clusterID: clusterID, - } -} - -// Build a `BatchEncoder` -func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return newBatchEncoder(b.config, b.clusterID) -} - -// CleanMetrics do nothing -func (b *batchEncoderBuilder) CleanMetrics() {} +func (d *BatchEncoder) Clean() {} diff --git a/pkg/sink/codec/decoder.go b/pkg/sink/codec/decoder/decoder.go similarity index 98% rename from pkg/sink/codec/decoder.go rename to pkg/sink/codec/decoder/decoder.go index 8eb8aefd6..b4ac8cac7 100644 --- a/pkg/sink/codec/decoder.go +++ b/pkg/sink/codec/decoder/decoder.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package codec +package decoder import ( "github.com/flowbehappy/tigate/pkg/common" diff --git a/pkg/sink/codec/encoder.go b/pkg/sink/codec/encoder/encoder.go similarity index 54% rename from pkg/sink/codec/encoder.go rename to pkg/sink/codec/encoder/encoder.go index ccc4d5b2e..9bea474d8 100644 --- a/pkg/sink/codec/encoder.go +++ b/pkg/sink/codec/encoder/encoder.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package codec +package encoder import ( "bytes" @@ -52,24 +52,7 @@ type RowEventEncoder interface { // AppendRowChangedEvent appends a row changed event into the batch or buffer. AppendRowChangedEvent(context.Context, string, *common.RowChangedEvent, func()) error MessageBuilder -} - -// RowEventEncoderBuilder builds row encoder with context. -type RowEventEncoderBuilder interface { - Build() RowEventEncoder - CleanMetrics() -} - -// TxnEventEncoder is an abstraction for txn events encoder. -type TxnEventEncoder interface { - // AppendTxnEvent append a txn event into the buffer. - AppendTxnEvent(*model.SingleTableTxn, func()) error - MessageBuilder -} - -// TxnEventEncoderBuilder builds txn encoder with context. -type TxnEventEncoderBuilder interface { - Build() TxnEventEncoder + Clean() } // IsColumnValueEqual checks whether the preValue and updatedValue are equal. @@ -87,45 +70,3 @@ func IsColumnValueEqual(preValue, updatedValue interface{}) bool { // the value type should be the same return preValue == updatedValue } - -// MockRowEventEncoderBuilder is a mock implementation of RowEventEncoderBuilder -type MockRowEventEncoderBuilder struct{} - -// Build implement the RowEventEncoderBuilder interface -func (m *MockRowEventEncoderBuilder) Build() RowEventEncoder { - return &MockRowEventEncoder{} -} - -// CleanMetrics implement the RowEventEncoderBuilder interface -func (m *MockRowEventEncoderBuilder) CleanMetrics() { - // Clean up metrics if needed -} - -// MockRowEventEncoder is a mock implementation of RowEventEncoder -type MockRowEventEncoder struct{} - -// EncodeCheckpointEvent implement the DDLEventBatchEncoder interface -func (m *MockRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*ticommon.Message, error) { - // Implement the encoding logic for checkpoint event - return nil, nil -} - -// EncodeDDLEvent implement the DDLEventBatchEncoder interface -func (m *MockRowEventEncoder) EncodeDDLEvent(e *model.DDLEvent) (*ticommon.Message, error) { - // Implement the encoding logic for DDL event - return nil, nil -} - -// AppendRowChangedEvent implement the RowEventEncoder interface -func (m *MockRowEventEncoder) AppendRowChangedEvent( - ctx context.Context, tableID string, event *common.RowChangedEvent, callback func(), -) error { - // Implement the logic for appending row changed event - return nil -} - -// Build implement the RowEventEncoder interface -func (m *MockRowEventEncoder) Build() []*ticommon.Message { - // Implement the logic for building the batch and returning the bytes of key and value - return nil -} diff --git a/pkg/sink/codec/encoder_builder.go b/pkg/sink/codec/encoder_builder.go new file mode 100644 index 000000000..a039bfef9 --- /dev/null +++ b/pkg/sink/codec/encoder_builder.go @@ -0,0 +1,40 @@ +package codec + +import ( + "context" + + "github.com/flowbehappy/tigate/pkg/sink/codec/avro" + "github.com/flowbehappy/tigate/pkg/sink/codec/canal" + "github.com/flowbehappy/tigate/pkg/sink/codec/craft" + "github.com/flowbehappy/tigate/pkg/sink/codec/debezium" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" + "github.com/flowbehappy/tigate/pkg/sink/codec/maxwell" + "github.com/flowbehappy/tigate/pkg/sink/codec/open" + "github.com/flowbehappy/tigate/pkg/sink/codec/simple" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/common" +) + +func NewRowEventEncoder(ctx context.Context, cfg *common.Config) (encoder.RowEventEncoder, error) { + switch cfg.Protocol { + case config.ProtocolDefault, config.ProtocolOpen: + return open.NewBatchEncoder(ctx, cfg) + case config.ProtocolCanal: + return canal.NewBatchEncoder(cfg) + case config.ProtocolAvro: + return avro.NewAvroEncoder(ctx, cfg) + case config.ProtocolMaxwell: + return maxwell.NewBatchEncoder(cfg), nil + case config.ProtocolCanalJSON: + return canal.NewJSONRowEventEncoder(ctx, cfg) + case config.ProtocolCraft: + return craft.NewBatchEncoder(cfg), nil + case config.ProtocolDebezium: + return debezium.NewBatchEncoder(cfg, config.GetGlobalServerConfig().ClusterID), nil + case config.ProtocolSimple: + return simple.NewEncoder(ctx, cfg) + default: + return nil, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(cfg.Protocol) + } +} diff --git a/pkg/sink/codec/encoder_group.go b/pkg/sink/codec/encoder_group.go index 158b3f4bc..11e26b1d9 100644 --- a/pkg/sink/codec/encoder_group.go +++ b/pkg/sink/codec/encoder_group.go @@ -20,11 +20,12 @@ import ( "time" "github.com/flowbehappy/tigate/pkg/common" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" - helper "github.com/pingcap/tiflow/pkg/sink/codec/common" + ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -49,13 +50,14 @@ type EncoderGroup interface { type encoderGroup struct { changefeedID model.ChangeFeedID - builder RowEventEncoderBuilder // concurrency is the number of encoder pipelines to run concurrency int // inputCh is the input channel for each encoder pipeline inputCh []chan *future index uint64 + rowEventEncoders []encoder.RowEventEncoder + outputCh chan *future bootstrapWorker *bootstrapWorker @@ -63,8 +65,9 @@ type encoderGroup struct { // NewEncoderGroup creates a new EncoderGroup instance func NewEncoderGroup( + ctx context.Context, cfg *config.SinkConfig, - builder RowEventEncoderBuilder, + encoderConfig *ticommon.Config, changefeedID model.ChangeFeedID, ) *encoderGroup { concurrency := util.GetOrZero(cfg.EncoderConcurrency) @@ -72,17 +75,29 @@ func NewEncoderGroup( concurrency = config.DefaultEncoderGroupConcurrency } inputCh := make([]chan *future, concurrency) + rowEventEncoders := make([]encoder.RowEventEncoder, concurrency) + var err error for i := 0; i < concurrency; i++ { inputCh[i] = make(chan *future, defaultInputChanSize) + rowEventEncoders[i], err = NewRowEventEncoder(ctx, encoderConfig) + if err != nil { + log.Error("failed to create row event encoder", zap.Error(err)) + return nil + } } outCh := make(chan *future, defaultInputChanSize*concurrency) var bootstrapWorker *bootstrapWorker if cfg.ShouldSendBootstrapMsg() { + rowEventEncoder, err := NewRowEventEncoder(ctx, encoderConfig) + if err != nil { + log.Error("failed to create row event encoder", zap.Error(err)) + return nil + } bootstrapWorker = newBootstrapWorker( changefeedID, outCh, - builder.Build(), + rowEventEncoder, util.GetOrZero(cfg.SendBootstrapIntervalInSec), util.GetOrZero(cfg.SendBootstrapInMsgCount), util.GetOrZero(cfg.SendBootstrapToAllPartition), @@ -91,13 +106,13 @@ func NewEncoderGroup( } return &encoderGroup{ - changefeedID: changefeedID, - builder: builder, - concurrency: concurrency, - inputCh: inputCh, - index: 0, - outputCh: outCh, - bootstrapWorker: bootstrapWorker, + changefeedID: changefeedID, + rowEventEncoders: rowEventEncoders, + concurrency: concurrency, + inputCh: inputCh, + index: 0, + outputCh: outCh, + bootstrapWorker: bootstrapWorker, } } @@ -126,7 +141,6 @@ func (g *encoderGroup) Run(ctx context.Context) error { } func (g *encoderGroup) runEncoder(ctx context.Context, idx int) error { - encoder := g.builder.Build() inputCh := g.inputCh[idx] metric := encoderGroupInputChanSizeGauge. WithLabelValues(g.changefeedID.Namespace, g.changefeedID.ID, strconv.Itoa(idx)) @@ -140,12 +154,13 @@ func (g *encoderGroup) runEncoder(ctx context.Context, idx int) error { metric.Set(float64(len(inputCh))) case future := <-inputCh: for _, event := range future.events { - err := encoder.AppendRowChangedEvent(ctx, future.Key.Topic, event.Event, event.Callback) + err := g.rowEventEncoders[idx].AppendRowChangedEvent(ctx, future.Key.Topic, event.Event, event.Callback) if err != nil { return errors.Trace(err) } } - future.Messages = encoder.Build() + future.Messages = g.rowEventEncoders[idx].Build() + // TODO:是不是要用后清零 close(future.done) } } @@ -187,8 +202,10 @@ func (g *encoderGroup) Output() <-chan *future { func (g *encoderGroup) cleanMetrics() { encoderGroupInputChanSizeGauge.DeleteLabelValues(g.changefeedID.Namespace, g.changefeedID.ID) - g.builder.CleanMetrics() - helper.CleanMetrics(g.changefeedID) + for _, encoder := range g.rowEventEncoders { + encoder.Clean() + } + ticommon.CleanMetrics(g.changefeedID) } // future is a wrapper of the result of encoding events @@ -197,7 +214,7 @@ func (g *encoderGroup) cleanMetrics() { type future struct { Key model.TopicPartitionKey events []*common.RowEvent - Messages []*helper.Message + Messages []*ticommon.Message done chan struct{} } diff --git a/pkg/sink/codec/maxwell/maxwell_encoder.go b/pkg/sink/codec/maxwell/maxwell_encoder.go index f745b1dbb..17619b534 100644 --- a/pkg/sink/codec/maxwell/maxwell_encoder.go +++ b/pkg/sink/codec/maxwell/maxwell_encoder.go @@ -19,7 +19,7 @@ import ( "encoding/binary" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -36,6 +36,18 @@ type BatchEncoder struct { config *ticommon.Config } +// newBatchEncoder creates a new maxwell BatchEncoder. +func NewBatchEncoder(config *ticommon.Config) encoder.RowEventEncoder { + batch := &BatchEncoder{ + keyBuf: &bytes.Buffer{}, + valueBuf: &bytes.Buffer{}, + callbackBuf: make([]func(), 0), + config: config, + } + batch.reset() + return batch +} + // EncodeCheckpointEvent implements the RowEventEncoder interface func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*ticommon.Message, error) { // For maxwell now, there is no such a corresponding type to ResolvedEvent so far. @@ -107,37 +119,8 @@ func (d *BatchEncoder) reset() { d.valueBuf.Reset() d.batchSize = 0 var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], codec.BatchVersion1) + binary.BigEndian.PutUint64(versionByte[:], encoder.BatchVersion1) d.keyBuf.Write(versionByte[:]) } -// newBatchEncoder creates a new maxwell BatchEncoder. -func newBatchEncoder(config *ticommon.Config) codec.RowEventEncoder { - batch := &BatchEncoder{ - keyBuf: &bytes.Buffer{}, - valueBuf: &bytes.Buffer{}, - callbackBuf: make([]func(), 0), - config: config, - } - batch.reset() - return batch -} - -type batchEncoderBuilder struct { - config *ticommon.Config -} - -// NewBatchEncoderBuilder creates a maxwell batchEncoderBuilder. -func NewBatchEncoderBuilder(config *ticommon.Config) codec.RowEventEncoderBuilder { - return &batchEncoderBuilder{ - config: config, - } -} - -// Build a `maxwellBatchEncoder` -func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return newBatchEncoder(b.config) -} - -// CleanMetrics do nothing -func (b *batchEncoderBuilder) CleanMetrics() {} +func (d *BatchEncoder) Clean() {} diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go index b1782feaf..13f808e9d 100644 --- a/pkg/sink/codec/open/open_protocol_decoder.go +++ b/pkg/sink/codec/open/open_protocol_decoder.go @@ -22,7 +22,8 @@ import ( "time" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/decoder" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/flowbehappy/tigate/pkg/sink/codec/internal" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -52,7 +53,7 @@ type BatchDecoder struct { } // NewBatchDecoder creates a new BatchDecoder. -func NewBatchDecoder(ctx context.Context, config *ticommon.Config, db *sql.DB) (codec.RowEventDecoder, error) { +func NewBatchDecoder(ctx context.Context, config *ticommon.Config, db *sql.DB) (decoder.RowEventDecoder, error) { var ( externalStorage storage.ExternalStorage err error @@ -85,7 +86,7 @@ func (b *BatchDecoder) AddKeyValue(key, value []byte) error { } version := binary.BigEndian.Uint64(key[:8]) key = key[8:] - if version != codec.BatchVersion1 { + if version != encoder.BatchVersion1 { return cerror.ErrOpenProtocolCodecInvalidData. GenWithStack("unexpected key format version") } @@ -319,7 +320,7 @@ func (b *BatchDecoder) assembleEventFromClaimCheckStorage(ctx context.Context) ( } version := binary.BigEndian.Uint64(claimCheckM.Key[:8]) - if version != codec.BatchVersion1 { + if version != encoder.BatchVersion1 { return nil, cerror.ErrOpenProtocolCodecInvalidData. GenWithStack("unexpected key format version") } diff --git a/pkg/sink/codec/open/open_protocol_encoder.go b/pkg/sink/codec/open/open_protocol_encoder.go index e4f6d8fb1..660c2d07c 100644 --- a/pkg/sink/codec/open/open_protocol_encoder.go +++ b/pkg/sink/codec/open/open_protocol_encoder.go @@ -19,7 +19,7 @@ import ( "encoding/binary" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -149,7 +149,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( // Before we create a new message, we should handle the previous callbacks. d.tryBuildCallback() versionHead := make([]byte, 8) - binary.BigEndian.PutUint64(versionHead, codec.BatchVersion1) + binary.BigEndian.PutUint64(versionHead, encoder.BatchVersion1) msg := ticommon.NewMsg(config.ProtocolOpen, versionHead, nil, 0, model.MessageTypeRow, nil, nil) d.messageBuf = append(d.messageBuf, msg) @@ -183,7 +183,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( func newMessage(key, value []byte) *ticommon.Message { versionHead := make([]byte, 8) - binary.BigEndian.PutUint64(versionHead, codec.BatchVersion1) + binary.BigEndian.PutUint64(versionHead, encoder.BatchVersion1) message := ticommon.NewMsg(config.ProtocolOpen, versionHead, nil, 0, model.MessageTypeRow, nil, nil) var ( @@ -227,7 +227,7 @@ func (d *BatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*ticommon.Message, err keyBuf := new(bytes.Buffer) var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], codec.BatchVersion1) + binary.BigEndian.PutUint64(versionByte[:], encoder.BatchVersion1) keyBuf.Write(versionByte[:]) keyBuf.Write(keyLenByte[:]) keyBuf.Write(key) @@ -255,7 +255,7 @@ func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*ticommon.Message, erro keyBuf := new(bytes.Buffer) var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], codec.BatchVersion1) + binary.BigEndian.PutUint64(versionByte[:], encoder.BatchVersion1) keyBuf.Write(versionByte[:]) keyBuf.Write(keyLenByte[:]) keyBuf.Write(key) @@ -332,40 +332,20 @@ func (d *BatchEncoder) newClaimCheckLocationMessage( return key, value, nil } -type batchEncoderBuilder struct { - claimCheck *claimcheck.ClaimCheck - config *ticommon.Config -} - -// Build a BatchEncoder -func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return NewBatchEncoder(b.config, b.claimCheck) -} - -func (b *batchEncoderBuilder) CleanMetrics() { - if b.claimCheck != nil { - b.claimCheck.CleanMetrics() +func (d *BatchEncoder) Clean() { + if d.claimCheck != nil { + d.claimCheck.CleanMetrics() } } -// NewBatchEncoderBuilder creates an open-protocol batchEncoderBuilder. -func NewBatchEncoderBuilder( - ctx context.Context, config *ticommon.Config, -) (codec.RowEventEncoderBuilder, error) { +// NewBatchEncoder creates a new BatchEncoder. +func NewBatchEncoder(ctx context.Context, config *ticommon.Config) (encoder.RowEventEncoder, error) { claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) if err != nil { return nil, errors.Trace(err) } - return &batchEncoderBuilder{ - config: config, - claimCheck: claimCheck, - }, nil -} - -// NewBatchEncoder creates a new BatchEncoder. -func NewBatchEncoder(config *ticommon.Config, claimCheck *claimcheck.ClaimCheck) codec.RowEventEncoder { return &BatchEncoder{ config: config, claimCheck: claimCheck, - } + }, nil } diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go index 369073773..0bc5d6132 100644 --- a/pkg/sink/codec/open/open_protocol_message.go +++ b/pkg/sink/codec/open/open_protocol_message.go @@ -20,7 +20,7 @@ import ( "strings" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/flowbehappy/tigate/pkg/sink/codec/internal" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" @@ -93,7 +93,7 @@ func (m *messageRow) dropNotUpdatedColumns() { continue } // value equal - if codec.IsColumnValueEqual(oldValue.Value, value.Value) { + if encoder.IsColumnValueEqual(oldValue.Value, value.Value) { delete(m.PreColumns, col) } } diff --git a/pkg/sink/codec/simple/encoder.go b/pkg/sink/codec/simple/encoder.go index b547d5640..8cf79157f 100644 --- a/pkg/sink/codec/simple/encoder.go +++ b/pkg/sink/codec/simple/encoder.go @@ -17,7 +17,7 @@ import ( "context" "github.com/flowbehappy/tigate/pkg/common" - "github.com/flowbehappy/tigate/pkg/sink/codec" + "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" ) -type encoder struct { +type Encoder struct { messages []*ticommon.Message config *ticommon.Config @@ -36,8 +36,25 @@ type encoder struct { marshaller marshaller } +func NewEncoder(ctx context.Context, config *ticommon.Config) (encoder.RowEventEncoder, error) { + claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) + if err != nil { + return nil, errors.Trace(err) + } + marshaller, err := newMarshaller(config) + if err != nil { + return nil, errors.Trace(err) + } + return &Encoder{ + messages: make([]*ticommon.Message, 0, 1), + config: config, + claimCheck: claimCheck, + marshaller: marshaller, + }, nil +} + // AppendRowChangedEvent implement the RowEventEncoder interface -func (e *encoder) AppendRowChangedEvent( +func (e *Encoder) AppendRowChangedEvent( ctx context.Context, _ string, event *common.RowChangedEvent, callback func(), ) error { value, err := e.marshaller.MarshalRowChangedEvent(event, false, "") @@ -114,7 +131,7 @@ func (e *encoder) AppendRowChangedEvent( } // Build implement the RowEventEncoder interface -func (e *encoder) Build() []*ticommon.Message { +func (e *Encoder) Build() []*ticommon.Message { var result []*ticommon.Message if len(e.messages) != 0 { result = e.messages @@ -124,7 +141,7 @@ func (e *encoder) Build() []*ticommon.Message { } // EncodeCheckpointEvent implement the DDLEventBatchEncoder interface -func (e *encoder) EncodeCheckpointEvent(ts uint64) (*ticommon.Message, error) { +func (e *Encoder) EncodeCheckpointEvent(ts uint64) (*ticommon.Message, error) { value, err := e.marshaller.MarshalCheckpoint(ts) if err != nil { return nil, err @@ -136,7 +153,7 @@ func (e *encoder) EncodeCheckpointEvent(ts uint64) (*ticommon.Message, error) { } // EncodeDDLEvent implement the DDLEventBatchEncoder interface -func (e *encoder) EncodeDDLEvent(event *model.DDLEvent) (*ticommon.Message, error) { +func (e *Encoder) EncodeDDLEvent(event *model.DDLEvent) (*ticommon.Message, error) { value, err := e.marshaller.MarshalDDLEvent(event) if err != nil { return nil, err @@ -159,39 +176,9 @@ func (e *encoder) EncodeDDLEvent(event *model.DDLEvent) (*ticommon.Message, erro return result, nil } -type builder struct { - config *ticommon.Config - claimCheck *claimcheck.ClaimCheck - marshaller marshaller -} - -// NewBuilder returns a new builder -func NewBuilder(ctx context.Context, config *ticommon.Config) (*builder, error) { - claimCheck, err := claimcheck.New(ctx, config.LargeMessageHandle, config.ChangefeedID) - if err != nil { - return nil, errors.Trace(err) - } - m, err := newMarshaller(config) - return &builder{ - config: config, - claimCheck: claimCheck, - marshaller: m, - }, errors.Trace(err) -} - -// Build implement the RowEventEncoderBuilder interface -func (b *builder) Build() codec.RowEventEncoder { - return &encoder{ - messages: make([]*ticommon.Message, 0, 1), - config: b.config, - claimCheck: b.claimCheck, - marshaller: b.marshaller, - } -} - // CleanMetrics implement the RowEventEncoderBuilder interface -func (b *builder) CleanMetrics() { - if b.claimCheck != nil { - b.claimCheck.CleanMetrics() +func (e *Encoder) Clean() { + if e.claimCheck != nil { + e.claimCheck.CleanMetrics() } }