diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 7e81a1392..6df48d724 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/rs/zerolog" + "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/msgqueue" "github.com/hatchet-dev/hatchet/internal/repository" @@ -388,6 +390,7 @@ func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeT err = stream.Send(e) if err != nil { + cancel() // FIXME is this necessary? s.l.Error().Err(err).Msgf("could not send workflow event to client") return nil } @@ -406,18 +409,32 @@ func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeT return err } - for range ctx.Done() { - if err := cleanupQueue(); err != nil { - return fmt.Errorf("could not cleanup queue: %w", err) - } - - // drain the existing connections - wg.Wait() + <-ctx.Done() + if err := cleanupQueue(); err != nil { + return fmt.Errorf("could not cleanup queue: %w", err) } + waitFor(&wg, 60*time.Second, s.l) + return nil } +func waitFor(wg *sync.WaitGroup, timeout time.Duration, l *zerolog.Logger) { + + done := make(chan struct{}) + + go func() { + wg.Wait() + defer close(done) + }() + + select { + case <-done: + case <-time.After(timeout): + l.Error().Msg("timed out waiting for wait group") + } +} + func (s *DispatcherImpl) SendStepActionEvent(ctx context.Context, request *contracts.StepActionEvent) (*contracts.ActionEventResponse, error) { switch request.EventType { case contracts.StepActionEventType_STEP_EVENT_TYPE_STARTED: