diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..31a7ca0 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,69 @@ +run: + skip-dirs: + - docs + - datadog + - kustomize + skip-files: + - 'wire_gen.go' + tests: false +linters-settings: + errcheck: + check-type-assertions: true + check-blank: true + gci: + sections: + - standard + - default + gosimple: + go: '1.17' + govet: + check-shadowing: true + settings: + printf: + funcs: + - (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Debug + - (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Info + - (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Warn + - (gitlab.zgtools.net/devex/archetypes/gomods/zlog.Logger).Error + depguard: + rules: + Main: + files: + - $all + - "!$test" + deny: + - github.com/satori/go.uuid: Prefer "github.com/google/uuid" + disable-all: true + enable: + - asciicheck + - bidichk + - bodyclose + - cyclop + - decorder + - depguard + - deadcode + - dupl + - errcheck + - errchkjson + - errname + - errorlint + - exportloopref + - gci + - gocognit + - goconst + - gocritic + - gocyclo + - gofmt + - gosimple + - govet + - ineffassign + - nolintlint + - prealloc + - staticcheck + - structcheck + - typecheck + - unconvert + - unparam + - unused + - varcheck + - whitespace diff --git a/Makefile b/Makefile index 1396851..20577ca 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,6 @@ +# Directories containing independent Go modules. +MODULE_DIRS = . + .PHONY: test-no-setup test-no-setup: ./coverage.sh @@ -20,4 +23,14 @@ example-producer: .PHONY: example-worker example-worker: - go run example/worker/worker.go \ No newline at end of file + go run example/worker/worker.go + +.PHONY: lint +lint: golangci-lint + +.PHONY: golangci-lint +golangci-lint: + @$(foreach mod,$(MODULE_DIRS), \ + (cd $(mod) && \ + echo "[lint] golangci-lint: $(mod)" && \ + golangci-lint run --path-prefix $(mod) ./...) &&) true diff --git a/client.go b/client.go index 6f5cdc3..c6f0b93 100644 --- a/client.go +++ b/client.go @@ -133,17 +133,16 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts } func getFormatter(topicConfig TopicConfig) (zfmt.Formatter, error) { - var fmtter zfmt.Formatter switch topicConfig.GetFormatter() { case CustomFmt: - fmtter = &noopFormatter{} + return &noopFormatter{}, nil default: - fmtter, _ = zfmt.GetFormatter(topicConfig.GetFormatter(), topicConfig.GetSchemaID()) - } - if fmtter == nil { - return nil, fmt.Errorf("unsupported formatter %s", topicConfig.GetFormatter()) + f, err := zfmt.GetFormatter(topicConfig.GetFormatter(), topicConfig.GetSchemaID()) + if err != nil { + return nil, fmt.Errorf("unsupported formatter %s", topicConfig.GetFormatter()) + } + return f, nil } - return fmtter, nil } // Close terminates all cached readers and writers gracefully. diff --git a/example/worker/bench/main.go b/example/worker/bench/main.go index 9459bad..eae21ba 100644 --- a/example/worker/bench/main.go +++ b/example/worker/bench/main.go @@ -48,7 +48,9 @@ func main() { ) ctx, c := context.WithTimeout(context.Background(), 2*time.Minute) defer c() - w.Run(ctx, nil) + if err := w.Run(ctx, nil); err != nil { + log.Panic(err) + } } type kafkaProcessorError struct{} diff --git a/example/worker/worker.go b/example/worker/worker.go index 832c42f..a41d314 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -61,7 +61,9 @@ func main() { // Register a processor which is executed per message. // Speedup is used to create multiple processor goroutines. Order is still maintained with this setup by way of `virtual partitions` work := wf.Create(topicConfig, &Processor{}, zkafka.Speedup(5)) - work.Run(ctx, shutdown) + if err := work.Run(ctx, shutdown); err != nil { + log.Panic(err) + } } type Processor struct{} diff --git a/message.go b/message.go index dcad019..fdaee5f 100644 --- a/message.go +++ b/message.go @@ -65,6 +65,7 @@ func makeProducerMessageRaw(_ context.Context, serviceName, topic string, key *s Key: obsKeyOriginService, Value: []byte(serviceName), }) + //nolint:errcheck // Its not particularly noteworthy if if host isn't propagated forward. We'll suppress the error hostname, _ := os.Hostname() // hn is empty string if there's an error kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{ diff --git a/writer.go b/writer.go index b14bea1..2224742 100644 --- a/writer.go +++ b/writer.go @@ -126,7 +126,11 @@ func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts e := <-deliveryChan w.lifecyclePostAck(ctx, begin) - m := e.(*kafka.Message) + + m, ok := e.(*kafka.Message) + if !ok { + return Response{}, errors.New("unexpected message delivered on kafka delivery channel") + } span.SetAttributes( semconv.MessagingMessageIDKey.Int64(int64(m.TopicPartition.Offset)),