From 82e361eeebe678c177bd71633e8df4b552eee136 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 24 Oct 2023 15:05:51 +0200 Subject: [PATCH] refactor: `replayObservable#Next() V` to `ReplayObservable#Last(ctx, n) []V` --- pkg/observable/channel/observable.go | 9 --------- pkg/observable/channel/replay.go | 17 +++++++++++++---- pkg/observable/channel/replay_test.go | 5 +++-- pkg/observable/interface.go | 2 -- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/pkg/observable/channel/observable.go b/pkg/observable/channel/observable.go index 8e17ad9fb..7b8e2d04e 100644 --- a/pkg/observable/channel/observable.go +++ b/pkg/observable/channel/observable.go @@ -64,15 +64,6 @@ func WithPublisher[V any](publishCh chan V) option[V] { } } -// Next synchronously returns the next value from the observable. -func (obsvbl *channelObservable[V]) Next(ctx context.Context) V { - tempObserver := obsvbl.Subscribe(ctx) - defer tempObserver.Unsubscribe() - - val := <-tempObserver.Ch() - return val -} - // Subscribe returns an observer which is notified when the publishCh channel // receives a value. func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { diff --git a/pkg/observable/channel/replay.go b/pkg/observable/channel/replay.go index eb0889695..4b28606ac 100644 --- a/pkg/observable/channel/replay.go +++ b/pkg/observable/channel/replay.go @@ -47,14 +47,23 @@ func Replay[V any]( return replayObsvbl } -// Next synchronously returns the next value from the observable. This will always +// Last synchronously returns the last n values from the replay buffer. This will always // return the first value in the replay buffer, if it exists. -func (ro *replayObservable[V]) Next(ctx context.Context) V { +func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V { tempObserver := ro.Subscribe(ctx) defer tempObserver.Unsubscribe() - val := <-tempObserver.Ch() - return val + if n > cap(ro.notifications) { + n = cap(ro.notifications) + // TODO_THIS_COMMIT: log a warning + } + + values := make([]V, n) + for i, _ := range values { + value := <-tempObserver.Ch() + values[i] = value + } + return values } // Subscribe returns an observer which is notified when the publishCh channel diff --git a/pkg/observable/channel/replay_test.go b/pkg/observable/channel/replay_test.go index 92ac737ab..4d595e4cf 100644 --- a/pkg/observable/channel/replay_test.go +++ b/pkg/observable/channel/replay_test.go @@ -83,6 +83,7 @@ func TestReplayObservable_Next(t *testing.T) { time.Sleep(time.Millisecond) } - require.Equal(t, 3, replayObsvbl.Next(ctx)) - require.Equal(t, 3, replayObsvbl.Next(ctx)) + require.ElementsMatch(t, []int{3}, replayObsvbl.Last(ctx, 1)) + require.Equal(t, []int{3, 4}, replayObsvbl.Last(ctx, 2)) + require.Equal(t, []int{3, 4, 5}, replayObsvbl.Last(ctx, 3)) } diff --git a/pkg/observable/interface.go b/pkg/observable/interface.go index 52d120451..d86da414f 100644 --- a/pkg/observable/interface.go +++ b/pkg/observable/interface.go @@ -19,8 +19,6 @@ type ReplayObservable[V any] interface { // notified of new values asynchronously. // It is analogous to a publisher in a "Fan-Out" system design. type Observable[V any] interface { - // Next synchronously returns the next value from the observable. - Next(context.Context) V // Subscribe returns an observer which is notified when the publishCh channel // receives a value. Subscribe(context.Context) Observer[V]