Skip to content

Commit

Permalink
Merge remote-tracking branch 'pokt/main' into issues/120/fix
Browse files Browse the repository at this point in the history
* pokt/main:
  [Miner] feat: add block client (#65)
  • Loading branch information
bryanchriswhite committed Nov 1, 2023
2 parents 6d89ce9 + e85cc8a commit 4405514
Show file tree
Hide file tree
Showing 11 changed files with 969 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
cosmossdk.io/api v0.3.1
cosmossdk.io/depinject v1.0.0-alpha.3
cosmossdk.io/errors v1.0.0-beta.7
cosmossdk.io/math v1.0.1
github.com/cometbft/cometbft v0.37.2
Expand Down Expand Up @@ -36,7 +37,6 @@ require (
cloud.google.com/go/iam v0.13.0 // indirect
cloud.google.com/go/storage v1.29.0 // indirect
cosmossdk.io/core v0.5.1 // indirect
cosmossdk.io/depinject v1.0.0-alpha.3 // indirect
cosmossdk.io/log v1.1.1-0.20230704160919-88f2c830b0ca // indirect
cosmossdk.io/tools/rosetta v0.2.1 // indirect
filippo.io/edwards25519 v1.0.0 // indirect
Expand Down
27 changes: 27 additions & 0 deletions internal/testclient/testblock/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package testblock

import (
"context"
"testing"

"cosmossdk.io/depinject"
"github.com/stretchr/testify/require"

"pocket/internal/testclient"
"pocket/internal/testclient/testeventsquery"
"pocket/pkg/client"
"pocket/pkg/client/block"
)

func NewLocalnetClient(ctx context.Context, t *testing.T) client.BlockClient {
t.Helper()

queryClient := testeventsquery.NewLocalnetClient(t)
require.NotNil(t, queryClient)

deps := depinject.Supply(queryClient)
bClient, err := block.NewBlockClient(ctx, deps, testclient.CometLocalWebsocketURL)
require.NoError(t, err)

return bClient
}
44 changes: 44 additions & 0 deletions pkg/client/block/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package block

import (
"encoding/json"

"github.com/cometbft/cometbft/types"

"pocket/pkg/client"
)

// cometBlockEvent is used to deserialize incoming committed block event messages
// from the respective events query subscription. It implements the client.Block
// interface by loosely wrapping cometbft's block type, into which messages are
// deserialized.
type cometBlockEvent struct {
Block types.Block `json:"block"`
}

// Height returns the block's height.
func (blockEvent *cometBlockEvent) Height() int64 {
return blockEvent.Block.Height
}

// Hash returns the binary representation of the block's hash as a byte slice.
func (blockEvent *cometBlockEvent) Hash() []byte {
return blockEvent.Block.LastBlockID.Hash.Bytes()
}

// newCometBlockEvent attempts to deserialize the given bytes into a comet block.
// if the resulting block has a height of zero, assume the event was not a block
// event and return an ErrUnmarshalBlockEvent error.
func newCometBlockEvent(blockMsgBz []byte) (client.Block, error) {
blockMsg := new(cometBlockEvent)
if err := json.Unmarshal(blockMsgBz, blockMsg); err != nil {
return nil, err
}

// If msg does not match the expected format then the block's height has a zero value.
if blockMsg.Block.Header.Height == 0 {
return nil, ErrUnmarshalBlockEvent.Wrap(string(blockMsgBz))
}

return blockMsg, nil
}
209 changes: 209 additions & 0 deletions pkg/client/block/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package block

import (
"context"
"fmt"
"time"

"cosmossdk.io/depinject"

"pocket/pkg/client"
"pocket/pkg/either"
"pocket/pkg/observable"
"pocket/pkg/observable/channel"
"pocket/pkg/retry"
)

const (
// eventsBytesRetryDelay is the delay between retry attempts when the events
// bytes observable returns an error.
eventsBytesRetryDelay = time.Second
// eventsBytesRetryLimit is the maximum number of times to attempt to
// re-establish the events query bytes subscription when the events bytes
// observable returns an error.
eventsBytesRetryLimit = 10
eventsBytesRetryResetTimeout = 10 * time.Second
// NB: cometbft event subscription query for newly committed blocks.
// (see: https://docs.cosmos.network/v0.47/core/events#subscribing-to-events)
committedBlocksQuery = "tm.event='NewBlock'"
// latestBlockObsvblsReplayBufferSize is the replay buffer size of the
// latestBlockObsvbls replay observable which is used to cache the latest block observable.
// It is updated with a new "active" observable when a new
// events query subscription is created, for example, after a non-persistent
// connection error.
latestBlockObsvblsReplayBufferSize = 1
// latestBlockReplayBufferSize is the replay buffer size of the latest block
// replay observable which is notified when block commit events are received
// by the events query client subscription created in goPublishBlocks.
latestBlockReplayBufferSize = 1
)

var (
_ client.BlockClient = (*blockClient)(nil)
_ client.Block = (*cometBlockEvent)(nil)
)

// blockClient implements the BlockClient interface.
type blockClient struct {
// endpointURL is the URL of RPC endpoint which eventsClient subscription
// requests will be sent.
endpointURL string
// eventsClient is the events query client which is used to subscribe to
// newly committed block events. It emits an either value which may contain
// an error, at most, once and closes immediately after if it does.
eventsClient client.EventsQueryClient
// latestBlockObsvbls is a replay observable with replay buffer size 1,
// which holds the "active latest block observable" which is notified when
// block commit events are received by the events query client subscription
// created in goPublishBlocks. This observable (and the one it emits) closes
// when the events bytes observable returns an error and is updated with a
// new "active" observable after a new events query subscription is created.
latestBlockObsvbls observable.ReplayObservable[client.BlocksObservable]
// latestBlockObsvblsReplayPublishCh is the publish channel for latestBlockObsvbls.
// It's used to set blockObsvbl initially and subsequently update it, for
// example, when the connection is re-established after erroring.
latestBlockObsvblsReplayPublishCh chan<- client.BlocksObservable
}

// eventsBytesToBlockMapFn is a convenience type to represent the type of a
// function which maps event subscription message bytes into block event objects.
// This is used as a transformFn in a channel.Map() call and is the type returned
// by the newEventsBytesToBlockMapFn factory function.
type eventBytesToBlockMapFn func(either.Either[[]byte]) (client.Block, bool)

// NewBlockClient creates a new block client from the given dependencies and cometWebsocketURL.
func NewBlockClient(
ctx context.Context,
deps depinject.Config,
cometWebsocketURL string,
) (client.BlockClient, error) {
// Initialize block client
bClient := &blockClient{endpointURL: cometWebsocketURL}
bClient.latestBlockObsvbls, bClient.latestBlockObsvblsReplayPublishCh =
channel.NewReplayObservable[client.BlocksObservable](ctx, latestBlockObsvblsReplayBufferSize)

// Inject dependencies
if err := depinject.Inject(deps, &bClient.eventsClient); err != nil {
return nil, err
}

// Concurrently publish blocks to the observable emitted by latestBlockObsvbls.
go bClient.goPublishBlocks(ctx)

return bClient, nil
}

// CommittedBlocksSequence returns a ReplayObservable, with a replay buffer size
// of 1, which is notified when block commit events are received by the events
// query subscription.
func (bClient *blockClient) CommittedBlocksSequence(ctx context.Context) client.BlocksObservable {
// Get the latest block observable from the replay observable. We only ever
// want the last 1 as any prior latest block observable values are closed.
// Directly accessing the zeroth index here is safe because the call to Last
// is guaranteed to return a slice with at least 1 element.
return bClient.latestBlockObsvbls.Last(ctx, 1)[0]
}

// LatestBlock returns the latest committed block that's been received by the
// corresponding events query subscription.
// It blocks until at least one block event has been received.
func (bClient *blockClient) LatestBlock(ctx context.Context) client.Block {
return bClient.CommittedBlocksSequence(ctx).Last(ctx, 1)[0]
}

// Close unsubscribes all observers of the committed blocks sequence observable
// and closes the events query client.
func (bClient *blockClient) Close() {
// Closing eventsClient will cascade unsubscribe and close downstream observers.
bClient.eventsClient.Close()
}

// goPublishBlocks runs the work function returned by retryPublishBlocksFactory,
// re-invoking it according to the arguments to retry.OnError when the events bytes
// observable returns an asynchronous error.
// This function is intended to be called in a goroutine.
func (bClient *blockClient) goPublishBlocks(ctx context.Context) {
// React to errors by getting a new events bytes observable, re-mapping it,
// and send it to latestBlockObsvblsReplayPublishCh such that
// latestBlockObsvbls.Last(ctx, 1) will return it.
publishErr := retry.OnError(
ctx,
eventsBytesRetryLimit,
eventsBytesRetryDelay,
eventsBytesRetryResetTimeout,
"goPublishBlocks",
bClient.retryPublishBlocksFactory(ctx),
)

// If we get here, the retry limit was reached and the retry loop exited.
// Since this function runs in a goroutine, we can't return the error to the
// caller. Instead, we panic.
panic(fmt.Errorf("BlockClient.goPublishBlocks shold never reach this spot: %w", publishErr))
}

// retryPublishBlocksFactory returns a function which is intended to be passed to
// retry.OnError. The returned function pipes event bytes from the events query
// client, maps them to block events, and publishes them to the latestBlockObsvbls
// replay observable.
func (bClient *blockClient) retryPublishBlocksFactory(ctx context.Context) func() chan error {
return func() chan error {
errCh := make(chan error, 1)
eventsBzObsvbl, err := bClient.eventsClient.EventsBytes(ctx, committedBlocksQuery)
if err != nil {
errCh <- err
return errCh
}

// NB: must cast back to generic observable type to use with Map.
// client.BlocksObservable is only used to workaround gomock's lack of
// support for generic types.
eventsBz := observable.Observable[either.Either[[]byte]](eventsBzObsvbl)
blockEventFromEventBz := newEventsBytesToBlockMapFn(errCh)
blocksObsvbl := channel.MapReplay(ctx, latestBlockReplayBufferSize, eventsBz, blockEventFromEventBz)

// Initially set latestBlockObsvbls and update if after retrying on error.
bClient.latestBlockObsvblsReplayPublishCh <- blocksObsvbl

return errCh
}
}

// newEventsBytesToBlockMapFn is a factory for a function which is intended
// to be used as a transformFn in a channel.Map() call. Since the map function
// is called asynchronously, this factory creates a closure around an error channel
// which can be used for asynchronous error signaling from within the map function,
// and handling from the Map call context.
//
// The map function itself attempts to deserialize the given byte slice as a
// committed block event. If the events bytes observable contained an error, this value is not emitted
// (skipped) on the destination observable of the map operation.
// If deserialization failed because the event bytes were for a different event type,
// this value is also skipped.
// If deserialization failed for some other reason, this function panics.
func newEventsBytesToBlockMapFn(errCh chan<- error) eventBytesToBlockMapFn {
return func(eitherEventBz either.Either[[]byte]) (_ client.Block, skip bool) {
eventBz, err := eitherEventBz.ValueOrError()
if err != nil {
errCh <- err
// Don't publish (skip) if eitherEventBz contained an error.
// eitherEventBz should automatically close itself in this case.
// (i.e. no more values should be mapped to this transformFn's respective
// dstObservable).
return nil, true
}

block, err := newCometBlockEvent(eventBz)
if err != nil {
if ErrUnmarshalBlockEvent.Is(err) {
// Don't publish (skip) if the message was not a block event.
return nil, true
}

panic(fmt.Sprintf(
"unexpected error deserializing block event: %s; eventBz: %s",
err, string(eventBz),
))
}
return block, false
}
}
77 changes: 77 additions & 0 deletions pkg/client/block/client_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//go:build integration

package block_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"pocket/internal/testclient/testblock"
"pocket/pkg/client"
)

const blockIntegrationSubTimeout = 5 * time.Second

func TestBlockClient_LatestBlock(t *testing.T) {
ctx := context.Background()

blockClient := testblock.NewLocalnetClient(ctx, t)
require.NotNil(t, blockClient)

block := blockClient.LatestBlock(ctx)
require.NotEmpty(t, block)
}

func TestBlockClient_BlocksObservable(t *testing.T) {
ctx := context.Background()

blockClient := testblock.NewLocalnetClient(ctx, t)
require.NotNil(t, blockClient)

blockSub := blockClient.CommittedBlocksSequence(ctx).Subscribe(ctx)

var (
blockMu sync.Mutex
blockCounter int
blocksToRecv = 2
errCh = make(chan error, 1)
)
go func() {
var previousBlock client.Block
for block := range blockSub.Ch() {
if previousBlock != nil {
if !assert.Equal(t, previousBlock.Height()+1, block.Height()) {
errCh <- fmt.Errorf("expected block height %d, got %d", previousBlock.Height()+1, block.Height())
return
}
}
previousBlock = block

require.NotEmpty(t, block)
blockMu.Lock()
blockCounter++
if blockCounter >= blocksToRecv {
errCh <- nil
return
}
blockMu.Unlock()
}
}()

select {
case err := <-errCh:
require.NoError(t, err)
require.Equal(t, blocksToRecv, blockCounter)
case <-time.After(blockIntegrationSubTimeout):
t.Fatalf(
"timed out waiting for block subscription; expected %d blocks, got %d",
blocksToRecv, blockCounter,
)
}
}
Loading

0 comments on commit 4405514

Please sign in to comment.