diff --git a/pkg/observable/channel/replay.go b/pkg/observable/channel/replay.go index 2b968ea90..70ca7d68c 100644 --- a/pkg/observable/channel/replay.go +++ b/pkg/observable/channel/replay.go @@ -14,14 +14,18 @@ var _ observable.ReplayObservable[any] = &replayObservable[any]{} type replayObservable[V any] struct { *channelObservable[V] - size int - notificationsMu sync.RWMutex - notifications []V - replayObserversMu sync.RWMutex - replayObservers []observable.Observer[V] + // replayBufferSize is the number of replayBuffer to buffer so that they + // can be replayed to new observers. + replayBufferSize int + // replayBufferMu protects replayBuffer from concurrent access/updates. + replayBufferMu sync.RWMutex + // replayBuffer is the buffer of notifications into which new notifications + // will be pushed and which will be sent to new subscribers before any new + // notifications are sent. + replayBuffer []V } -// NewReplayObservable returns a new ReplayObservable with a replay buffer size +// NewReplayObservable returns a new ReplayObservable with a replay buffer replayBufferSize // of n and the corresponding publish channel to notify it of new values. func NewReplayObservable[V any]( ctx context.Context, n int, @@ -46,8 +50,8 @@ func Replay[V any]( replayObsvbl := &replayObservable[V]{ channelObservable: chanObsvbl, - size: n, - notifications: make([]V, 0, n), + replayBufferSize: n, + replayBuffer: make([]V, 0, n), } srcObserver := srcObsvbl.Subscribe(ctx) @@ -62,8 +66,8 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V { tempObserver := ro.Subscribe(ctx) defer tempObserver.Unsubscribe() - if n > cap(ro.notifications) { - n = cap(ro.notifications) + if n > cap(ro.replayBuffer) { + n = cap(ro.replayBuffer) // TODO_THIS_COMMIT: log a warning } @@ -78,18 +82,18 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V { // Subscribe returns an observer which is notified when the publishCh channel // receives a value. func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { - ro.notificationsMu.RLock() - defer ro.notificationsMu.RUnlock() + ro.replayBufferMu.RLock() + defer ro.replayBufferMu.RUnlock() observer := NewObserver[V](ctx, ro.onUnsubscribe) - // Replay all buffered notifications to the observer channel buffer before + // Replay all buffered replayBuffer to the observer channel buffer before // any new values have an opportunity to send on observerCh (i.e. appending // observer to ro.observers). // // TODO_IMPROVE: this assumes that the observer channel buffer is large enough - // to hold all replay (buffered) notifications. - for _, notification := range ro.notifications { + // to hold all replay (buffered) replayBuffer. + for _, notification := range ro.replayBuffer { observer.notify(notification) } @@ -109,19 +113,19 @@ func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observe return observer } -// goBufferReplayNotifications buffers the last n notifications from a source +// goBufferReplayNotifications buffers the last n replayBuffer from a source // observer. It is intended to be run in a goroutine. func (ro *replayObservable[V]) goBufferReplayNotifications(srcObserver observable.Observer[V]) { for notification := range srcObserver.Ch() { - ro.notificationsMu.Lock() + ro.replayBufferMu.Lock() // Add the notification to the buffer. - if len(ro.notifications) < ro.size { - ro.notifications = append(ro.notifications, notification) + if len(ro.replayBuffer) < ro.replayBufferSize { + ro.replayBuffer = append(ro.replayBuffer, notification) } else { // buffer full, make room for the new notification by removing the // oldest notification. - ro.notifications = append(ro.notifications[1:], notification) + ro.replayBuffer = append(ro.replayBuffer[1:], notification) } - ro.notificationsMu.Unlock() + ro.replayBufferMu.Unlock() } }