diff --git a/pkg/observable/channel/replay.go b/pkg/observable/channel/replay.go index 4b28606ac..2b968ea90 100644 --- a/pkg/observable/channel/replay.go +++ b/pkg/observable/channel/replay.go @@ -21,6 +21,15 @@ type replayObservable[V any] struct { replayObservers []observable.Observer[V] } +// NewReplayObservable returns a new ReplayObservable with a replay buffer size +// of n and the corresponding publish channel to notify it of new values. +func NewReplayObservable[V any]( + ctx context.Context, n int, +) (observable.ReplayObservable[V], chan<- V) { + obsvbl, publishCh := NewObservable[V]() + return Replay[V](ctx, n, obsvbl), publishCh +} + // Replay returns an observable which replays the last n values published to the // source observable to new observers, before publishing new values. func Replay[V any](