Skip to content

Commit

Permalink
[Miner] feat: add events query client (#64)
Browse files Browse the repository at this point in the history
* feat: add the map channel observable operator

(cherry picked from commit 22371aa550eb0060b528f4573ba6908bbdfa0c1c)

* feat: add replay observable

(cherry picked from commit ab21790164ab544ae5f1508d3237a3faab33e71e)

* chore: add query client interface

* chore: add query client errors

* test: fix false positive, prevent regression, & add comments

* chore: add godoc comment

* feat: add query client implementation

* chore: add connection & dialer wrapper implementations

* test: query client & add testquery helper pkg

* chore: add go_test_integration make target

* chore: add internal mocks pkg

* test: query client integration test

* docs: add event query client docs

* chore: update go.mod

* chore: re-order `eventsQueryClient` methods to improve readability

* chore: add godoc comments to testclient helpers

* fix: comment formatting

* chore: improve comment & naming in evt query client test

* test: tune events query client parameters

* chore: improve godoc comments

* chore: review improvements

* refactor: `replayObservable` as its own interface type

* refactor: `replayObservable#Next() V`  to `ReplayObservable#Last(ctx, n) []V`

* chore: add constructor func for `ReplayObservable`

* test: reorder to improve readibility

* refactor: rename and add godoc comments

* chore: improve naming & comments

* chore: add warning log and improve comments

* test: improve and add tests

* fix: interface assertion

* fix: comment typo

* chore: review improvements

* fix: race

* fix: race on eventsBytesAndConns map

* fix: interface assertions

Co-authored-by: Redouane Lakrache <[email protected]>

* fix: race

* Small updates to the README

* chore: review improvements

(cherry picked from commit 31555cdc68211964358c43842e0581f565d1afff)

* refactor: eliminate `EventsQueryClient#requestId` field

(cherry picked from commit ccb1d6981f67ab860cb65dde4da15d89bcf57875)

* refactor: eliminate `EventsQueryClient#requestId` field

* refactor: move websocket dialer and connection to own pkg

* chore: add comment

* chore: move `EventsBytesObservable type above interfaces

* chore: review improvements

* fix: bug & improve naming & comments

* chore: review improvements

* chore: review improvements

* chore: add comment

Co-authored-by: Daniel Olshansky <[email protected]>

* revert: replay observable, merged into previous base branch

---------

Co-authored-by: Redouane Lakrache <[email protected]>
Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
3 people authored Oct 27, 2023
1 parent d7d87eb commit f8b61e4
Show file tree
Hide file tree
Showing 16 changed files with 1,165 additions and 3 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ test_e2e: ## Run all E2E tests

.PHONY: go_test
go_test: go_version_check ## Run all go tests
go test -v ./...
go test -v -race -tags test ./...

.PHONY: go_test_integration
go_test_integration: go_version_check ## Run all go tests, including integration
go test -v -race -tags test,integration ./...

.PHONY: itest
itest: go_version_check ## Run tests iteratively (see usage for more)
Expand All @@ -132,6 +136,7 @@ go_mockgen: ## Use `mockgen` to generate mocks used for testing purposes of all
go generate ./x/gateway/types/
go generate ./x/supplier/types/
go generate ./x/session/types/
go generate ./pkg/...

.PHONY: go_develop
go_develop: proto_regen go_mockgen ## Generate protos and mocks
Expand Down
204 changes: 204 additions & 0 deletions docs/pkg/client/events_query.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# Package `pocket/pkg/client/events_query` <!-- omit in toc -->

> An event query package for interfacing with [CometBFT](https://cometbft.com/) and the [Cosmos SDK](https://v1.cosmos.network/sdk), facilitating subscriptions to chain event messages.
- [Overview](#overview)
- [Architecture Diagrams](#architecture-diagrams)
- [Installation](#installation)
- [Features](#features)
- [Usage](#usage)
- [Basic Example](#basic-example)
- [Advanced Usage](#advanced-usage)
- [Configuration](#configuration)
- [Best Practices](#best-practices)
- [FAQ](#faq)
- [Why use `events_query` over directly using Gorilla WebSockets?](#why-use-events_query-over-directly-using-gorilla-websockets)
- [How can I use a different connection mechanism other than WebSockets?](#how-can-i-use-a-different-connection-mechanism-other-than-websockets)

## Overview

The `events_query` package provides a client interface to subscribe to chain event messages. It abstracts the underlying connection mechanisms and offers a clear and easy-to-use way to get events from the chain. Highlights:

- Offers subscription to chain event messages matching a given query.
- Uses the Gorilla WebSockets package for underlying connection operations.
- Provides a modular structure with interfaces allowing for mock implementations and testing.
- Offers considerations for potential improvements and replacements, such as integration with the cometbft RPC client.

## Architecture Diagrams

### Components
```mermaid
---
title: Component Diagram Legend
---
flowchart
a[Component A]
b[Component B]
c[Component C]
d[Component D]
a --"A uses B via B#MethodName()"--> b
a =="A returns C from A#MethodName()"==> c
b -."A uses D via network IO".-> d
```
```mermaid
---
title: EventsQueryClient Components
---
flowchart
subgraph comet[Cometbft Node]
subgraph rpc[JSON-RPC]
sub[subscribe endpoint]
end
end
subgraph eqc[EventsQueryClient]
q1_eb[EventsBytesObservable]
q1_conn[Connection]
q1_dial[Dialer]
end
q1_obsvr1[Observer 1]
q1_obsvr2[Observer 2]
q1_obsvr1 --"#Subscribe()"--> q1_eb
q1_obsvr2 --"#Subscribe()"--> q1_eb
q1_dial =="#DialContext()"==> q1_conn
q1_eb --"#Receive()"--> q1_conn
q1_conn -.-> sub
```

### Subscriptions
```mermaid
---
title: Event Subscription Data Flow
---
flowchart
subgraph comet[Cometbft Node]
subgraph rpc[JSON-RPC]
sub[subscribe endpoint]
end
end
subgraph eqc[EventsQueryClient]
subgraph q1[Query 1]
q1_eb[EventsBytesObservable]
q1_conn[Connection]
end
subgraph q2[Query 2]
q2_conn[Connection]
q2_eb[EventsBytesObservable]
end
end
q1_obsvr1[Query 1 Observer 1]
q1_obsvr2[Query 1 Observer 2]
q2_obsvr[Query 2 Observer]
q1_eb -.-> q1_obsvr1
q1_eb -.-> q1_obsvr2
q2_eb -.-> q2_obsvr
q1_conn -.-> q1_eb
q2_conn -.-> q2_eb
sub -.-> q1_conn
sub -.-> q2_conn
```

## Installation

```bash
go get github.com/pokt-network/poktroll/pkg/client/events_query
```

## Features

- **Websocket Connection**: Uses the [Gorilla WebSockets](https://github.com/gorilla/websocket) for implementing the connection interface.
- **Events Subscription**: Subscribe to chain event messages using a simple query mechanism.
- **Dialer Interface**: Offers a `Dialer` interface for constructing connections, which can be easily mocked for tests.
- **Observable Pattern**: Integrates the observable pattern, making it easier to react to chain events.

## Usage

### Basic Example

```go
ctx := context.Background()

// Creating a new EventsQueryClient with the default, websocket dialer:
cometWebsocketURL := "ws://example.com"
evtClient := eventsquery.NewEventsQueryClient(cometWebsocketURL)

// Subscribing to a specific event, e.g. newly committed blocks:
// (see: https://docs.cosmos.network/v0.47/core/events#subscribing-to-events)
observable := evtClient.EventsBytes(ctx, "tm.event='NewBlock'")

// Subscribe and receive from the observer channel, typically in some other scope.
observer := observable.Subscribe(ctx)

// Observer channel closes when the context is cancelled, observer is
// unsubscribed, or after the subscription returns an error.
for eitherEvent := range observer.Ch() {
// (see either.Either: https://github.com/pokt-network/poktroll/blob/main/pkg/either/either.go#L3)
eventBz, err := eitherEvent.ValueOrError()

// ...
}
```

### Advanced Usage

```go
// Given some custom dialer & connection implementation, e.g.:
var (
tcpDialer eventsquery.Dialer = exampletcp.NewTcpDialerImpl()
grcpDialer eventsquery.Dialer = examplegrpc.NewGrpcDialerImpl()
)

// Both TCP and gRPC use the TCP scheme as gRPC uses TCP for its transport layer.
cometUrl = "tcp://example.com"

// Creating new EventsQueryClients with a custom tcpDialer:
tcpDialerOpt := eventsquery.WithDialer(tcpDialer)
tcpEvtClient := eventsquery.NewEventsQueryClient(cometUrl, tcpDialerOpt)

// Alternatively, with a custom gRPC dialer:
gcpDialerOpt := eventsquery.WithDialer(grcpDialer)
grpcEvtClient := eventsquery.NewEventsQueryClient(cometUrl, grpcDialerOpt)

// ... rest follows the same as the basic example.
```

### Configuration

- **WithDialer**: Configure the client to use a custom dialer for connections.

## Best Practices

- **Connection Handling**: Ensure to close the `EventsQueryClient` when done to free up resources and avoid potential leaks.
- **Error Handling**: Always check both the synchronous error returned by `EventsBytes` as well as asynchronous errors send over the observable.

## FAQ

#### Why use `events_query` over directly using Gorilla WebSockets?

`events_query` abstracts many of the underlying details and provides a streamlined interface for subscribing to chain events.
It also integrates the observable pattern and provides mockable interfaces for better testing.

#### How can I use a different connection mechanism other than WebSockets?

You can implement the `Dialer` and `Connection` interfaces and use the `WithDialer` configuration to provide your custom dialer.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.3
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2
github.com/regen-network/gocuke v0.6.2
github.com/spf13/cast v1.5.1
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
go.uber.org/multierr v1.11.0
golang.org/x/crypto v0.12.0
golang.org/x/sync v0.3.0
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
Expand Down Expand Up @@ -123,7 +125,6 @@ require (
github.com/googleapis/gax-go/v2 v2.8.0 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
github.com/gorilla/rpc v1.2.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/gtank/merlin v0.1.1 // indirect
Expand Down Expand Up @@ -252,7 +253,6 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/fx v1.19.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/mod v0.11.0 // indirect
Expand Down
Empty file added internal/mocks/.gitkeep
Empty file.
3 changes: 3 additions & 0 deletions internal/testclient/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package testclient

const CometLocalWebsocketURL = "ws://localhost:36657/websocket"
17 changes: 17 additions & 0 deletions internal/testclient/testeventsquery/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package testeventsquery

import (
"testing"

"pocket/internal/testclient"
"pocket/pkg/client"
eventsquery "pocket/pkg/client/events_query"
)

// NewLocalnetClient returns a new events query client which is configured to
// connect to the localnet sequencer.
func NewLocalnetClient(t *testing.T, opts ...client.EventsQueryClientOption) client.EventsQueryClient {
t.Helper()

return eventsquery.NewEventsQueryClient(testclient.CometLocalWebsocketURL, opts...)
}
49 changes: 49 additions & 0 deletions internal/testclient/testeventsquery/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package testeventsquery

import (
"pocket/pkg/either"
"testing"

"github.com/golang/mock/gomock"

"pocket/internal/mocks/mockclient"
)

// NewOneTimeMockConnAndDialer returns a new mock connection and mock dialer that
// will return the mock connection when DialContext is called. The mock dialer
// will expect DialContext to be called exactly once. The connection mock will
// expect Close to be called exactly once.
// Callers must mock the Receive method with an EXPECT call before the connection
// mock can be used.
func NewOneTimeMockConnAndDialer(t *testing.T) (
*mockclient.MockConnection,
*mockclient.MockDialer,
) {
ctrl := gomock.NewController(t)
connMock := mockclient.NewMockConnection(ctrl)
connMock.EXPECT().Close().
Return(nil).
Times(1)

dialerMock := NewOneTimeMockDialer(t, either.Success(connMock))

return connMock, dialerMock
}

// NewOneTimeMockDialer returns a mock dialer that will return either the given
// connection mock or error when DialContext is called. The mock dialer will
// expect DialContext to be called exactly once.
func NewOneTimeMockDialer(
t *testing.T,
eitherConnMock either.Either[*mockclient.MockConnection],
) *mockclient.MockDialer {
ctrl := gomock.NewController(t)
dialerMock := mockclient.NewMockDialer(ctrl)

connMock, err := eitherConnMock.ValueOrError()
dialerMock.EXPECT().DialContext(gomock.Any(), gomock.Any()).
Return(connMock, err).
Times(1)

return dialerMock
}
Loading

0 comments on commit f8b61e4

Please sign in to comment.