diff --git a/Makefile b/Makefile index 7dd98d8..bb2a6f0 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ # Directories containing independent Go modules. MODULE_DIRS = . GOLANGCI_VERSION=1.61.0 +AVRO_CMD_PATH=github.com/hamba/avro/v2/cmd/avrogen@v2.26.0 # Sets up kafka broker using docker compose @@ -47,10 +48,13 @@ golangci-lint: gen: protoc-exists cd test/evolution; protoc --proto_path=. --go_out=./ ./schema_1.proto cd test/evolution; protoc --proto_path=. --go_out=./ ./schema_2.proto - go run github.com/heetch/avro/cmd/avrogo@v0.4.5 -p main -d ./example/producer_avro ./example/producer_avro/dummy_event.avsc - go run github.com/heetch/avro/cmd/avrogo@v0.4.5 -p main -d ./example/worker_avro ./example/worker_avro/dummy_event.avsc - go run github.com/heetch/avro/cmd/avrogo@v0.4.5 -p avro1 -d ./test/evolution/avro1 ./test/evolution/schema_1.avsc - go run github.com/heetch/avro/cmd/avrogo@v0.4.5 -p avro2 -d ./test/evolution/avro2 ./test/evolution/schema_2.avsc + go run ${AVRO_CMD_PATH} -pkg main -o ./example/producer_avro/event_gen.go ./example/producer_avro/event.avsc + go run ${AVRO_CMD_PATH} -pkg main -o ./example/worker_avro/event_gen.go ./example/worker_avro/event.avsc + mkdir -p ./test/evolution/avro1 + mkdir -p ./test/evolution/avro2 + go run ${AVRO_CMD_PATH} -pkg avro1 -o ./test/evolution/avro1/schema_1_gen.go ./test/evolution/schema_1.avsc + go run ${AVRO_CMD_PATH} -pkg avro2 -o ./test/evolution/avro2/schema_2_gen.go ./test/evolution/schema_2.avsc + go run github.com/heetch/avro/cmd/avrogo@v0.4.5 -p avro1 -d ./test/evolution/avro1x ./test/evolution/schema_1.avsc # a forced dependency which fails (and prints) if `avro-tools` isn't installed .PHONY: protoc-exists diff --git a/changelog.md b/changelog.md index 2f0f53c..46a8c8a 100644 --- a/changelog.md +++ b/changelog.md @@ -5,7 +5,9 @@ All notable changes to this project will be documented in this file. This project adheres to Semantic Versioning. ## 1.2.0 (Sep 23, 2024) -1. 
Update to allow subject name specification (not just TopicNameStrategy, which default ) + +1. Update to allow subject name specification (not just TopicNameStrategy) +1. Updated `avro_schema_registry` formatter deserialization to require passed in schema (less susceptible to errors from inferred target schema) ## 1.1.0 (Sep 23, 2024) diff --git a/config.go b/config.go index 9e53bfc..538033b 100644 --- a/config.go +++ b/config.go @@ -235,6 +235,9 @@ type SerializationConfig struct { } type DeserializationConfig struct { + // Schema is used exclusively by the avro schema registry formatter today. It's necessary to provide proper schema evolution properties + // expected by typical use cases. + Schema string } func getDefaultConsumerTopicConfig(topicConfig *ConsumerTopicConfig) error { diff --git a/example/producer_avro/dummy_event_gen.go b/example/producer_avro/dummy_event_gen.go deleted file mode 100644 index 083961e..0000000 --- a/example/producer_avro/dummy_event_gen.go +++ /dev/null @@ -1,31 +0,0 @@ -// Code generated by avrogen. DO NOT EDIT. - -package main - -import ( - "github.com/heetch/avro/avrotypegen" -) - -type DummyEvent struct { - IntField int - DoubleField float64 - StringField string - BoolField bool - BytesField []byte - NewFieldWithDefault *string - NewFieldWithDefault2 *string -} - -// AvroRecord implements the avro.AvroRecord interface. 
-func (DummyEvent) AvroRecord() avrotypegen.RecordInfo { - return avrotypegen.RecordInfo{ - Schema: `{"fields":[{"name":"IntField","type":"int"},{"name":"DoubleField","type":"double"},{"name":"StringField","type":"string"},{"name":"BoolField","type":"boolean"},{"name":"BytesField","type":"bytes"},{"default":null,"name":"NewFieldWithDefault","type":["null","string"]},{"default":null,"name":"NewFieldWithDefault2","type":["null","string"]}],"name":"DummyEvent","type":"record"}`, - Required: []bool{ - 0: true, - 1: true, - 2: true, - 3: true, - 4: true, - }, - } -} diff --git a/example/producer_avro/dummy_event.avsc b/example/producer_avro/event.avsc similarity index 100% rename from example/producer_avro/dummy_event.avsc rename to example/producer_avro/event.avsc diff --git a/example/producer_avro/event_gen.go b/example/producer_avro/event_gen.go new file mode 100644 index 0000000..50559e3 --- /dev/null +++ b/example/producer_avro/event_gen.go @@ -0,0 +1,14 @@ +package main + +// Code generated by avro/gen. DO NOT EDIT. + +// DummyEvent is a generated struct. 
+type DummyEvent struct { + IntField int `avro:"IntField"` + DoubleField float64 `avro:"DoubleField"` + StringField string `avro:"StringField"` + BoolField bool `avro:"BoolField"` + BytesField []byte `avro:"BytesField"` + NewFieldWithDefault *string `avro:"NewFieldWithDefault"` + NewFieldWithDefault2 *string `avro:"NewFieldWithDefault2"` +} diff --git a/example/producer_avro/go.mod b/example/producer_avro/go.mod index f6b2a24..7e0b337 100644 --- a/example/producer_avro/go.mod +++ b/example/producer_avro/go.mod @@ -8,19 +8,29 @@ require github.com/zillow/zkafka v1.0.0 require ( github.com/actgardner/gogen-avro/v10 v10.2.1 // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/bufbuild/protocompile v0.8.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hamba/avro/v2 v2.24.0 // indirect github.com/heetch/avro v0.4.5 // indirect + github.com/invopop/jsonschema v0.12.0 // indirect + github.com/jhump/protoreflect v1.15.6 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 // indirect github.com/sony/gobreaker v1.0.0 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/zillow/zfmt v1.0.1 // indirect go.opentelemetry.io/otel v1.30.0 // indirect go.opentelemetry.io/otel/trace v1.30.0 // indirect golang.org/x/sync v0.8.0 // indirect + google.golang.org/genproto v0.0.0-20240325203815-454cdb8f5daa // indirect google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/example/producer_avro/go.sum b/example/producer_avro/go.sum index 
dafa10c..4746bb1 100644 --- a/example/producer_avro/go.sum +++ b/example/producer_avro/go.sum @@ -1,3 +1,4 @@ +cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM= cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU= cloud.google.com/go/compute v1.25.1/go.mod h1:oopOIR53ly6viBYxaDhBfJwzUAxf1zE//uf3IB011ls= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= @@ -62,10 +63,16 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bufbuild/protocompile v0.8.0 h1:9Kp1q6OkS9L4nM3FYbr8vlJnEwtbpDPQlQOVXfR+78s= +github.com/bufbuild/protocompile v0.8.0/go.mod h1:+Etjg4guZoAqzVk2czwEQP12yaxLJ8DxuqCJ9qHdH94= github.com/buger/goterm v1.0.4 h1:Z9YvGmOih81P0FbVtEYTFF6YsSgxSUKEhf/f9bTMXbY= github.com/buger/goterm v1.0.4/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -74,8 +81,7 @@ 
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX/CPSO+ES9+E= github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc= -github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo= -github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y= +github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 h1:3AAMHvhiv3d4ajW4fSnZw+liipffhtunkEWz23zTXXM= github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0= github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro= github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= @@ -187,8 +193,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaW github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= -github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E= -github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig= +github.com/hamba/avro/v2 v2.24.0 h1:axTlaYDkcSY0dVekRSy8cdrsj5MG86WqosUQacKCids= github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -220,6 +225,10 @@ github.com/in-toto/in-toto-golang v0.5.0 h1:hb8bgwr0M2hGdDsLjkJ3ZqJ8JFLL/tgYdAxF 
github.com/in-toto/in-toto-golang v0.5.0/go.mod h1:/Rq0IZHLV7Ku5gielPT4wPHJfH1GdHMCq8+WPxw8/BE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI= +github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= +github.com/jhump/protoreflect v1.15.6 h1:WMYJbw2Wo+KOWwZFvgY0jMoVHM6i4XIvRs2RcBj5VmI= +github.com/jhump/protoreflect v1.15.6/go.mod h1:jCHoyYQIJnaabEYnbGwyo9hUqfyUMTbJw/tAut5t97E= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -228,8 +237,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -327,6 +336,8 @@ github.com/rogpeppe/go-internal v1.10.0 
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 h1:uIkTLo0AGRc8l7h5l9r+GcYi9qfVPt6lD4/bhmzfiKo= +github.com/santhosh-tekuri/jsonschema/v5 v5.3.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= github.com/secure-systems-lab/go-securesystemslib v0.4.0 h1:b23VGrQhTA8cN2CbBw7/FulN9fTtqYUdS5+Oxzt+DUE= github.com/secure-systems-lab/go-securesystemslib v0.4.0/go.mod h1:FGBZgq2tXWICsxWQW1msNf49F0Pf2Op5Htayx335Qbs= github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b h1:h+3JX2VoWTFuyQEo87pStk/a99dzIO1mM9KxIyLPGTU= @@ -379,6 +390,8 @@ github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea h1:SXhTLE6pb6eld/ github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea/go.mod h1:WPnis/6cRcDZSUvVmezrxJPkiO87ThFYsoUiMwWNDJk= github.com/tonistiigi/vt100 v0.0.0-20230623042737-f9a4f7ef6531 h1:Y/M5lygoNPKwVNLMPXgVfsRT40CSFKXCxuU8LoHySjs= github.com/tonistiigi/vt100 v0.0.0-20230623042737-f9a4f7ef6531/go.mod h1:ulncasL3N9uLrVann0m+CDlJKWsIAP34MPcOJF6VRvc= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= @@ -399,8 +412,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0. 
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.1/go.mod h1:GnOaBaFQ2we3b9AGWJpsBa7v1S5RlQzlC3O7dRMxZhM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU= @@ -416,14 +428,13 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 h1:digkE go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0/go.mod h1:/OpE/y70qVkndM0TrxT4KBoN3RsFZP0QaofcfYrj76I= go.opentelemetry.io/otel/exporters/prometheus v0.42.0 h1:jwV9iQdvp38fxXi8ZC+lNpxjK16MRcZlpDYvbuO1FiA= go.opentelemetry.io/otel/exporters/prometheus v0.42.0/go.mod h1:f3bYiqNqhoPxkvI2LrXqQVC546K7BuRDL/kKuxkujhA= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= +go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= go.opentelemetry.io/otel/sdk/metric v1.21.0 
h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= @@ -437,8 +448,7 @@ golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -464,6 +474,9 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 
v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/example/producer_avro/main.go b/example/producer_avro/main.go index f9472a5..219b53f 100644 --- a/example/producer_avro/main.go +++ b/example/producer_avro/main.go @@ -10,8 +10,8 @@ import ( "github.com/zillow/zkafka" ) -//go:embed dummy_event.avsc -var dummyEventSchema string +//go:embed event.avsc +var eventSchema string func main() { ctx := context.Background() @@ -29,9 +29,7 @@ func main() { AutoRegisterSchemas: true, // When using avro schema registry, you must specify the schema. In this case, // the schema used to generate the golang type is used. - // The heetch generated struct also embeds the schema as well (and isn't lossy like some of the - // other generative solutions. For example, one lib didn't include default values), so that could be used as well. - Schema: dummyEventSchema, + Schema: eventSchema, }, }, }) @@ -40,6 +38,9 @@ func main() { log.Panic(err) } for { + // The DummyEvent type is generated using `hamba/avro` (see make). This is the preferred generation for + // `formatter=zkafka.AvroSchemaRegistry` because the underlying serializer uses the avro tags on the generated struct + // to properly connect the schema and struct event := DummyEvent{ IntField: rand.Intn(100), StringField: randomNames[rand.Intn(len(randomNames))], diff --git a/example/worker_avro/dummy_event_gen.go b/example/worker_avro/dummy_event_gen.go deleted file mode 100644 index e5245e1..0000000 --- a/example/worker_avro/dummy_event_gen.go +++ /dev/null @@ -1,29 +0,0 @@ -// Code generated by avrogen. DO NOT EDIT.
- -package main - -import ( - "github.com/heetch/avro/avrotypegen" -) - -type DummyEvent struct { - IntField int - DoubleField float64 - StringField string - BoolField bool - BytesField []byte -} - -// AvroRecord implements the avro.AvroRecord interface. -func (DummyEvent) AvroRecord() avrotypegen.RecordInfo { - return avrotypegen.RecordInfo{ - Schema: `{"fields":[{"name":"IntField","type":"int"},{"name":"DoubleField","type":"double"},{"name":"StringField","type":"string"},{"name":"BoolField","type":"boolean"},{"name":"BytesField","type":"bytes"}],"name":"DummyEvent","type":"record"}`, - Required: []bool{ - 0: true, - 1: true, - 2: true, - 3: true, - 4: true, - }, - } -} diff --git a/example/worker_avro/dummy_event.avsc b/example/worker_avro/event.avsc similarity index 100% rename from example/worker_avro/dummy_event.avsc rename to example/worker_avro/event.avsc diff --git a/example/worker_avro/event_gen.go b/example/worker_avro/event_gen.go new file mode 100644 index 0000000..a6fed57 --- /dev/null +++ b/example/worker_avro/event_gen.go @@ -0,0 +1,12 @@ +package main + +// Code generated by avro/gen. DO NOT EDIT. + +// DummyEvent is a generated struct. +type DummyEvent struct { + IntField int `avro:"IntField"` + DoubleField float64 `avro:"DoubleField"` + StringField string `avro:"StringField"` + BoolField bool `avro:"BoolField"` + BytesField []byte `avro:"BytesField"` +} diff --git a/example/worker_avro/go.mod b/example/worker_avro/go.mod index 605f918..ca246bd 100644 --- a/example/worker_avro/go.mod +++ b/example/worker_avro/go.mod @@ -6,7 +6,6 @@ replace github.com/zillow/zkafka v1.0.0 => ../.. 
require ( github.com/google/uuid v1.6.0 - github.com/heetch/avro v0.4.5 github.com/zillow/zkafka v1.0.0 ) @@ -18,6 +17,7 @@ require ( github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/hamba/avro/v2 v2.24.0 // indirect + github.com/heetch/avro v0.4.5 // indirect github.com/invopop/jsonschema v0.12.0 // indirect github.com/jhump/protoreflect v1.15.6 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/example/worker_avro/go.sum b/example/worker_avro/go.sum index ddd3399..4746bb1 100644 --- a/example/worker_avro/go.sum +++ b/example/worker_avro/go.sum @@ -81,8 +81,7 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX/CPSO+ES9+E= github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc= -github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo= -github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y= +github.com/confluentinc/confluent-kafka-go/v2 v2.5.3 h1:3AAMHvhiv3d4ajW4fSnZw+liipffhtunkEWz23zTXXM= github.com/confluentinc/confluent-kafka-go/v2 v2.5.3/go.mod h1:QxYLPRKR1MVlkXCCjzjjrpXb0VyFNfVaZXi0obZykJ0= github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro= github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= @@ -194,8 +193,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaW github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod 
h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= -github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E= -github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig= +github.com/hamba/avro/v2 v2.24.0 h1:axTlaYDkcSY0dVekRSy8cdrsj5MG86WqosUQacKCids= github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -239,8 +237,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -414,8 +412,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0. 
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.1/go.mod h1:GnOaBaFQ2we3b9AGWJpsBa7v1S5RlQzlC3O7dRMxZhM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU= @@ -431,14 +428,13 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 h1:digkE go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0/go.mod h1:/OpE/y70qVkndM0TrxT4KBoN3RsFZP0QaofcfYrj76I= go.opentelemetry.io/otel/exporters/prometheus v0.42.0 h1:jwV9iQdvp38fxXi8ZC+lNpxjK16MRcZlpDYvbuO1FiA= go.opentelemetry.io/otel/exporters/prometheus v0.42.0/go.mod h1:f3bYiqNqhoPxkvI2LrXqQVC546K7BuRDL/kKuxkujhA= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= +go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= go.opentelemetry.io/otel/sdk/metric v1.21.0 
h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= @@ -452,8 +448,7 @@ golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/example/worker_avro/main.go b/example/worker_avro/main.go index 66c185e..8050c49 100644 --- a/example/worker_avro/main.go +++ b/example/worker_avro/main.go @@ -13,6 +13,9 @@ import ( "github.com/zillow/zkafka" ) +//go:embed event.avsc +var eventSchema string + // Demonstrates reading from a topic via the zkafka.Work struct which is more convenient, typically, than using the consumer directly func main() { ctx := context.Background() @@ -40,8 +43,12 @@ func main() { // json, proto, avro, etc. 
Formatter: zkafka.AvroSchemaRegistry, SchemaRegistry: zkafka.SchemaRegistryConfig{ - URL: "http://localhost:8081", - Deserialization: zkafka.DeserializationConfig{}, + URL: "http://localhost:8081", + Deserialization: zkafka.DeserializationConfig{ + // When using avro schema registry, you must specify the schema. In this case, + // the schema used to generate the golang type is used. + Schema: eventSchema, + }, }, AdditionalProps: map[string]any{ // only important the first time a consumer group connects. Subsequent connections will start @@ -73,6 +80,10 @@ func main() { func Process(_ context.Context, msg *zkafka.Message) error { // sleep to simulate random amount of work time.Sleep(100 * time.Millisecond) + + // The DummyEvent type is generated using `hamba/avro` (see make). This is the preferred generation for + // `formatter=zkafka.AvroSchemaRegistry` because the underlying deserializer uses the avro tags on the generated struct + // to properly connect the schema and struct event := DummyEvent{} err := msg.Decode(&event) if err != nil { diff --git a/formatter.go b/formatter.go index ea2b8fb..b8ba118 100644 --- a/formatter.go +++ b/formatter.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/hamba/avro/v2" "github.com/zillow/zfmt" ) @@ -35,10 +36,10 @@ type Formatter interface { type marshReq struct { // topic is the kafka topic being written to topic string - // subject is the data to be marshalled - subject any + // v is the data to be marshalled + v any // schema is currently only used for avro schematizations. It is necessary, - // because the confluent implementation reflects on the subject to get the schema to use for + // because the confluent implementation reflects on the v to get the schema to use for // communicating with schema-registry and backward compatible evolutions fail beause if dataloss during reflection. 
// For example, if a field has a default value, the reflection doesn't pick this up schema string } @@ -51,6 +52,11 @@ type unmarshReq struct { data []byte // target is the stuct which is to be hydrated by the contents of data target any + // schema is currently only used for avro schematizations. It is necessary, + // because the confluent implementation reflects on the subject to get the schema to use for + // communicating with schema-registry and backward compatible evolutions fail because of data loss during reflection. + // For example, if a field has a default value, the reflection doesn't pick this up + schema string } var _ kFormatter = (*avroSchemaRegistryFormatter)(nil) @@ -70,7 +76,7 @@ type zfmtShim struct { } func (f zfmtShim) marshall(req marshReq) ([]byte, error) { - return f.F.Marshall(req.v) } func (f zfmtShim) unmarshal(req unmarshReq) error { @@ -93,7 +99,6 @@ type avroSchemaRegistryFormatter struct { afmt avroFmt - f zfmt.SchematizedAvroFormatter } func newAvroSchemaRegistryFormatter(afmt avroFmt) (avroSchemaRegistryFormatter, error) { @@ -102,6 +107,10 @@ func newAvroSchemaRegistryFormatter(afmt avroFmt) (avroSchemaRegistryFormatter, }, nil } +// marshall looks up a subject's schema (id) so that it can prefix the eventual message payload. +// A schema must be provided and hamba/avro is used in conjunction with this schema to marshal the payload. +// Structs generated using hamba/avro work best, since they provide avro tags which handle casing +// which can lead to errors otherwise.
func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) { if req.schema == "" { return nil, errors.New("avro schema is required for schema registry formatter") @@ -110,16 +119,56 @@ func (f avroSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) { if err != nil { return nil, fmt.Errorf("failed to get avro schema by id for topic %s: %w", req.topic, err) } - f.f.SchemaID = id - data, err := f.f.Marshall(req.subject) + avroSchema, err := avro.Parse(req.schema) if err != nil { - return nil, fmt.Errorf("failed to marshal avro schema for topic %s: %w", req.topic, err) + return nil, fmt.Errorf("failed to get schema from payload: %w", err) } - return data, nil -} - + msgBytes, err := avro.Marshal(avroSchema, req.v) + if err != nil { + return nil, fmt.Errorf("failed to marhall payload per avro schema: %w", err) + } + payload, err := f.afmt.ser.WriteBytes(id, msgBytes) + if err != nil { + return nil, fmt.Errorf("failed to prepend schema related bytes: %w", err) + } + return payload, nil +} + +// unmarshall looks up the schema based on the schemaID in the message payload (`dataSchema`). +// Additionally, a target schema is provided in the request (`targetSchema`). +// +// The 'targetSchema' and 'dataSchema' are resolved so that data written by the `dataSchema` may +// be read by the `targetSchema`. +// +// The avro.Deserializer has much of this functionality built in, other than being able to specify +// the `targetSchema` and instead infers the target schema from the target struct. This creates +// issues in some common use cases. 
func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error { - err := f.afmt.Deserialize(req.topic, req.data, req.target) + if req.schema == "" { + return errors.New("avro schema is required for schema registry formatter") + } + inInfo, err := f.afmt.deser.GetSchema(req.topic, req.data) + if err != nil { + return fmt.Errorf("failed to get schema from message payload: %w", err) + } + + // schema of data that exists on the wire, that is about to be marshalled into the schema of our target + dataSchema, err := avro.Parse(inInfo.Schema) + if err != nil { + return fmt.Errorf("failed to parse schema associated with message: %w", err) + } + + targetSchema, err := avro.Parse(req.schema) + if err != nil { + return fmt.Errorf("failed to parse schema : %w", err) + } + sc := avro.NewSchemaCompatibility() + resolvedSchema, err := sc.Resolve(dataSchema, targetSchema) + if err != nil { + return fmt.Errorf("failed to get schema from payload: %w", err) + } + + err = avro.Unmarshal(resolvedSchema, req.data[5:], req.target) if err != nil { return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err) } @@ -137,7 +186,7 @@ func newProtoSchemaRegistryFormatter(pfmt protoFmt) protoSchemaRegistryFormatter } func (f protoSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) { - msgBytes, err := f.pfmt.ser.Serialize(req.topic, req.subject) + msgBytes, err := f.pfmt.ser.Serialize(req.topic, req.v) if err != nil { return nil, fmt.Errorf("failed to proto serialize: %w", err) } @@ -162,7 +211,7 @@ func newJsonSchemaRegistryFormatter(jfmt jsonFmt) jsonSchemaRegistryFormatter { } func (f jsonSchemaRegistryFormatter) marshall(req marshReq) ([]byte, error) { - msgBytes, err := f.jfmt.ser.Serialize(req.topic, req.subject) + msgBytes, err := f.jfmt.ser.Serialize(req.topic, req.v) if err != nil { return nil, fmt.Errorf("failed to json schema serialize: %w", err) } diff --git a/formatter_test.go b/formatter_test.go index 8c464b1..2c4493e 100644 --- 
a/formatter_test.go +++ b/formatter_test.go @@ -9,7 +9,7 @@ import ( func TestNoopFormatter_Marshall_Unmarshal(t *testing.T) { defer recoverThenFail(t) formatter := errFormatter{} - _, err := formatter.marshall(marshReq{subject: "anything"}) + _, err := formatter.marshall(marshReq{v: "anything"}) require.ErrorIs(t, err, errMissingFormatter) var someInt int32 diff --git a/go.mod b/go.mod index fab3c63..8dff515 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 + github.com/hamba/avro/v2 v2.24.0 github.com/heetch/avro v0.4.5 github.com/sony/gobreaker v1.0.0 github.com/stretchr/testify v1.9.0 @@ -26,7 +27,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/hamba/avro/v2 v2.24.0 // indirect github.com/invopop/jsonschema v0.12.0 // indirect github.com/jhump/protoreflect v1.15.6 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/message.go b/message.go index 6486ebd..3a95741 100644 --- a/message.go +++ b/message.go @@ -26,6 +26,7 @@ type Message struct { fmt kFormatter doneFunc func(ctx context.Context) doneOnce sync.Once + schema string } // DoneWithContext is used to alert that message processing has completed. 
@@ -63,6 +64,7 @@ func (m *Message) unmarshall(target any) error { topic: m.Topic, data: m.value, target: target, + schema: m.schema, }) } diff --git a/reader.go b/reader.go index 3ccd108..105f16a 100644 --- a/reader.go +++ b/reader.go @@ -225,8 +225,9 @@ func (r *KReader) mapMessage(_ context.Context, msg kafka.Message) *Message { r.logger.Errorw(ctx, "Error storing offsets", "topicName", topicName, "groupID", r.topicConfig.GroupID, "partition", partition, "offset", offset, "error", err) } }, - value: msg.Value, - fmt: r.formatter, + value: msg.Value, + fmt: r.formatter, + schema: r.topicConfig.SchemaRegistry.Deserialization.Schema, } } diff --git a/test/evolution/avro1/schema_1_gen.go b/test/evolution/avro1/schema_1_gen.go index 832022c..f26338b 100644 --- a/test/evolution/avro1/schema_1_gen.go +++ b/test/evolution/avro1/schema_1_gen.go @@ -1,29 +1,14 @@ -// Code generated by avrogen. DO NOT EDIT. - package avro1 +// Code generated by avro/gen. DO NOT EDIT. + import ( - "github.com/heetch/avro/avrotypegen" + "time" ) -type DummyEvent struct { - IntField int - DoubleField float64 - StringField string - BoolField bool - BytesField []byte -} - -// AvroRecord implements the avro.AvroRecord interface. -func (DummyEvent) AvroRecord() avrotypegen.RecordInfo { - return avrotypegen.RecordInfo{ - Schema: `{"fields":[{"name":"IntField","type":"int"},{"name":"DoubleField","type":"double"},{"name":"StringField","type":"string"},{"name":"BoolField","type":"boolean"},{"name":"BytesField","type":"bytes"}],"name":"DummyEvent","type":"record"}`, - Required: []bool{ - 0: true, - 1: true, - 2: true, - 3: true, - 4: true, - }, - } +// Event is a generated struct. 
+type Event struct { + ID string `avro:"id"` + DeliveredAtDateTimeUtc time.Time `avro:"deliveredAtDateTimeUtc"` + EventType string `avro:"eventType"` } diff --git a/test/evolution/avro1x/schema_1_gen.go b/test/evolution/avro1x/schema_1_gen.go new file mode 100644 index 0000000..e97ec5f --- /dev/null +++ b/test/evolution/avro1x/schema_1_gen.go @@ -0,0 +1,70 @@ +// Code generated by avrogen. DO NOT EDIT. + +package avro1 + +import ( + "fmt" + "strconv" + + "github.com/heetch/avro/avrotypegen" +) + +type Event struct { + Id string `json:"id"` + Deliveredatdatetimeutc int64 `json:"deliveredAtDateTimeUtc"` + Eventtype EventType `json:"eventType"` +} + +// AvroRecord implements the avro.AvroRecord interface. +func (Event) AvroRecord() avrotypegen.RecordInfo { + return avrotypegen.RecordInfo{ + Schema: `{"fields":[{"name":"id","type":"string"},{"name":"deliveredAtDateTimeUtc","type":{"logicalType":"timestamp-millis","type":"long"}},{"name":"eventType","type":{"default":"created","name":"EventType","symbols":["created","associated"],"type":"enum"}}],"name":"com.zillowgroup.Event","type":"record"}`, + Required: []bool{ + 0: true, + 1: true, + 2: true, + }, + } +} + +type EventType int + +const ( + EventTypeCreated EventType = iota + EventTypeAssociated +) + +var _EventType_strings = []string{ + "created", + "associated", +} + +// String returns the textual representation of EventType. +func (e EventType) String() string { + if e < 0 || int(e) >= len(_EventType_strings) { + return "EventType(" + strconv.FormatInt(int64(e), 10) + ")" + } + return _EventType_strings[e] +} + +// MarshalText implements encoding.TextMarshaler +// by returning the textual representation of EventType. 
+func (e EventType) MarshalText() ([]byte, error) { + if e < 0 || int(e) >= len(_EventType_strings) { + return nil, fmt.Errorf("EventType value %d is out of bounds", e) + } + return []byte(_EventType_strings[e]), nil +} + +// UnmarshalText implements encoding.TextUnmarshaler +// by expecting the textual representation of EventType. +func (e *EventType) UnmarshalText(data []byte) error { + // Note for future: this could be more efficient. + for i, s := range _EventType_strings { + if string(data) == s { + *e = EventType(i) + return nil + } + } + return fmt.Errorf("unknown value %q for EventType", data) +} diff --git a/test/evolution/avro2/schema_2_gen.go b/test/evolution/avro2/schema_2_gen.go index 13d289c..3a3e6d8 100644 --- a/test/evolution/avro2/schema_2_gen.go +++ b/test/evolution/avro2/schema_2_gen.go @@ -1,30 +1,20 @@ -// Code generated by avrogen. DO NOT EDIT. - package avro2 +// Code generated by avro/gen. DO NOT EDIT. + import ( - "github.com/heetch/avro/avrotypegen" + "time" ) -type DummyEvent struct { - IntField int - DoubleField float64 - StringField string - BoolField bool - BytesField []byte - NewFieldWithDefault *string +// InteractiveContentRecord is a generated struct. +type InteractiveContentRecord struct { + URL string `avro:"url"` } -// AvroRecord implements the avro.AvroRecord interface. -func (DummyEvent) AvroRecord() avrotypegen.RecordInfo { - return avrotypegen.RecordInfo{ - Schema: `{"fields":[{"name":"IntField","type":"int"},{"name":"DoubleField","type":"double"},{"name":"StringField","type":"string"},{"name":"BoolField","type":"boolean"},{"name":"BytesField","type":"bytes"},{"default":null,"name":"NewFieldWithDefault","type":["null","string"]}],"name":"DummyEvent","type":"record"}`, - Required: []bool{ - 0: true, - 1: true, - 2: true, - 3: true, - 4: true, - }, - } +// Event is a generated struct. 
+type Event struct { + ID string `avro:"id"` + DeliveredAtDateTimeUtc time.Time `avro:"deliveredAtDateTimeUtc"` + EventType string `avro:"eventType"` + InteractiveContent *[]InteractiveContentRecord `avro:"interactiveContent"` } diff --git a/test/evolution/schema_1.avsc b/test/evolution/schema_1.avsc index 03ea6e7..9788647 100644 --- a/test/evolution/schema_1.avsc +++ b/test/evolution/schema_1.avsc @@ -1,11 +1,30 @@ { "type": "record", - "name": "DummyEvent", + "name": "Event", + "namespace": "com.zillowgroup", "fields": [ - {"name": "IntField", "type": "int"}, - {"name": "DoubleField", "type": "double"}, - {"name": "StringField", "type": "string"}, - {"name": "BoolField", "type": "boolean"}, - {"name": "BytesField", "type": "bytes"} + { + "name": "id", + "type": "string" + }, + { + "name": "deliveredAtDateTimeUtc", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "eventType", + "type": { + "type": "enum", + "name": "EventType", + "symbols": [ + "created", + "associated" + ], + "default": "created" + } + } ] } \ No newline at end of file diff --git a/test/evolution/schema_2.avsc b/test/evolution/schema_2.avsc index 8355d50..1e765d3 100644 --- a/test/evolution/schema_2.avsc +++ b/test/evolution/schema_2.avsc @@ -1,12 +1,50 @@ { "type": "record", - "name": "DummyEvent", + "name": "Event", + "namespace": "com.zillowgroup", "fields": [ - {"name": "IntField", "type": "int"}, - {"name": "DoubleField", "type": "double"}, - {"name": "StringField", "type": "string"}, - {"name": "BoolField", "type": "boolean"}, - {"name": "BytesField", "type": "bytes"}, - {"name": "NewFieldWithDefault", "type": ["null", "string"], "default": null } + { + "name": "id", + "type": "string" + }, + { + "name": "deliveredAtDateTimeUtc", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "eventType", + "type": { + "type": "enum", + "name": "EventType", + "symbols": [ + "created", + "associated" + ], + "default": "created" 
+ } + }, + { + "name": "interactiveContent", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "InteractiveContentRecord", + "fields": [ + { + "name": "url", + "type": "string" + } + ] + } + } + ], + "default": null + } ] } \ No newline at end of file diff --git a/test/schema_registry_evo_test.go b/test/schema_registry_evo_test.go index 5a82e08..7252ab2 100644 --- a/test/schema_registry_evo_test.go +++ b/test/schema_registry_evo_test.go @@ -12,6 +12,7 @@ import ( "fmt" "math/rand" "testing" + "time" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" @@ -83,27 +84,24 @@ func Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_BackwardCompatibleSchemasC }, }) require.NoError(t, err) + id := uuid.NewString() - evt1 := avro1.DummyEvent{ - IntField: int(rand.Int31()), - DoubleField: rand.Float64(), - StringField: uuid.NewString(), - BoolField: true, - BytesField: []byte(uuid.NewString()), + evt1 := avro1.Event{ + ID: id, + DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), + EventType: "created", } - // write msg1, and msg2 - _, err = writer1.Write(ctx, evt1) + _, err = writer1.Write(ctx, &evt1) require.NoError(t, err) - evt2 := avro2.DummyEvent{ - IntField: int(rand.Int31()), - DoubleField: rand.Float64(), - StringField: uuid.NewString(), - BoolField: true, - BytesField: []byte(uuid.NewString()), - NewFieldWithDefault: ptr(uuid.NewString()), + listingID2 := uuid.NewString() + + evt2 := avro2.Event{ + ID: listingID2, + DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), + EventType: "created", } - _, err = writer2.Write(ctx, evt2) + _, err = writer2.Write(ctx, &evt2) require.NoError(t, err) consumerTopicConfig := zkafka.ConsumerTopicConfig{ @@ -111,7 +109,8 @@ func Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_BackwardCompatibleSchemasC Topic: topic, Formatter: zkafka.AvroSchemaRegistry, SchemaRegistry: zkafka.SchemaRegistryConfig{ - URL: "http://localhost:8081", + URL: 
"http://localhost:8081", + Deserialization: zkafka.DeserializationConfig{Schema: dummyEventSchema1}, }, GroupID: groupID, AdditionalProps: map[string]any{ @@ -131,22 +130,20 @@ func Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_BackwardCompatibleSchemasC require.NoError(t, reader.Close()) - receivedEvt1 := avro1.DummyEvent{} + receivedEvt1 := avro1.Event{} require.NoError(t, msg1.Decode(&receivedEvt1)) - assertEqual(t, evt1, receivedEvt1) + assertEqual(t, receivedEvt1, evt1) - receivedEvt2Schema1 := avro1.DummyEvent{} + receivedEvt2Schema1 := avro1.Event{} require.NoError(t, msg2.Decode(&receivedEvt2Schema1)) - expectedEvt2 := avro1.DummyEvent{ - IntField: evt2.IntField, - DoubleField: evt2.DoubleField, - StringField: evt2.StringField, - BoolField: evt2.BoolField, - BytesField: evt2.BytesField, + expectedEvt2 := avro1.Event{ + ID: evt2.ID, + DeliveredAtDateTimeUtc: evt2.DeliveredAtDateTimeUtc, + EventType: evt2.EventType, } assertEqual(t, expectedEvt2, receivedEvt2Schema1) - receivedEvt2Schema2 := avro2.DummyEvent{} + receivedEvt2Schema2 := avro2.Event{} require.NoError(t, msg2.Decode(&receivedEvt2Schema2)) assertEqual(t, evt2, receivedEvt2Schema2) } @@ -371,117 +368,6 @@ func Test_SchemaRegistryReal_JSON_AutoRegisterSchemas_BackwardCompatibleSchemasC assertEqual(t, evt2, receivedEvt2Schema2, cmpopts.IgnoreUnexported(json2.DummyEvent{})) } -func Test_SchemaRegistry_Avro_AutoRegisterSchemas_BackwardCompatibleSchemasCanBeRegisteredAndReadFrom(t *testing.T) { - checkShouldSkipTest(t, enableKafkaBrokerTest) - - ctx := context.Background() - topic := "integration-test-topic-2" + uuid.NewString() - bootstrapServer := getBootstrap() - - createTopic(t, bootstrapServer, topic, 1) - t.Logf("Created topic: %s", topic) - - groupID := uuid.NewString() - - client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{bootstrapServer}}, zkafka.LoggerOption(stdLogger{})) - defer func() { require.NoError(t, client.Close()) }() - - t.Log("Created writer with auto registered 
schemas") - writer1, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ - ClientID: fmt.Sprintf("writer-%s-%s", t.Name(), uuid.NewString()), - Topic: topic, - Formatter: zkafka.AvroSchemaRegistry, - SchemaRegistry: zkafka.SchemaRegistryConfig{ - URL: "mock://", - Serialization: zkafka.SerializationConfig{ - AutoRegisterSchemas: true, - Schema: dummyEventSchema1, - }, - }, - }) - require.NoError(t, err) - - writer2, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ - ClientID: fmt.Sprintf("writer-%s-%s", t.Name(), uuid.NewString()), - Topic: topic, - Formatter: zkafka.AvroSchemaRegistry, - SchemaRegistry: zkafka.SchemaRegistryConfig{ - URL: "mock://", - Serialization: zkafka.SerializationConfig{ - AutoRegisterSchemas: true, - Schema: dummyEventSchema2, - }, - }, - }) - require.NoError(t, err) - - evt1 := avro1.DummyEvent{ - IntField: int(rand.Int31()), - DoubleField: rand.Float64(), - StringField: uuid.NewString(), - BoolField: true, - BytesField: []byte(uuid.NewString()), - } - // write msg1, and msg2 - _, err = writer1.Write(ctx, evt1) - require.NoError(t, err) - - evt2 := avro2.DummyEvent{ - IntField: int(rand.Int31()), - DoubleField: rand.Float64(), - StringField: uuid.NewString(), - BoolField: true, - BytesField: []byte(uuid.NewString()), - NewFieldWithDefault: ptr(uuid.NewString()), - } - _, err = writer2.Write(ctx, evt2) - require.NoError(t, err) - - consumerTopicConfig := zkafka.ConsumerTopicConfig{ - ClientID: fmt.Sprintf("reader-%s-%s", t.Name(), uuid.NewString()), - Topic: topic, - Formatter: zkafka.AvroSchemaRegistry, - SchemaRegistry: zkafka.SchemaRegistryConfig{ - URL: "mock://", - }, - GroupID: groupID, - AdditionalProps: map[string]any{ - "auto.offset.reset": "earliest", - }, - } - reader, err := client.Reader(ctx, consumerTopicConfig) - require.NoError(t, err) - - t.Log("Begin reading messages") - results, err := readMessages(reader, 2) - require.NoError(t, err) - - msg1 := <-results - msg2 := <-results - t.Log("Close reader") - - 
require.NoError(t, reader.Close()) - - receivedEvt1 := avro1.DummyEvent{} - require.NoError(t, msg1.Decode(&receivedEvt1)) - assertEqual(t, evt1, receivedEvt1) - - receivedEvt2Schema1 := avro1.DummyEvent{} - require.NoError(t, msg2.Decode(&receivedEvt2Schema1)) - expectedEvt2 := avro1.DummyEvent{ - IntField: evt2.IntField, - DoubleField: evt2.DoubleField, - StringField: evt2.StringField, - BoolField: evt2.BoolField, - BytesField: evt2.BytesField, - } - assertEqual(t, expectedEvt2, receivedEvt2Schema1) - - receivedEvt2Schema2 := avro2.DummyEvent{} - require.NoError(t, msg2.Decode(&receivedEvt2Schema2)) - assertEqual(t, evt2, receivedEvt2Schema2) -} - func Test_SchemaRegistry_Proto_AutoRegisterSchemas_BackwardCompatibleSchemasCanBeRegisteredAndReadFrom(t *testing.T) { checkShouldSkipTest(t, enableKafkaBrokerTest) diff --git a/test/schema_registry_test.go b/test/schema_registry_test.go index c032f30..d04f433 100644 --- a/test/schema_registry_test.go +++ b/test/schema_registry_test.go @@ -7,6 +7,7 @@ import ( "math/rand" "os" "testing" + "time" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" @@ -14,6 +15,7 @@ import ( "github.com/zillow/zfmt" "github.com/zillow/zkafka" "github.com/zillow/zkafka/test/evolution/avro1" + avro1x "github.com/zillow/zkafka/test/evolution/avro1x" "github.com/zillow/zkafka/test/evolution/json1" "github.com/zillow/zkafka/test/evolution/proto1" ) @@ -55,20 +57,21 @@ func Test_SchemaRegistry_AutoRegisterSchemasFalse_WillNotWriteMessage(t *testing }) require.NoError(t, err) - evt1 := avro1.DummyEvent{ - IntField: int(rand.Int31()), - DoubleField: rand.Float64(), - StringField: uuid.NewString(), - BoolField: true, - BytesField: []byte(uuid.NewString()), + id := uuid.NewString() + + evt1 := avro1.Event{ + ID: id, + DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), + EventType: "listingCreated", } - // write msg1, and msg2 + _, err = writer1.Write(ctx, evt1) require.ErrorContains(t, err, "failed to get avro schema 
by id") } // Its possible not specify a schema for your producer. // In this case, the underlying lib does + func Test_SchemaRegistry_Avro_AutoRegisterSchemas_RequiresSchemaSpecification(t *testing.T) { checkShouldSkipTest(t, enableKafkaBrokerTest) @@ -98,20 +101,21 @@ func Test_SchemaRegistry_Avro_AutoRegisterSchemas_RequiresSchemaSpecification(t }) require.NoError(t, err) - evt1 := avro1.DummyEvent{ - IntField: int(rand.Int31()), - DoubleField: rand.Float64(), - StringField: uuid.NewString(), - BoolField: true, - BytesField: []byte(uuid.NewString()), + id := uuid.NewString() + + evt1 := avro1.Event{ + ID: id, + DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), + EventType: "listingCreated", } - // write msg1, and msg2 + _, err = writer1.Write(ctx, evt1) require.ErrorContains(t, err, "avro schema is required for schema registry formatter") } // Test_SchemaNotRegistered_ResultsInWorkerDecodeError demonstrates the behavior when a worker reads // a message for a schema that doesn't exist in shcema registry. 
This test shows that such a situation would result in a decode error + func Test_SchemaNotRegistered_ResultsInWorkerDecodeError(t *testing.T) { checkShouldSkipTest(t, enableKafkaBrokerTest) @@ -139,14 +143,14 @@ func Test_SchemaNotRegistered_ResultsInWorkerDecodeError(t *testing.T) { }) require.NoError(t, err) - evt1 := avro1.DummyEvent{ - IntField: int(rand.Int31()), - DoubleField: rand.Float64(), - StringField: uuid.NewString(), - BoolField: true, - BytesField: []byte(uuid.NewString()), + id := uuid.NewString() + + evt1 := avro1x.Event{ + Id: id, + Deliveredatdatetimeutc: rand.Int63(), + Eventtype: avro1x.EventTypeCreated, } - // write msg1 + _, err = writer1.Write(ctx, evt1) require.NoError(t, err) @@ -156,6 +160,9 @@ func Test_SchemaNotRegistered_ResultsInWorkerDecodeError(t *testing.T) { Formatter: zkafka.AvroSchemaRegistry, SchemaRegistry: zkafka.SchemaRegistryConfig{ URL: "mock://", + Deserialization: zkafka.DeserializationConfig{ + Schema: dummyEventSchema1, + }, }, GroupID: groupID, AdditionalProps: map[string]any{ @@ -166,7 +173,7 @@ func Test_SchemaNotRegistered_ResultsInWorkerDecodeError(t *testing.T) { wf := zkafka.NewWorkFactory(client) w := wf.CreateWithFunc(consumerTopicConfig, func(_ context.Context, msg *zkafka.Message) error { defer cancel() - gotErr = msg.Decode(&avro1.DummyEvent{}) + gotErr = msg.Decode(&avro1.Event{}) return gotErr }) @@ -208,13 +215,14 @@ func Test_SchemaRegistry_Avro_SubjectNameSpecification(t *testing.T) { }) require.NoError(t, err) - evt1 := avro1.DummyEvent{ - IntField: int(rand.Int31()), - DoubleField: rand.Float64(), - StringField: uuid.NewString(), - BoolField: true, - BytesField: []byte(uuid.NewString()), + id := uuid.NewString() + + evt1 := avro1.Event{ + ID: id, + DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), + EventType: "created", } + // write msg1, and msg2 _, err = writer1.Write(ctx, evt1) require.NoError(t, err) @@ -226,6 +234,9 @@ func Test_SchemaRegistry_Avro_SubjectNameSpecification(t 
*testing.T) { SchemaRegistry: zkafka.SchemaRegistryConfig{ URL: "mock://", SubjectName: subjName, + Deserialization: zkafka.DeserializationConfig{ + Schema: dummyEventSchema1, + }, }, GroupID: groupID, AdditionalProps: map[string]any{ @@ -244,7 +255,7 @@ func Test_SchemaRegistry_Avro_SubjectNameSpecification(t *testing.T) { require.NoError(t, reader.Close()) - receivedEvt1 := avro1.DummyEvent{} + receivedEvt1 := avro1.Event{} require.NoError(t, msg1.Decode(&receivedEvt1)) assertEqual(t, evt1, receivedEvt1) } @@ -288,7 +299,6 @@ func Test_SchemaRegistry_Proto_SubjectNameSpecification(t *testing.T) { BoolField: true, BytesField: []byte(uuid.NewString()), } - _, err = writer1.Write(ctx, evt1) require.NoError(t, err) @@ -299,6 +309,9 @@ func Test_SchemaRegistry_Proto_SubjectNameSpecification(t *testing.T) { SchemaRegistry: zkafka.SchemaRegistryConfig{ URL: "mock://", SubjectName: subjName, + Deserialization: zkafka.DeserializationConfig{ + Schema: dummyEventSchema1, + }, }, GroupID: groupID, AdditionalProps: map[string]any{ @@ -361,7 +374,6 @@ func Test_SchemaRegistry_Json_SubjectNameSpecification(t *testing.T) { BoolField: true, BytesField: []byte(uuid.NewString()), } - _, err = writer1.Write(ctx, evt1) require.NoError(t, err) @@ -372,6 +384,9 @@ func Test_SchemaRegistry_Json_SubjectNameSpecification(t *testing.T) { SchemaRegistry: zkafka.SchemaRegistryConfig{ URL: "mock://", SubjectName: subjName, + Deserialization: zkafka.DeserializationConfig{ + Schema: dummyEventSchema1, + }, }, GroupID: groupID, AdditionalProps: map[string]any{ @@ -394,7 +409,6 @@ func Test_SchemaRegistry_Json_SubjectNameSpecification(t *testing.T) { require.NoError(t, msg1.Decode(&receivedEvt1)) assertEqual(t, evt1, receivedEvt1) } - func checkShouldSkipTest(t *testing.T, flags ...string) { t.Helper() for _, flag := range flags { diff --git a/writer.go b/writer.go index 4b706a1..6880ae8 100644 --- a/writer.go +++ b/writer.go @@ -233,9 +233,9 @@ func (w *KWriter) marshall(_ context.Context, value 
any, schema string) ([]byte, return nil, errors.New("formatter or confluent formatter is not supplied to produce kafka message") } return w.formatter.marshall(marshReq{ - topic: w.topicConfig.Topic, - subject: value, - schema: schema, + topic: w.topicConfig.Topic, + v: value, + schema: schema, }) }