From 332f5273fa84b89250431e1aca72c32c2c03ea16 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 18 Oct 2023 16:20:50 +0200 Subject: [PATCH] repro: cometbft http rpc client based query client implementation --- pkg/client/comet_query/client.go | 96 +++++++++++++++++++++++++++ pkg/client/comet_query/client_test.go | 50 ++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 pkg/client/comet_query/client.go create mode 100644 pkg/client/comet_query/client_test.go diff --git a/pkg/client/comet_query/client.go b/pkg/client/comet_query/client.go new file mode 100644 index 000000000..3f3383786 --- /dev/null +++ b/pkg/client/comet_query/client.go @@ -0,0 +1,96 @@ +package comet_query + +import ( + "context" + "encoding/json" + "log" + "pocket/pkg/either" + "sync" + + cometclient "github.com/cometbft/cometbft/rpc/client" + comethttp "github.com/cometbft/cometbft/rpc/client/http" + comettypes "github.com/cometbft/cometbft/rpc/core/types" + + "pocket/pkg/client" + "pocket/pkg/observable/channel" +) + +type cometQueryClient struct { + client cometclient.Client + observablesMu sync.Mutex + observables map[string]client.EventsBytesObservable +} + +var _ client.EventsQueryClient = (*cometQueryClient)(nil) + +func NewCometQueryClient(remote, wsEndpoint string) (client.EventsQueryClient, error) { + cometHttpClient, err := comethttp.New(remote, wsEndpoint) + if err != nil { + return nil, err + } + if err := cometHttpClient.Start(); err != nil { + return nil, err + } + + return &cometQueryClient{ + client: cometHttpClient, + observables: make(map[string]client.EventsBytesObservable), + }, nil +} + +func (cClient *cometQueryClient) EventsBytes( + ctx context.Context, + query string, +) (client.EventsBytesObservable, error) { + cClient.observablesMu.Lock() + defer cClient.observablesMu.Unlock() + + if eventsObservable, ok := cClient.observables[query]; ok { + return eventsObservable, nil + } + + cometEventsCh, err := cClient.client.Subscribe(ctx, query, query) + if err != nil { + return nil, err + } + + eventsObservable, eventsProducer := channel.NewObservable[either.Either[[]byte]]() + cClient.observables[query] = eventsObservable + + go cClient.goProduceEvents(cometEventsCh, eventsProducer) + + return eventsObservable, nil +} + +func (cClient *cometQueryClient) goProduceEvents( + eventsCh <-chan comettypes.ResultEvent, + eventsProducer chan<- either.Either[[]byte], +) { + for { + select { + case event, ok := <-eventsCh: + if !ok { + return + } + + eventJson, err := json.MarshalIndent(event, "", " ") + if err != nil { + eventsProducer <- either.Error[[]byte](err) + } + + log.Printf("events channel received, producing: %s", event) + eventsProducer <- either.Success(eventJson) + } + } +} + +func (cClient *cometQueryClient) Close() { + cClient.observablesMu.Lock() + defer cClient.observablesMu.Unlock() + + for _, obsvbl := range cClient.observables { + obsvbl.UnsubscribeAll() + } + + _ = cClient.client.Stop() +} diff --git a/pkg/client/comet_query/client_test.go b/pkg/client/comet_query/client_test.go new file mode 100644 index 000000000..323df03aa --- /dev/null +++ b/pkg/client/comet_query/client_test.go @@ -0,0 +1,50 @@ +//go:build integration + +package comet_query + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCometQueryClient_EventsObservable(t *testing.T) { + var ( + newBlockQuery = "tm.event='NewBlock'" + notifyTimeout = 5 * time.Second + observedEventCounter = new(uint64) + ctx = context.Background() + ) + + cClient, err := NewCometQueryClient("tcp://localhost:36657", "/websocket") + require.NoError(t, err) + require.NotNil(t, cClient) + + eventsObservable, err := cClient.EventsBytes(ctx, newBlockQuery) + require.NoError(t, err) + + testCtx, testDone := context.WithCancel(context.Background()) + eventsObserver := eventsObservable.Subscribe(ctx) + go func() { + for eitherEvent := range eventsObserver.Ch() { + event, err := eitherEvent.ValueOrError() + assert.NoError(t, err) + + t.Log("received event:") + t.Logf("%s", event) + atomic.AddUint64(observedEventCounter, 1) + testDone() + } + }() + + select { + case <-testCtx.Done(): + require.ErrorIs(t, ctx.Err(), context.Canceled) + case <-time.After(notifyTimeout): + t.Fatal("timeout waiting for error channel to receive") + } +}