Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SMQ - 2546 - Add telemetry aggregation for clients telemetry #1

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/check-generated-files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ jobs:
- "invitations/invitations.go"
- "users/emailer.go"
- "users/hasher.go"
- "mqtt/events/streams.go"
- "certs/certs.go"
- "certs/pki/vault.go"
- "certs/service.go"
Expand Down Expand Up @@ -149,7 +148,6 @@ jobs:
mv ./clients/mocks/clients_client.go ./clients/mocks/clients_client.go.tmp
mv ./clients/mocks/cache.go ./clients/mocks/cache.go.tmp
mv ./clients/mocks/service.go ./clients/mocks/service.go.tmp
mv ./mqtt/mocks/events.go ./mqtt/mocks/events.go.tmp
mv ./readers/mocks/messages.go ./readers/mocks/messages.go.tmp
mv ./pkg/sdk/mocks/sdk.go ./pkg/sdk/mocks/sdk.go.tmp
mv ./pkg/messaging/mocks/pubsub.go ./pkg/messaging/mocks/pubsub.go.tmp
Expand Down Expand Up @@ -208,7 +206,6 @@ jobs:
check_mock_changes ./clients/mocks/clients_client.go " ./clients/mocks/clients_client.go"
check_mock_changes ./clients/mocks/cache.go " ./clients/mocks/cache.go"
check_mock_changes ./clients/mocks/service.go " ./clients/mocks/service.go"
check_mock_changes ./mqtt/mocks/events.go " ./mqtt/mocks/events.go"
check_mock_changes ./readers/mocks/messages.go " ./readers/mocks/messages.go"
check_mock_changes ./pkg/sdk/mocks/sdk.go " ./pkg/sdk/mocks/sdk.go"
check_mock_changes ./pkg/messaging/mocks/pubsub.go " ./pkg/messaging/mocks/pubsub.go"
Expand Down
9 changes: 9 additions & 0 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
coapserver "github.com/absmach/supermq/pkg/server/coap"
Expand All @@ -47,6 +48,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_COAP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -143,6 +145,13 @@ func main() {
defer nps.Close()
nps = brokerstracing.NewPubSub(coapServerConfig, tracer, nps)

nps, err = msgevents.NewPubSubMiddleware(ctx, nps, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := coap.New(clientsClient, channelsClient, nps)

svc = tracing.New(tracer, svc)
Expand Down
9 changes: 9 additions & 0 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
Expand Down Expand Up @@ -59,6 +60,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -163,6 +165,13 @@ func main() {
defer pub.Close()
pub = brokerstracing.NewPublisher(httpServerConfig, tracer, pub)

pub, err = msgevents.NewPublisherMiddleware(ctx, pub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := newService(pub, authn, clientsClient, channelsClient, logger, tracer)
targetServerCfg := server.Config{Port: targetHTTPPort}

Expand Down
22 changes: 19 additions & 3 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt"
"github.com/absmach/supermq/pkg/server"
Expand Down Expand Up @@ -142,6 +143,13 @@ func main() {
}
defer mpub.Close()

mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}
Comment on lines +146 to +151
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure the final wrapped publisher is properly closed upon exit.

After reassigning mpub to the wrapped instance returned by msgevents.NewPublisherMiddleware, the previously deferred mpub.Close() will only close the original publisher object. To avoid potential resource leaks, defer the close of the final wrapped publisher instead:

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
 if err != nil {
   logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
   exitCode = 1
   return
 }
-defer mpub.Close() // This closes the *old* mpub, not the post-middleware mpub
+// Remove the above defer and add it after wrapping
 mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
 if err != nil {
   logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
   exitCode = 1
   return
 }
+defer mpub.Close() // This now closes the *wrapped* mpub

Committable suggestion skipped: line range outside the PR's diff.


fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
Expand All @@ -159,9 +167,9 @@ func main() {
defer np.Close()
np = brokerstracing.NewPublisher(serverConfig, tracer, np)

es, err := events.NewEventStore(ctx, cfg.ESURL, cfg.Instance)
np, err = msgevents.NewPublisherMiddleware(ctx, np, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s event store : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
Comment on lines +170 to +172
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Revisit closing the final wrapped np publisher.

The same pattern applies here as with mpub: the newly wrapped publisher should be properly closed. Ensure the defer is placed after the publisher is wrapped by msgevents.NewPublisherMiddleware.

exitCode = 1
return
}
Expand Down Expand Up @@ -198,7 +206,15 @@ func main() {
defer channelsHandler.Close()
logger.Info("Channels service gRPC client successfully connected to channels gRPC server " + channelsHandler.Secure())

h := mqtt.NewHandler(np, es, logger, clientsClient, channelsClient)
h := mqtt.NewHandler(np, logger, clientsClient, channelsClient)

h, err = events.NewEventStoreMiddleware(ctx, h, cfg.ESURL, cfg.Instance)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

h = handler.NewTracing(tracer, h)

if cfg.SendTelemetry {
Expand Down
9 changes: 9 additions & 0 deletions cmd/ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
Expand Down Expand Up @@ -55,6 +56,7 @@ type config struct {
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_WS_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
}

func main() {
Expand Down Expand Up @@ -165,6 +167,13 @@ func main() {
defer nps.Close()
nps = brokerstracing.NewPubSub(targetServerConfig, tracer, nps)

nps, err = msgevents.NewPubSubMiddleware(ctx, nps, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

svc := newService(clientsClient, channelsClient, nps, logger, tracer)

hs := httpserver.NewServer(ctx, cancel, svcName, targetServerConfig, httpapi.MakeHandler(ctx, svc, logger, cfg.InstanceID), logger)
Expand Down
7 changes: 4 additions & 3 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic

authzc := newAuthzClient(clientID, chanID, subtopic, svc.channels, c)
subCfg := messaging.SubscriberConfig{
ID: c.Token(),
Topic: subject,
Handler: authzc,
ID: c.Token(),
ClientID: clientID,
Topic: subject,
Handler: authzc,
}
return svc.pubsub.Subscribe(ctx, subCfg)
}
Expand Down
5 changes: 3 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,6 @@ services:
bind:
create_host_path: true


groups-db:
image: postgres:16.2-alpine
container_name: supermq-groups-db
Expand Down Expand Up @@ -948,7 +947,6 @@ services:
bind:
create_host_path: true


jaeger:
image: jaegertracing/all-in-one:1.60
container_name: supermq-jaeger
Expand Down Expand Up @@ -1067,6 +1065,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_HTTP_ADAPTER_INSTANCE_ID: ${SMQ_HTTP_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_HTTP_ADAPTER_PORT}:${SMQ_HTTP_ADAPTER_PORT}
networks:
Expand Down Expand Up @@ -1153,6 +1152,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_COAP_ADAPTER_INSTANCE_ID: ${SMQ_COAP_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_COAP_ADAPTER_PORT}:${SMQ_COAP_ADAPTER_PORT}/udp
- ${SMQ_COAP_ADAPTER_HTTP_PORT}:${SMQ_COAP_ADAPTER_HTTP_PORT}/tcp
Expand Down Expand Up @@ -1230,6 +1230,7 @@ services:
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
SMQ_WS_ADAPTER_INSTANCE_ID: ${SMQ_WS_ADAPTER_INSTANCE_ID}
SMQ_ES_URL: ${SMQ_ES_URL}
ports:
- ${SMQ_WS_ADAPTER_HTTP_PORT}:${SMQ_WS_ADAPTER_HTTP_PORT}
networks:
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054
github.com/absmach/mgate v0.4.5
github.com/absmach/senml v1.0.6
github.com/authzed/authzed-go v1.2.2-0.20250107172318-7fd4159ab2b7
github.com/authzed/authzed-go v1.3.0
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
github.com/authzed/spicedb v1.40.0
github.com/caarlos0/env/v11 v11.3.1
Expand Down Expand Up @@ -81,7 +81,7 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/dsnet/golib/memfile v1.0.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-errors/errors v1.5.1 // indirect
Expand All @@ -96,7 +96,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down Expand Up @@ -142,7 +142,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/samber/lo v1.49.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smarty/assertions v1.15.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ github.com/absmach/senml v1.0.6 h1:WPeIl6vQ00k7ghWSZYT/QP0KUxq2+4zQoaC7240pLFk=
github.com/absmach/senml v1.0.6/go.mod h1:QnJNPy1DJPy0+qUW21PTcH/xoh0LgfYZxTfwriMIvmQ=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/authzed/authzed-go v1.2.2-0.20250107172318-7fd4159ab2b7 h1:f99Iz0FoEyqPelojk1kWWgSDVJtDEMrLb+6qVdohoqs=
github.com/authzed/authzed-go v1.2.2-0.20250107172318-7fd4159ab2b7/go.mod h1:/+NblSrzA6Lm6vUO3fqZyLh8MDCLUQq2AyJMlHb32DE=
github.com/authzed/authzed-go v1.3.0 h1:jKIMpYDy+6WoOwl32HRURxLZxNGm+I7ObUlTntEPcXA=
github.com/authzed/authzed-go v1.3.0/go.mod h1:MYkXImtFAxrM/bVZvmC/WO+gZC9RLlvpCM51SLaUZb0=
github.com/authzed/cel-go v0.20.2 h1:GlmLecGry7Z8HU0k+hmaHHUV05ZHrsFxduXHtIePvck=
github.com/authzed/cel-go v0.20.2/go.mod h1:pJHVFWbqUHV1J+klQoZubdKswlbxcsbojda3mye9kiU=
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b h1:wbh8IK+aMLTCey9sZasO7b6BWLAJnHHvb79fvWCXwxw=
Expand Down Expand Up @@ -91,8 +91,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM=
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
Expand Down Expand Up @@ -158,8 +158,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 h1:VD1gqscl4nYs1YxVuSdemTrSgTKrwOWDK0FVFMqm+Cg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0/go.mod h1:4EgsQoS4TOhJizV+JTFg40qx1Ofh3XmXEQNBpgvNT40=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -373,8 +373,8 @@ github.com/rubenv/sql-migrate v1.7.1/go.mod h1:Ob2Psprc0/3ggbM6wCzyYVFFuc6FyZrb2
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk=
github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/samber/lo v1.49.0 h1:AGnTnQrg1jpFuwECPUSoxZCfVH5W22b605kWSry3YxM=
github.com/samber/lo v1.49.0/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
Expand Down
17 changes: 16 additions & 1 deletion journal/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"github.com/absmach/supermq/journal"
)

var _ supermq.Response = (*pageRes)(nil)
var (
_ supermq.Response = (*pageRes)(nil)
_ supermq.Response = (*clientTelemetryRes)(nil)
)

type pageRes struct {
journal.JournalsPage `json:",inline"`
Expand All @@ -31,3 +34,15 @@ func (res pageRes) Empty() bool {
type clientTelemetryRes struct {
journal.ClientTelemetry `json:",inline"`
}

func (res clientTelemetryRes) Headers() map[string]string {
return map[string]string{}
}

func (res clientTelemetryRes) Code() int {
return http.StatusOK
}

func (res clientTelemetryRes) Empty() bool {
return false
}
Comment on lines +46 to +48
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Implement proper Empty check for clientTelemetryRes.

The Empty() method should check if the telemetry data is actually empty:

 func (res clientTelemetryRes) Empty() bool {
-	return false
+	return res.ClientTelemetry.Messages == 0 &&
+		len(res.ClientTelemetry.Subscriptions) == 0
 }

Committable suggestion skipped: line range outside the PR's diff.

25 changes: 24 additions & 1 deletion journal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,21 @@ func (page JournalsPage) MarshalJSON() ([]byte, error) {
type ClientTelemetry struct {
ClientID string `json:"client_id"`
DomainID string `json:"domain_id"`
Subscriptions []string `json:"subscriptions"`
Subscriptions uint64 `json:"subscriptions"`
InboundMessages uint64 `json:"inbound_messages"`
OutboundMessages uint64 `json:"outbound_messages"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
}

type ClientSubscription struct {
ID string `json:"id" db:"id"`
SubscriberID string `json:"subscriber_id" db:"subscriber_id"`
ChannelID string `json:"channel_id" db:"channel_id"`
Subtopic string `json:"subtopic" db:"subtopic"`
ClientID string `json:"client_id" db:"client_id"`
}
Comment on lines +150 to +156
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding validation for ClientSubscription fields.

The ClientSubscription struct should include field validation to ensure data integrity.

Add validation methods:

func (cs *ClientSubscription) Validate() error {
    if cs.ID == "" {
        return errors.New("id cannot be empty")
    }
    if cs.SubscriberID == "" {
        return errors.New("subscriber_id cannot be empty")
    }
    if cs.ChannelID == "" {
        return errors.New("channel_id cannot be empty")
    }
    return nil
}


// Service provides access to the journal log service.
//
//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
Expand Down Expand Up @@ -179,4 +187,19 @@ type Repository interface {

// DeleteClientTelemetry removes telemetry data for a client from the database.
DeleteClientTelemetry(ctx context.Context, clientID, domainID string) error

// AddSubscription adds a subscription to the client telemetry.
AddSubscription(ctx context.Context, sub ClientSubscription) error

// CountSubscriptions returns the number of subscriptions for a client.
CountSubscriptions(ctx context.Context, clientID string) (uint64, error)

// RemoveSubscription removes a subscription from the client telemetry.
RemoveSubscription(ctx context.Context, subscriberID string) error

// IncrementInboundMessages increments the inbound messages count for a client.
IncrementInboundMessages(ctx context.Context, clientID string) error

// IncrementOutboundMessages increments the outbound messages count for a client.
IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error
}
2 changes: 1 addition & 1 deletion journal/middleware/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (am *authorizationMiddleware) RetrieveClientTelemetry(ctx context.Context,
Domain: session.DomainID,
SubjectType: policies.UserType,
SubjectKind: policies.UsersKind,
Subject: session.UserID,
Subject: session.DomainUserID,
Permission: readPermission,
ObjectType: policies.ClientType,
Object: clientID,
Expand Down
Loading
Loading