From e16261b74623c37b5e45a394e1957348eca2fac4 Mon Sep 17 00:00:00 2001 From: jorgepurposeinplay Date: Thu, 16 Jan 2025 15:46:52 +0100 Subject: [PATCH] refactor(kafkashopifysarama): make changes based on kafkasarama --- pubsub/kafkashopifysarama/config/ca_test.crt | 23 ++ pubsub/kafkashopifysarama/config/test.crt | 23 ++ pubsub/kafkashopifysarama/config/test.key | 28 ++ .../kafkashopifysarama_publisher.go | 79 ++++++ .../kafkashopifysarama_server_test.go | 96 ------- .../kafkashopifysarama_subscriber.go | 44 +++- .../kafkashopifysarama_test.go | 240 ++++++++++++++---- .../kafkashopifysarama_tls.go | 34 ++- 8 files changed, 407 insertions(+), 160 deletions(-) create mode 100644 pubsub/kafkashopifysarama/config/ca_test.crt create mode 100644 pubsub/kafkashopifysarama/config/test.crt create mode 100644 pubsub/kafkashopifysarama/config/test.key create mode 100644 pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go delete mode 100644 pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go diff --git a/pubsub/kafkashopifysarama/config/ca_test.crt b/pubsub/kafkashopifysarama/config/ca_test.crt new file mode 100644 index 0000000..3ac9082 --- /dev/null +++ b/pubsub/kafkashopifysarama/config/ca_test.crt @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID1zCCAr+gAwIBAgIUVKC1c251XQI6xQOSk8TzVd02nV0wDQYJKoZIhvcNAQEL +BQAwQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwG +TG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTAgFw0yMzA3MjgxMjA4NDNaGA8yMTIzMDcw +NDEyMDg0M1owQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0G +A1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTCCASIwDQYJKoZIhvcNAQEBBQAD +ggEPADCCAQoCggEBAML1AIVjSSE/o+hg5jGSSK1gWq3TZTLqGQ75Csc+DrnoKztq +XBzmDcGW/S1cW9Yw+lEyGymc/4SzPIssrJu6y2ncv28kUMEzjftfwtc1zidKaTYk +wIAzksjNU/lP9UC8cq6pDyqztBr9RBHSmtxSIVQHeV5sZij+Rdluz5EUlsm6O0sd +EqHsa7M8rnxQmArHy+sYHEpe1hCN+AnGZtuL1xjUXDNjth5qrTUW6E30mLh7D6y5 +YRbR+EO9YDnXaA7nAxpSUXyu2+IrNdxYXdy6Fee1Owb461ufVql5msBlkPBQylQa +4RcXZdRE10elELVEp38IJP4/HRodO157yw7UsScCAwEAAaOBwjCBvzAdBgNVHQ4E +FgQUWBF2Mby1FPlEApc+3XgC0ykmsgYwDwYDVR0TAQH/BAUwAwEB/zB9BgNVHSME +djB0gBRYEXYxvLUU+UQClz7deALTKSayBqFGpEQwQjELMAkGA1UEBhMCVUsxEjAQ +BgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZr +YYIUVKC1c251XQI6xQOSk8TzVd02nV0wDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3 +DQEBCwUAA4IBAQC+nqrlZYH/604R2GtO0tPyABnuP8mc+dV7btatkGjHlY/Lx07T +tfxva3RSNHPh+yoSvwHriLCYBwX9lChAYc+l7ACbtPerbIHMm6YNkVKBRplXCO66 +BHv8xXOs9oBgAsgTIvfznbH/hNJrY0A6I4M2tVOvcrmxzZT4oZJwEqy7B3gu2y0A +b2QcQzNnqXvnyGG91b+CBHvU/XcvIuxQysct31banEkIc+nTl+rFGsN2phJz+J+h +i8jmY2AixD++ZFPB5/4msaa6o6x4QFc5bvf92BDuJTEyKt+dM9rhZvtU1plnKmZq +89xtHAZltnhjC7AU5nia+hCd8hHlXasjljoe +-----END CERTIFICATE----- diff --git a/pubsub/kafkashopifysarama/config/test.crt b/pubsub/kafkashopifysarama/config/test.crt new file mode 100644 index 0000000..fa52174 --- /dev/null +++ b/pubsub/kafkashopifysarama/config/test.crt @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID0zCCArugAwIBAgIUeHl9LEIwVLvdrQ1Qk/veZPdQ9yUwDQYJKoZIhvcNAQEL +BQAwQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwG +TG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTAgFw0yMzA3MjgxMjA4NDNaGA8yMTIzMDcw +NDEyMDg0M1owQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0G +A1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTCCASIwDQYJKoZIhvcNAQEBBQAD +ggEPADCCAQoCggEBAKiOi8SsfcqZl7A80ECjWFdT+RX4ufc2CtB+0WDRg9tnRh/M +dh7+IX5Y+ILPJ2kYwNnPM+wjBpLXmXX7/aqMgmDe6v3ikRKXh+VMWNMaprJhCldD +8YpCv0tPIt1H0BF6vcku7pn4hsAkbgj0QCzywmz94KlFqtjRiipg3KJoOmrR0YYv +XAyqfefQwFzzRFkp71QRYWJMdJOZ0iibjvj1Fj6rLY3CZzTCZpQdyh9DxiHLgLLz +0BLxZf47MznnE9eofLXsmhS3IJ+/QPeM12mlX4wAHLBLTq4d8Xhp0ds1km4E5dML +2JCdxSFTcfL+ww1X/W0c8dS0m85H/9SLsmbjaa0CAwEAAaOBvjCBuzAdBgNVHQ4E +FgQUZChhFb324S1UADhefq8CfUOGlPYwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0E +HxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwDgYDVR0PAQH/BAQDAgWg +MBMGA1UdJQQMMAoGCCsGAQUFBwMCMBsGA1UdEQQUMBKCBWthZmthgglsb2NhbGhv +c3QwHwYDVR0jBBgwFoAUWBF2Mby1FPlEApc+3XgC0ykmsgYwDQYJKoZIhvcNAQEL +BQADggEBADxxxKrKVhTKZulsgm1uYaBT7K6NHJrWRKmwXCKPJvDj4Qhw+gwS+FIG +dPcjxALBXYpc2mI5HNgEpGe1FAEryVx/2bI4pN7xkVj2Sso8v7VFIY+Q1gCoyKIQ +Ff7lo+nP4byRNrzk5ffMZHC7CYu9IHNzTPsFWS8A9ieOs8Fwn0kY5Zjbq6qj6HjV +x+xA41Zo3JAxHaT0I6V6d64+adUBDyNYFl10d6FoJTTRVv25h3mInyQrZIMfzOwZ +svGSNWnFxVyyyIlMqIFAPIL+uDR4t76XzYDigo6ox4lZOAPad2f1zd/Ge4kEHvc1 +i5IrvYvVc7iJVuchgoxqH6hrb1VsJNI= +-----END CERTIFICATE----- diff --git a/pubsub/kafkashopifysarama/config/test.key b/pubsub/kafkashopifysarama/config/test.key new file mode 100644 index 0000000..7999d6e --- /dev/null +++ b/pubsub/kafkashopifysarama/config/test.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCojovErH3KmZew +PNBAo1hXU/kV+Ln3NgrQftFg0YPbZ0YfzHYe/iF+WPiCzydpGMDZzzPsIwaS15l1 ++/2qjIJg3ur94pESl4flTFjTGqayYQpXQ/GKQr9LTyLdR9ARer3JLu6Z+IbAJG4I +9EAs8sJs/eCpRarY0YoqYNyiaDpq0dGGL1wMqn3n0MBc80RZKe9UEWFiTHSTmdIo +m4749RY+qy2Nwmc0wmaUHcofQ8Yhy4Cy89AS8WX+OzM55xPXqHy17JoUtyCfv0D3 +jNdppV+MABywS06uHfF4adHbNZJuBOXTC9iQncUhU3Hy/sMNV/1tHPHUtJvOR//U +i7Jm42mtAgMBAAECggEAEVnVGOqBq2qaowVrkY/O/vRRsnmPWRuKyvIXBw6iMmJj +5U+UhDW4SGuLr1l4Ltkhtvodl+v2GAUSEsNdFiKPJD4mNfrN4LpQ/qFYjmiSqs+S +8y83DLq4ut2bUuh6ymXWZgCLBrX+1wYY/pp4Bbh6m6IZr4sQnm+Zd6nPZHdytf6W +nqHZDpnq206JffRA7BbKfHMxi1Rs3jwdKTfHeMktaJMtCagJz8jeECLRYoNkbVPT +N/rN4sHYXn23DBN5plEikIdWm5no5v8/BsA1N/jN48cPjmC1byKyjnlzOLbNT8xm +kI/c+zFR4/2y/SEiaCv8ZHjbBwWSWoft7qvq++170QKBgQDs1LNv00JBsXvuyiMZ ++xRRcuKzV4jKlfplJQW1rdoyOfnwu70ZpSnHD3rcHoJMenwrxYReUpqg/6snRKW1 +KRHIRpLhpROoxVhTOTy3z2LtzMqTR7nYWnffZkSKqptiEZl5niVN9ItJAJBSRrPs +UrhQr4d9sBodqfx5iv9CMuaWvQKBgQC2MyjjNEt2ps9u3To2jfJN3mLsceymFejW +mTQDo8gSzpE0vnmhd+3vC6DBpjkARY7t3u3hH5sKdmyyl1024iq2Tp/29MGIFDpI +e/tyI9Xm8985HWxPS3ALtU7Z4JSlzZ3x4Ft7pZ0rJZGOY9GAeStRO6JkxQVMoGPU +vSjqlomFsQKBgBW29vDk5OlTFbLyU7+ZFubU6tZYy1EP6VKGz3w2AZCjYjhhblhA +nZED6VbvcTED9gipZpajakwixRWnpK30ow3C8sq/sQrDdXLEB74uxLpbEaPpaq/c +s6sHHHe+ZtraFEFjb7YzGuZJp/HzS6H6f63eOkUa9XoM0Ppv9TGjqyLdAoGBAKxZ +lLHoBAKHJO2wY6K7f5vdZCJaWgt56jklzygqQ9ZWhNEp5RCyy+Y2T6kfPghdxCiL +muY76YNqJsSBnnGOW+z8TyFiwbehushaT67W6z5/Lodup8gSijjpF1/Oq450BJaL +Lr62GQh5j0jsb39iH3HGQYWlQbAMgKL7FLtkD07hAoGBAODdehEn2naDoAMwAE8p +42f2sK0akkMREV29iYvfq7jdj8vsb1RN0fpzfuDQ7NwGCkE052p9OeTTPpkTYI09 +2hgeEoj2I4YOeLfRaiDUTTKpsrM+LOcKca/HyqzxI3WAWjvc0oI0fsorpjLeJnJ4 +KPziycCfPAlwNOHn95+yiPVl +-----END PRIVATE KEY----- diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go b/pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go new file mode 100644 index 0000000..6fb6245 --- /dev/null +++ b/pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go @@ -0,0 +1,79 @@ +package kafkashopifysarama + +import ( + "fmt" + "log/slog" + "time" + + "github.com/Shopify/sarama" + "github.com/purposeinplay/go-commons/pubsub" +) + +var _ pubsub.Publisher[string, []byte] = (*Publisher)(nil) + +// Publisher represents a kafka publisher. +type Publisher struct { + logger *slog.Logger + syncProducer sarama.SyncProducer +} + +// NewPublisher creates a new kafka publisher. +func NewPublisher( + slogHandler slog.Handler, + saramaConfig *sarama.Config, + brokers []string, +) (*Publisher, error) { + cfg := saramaConfig + + if cfg == nil { + cfg = sarama.NewConfig() + + cfg.Producer.Retry.Max = 10 + cfg.Producer.Return.Successes = true + cfg.Metadata.Retry.Backoff = time.Second * 2 + } + + producer, err := sarama.NewSyncProducer(brokers, cfg) + if err != nil { + return nil, fmt.Errorf("new kafka publisher: %w", err) + } + + return &Publisher{ + logger: slog.New(slogHandler), + syncProducer: producer, + }, nil +} + +// Publish publishes an event to a kafka topic. +func (p Publisher) Publish( + event pubsub.Event[string, []byte], + channels ...string, +) error { + if len(channels) != 1 { + return pubsub.ErrExactlyOneChannelAllowed + } + + topic := channels[0] + + mes := &sarama.ProducerMessage{ + Topic: topic, + Headers: []sarama.RecordHeader{ + { + Key: []byte("type"), + Value: []byte(event.Type), + }, + }, + Value: sarama.ByteEncoder(event.Payload), + } + + if _, _, err := p.syncProducer.SendMessage(mes); err != nil { + return fmt.Errorf("publish: %w", err) + } + + return nil +} + +// Close closes the kafka publisher. +func (p Publisher) Close() error { + return p.syncProducer.Close() +} diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go b/pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go deleted file mode 100644 index 55b5e06..0000000 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package kafkashopifysarama_test - -import ( - "crypto/tls" - "fmt" - "testing" - - "github.com/Shopify/sarama" - "github.com/matryer/is" - "github.com/purposeinplay/go-commons/pubsub" -) - -type publisher struct { - asyncProducer sarama.AsyncProducer -} - -type kafkaServer struct { - publisher *publisher -} - -func initialize(tlsCfg *tls.Config, brokers []string) *kafkaServer { - kafkaCfg := sarama.NewConfig() - - kafkaCfg.Producer.Return.Successes = true - kafkaCfg.Net.TLS.Enable = true - kafkaCfg.Net.TLS.Config = tlsCfg - - publisher, err := newPublisher(kafkaCfg, brokers) - if err != nil { - panic(err) - } - - return &kafkaServer{ - publisher: publisher, - } -} - -func newPublisher( - saramaConfig *sarama.Config, - brokers []string, -) (*publisher, error) { - producer, err := sarama.NewAsyncProducer(brokers, saramaConfig) - if err != nil { - return nil, fmt.Errorf("new kafka publisher: %w", err) - } - - return &publisher{ - asyncProducer: producer, - }, nil -} - -// Publish publishes an event to a kafka topic. -func (p *publisher) Publish(event pubsub.Event[string, []byte], channels ...string) error { - if len(channels) != 1 { - return pubsub.ErrExactlyOneChannelAllowed - } - - topic := channels[0] - - mes := &sarama.ProducerMessage{ - Topic: topic, - Headers: []sarama.RecordHeader{ - { - Key: []byte("type"), - Value: []byte(event.Type), - }, - }, - Value: sarama.ByteEncoder(event.Payload), - } - - p.asyncProducer.Input() <- mes - - return nil -} - -// SendMessage sends a message to the Kafka consumer on the given topic. -func (ks *kafkaServer) SendMessage(t *testing.T, topic, msg string) { - t.Helper() - - i := is.New(t) - i.Helper() - - err := ks.publisher.Publish(pubsub.Event[string, []byte]{ - Type: "test", - Payload: []byte(msg), - }, topic) - - i.NoErr(err) - - t.Logf("Sent message to topic %s: %s", topic, msg) -} - -// Close closes the kafka publisher. -func (ks *kafkaServer) Close() error { - return ks.publisher.asyncProducer.Close() -} diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go b/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go index 91c663a..adafb07 100644 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go +++ b/pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go @@ -3,7 +3,6 @@ package kafkashopifysarama import ( "bytes" "context" - "crypto/tls" "fmt" "log/slog" "sync" @@ -26,33 +25,45 @@ type Subscriber struct { // NewSubscriber creates a new kafka subscriber. func NewSubscriber( slogHandler slog.Handler, - tlsConfig *tls.Config, + saramaConfig *sarama.Config, clientID string, + initialOffset int64, + autoCommit bool, sessionTimeoutMS int, heartbeatIntervalMS int, brokers []string, groupID string, ) (*Subscriber, error) { - kafkaCfg := NewTLSSubscriberConfig(tlsConfig) + cfg := saramaConfig - kafkaCfg.ClientID = clientID - kafkaCfg.Consumer.Offsets.Initial = sarama.OffsetNewest - kafkaCfg.Consumer.Offsets.AutoCommit.Enable = true - kafkaCfg.Consumer.Group.Session.Timeout = time.Duration(sessionTimeoutMS) * time.Millisecond - kafkaCfg.Consumer.Group.Heartbeat.Interval = time.Duration( + if cfg == nil { + cfg = sarama.NewConfig() + + cfg.Consumer.Return.Errors = true + } + + cfg.ClientID = clientID + cfg.Consumer.Offsets.Initial = initialOffset + cfg.Consumer.Offsets.AutoCommit.Enable = autoCommit + cfg.Consumer.Group.Session.Timeout = time.Duration( + sessionTimeoutMS, + ) * time.Millisecond + cfg.Consumer.Group.Heartbeat.Interval = time.Duration( heartbeatIntervalMS, ) * time.Millisecond return &Subscriber{ logger: slog.New(slogHandler), - cfg: kafkaCfg, + cfg: cfg, brokers: brokers, consumerGroup: groupID, }, nil } // Subscribe creates a new subscription that runs in the background. -func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[string, []byte], error) { +func (s Subscriber) Subscribe( + channels ...string, +) (pubsub.Subscription[string, []byte], error) { if len(channels) != 1 { return nil, pubsub.ErrExactlyOneChannelAllowed } @@ -100,11 +111,20 @@ func newSubscription( wg.Add(len(partitions)) for _, partition := range partitions { - partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + partitionConsumer, err := consumer.ConsumePartition( + topic, + partition, + sarama.OffsetNewest, + ) if err != nil { cancel() - return nil, fmt.Errorf("consume partition %d for topic %q: %w", partition, topic, err) + return nil, fmt.Errorf( + "consume partition %d for topic %q: %w", + partition, + topic, + err, + ) } go func() { diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_test.go b/pubsub/kafkashopifysarama/kafkashopifysarama_test.go index e93c45d..4791286 100644 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_test.go +++ b/pubsub/kafkashopifysarama/kafkashopifysarama_test.go @@ -2,110 +2,260 @@ package kafkashopifysarama_test import ( "context" + "os" "sync" "testing" "time" + "github.com/Shopify/sarama" "github.com/matryer/is" + "github.com/purposeinplay/go-commons/pubsub" "github.com/purposeinplay/go-commons/pubsub/kafkashopifysarama" "go.uber.org/zap" "go.uber.org/zap/exp/zapslog" ) -func TestConsumerGroups(t *testing.T) { - // nolint: gocritic, revive - is := is.New(t) - +func TestPubSub(t *testing.T) { logger := zap.NewExample() + + i := is.New(t) + slogHandler := zapslog.NewHandler(logger.Core(), nil) - topic := "test" - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() + var ( + clientID = "test_client_id" + initialOffset = sarama.OffsetNewest + autoCommit = true + sessionTimeoutMS = 45000 + heartbeatIntervalMS = 15000 + brokers = []string{os.Getenv("KAFKA_BROKER_URL")} + groupID = "test_groupd_id" + topic = "test_topic" + ) + + cfg1, err := kafkashopifysarama.NewTLSSubscriberConfig( + "./config/test.crt", + "./config/test.key", + "./config/ca_test.crt", + ) + i.NoErr(err) + + suber1, err := kafkashopifysarama.NewSubscriber( + slogHandler, + cfg1, + clientID, + initialOffset, + autoCommit, + sessionTimeoutMS, + heartbeatIntervalMS, + brokers, + groupID, + ) + i.NoErr(err) - tlsCfg, err := kafkashopifysarama.LoadTLSConfig( + pubCfg, err := kafkashopifysarama.NewTLSPublisherConfig( "./config/test.crt", "./config/test.key", "./config/ca_test.crt", ) - is.NoErr(err) + i.NoErr(err) + + pub, err := kafkashopifysarama.NewPublisher( + slogHandler, + pubCfg, + brokers, + ) + i.NoErr(err) + + t.Cleanup(func() { i.NoErr(pub.Close()) }) + + sub1, err := suber1.Subscribe(topic) + i.NoErr(err) + + t.Cleanup(func() { i.NoErr(sub1.Close()) }) + + sub2, err := suber1.Subscribe(topic) + i.NoErr(err) + + t.Cleanup(func() { i.NoErr(sub2.Close()) }) + + mes := pubsub.Event[string, []byte]{ + Type: "test", + Payload: []byte("test_payload"), + } + + err = pub.Publish(mes, topic) + i.NoErr(err) + + var wg sync.WaitGroup + + wg.Add(2) + + now := time.Now() + + go func() { + defer wg.Done() + + receivedMes := <-sub1.C() + i.Equal(receivedMes, mes) + + t.Logf("sub1 received the message in %s", time.Since(now)) + }() + + go func() { + defer wg.Done() + + receivedMes := <-sub2.C() + i.Equal(receivedMes, mes) + + t.Logf("sub2 received the message in %s", time.Since(now)) + }() + + done := make(chan struct{}) + + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } +} + +func TestConsumerGroups(t *testing.T) { + logger := zap.NewExample() + + i := is.New(t) + + slogHandler := zapslog.NewHandler(logger.Core(), nil) var ( clientID = "test_client_id" + initialOffset = sarama.OffsetNewest + autoCommit = true sessionTimeoutMS = 45000 heartbeatIntervalMS = 15000 - brokers = []string{"localhost:9092"} + brokers = []string{os.Getenv("KAFKA_BROKER_URL")} groupID = "test_groupd_id" + topic = "test_topic" + ) + + ctx, cancel := context.WithCancel(context.Background()) + + cfg1, err := kafkashopifysarama.NewTLSSubscriberConfig( + "./config/test.crt", + "./config/test.key", + "./config/ca_test.crt", ) + i.NoErr(err) suber1, err := kafkashopifysarama.NewSubscriber( slogHandler, - tlsCfg, + cfg1, clientID, + initialOffset, + autoCommit, sessionTimeoutMS, heartbeatIntervalMS, brokers, groupID, ) - is.NoErr(err) + i.NoErr(err) + + mes := pubsub.Event[string, []byte]{ + Type: "test", + Payload: []byte("test_payload"), + } + + sub1, err := suber1.Subscribe(topic) + i.NoErr(err) + + t.Cleanup(func() { i.NoErr(sub1.Close()) }) + + var wg sync.WaitGroup + + wg.Add(2) + + go func() { + defer wg.Done() + + for { + select { + case receivedMes := <-sub1.C(): + i.Equal(receivedMes, mes) + + t.Logf("sub 1: %s", mes.Payload) + case <-ctx.Done(): + return + } + } + }() + + cfg2, err := kafkashopifysarama.NewTLSSubscriberConfig( + "./config/test.crt", + "./config/test.key", + "./config/ca_test.crt", + ) + i.NoErr(err) suber2, err := kafkashopifysarama.NewSubscriber( slogHandler, - tlsCfg, + cfg2, clientID, + initialOffset, + autoCommit, sessionTimeoutMS, heartbeatIntervalMS, brokers, groupID, ) - is.NoErr(err) - - var wg sync.WaitGroup + i.NoErr(err) - wg.Add(2) + sub2, err := suber2.Subscribe(topic) + i.NoErr(err) - startSubscriber := func(t *testing.T, subscriber *kafkashopifysarama.Subscriber) { - t.Helper() + t.Cleanup(func() { i.NoErr(sub2.Close()) }) + go func() { defer wg.Done() - //nolint: contextcheck // The subscription is closed in the test - // cleanup. - sub, err := subscriber.Subscribe(topic) - is.NoErr(err) - for { select { - case mes := <-sub.C(): - is.Equal(mes.Type, "test") - is.Equal(string(mes.Payload), "test message") - mes.Ack() + case receivedMes := <-sub2.C(): + i.Equal(receivedMes, mes) - return + t.Logf("sub 2: %s", mes.Payload) case <-ctx.Done(): - is.NoErr(ctx.Err()) return } } - } + }() + + pubCfg, err := kafkashopifysarama.NewTLSPublisherConfig( + "./config/test.crt", + "./config/test.key", + "./config/ca_test.crt", + ) + i.NoErr(err) + + pub, err := kafkashopifysarama.NewPublisher( + slogHandler, + pubCfg, + brokers, + ) + i.NoErr(err) - // Start both subscribers in separate goroutines - go startSubscriber(t, suber1) - go startSubscriber(t, suber2) + t.Cleanup(func() { i.NoErr(pub.Close()) }) - // Initialize Kafka server and send message - kafkaServer := initialize(tlsCfg, brokers) + err = pub.Publish(mes, topic) + i.NoErr(err) - time.Sleep(17 * time.Second) - kafkaServer.SendMessage(t, "test", "test message") - kafkaServer.SendMessage(t, "test", "test message") + time.Sleep(5 * time.Second) - t.Cleanup(func() { - // is.NoErr(consumerGroup1.Close()) - // is.NoErr(consumerGroup2.Close()) - is.NoErr(kafkaServer.Close()) - }) + cancel() - // Wait for all goroutines to finish wg.Wait() } diff --git a/pubsub/kafkashopifysarama/kafkashopifysarama_tls.go b/pubsub/kafkashopifysarama/kafkashopifysarama_tls.go index be2fbb1..6e4675c 100644 --- a/pubsub/kafkashopifysarama/kafkashopifysarama_tls.go +++ b/pubsub/kafkashopifysarama/kafkashopifysarama_tls.go @@ -5,23 +5,40 @@ import ( "crypto/x509" "fmt" "os" + "time" "github.com/Shopify/sarama" ) // NewTLSSubscriberConfig creates a new kafka subscriber config with TLS // authentication. -func NewTLSSubscriberConfig(tlsCfg *tls.Config) *sarama.Config { +func NewTLSSubscriberConfig( + cerfFilePath, keyFilePath, caFilePath string, +) (*sarama.Config, error) { cfg := sarama.NewConfig() + cfg.Consumer.Return.Errors = true - cfg.Net.TLS.Enable = true - cfg.Net.TLS.Config = tlsCfg + return loadTLSConfig(cfg, cerfFilePath, keyFilePath, caFilePath) +} + +// NewTLSPublisherConfig creates a new kafka publisher config with TLS +// authentication. +func NewTLSPublisherConfig( + cerfFilePath, keyFilePath, caFilePath string, +) (*sarama.Config, error) { + cfg := sarama.NewConfig() + + cfg.Producer.Retry.Max = 10 + cfg.Producer.Return.Successes = true + cfg.Metadata.Retry.Backoff = time.Second * 2 - return cfg + return loadTLSConfig(cfg, cerfFilePath, keyFilePath, caFilePath) } -// LoadTLSConfig loads the TLS config from the given folder. -func LoadTLSConfig(cerfFilePath, keyFilePath, caFilePath string) (*tls.Config, error) { +func loadTLSConfig( + cfg *sarama.Config, + cerfFilePath, keyFilePath, caFilePath string, +) (*sarama.Config, error) { // Load client cert cert, err := tls.LoadX509KeyPair( cerfFilePath, @@ -48,5 +65,8 @@ func LoadTLSConfig(cerfFilePath, keyFilePath, caFilePath string) (*tls.Config, e InsecureSkipVerify: false, } - return tlsCfg, nil + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tlsCfg + + return cfg, nil }