diff --git a/api/handler.go b/api/handler.go index 318ed3c..4f5d1b7 100644 --- a/api/handler.go +++ b/api/handler.go @@ -98,14 +98,19 @@ func (h *apiHandler) streamHealthHandler() chi.Router { if opts.AuthURL != "" { router.Use(authorization(opts.AuthURL)) } - router.Use( - streamStatus(healthcore), - regionProxy(opts.RegionalHostFormat, opts.OwnRegion)) + regionalMiddlewares := []middleware{ + streamStatus(healthcore), + regionProxy(opts.RegionalHostFormat, opts.OwnRegion), + } h.withMetrics(router, "get_stream_health"). + With(regionalMiddlewares...). MethodFunc("GET", "/health", h.getStreamHealth) h.withMetrics(router, "stream_health_events"). + With(regionalMiddlewares...). MethodFunc("GET", "/events", h.subscribeEvents) + h.withMetrics(router, "wait_stream_active"). + MethodFunc("GET", "/wait-active", h.waitStreamActive) return router } @@ -558,6 +563,22 @@ func (h *apiHandler) subscribeEvents(rw http.ResponseWriter, r *http.Request) { } } +func (h *apiHandler) waitStreamActive(rw http.ResponseWriter, r *http.Request) { + if h.core == nil { + respondError(rw, http.StatusNotImplemented, errors.New("stream healthcore is unavailable")) + return + } + + streamID := apiParam(r, streamIDParam) + err := h.core.WaitActive(r.Context(), streamID) + if err != nil { + respondError(rw, http.StatusInternalServerError, err) + return + } + + rw.WriteHeader(http.StatusNoContent) +} + func makeSSEEventChan(ctx context.Context, pastEvents []data.Event, subscription <-chan data.Event) <-chan jsse.Event { if subscription == nil { events := make(chan jsse.Event, len(pastEvents)) diff --git a/health/core.go b/health/core.go index 80b0205..422aac7 100644 --- a/health/core.go +++ b/health/core.go @@ -191,6 +191,20 @@ func (c *Core) handleSingleEvent(evt data.Event) (err error) { glog.Warningf("Buffer full for health event subscription, skipping message. streamId=%q, eventTs=%q", streamID, ts) } } + + for _, cond := range record.LastStatus.Conditions { + if cond.Type != ConditionActive { + continue + } + // We flag the record as initialized unless, from the received events, + // we know for sure that the stream is inactive. + isInactive := cond.Status != nil && !*cond.Status + if !isInactive { + record.FlagInitialized() + } + break + } + return nil } @@ -247,6 +261,16 @@ func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID return pastEvents, subs, nil } +func (c *Core) WaitActive(ctx context.Context, manifestID string) error { + // We actually create the record here if it doesn't exist, so that we can + // wait for it to be initialized. + record := c.storage.GetOrCreate(manifestID, c.conditionTypes) + if err := record.WaitInitialized(ctx); err != nil { + return err + } + return nil +} + func getPastEventsLocked(record *Record, lastEvtID *uuid.UUID, from, to *time.Time) ([]data.Event, error) { fromIdx, toIdx := 0, len(record.PastEvents) if lastEvtID != nil { diff --git a/health/record.go b/health/record.go index 58b6624..4e09ab4 100644 --- a/health/record.go +++ b/health/record.go @@ -16,7 +16,8 @@ type Record struct { Conditions []data.ConditionType sync.RWMutex - disposed chan struct{} + initialized chan struct{} + disposed chan struct{} PastEvents []data.Event EventsByID map[uuid.UUID]data.Event @@ -32,11 +33,43 @@ func NewRecord(id string, conditionTypes []data.ConditionType) *Record { conditions[i] = data.NewCondition(cond, time.Time{}, nil, nil) } return &Record{ - ID: id, - Conditions: conditionTypes, - disposed: make(chan struct{}), - EventsByID: map[uuid.UUID]data.Event{}, - LastStatus: data.NewHealthStatus(id, conditions), + ID: id, + Conditions: conditionTypes, + initialized: make(chan struct{}), + disposed: make(chan struct{}), + EventsByID: map[uuid.UUID]data.Event{}, + LastStatus: data.NewHealthStatus(id, conditions), + } +} + +// FlagInitialized will flag the record as initialized. It is meant to be called +// after the first event is processed, meaning the record is not empty anymore. +// +// This is used to allow waiting until a stream is started by creating its +// record in an uninitialized state first and calling `WaitInitialized`. The +// initialization flag is simply a channel that is closed, which will unblock +// all goroutines waiting to receive from it (`WaitInitialized`). +func (r *Record) FlagInitialized() { + if !r.IsInitialized() { + close(r.initialized) + } +} + +func (r *Record) IsInitialized() bool { + select { + case <-r.initialized: + return true + default: + return false + } +} + +func (r *Record) WaitInitialized(ctx context.Context) error { + select { + case <-r.initialized: + return nil + case <-ctx.Done(): + return ctx.Err() } } @@ -102,7 +135,10 @@ func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration) func (s *RecordStorage) Get(id string) (*Record, bool) { if saved, ok := s.records.Load(id); ok { - return saved.(*Record), true + // Until Initialize is called, the record is considered inexistent + if record := saved.(*Record); record.IsInitialized() { + return record, true + } } return nil, false } diff --git a/health/reducer.go b/health/reducer.go index 62d9f30..4ced2df 100644 --- a/health/reducer.go +++ b/health/reducer.go @@ -5,6 +5,8 @@ import ( "github.com/livepeer/livepeer-data/pkg/event" ) +const ConditionActive data.ConditionType = "Active" + type Reducer interface { Bindings() []event.BindingArgs Conditions() []data.ConditionType diff --git a/health/reducers/stream_state.go b/health/reducers/stream_state.go index a2878ae..fa8384e 100644 --- a/health/reducers/stream_state.go +++ b/health/reducers/stream_state.go @@ -4,14 +4,13 @@ import ( "time" "github.com/golang/glog" + "github.com/livepeer/livepeer-data/health" "github.com/livepeer/livepeer-data/pkg/data" "github.com/livepeer/livepeer-data/pkg/event" ) const ( streamStateBindingKey = "stream.state.#" - - ConditionActive data.ConditionType = "Active" ) type ActiveConditionExtraData struct { @@ -28,7 +27,7 @@ func (t StreamStateReducer) Bindings() []event.BindingArgs { } func (t StreamStateReducer) Conditions() []data.ConditionType { - return []data.ConditionType{ConditionActive} + return []data.ConditionType{health.ConditionActive} } func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, evtIface data.Event) (*data.HealthStatus, interface{}) { @@ -55,7 +54,7 @@ func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, ev current = data.NewHealthStatus(current.ID, conditions) } for i, cond := range conditions { - if cond.Type == ConditionActive { + if cond.Type == health.ConditionActive { newCond := data.NewCondition(cond.Type, evt.Timestamp(), &isActive, cond) newCond.ExtraData = ActiveConditionExtraData{NodeID: evt.NodeID, Region: evt.Region} conditions[i] = newCond @@ -75,7 +74,7 @@ func clearConditions(conditions []*data.Condition) []*data.Condition { } func GetLastActiveData(status *data.HealthStatus) ActiveConditionExtraData { - data, ok := status.Condition(ConditionActive).ExtraData.(ActiveConditionExtraData) + data, ok := status.Condition(health.ConditionActive).ExtraData.(ActiveConditionExtraData) if !ok { return ActiveConditionExtraData{} }