Skip to content

Commit

Permalink
fix: adds sleep flag to nsq e2e test image (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ulminator authored Nov 5, 2024
1 parent f0ea14c commit af97028
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions e2e/images/nsq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,28 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/nsqio/go-nsq"
)

type Handler struct{}
type Handler struct {
sleepDuration time.Duration
}

func (h *Handler) HandleMessage(m *nsq.Message) error {
log.Printf("Received message: %s", m.Body)
time.Sleep(h.sleepDuration)
return nil
}

func nsqConsumer(config *nsq.Config, nsqlookupdHTTPAddress, topic, channel string) error {
func nsqConsumer(config *nsq.Config, nsqlookupdHTTPAddress, topic, channel string, sleepDuration time.Duration) error {
consumer, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
return err
}

consumer.AddHandler(&Handler{})
consumer.AddHandler(&Handler{sleepDuration: sleepDuration})

err = consumer.ConnectToNSQLookupd(nsqlookupdHTTPAddress)
if err != nil {
Expand Down Expand Up @@ -70,6 +74,7 @@ func main() {
mode := flag.String("mode", "", "consumer or producer")
topic := flag.String("topic", "", "topic name")
channel := flag.String("channel", "", "channel name")
sleepDuration := flag.Duration("sleep-duration", 0*time.Second, "consumer message processing time")
nsqlookupdHTTPAddress := flag.String("nsqlookupd-http-address", "", "nsqlookupd HTTP address")
messageCount := flag.Int("message-count", 1, "number of messages to send")
nsqdTCPAddress := flag.String("nsqd-tcp-address", "", "nsqd TCP address")
Expand All @@ -83,7 +88,7 @@ func main() {
if *topic == "" || *channel == "" || *nsqlookupdHTTPAddress == "" {
log.Fatalf("topic, channel, and nsqlookupd-http-address are required\n")
}
if err := nsqConsumer(config, *nsqlookupdHTTPAddress, *topic, *channel); err != nil {
if err := nsqConsumer(config, *nsqlookupdHTTPAddress, *topic, *channel, *sleepDuration); err != nil {
log.Fatalf("read from nsq failed: %w\n", err)
}
case "producer":
Expand Down

0 comments on commit af97028

Please sign in to comment.