Skip to content

Commit

Permalink
SMQ - 2546 - Add telemetry aggregation for clients telemetry (#2661)
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru authored Feb 3, 2025
1 parent 07dbb86 commit 484de37
Show file tree
Hide file tree
Showing 19 changed files with 758 additions and 217 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/check-generated-files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ jobs:
- "invitations/invitations.go"
- "users/emailer.go"
- "users/hasher.go"
- "mqtt/events/streams.go"
- "certs/certs.go"
- "certs/pki/vault.go"
- "certs/service.go"
Expand Down Expand Up @@ -149,7 +148,6 @@ jobs:
mv ./clients/mocks/clients_client.go ./clients/mocks/clients_client.go.tmp
mv ./clients/mocks/cache.go ./clients/mocks/cache.go.tmp
mv ./clients/mocks/service.go ./clients/mocks/service.go.tmp
mv ./mqtt/mocks/events.go ./mqtt/mocks/events.go.tmp
mv ./readers/mocks/messages.go ./readers/mocks/messages.go.tmp
mv ./pkg/sdk/mocks/sdk.go ./pkg/sdk/mocks/sdk.go.tmp
mv ./pkg/messaging/mocks/pubsub.go ./pkg/messaging/mocks/pubsub.go.tmp
Expand Down Expand Up @@ -208,7 +206,6 @@ jobs:
check_mock_changes ./clients/mocks/clients_client.go " ./clients/mocks/clients_client.go"
check_mock_changes ./clients/mocks/cache.go " ./clients/mocks/cache.go"
check_mock_changes ./clients/mocks/service.go " ./clients/mocks/service.go"
check_mock_changes ./mqtt/mocks/events.go " ./mqtt/mocks/events.go"
check_mock_changes ./readers/mocks/messages.go " ./readers/mocks/messages.go"
check_mock_changes ./pkg/sdk/mocks/sdk.go " ./pkg/sdk/mocks/sdk.go"
check_mock_changes ./pkg/messaging/mocks/pubsub.go " ./pkg/messaging/mocks/pubsub.go"
Expand Down
24 changes: 9 additions & 15 deletions cmd/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,6 @@ func main() {
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)

bsub, err = msgevents.NewPubSubMiddleware(ctx, bsub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
Expand Down Expand Up @@ -181,13 +174,6 @@ func main() {
return
}

es, err := events.NewEventStore(ctx, cfg.ESURL, cfg.Instance)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s event store : %s", svcName, err))
exitCode = 1
return
}

clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
Expand Down Expand Up @@ -220,7 +206,15 @@ func main() {
defer channelsHandler.Close()
logger.Info("Channels service gRPC client successfully connected to channels gRPC server " + channelsHandler.Secure())

h := mqtt.NewHandler(np, es, logger, clientsClient, channelsClient)
h := mqtt.NewHandler(np, logger, clientsClient, channelsClient)

h, err = events.NewEventStoreMiddleware(ctx, h, cfg.ESURL, cfg.Instance)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}

h = handler.NewTracing(tracer, h)

if cfg.SendTelemetry {
Expand Down
7 changes: 4 additions & 3 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic

authzc := newAuthzClient(clientID, chanID, subtopic, svc.channels, c)
subCfg := messaging.SubscriberConfig{
ID: c.Token(),
Topic: subject,
Handler: authzc,
ID: c.Token(),
ClientID: clientID,
Topic: subject,
Handler: authzc,
}
return svc.pubsub.Subscribe(ctx, subCfg)
}
Expand Down
17 changes: 16 additions & 1 deletion journal/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"github.com/absmach/supermq/journal"
)

var _ supermq.Response = (*pageRes)(nil)
var (
_ supermq.Response = (*pageRes)(nil)
_ supermq.Response = (*clientTelemetryRes)(nil)
)

type pageRes struct {
journal.JournalsPage `json:",inline"`
Expand All @@ -31,3 +34,15 @@ func (res pageRes) Empty() bool {
type clientTelemetryRes struct {
journal.ClientTelemetry `json:",inline"`
}

func (res clientTelemetryRes) Headers() map[string]string {
return map[string]string{}
}

func (res clientTelemetryRes) Code() int {
return http.StatusOK
}

func (res clientTelemetryRes) Empty() bool {
return false
}
25 changes: 24 additions & 1 deletion journal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,21 @@ func (page JournalsPage) MarshalJSON() ([]byte, error) {
type ClientTelemetry struct {
ClientID string `json:"client_id"`
DomainID string `json:"domain_id"`
Subscriptions []string `json:"subscriptions"`
Subscriptions uint64 `json:"subscriptions"`
InboundMessages uint64 `json:"inbound_messages"`
OutboundMessages uint64 `json:"outbound_messages"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
}

type ClientSubscription struct {
ID string `json:"id" db:"id"`
SubscriberID string `json:"subscriber_id" db:"subscriber_id"`
ChannelID string `json:"channel_id" db:"channel_id"`
Subtopic string `json:"subtopic" db:"subtopic"`
ClientID string `json:"client_id" db:"client_id"`
}

// Service provides access to the journal log service.
//
//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
Expand Down Expand Up @@ -179,4 +187,19 @@ type Repository interface {

// DeleteClientTelemetry removes telemetry data for a client from the database.
DeleteClientTelemetry(ctx context.Context, clientID, domainID string) error

// AddSubscription adds a subscription to the client telemetry.
AddSubscription(ctx context.Context, sub ClientSubscription) error

// CountSubscriptions returns the number of subscriptions for a client.
CountSubscriptions(ctx context.Context, clientID string) (uint64, error)

// RemoveSubscription removes a subscription from the client telemetry.
RemoveSubscription(ctx context.Context, subscriberID string) error

// IncrementInboundMessages increments the inbound messages count for a client.
IncrementInboundMessages(ctx context.Context, clientID string) error

// IncrementOutboundMessages increments the outbound messages count for a client.
IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error
}
2 changes: 1 addition & 1 deletion journal/middleware/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (am *authorizationMiddleware) RetrieveClientTelemetry(ctx context.Context,
Domain: session.DomainID,
SubjectType: policies.UserType,
SubjectKind: policies.UsersKind,
Subject: session.UserID,
Subject: session.DomainUserID,
Permission: readPermission,
ObjectType: policies.ClientType,
Object: clientID,
Expand Down
100 changes: 100 additions & 0 deletions journal/mocks/repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions journal/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,25 @@ func Migration() *migrate.MemoryMigrationSource {
`CREATE INDEX idx_journal_default_client_filter ON journal(operation, (attributes->>'id'), (attributes->>'client_id'), occurred_at DESC);`,
`CREATE INDEX idx_journal_default_channel_filter ON journal(operation, (attributes->>'id'), (attributes->>'channel_id'), occurred_at DESC);`,
`CREATE TABLE IF NOT EXISTS clients_telemetry (
client_id VARCHAR(36) NOT NULL,
client_id VARCHAR(36) PRIMARY KEY,
domain_id VARCHAR(36) NOT NULL,
subscriptions TEXT[],
inbound_messages BIGINT DEFAULT 0,
outbound_messages BIGINT DEFAULT 0,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
PRIMARY KEY (client_id, domain_id)
last_seen TIMESTAMP
)`,
`CREATE TABLE IF NOT EXISTS subscriptions (
id VARCHAR(36) PRIMARY KEY,
subscriber_id VARCHAR(1024) NOT NULL,
channel_id VARCHAR(36) NOT NULL,
subtopic VARCHAR(1024),
client_id VARCHAR(36),
FOREIGN KEY (client_id) REFERENCES clients_telemetry(client_id) ON DELETE CASCADE ON UPDATE CASCADE
)`,
},
Down: []string{
`DROP TABLE IF EXISTS clients_telemetry`,
`DROP TABLE IF EXISTS subscriptions`,
`DROP TABLE IF EXISTS journal`,
},
},
Expand Down
Loading

0 comments on commit 484de37

Please sign in to comment.