Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SMQ-2546 - Add events to adapters #2659

Merged
merged 6 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
coapserver "github.com/absmach/supermq/pkg/server/coap"
Expand All @@ -47,6 +48,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_COAP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -143,6 +145,13 @@ func main() {
defer nps.Close()
nps = brokerstracing.NewPubSub(coapServerConfig, tracer, nps)

nps, err = msgevents.NewPubSubMiddleware(ctx, nps, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := coap.New(clientsClient, channelsClient, nps)

svc = tracing.New(tracer, svc)
Expand Down
9 changes: 9 additions & 0 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
Expand Down Expand Up @@ -59,6 +60,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -163,6 +165,13 @@ func main() {
defer pub.Close()
pub = brokerstracing.NewPublisher(httpServerConfig, tracer, pub)

pub, err = msgevents.NewPublisherMiddleware(ctx, pub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := newService(pub, authn, clientsClient, channelsClient, logger, tracer)
targetServerCfg := server.Config{Port: targetHTTPPort}

Expand Down
22 changes: 22 additions & 0 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt"
"github.com/absmach/supermq/pkg/server"
Expand Down Expand Up @@ -134,6 +135,13 @@ func main() {
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)

bsub, err = msgevents.NewPubSubMiddleware(ctx, bsub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
Expand All @@ -142,6 +150,13 @@ func main() {
}
defer mpub.Close()

mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
Expand All @@ -159,6 +174,13 @@ func main() {
defer np.Close()
np = brokerstracing.NewPublisher(serverConfig, tracer, np)

np, err = msgevents.NewPublisherMiddleware(ctx, np, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

es, err := events.NewEventStore(ctx, cfg.ESURL, cfg.Instance)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s event store : %s", svcName, err))
Expand Down
9 changes: 9 additions & 0 deletions cmd/ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
Expand Down Expand Up @@ -55,6 +56,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_WS_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -165,6 +167,13 @@ func main() {
defer nps.Close()
nps = brokerstracing.NewPubSub(targetServerConfig, tracer, nps)

nps, err = msgevents.NewPubSubMiddleware(ctx, nps, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := newService(clientsClient, channelsClient, nps, logger, tracer)

hs := httpserver.NewServer(ctx, cancel, svcName, targetServerConfig, httpapi.MakeHandler(ctx, svc, logger, cfg.InstanceID), logger)
Expand Down
5 changes: 3 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,6 @@ services:
bind:
create_host_path: true


groups-db:
image: postgres:16.2-alpine
container_name: supermq-groups-db
Expand Down Expand Up @@ -948,7 +947,6 @@ services:
bind:
create_host_path: true


jaeger:
image: jaegertracing/all-in-one:1.60
container_name: supermq-jaeger
Expand Down Expand Up @@ -1067,6 +1065,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_HTTP_ADAPTER_INSTANCE_ID: ${SMQ_HTTP_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_HTTP_ADAPTER_PORT}:${SMQ_HTTP_ADAPTER_PORT}
networks:
Expand Down Expand Up @@ -1153,6 +1152,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_COAP_ADAPTER_INSTANCE_ID: ${SMQ_COAP_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_COAP_ADAPTER_PORT}:${SMQ_COAP_ADAPTER_PORT}/udp
- ${SMQ_COAP_ADAPTER_HTTP_PORT}:${SMQ_COAP_ADAPTER_HTTP_PORT}/tcp
Expand Down Expand Up @@ -1230,6 +1230,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_WS_ADAPTER_INSTANCE_ID: ${SMQ_WS_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_WS_ADAPTER_HTTP_PORT}:${SMQ_WS_ADAPTER_HTTP_PORT}
networks:
Expand Down
48 changes: 39 additions & 9 deletions mqtt/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,48 @@

import "github.com/absmach/supermq/pkg/events"

var _ events.Event = (*mqttEvent)(nil)
const (
mqttPrefix = "mqtt"
clientSubscribe = mqttPrefix + ".client_subscribe"
clientConnect = mqttPrefix + ".client_connect"
clientDisconnect = mqttPrefix + ".client_disconnect"
)

type mqttEvent struct {
clientID string
operation string
instance string
var (
_ events.Event = (*connectEvent)(nil)
_ events.Event = (*subscribeEvent)(nil)
)

type connectEvent struct {
operation string
clientID string
subscriberID string
instance string
}

func (ce connectEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": ce.operation,
"client_id": ce.clientID,
"subscriber_id": ce.subscriberID,
"instance": ce.instance,
}, nil

Check warning on line 33 in mqtt/events/events.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/events.go#L27-L33

Added lines #L27 - L33 were not covered by tests
}

type subscribeEvent struct {
operation string
clientID string
subscriberID string
channelID string
subtopic string
}

func (me mqttEvent) Encode() (map[string]interface{}, error) {
func (se subscribeEvent) Encode() (map[string]interface{}, error) {

Check warning on line 44 in mqtt/events/events.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/events.go#L44

Added line #L44 was not covered by tests
return map[string]interface{}{
"client_id": me.clientID,
"operation": me.operation,
"instance": me.instance,
"operation": se.operation,
"client_id": se.clientID,
"subscriber_id": se.subscriberID,
"channel_id": se.channelID,
"subtopic": se.subtopic,

Check warning on line 50 in mqtt/events/events.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/events.go#L46-L50

Added lines #L46 - L50 were not covered by tests
}, nil
}
50 changes: 33 additions & 17 deletions mqtt/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

//go:generate mockery --name EventStore --output=../mocks --filename events.go --quiet --note "Copyright (c) Abstract Machines"
type EventStore interface {
Connect(ctx context.Context, clientID string) error
Disconnect(ctx context.Context, clientID string) error
Connect(ctx context.Context, clientID, subscriberID string) error
Disconnect(ctx context.Context, clientID, subscriberID string) error
Subscribe(ctx context.Context, clientID, channelID, subscriberID, subtopic string) error
}

// EventStore is a struct used to store event streams in Redis.
type eventStore struct {
events.Publisher
ep events.Publisher
instance string
}

Expand All @@ -33,29 +34,44 @@
}

return &eventStore{
instance: instance,
Publisher: publisher,
instance: instance,
ep: publisher,

Check warning on line 38 in mqtt/events/streams.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/streams.go#L37-L38

Added lines #L37 - L38 were not covered by tests
}, nil
}

// Connect issues event on MQTT CONNECT.
func (es *eventStore) Connect(ctx context.Context, clientID string) error {
ev := mqttEvent{
clientID: clientID,
operation: "connect",
instance: es.instance,
func (es *eventStore) Connect(ctx context.Context, clientID, subscriberID string) error {
ev := connectEvent{
clientID: clientID,
operation: clientConnect,
subscriberID: subscriberID,
instance: es.instance,

Check warning on line 48 in mqtt/events/streams.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/streams.go#L43-L48

Added lines #L43 - L48 were not covered by tests
}

return es.Publish(ctx, ev)
return es.ep.Publish(ctx, ev)

Check warning on line 51 in mqtt/events/streams.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/streams.go#L51

Added line #L51 was not covered by tests
}

// Disconnect issues event on MQTT CONNECT.
func (es *eventStore) Disconnect(ctx context.Context, clientID string) error {
ev := mqttEvent{
clientID: clientID,
operation: "disconnect",
instance: es.instance,
func (es *eventStore) Disconnect(ctx context.Context, clientID, subscriberID string) error {
ev := connectEvent{
clientID: clientID,
operation: clientDisconnect,
subscriberID: subscriberID,
instance: es.instance,

Check warning on line 60 in mqtt/events/streams.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/streams.go#L55-L60

Added lines #L55 - L60 were not covered by tests
}

return es.Publish(ctx, ev)
return es.ep.Publish(ctx, ev)

Check warning on line 63 in mqtt/events/streams.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/streams.go#L63

Added line #L63 was not covered by tests
}

// Subscribe issues event on MQTT SUBSCRIBE.
func (es *eventStore) Subscribe(ctx context.Context, clientID, channelID, subscriberID, subtopic string) error {
ev := subscribeEvent{
operation: clientSubscribe,
clientID: clientID,
channelID: channelID,
subscriberID: subscriberID,
subtopic: subtopic,
}

return es.ep.Publish(ctx, ev)

Check warning on line 76 in mqtt/events/streams.go

View check run for this annotation

Codecov / codecov/patch

mqtt/events/streams.go#L67-L76

Added lines #L67 - L76 were not covered by tests
}
33 changes: 31 additions & 2 deletions mqtt/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
ErrFailedPublishDisconnectEvent = errors.New("failed to publish disconnect event")
ErrFailedParseSubtopic = errors.New("failed to parse subtopic")
ErrFailedPublishConnectEvent = errors.New("failed to publish connect event")
ErrFailedSubscribeEvent = errors.New("failed to publish subscribe event")
ErrFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
)

Expand Down Expand Up @@ -106,7 +107,7 @@
return errInvalidUserId
}

if err := h.es.Connect(ctx, pwd); err != nil {
if err := h.es.Connect(ctx, s.Username, s.ID); err != nil {
h.logger.Error(errors.Wrap(ErrFailedPublishConnectEvent, err).Error())
}

Expand Down Expand Up @@ -202,6 +203,17 @@
if !ok {
return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized)
}

for _, topic := range *topics {
channelID, subTopic, err := parseTopic(topic)
if err != nil {
return err
}

Check warning on line 211 in mqtt/handler.go

View check run for this annotation

Codecov / codecov/patch

mqtt/handler.go#L210-L211

Added lines #L210 - L211 were not covered by tests
if err := h.es.Subscribe(ctx, s.Username, channelID, s.ID, subTopic); err != nil {
return errors.Wrap(ErrFailedSubscribeEvent, err)
}

Check warning on line 214 in mqtt/handler.go

View check run for this annotation

Codecov / codecov/patch

mqtt/handler.go#L213-L214

Added lines #L213 - L214 were not covered by tests
}

h.logger.Info(fmt.Sprintf(LogInfoSubscribed, s.ID, strings.Join(*topics, ",")))
return nil
}
Expand All @@ -223,7 +235,7 @@
return errors.Wrap(ErrFailedDisconnect, ErrClientNotInitialized)
}
h.logger.Error(fmt.Sprintf(LogInfoDisconnected, s.ID, s.Password))
if err := h.es.Disconnect(ctx, string(s.Password)); err != nil {
if err := h.es.Disconnect(ctx, s.Username, s.ID); err != nil {
return errors.Wrap(ErrFailedPublishDisconnectEvent, err)
}
return nil
Expand Down Expand Up @@ -260,6 +272,23 @@
return nil
}

func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
}

Check warning on line 279 in mqtt/handler.go

View check run for this annotation

Codecov / codecov/patch

mqtt/handler.go#L278-L279

Added lines #L278 - L279 were not covered by tests

chanID := channelParts[1]
subtopic := channelParts[2]

subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", errors.Wrap(ErrFailedParseSubtopic, err)
}

Check warning on line 287 in mqtt/handler.go

View check run for this annotation

Codecov / codecov/patch

mqtt/handler.go#L286-L287

Added lines #L286 - L287 were not covered by tests

return chanID, subtopic, nil
}

func parseSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
Expand Down
Loading
Loading