Skip to content

Commit

Permalink
Updated spelling of fmtter -> formatter
Browse files Browse the repository at this point in the history
Updated compose.yaml to ping kafka/zookeeper versions
  • Loading branch information
stewartboyd119 committed Sep 23, 2024
1 parent dba790b commit 18ef0e3
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 56 deletions.
28 changes: 14 additions & 14 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ func TestClient_Reader(t *testing.T) {
SessionTimeoutMillis: ptr(61000),
MaxPollIntervalMillis: ptr(61000),
},
logger: NoopLogger{},
fmtter: zfmtShim{&zfmt.AvroFormatter{}},
logger: NoopLogger{},
formatter: zfmtShim{&zfmt.AvroFormatter{}},
},
wantErr: false,
},
Expand Down Expand Up @@ -236,8 +236,8 @@ func TestClient_Reader(t *testing.T) {
SessionTimeoutMillis: ptr(20000),
MaxPollIntervalMillis: ptr(21000),
},
logger: NoopLogger{},
fmtter: zfmtShim{&zfmt.AvroFormatter{}},
logger: NoopLogger{},
formatter: zfmtShim{&zfmt.AvroFormatter{}},
},
wantErr: false,
},
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestClient_Reader(t *testing.T) {
assertEqual(t, a, b, cmpopts.IgnoreUnexported(MockKafkaConsumer{}))
}
assertEqual(t, gotReader.logger, tt.want.logger)
assertEqual(t, gotReader.fmtter, tt.want.fmtter)
assertEqual(t, gotReader.formatter, tt.want.formatter)
}
})
}
Expand Down Expand Up @@ -364,10 +364,10 @@ func TestClient_Writer(t *testing.T) {
NagleDisable: ptr(true),
LingerMillis: 0,
},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
fmtter: zfmtShim{&zfmt.ProtobufRawFormatter{}},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
formatter: zfmtShim{&zfmt.ProtobufRawFormatter{}},
},
wantErr: false,
},
Expand All @@ -394,10 +394,10 @@ func TestClient_Writer(t *testing.T) {
NagleDisable: ptr(false),
LingerMillis: 1,
},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
fmtter: zfmtShim{&zfmt.ProtobufRawFormatter{}},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
formatter: zfmtShim{&zfmt.ProtobufRawFormatter{}},
},
wantErr: false,
},
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestClient_Writer(t *testing.T) {

assertEqual(t, gotKWriter.topicConfig, tt.want.topicConfig)
assertEqual(t, gotKWriter.logger, tt.want.logger)
assertEqual(t, gotKWriter.fmtter, tt.want.fmtter)
assertEqual(t, gotKWriter.formatter, tt.want.formatter)
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions example/compose.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
image: confluentinc/cp-zookeeper:7.7.1
container_name: zkafka-zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "22181:2181"
kafka:
image: confluentinc/cp-kafka:latest
image: confluentinc/cp-kafka:7.7.1
container_name: zkafka-broker
depends_on:
- zookeeper
Expand Down
6 changes: 3 additions & 3 deletions formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
JSONSchemaRegistry zfmt.FormatterType = "json_schema_registry"
)

var errMissingFmtter = errors.New("custom formatter is missing, did you forget to call WithFormatter()")
var errMissingFormatter = errors.New("custom formatter is missing, did you forget to call WithFormatter()")

// Formatter allows the user to extend formatting capability to unsupported data types
type Formatter interface {
Expand Down Expand Up @@ -83,12 +83,12 @@ type errFormatter struct{}

// marshall returns error with reminder
func (f errFormatter) marshall(req marshReq) ([]byte, error) {
return nil, errMissingFmtter
return nil, errMissingFormatter
}

// unmarshal returns error with reminder
func (f errFormatter) unmarshal(req unmarshReq) error {
return errMissingFmtter
return errMissingFormatter
}

type avroSchemaRegistryFormatter struct {
Expand Down
10 changes: 5 additions & 5 deletions formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (

func TestNoopFormatter_Marshall_Unmarshal(t *testing.T) {
defer recoverThenFail(t)
fmtter := errFormatter{}
_, err := fmtter.marshall(marshReq{subject: "anything"})
require.ErrorIs(t, err, errMissingFmtter)
formatter := errFormatter{}
_, err := formatter.marshall(marshReq{subject: "anything"})
require.ErrorIs(t, err, errMissingFormatter)

var someInt int32
err = fmtter.unmarshal(unmarshReq{data: []byte("test"), target: &someInt})
require.ErrorIs(t, err, errMissingFmtter)
err = formatter.unmarshal(unmarshReq{data: []byte("test"), target: &someInt})
require.ErrorIs(t, err, errMissingFormatter)
}
2 changes: 1 addition & 1 deletion message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func Test_makeProducerMessageRaw(t *testing.T) {
hasHeaders bool
}{
{
name: "has fmtter with valid input, no key, no partition",
name: "has formatter with valid input, no key, no partition",
args: args{
serviceName: "concierge/test/test_group",
topic: "test_topic",
Expand Down
18 changes: 9 additions & 9 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type KReader struct {
topicConfig ConsumerTopicConfig
isClosed bool

fmtter kFormatter
formatter kFormatter

logger Logger
lifecycle LifecycleHooks
Expand Down Expand Up @@ -83,7 +83,7 @@ func newReader(args readerArgs) (*KReader, error) {
r := &KReader{
consumer: consumer,
topicConfig: topicConfig,
fmtter: formatter,
formatter: formatter,
logger: logger,
lifecycle: args.hooks,
tCommitMgr: newTopicCommitMgr(),
Expand All @@ -92,8 +92,8 @@ func newReader(args readerArgs) (*KReader, error) {
for _, opt := range args.opts {
opt(&s)
}
if s.fmtter != nil {
r.fmtter = s.fmtter
if s.formatter != nil {
r.formatter = s.formatter
}
return r, nil
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (r *KReader) mapMessage(_ context.Context, msg kafka.Message) *Message {
}
},
value: msg.Value,
fmt: r.fmtter,
fmt: r.formatter,
}
}

Expand Down Expand Up @@ -313,17 +313,17 @@ func getTopicName(topicName *string) string {
}

type ReaderSettings struct {
fmtter kFormatter
formatter kFormatter
}

// ReaderOption is a function that modify the KReader configurations
type ReaderOption func(*ReaderSettings)

// RFormatterOption sets the formatter for this reader
func RFormatterOption(fmtter Formatter) ReaderOption {
func RFormatterOption(formatter Formatter) ReaderOption {
return func(s *ReaderSettings) {
if fmtter != nil {
s.fmtter = zfmtShim{F: fmtter}
if formatter != nil {
s.formatter = zfmtShim{F: formatter}
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type KWriter struct {
mu sync.Mutex
producer KafkaProducer
topicConfig ProducerTopicConfig
fmtter kFormatter
formatter kFormatter
logger Logger
tracer trace.Tracer
p propagation.TextMapPropagator
Expand Down Expand Up @@ -90,7 +90,7 @@ func newWriter(args writerArgs) (*KWriter, error) {
w := &KWriter{
producer: p,
topicConfig: topicConfig,
fmtter: formatter,
formatter: formatter,
logger: args.l,
tracer: args.t,
p: args.p,
Expand All @@ -101,7 +101,7 @@ func newWriter(args writerArgs) (*KWriter, error) {
opt(&s)
}
if s.f != nil {
w.fmtter = s.f
w.formatter = s.f
}
return w, nil
}
Expand Down Expand Up @@ -229,10 +229,10 @@ func (w *KWriter) write(ctx context.Context, msg keyValuePair, opts ...WriteOpti
}

func (w *KWriter) marshall(_ context.Context, value any, schema string) ([]byte, error) {
if w.fmtter == nil {
if w.formatter == nil {
return nil, errors.New("formatter or confluent formatter is not supplied to produce kafka message")
}
return w.fmtter.marshall(marshReq{
return w.formatter.marshall(marshReq{
topic: w.topicConfig.Topic,
subject: value,
schema: schema,
Expand Down
34 changes: 17 additions & 17 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ func TestWriter_Write(t *testing.T) {
defer recoverThenFail(t)

w := &KWriter{
producer: tt.fields.Producer,
fmtter: tt.fields.fmt,
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
producer: tt.fields.Producer,
formatter: tt.fields.fmt,
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
}
got, err := w.Write(tt.args.ctx, tt.args.value)
if tt.wantErr {
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestWriter_WriteKey(t *testing.T) {
w := &KWriter{
producer: tt.fields.Producer,
topicConfig: tt.fields.conf,
fmtter: zfmtShim{tt.fields.fmt},
formatter: zfmtShim{tt.fields.fmt},
isClosed: tt.fields.isClosed,
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestWriter_WriteKeyReturnsImmediateError(t *testing.T) {
producer: p,
topicConfig: ProducerTopicConfig{},
isClosed: false,
fmtter: zfmtShim{&zfmt.JSONFormatter{}},
formatter: zfmtShim{&zfmt.JSONFormatter{}},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestWriter_WritesMetrics(t *testing.T) {
producer: p,
topicConfig: ProducerTopicConfig{Topic: "orange"},
lifecycle: hooks,
fmtter: zfmtShim{&zfmt.StringFormatter{}},
formatter: zfmtShim{&zfmt.StringFormatter{}},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
Expand Down Expand Up @@ -303,11 +303,11 @@ func TestWriter_WriteSpecialCase(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &KWriter{
producer: tt.fields.Producer,
fmtter: zfmtShim{tt.fields.fmt},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
producer: tt.fields.Producer,
formatter: zfmtShim{tt.fields.fmt},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
}
got, err := w.Write(tt.args.ctx, tt.args.value)
if tt.wantErr {
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestWriter_PreWriteLifecycleHookCanAugmentHeaders(t *testing.T) {
producer: p,
topicConfig: ProducerTopicConfig{Topic: "orange"},
lifecycle: hooks,
fmtter: zfmtShim{&zfmt.StringFormatter{}},
formatter: zfmtShim{&zfmt.StringFormatter{}},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestWriter_WithHeadersWriteOptionCanAugmentHeaders(t *testing.T) {
wr := &KWriter{
producer: p,
topicConfig: ProducerTopicConfig{Topic: "orange"},
fmtter: zfmtShim{&zfmt.StringFormatter{}},
formatter: zfmtShim{&zfmt.StringFormatter{}},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestWriter_PreWriteLifecycleHookErrorDoesntHaltProcessing(t *testing.T) {
producer: p,
topicConfig: ProducerTopicConfig{Topic: "orange"},
lifecycle: hooks,
fmtter: zfmtShim{&zfmt.StringFormatter{}},
formatter: zfmtShim{&zfmt.StringFormatter{}},
logger: NoopLogger{},
tracer: noop.TracerProvider{}.Tracer(""),
p: propagation.TraceContext{},
Expand Down Expand Up @@ -566,7 +566,7 @@ func Test_newWriter(t *testing.T) {
func TestWriter_WithOptions(t *testing.T) {
recoverThenFail(t)
w := &KWriter{}
require.Nil(t, w.fmtter, "expected nil formatter")
require.Nil(t, w.formatter, "expected nil formatter")

settings := WriterSettings{}
WFormatterOption(&zfmt.StringFormatter{})(&settings)
Expand Down

0 comments on commit 18ef0e3

Please sign in to comment.