Skip to content

Commit

Permalink
fix: centrum manager stream issue
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Trost <[email protected]>
  • Loading branch information
galexrt committed Sep 1, 2024
1 parent d68c6e4 commit 680b181
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
13 changes: 11 additions & 2 deletions gen/go/proto/services/centrum/manager/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@ import (
"google.golang.org/protobuf/proto"
)

func (s *Manager) registerSubscriptions(ctx context.Context, c context.Context) error {
func (s *Manager) registerStream(ctx context.Context) (jetstream.StreamConfig, error) {
streamCfg, err := eventscentrum.RegisterStream(ctx, s.js)
if err != nil {
return fmt.Errorf("failed to register events: %w", err)
return streamCfg, fmt.Errorf("failed to register stream: %w", err)
}

return streamCfg, nil
}

func (s *Manager) registerSubscriptions(ctx context.Context, c context.Context) error {
streamCfg, err := s.registerStream(c)
if err != nil {
return err
}

consumer, err := s.js.CreateConsumer(ctx, streamCfg.Name, jetstream.ConsumerConfig{
Expand Down
6 changes: 5 additions & 1 deletion gen/go/proto/services/centrum/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,18 @@ func New(p Params) *Manager {
}

p.LC.Append(fx.StartHook(func(c context.Context) error {
if err := s.registerSubscriptions(c, ctx); err != nil {
if _, err := s.registerStream(c); err != nil {
return err
}

if err := s.loadData(ctx); err != nil {
return err
}

if err := s.registerSubscriptions(c, ctx); err != nil {
return err
}

return nil
}))

Expand Down

0 comments on commit 680b181

Please sign in to comment.