Skip to content

Commit

Permalink
refactor: replayObservable#Next() V to `ReplayObservable#Last(ctx, …
Browse files Browse the repository at this point in the history
…n) []V`
  • Loading branch information
bryanchriswhite committed Oct 24, 2023
1 parent 163bb45 commit 82e361e
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
9 changes: 0 additions & 9 deletions pkg/observable/channel/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,6 @@ func WithPublisher[V any](publishCh chan V) option[V] {
}
}

// Next synchronously returns the next value from the observable.
func (obsvbl *channelObservable[V]) Next(ctx context.Context) V {
tempObserver := obsvbl.Subscribe(ctx)
defer tempObserver.Unsubscribe()

val := <-tempObserver.Ch()
return val
}

// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
Expand Down
17 changes: 13 additions & 4 deletions pkg/observable/channel/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,23 @@ func Replay[V any](
return replayObsvbl
}

// Next synchronously returns the next value from the observable. This will always
// Last synchronously returns the last n values from the replay buffer. This will always
// return the first value in the replay buffer, if it exists.
func (ro *replayObservable[V]) Next(ctx context.Context) V {
func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V {
tempObserver := ro.Subscribe(ctx)
defer tempObserver.Unsubscribe()

val := <-tempObserver.Ch()
return val
if n > cap(ro.notifications) {
n = cap(ro.notifications)
// TODO_THIS_COMMIT: log a warning
}

values := make([]V, n)
for i, _ := range values {
value := <-tempObserver.Ch()
values[i] = value
}
return values
}

// Subscribe returns an observer which is notified when the publishCh channel
Expand Down
5 changes: 3 additions & 2 deletions pkg/observable/channel/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestReplayObservable_Next(t *testing.T) {
time.Sleep(time.Millisecond)
}

require.Equal(t, 3, replayObsvbl.Next(ctx))
require.Equal(t, 3, replayObsvbl.Next(ctx))
require.ElementsMatch(t, []int{3}, replayObsvbl.Last(ctx, 1))
require.Equal(t, []int{3, 4}, replayObsvbl.Last(ctx, 2))
require.Equal(t, []int{3, 4, 5}, replayObsvbl.Last(ctx, 3))
}
2 changes: 0 additions & 2 deletions pkg/observable/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ type ReplayObservable[V any] interface {
// notified of new values asynchronously.
// It is analogous to a publisher in a "Fan-Out" system design.
type Observable[V any] interface {
// Next synchronously returns the next value from the observable.
Next(context.Context) V
// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
Subscribe(context.Context) Observer[V]
Expand Down

0 comments on commit 82e361e

Please sign in to comment.