Skip to content

Commit

Permalink
repro: cometbft http rpc client based query client implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Oct 26, 2023
1 parent 73dedf1 commit 332f527
Showing 2 changed files with 146 additions and 0 deletions.
96 changes: 96 additions & 0 deletions pkg/client/comet_query/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package comet_query

import (
"context"
"encoding/json"
"log"
"pocket/pkg/either"
"sync"

cometclient "github.com/cometbft/cometbft/rpc/client"
comethttp "github.com/cometbft/cometbft/rpc/client/http"
comettypes "github.com/cometbft/cometbft/rpc/core/types"

"pocket/pkg/client"
"pocket/pkg/observable/channel"
)

type cometQueryClient struct {
client cometclient.Client
observablesMu sync.Mutex
observables map[string]client.EventsBytesObservable
}

var _ client.EventsQueryClient = (*cometQueryClient)(nil)

func NewCometQueryClient(remote, wsEndpoint string) (client.EventsQueryClient, error) {
cometHttpClient, err := comethttp.New(remote, wsEndpoint)
if err != nil {
return nil, err
}
if err := cometHttpClient.Start(); err != nil {
return nil, err
}

return &cometQueryClient{
client: cometHttpClient,
observables: make(map[string]client.EventsBytesObservable),
}, nil
}

func (cClient *cometQueryClient) EventsBytes(
ctx context.Context,
query string,
) (client.EventsBytesObservable, error) {
cClient.observablesMu.Lock()
defer cClient.observablesMu.Unlock()

if eventsObservable, ok := cClient.observables[query]; ok {
return eventsObservable, nil
}

cometEventsCh, err := cClient.client.Subscribe(ctx, query, query)
if err != nil {
return nil, err
}

eventsObservable, eventsProducer := channel.NewObservable[either.Either[[]byte]]()
cClient.observables[query] = eventsObservable

go cClient.goProduceEvents(cometEventsCh, eventsProducer)

return eventsObservable, nil
}

func (cClient *cometQueryClient) goProduceEvents(
eventsCh <-chan comettypes.ResultEvent,
eventsProducer chan<- either.Either[[]byte],
) {
for {
select {
case event, ok := <-eventsCh:
if !ok {
return
}

eventJson, err := json.MarshalIndent(event, "", " ")
if err != nil {
eventsProducer <- either.Error[[]byte](err)
}

log.Printf("events channel received, producing: %s", event)
eventsProducer <- either.Success(eventJson)
}
}
}

func (cClient *cometQueryClient) Close() {
cClient.observablesMu.Lock()
defer cClient.observablesMu.Unlock()

for _, obsvbl := range cClient.observables {
obsvbl.UnsubscribeAll()
}

_ = cClient.client.Stop()
}
50 changes: 50 additions & 0 deletions pkg/client/comet_query/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//go:build integration

package comet_query

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCometQueryClient_EventsObservable(t *testing.T) {
var (
newBlockQuery = "tm.event='NewBlock'"
notifyTimeout = 5 * time.Second
observedEventCounter = new(uint64)
ctx = context.Background()
)

cClient, err := NewCometQueryClient("tcp://localhost:36657", "/websocket")
require.NoError(t, err)
require.NotNil(t, cClient)

eventsObservable, err := cClient.EventsBytes(ctx, newBlockQuery)
require.NoError(t, err)

testCtx, testDone := context.WithCancel(context.Background())
eventsObserver := eventsObservable.Subscribe(ctx)
go func() {
for eitherEvent := range eventsObserver.Ch() {
event, err := eitherEvent.ValueOrError()
assert.NoError(t, err)

t.Log("received event:")
t.Logf("%s", event)
atomic.AddUint64(observedEventCounter, 1)
testDone()
}
}()

select {
case <-testCtx.Done():
require.ErrorIs(t, ctx.Err(), context.Canceled)
case <-time.After(notifyTimeout):
t.Fatal("timeout waiting for error channel to receive")
}
}

0 comments on commit 332f527

Please sign in to comment.