Skip to content

Commit

Permalink
Merge pull request #64 from jorgepurposeinplay/main
Browse files Browse the repository at this point in the history
Made kafkashopifysarama more similar to kafkasarama and added the missing publisher
  • Loading branch information
jorgepurposeinplay authored Jan 16, 2025
2 parents a751dd8 + dd1f08e commit 2e419f8
Show file tree
Hide file tree
Showing 8 changed files with 407 additions and 160 deletions.
23 changes: 23 additions & 0 deletions pubsub/kafkashopifysarama/config/ca_test.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID1zCCAr+gAwIBAgIUVKC1c251XQI6xQOSk8TzVd02nV0wDQYJKoZIhvcNAQEL
BQAwQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwG
TG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTAgFw0yMzA3MjgxMjA4NDNaGA8yMTIzMDcw
NDEyMDg0M1owQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0G
A1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAML1AIVjSSE/o+hg5jGSSK1gWq3TZTLqGQ75Csc+DrnoKztq
XBzmDcGW/S1cW9Yw+lEyGymc/4SzPIssrJu6y2ncv28kUMEzjftfwtc1zidKaTYk
wIAzksjNU/lP9UC8cq6pDyqztBr9RBHSmtxSIVQHeV5sZij+Rdluz5EUlsm6O0sd
EqHsa7M8rnxQmArHy+sYHEpe1hCN+AnGZtuL1xjUXDNjth5qrTUW6E30mLh7D6y5
YRbR+EO9YDnXaA7nAxpSUXyu2+IrNdxYXdy6Fee1Owb461ufVql5msBlkPBQylQa
4RcXZdRE10elELVEp38IJP4/HRodO157yw7UsScCAwEAAaOBwjCBvzAdBgNVHQ4E
FgQUWBF2Mby1FPlEApc+3XgC0ykmsgYwDwYDVR0TAQH/BAUwAwEB/zB9BgNVHSME
djB0gBRYEXYxvLUU+UQClz7deALTKSayBqFGpEQwQjELMAkGA1UEBhMCVUsxEjAQ
BgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZr
YYIUVKC1c251XQI6xQOSk8TzVd02nV0wDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3
DQEBCwUAA4IBAQC+nqrlZYH/604R2GtO0tPyABnuP8mc+dV7btatkGjHlY/Lx07T
tfxva3RSNHPh+yoSvwHriLCYBwX9lChAYc+l7ACbtPerbIHMm6YNkVKBRplXCO66
BHv8xXOs9oBgAsgTIvfznbH/hNJrY0A6I4M2tVOvcrmxzZT4oZJwEqy7B3gu2y0A
b2QcQzNnqXvnyGG91b+CBHvU/XcvIuxQysct31banEkIc+nTl+rFGsN2phJz+J+h
i8jmY2AixD++ZFPB5/4msaa6o6x4QFc5bvf92BDuJTEyKt+dM9rhZvtU1plnKmZq
89xtHAZltnhjC7AU5nia+hCd8hHlXasjljoe
-----END CERTIFICATE-----
23 changes: 23 additions & 0 deletions pubsub/kafkashopifysarama/config/test.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID0zCCArugAwIBAgIUeHl9LEIwVLvdrQ1Qk/veZPdQ9yUwDQYJKoZIhvcNAQEL
BQAwQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0GA1UEBwwG
TG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTAgFw0yMzA3MjgxMjA4NDNaGA8yMTIzMDcw
NDEyMDg0M1owQjELMAkGA1UEBhMCVUsxEjAQBgNVBAoMCUNvbmZsdWVudDEPMA0G
A1UEBwwGTG9uZG9uMQ4wDAYDVQQDDAVrYWZrYTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAKiOi8SsfcqZl7A80ECjWFdT+RX4ufc2CtB+0WDRg9tnRh/M
dh7+IX5Y+ILPJ2kYwNnPM+wjBpLXmXX7/aqMgmDe6v3ikRKXh+VMWNMaprJhCldD
8YpCv0tPIt1H0BF6vcku7pn4hsAkbgj0QCzywmz94KlFqtjRiipg3KJoOmrR0YYv
XAyqfefQwFzzRFkp71QRYWJMdJOZ0iibjvj1Fj6rLY3CZzTCZpQdyh9DxiHLgLLz
0BLxZf47MznnE9eofLXsmhS3IJ+/QPeM12mlX4wAHLBLTq4d8Xhp0ds1km4E5dML
2JCdxSFTcfL+ww1X/W0c8dS0m85H/9SLsmbjaa0CAwEAAaOBvjCBuzAdBgNVHQ4E
FgQUZChhFb324S1UADhefq8CfUOGlPYwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0E
HxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwDgYDVR0PAQH/BAQDAgWg
MBMGA1UdJQQMMAoGCCsGAQUFBwMCMBsGA1UdEQQUMBKCBWthZmthgglsb2NhbGhv
c3QwHwYDVR0jBBgwFoAUWBF2Mby1FPlEApc+3XgC0ykmsgYwDQYJKoZIhvcNAQEL
BQADggEBADxxxKrKVhTKZulsgm1uYaBT7K6NHJrWRKmwXCKPJvDj4Qhw+gwS+FIG
dPcjxALBXYpc2mI5HNgEpGe1FAEryVx/2bI4pN7xkVj2Sso8v7VFIY+Q1gCoyKIQ
Ff7lo+nP4byRNrzk5ffMZHC7CYu9IHNzTPsFWS8A9ieOs8Fwn0kY5Zjbq6qj6HjV
x+xA41Zo3JAxHaT0I6V6d64+adUBDyNYFl10d6FoJTTRVv25h3mInyQrZIMfzOwZ
svGSNWnFxVyyyIlMqIFAPIL+uDR4t76XzYDigo6ox4lZOAPad2f1zd/Ge4kEHvc1
i5IrvYvVc7iJVuchgoxqH6hrb1VsJNI=
-----END CERTIFICATE-----
28 changes: 28 additions & 0 deletions pubsub/kafkashopifysarama/config/test.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCojovErH3KmZew
PNBAo1hXU/kV+Ln3NgrQftFg0YPbZ0YfzHYe/iF+WPiCzydpGMDZzzPsIwaS15l1
+/2qjIJg3ur94pESl4flTFjTGqayYQpXQ/GKQr9LTyLdR9ARer3JLu6Z+IbAJG4I
9EAs8sJs/eCpRarY0YoqYNyiaDpq0dGGL1wMqn3n0MBc80RZKe9UEWFiTHSTmdIo
m4749RY+qy2Nwmc0wmaUHcofQ8Yhy4Cy89AS8WX+OzM55xPXqHy17JoUtyCfv0D3
jNdppV+MABywS06uHfF4adHbNZJuBOXTC9iQncUhU3Hy/sMNV/1tHPHUtJvOR//U
i7Jm42mtAgMBAAECggEAEVnVGOqBq2qaowVrkY/O/vRRsnmPWRuKyvIXBw6iMmJj
5U+UhDW4SGuLr1l4Ltkhtvodl+v2GAUSEsNdFiKPJD4mNfrN4LpQ/qFYjmiSqs+S
8y83DLq4ut2bUuh6ymXWZgCLBrX+1wYY/pp4Bbh6m6IZr4sQnm+Zd6nPZHdytf6W
nqHZDpnq206JffRA7BbKfHMxi1Rs3jwdKTfHeMktaJMtCagJz8jeECLRYoNkbVPT
N/rN4sHYXn23DBN5plEikIdWm5no5v8/BsA1N/jN48cPjmC1byKyjnlzOLbNT8xm
kI/c+zFR4/2y/SEiaCv8ZHjbBwWSWoft7qvq++170QKBgQDs1LNv00JBsXvuyiMZ
+xRRcuKzV4jKlfplJQW1rdoyOfnwu70ZpSnHD3rcHoJMenwrxYReUpqg/6snRKW1
KRHIRpLhpROoxVhTOTy3z2LtzMqTR7nYWnffZkSKqptiEZl5niVN9ItJAJBSRrPs
UrhQr4d9sBodqfx5iv9CMuaWvQKBgQC2MyjjNEt2ps9u3To2jfJN3mLsceymFejW
mTQDo8gSzpE0vnmhd+3vC6DBpjkARY7t3u3hH5sKdmyyl1024iq2Tp/29MGIFDpI
e/tyI9Xm8985HWxPS3ALtU7Z4JSlzZ3x4Ft7pZ0rJZGOY9GAeStRO6JkxQVMoGPU
vSjqlomFsQKBgBW29vDk5OlTFbLyU7+ZFubU6tZYy1EP6VKGz3w2AZCjYjhhblhA
nZED6VbvcTED9gipZpajakwixRWnpK30ow3C8sq/sQrDdXLEB74uxLpbEaPpaq/c
s6sHHHe+ZtraFEFjb7YzGuZJp/HzS6H6f63eOkUa9XoM0Ppv9TGjqyLdAoGBAKxZ
lLHoBAKHJO2wY6K7f5vdZCJaWgt56jklzygqQ9ZWhNEp5RCyy+Y2T6kfPghdxCiL
muY76YNqJsSBnnGOW+z8TyFiwbehushaT67W6z5/Lodup8gSijjpF1/Oq450BJaL
Lr62GQh5j0jsb39iH3HGQYWlQbAMgKL7FLtkD07hAoGBAODdehEn2naDoAMwAE8p
42f2sK0akkMREV29iYvfq7jdj8vsb1RN0fpzfuDQ7NwGCkE052p9OeTTPpkTYI09
2hgeEoj2I4YOeLfRaiDUTTKpsrM+LOcKca/HyqzxI3WAWjvc0oI0fsorpjLeJnJ4
KPziycCfPAlwNOHn95+yiPVl
-----END PRIVATE KEY-----
79 changes: 79 additions & 0 deletions pubsub/kafkashopifysarama/kafkashopifysarama_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package kafkashopifysarama

import (
"fmt"
"log/slog"
"time"

"github.com/Shopify/sarama"
"github.com/purposeinplay/go-commons/pubsub"
)

var _ pubsub.Publisher[string, []byte] = (*Publisher)(nil)

// Publisher represents a kafka publisher.
type Publisher struct {
logger *slog.Logger
syncProducer sarama.SyncProducer
}

// NewPublisher creates a new kafka publisher.
func NewPublisher(
slogHandler slog.Handler,
saramaConfig *sarama.Config,
brokers []string,
) (*Publisher, error) {
cfg := saramaConfig

if cfg == nil {
cfg = sarama.NewConfig()

cfg.Producer.Retry.Max = 10
cfg.Producer.Return.Successes = true
cfg.Metadata.Retry.Backoff = time.Second * 2
}

producer, err := sarama.NewSyncProducer(brokers, cfg)
if err != nil {
return nil, fmt.Errorf("new kafka publisher: %w", err)
}

return &Publisher{
logger: slog.New(slogHandler),
syncProducer: producer,
}, nil
}

// Publish publishes an event to a kafka topic.
func (p Publisher) Publish(
event pubsub.Event[string, []byte],
channels ...string,
) error {
if len(channels) != 1 {
return pubsub.ErrExactlyOneChannelAllowed
}

topic := channels[0]

mes := &sarama.ProducerMessage{
Topic: topic,
Headers: []sarama.RecordHeader{
{
Key: []byte("type"),
Value: []byte(event.Type),
},
},
Value: sarama.ByteEncoder(event.Payload),
}

if _, _, err := p.syncProducer.SendMessage(mes); err != nil {
return fmt.Errorf("publish: %w", err)
}

return nil
}

// Close closes the kafka publisher.
func (p Publisher) Close() error {
return p.syncProducer.Close()
}
96 changes: 0 additions & 96 deletions pubsub/kafkashopifysarama/kafkashopifysarama_server_test.go

This file was deleted.

44 changes: 32 additions & 12 deletions pubsub/kafkashopifysarama/kafkashopifysarama_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kafkashopifysarama
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"log/slog"
"sync"
Expand All @@ -26,33 +25,45 @@ type Subscriber struct {
// NewSubscriber creates a new kafka subscriber.
func NewSubscriber(
slogHandler slog.Handler,
tlsConfig *tls.Config,
saramaConfig *sarama.Config,
clientID string,
initialOffset int64,
autoCommit bool,
sessionTimeoutMS int,
heartbeatIntervalMS int,
brokers []string,
groupID string,
) (*Subscriber, error) {
kafkaCfg := NewTLSSubscriberConfig(tlsConfig)
cfg := saramaConfig

kafkaCfg.ClientID = clientID
kafkaCfg.Consumer.Offsets.Initial = sarama.OffsetNewest
kafkaCfg.Consumer.Offsets.AutoCommit.Enable = true
kafkaCfg.Consumer.Group.Session.Timeout = time.Duration(sessionTimeoutMS) * time.Millisecond
kafkaCfg.Consumer.Group.Heartbeat.Interval = time.Duration(
if cfg == nil {
cfg = sarama.NewConfig()

cfg.Consumer.Return.Errors = true
}

cfg.ClientID = clientID
cfg.Consumer.Offsets.Initial = initialOffset
cfg.Consumer.Offsets.AutoCommit.Enable = autoCommit
cfg.Consumer.Group.Session.Timeout = time.Duration(
sessionTimeoutMS,
) * time.Millisecond
cfg.Consumer.Group.Heartbeat.Interval = time.Duration(
heartbeatIntervalMS,
) * time.Millisecond

return &Subscriber{
logger: slog.New(slogHandler),
cfg: kafkaCfg,
cfg: cfg,
brokers: brokers,
consumerGroup: groupID,
}, nil
}

// Subscribe creates a new subscription that runs in the background.
func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[string, []byte], error) {
func (s Subscriber) Subscribe(
channels ...string,
) (pubsub.Subscription[string, []byte], error) {
if len(channels) != 1 {
return nil, pubsub.ErrExactlyOneChannelAllowed
}
Expand Down Expand Up @@ -100,11 +111,20 @@ func newSubscription(
wg.Add(len(partitions))

for _, partition := range partitions {
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
partitionConsumer, err := consumer.ConsumePartition(
topic,
partition,
sarama.OffsetNewest,
)
if err != nil {
cancel()

return nil, fmt.Errorf("consume partition %d for topic %q: %w", partition, topic, err)
return nil, fmt.Errorf(
"consume partition %d for topic %q: %w",
partition,
topic,
err,
)
}

go func() {
Expand Down
Loading

0 comments on commit 2e419f8

Please sign in to comment.