Skip to content

Commit

Permalink
[Miner] feat: add replay observable (#93)
Browse files Browse the repository at this point in the history
* feat: add replay observable

(cherry picked from commit ab21790164ab544ae5f1508d3237a3faab33e71e)

* refactor: `replayObservable` as its own interface type

* refactor: `replayObservable#Next() V`  to `ReplayObservable#Last(ctx, n) []V`

* chore: add constructor func for `ReplayObservable`

* test: reorder to improve readibility

* refactor: rename and add godoc comments

* chore: improve naming & comments

* chore: add warning log and improve comments

* test: improve and add tests

* fix: interface assertion

* fix: comment typo

* chore: review improvements

* fix: race

* refactor: add observableInternals interface

(cherry picked from commit 5d149e5297ce7d11dad77983f53be53efd8dae15)

* chore: update last; only block for 1 value min

(cherry picked from commit b24a5e586e9c776a962008043d065a2294fd921c)

* chore: review improvements

* refactor: move add `channelObservableInternals` & migrate its relevant methods & state from channelObservable

* refactor: simplify, cleanup, & improve comments

* chore: review improvements

* fix: bug in `accumulateReplayValues()`

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* fix: use american spelling of cancelation & canceled

---------

Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
bryanchriswhite and Olshansk authored Oct 30, 2023
1 parent f91bb78 commit 8791c17
Show file tree
Hide file tree
Showing 8 changed files with 669 additions and 128 deletions.
134 changes: 23 additions & 111 deletions pkg/observable/channel/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package channel

import (
"context"
"sync"

"pocket/pkg/observable"
)

Expand All @@ -13,31 +11,33 @@ import (
// defaultSubscribeBufferSize is the buffer size of a observable's publish channel.
const defaultPublishBufferSize = 50

var _ observable.Observable[any] = (*channelObservable[any])(nil)
var (
_ observable.Observable[any] = (*channelObservable[any])(nil)
_ observerManager[any] = (*channelObservable[any])(nil)
)

// option is a function which receives and can modify the channelObservable state.
type option[V any] func(obs *channelObservable[V])

// channelObservable implements the observable.Observable interface and can be notified
// by sending on its corresponding publishCh channel.
type channelObservable[V any] struct {
// embed observerManager to encapsulate concurrent-safe read/write access to
// observers. This also allows higher-level objects to wrap this observable
// without knowing its specific type by asserting that it implements the
// observerManager interface.
observerManager[V]
// publishCh is an observable-wide channel that is used to receive values
// which are subsequently fanned out to observers.
publishCh chan V
// observersMu protects observers from concurrent access/updates
observersMu *sync.RWMutex
// observers is a list of channelObservers that will be notified when publishCh
// receives a new value.
observers []*channelObserver[V]
}

// NewObservable creates a new observable which is notified when the publishCh
// channel receives a value.
func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V) {
// initialize an observable that publishes messages from 1 publishCh to N observers
obs := &channelObservable[V]{
observersMu: &sync.RWMutex{},
observers: []*channelObserver[V]{},
observerManager: newObserverManager[V](),
}

for _, opt := range opts {
Expand All @@ -64,125 +64,37 @@ func WithPublisher[V any](publishCh chan V) option[V] {
}
}

// Next synchronously returns the next value from the observable.
func (obsvbl *channelObservable[V]) Next(ctx context.Context) V {
tempObserver := obsvbl.Subscribe(ctx)
defer tempObserver.Unsubscribe()

val := <-tempObserver.Ch()
return val
}

// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
// must (write) lock observersMu so that we can safely append to the observers list
obsvbl.observersMu.Lock()
defer obsvbl.observersMu.Unlock()

observer := NewObserver[V](ctx, obsvbl.onUnsubscribe)
obsvbl.observers = append(obsvbl.observers, observer)
func (obs *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
// Create a new observer and add it to the list of observers to be notified
// when publishCh receives a new value.
observer := NewObserver[V](ctx, obs.observerManager.remove)
obs.observerManager.add(observer)

// caller can rely on context cancellation or call UnsubscribeAll() to unsubscribe
// caller can rely on context cancelation or call UnsubscribeAll() to unsubscribe
// active observers
if ctx != nil {
// asynchronously wait for the context to be done and then unsubscribe
// this observer.
go goUnsubscribeOnDone[V](ctx, observer)
go obs.observerManager.goUnsubscribeOnDone(ctx, observer)
}
return observer
}

// UnsubscribeAll unsubscribes and removes all observers from the observable.
func (obsvbl *channelObservable[V]) UnsubscribeAll() {
obsvbl.unsubscribeAll()
}

// unsubscribeAll unsubscribes and removes all observers from the observable.
func (obsvbl *channelObservable[V]) unsubscribeAll() {
// Copy currentObservers to avoid holding the lock while unsubscribing them.
// The observers at the time of locking, prior to copying, are the canonical
// set of observers which are unsubscribed.
// New or existing Observers may (un)subscribe while the observable is closing.
// Any such observers won't be isClosed but will also stop receiving notifications
// immediately (if they receive any at all).
currentObservers := obsvbl.copyObservers()
for _, observer := range currentObservers {
observer.Unsubscribe()
}

// Reset observers to an empty list. This purges any observers which might have
// subscribed while the observable was closing.
obsvbl.observersMu.Lock()
obsvbl.observers = []*channelObserver[V]{}
obsvbl.observersMu.Unlock()
func (obs *channelObservable[V]) UnsubscribeAll() {
obs.observerManager.removeAll()
}

// goPublish to the publishCh and notify observers when values are received.
// This function is blocking and should be run in a goroutine.
func (obsvbl *channelObservable[V]) goPublish() {
for notification := range obsvbl.publishCh {
// Copy currentObservers to avoid holding the lock while notifying them.
// New or existing Observers may (un)subscribe while this notification
// is being fanned out.
// The observers at the time of locking, prior to copying, are the canonical
// set of observers which receive this notification.
currentObservers := obsvbl.copyObservers()
for _, obsvr := range currentObservers {
// TODO_CONSIDERATION: perhaps continue trying to avoid making this
// notification async as it would effectively use goroutines
// in memory as a buffer (unbounded).
obsvr.notify(notification)
}
func (obs *channelObservable[V]) goPublish() {
for notification := range obs.publishCh {
obs.observerManager.notifyAll(notification)
}

// Here we know that the publisher channel has been closed.
// Unsubscribe all observers as they can no longer receive notifications.
obsvbl.unsubscribeAll()
}

// copyObservers returns a copy of the current observers list. It is safe to
// call concurrently.
func (obsvbl *channelObservable[V]) copyObservers() (observers []*channelObserver[V]) {
defer obsvbl.observersMu.RUnlock()

// This loop blocks on acquiring a read lock on observersMu. If TryRLock
// fails, the loop continues until it succeeds. This is intended to give
// callers a guarantee that this copy operation won't contribute to a deadlock.
for {
// block until a read lock can be acquired
if obsvbl.observersMu.TryRLock() {
break
}
}

observers = make([]*channelObserver[V], len(obsvbl.observers))
copy(observers, obsvbl.observers)

return observers
}

// goUnsubscribeOnDone unsubscribes from the subscription when the context is done.
// It is a blocking function and intended to be called in a goroutine.
func goUnsubscribeOnDone[V any](ctx context.Context, observer observable.Observer[V]) {
<-ctx.Done()
if observer.IsClosed() {
return
}
observer.Unsubscribe()
}

// onUnsubscribe returns a function that removes a given observer from the
// observable's list of observers.
func (obsvbl *channelObservable[V]) onUnsubscribe(toRemove *channelObserver[V]) {
// must (write) lock to iterate over and modify the observers list
obsvbl.observersMu.Lock()
defer obsvbl.observersMu.Unlock()

for i, observer := range obsvbl.observers {
if observer == toRemove {
obsvbl.observers = append((obsvbl.observers)[:i], (obsvbl.observers)[i+1:]...)
break
}
}
obs.observerManager.removeAll()
}
2 changes: 1 addition & 1 deletion pkg/observable/channel/observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestChannelObservable_SequentialPublishAndUnsubscription(t *testing.T) {

// TODO_TECHDEBT/TODO_INCOMPLETE: add coverage for active observers closing when publishCh closes.
func TestChannelObservable_ObserversCloseOnPublishChannelClose(t *testing.T) {
t.Skip("add coverage: all observers should unsubscribeAll when publishCh closes")
t.Skip("add coverage: all observers should unsubscribe when publishCh closes")
}

func delayedPublishFactory[V any](publishCh chan<- V, delay time.Duration) func(value V) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/observable/channel/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ var _ observable.Observer[any] = (*channelObserver[any])(nil)
// channelObserver implements the observable.Observer interface.
type channelObserver[V any] struct {
ctx context.Context
// onUnsubscribe is called in Observer#Unsubscribe, removing the respective
// observer from observers in a concurrency-safe manner.
onUnsubscribe func(toRemove *channelObserver[V])
// onUnsubscribe is called in Observer#Unsubscribe, closing this observer's
// channel and removing it from the respective obervable's observers list
// in a concurrency-safe manner.
onUnsubscribe func(toRemove observable.Observer[V])
// observerMu protects the observerCh and isClosed fields.
observerMu *sync.RWMutex
// observerCh is the channel that is used to emit values to the observer.
Expand All @@ -43,7 +44,7 @@ type channelObserver[V any] struct {
isClosed bool
}

type UnsubscribeFunc[V any] func(toRemove *channelObserver[V])
type UnsubscribeFunc[V any] func(toRemove observable.Observer[V])

func NewObserver[V any](
ctx context.Context,
Expand Down
152 changes: 152 additions & 0 deletions pkg/observable/channel/observer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package channel

import (
"context"
"sync"

"pocket/pkg/observable"
)

var _ observerManager[any] = (*channelObserverManager[any])(nil)

// observerManager is an interface intended to be used between an observable and some
// higher-level abstraction and/or observable implementation which would embed it.
// Embedding this interface rather than a channelObservable directly allows for
// more transparency and flexibility in higher-level code.
// NOTE: this interface MUST be used with a common concrete Observer type.
// TODO_CONSIDERATION: Consider whether `observerManager` and `Observable` should remain as separate
// types after some more time and experience using both.
type observerManager[V any] interface {
notifyAll(notification V)
add(toAdd observable.Observer[V])
remove(toRemove observable.Observer[V])
removeAll()
goUnsubscribeOnDone(ctx context.Context, observer observable.Observer[V])
}

// TODO_CONSIDERATION: if this were a generic implementation, we wouldn't need
// to cast `toAdd` to a channelObserver in add. There are two things
// currently preventing a generic observerManager implementation:
// 1. channelObserver#notify() is not part of the observable.Observer interface
// and is therefore not accessible here. If we move everything into the
// `observable` pkg so that the unexported member is in scope, then the channel
// pkg can't implement it for the same reason, it's an unexported method defined
// in a different pkg.
// 2. == is not defined for a generic Observer type. We would have to add an Equals()
// to the Observer interface.

// channelObserverManager implements the observerManager interface using
// channelObservers.
type channelObserverManager[V any] struct {
// observersMu protects observers from concurrent access/updates
observersMu *sync.RWMutex
// observers is a list of channelObservers that will be notified when new value
// are received.
observers []*channelObserver[V]
}

func newObserverManager[V any]() *channelObserverManager[V] {
return &channelObserverManager[V]{
observersMu: &sync.RWMutex{},
observers: make([]*channelObserver[V], 0),
}
}

func (com *channelObserverManager[V]) notifyAll(notification V) {
// Copy currentObservers to avoid holding the lock while notifying them.
// New or existing Observers may (un)subscribe while this notification
// is being fanned out.
// The observers at the time of locking, prior to copying, are the canonical
// set of observers which receive this notification.
currentObservers := com.copyObservers()
for _, obsvr := range currentObservers {
// TODO_TECHDEBT: since this synchronously notifies all observers in a loop,
// it is possible to block here, part-way through notifying all observers,
// on a slow observer consumer (i.e. full buffer). Instead, we should notify
// observers with some limited concurrency of "worker" goroutines.
// The storj/common repo contains such a `Limiter` implementation, see:
// https://github.com/storj/common/blob/main/sync2/limiter.go.
obsvr.notify(notification)
}
}

// addObserver implements the respective member of observerManager. It is used
// by the channelObservable implementation as well as embedders of observerManager
// (e.g. replayObservable).
// It panics if toAdd is not a channelObserver.
func (com *channelObserverManager[V]) add(toAdd observable.Observer[V]) {
// must (write) lock observersMu so that we can safely append to the observers list
com.observersMu.Lock()
defer com.observersMu.Unlock()

com.observers = append(com.observers, toAdd.(*channelObserver[V]))
}

// remove removes a given observer from the observable's list of observers.
// It implements the respective member of observerManager and is used by
// the channelObservable implementation as well as embedders of observerManager
// (e.g. replayObservable).
func (com *channelObserverManager[V]) remove(toRemove observable.Observer[V]) {
// must (write) lock to iterate over and modify the observers list
com.observersMu.Lock()
defer com.observersMu.Unlock()

for i, observer := range com.observers {
if observer == toRemove {
com.observers = append((com.observers)[:i], (com.observers)[i+1:]...)
break
}
}
}

// removeAll unsubscribes and removes all observers from the observable.
// It implements the respective member of observerManager and is used by
// the channelObservable implementation as well as embedders of observerManager
// (e.g. replayObservable).
func (com *channelObserverManager[V]) removeAll() {
// Copy currentObservers to avoid holding the lock while unsubscribing them.
// The observers at the time of locking, prior to copying, are the canonical
// set of observers which are unsubscribed.
// New or existing Observers may (un)subscribe while the observable is closing.
// Any such observers won't be isClosed but will also stop receiving notifications
// immediately (if they receive any at all).
currentObservers := com.copyObservers()
for _, observer := range currentObservers {
observer.Unsubscribe()
}

// Reset observers to an empty list. This purges any observers which might have
// subscribed while the observable was closing.
com.observersMu.Lock()
com.observers = []*channelObserver[V]{}
com.observersMu.Unlock()
}

// goUnsubscribeOnDone unsubscribes from the subscription when the context is done.
// It is a blocking function and intended to be called in a goroutine.
func (com *channelObserverManager[V]) goUnsubscribeOnDone(
ctx context.Context,
observer observable.Observer[V],
) {
<-ctx.Done()
if observer.IsClosed() {
return
}
observer.Unsubscribe()
}

// copyObservers returns a copy of the current observers list. It is safe to
// call concurrently. Notably, it is not part of the observerManager interface.
func (com *channelObserverManager[V]) copyObservers() (observers []*channelObserver[V]) {
defer com.observersMu.RUnlock()

// This loop blocks on acquiring a read lock on observersMu. If TryRLock
// fails, the loop continues until it succeeds. This is intended to give
// callers a guarantee that this copy operation won't contribute to a deadlock.
com.observersMu.RLock()

observers = make([]*channelObserver[V], len(com.observers))
copy(observers, com.observers)

return observers
}
Loading

0 comments on commit 8791c17

Please sign in to comment.