Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made kafkashopifysarama more similar to kafkasarama and added the missing publisher #64

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pubsub/kafkashopifysarama/config/ca_test.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID1zCCAr+gAwIBAgIUVKC1c251XQI6xQOSk8TzVd02nV0wDQYJKoZIhvcNAQEL
BQAwQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwG
TG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTAgFw0yMzA3MjgxMjA4NDNaGA8yMTIzMDcw
NDEyMDg0M1owQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0G
A1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAML1AIVjSSE/o+hg5jGSSK1gWq3TZTLqGQ75Csc+DrnoKztq
XBzmDcGW/S1cW9Yw+lEyGymc/4SzPIssrJu6y2ncv28kUMEzjftfwtc1zidKaTYk
wIAzksjNU/lP9UC8cq6pDyqztBr9RBHSmtxSIVQHeV5sZij+Rdluz5EUlsm6O0sd
EqHsa7M8rnxQmArHy+sYHEpe1hCN+AnGZtuL1xjUXDNjth5qrTUW6E30mLh7D6y5
YRbR+EO9YDnXaA7nAxpSUXyu2+IrNdxYXdy6Fee1Owb461ufVql5msBlkPBQylQa
4RcXZdRE10elELVEp38IJP4/HRodO157yw7UsScCAwEAAaOBwjCBvzAdBgNVHQ4E
FgQUWBF2Mby1FPlEApc+3XgC0ykmsgYwDwYDVR0TAQH/BAUwAwEB/zB9BgNVHSME
djB0gBRYEXYxvLUU+UQClz7deALTKSayBqFGpEQwQjELMAkGA1UEBhMCVUsxEjAQ
BgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZr
YYIUVKC1c251XQI6xQOSk8TzVd02nV0wDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3
DQEBCwUAA4IBAQC+nqrlZYH/604R2GtO0tPyABnuP8mc+dV7btatkGjHlY/Lx07T
tfxva3RSNHPh+yoSvwHriLCYBwX9lChAYc+l7ACbtPerbIHMm6YNkVKBRplXCO66
BHv8xXOs9oBgAsgTIvfznbH/hNJrY0A6I4M2tVOvcrmxzZT4oZJwEqy7B3gu2y0A
b2QcQzNnqXvnyGG91b+CBHvU/XcvIuxQysct31banEkIc+nTl+rFGsN2phJz+J+h
i8jmY2AixD++ZFPB5/4msaa6o6x4QFc5bvf92BDuJTEyKt+dM9rhZvtU1plnKmZq
89xtHAZltnhjC7AU5nia+hCd8hHlXasjljoe
-----END CERTIFICATE-----
23 changes: 23 additions & 0 deletions pubsub/kafkashopifysarama/config/test.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID0zCCArugAwIBAgIUeHl9LEIwVLvdrQ1Qk/veZPdQ9yUwDQYJKoZIhvcNAQEL
BQAwQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwG
TG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTAgFw0yMzA3MjgxMjA4NDNaGA8yMTIzMDcw
NDEyMDg0M1owQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0G
A1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAKiOi8SsfcqZl7A80ECjWFdT+RX4ufc2CtB+0WDRg9tnRh/M
dh7+IX5Y+ILPJ2kYwNnPM+wjBpLXmXX7/aqMgmDe6v3ikRKXh+VMWNMaprJhCldD
8YpCv0tPIt1H0BF6vcku7pn4hsAkbgj0QCzywmz94KlFqtjRiipg3KJoOmrR0YYv
XAyqfefQwFzzRFkp71QRYWJMdJOZ0iibjvj1Fj6rLY3CZzTCZpQdyh9DxiHLgLLz
0BLxZf47MznnE9eofLXsmhS3IJ+/QPeM12mlX4wAHLBLTq4d8Xhp0ds1km4E5dML
2JCdxSFTcfL+ww1X/W0c8dS0m85H/9SLsmbjaa0CAwEAAaOBvjCBuzAdBgNVHQ4E
FgQUZChhFb324S1UADhefq8CfUOGlPYwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0E
HxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwDgYDVR0PAQH/BAQDAgWg
MBMGA1UdJQQMMAoGCCsGAQUFBwMCMBsGA1UdEQQUMBKCBWthZmthgglsb2NhbGhv
c3QwHwYDVR0jBBgwFoAUWBF2Mby1FPlEApc+3XgC0ykmsgYwDQYJKoZIhvcNAQEL
BQADggEBADxxxKrKVhTKZulsgm1uYaBT7K6NHJrWRKmwXCKPJvDj4Qhw+gwS+FIG
dPcjxALBXYpc2mI5HNgEpGe1FAEryVx/2bI4pN7xkVj2Sso8v7VFIY+Q1gCoyKIQ
Ff7lo+nP4byRNrzk5ffMZHC7CYu9IHNzTPsFWS8A9ieOs8Fwn0kY5Zjbq6qj6HjV
x+xA41Zo3JAxHaT0I6V6d64+adUBDyNYFl10d6FoJTTRVv25h3mInyQrZIMfzOwZ
svGSNWnFxVyyyIlMqIFAPIL+uDR4t76XzYDigo6ox4lZOAPad2f1zd/Ge4kEHvc1
i5IrvYvVc7iJVuchgoxqH6hrb1VsJNI=
-----END CERTIFICATE-----
28 changes: 28 additions & 0 deletions pubsub/kafkashopifysarama/config/test.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCojovErH3KmZew
PNBAo1hXU/kV+Ln3NgrQftFg0YPbZ0YfzHYe/iF+WPiCzydpGMDZzzPsIwaS15l1
+/2qjIJg3ur94pESl4flTFjTGqayYQpXQ/GKQr9LTyLdR9ARer3JLu6Z+IbAJG4I
9EAs8sJs/eCpRarY0YoqYNyiaDpq0dGGL1wMqn3n0MBc80RZKe9UEWFiTHSTmdIo
m4749RY+qy2Nwmc0wmaUHcofQ8Yhy4Cy89AS8WX+OzM55xPXqHy17JoUtyCfv0D3
jNdppV+MABywS06uHfF4adHbNZJuBOXTC9iQncUhU3Hy/sMNV/1tHPHUtJvOR//U
i7Jm42mtAgMBAAECggEAEVnVGOqBq2qaowVrkY/O/vRRsnmPWRuKyvIXBw6iMmJj
5U+UhDW4SGuLr1l4Ltkhtvodl+v2GAUSEsNdFiKPJD4mNfrN4LpQ/qFYjmiSqs+S
8y83DLq4ut2bUuh6ymXWZgCLBrX+1wYY/pp4Bbh6m6IZr4sQnm+Zd6nPZHdytf6W
nqHZDpnq206JffRA7BbKfHMxi1Rs3jwdKTfHeMktaJMtCagJz8jeECLRYoNkbVPT
N/rN4sHYXn23DBN5plEikIdWm5no5v8/BsA1N/jN48cPjmC1byKyjnlzOLbNT8xm
kI/c+zFR4/2y/SEiaCv8ZHjbBwWSWoft7qvq++170QKBgQDs1LNv00JBsXvuyiMZ
+xRRcuKzV4jKlfplJQW1rdoyOfnwu70ZpSnHD3rcHoJMenwrxYReUpqg/6snRKW1
KRHIRpLhpROoxVhTOTy3z2LtzMqTR7nYWnffZkSKqptiEZl5niVN9ItJAJBSRrPs
UrhQr4d9sBodqfx5iv9CMuaWvQKBgQC2MyjjNEt2ps9u3To2jfJN3mLsceymFejW
mTQDo8gSzpE0vnmhd+3vC6DBpjkARY7t3u3hH5sKdmyyl1024iq2Tp/29MGIFDpI
e/tyI9Xm8985HWxPS3ALtU7Z4JSlzZ3x4Ft7pZ0rJZGOY9GAeStRO6JkxQVMoGPU
vSjqlomFsQKBgBW29vDk5OlTFbLyU7+ZFubU6tZYy1EP6VKGz3w2AZCjYjhhblhA
nZED6VbvcTED9gipZpajakwixRWnpK30ow3C8sq/sQrDdXLEB74uxLpbEaPpaq/c
s6sHHHe+ZtraFEFjb7YzGuZJp/HzS6H6f63eOkUa9XoM0Ppv9TGjqyLdAoGBAKxZ
lLHoBAKHJO2wY6K7f5vdZCJaWgt56jklzygqQ9ZWhNEp5RCyy+Y2T6kfPghdxCiL
muY76YNqJsSBnnGOW+z8TyFiwbehushaT67W6z5/Lodup8gSijjpF1/Oq450BJaL
Lr62GQh5j0jsb39iH3HGQYWlQbAMgKL7FLtkD07hAoGBAODdehEn2naDoAMwAE8p
42f2sK0akkMREV29iYvfq7jdj8vsb1RN0fpzfuDQ7NwGCkE052p9OeTTPpkTYI09
2hgeEoj2I4YOeLfRaiDUTTKpsrM+LOcKca/HyqzxI3WAWjvc0oI0fsorpjLeJnJ4
KPziycCfPAlwNOHn95+yiPVl
-----END PRIVATE KEY-----
79 changes: 79 additions & 0 deletions pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package kafkashopifysarama

import (
"fmt"
"log/slog"
"time"

"github.com/Shopify/sarama"
"github.com/purposeinplay/go-commons/pubsub"
)

var _ pubsub.Publisher[string, []byte] = (*Publisher)(nil)

// Publisher represents a kafka publisher.
type Publisher struct {
logger *slog.Logger
syncProducer sarama.SyncProducer
}

// NewPublisher creates a new kafka publisher.
func NewPublisher(
slogHandler slog.Handler,
saramaConfig *sarama.Config,
brokers []string,
) (*Publisher, error) {
cfg := saramaConfig

if cfg == nil {
cfg = sarama.NewConfig()

cfg.Producer.Retry.Max = 10
cfg.Producer.Return.Successes = true
cfg.Metadata.Retry.Backoff = time.Second * 2
}

producer, err := sarama.NewSyncProducer(brokers, cfg)
if err != nil {
return nil, fmt.Errorf("new kafka publisher: %w", err)
}

return &Publisher{
logger: slog.New(slogHandler),
syncProducer: producer,
}, nil
}

// Publish publishes an event to a kafka topic.
func (p Publisher) Publish(
event pubsub.Event[string, []byte],
channels ...string,
) error {
if len(channels) != 1 {
return pubsub.ErrExactlyOneChannelAllowed
}

topic := channels[0]

mes := &sarama.ProducerMessage{
Topic: topic,
Headers: []sarama.RecordHeader{
{
Key: []byte("type"),
Value: []byte(event.Type),
},
},
Value: sarama.ByteEncoder(event.Payload),
}

if _, _, err := p.syncProducer.SendMessage(mes); err != nil {
return fmt.Errorf("publish: %w", err)
}

return nil
}

// Close closes the kafka publisher.
func (p Publisher) Close() error {
return p.syncProducer.Close()
}
96 changes: 0 additions & 96 deletions pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go

This file was deleted.

44 changes: 32 additions & 12 deletions pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kafkashopifysarama
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"log/slog"
"sync"
Expand All @@ -26,33 +25,45 @@ type Subscriber struct {
// NewSubscriber creates a new kafka subscriber.
func NewSubscriber(
slogHandler slog.Handler,
tlsConfig *tls.Config,
saramaConfig *sarama.Config,
clientID string,
initialOffset int64,
autoCommit bool,
sessionTimeoutMS int,
heartbeatIntervalMS int,
brokers []string,
groupID string,
) (*Subscriber, error) {
kafkaCfg := NewTLSSubscriberConfig(tlsConfig)
cfg := saramaConfig

kafkaCfg.ClientID = clientID
kafkaCfg.Consumer.Offsets.Initial = sarama.OffsetNewest
kafkaCfg.Consumer.Offsets.AutoCommit.Enable = true
kafkaCfg.Consumer.Group.Session.Timeout = time.Duration(sessionTimeoutMS) * time.Millisecond
kafkaCfg.Consumer.Group.Heartbeat.Interval = time.Duration(
if cfg == nil {
cfg = sarama.NewConfig()

cfg.Consumer.Return.Errors = true
}

cfg.ClientID = clientID
cfg.Consumer.Offsets.Initial = initialOffset
cfg.Consumer.Offsets.AutoCommit.Enable = autoCommit
cfg.Consumer.Group.Session.Timeout = time.Duration(
sessionTimeoutMS,
) * time.Millisecond
cfg.Consumer.Group.Heartbeat.Interval = time.Duration(
heartbeatIntervalMS,
) * time.Millisecond

return &Subscriber{
logger: slog.New(slogHandler),
cfg: kafkaCfg,
cfg: cfg,
brokers: brokers,
consumerGroup: groupID,
}, nil
}

// Subscribe creates a new subscription that runs in the background.
func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[string, []byte], error) {
func (s Subscriber) Subscribe(
channels ...string,
) (pubsub.Subscription[string, []byte], error) {
if len(channels) != 1 {
return nil, pubsub.ErrExactlyOneChannelAllowed
}
Expand Down Expand Up @@ -100,11 +111,20 @@ func newSubscription(
wg.Add(len(partitions))

for _, partition := range partitions {
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
partitionConsumer, err := consumer.ConsumePartition(
topic,
partition,
sarama.OffsetNewest,
)
if err != nil {
cancel()

return nil, fmt.Errorf("consume partition %d for topic %q: %w", partition, topic, err)
return nil, fmt.Errorf(
"consume partition %d for topic %q: %w",
partition,
topic,
err,
)
}

go func() {
Expand Down
Loading
Loading