Skip to content

Commit

Permalink
fix: cleanup subscribe (#335)
Browse files Browse the repository at this point in the history
* fix: single child element

* fix: cleanup subscribe

* feat: wait for wg or 60s

* chore: log timeout case

* chore: linting

---------

Co-authored-by: gabriel ruttner <[email protected]>
  • Loading branch information
grutt and grutt authored Apr 4, 2024
1 parent 334ce75 commit 85d35bb
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/rs/zerolog"

"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/repository"
Expand Down Expand Up @@ -388,6 +390,7 @@ func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeT
err = stream.Send(e)

if err != nil {
cancel() // FIXME is this necessary?
s.l.Error().Err(err).Msgf("could not send workflow event to client")
return nil
}
Expand All @@ -406,18 +409,32 @@ func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeT
return err
}

for range ctx.Done() {
if err := cleanupQueue(); err != nil {
return fmt.Errorf("could not cleanup queue: %w", err)
}

// drain the existing connections
wg.Wait()
<-ctx.Done()
if err := cleanupQueue(); err != nil {
return fmt.Errorf("could not cleanup queue: %w", err)
}

waitFor(&wg, 60*time.Second, s.l)

return nil
}

func waitFor(wg *sync.WaitGroup, timeout time.Duration, l *zerolog.Logger) {

done := make(chan struct{})

go func() {
wg.Wait()
defer close(done)
}()

select {
case <-done:
case <-time.After(timeout):
l.Error().Msg("timed out waiting for wait group")
}
}

func (s *DispatcherImpl) SendStepActionEvent(ctx context.Context, request *contracts.StepActionEvent) (*contracts.ActionEventResponse, error) {
switch request.EventType {
case contracts.StepActionEventType_STEP_EVENT_TYPE_STARTED:
Expand Down

0 comments on commit 85d35bb

Please sign in to comment.