Skip to content

Commit

Permalink
Refactor final goroutine in streamPrediction
Browse files Browse the repository at this point in the history
  • Loading branch information
mattt committed Sep 17, 2024
1 parent 8a826b8 commit 2e36999
Showing 1 changed file with 15 additions and 18 deletions.
33 changes: 15 additions & 18 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,30 +222,27 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
}()

go func() {
err := g.Wait()

defer close(sseChan)
defer close(errChan)

for {
select {
case <-ctx.Done():
if err != nil {
if errors.Is(err, io.EOF) {
// Attempt to reconnect if the connection was closed before the stream was done
r.streamPrediction(ctx, prediction, lastEvent, sseChan, errChan)
return
case <-done:
return
default:
err := g.Wait()
if err != nil {
if err == io.EOF {
// Attempt to reconnect if the connection was closed before the stream was done
r.streamPrediction(ctx, prediction, lastEvent, sseChan, errChan)
continue
}
}

if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, context.Canceled) {
// Context was canceled, simply return
return
}

errChan <- err
}
select {
case errChan <- err:
default:
// errChan is full or closed
}
}
}()
Expand Down

0 comments on commit 2e36999

Please sign in to comment.