Skip to content

Commit

Permalink
refactor: rename and add godoc comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Oct 24, 2023
1 parent a52603f commit 65c9e6e
Showing 1 changed file with 25 additions and 21 deletions.
46 changes: 25 additions & 21 deletions pkg/observable/channel/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ var _ observable.ReplayObservable[any] = &replayObservable[any]{}

type replayObservable[V any] struct {
*channelObservable[V]
size int
notificationsMu sync.RWMutex
notifications []V
replayObserversMu sync.RWMutex
replayObservers []observable.Observer[V]
// replayBufferSize is the number of replayBuffer to buffer so that they
// can be replayed to new observers.
replayBufferSize int
// replayBufferMu protects replayBuffer from concurrent access/updates.
replayBufferMu sync.RWMutex
// replayBuffer is the buffer of notifications into which new notifications
// will be pushed and which will be sent to new subscribers before any new
// notifications are sent.
replayBuffer []V
}

// NewReplayObservable returns a new ReplayObservable with a replay buffer size
// NewReplayObservable returns a new ReplayObservable with a replay buffer replayBufferSize
// of n and the corresponding publish channel to notify it of new values.
func NewReplayObservable[V any](
ctx context.Context, n int,
Expand All @@ -46,8 +50,8 @@ func Replay[V any](

replayObsvbl := &replayObservable[V]{
channelObservable: chanObsvbl,
size: n,
notifications: make([]V, 0, n),
replayBufferSize: n,
replayBuffer: make([]V, 0, n),
}

srcObserver := srcObsvbl.Subscribe(ctx)
Expand All @@ -62,8 +66,8 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
tempObserver := ro.Subscribe(ctx)
defer tempObserver.Unsubscribe()

if n > cap(ro.notifications) {
n = cap(ro.notifications)
if n > cap(ro.replayBuffer) {
n = cap(ro.replayBuffer)
// TODO_THIS_COMMIT: log a warning
}

Expand All @@ -78,18 +82,18 @@ func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
ro.notificationsMu.RLock()
defer ro.notificationsMu.RUnlock()
ro.replayBufferMu.RLock()
defer ro.replayBufferMu.RUnlock()

observer := NewObserver[V](ctx, ro.onUnsubscribe)

// Replay all buffered notifications to the observer channel buffer before
// Replay all buffered replayBuffer to the observer channel buffer before
// any new values have an opportunity to send on observerCh (i.e. appending
// observer to ro.observers).
//
// TODO_IMPROVE: this assumes that the observer channel buffer is large enough
// to hold all replay (buffered) notifications.
for _, notification := range ro.notifications {
// to hold all replay (buffered) replayBuffer.
for _, notification := range ro.replayBuffer {
observer.notify(notification)
}

Expand All @@ -109,19 +113,19 @@ func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observe
return observer
}

// goBufferReplayNotifications buffers the last n notifications from a source
// goBufferReplayNotifications buffers the last n replayBuffer from a source
// observer. It is intended to be run in a goroutine.
func (ro *replayObservable[V]) goBufferReplayNotifications(srcObserver observable.Observer[V]) {
for notification := range srcObserver.Ch() {
ro.notificationsMu.Lock()
ro.replayBufferMu.Lock()
// Add the notification to the buffer.
if len(ro.notifications) < ro.size {
ro.notifications = append(ro.notifications, notification)
if len(ro.replayBuffer) < ro.replayBufferSize {
ro.replayBuffer = append(ro.replayBuffer, notification)
} else {
// buffer full, make room for the new notification by removing the
// oldest notification.
ro.notifications = append(ro.notifications[1:], notification)
ro.replayBuffer = append(ro.replayBuffer[1:], notification)
}
ro.notificationsMu.Unlock()
ro.replayBufferMu.Unlock()
}
}

0 comments on commit 65c9e6e

Please sign in to comment.