diff --git a/pubsub/go.mod b/pubsub/go.mod index 3760109..3d83270 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -6,7 +6,6 @@ toolchain go1.23.2 require ( github.com/IBM/sarama v1.43.3 - github.com/Shopify/sarama v1.38.1 github.com/ThreeDotsLabs/watermill v1.4.0 github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.5 github.com/google/uuid v1.6.0 diff --git a/pubsub/go.sum b/pubsub/go.sum index ba5d7a2..2940d12 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -1,9 +1,5 @@ github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= -github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= -github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= -github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= -github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0= github.com/ThreeDotsLabs/watermill v1.4.0 h1:c8T4QHY/MuxSXYQ1Cxn93cCZB5lkGgqhYA6L2jh2ghA= github.com/ThreeDotsLabs/watermill v1.4.0/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.5 h1:ud+4txnRgtr3kZXfXZ5+C7kVQEvsLc5HSNUEa0g+X1Q= diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_tls.go b/pubsub/kafkasarama/kafkasarama_tls.go similarity index 96% rename from pubsub/kafkashopifysarama/kafkashopifysarama_tls.go rename to pubsub/kafkasarama/kafkasarama_tls.go index 6e4675c..fe837cd 100644 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_tls.go +++ b/pubsub/kafkasarama/kafkasarama_tls.go @@ -1,4 +1,4 @@ -package kafkashopifysarama +package kafkasarama import ( "crypto/tls" @@ -7,7 +7,7 @@ import ( "os" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" ) // NewTLSSubscriberConfig creates a new kafka subscriber config with TLS diff --git a/pubsub/kafkashopifysarama/config/ca_test.crt b/pubsub/kafkashopifysarama/config/ca_test.crt deleted file mode 100644 index 3ac9082..0000000 --- a/pubsub/kafkashopifysarama/config/ca_test.crt +++ /dev/null @@ -1,23 +0,0 @@ ------BEGIN CERTIFICATE----- -MIID1zCCAr+gAwIBAgIUVKC1c251XQI6xQOSk8TzVd02nV0wDQYJKoZIhvcNAQEL -BQAwQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwG -TG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTAgFw0yMzA3MjgxMjA4NDNaGA8yMTIzMDcw -NDEyMDg0M1owQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0G -A1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTCCASIwDQYJKoZIhvcNAQEBBQAD -ggEPADCCAQoCggEBAML1AIVjSSE/o+hg5jGSSK1gWq3TZTLqGQ75Csc+DrnoKztq -XBzmDcGW/S1cW9Yw+lEyGymc/4SzPIssrJu6y2ncv28kUMEzjftfwtc1zidKaTYk -wIAzksjNU/lP9UC8cq6pDyqztBr9RBHSmtxSIVQHeV5sZij+Rdluz5EUlsm6O0sd -EqHsa7M8rnxQmArHy+sYHEpe1hCN+AnGZtuL1xjUXDNjth5qrTUW6E30mLh7D6y5 -YRbR+EO9YDnXaA7nAxpSUXyu2+IrNdxYXdy6Fee1Owb461ufVql5msBlkPBQylQa -4RcXZdRE10elELVEp38IJP4/HRodO157yw7UsScCAwEAAaOBwjCBvzAdBgNVHQ4E -FgQUWBF2Mby1FPlEApc+3XgC0ykmsgYwDwYDVR0TAQH/BAUwAwEB/zB9BgNVHSME -djB0gBRYEXYxvLUU+UQClz7deALTKSayBqFGpEQwQjELMAkGA1UEBhMCVUsxEjAQ -BgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZr -YYIUVKC1c251XQI6xQOSk8TzVd02nV0wDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3 -DQEBCwUAA4IBAQC+nqrlZYH/604R2GtO0tPyABnuP8mc+dV7btatkGjHlY/Lx07T -tfxva3RSNHPh+yoSvwHriLCYBwX9lChAYc+l7ACbtPerbIHMm6YNkVKBRplXCO66 -BHv8xXOs9oBgAsgTIvfznbH/hNJrY0A6I4M2tVOvcrmxzZT4oZJwEqy7B3gu2y0A -b2QcQzNnqXvnyGG91b+CBHvU/XcvIuxQysct31banEkIc+nTl+rFGsN2phJz+J+h -i8jmY2AixD++ZFPB5/4msaa6o6x4QFc5bvf92BDuJTEyKt+dM9rhZvtU1plnKmZq -89xtHAZltnhjC7AU5nia+hCd8hHlXasjljoe ------END CERTIFICATE----- diff --git a/pubsub/kafkashopifysarama/config/test.crt b/pubsub/kafkashopifysarama/config/test.crt deleted file mode 100644 index fa52174..0000000 --- a/pubsub/kafkashopifysarama/config/test.crt +++ /dev/null @@ -1,23 +0,0 @@ ------BEGIN CERTIFICATE----- -MIID0zCCArugAwIBAgIUeHl9LEIwVLvdrQ1Qk/veZPdQ9yUwDQYJKoZIhvcNAQEL -BQAwQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwG -TG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTAgFw0yMzA3MjgxMjA4NDNaGA8yMTIzMDcw -NDEyMDg0M1owQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0G -A1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTCCASIwDQYJKoZIhvcNAQEBBQAD -ggEPADCCAQoCggEBAKiOi8SsfcqZl7A80ECjWFdT+RX4ufc2CtB+0WDRg9tnRh/M -dh7+IX5Y+ILPJ2kYwNnPM+wjBpLXmXX7/aqMgmDe6v3ikRKXh+VMWNMaprJhCldD -8YpCv0tPIt1H0BF6vcku7pn4hsAkbgj0QCzywmz94KlFqtjRiipg3KJoOmrR0YYv -XAyqfefQwFzzRFkp71QRYWJMdJOZ0iibjvj1Fj6rLY3CZzTCZpQdyh9DxiHLgLLz -0BLxZf47MznnE9eofLXsmhS3IJ+/QPeM12mlX4wAHLBLTq4d8Xhp0ds1km4E5dML -2JCdxSFTcfL+ww1X/W0c8dS0m85H/9SLsmbjaa0CAwEAAaOBvjCBuzAdBgNVHQ4E -FgQUZChhFb324S1UADhefq8CfUOGlPYwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0E -HxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwDgYDVR0PAQH/BAQDAgWg -MBMGA1UdJQQMMAoGCCsGAQUFBwMCMBsGA1UdEQQUMBKCBWthZmthgglsb2NhbGhv -c3QwHwYDVR0jBBgwFoAUWBF2Mby1FPlEApc+3XgC0ykmsgYwDQYJKoZIhvcNAQEL -BQADggEBADxxxKrKVhTKZulsgm1uYaBT7K6NHJrWRKmwXCKPJvDj4Qhw+gwS+FIG -dPcjxALBXYpc2mI5HNgEpGe1FAEryVx/2bI4pN7xkVj2Sso8v7VFIY+Q1gCoyKIQ -Ff7lo+nP4byRNrzk5ffMZHC7CYu9IHNzTPsFWS8A9ieOs8Fwn0kY5Zjbq6qj6HjV -x+xA41Zo3JAxHaT0I6V6d64+adUBDyNYFl10d6FoJTTRVv25h3mInyQrZIMfzOwZ -svGSNWnFxVyyyIlMqIFAPIL+uDR4t76XzYDigo6ox4lZOAPad2f1zd/Ge4kEHvc1 -i5IrvYvVc7iJVuchgoxqH6hrb1VsJNI= ------END CERTIFICATE----- diff --git a/pubsub/kafkashopifysarama/config/test.key b/pubsub/kafkashopifysarama/config/test.key deleted file mode 100644 index 7999d6e..0000000 --- a/pubsub/kafkashopifysarama/config/test.key +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCojovErH3KmZew -PNBAo1hXU/kV+Ln3NgrQftFg0YPbZ0YfzHYe/iF+WPiCzydpGMDZzzPsIwaS15l1 -+/2qjIJg3ur94pESl4flTFjTGqayYQpXQ/GKQr9LTyLdR9ARer3JLu6Z+IbAJG4I -9EAs8sJs/eCpRarY0YoqYNyiaDpq0dGGL1wMqn3n0MBc80RZKe9UEWFiTHSTmdIo -m4749RY+qy2Nwmc0wmaUHcofQ8Yhy4Cy89AS8WX+OzM55xPXqHy17JoUtyCfv0D3 -jNdppV+MABywS06uHfF4adHbNZJuBOXTC9iQncUhU3Hy/sMNV/1tHPHUtJvOR//U -i7Jm42mtAgMBAAECggEAEVnVGOqBq2qaowVrkY/O/vRRsnmPWRuKyvIXBw6iMmJj -5U+UhDW4SGuLr1l4Ltkhtvodl+v2GAUSEsNdFiKPJD4mNfrN4LpQ/qFYjmiSqs+S -8y83DLq4ut2bUuh6ymXWZgCLBrX+1wYY/pp4Bbh6m6IZr4sQnm+Zd6nPZHdytf6W -nqHZDpnq206JffRA7BbKfHMxi1Rs3jwdKTfHeMktaJMtCagJz8jeECLRYoNkbVPT -N/rN4sHYXn23DBN5plEikIdWm5no5v8/BsA1N/jN48cPjmC1byKyjnlzOLbNT8xm -kI/c+zFR4/2y/SEiaCv8ZHjbBwWSWoft7qvq++170QKBgQDs1LNv00JBsXvuyiMZ -+xRRcuKzV4jKlfplJQW1rdoyOfnwu70ZpSnHD3rcHoJMenwrxYReUpqg/6snRKW1 -KRHIRpLhpROoxVhTOTy3z2LtzMqTR7nYWnffZkSKqptiEZl5niVN9ItJAJBSRrPs -UrhQr4d9sBodqfx5iv9CMuaWvQKBgQC2MyjjNEt2ps9u3To2jfJN3mLsceymFejW -mTQDo8gSzpE0vnmhd+3vC6DBpjkARY7t3u3hH5sKdmyyl1024iq2Tp/29MGIFDpI -e/tyI9Xm8985HWxPS3ALtU7Z4JSlzZ3x4Ft7pZ0rJZGOY9GAeStRO6JkxQVMoGPU -vSjqlomFsQKBgBW29vDk5OlTFbLyU7+ZFubU6tZYy1EP6VKGz3w2AZCjYjhhblhA -nZED6VbvcTED9gipZpajakwixRWnpK30ow3C8sq/sQrDdXLEB74uxLpbEaPpaq/c -s6sHHHe+ZtraFEFjb7YzGuZJp/HzS6H6f63eOkUa9XoM0Ppv9TGjqyLdAoGBAKxZ -lLHoBAKHJO2wY6K7f5vdZCJaWgt56jklzygqQ9ZWhNEp5RCyy+Y2T6kfPghdxCiL -muY76YNqJsSBnnGOW+z8TyFiwbehushaT67W6z5/Lodup8gSijjpF1/Oq450BJaL -Lr62GQh5j0jsb39iH3HGQYWlQbAMgKL7FLtkD07hAoGBAODdehEn2naDoAMwAE8p -42f2sK0akkMREV29iYvfq7jdj8vsb1RN0fpzfuDQ7NwGCkE052p9OeTTPpkTYI09 -2hgeEoj2I4YOeLfRaiDUTTKpsrM+LOcKca/HyqzxI3WAWjvc0oI0fsorpjLeJnJ4 -KPziycCfPAlwNOHn95+yiPVl ------END PRIVATE KEY----- diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go b/pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go deleted file mode 100644 index 6fb6245..0000000 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go +++ /dev/null @@ -1,79 +0,0 @@ -package kafkashopifysarama - -import ( - "fmt" - "log/slog" - "time" - - "github.com/Shopify/sarama" - "github.com/purposeinplay/go-commons/pubsub" -) - -var _ pubsub.Publisher[string, []byte] = (*Publisher)(nil) - -// Publisher represents a kafka publisher. -type Publisher struct { - logger *slog.Logger - syncProducer sarama.SyncProducer -} - -// NewPublisher creates a new kafka publisher. -func NewPublisher( - slogHandler slog.Handler, - saramaConfig *sarama.Config, - brokers []string, -) (*Publisher, error) { - cfg := saramaConfig - - if cfg == nil { - cfg = sarama.NewConfig() - - cfg.Producer.Retry.Max = 10 - cfg.Producer.Return.Successes = true - cfg.Metadata.Retry.Backoff = time.Second * 2 - } - - producer, err := sarama.NewSyncProducer(brokers, cfg) - if err != nil { - return nil, fmt.Errorf("new kafka publisher: %w", err) - } - - return &Publisher{ - logger: slog.New(slogHandler), - syncProducer: producer, - }, nil -} - -// Publish publishes an event to a kafka topic. -func (p Publisher) Publish( - event pubsub.Event[string, []byte], - channels ...string, -) error { - if len(channels) != 1 { - return pubsub.ErrExactlyOneChannelAllowed - } - - topic := channels[0] - - mes := &sarama.ProducerMessage{ - Topic: topic, - Headers: []sarama.RecordHeader{ - { - Key: []byte("type"), - Value: []byte(event.Type), - }, - }, - Value: sarama.ByteEncoder(event.Payload), - } - - if _, _, err := p.syncProducer.SendMessage(mes); err != nil { - return fmt.Errorf("publish: %w", err) - } - - return nil -} - -// Close closes the kafka publisher. -func (p Publisher) Close() error { - return p.syncProducer.Close() -} diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go b/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go deleted file mode 100644 index adafb07..0000000 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go +++ /dev/null @@ -1,204 +0,0 @@ -package kafkashopifysarama - -import ( - "bytes" - "context" - "fmt" - "log/slog" - "sync" - "time" - - "github.com/Shopify/sarama" - "github.com/purposeinplay/go-commons/pubsub" -) - -var _ pubsub.Subscriber[string, []byte] = (*Subscriber)(nil) - -// Subscriber represents a kafka subscriber. -type Subscriber struct { - logger *slog.Logger - cfg *sarama.Config - brokers []string - consumerGroup string -} - -// NewSubscriber creates a new kafka subscriber. -func NewSubscriber( - slogHandler slog.Handler, - saramaConfig *sarama.Config, - clientID string, - initialOffset int64, - autoCommit bool, - sessionTimeoutMS int, - heartbeatIntervalMS int, - brokers []string, - groupID string, -) (*Subscriber, error) { - cfg := saramaConfig - - if cfg == nil { - cfg = sarama.NewConfig() - - cfg.Consumer.Return.Errors = true - } - - cfg.ClientID = clientID - cfg.Consumer.Offsets.Initial = initialOffset - cfg.Consumer.Offsets.AutoCommit.Enable = autoCommit - cfg.Consumer.Group.Session.Timeout = time.Duration( - sessionTimeoutMS, - ) * time.Millisecond - cfg.Consumer.Group.Heartbeat.Interval = time.Duration( - heartbeatIntervalMS, - ) * time.Millisecond - - return &Subscriber{ - logger: slog.New(slogHandler), - cfg: cfg, - brokers: brokers, - consumerGroup: groupID, - }, nil -} - -// Subscribe creates a new subscription that runs in the background. -func (s Subscriber) Subscribe( - channels ...string, -) (pubsub.Subscription[string, []byte], error) { - if len(channels) != 1 { - return nil, pubsub.ErrExactlyOneChannelAllowed - } - - if _, err := sarama.NewConsumerGroup(s.brokers, s.consumerGroup, s.cfg); err != nil { - return nil, fmt.Errorf("new sarama consumer group: %w", err) - } - - consumer, err := sarama.NewConsumer(s.brokers, s.cfg) - if err != nil { - return nil, fmt.Errorf("new sarama consumer: %w", err) - } - - topic := channels[0] - - return newSubscription(s.logger, consumer, topic) -} - -var _ pubsub.Subscription[string, []byte] = (*Subscription)(nil) - -// Subscription represents a stream of events published to a kafka topic. -type Subscription struct { - eventCh chan pubsub.Event[string, []byte] - cancelFunc context.CancelFunc - wg *sync.WaitGroup - consumer sarama.Consumer -} - -func newSubscription( - logger *slog.Logger, - consumer sarama.Consumer, - topic string, -) (*Subscription, error) { - partitions, err := consumer.Partitions(topic) - if err != nil { - return nil, fmt.Errorf("get topic %q partitions: %w", topic, err) - } - - eventCh := make(chan pubsub.Event[string, []byte]) - - ctx, cancel := context.WithCancel(context.Background()) - - wg := new(sync.WaitGroup) - - wg.Add(len(partitions)) - - for _, partition := range partitions { - partitionConsumer, err := consumer.ConsumePartition( - topic, - partition, - sarama.OffsetNewest, - ) - if err != nil { - cancel() - - return nil, fmt.Errorf( - "consume partition %d for topic %q: %w", - partition, - topic, - err, - ) - } - - go func() { - defer wg.Done() - - // consume partition in the background, stop when the context is - // cancelled. - consumePartition(ctx, logger, partitionConsumer, eventCh) - }() - } - - return &Subscription{ - eventCh: eventCh, - cancelFunc: cancel, - wg: wg, - consumer: consumer, - }, nil -} - -func consumePartition( - ctx context.Context, - logger *slog.Logger, - partitionConsumer sarama.PartitionConsumer, - eventCh chan<- pubsub.Event[string, []byte], -) { - for { - select { - case m := <-partitionConsumer.Messages(): - var typ string - - for _, h := range m.Headers { - if bytes.Equal(h.Key, []byte("type")) { - typ = string(h.Value) - - break - } - } - - eventCh <- pubsub.Event[string, []byte]{ - Type: typ, - Payload: m.Value, - } - - case err := <-partitionConsumer.Errors(): - eventCh <- pubsub.Event[string, []byte]{ - Type: pubsub.EventTypeError, - Error: err, - } - - case <-ctx.Done(): - if err := partitionConsumer.Close(); err != nil { - logger.Error( - "close partition consumer error", - slog.String("error", err.Error()), - ) - } - - return - } - } -} - -// C returns a receive-only go channel of events published. -func (s Subscription) C() <-chan pubsub.Event[string, []byte] { - return s.eventCh -} - -// Close closes the subscription. -func (s Subscription) Close() error { - s.cancelFunc() - - s.wg.Wait() - - close(s.eventCh) - - return s.consumer.Close() -} diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_test.go b/pubsub/kafkashopifysarama/kafkashopifysarama_test.go deleted file mode 100644 index 4791286..0000000 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_test.go +++ /dev/null @@ -1,261 +0,0 @@ -package kafkashopifysarama_test - -import ( - "context" - "os" - "sync" - "testing" - "time" - - "github.com/Shopify/sarama" - "github.com/matryer/is" - "github.com/purposeinplay/go-commons/pubsub" - "github.com/purposeinplay/go-commons/pubsub/kafkashopifysarama" - "go.uber.org/zap" - "go.uber.org/zap/exp/zapslog" -) - -func TestPubSub(t *testing.T) { - logger := zap.NewExample() - - i := is.New(t) - - slogHandler := zapslog.NewHandler(logger.Core(), nil) - - var ( - clientID = "test_client_id" - initialOffset = sarama.OffsetNewest - autoCommit = true - sessionTimeoutMS = 45000 - heartbeatIntervalMS = 15000 - brokers = []string{os.Getenv("KAFKA_BROKER_URL")} - groupID = "test_groupd_id" - topic = "test_topic" - ) - - cfg1, err := kafkashopifysarama.NewTLSSubscriberConfig( - "./config/test.crt", - "./config/test.key", - "./config/ca_test.crt", - ) - i.NoErr(err) - - suber1, err := kafkashopifysarama.NewSubscriber( - slogHandler, - cfg1, - clientID, - initialOffset, - autoCommit, - sessionTimeoutMS, - heartbeatIntervalMS, - brokers, - groupID, - ) - i.NoErr(err) - - pubCfg, err := kafkashopifysarama.NewTLSPublisherConfig( - "./config/test.crt", - "./config/test.key", - "./config/ca_test.crt", - ) - i.NoErr(err) - - pub, err := kafkashopifysarama.NewPublisher( - slogHandler, - pubCfg, - brokers, - ) - i.NoErr(err) - - t.Cleanup(func() { i.NoErr(pub.Close()) }) - - sub1, err := suber1.Subscribe(topic) - i.NoErr(err) - - t.Cleanup(func() { i.NoErr(sub1.Close()) }) - - sub2, err := suber1.Subscribe(topic) - i.NoErr(err) - - t.Cleanup(func() { i.NoErr(sub2.Close()) }) - - mes := pubsub.Event[string, []byte]{ - Type: "test", - Payload: []byte("test_payload"), - } - - err = pub.Publish(mes, topic) - i.NoErr(err) - - var wg sync.WaitGroup - - wg.Add(2) - - now := time.Now() - - go func() { - defer wg.Done() - - receivedMes := <-sub1.C() - i.Equal(receivedMes, mes) - - t.Logf("sub1 received the message in %s", time.Since(now)) - }() - - go func() { - defer wg.Done() - - receivedMes := <-sub2.C() - i.Equal(receivedMes, mes) - - t.Logf("sub2 received the message in %s", time.Since(now)) - }() - - done := make(chan struct{}) - - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - case <-time.After(5 * time.Second): - t.Fatal("timeout") - } -} - -func TestConsumerGroups(t *testing.T) { - logger := zap.NewExample() - - i := is.New(t) - - slogHandler := zapslog.NewHandler(logger.Core(), nil) - - var ( - clientID = "test_client_id" - initialOffset = sarama.OffsetNewest - autoCommit = true - sessionTimeoutMS = 45000 - heartbeatIntervalMS = 15000 - brokers = []string{os.Getenv("KAFKA_BROKER_URL")} - groupID = "test_groupd_id" - topic = "test_topic" - ) - - ctx, cancel := context.WithCancel(context.Background()) - - cfg1, err := kafkashopifysarama.NewTLSSubscriberConfig( - "./config/test.crt", - "./config/test.key", - "./config/ca_test.crt", - ) - i.NoErr(err) - - suber1, err := kafkashopifysarama.NewSubscriber( - slogHandler, - cfg1, - clientID, - initialOffset, - autoCommit, - sessionTimeoutMS, - heartbeatIntervalMS, - brokers, - groupID, - ) - i.NoErr(err) - - mes := pubsub.Event[string, []byte]{ - Type: "test", - Payload: []byte("test_payload"), - } - - sub1, err := suber1.Subscribe(topic) - i.NoErr(err) - - t.Cleanup(func() { i.NoErr(sub1.Close()) }) - - var wg sync.WaitGroup - - wg.Add(2) - - go func() { - defer wg.Done() - - for { - select { - case receivedMes := <-sub1.C(): - i.Equal(receivedMes, mes) - - t.Logf("sub 1: %s", mes.Payload) - case <-ctx.Done(): - return - } - } - }() - - cfg2, err := kafkashopifysarama.NewTLSSubscriberConfig( - "./config/test.crt", - "./config/test.key", - "./config/ca_test.crt", - ) - i.NoErr(err) - - suber2, err := kafkashopifysarama.NewSubscriber( - slogHandler, - cfg2, - clientID, - initialOffset, - autoCommit, - sessionTimeoutMS, - heartbeatIntervalMS, - brokers, - groupID, - ) - i.NoErr(err) - - sub2, err := suber2.Subscribe(topic) - i.NoErr(err) - - t.Cleanup(func() { i.NoErr(sub2.Close()) }) - - go func() { - defer wg.Done() - - for { - select { - case receivedMes := <-sub2.C(): - i.Equal(receivedMes, mes) - - t.Logf("sub 2: %s", mes.Payload) - case <-ctx.Done(): - return - } - } - }() - - pubCfg, err := kafkashopifysarama.NewTLSPublisherConfig( - "./config/test.crt", - "./config/test.key", - "./config/ca_test.crt", - ) - i.NoErr(err) - - pub, err := kafkashopifysarama.NewPublisher( - slogHandler, - pubCfg, - brokers, - ) - i.NoErr(err) - - t.Cleanup(func() { i.NoErr(pub.Close()) }) - - err = pub.Publish(mes, topic) - i.NoErr(err) - - time.Sleep(5 * time.Second) - - cancel() - - wg.Wait() -}