Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Miner] feat: add block client #65

Merged
merged 97 commits into from
Nov 1, 2023
Merged
Changes from 1 commit
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
3f2971b
feat: add the map channel observable operator
bryanchriswhite Oct 20, 2023
2baddb3
Merge remote-tracking branch 'pokt/main' into feat/observable-map
bryanchriswhite Oct 23, 2023
4af6643
feat: add replay observable
bryanchriswhite Oct 23, 2023
765b0c9
Merge branch 'feat/observable-map' into merge/map_x_replay
bryanchriswhite Oct 23, 2023
5f9ce1b
Merge branch 'feat/replay-observable' into merge/map_x_replay
bryanchriswhite Oct 23, 2023
9c42698
chore: add query client interface
bryanchriswhite Oct 14, 2023
6273f52
chore: add query client errors
bryanchriswhite Oct 19, 2023
30a0a28
test: fix false positive, prevent regression, & add comments
bryanchriswhite Oct 23, 2023
6a67cb2
chore: add godoc comment
bryanchriswhite Oct 23, 2023
ad0121d
feat: add query client implementation
bryanchriswhite Oct 14, 2023
ee42737
chore: add connection & dialer wrapper implementations
bryanchriswhite Oct 17, 2023
66fdc79
test: query client & add testquery helper pkg
bryanchriswhite Oct 14, 2023
891faf9
chore: add go_test_integration make target
bryanchriswhite Oct 14, 2023
c12afe6
chore: add internal mocks pkg
bryanchriswhite Oct 23, 2023
de4defe
test: query client integration test
bryanchriswhite Oct 23, 2023
0453b62
docs: add event query client docs
bryanchriswhite Oct 20, 2023
31099ec
chore: update go.mod
bryanchriswhite Oct 14, 2023
a28ad44
chore: re-order `eventsQueryClient` methods to improve readability
bryanchriswhite Oct 24, 2023
bab1465
chore: add godoc comments to testclient helpers
bryanchriswhite Oct 24, 2023
09d16b4
fix: comment formatting
bryanchriswhite Oct 24, 2023
1c2e38e
chore: improve comment & naming in evt query client test
bryanchriswhite Oct 24, 2023
b670aec
test: tune events query client parameters
bryanchriswhite Oct 24, 2023
01278b0
chore: improve godoc comments
bryanchriswhite Oct 24, 2023
f962995
chore: review improvements
bryanchriswhite Oct 24, 2023
72f2916
Merge remote-tracking branch 'pokt/main' into feat/observable-map
bryanchriswhite Oct 24, 2023
163bb45
refactor: `replayObservable` as its own interface type
bryanchriswhite Oct 24, 2023
82e361e
refactor: `replayObservable#Next() V` to `ReplayObservable#Last(ctx,…
bryanchriswhite Oct 24, 2023
299ffb1
chore: add constructor func for `ReplayObservable`
bryanchriswhite Oct 24, 2023
a52603f
test: reorder to improve readibility
bryanchriswhite Oct 24, 2023
65c9e6e
refactor: rename and add godoc comments
bryanchriswhite Oct 24, 2023
de9c0ee
Merge remote-tracking branch 'pokt/main' into feat/replay-observable
bryanchriswhite Oct 24, 2023
cb79b0a
Merge branch 'feat/replay-observable' into merge/map_x_replay
bryanchriswhite Oct 24, 2023
52c26e0
Merge branch 'feat/observable-map' into merge/map_x_replay
bryanchriswhite Oct 24, 2023
dfecf80
Merge branch 'merge/map_x_replay' into feat/query-client
bryanchriswhite Oct 24, 2023
507c79a
chore: improve naming & comments
bryanchriswhite Oct 24, 2023
bebe700
Merge branch 'feat/replay-observable' into merge/map_x_replay
bryanchriswhite Oct 24, 2023
8782c11
Merge branch 'merge/map_x_replay' into feat/query-client
bryanchriswhite Oct 24, 2023
31c0ceb
chore: add warning log and improve comments
bryanchriswhite Oct 25, 2023
f7a8df3
test: improve and add tests
bryanchriswhite Oct 25, 2023
2225e97
fix: interface assertion
bryanchriswhite Oct 25, 2023
84e21b7
fix: comment typo
bryanchriswhite Oct 25, 2023
7df6220
chore: review improvements
bryanchriswhite Oct 25, 2023
6f1cfcb
Merge remote-tracking branch 'pokt/main' into feat/replay-observable
bryanchriswhite Oct 25, 2023
00e0918
fix: race
bryanchriswhite Oct 25, 2023
a963e24
chore: add block client interface
bryanchriswhite Oct 14, 2023
74ebb62
chore: add `MapReplay` operator
bryanchriswhite Oct 24, 2023
44b84ae
feat: add block client
bryanchriswhite Oct 14, 2023
b817851
test: block client integration
bryanchriswhite Oct 14, 2023
9955dbb
test: block client
bryanchriswhite Oct 23, 2023
acc69ef
docs: fix install instructions
bryanchriswhite Oct 25, 2023
9027c1e
fix: race on eventsBytesAndConns map
bryanchriswhite Oct 25, 2023
a19fcc0
fix: interface assertions
bryanchriswhite Oct 25, 2023
92a9db6
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 25, 2023
71344b3
Merge branch 'feat/replay-observable' into merge/map_x_replay
bryanchriswhite Oct 25, 2023
8978fa8
fix: race
bryanchriswhite Oct 25, 2023
9b90930
Merge branch 'merge/map_x_replay' into feat/query-client
bryanchriswhite Oct 25, 2023
a6d77b9
Merge remote-tracking branch 'pokt/feat/query-client' into feat/query…
bryanchriswhite Oct 25, 2023
a123a13
Merge branch 'feat/query-client' into feat/block-client
bryanchriswhite Oct 25, 2023
4419602
Small updates to the README
Olshansk Oct 25, 2023
6d06bf7
refactor: add observableInternals interface
bryanchriswhite Oct 26, 2023
deed860
chore: update last; only block for 1 value min
bryanchriswhite Oct 26, 2023
7f81ce0
chore: review improvements
bryanchriswhite Oct 26, 2023
fc6f161
Merge branch 'main' into feat/replay-observable
bryanchriswhite Oct 26, 2023
729bcd1
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 26, 2023
4b2d922
refactor: move add `channelObservableInternals` & migrate its relevan…
bryanchriswhite Oct 26, 2023
614bc01
refactor: simplify, cleanup, & improve comments
bryanchriswhite Oct 26, 2023
c093488
chore: review improvements
bryanchriswhite Oct 26, 2023
874d424
refactor: eliminate `EventsQueryClient#requestId` field
bryanchriswhite Oct 26, 2023
0737a4a
Merge branch 'feat/query-client' into feat/block-client
bryanchriswhite Oct 26, 2023
777e340
refactor: review improvements
bryanchriswhite Oct 26, 2023
73dedf1
refactor: eliminate `EventsQueryClient#requestId` field
bryanchriswhite Oct 26, 2023
fab303f
refactor: move websocket dialer and connection to own pkg
bryanchriswhite Oct 26, 2023
fe681f9
chore: add comment
bryanchriswhite Oct 26, 2023
b0181dd
fix: notify `retryOnError()` of async error propagating through `#Eve…
bryanchriswhite Oct 26, 2023
ec3d475
chore: review improvements
bryanchriswhite Oct 26, 2023
4830c06
chore: move `EventsBytesObservable type above interfaces
bryanchriswhite Oct 26, 2023
4762fa3
chore: review improvements
bryanchriswhite Oct 26, 2023
27ed494
fix: bug & improve naming & comments
bryanchriswhite Oct 26, 2023
1a4169d
Merge branch 'feat/query-client' into feat/block-client
bryanchriswhite Oct 26, 2023
f08fb04
chore: review improvements
bryanchriswhite Oct 27, 2023
f835b13
Merge remote-tracking branch 'pokt/main' into feat/replay-observable
bryanchriswhite Oct 27, 2023
f8ba998
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 27, 2023
00a025b
fix: bug in `accumulateReplayValues()`
bryanchriswhite Oct 27, 2023
3d0dd0a
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 27, 2023
b407e16
Merge remote-tracking branch 'pokt/main' into feat/block-client
bryanchriswhite Oct 27, 2023
e04177f
Merge remote-tracking branch 'pokt/main' into feat/replay-observable
bryanchriswhite Oct 27, 2023
27b805e
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 27, 2023
6c3c2b9
refactor: promote `retryOnError` to its own pkg: `retry.OnError`
bryanchriswhite Oct 30, 2023
6b005de
Merge remote-tracking branch 'pokt/main' into feat/block-client
bryanchriswhite Oct 30, 2023
b226a10
chore: improve comments
bryanchriswhite Oct 30, 2023
7ade4a0
test: inline wip test helpers
bryanchriswhite Oct 30, 2023
7cfe788
test: skip retry.OnError tests & comment
bryanchriswhite Oct 30, 2023
486dba0
chore: review feedback improvements
bryanchriswhite Oct 31, 2023
1677e3f
chore: review feedback improvements
bryanchriswhite Oct 31, 2023
cc466b2
Merge branch 'main' into feat/block-client
bryanchriswhite Oct 31, 2023
2f49089
Merge branch 'main' into feat/block-client
bryanchriswhite Nov 1, 2023
7ae2650
fix: format placeholder error
bryanchriswhite Nov 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: review improvements
bryanchriswhite committed Oct 27, 2023
commit f08fb04cb9bda7109eded6d0a1d5ab101f800854
111 changes: 0 additions & 111 deletions pkg/observable/channel/internals.go

This file was deleted.

52 changes: 17 additions & 35 deletions pkg/observable/channel/observable.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ const defaultPublishBufferSize = 50

var (
_ observable.Observable[any] = (*channelObservable[any])(nil)
_ observableInternals[any] = (*channelObservable[any])(nil)
_ observerManager[any] = (*channelObservable[any])(nil)
)

// option is a function which receives and can modify the channelObservable state.
@@ -22,8 +22,11 @@ type option[V any] func(obs *channelObservable[V])
// channelObservable implements the observable.Observable interface and can be notified
// by sending on its corresponding publishCh channel.
type channelObservable[V any] struct {
//observableInternals[V]
channelObservableInternals[V]
// embed observerManager to encapsulate concurrent-safe read/write access to
// observers. This also allows higher-level objects to wrap this observable
// without knowing its specific type by asserting that it implements the
// observerManager interface.
observerManager[V]
// publishCh is an observable-wide channel that is used to receive values
// which are subsequently fanned out to observers.
publishCh chan V
@@ -34,7 +37,7 @@ type channelObservable[V any] struct {
func NewObservable[V any](opts ...option[V]) (observable.Observable[V], chan<- V) {
// initialize an observable that publishes messages from 1 publishCh to N observers
obs := &channelObservable[V]{
channelObservableInternals: newObservableInternals[V](),
observerManager: newObserverManager[V](),
}

for _, opt := range opts {
@@ -63,56 +66,35 @@ func WithPublisher[V any](publishCh chan V) option[V] {

// Subscribe returns an observer which is notified when the publishCh channel
// receives a value.
func (obsvbl *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
func (obs *channelObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] {
// Create a new observer and add it to the list of observers to be notified
// when publishCh receives a new value.
observer := NewObserver[V](ctx, obsvbl.onUnsubscribe)
obsvbl.addObserver(observer)
observer := NewObserver[V](ctx, obs.observerManager.remove)
obs.observerManager.add(observer)

// caller can rely on context cancellation or call UnsubscribeAll() to unsubscribe
// active observers
if ctx != nil {
// asynchronously wait for the context to be done and then unsubscribe
// this observer.
go goUnsubscribeOnDone[V](ctx, observer)
go obs.observerManager.goUnsubscribeOnDone(ctx, observer)
}
return observer
}

// UnsubscribeAll unsubscribes and removes all observers from the observable.
func (obsvbl *channelObservable[V]) UnsubscribeAll() {
obsvbl.unsubscribeAll()
func (obs *channelObservable[V]) UnsubscribeAll() {
obs.observerManager.removeAll()
}

// goPublish to the publishCh and notify observers when values are received.
// This function is blocking and should be run in a goroutine.
func (obsvbl *channelObservable[V]) goPublish() {
for notification := range obsvbl.publishCh {
// Copy currentObservers to avoid holding the lock while notifying them.
// New or existing Observers may (un)subscribe while this notification
// is being fanned out.
// The observers at the time of locking, prior to copying, are the canonical
// set of observers which receive this notification.
currentObservers := obsvbl.copyObservers()
for _, obsvr := range currentObservers {
// TODO_CONSIDERATION: perhaps continue trying to avoid making this
// notification async as it would effectively use goroutines
// in memory as a buffer (unbounded).
obsvr.notify(notification)
}
func (obs *channelObservable[V]) goPublish() {
for notification := range obs.publishCh {
obs.observerManager.notifyAll(notification)
}

// Here we know that the publisher channel has been closed.
// Unsubscribe all observers as they can no longer receive notifications.
obsvbl.unsubscribeAll()
}

// goUnsubscribeOnDone unsubscribes from the subscription when the context is done.
// It is a blocking function and intended to be called in a goroutine.
func goUnsubscribeOnDone[V any](ctx context.Context, observer observable.Observer[V]) {
<-ctx.Done()
if observer.IsClosed() {
return
}
observer.Unsubscribe()
obs.observerManager.removeAll()
}
2 changes: 1 addition & 1 deletion pkg/observable/channel/observable_test.go
Original file line number Diff line number Diff line change
@@ -337,7 +337,7 @@ func TestChannelObservable_SequentialPublishAndUnsubscription(t *testing.T) {

// TODO_TECHDEBT/TODO_INCOMPLETE: add coverage for active observers closing when publishCh closes.
func TestChannelObservable_ObserversCloseOnPublishChannelClose(t *testing.T) {
t.Skip("add coverage: all observers should unsubscribeAll when publishCh closes")
t.Skip("add coverage: all observers should unsubscribe when publishCh closes")
}

func delayedPublishFactory[V any](publishCh chan<- V, delay time.Duration) func(value V) {
5 changes: 3 additions & 2 deletions pkg/observable/channel/observer.go
Original file line number Diff line number Diff line change
@@ -29,8 +29,9 @@ var _ observable.Observer[any] = (*channelObserver[any])(nil)
// channelObserver implements the observable.Observer interface.
type channelObserver[V any] struct {
ctx context.Context
// onUnsubscribe is called in Observer#Unsubscribe, removing the respective
// observer from observers in a concurrency-safe manner.
// onUnsubscribe is called in Observer#Unsubscribe, closing this observer's
// channel and removing it from the respective obervable's observers list
// in a concurrency-safe manner.
onUnsubscribe func(toRemove observable.Observer[V])
// observerMu protects the observerCh and isClosed fields.
observerMu *sync.RWMutex
154 changes: 154 additions & 0 deletions pkg/observable/channel/observer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package channel

import (
"context"
"sync"

"pocket/pkg/observable"
)

var _ observerManager[any] = (*channelObserverManager[any])(nil)

// observerManager is an interface intended to be used between an observable and some
// higher-level abstraction and/or observable implementation which would embed it.
// Embedding this interface rather than a channelObservable directly allows for
// more transparency and flexibility in higher-level code.
// NOTE: this interface MUST be used with a common concrete Observer type.
type observerManager[V any] interface {
notifyAll(notification V)
add(toAdd observable.Observer[V])
remove(toRemove observable.Observer[V])
removeAll()
goUnsubscribeOnDone(ctx context.Context, observer observable.Observer[V])
}

// TODO_CONSIDERATION: if this were a generic implementation, we wouldn't need
// to cast `toAdd` to a channelObserver in add. There are two things
// currently preventing a generic observerManager implementation:
// 1. channelObserver#notify() is not part of the observable.Observer interface
// and is therefore not accessible here. If we move everything into the
// `observable` pkg so that the unexported member is in scope, then the channel
// pkg can't implement it for the same reason, it's an unexported method defined
// in a different pkg.
// 2. == is not defined for a generic Observer type. We would have to add an Equals()
// to the Observer interface.

// channelObserverManager implements the observerManager interface using
// channelObservers.
type channelObserverManager[V any] struct {
// observersMu protects observers from concurrent access/updates
observersMu *sync.RWMutex
// observers is a list of channelObservers that will be notified when new value
// are received.
observers []*channelObserver[V]
}

func newObserverManager[V any]() *channelObserverManager[V] {
return &channelObserverManager[V]{
observersMu: &sync.RWMutex{},
observers: make([]*channelObserver[V], 0),
}
}

func (com *channelObserverManager[V]) notifyAll(notification V) {
// Copy currentObservers to avoid holding the lock while notifying them.
// New or existing Observers may (un)subscribe while this notification
// is being fanned out.
// The observers at the time of locking, prior to copying, are the canonical
// set of observers which receive this notification.
currentObservers := com.copyObservers()
for _, obsvr := range currentObservers {
// TODO_TECHDEBT: since this synchronously notifies all observers in a loop,
// it is possible to block here, part-way through notifying all observers,
// on a slow observer consumer (i.e. full buffer). Instead, we should notify
// observers with some limited concurrency of "worker" goroutines.
// The storj/common repo contains such a `Limiter` implementation, see:
// https://github.com/storj/common/blob/main/sync2/limiter.go.
obsvr.notify(notification)
}
}

// addObserver implements the respective member of observerManager. It is used
// by the channelObservable implementation as well as embedders of observerManager
// (e.g. replayObservable).
// It panics if toAdd is not a channelObserver.
func (com *channelObserverManager[V]) add(
toAdd observable.Observer[V],
) {
// must (write) lock observersMu so that we can safely append to the observers list
com.observersMu.Lock()
defer com.observersMu.Unlock()

com.observers = append(com.observers, toAdd.(*channelObserver[V]))
}

// remove removes a given observer from the observable's list of observers.
// It implements the respective member of observerManager and is used by
// the channelObservable implementation as well as embedders of observerManager
// (e.g. replayObservable).
func (com *channelObserverManager[V]) remove(
toRemove observable.Observer[V],
) {
// must (write) lock to iterate over and modify the observers list
com.observersMu.Lock()
defer com.observersMu.Unlock()

for i, observer := range com.observers {
if observer == toRemove {
com.observers = append((com.observers)[:i], (com.observers)[i+1:]...)
break
}
}
}

// removeAll unsubscribes and removes all observers from the observable.
// It implements the respective member of observerManager and is used by
// the channelObservable implementation as well as embedders of observerManager
// (e.g. replayObservable).
func (com *channelObserverManager[V]) removeAll() {
// Copy currentObservers to avoid holding the lock while unsubscribing them.
// The observers at the time of locking, prior to copying, are the canonical
// set of observers which are unsubscribed.
// New or existing Observers may (un)subscribe while the observable is closing.
// Any such observers won't be isClosed but will also stop receiving notifications
// immediately (if they receive any at all).
currentObservers := com.copyObservers()
for _, observer := range currentObservers {
observer.Unsubscribe()
}

// Reset observers to an empty list. This purges any observers which might have
// subscribed while the observable was closing.
com.observersMu.Lock()
com.observers = []*channelObserver[V]{}
com.observersMu.Unlock()
}

// goUnsubscribeOnDone unsubscribes from the subscription when the context is done.
// It is a blocking function and intended to be called in a goroutine.
func (com *channelObserverManager[V]) goUnsubscribeOnDone(
ctx context.Context,
observer observable.Observer[V],
) {
<-ctx.Done()
if observer.IsClosed() {
return
}
observer.Unsubscribe()
}

// copyObservers returns a copy of the current observers list. It is safe to
// call concurrently. Notably, it is not part of the observerManager interface.
func (com *channelObserverManager[V]) copyObservers() (observers []*channelObserver[V]) {
defer com.observersMu.RUnlock()

// This loop blocks on acquiring a read lock on observersMu. If TryRLock
// fails, the loop continues until it succeeds. This is intended to give
// callers a guarantee that this copy operation won't contribute to a deadlock.
com.observersMu.RLock()

observers = make([]*channelObserver[V], len(com.observers))
copy(observers, com.observers)

return observers
}
Loading