Skip to content

Commit

Permalink
Merge remote-tracking branch 'pokt/main' into feat/block-client
Browse files Browse the repository at this point in the history
* pokt/main:
  [Miner] feat: add events query client (#64)
  • Loading branch information
bryanchriswhite committed Oct 27, 2023
2 parents 3d0dd0a + 2c78154 commit b407e16
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 36 deletions.
164 changes: 132 additions & 32 deletions docs/pkg/client/events_query.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,10 @@
- [Basic Example](#basic-example)
- [Advanced Usage](#advanced-usage)
- [Configuration](#configuration)
- [API Reference](#api-reference)
- [Best Practices](#best-practices)
- [FAQ](#faq)
- [Why use `events_query` over directly using Gorilla WebSockets?](#why-use-events_query-over-directly-using-gorilla-websockets)
- [How can I use a different connection mechanism other than WebSockets?](#how-can-i-use-a-different-connection-mechanism-other-than-websockets)
- [Contributing](#contributing)
- [Changelog](#changelog)
- [License](#license)

## Overview

Expand All @@ -30,7 +26,98 @@ The `events_query` package provides a client interface to subscribe to chain eve

## Architecture Diagrams

_TODO(@bryanchriswhite): Add architecture diagrams for the package._
### Components
```mermaid
---
title: Component Diagram Legend
---
flowchart
a[Component A]
b[Component B]
c[Component C]
d[Component D]
a --"A uses B via B#MethodName()"--> b
a =="A returns C from A#MethodName()"==> c
b -."A uses D via network IO".-> d
```
```mermaid
---
title: EventsQueryClient Components
---
flowchart
subgraph comet[Cometbft Node]
subgraph rpc[JSON-RPC]
sub[subscribe endpoint]
end
end
subgraph eqc[EventsQueryClient]
q1_eb[EventsBytesObservable]
q1_conn[Connection]
q1_dial[Dialer]
end
q1_obsvr1[Observer 1]
q1_obsvr2[Observer 2]
q1_obsvr1 --"#Subscribe()"--> q1_eb
q1_obsvr2 --"#Subscribe()"--> q1_eb
q1_dial =="#DialContext()"==> q1_conn
q1_eb --"#Receive()"--> q1_conn
q1_conn -.-> sub
```

### Subscriptions
```mermaid
---
title: Event Subscription Data Flow
---
flowchart
subgraph comet[Cometbft Node]
subgraph rpc[JSON-RPC]
sub[subscribe endpoint]
end
end
subgraph eqc[EventsQueryClient]
subgraph q1[Query 1]
q1_eb[EventsBytesObservable]
q1_conn[Connection]
end
subgraph q2[Query 2]
q2_conn[Connection]
q2_eb[EventsBytesObservable]
end
end
q1_obsvr1[Query 1 Observer 1]
q1_obsvr2[Query 1 Observer 2]
q2_obsvr[Query 2 Observer]
q1_eb -.-> q1_obsvr1
q1_eb -.-> q1_obsvr2
q2_eb -.-> q2_obsvr
q1_conn -.-> q1_eb
q2_conn -.-> q2_eb
sub -.-> q1_conn
sub -.-> q2_conn
```

## Installation

Expand All @@ -50,55 +137,68 @@ go get github.com/pokt-network/poktroll/pkg/client/events_query
### Basic Example

```go
// Creating a new EventsQueryClient with the default websocket dialer:
ctx := context.Background()

// Creating a new EventsQueryClient with the default, websocket dialer:
cometWebsocketURL := "ws://example.com"
evtClient := eventsquery.NewEventsQueryClient(cometWebsocketURL)

// Subscribing to a specific event:
observable, errCh := evtClient.EventsBytes(context.Background(), "your-query-string")
// Subscribing to a specific event, e.g. newly committed blocks:
// (see: https://docs.cosmos.network/v0.47/core/events#subscribing-to-events)
observable := evtClient.EventsBytes(ctx, "tm.event='NewBlock'")

// Subscribe and receive from the observer channel, typically in some other scope.
observer := observable.Subscribe(ctx)

// Observer channel closes when the context is cancelled, observer is
// unsubscribed, or after the subscription returns an error.
for eitherEvent := range observer.Ch() {
// (see either.Either: https://github.com/pokt-network/poktroll/blob/main/pkg/either/either.go#L3)
eventBz, err := eitherEvent.ValueOrError()

// ...
}
```

### Advanced Usage

_TODO(@bryanchriswhite): Add examples of advanced usage_
```go
// Given some custom dialer & connection implementation, e.g.:
var (
tcpDialer eventsquery.Dialer = exampletcp.NewTcpDialerImpl()
grcpDialer eventsquery.Dialer = examplegrpc.NewGrpcDialerImpl()
)

### Configuration
// Both TCP and gRPC use the TCP scheme as gRPC uses TCP for its transport layer.
cometUrl = "tcp://example.com"

- **WithDialer**: Configure the client to use a custom dialer for connections.
// Creating new EventsQueryClients with a custom tcpDialer:
tcpDialerOpt := eventsquery.WithDialer(tcpDialer)
tcpEvtClient := eventsquery.NewEventsQueryClient(cometUrl, tcpDialerOpt)

## API Reference
// Alternatively, with a custom gRPC dialer:
gcpDialerOpt := eventsquery.WithDialer(grcpDialer)
grpcEvtClient := eventsquery.NewEventsQueryClient(cometUrl, grpcDialerOpt)

- `EventsQueryClient`: Main interface to query events. Methods include:
- `EventsBytes(ctx, query)`: Returns an observable for chain events.
- `Close()`: Close any existing connections and unsubscribe all observers.
- `Connection`: Interface representing a bidirectional message-passing connection.
- `Dialer`: Interface encapsulating the creation of connections.
// ... rest follows the same as the basic example.
```

For the complete API details, see the [godoc](https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client/events_query).
### Configuration

- **WithDialer**: Configure the client to use a custom dialer for connections.

## Best Practices

- **Connection Handling**: Ensure to close the `EventsQueryClient` when done to free up resources and avoid potential leaks.
- **Error Handling**: Always check the error channel returned by `EventsBytes` for asynchronous errors during operation.
- **Error Handling**: Always check both the synchronous error returned by `EventsBytes` as well as asynchronous errors send over the observable.

## FAQ

#### Why use `events_query` over directly using Gorilla WebSockets?

`events_query` abstracts many of the underlying details and provides a streamlined interface for subscribing to chain events. It also integrates the observable pattern and provides mockable interfaces for better testing.
`events_query` abstracts many of the underlying details and provides a streamlined interface for subscribing to chain events.
It also integrates the observable pattern and provides mockable interfaces for better testing.

#### How can I use a different connection mechanism other than WebSockets?

You can implement the `Dialer` and `Connection` interfaces and use the `WithDialer` configuration to provide your custom dialer.

## Contributing

If you're interested in improving the `events_query` package or adding new features, please start by discussing your ideas in the project's issues section. Check our main contributing guide for more details.

## Changelog

For detailed release notes, see the [CHANGELOG](../CHANGELOG.md).

## License

This package is released under the XYZ License. For more information, see the [LICENSE](../LICENSE) file at the root level.
10 changes: 6 additions & 4 deletions pkg/client/events_query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ func (eqc *eventsQueryClient) newEventsBytesAndConn(
// Construct an eventsBytes for the given query.
eventsBzObservable, eventsBzPublishCh := channel.NewObservable[either.Either[[]byte]]()

// TODO_INVESTIGATE: does this require retry on error?
// Publish either events bytes or an error received from the connection to
// the eventsBz observable.
// NB: intentionally not retrying on error, leaving that to the caller.
// (see: https://github.com/pokt-network/poktroll/pull/64#discussion_r1373826542)
go eqc.goPublishEventsBz(ctx, conn, eventsBzPublishCh)

return &eventsBytesAndConn{
Expand Down Expand Up @@ -271,9 +274,8 @@ func (eqc *eventsQueryClient) eventSubscriptionRequest(query string) ([]byte, er
// randRequestId returns a random 8 byte, base64 request ID which is intended
// for in JSON-RPC requests to uniquely identify distinct RPC requests.
// These request IDs only need to be unique to the extent that they are useful
// to this client for identifying distinct RPC requests.
// These IDs
// are expected to be unique (per request). Its size and keyspace are arbitrary.
// to this client for identifying distinct RPC requests. Their size and keyspace
// are arbitrary.
func randRequestId() string {
requestIdBz := make([]byte, 8) // 8 bytes = 64 bits = uint64
if _, err := rand.Read(requestIdBz); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/client/events_query/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"pocket/internal/testclient/testeventsquery"
)

// The query use to subscribe for new block events on the websocket endpoint exposed by CometBFT nodes
const committedBlockEventsQuery = "tm.event='NewBlock'"

func TestQueryClient_EventsObservable_Integration(t *testing.T) {
Expand Down

0 comments on commit b407e16

Please sign in to comment.