Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Miner] feat: add block client #65

Merged
merged 97 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
3f2971b
feat: add the map channel observable operator
bryanchriswhite Oct 20, 2023
2baddb3
Merge remote-tracking branch 'pokt/main' into feat/observable-map
bryanchriswhite Oct 23, 2023
4af6643
feat: add replay observable
bryanchriswhite Oct 23, 2023
765b0c9
Merge branch 'feat/observable-map' into merge/map_x_replay
bryanchriswhite Oct 23, 2023
5f9ce1b
Merge branch 'feat/replay-observable' into merge/map_x_replay
bryanchriswhite Oct 23, 2023
9c42698
chore: add query client interface
bryanchriswhite Oct 14, 2023
6273f52
chore: add query client errors
bryanchriswhite Oct 19, 2023
30a0a28
test: fix false positive, prevent regression, & add comments
bryanchriswhite Oct 23, 2023
6a67cb2
chore: add godoc comment
bryanchriswhite Oct 23, 2023
ad0121d
feat: add query client implementation
bryanchriswhite Oct 14, 2023
ee42737
chore: add connection & dialer wrapper implementations
bryanchriswhite Oct 17, 2023
66fdc79
test: query client & add testquery helper pkg
bryanchriswhite Oct 14, 2023
891faf9
chore: add go_test_integration make target
bryanchriswhite Oct 14, 2023
c12afe6
chore: add internal mocks pkg
bryanchriswhite Oct 23, 2023
de4defe
test: query client integration test
bryanchriswhite Oct 23, 2023
0453b62
docs: add event query client docs
bryanchriswhite Oct 20, 2023
31099ec
chore: update go.mod
bryanchriswhite Oct 14, 2023
a28ad44
chore: re-order `eventsQueryClient` methods to improve readability
bryanchriswhite Oct 24, 2023
bab1465
chore: add godoc comments to testclient helpers
bryanchriswhite Oct 24, 2023
09d16b4
fix: comment formatting
bryanchriswhite Oct 24, 2023
1c2e38e
chore: improve comment & naming in evt query client test
bryanchriswhite Oct 24, 2023
b670aec
test: tune events query client parameters
bryanchriswhite Oct 24, 2023
01278b0
chore: improve godoc comments
bryanchriswhite Oct 24, 2023
f962995
chore: review improvements
bryanchriswhite Oct 24, 2023
72f2916
Merge remote-tracking branch 'pokt/main' into feat/observable-map
bryanchriswhite Oct 24, 2023
163bb45
refactor: `replayObservable` as its own interface type
bryanchriswhite Oct 24, 2023
82e361e
refactor: `replayObservable#Next() V` to `ReplayObservable#Last(ctx,…
bryanchriswhite Oct 24, 2023
299ffb1
chore: add constructor func for `ReplayObservable`
bryanchriswhite Oct 24, 2023
a52603f
test: reorder to improve readibility
bryanchriswhite Oct 24, 2023
65c9e6e
refactor: rename and add godoc comments
bryanchriswhite Oct 24, 2023
de9c0ee
Merge remote-tracking branch 'pokt/main' into feat/replay-observable
bryanchriswhite Oct 24, 2023
cb79b0a
Merge branch 'feat/replay-observable' into merge/map_x_replay
bryanchriswhite Oct 24, 2023
52c26e0
Merge branch 'feat/observable-map' into merge/map_x_replay
bryanchriswhite Oct 24, 2023
dfecf80
Merge branch 'merge/map_x_replay' into feat/query-client
bryanchriswhite Oct 24, 2023
507c79a
chore: improve naming & comments
bryanchriswhite Oct 24, 2023
bebe700
Merge branch 'feat/replay-observable' into merge/map_x_replay
bryanchriswhite Oct 24, 2023
8782c11
Merge branch 'merge/map_x_replay' into feat/query-client
bryanchriswhite Oct 24, 2023
31c0ceb
chore: add warning log and improve comments
bryanchriswhite Oct 25, 2023
f7a8df3
test: improve and add tests
bryanchriswhite Oct 25, 2023
2225e97
fix: interface assertion
bryanchriswhite Oct 25, 2023
84e21b7
fix: comment typo
bryanchriswhite Oct 25, 2023
7df6220
chore: review improvements
bryanchriswhite Oct 25, 2023
6f1cfcb
Merge remote-tracking branch 'pokt/main' into feat/replay-observable
bryanchriswhite Oct 25, 2023
00e0918
fix: race
bryanchriswhite Oct 25, 2023
a963e24
chore: add block client interface
bryanchriswhite Oct 14, 2023
74ebb62
chore: add `MapReplay` operator
bryanchriswhite Oct 24, 2023
44b84ae
feat: add block client
bryanchriswhite Oct 14, 2023
b817851
test: block client integration
bryanchriswhite Oct 14, 2023
9955dbb
test: block client
bryanchriswhite Oct 23, 2023
acc69ef
docs: fix install instructions
bryanchriswhite Oct 25, 2023
9027c1e
fix: race on eventsBytesAndConns map
bryanchriswhite Oct 25, 2023
a19fcc0
fix: interface assertions
bryanchriswhite Oct 25, 2023
92a9db6
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 25, 2023
71344b3
Merge branch 'feat/replay-observable' into merge/map_x_replay
bryanchriswhite Oct 25, 2023
8978fa8
fix: race
bryanchriswhite Oct 25, 2023
9b90930
Merge branch 'merge/map_x_replay' into feat/query-client
bryanchriswhite Oct 25, 2023
a6d77b9
Merge remote-tracking branch 'pokt/feat/query-client' into feat/query…
bryanchriswhite Oct 25, 2023
a123a13
Merge branch 'feat/query-client' into feat/block-client
bryanchriswhite Oct 25, 2023
4419602
Small updates to the README
Olshansk Oct 25, 2023
6d06bf7
refactor: add observableInternals interface
bryanchriswhite Oct 26, 2023
deed860
chore: update last; only block for 1 value min
bryanchriswhite Oct 26, 2023
7f81ce0
chore: review improvements
bryanchriswhite Oct 26, 2023
fc6f161
Merge branch 'main' into feat/replay-observable
bryanchriswhite Oct 26, 2023
729bcd1
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 26, 2023
4b2d922
refactor: move add `channelObservableInternals` & migrate its relevan…
bryanchriswhite Oct 26, 2023
614bc01
refactor: simplify, cleanup, & improve comments
bryanchriswhite Oct 26, 2023
c093488
chore: review improvements
bryanchriswhite Oct 26, 2023
874d424
refactor: eliminate `EventsQueryClient#requestId` field
bryanchriswhite Oct 26, 2023
0737a4a
Merge branch 'feat/query-client' into feat/block-client
bryanchriswhite Oct 26, 2023
777e340
refactor: review improvements
bryanchriswhite Oct 26, 2023
73dedf1
refactor: eliminate `EventsQueryClient#requestId` field
bryanchriswhite Oct 26, 2023
fab303f
refactor: move websocket dialer and connection to own pkg
bryanchriswhite Oct 26, 2023
fe681f9
chore: add comment
bryanchriswhite Oct 26, 2023
b0181dd
fix: notify `retryOnError()` of async error propagating through `#Eve…
bryanchriswhite Oct 26, 2023
ec3d475
chore: review improvements
bryanchriswhite Oct 26, 2023
4830c06
chore: move `EventsBytesObservable type above interfaces
bryanchriswhite Oct 26, 2023
4762fa3
chore: review improvements
bryanchriswhite Oct 26, 2023
27ed494
fix: bug & improve naming & comments
bryanchriswhite Oct 26, 2023
1a4169d
Merge branch 'feat/query-client' into feat/block-client
bryanchriswhite Oct 26, 2023
f08fb04
chore: review improvements
bryanchriswhite Oct 27, 2023
f835b13
Merge remote-tracking branch 'pokt/main' into feat/replay-observable
bryanchriswhite Oct 27, 2023
f8ba998
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 27, 2023
00a025b
fix: bug in `accumulateReplayValues()`
bryanchriswhite Oct 27, 2023
3d0dd0a
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 27, 2023
b407e16
Merge remote-tracking branch 'pokt/main' into feat/block-client
bryanchriswhite Oct 27, 2023
e04177f
Merge remote-tracking branch 'pokt/main' into feat/replay-observable
bryanchriswhite Oct 27, 2023
27b805e
Merge branch 'feat/replay-observable' into feat/block-client
bryanchriswhite Oct 27, 2023
6c3c2b9
refactor: promote `retryOnError` to its own pkg: `retry.OnError`
bryanchriswhite Oct 30, 2023
6b005de
Merge remote-tracking branch 'pokt/main' into feat/block-client
bryanchriswhite Oct 30, 2023
b226a10
chore: improve comments
bryanchriswhite Oct 30, 2023
7ade4a0
test: inline wip test helpers
bryanchriswhite Oct 30, 2023
7cfe788
test: skip retry.OnError tests & comment
bryanchriswhite Oct 30, 2023
486dba0
chore: review feedback improvements
bryanchriswhite Oct 31, 2023
1677e3f
chore: review feedback improvements
bryanchriswhite Oct 31, 2023
cc466b2
Merge branch 'main' into feat/block-client
bryanchriswhite Oct 31, 2023
2f49089
Merge branch 'main' into feat/block-client
bryanchriswhite Nov 1, 2023
7ae2650
fix: format placeholder error
bryanchriswhite Nov 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
cosmossdk.io/api v0.3.1
cosmossdk.io/depinject v1.0.0-alpha.3
cosmossdk.io/errors v1.0.0-beta.7
cosmossdk.io/math v1.0.1
github.com/cometbft/cometbft v0.37.2
Expand Down Expand Up @@ -36,7 +37,6 @@ require (
cloud.google.com/go/iam v0.13.0 // indirect
cloud.google.com/go/storage v1.29.0 // indirect
cosmossdk.io/core v0.5.1 // indirect
cosmossdk.io/depinject v1.0.0-alpha.3 // indirect
cosmossdk.io/log v1.1.1-0.20230704160919-88f2c830b0ca // indirect
cosmossdk.io/tools/rosetta v0.2.1 // indirect
filippo.io/edwards25519 v1.0.0 // indirect
Expand Down
27 changes: 27 additions & 0 deletions internal/testclient/testblock/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package testblock

import (
"context"
"testing"

"cosmossdk.io/depinject"
"github.com/stretchr/testify/require"

"pocket/internal/testclient"
"pocket/internal/testclient/testeventsquery"
"pocket/pkg/client"
"pocket/pkg/client/block"
)

func NewLocalnetClient(ctx context.Context, t *testing.T) client.BlockClient {
t.Helper()

queryClient := testeventsquery.NewLocalnetClient(t)
require.NotNil(t, queryClient)

deps := depinject.Supply(queryClient)
bClient, err := block.NewBlockClient(ctx, deps, testclient.CometLocalWebsocketURL)
require.NoError(t, err)

return bClient
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
}
44 changes: 44 additions & 0 deletions pkg/client/block/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package block

import (
"encoding/json"

"github.com/cometbft/cometbft/types"

"pocket/pkg/client"
)

// cometBlockEvent is used to deserialize incoming committed block event messages
// from the respective events query subscription. It implements the client.Block
// interface by loosely wrapping cometbft's block type, into which messages are
// deserialized.
type cometBlockEvent struct {
Block types.Block `json:"block"`
}

// Height returns the block's height.
func (blockEvent *cometBlockEvent) Height() int64 {
return blockEvent.Block.Height
}

// Hash returns the binary representation of the block's hash as a byte slice.
func (blockEvent *cometBlockEvent) Hash() []byte {
return blockEvent.Block.LastBlockID.Hash.Bytes()
}

// newCometBlockEvent attempts to deserialize the given bytes into a comet block.
// if the resulting block has a height of zero, assume the event was not a block
// event and return an ErrUnmarshalBlockEvent error.
func newCometBlockEvent(blockMsgBz []byte) (client.Block, error) {
blockMsg := new(cometBlockEvent)
if err := json.Unmarshal(blockMsgBz, blockMsg); err != nil {
return nil, err
}

// If msg does not match the expected format then the block's height has a zero value.
if blockMsg.Block.Header.Height == 0 {
return nil, ErrUnmarshalBlockEvent.Wrap(string(blockMsgBz))
}

return blockMsg, nil
}
209 changes: 209 additions & 0 deletions pkg/client/block/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package block

import (
"context"
"fmt"
"time"

"cosmossdk.io/depinject"

"pocket/pkg/client"
"pocket/pkg/either"
"pocket/pkg/observable"
"pocket/pkg/observable/channel"
"pocket/pkg/retry"
)

const (
// eventsBytesRetryDelay is the delay between retry attempts when the events
// bytes observable returns an error.
eventsBytesRetryDelay = time.Second
// eventsBytesRetryLimit is the maximum number of times to attempt to
// re-establish the events query bytes subscription when the events bytes
// observable returns an error.
eventsBytesRetryLimit = 10
eventsBytesRetryResetTimeout = 10 * time.Second
// NB: cometbft event subscription query for newly committed blocks.
// (see: https://docs.cosmos.network/v0.47/core/events#subscribing-to-events)
committedBlocksQuery = "tm.event='NewBlock'"
// latestBlockObsvblsReplayBufferSize is the replay buffer size of the
// latestBlockObsvbls replay observable which is used to cache the latest block observable.
// It is updated with a new "active" observable when a new
// events query subscription is created, for example, after a non-persistent
// connection error.
latestBlockObsvblsReplayBufferSize = 1
// latestBlockReplayBufferSize is the replay buffer size of the latest block
// replay observable which is notified when block commit events are received
// by the events query client subscription created in goPublishBlocks.
latestBlockReplayBufferSize = 1
)

var (
_ client.BlockClient = (*blockClient)(nil)
_ client.Block = (*cometBlockEvent)(nil)
)

// blockClient implements the BlockClient interface.
type blockClient struct {
// endpointURL is the URL of RPC endpoint which eventsClient subscription
// requests will be sent.
endpointURL string
// eventsClient is the events query client which is used to subscribe to
// newly committed block events. It emits an either value which may contain
// an error, at most, once and closes immediately after if it does.
eventsClient client.EventsQueryClient
// latestBlockObsvbls is a replay observable with replay buffer size 1,
// which holds the "active latest block observable" which is notified when
// block commit events are received by the events query client subscription
// created in goPublishBlocks. This observable (and the one it emits) closes
// when the events bytes observable returns an error and is updated with a
// new "active" observable after a new events query subscription is created.
latestBlockObsvbls observable.ReplayObservable[client.BlocksObservable]
// latestBlockObsvblsReplayPublishCh is the publish channel for latestBlockObsvbls.
// It's used to set blockObsvbl initially and subsequently update it, for
// example, when the connection is re-established after erroring.
latestBlockObsvblsReplayPublishCh chan<- client.BlocksObservable
}

// eventsBytesToBlockMapFn is a convenience type to represent the type of a
// function which maps event subscription message bytes into block event objects.
// This is used as a transformFn in a channel.Map() call and is the type returned
// by the newEventsBytesToBlockMapFn factory function.
type eventBytesToBlockMapFn func(either.Either[[]byte]) (client.Block, bool)

// NewBlockClient creates a new block client from the given dependencies and cometWebsocketURL.
func NewBlockClient(
ctx context.Context,
deps depinject.Config,
cometWebsocketURL string,
) (client.BlockClient, error) {
// Initialize block client
bClient := &blockClient{endpointURL: cometWebsocketURL}
bClient.latestBlockObsvbls, bClient.latestBlockObsvblsReplayPublishCh =
channel.NewReplayObservable[client.BlocksObservable](ctx, latestBlockObsvblsReplayBufferSize)

// Inject dependencies
if err := depinject.Inject(deps, &bClient.eventsClient); err != nil {
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

// Concurrently publish blocks to the observable emitted by latestBlockObsvbls.
go bClient.goPublishBlocks(ctx)

return bClient, nil
}

// CommittedBlocksSequence returns a ReplayObservable, with a replay buffer size
// of 1, which is notified when block commit events are received by the events
// query subscription.
func (bClient *blockClient) CommittedBlocksSequence(ctx context.Context) client.BlocksObservable {
// Get the latest block observable from the replay observable. We only ever
// want the last 1 as any prior latest block observable values are closed.
// Directly accessing the zeroth index here is safe because the call to Last
// is guaranteed to return a slice with at least 1 element.
return bClient.latestBlockObsvbls.Last(ctx, 1)[0]
}

// LatestBlock returns the latest committed block that's been received by the
// corresponding events query subscription.
// It blocks until at least one block event has been received.
func (bClient *blockClient) LatestBlock(ctx context.Context) client.Block {
return bClient.CommittedBlocksSequence(ctx).Last(ctx, 1)[0]
}

// Close unsubscribes all observers of the committed blocks sequence observable
// and closes the events query client.
func (bClient *blockClient) Close() {
// Closing eventsClient will cascade unsubscribe and close downstream observers.
bClient.eventsClient.Close()
}

// goPublishBlocks runs the work function returned by retryPublishBlocksFactory,
// re-invoking it according to the arguments to retry.OnError when the events bytes
// observable returns an asynchronous error.
// This function is intended to be called in a goroutine.
func (bClient *blockClient) goPublishBlocks(ctx context.Context) {
// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to latestBlockObsvblsReplayPublishCh such that
// latestBlockObsvbls.Last(ctx, 1) will return it.
publishErr := retry.OnError(
ctx,
eventsBytesRetryLimit,
eventsBytesRetryDelay,
eventsBytesRetryResetTimeout,
"goPublishBlocks",
bClient.retryPublishBlocksFactory(ctx),
)

// If we get here, the retry limit was reached and the retry loop exited.
// Since this function runs in a goroutine, we can't return the error to the
// caller. Instead, we panic.
panic(fmt.Errorf("BlockClient.goPublishBlocks shold never reach this spot: %w", publishErr))
}

// retryPublishBlocksFactory returns a function which is intended to be passed to
// retry.OnError. The returned function pipes event bytes from the events query
// client, maps them to block events, and publishes them to the latestBlockObsvbls
// replay observable.
func (bClient *blockClient) retryPublishBlocksFactory(ctx context.Context) func() chan error {
return func() chan error {
errCh := make(chan error, 1)
eventsBzObsvbl, err := bClient.eventsClient.EventsBytes(ctx, committedBlocksQuery)
if err != nil {
errCh <- err
return errCh
}

// NB: must cast back to generic observable type to use with Map.
// client.BlocksObservable is only used to workaround gomock's lack of
// support for generic types.
eventsBz := observable.Observable[either.Either[[]byte]](eventsBzObsvbl)
blockEventFromEventBz := newEventsBytesToBlockMapFn(errCh)
blocksObsvbl := channel.MapReplay(ctx, latestBlockReplayBufferSize, eventsBz, blockEventFromEventBz)

// Initially set latestBlockObsvbls and update if after retrying on error.
bClient.latestBlockObsvblsReplayPublishCh <- blocksObsvbl

return errCh
}
}

// newEventsBytesToBlockMapFn is a factory for a function which is intended
// to be used as a transformFn in a channel.Map() call. Since the map function
// is called asynchronously, this factory creates a closure around an error channel
// which can be used for asynchronous error signaling from within the map function,
// and handling from the Map call context.
//
// The map function itself attempts to deserialize the given byte slice as a
// committed block event. If the events bytes observable contained an error, this value is not emitted
// (skipped) on the destination observable of the map operation.
// If deserialization failed because the event bytes were for a different event type,
// this value is also skipped.
// If deserialization failed for some other reason, this function panics.
func newEventsBytesToBlockMapFn(errCh chan<- error) eventBytesToBlockMapFn {
return func(eitherEventBz either.Either[[]byte]) (_ client.Block, skip bool) {
eventBz, err := eitherEventBz.ValueOrError()
if err != nil {
errCh <- err
// Don't publish (skip) if eitherEventBz contained an error.
// eitherEventBz should automatically close itself in this case.
// (i.e. no more values should be mapped to this transformFn's respective
// dstObservable).
return nil, true
}

block, err := newCometBlockEvent(eventBz)
if err != nil {
if ErrUnmarshalBlockEvent.Is(err) {
// Don't publish (skip) if the message was not a block event.
return nil, true
}

panic(fmt.Sprintf(
"unexpected error deserializing block event: %s; eventBz: %s",
err, string(eventBz),
))
}
return block, false
}
}
77 changes: 77 additions & 0 deletions pkg/client/block/client_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//go:build integration

package block_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"pocket/internal/testclient/testblock"
"pocket/pkg/client"
)

const blockIntegrationSubTimeout = 5 * time.Second

func TestBlockClient_LatestBlock(t *testing.T) {
ctx := context.Background()

blockClient := testblock.NewLocalnetClient(ctx, t)
require.NotNil(t, blockClient)

block := blockClient.LatestBlock(ctx)
require.NotEmpty(t, block)
}

func TestBlockClient_BlocksObservable(t *testing.T) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
ctx := context.Background()

blockClient := testblock.NewLocalnetClient(ctx, t)
require.NotNil(t, blockClient)

blockSub := blockClient.CommittedBlocksSequence(ctx).Subscribe(ctx)

var (
blockMu sync.Mutex
blockCounter int
blocksToRecv = 2
errCh = make(chan error, 1)
)
go func() {
var previousBlock client.Block
for block := range blockSub.Ch() {
if previousBlock != nil {
if !assert.Equal(t, previousBlock.Height()+1, block.Height()) {
errCh <- fmt.Errorf("expected block height %d, got %d", previousBlock.Height()+1, block.Height())
return
}
}
previousBlock = block

require.NotEmpty(t, block)
blockMu.Lock()
blockCounter++
if blockCounter >= blocksToRecv {
errCh <- nil
return
}
blockMu.Unlock()
}
}()

select {
case err := <-errCh:
require.NoError(t, err)
require.Equal(t, blocksToRecv, blockCounter)
case <-time.After(blockIntegrationSubTimeout):
t.Fatalf(
"timed out waiting for block subscription; expected %d blocks, got %d",
blocksToRecv, blockCounter,
)
}
}
Loading