From 1f9131d0d31ae6365295f085d2a563b012bb581f Mon Sep 17 00:00:00 2001 From: Sam Wang Date: Fri, 20 Sep 2024 13:15:24 +0800 Subject: [PATCH] fix: update golang generic and kafka three_primary key --- internal/app/crawler/broadcastor.go | 4 +-- internal/app/crawler/payload.go | 4 +-- internal/app/entity/convert/broadcast.go | 2 +- internal/app/entity/convert/concentration.go | 2 +- internal/app/entity/convert/convert.go | 2 +- internal/app/entity/convert/dailyclose.go | 2 +- internal/app/entity/convert/stock.go | 2 +- internal/app/entity/convert/threeprimary.go | 2 +- internal/app/entity/stakeconcentration.go | 2 +- internal/app/entity/threeprimary.go | 2 +- internal/app/parser/concentration.go | 4 +-- internal/app/parser/concentration_test.go | 2 +- internal/app/parser/csv.go | 4 +-- internal/app/parser/csv_test.go | 2 +- internal/app/parser/parser.go | 12 ++++---- internal/app/parser/stocks.go | 4 +-- internal/app/parser/stocks_test.go | 2 +- internal/app/server/serve.go | 27 ----------------- internal/app/services/entities.go | 8 ++--- internal/app/services/entities_test.go | 32 ++++++++++---------- internal/app/services/services.go | 8 ++--- internal/cache/mocks/redis.go | 8 ++--- internal/cronjob/cronjob.go | 4 +-- internal/cronjob/mocks/cronjobs.go | 2 +- internal/kafka/mocks/kafka.go | 4 +-- 25 files changed, 60 insertions(+), 87 deletions(-) diff --git a/internal/app/crawler/broadcastor.go b/internal/app/crawler/broadcastor.go index 6096b20..47141db 100644 --- a/internal/app/crawler/broadcastor.go +++ b/internal/app/crawler/broadcastor.go @@ -39,7 +39,7 @@ func (b *broadcastor) Process(_ context.Context, pipe pipeline.Payload) (pipelin if payload.Strategy == convert.StakeConcentration { if st := b.cacheInMemory(payload.ParsedContent); st != nil { intercept = convert.InterceptData{ - Data: &[]interface{}{st}, + Data: &[]any{st}, Type: payload.Strategy, } } @@ -57,7 +57,7 @@ func (b *broadcastor) Process(_ context.Context, pipe pipeline.Payload) (pipelin return pipe, nil } -func (b *broadcastor) cacheInMemory(data *[]interface{}) *entity.StakeConcentration { +func (b *broadcastor) cacheInMemory(data *[]any) *entity.StakeConcentration { for _, v := range *data { if val, ok := v.(*entity.StakeConcentration); ok { b.memCache[val.StockID] = append(b.memCache[val.StockID], val) diff --git a/internal/app/crawler/payload.go b/internal/app/crawler/payload.go index 454b805..3e57a73 100644 --- a/internal/app/crawler/payload.go +++ b/internal/app/crawler/payload.go @@ -30,13 +30,13 @@ var ( _ pipeline.Payload = (*crawlerPayload)(nil) payloadPool = sync.Pool{ - New: func() interface{} { return new(crawlerPayload) }, + New: func() any { return new(crawlerPayload) }, } ) type crawlerPayload struct { RetrievedAt time.Time - ParsedContent *[]interface{} + ParsedContent *[]any URL string Date string RawContent bytes.Buffer diff --git a/internal/app/entity/convert/broadcast.go b/internal/app/entity/convert/broadcast.go index f3a3447..6f97eda 100644 --- a/internal/app/entity/convert/broadcast.go +++ b/internal/app/entity/convert/broadcast.go @@ -1,6 +1,6 @@ package convert type InterceptData struct { - Data *[]interface{} + Data *[]any Type Source } diff --git a/internal/app/entity/convert/concentration.go b/internal/app/entity/convert/concentration.go index ca2d06b..7833d56 100644 --- a/internal/app/entity/convert/concentration.go +++ b/internal/app/entity/convert/concentration.go @@ -26,7 +26,7 @@ func Concentration() IConvert { return &concentrationImpl{} } -func (c *concentrationImpl) Execute(data *Data) interface{} { +func (c *concentrationImpl) Execute(data *Data) any { var output *entity.StakeConcentration if data == nil || len(data.RawData) < 7 { return output diff --git a/internal/app/entity/convert/convert.go b/internal/app/entity/convert/convert.go index 337cb91..ef6a684 100644 --- a/internal/app/entity/convert/convert.go +++ b/internal/app/entity/convert/convert.go @@ -35,5 +35,5 @@ type Data struct { // Use strategy pattern to convert entities from parser type IConvert interface { - Execute(data *Data) interface{} + Execute(data *Data) any } diff --git a/internal/app/entity/convert/dailyclose.go b/internal/app/entity/convert/dailyclose.go index 30d7bbd..a6e5d6e 100644 --- a/internal/app/entity/convert/dailyclose.go +++ b/internal/app/entity/convert/dailyclose.go @@ -28,7 +28,7 @@ func DailyClose() IConvert { return &dailyCloseImpl{} } -func (c *dailyCloseImpl) Execute(data *Data) interface{} { +func (c *dailyCloseImpl) Execute(data *Data) any { var output *entity.DailyClose if data == nil || len(data.RawData) < 17 { return output diff --git a/internal/app/entity/convert/stock.go b/internal/app/entity/convert/stock.go index 29868c4..5a2c174 100644 --- a/internal/app/entity/convert/stock.go +++ b/internal/app/entity/convert/stock.go @@ -28,7 +28,7 @@ func Stock() IConvert { return &stockImpl{} } -func (c *stockImpl) Execute(data *Data) interface{} { +func (c *stockImpl) Execute(data *Data) any { var output *entity.Stock if data == nil || len(data.RawData) < maxLength { return output diff --git a/internal/app/entity/convert/threeprimary.go b/internal/app/entity/convert/threeprimary.go index 71f9d45..4659e75 100644 --- a/internal/app/entity/convert/threeprimary.go +++ b/internal/app/entity/convert/threeprimary.go @@ -27,7 +27,7 @@ func ThreePrimary() IConvert { return &threePrimaryImpl{} } -func (c *threePrimaryImpl) Execute(data *Data) interface{} { +func (c *threePrimaryImpl) Execute(data *Data) any { var output *entity.ThreePrimary if data == nil || len(data.RawData) < 19 { return output diff --git a/internal/app/entity/stakeconcentration.go b/internal/app/entity/stakeconcentration.go index f0ea4f7..34f75a7 100644 --- a/internal/app/entity/stakeconcentration.go +++ b/internal/app/entity/stakeconcentration.go @@ -23,7 +23,7 @@ import ( //nolint:nolintlint, gochecknoglobals var concentrationPool = sync.Pool{ - New: func() interface{} { return new(StakeConcentration) }, + New: func() any { return new(StakeConcentration) }, } type StakeConcentration struct { diff --git a/internal/app/entity/threeprimary.go b/internal/app/entity/threeprimary.go index 96a6d1f..50f3044 100644 --- a/internal/app/entity/threeprimary.go +++ b/internal/app/entity/threeprimary.go @@ -15,7 +15,7 @@ package entity type ThreePrimary struct { StockID string `json:"stockId"` - Date string `json:"exchangeDate"` + Date string `json:"date"` ForeignTradeShares int64 `json:"foreignTradeShares"` TrustTradeShares int64 `json:"trustTradeShares"` DealerTradeShares int64 `json:"dealerTradeShares"` diff --git a/internal/app/parser/concentration.go b/internal/app/parser/concentration.go index 78c4d1e..b882233 100644 --- a/internal/app/parser/concentration.go +++ b/internal/app/parser/concentration.go @@ -35,10 +35,10 @@ type concentrationStrategy struct { func (s *concentrationStrategy) Parse( input io.Reader, additional ...string, -) ([]interface{}, error) { +) ([]any, error) { var records []string - var output []interface{} + var output []any var isColumn, isTitle, startParsing bool diff --git a/internal/app/parser/concentration_test.go b/internal/app/parser/concentration_test.go index 00bf2e3..318c673 100644 --- a/internal/app/parser/concentration_test.go +++ b/internal/app/parser/concentration_test.go @@ -79,7 +79,7 @@ func TestParseConcentration(t *testing.T) { t.Parallel() res := &parserImpl{ - result: &[]interface{}{}, + result: &[]any{}, } res.SetStrategy(convert.StakeConcentration, "2023-01-10") res.Execute(*bytes.NewBuffer([]byte(tt.content)), "https://stockchannelnew.sinotrade.com.tw/z/zc/zco/zco_3704_1.djhtm") diff --git a/internal/app/parser/csv.go b/internal/app/parser/csv.go index acc0785..adb0781 100644 --- a/internal/app/parser/csv.go +++ b/internal/app/parser/csv.go @@ -32,12 +32,12 @@ type csvStrategy struct { } //nolint:nolintlint, cyclop -func (s *csvStrategy) Parse(input io.Reader, _ ...string) ([]interface{}, error) { +func (s *csvStrategy) Parse(input io.Reader, _ ...string) ([]any, error) { if s.date == "" { return nil, ErrParseDayMissing } - var output []interface{} + var output []any reader := csv.NewReader(input) reader.Comma = ',' diff --git a/internal/app/parser/csv_test.go b/internal/app/parser/csv_test.go index e355ea7..6a62dba 100644 --- a/internal/app/parser/csv_test.go +++ b/internal/app/parser/csv_test.go @@ -74,7 +74,7 @@ func TestParseCsv(t *testing.T) { t.Parallel() res := &parserImpl{ - result: &[]interface{}{}, + result: &[]any{}, } res.SetStrategy(tt.target, "20211130") diff --git a/internal/app/parser/parser.go b/internal/app/parser/parser.go index 93213ad..0ebfbdd 100644 --- a/internal/app/parser/parser.go +++ b/internal/app/parser/parser.go @@ -38,11 +38,11 @@ const ( type Parser interface { SetStrategy(source convert.Source, additional ...string) Execute(in bytes.Buffer, additional ...string) error - Flush() *[]interface{} + Flush() *[]any } type Strategy interface { - Parse(in io.Reader, additional ...string) ([]interface{}, error) + Parse(in io.Reader, additional ...string) ([]any, error) } type Config struct { @@ -52,13 +52,13 @@ type Config struct { type parserImpl struct { cfg Config strategy Strategy - result *[]interface{} + result *[]any } func New(cfg Config) Parser { res := &parserImpl{ cfg: cfg, - result: &[]interface{}{}, + result: &[]any{}, } return res @@ -122,9 +122,9 @@ func (p *parserImpl) Execute(in bytes.Buffer, additional ...string) error { return nil } -func (p *parserImpl) Flush() *[]interface{} { +func (p *parserImpl) Flush() *[]any { res := *p.result - p.result = &[]interface{}{} + p.result = &[]any{} return &res } diff --git a/internal/app/parser/stocks.go b/internal/app/parser/stocks.go index c673ca6..d32c6f8 100644 --- a/internal/app/parser/stocks.go +++ b/internal/app/parser/stocks.go @@ -29,8 +29,8 @@ type htmlStrategy struct { } //nolint:nolintlint, cyclop, gocognit -func (s *htmlStrategy) Parse(input io.Reader, _ ...string) ([]interface{}, error) { - var output []interface{} +func (s *htmlStrategy) Parse(input io.Reader, _ ...string) ([]any, error) { + var output []any var records []string diff --git a/internal/app/parser/stocks_test.go b/internal/app/parser/stocks_test.go index 6268610..ebdbd80 100644 --- a/internal/app/parser/stocks_test.go +++ b/internal/app/parser/stocks_test.go @@ -62,7 +62,7 @@ func TestParseHtml(t *testing.T) { t.Parallel() res := &parserImpl{ - result: &[]interface{}{}, + result: &[]any{}, } res.SetStrategy(convert.TwseStockList) err := res.Execute(*bytes.NewBuffer([]byte(tt.content))) diff --git a/internal/app/server/serve.go b/internal/app/server/serve.go index 494e0ad..f823e1e 100644 --- a/internal/app/server/serve.go +++ b/internal/app/server/serve.go @@ -216,33 +216,6 @@ func (s *server) Run(ctx context.Context) error { }, }) - //nolint:nolintlint, errcheck - svc.Handler().CronDownload(ctx, &dto.StartCronjobRequest{ - Schedule: "30 15 * * 1-5", - Types: []convert.Source{ - convert.TwseDailyClose, - convert.TpexDailyClose, - }, - }) - - //nolint:nolintlint, errcheck - svc.Handler().CronDownload(ctx, &dto.StartCronjobRequest{ - Schedule: "30 17 * * 1-5", - Types: []convert.Source{ - convert.TwseThreePrimary, - convert.TpexThreePrimary, - }, - }) - - //nolint:nolintlint, errcheck - svc.Handler().CronDownload(ctx, &dto.StartCronjobRequest{ - Schedule: "40 18 * * 1-5", - Types: []convert.Source{ - convert.TwseThreePrimary, - convert.TpexThreePrimary, - }, - }) - requestChan := make(chan *dto.StartCronjobRequest) svc.Handler().ListeningDownloadRequest(ctx, requestChan) diff --git a/internal/app/services/entities.go b/internal/app/services/entities.go index 463b891..484744d 100644 --- a/internal/app/services/entities.go +++ b/internal/app/services/entities.go @@ -38,7 +38,7 @@ const ( //nolint:nolintlint, gochecknoglobals var jsoni = jsoniter.ConfigCompatibleWithStandardLibrary -func (s *serviceImpl) DailyCloseThroughKafka(ctx context.Context, objs *[]interface{}) error { +func (s *serviceImpl) DailyCloseThroughKafka(ctx context.Context, objs *[]any) error { for _, val := range *objs { if res, ok := val.(*entity.DailyClose); ok { b, err := jsoni.Marshal(res) @@ -67,7 +67,7 @@ func (s *serviceImpl) DailyCloseThroughKafka(ctx context.Context, objs *[]interf return nil } -func (s *serviceImpl) StockThroughKafka(ctx context.Context, objs *[]interface{}) error { +func (s *serviceImpl) StockThroughKafka(ctx context.Context, objs *[]any) error { for _, val := range *objs { if res, ok := val.(*entity.Stock); ok { b, err := jsoni.Marshal(res) @@ -96,7 +96,7 @@ func (s *serviceImpl) StockThroughKafka(ctx context.Context, objs *[]interface{} return nil } -func (s *serviceImpl) ThreePrimaryThroughKafka(ctx context.Context, objs *[]interface{}) error { +func (s *serviceImpl) ThreePrimaryThroughKafka(ctx context.Context, objs *[]any) error { for _, val := range *objs { if res, ok := val.(*entity.ThreePrimary); ok { b, err := jsoni.Marshal(res) @@ -127,7 +127,7 @@ func (s *serviceImpl) ThreePrimaryThroughKafka(ctx context.Context, objs *[]inte func (s *serviceImpl) StakeConcentrationThroughKafka( ctx context.Context, - objs *[]interface{}, + objs *[]any, ) error { for _, val := range *objs { if res, ok := val.(*entity.StakeConcentration); ok { diff --git a/internal/app/services/entities_test.go b/internal/app/services/entities_test.go index 7272837..bf7d1a4 100644 --- a/internal/app/services/entities_test.go +++ b/internal/app/services/entities_test.go @@ -38,7 +38,7 @@ func TestDailyCloseThroughKafka(t *testing.T) { t.Parallel() type args struct { - data *[]interface{} + data *[]any expectReturn error } @@ -50,7 +50,7 @@ func TestDailyCloseThroughKafka(t *testing.T) { { name: "successfully send daily close data through kafka", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.DailyClose{ StockID: "2330", Date: "20200101", @@ -71,7 +71,7 @@ func TestDailyCloseThroughKafka(t *testing.T) { { name: "failed to send correct data format through kafka", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.StakeConcentration{ StockID: "2330", }, @@ -83,7 +83,7 @@ func TestDailyCloseThroughKafka(t *testing.T) { { name: "failed to send daily close data due to kafka error", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.DailyClose{ StockID: "2330", }, @@ -136,7 +136,7 @@ func TestStockThroughKafka(t *testing.T) { t.Parallel() type args struct { - data *[]interface{} + data *[]any expectReturn error } @@ -148,7 +148,7 @@ func TestStockThroughKafka(t *testing.T) { { name: "successfully send stock data through kafka", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.Stock{ StockID: "2330", Name: "Test", @@ -161,7 +161,7 @@ func TestStockThroughKafka(t *testing.T) { { name: "failed to send correct data format through kafka", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.StakeConcentration{ StockID: "2330", }, @@ -173,7 +173,7 @@ func TestStockThroughKafka(t *testing.T) { { name: "failed to send stock data due to kafka error", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.Stock{ StockID: "2330", Name: "Test", @@ -227,7 +227,7 @@ func TestThreePrimaryThroughKafka(t *testing.T) { t.Parallel() type args struct { - data *[]interface{} + data *[]any expectReturn error } @@ -239,7 +239,7 @@ func TestThreePrimaryThroughKafka(t *testing.T) { { name: "successfully send three primary data through kafka", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.ThreePrimary{ StockID: "2330", }, @@ -251,7 +251,7 @@ func TestThreePrimaryThroughKafka(t *testing.T) { { name: "failed to send correct data format through kafka", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.StakeConcentration{ StockID: "2330", }, @@ -263,7 +263,7 @@ func TestThreePrimaryThroughKafka(t *testing.T) { { name: "failed to send three primary due to kafka error", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.ThreePrimary{ StockID: "2330", }, @@ -319,7 +319,7 @@ func TestStakeConcentrationThroughKafka(t *testing.T) { t.Parallel() type args struct { - data *[]interface{} + data *[]any date string expectReturn error } @@ -332,7 +332,7 @@ func TestStakeConcentrationThroughKafka(t *testing.T) { { name: "successfully send stake concentration data through kafka", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.StakeConcentration{ StockID: "2330", Date: "20220820", @@ -346,7 +346,7 @@ func TestStakeConcentrationThroughKafka(t *testing.T) { { name: "failed to send correct data format through kafka", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.Stock{ StockID: "2330", }, @@ -359,7 +359,7 @@ func TestStakeConcentrationThroughKafka(t *testing.T) { { name: "failed to send stake concentration due to kafka error", args: args{ - data: &[]interface{}{ + data: &[]any{ &entity.StakeConcentration{ StockID: "2330", Date: "20220820", diff --git a/internal/app/services/services.go b/internal/app/services/services.go index 88e90fe..7ac27e7 100644 --- a/internal/app/services/services.go +++ b/internal/app/services/services.go @@ -32,10 +32,10 @@ type IService interface { StartCron() StopCron() AddJob(ctx context.Context, spec string, job func()) error - DailyCloseThroughKafka(ctx context.Context, objs *[]interface{}) error - StockThroughKafka(ctx context.Context, objs *[]interface{}) error - ThreePrimaryThroughKafka(ctx context.Context, objs *[]interface{}) error - StakeConcentrationThroughKafka(ctx context.Context, objs *[]interface{}) error + DailyCloseThroughKafka(ctx context.Context, objs *[]any) error + StockThroughKafka(ctx context.Context, objs *[]any) error + ThreePrimaryThroughKafka(ctx context.Context, objs *[]any) error + StakeConcentrationThroughKafka(ctx context.Context, objs *[]any) error ObtainLock(ctx context.Context, key string, expire time.Duration) *redislock.Lock StopRedis() error StopKafka() error diff --git a/internal/cache/mocks/redis.go b/internal/cache/mocks/redis.go index 863a1a3..c22ddea 100644 --- a/internal/cache/mocks/redis.go +++ b/internal/cache/mocks/redis.go @@ -59,7 +59,7 @@ func (m *MockRedis) ObtainLock(ctx context.Context, key string, expire time.Dura } // ObtainLock indicates an expected call of ObtainLock. -func (mr *MockRedisMockRecorder) ObtainLock(ctx, key, expire interface{}) *gomock.Call { +func (mr *MockRedisMockRecorder) ObtainLock(ctx, key, expire any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ObtainLock", reflect.TypeOf((*MockRedis)(nil).ObtainLock), ctx, key, expire) } @@ -73,7 +73,7 @@ func (m *MockRedis) SAdd(ctx context.Context, key, value string) error { } // SAdd indicates an expected call of SAdd. -func (mr *MockRedisMockRecorder) SAdd(ctx, key, value interface{}) *gomock.Call { +func (mr *MockRedisMockRecorder) SAdd(ctx, key, value any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SAdd", reflect.TypeOf((*MockRedis)(nil).SAdd), ctx, key, value) } @@ -88,7 +88,7 @@ func (m *MockRedis) SMembers(ctx context.Context, key string) ([]string, error) } // SMembers indicates an expected call of SMembers. -func (mr *MockRedisMockRecorder) SMembers(ctx, key interface{}) *gomock.Call { +func (mr *MockRedisMockRecorder) SMembers(ctx, key any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SMembers", reflect.TypeOf((*MockRedis)(nil).SMembers), ctx, key) } @@ -102,7 +102,7 @@ func (m *MockRedis) SetExpire(ctx context.Context, key string, expired time.Time } // SetExpire indicates an expected call of SetExpire. -func (mr *MockRedisMockRecorder) SetExpire(ctx, key, expired interface{}) *gomock.Call { +func (mr *MockRedisMockRecorder) SetExpire(ctx, key, expired any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetExpire", reflect.TypeOf((*MockRedis)(nil).SetExpire), ctx, key, expired) } diff --git a/internal/cronjob/cronjob.go b/internal/cronjob/cronjob.go index d169655..49b14e8 100644 --- a/internal/cronjob/cronjob.go +++ b/internal/cronjob/cronjob.go @@ -58,13 +58,13 @@ func New(cfg Config) Cronjob { return job } -func (c Config) Info(_ string, keysAndValues ...interface{}) { +func (c Config) Info(_ string, keysAndValues ...any) { if len(keysAndValues) > 0 { c.Logger.Info().Msgf("cronjob.Run: success, data=%+v;", keysAndValues) } } -func (c Config) Error(_ error, _ string, keysAndValues ...interface{}) { +func (c Config) Error(_ error, _ string, keysAndValues ...any) { if len(keysAndValues) > 0 { c.Logger.Warn().Msgf("cronjob.Run: failed, data=%+v;", keysAndValues) } diff --git a/internal/cronjob/mocks/cronjobs.go b/internal/cronjob/mocks/cronjobs.go index e0f3060..4495978 100644 --- a/internal/cronjob/mocks/cronjobs.go +++ b/internal/cronjob/mocks/cronjobs.go @@ -43,7 +43,7 @@ func (m *MockCronjob) AddJob(ctx context.Context, spec string, job func()) error } // AddJob indicates an expected call of AddJob. -func (mr *MockCronjobMockRecorder) AddJob(ctx, spec, job interface{}) *gomock.Call { +func (mr *MockCronjobMockRecorder) AddJob(ctx, spec, job any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddJob", reflect.TypeOf((*MockCronjob)(nil).AddJob), ctx, spec, job) } diff --git a/internal/kafka/mocks/kafka.go b/internal/kafka/mocks/kafka.go index 65401ee..013ec2a 100644 --- a/internal/kafka/mocks/kafka.go +++ b/internal/kafka/mocks/kafka.go @@ -59,7 +59,7 @@ func (m *MockKafka) ReadMessage(ctx context.Context) (*kafka.ReceivedMessage, er } // ReadMessage indicates an expected call of ReadMessage. -func (mr *MockKafkaMockRecorder) ReadMessage(ctx interface{}) *gomock.Call { +func (mr *MockKafkaMockRecorder) ReadMessage(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadMessage", reflect.TypeOf((*MockKafka)(nil).ReadMessage), ctx) } @@ -73,7 +73,7 @@ func (m *MockKafka) WriteMessages(ctx context.Context, topic string, message []b } // WriteMessages indicates an expected call of WriteMessages. -func (mr *MockKafkaMockRecorder) WriteMessages(ctx, topic, message interface{}) *gomock.Call { +func (mr *MockKafkaMockRecorder) WriteMessages(ctx, topic, message any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteMessages", reflect.TypeOf((*MockKafka)(nil).WriteMessages), ctx, topic, message) }