From 6e5cad87803c424570c2a17aaa5a0138bbc1661b Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 6 Nov 2023 21:27:23 +0100 Subject: [PATCH 1/3] fix: flaky block client test (#132) * fix: flakey block cliekt test * chore: simplify & react to review feedback * chore: add godoc comment * chore: simplify --- internal/testclient/testeventsquery/client.go | 43 ++++++ pkg/client/block/client.go | 2 +- pkg/client/block/client_test.go | 129 +++++++++--------- 3 files changed, 105 insertions(+), 69 deletions(-) diff --git a/internal/testclient/testeventsquery/client.go b/internal/testclient/testeventsquery/client.go index d55a765ab..0aa618fe9 100644 --- a/internal/testclient/testeventsquery/client.go +++ b/internal/testclient/testeventsquery/client.go @@ -1,11 +1,18 @@ package testeventsquery import ( + "context" "testing" + "time" + "github.com/golang/mock/gomock" + + "github.com/pokt-network/poktroll/internal/mocks/mockclient" "github.com/pokt-network/poktroll/internal/testclient" "github.com/pokt-network/poktroll/pkg/client" eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable/channel" ) // NewLocalnetClient returns a new events query client which is configured to @@ -15,3 +22,39 @@ func NewLocalnetClient(t *testing.T, opts ...client.EventsQueryClientOption) cli return eventsquery.NewEventsQueryClient(testclient.CometLocalWebsocketURL, opts...) } + +// NewAnyTimesEventsBytesEventsQueryClient returns a new events query client which +// is configured to return the expected event bytes when queried with the expected +// query, any number of times. The returned client also expects to be closed once. +func NewAnyTimesEventsBytesEventsQueryClient( + ctx context.Context, + t *testing.T, + expectedQuery string, + expectedEventBytes []byte, +) client.EventsQueryClient { + t.Helper() + + ctrl := gomock.NewController(t) + eventsQueryClient := mockclient.NewMockEventsQueryClient(ctrl) + eventsQueryClient.EXPECT().Close().Times(1) + eventsQueryClient.EXPECT(). + EventsBytes(gomock.AssignableToTypeOf(ctx), gomock.Eq(expectedQuery)). + DoAndReturn( + func(ctx context.Context, query string) (client.EventsBytesObservable, error) { + bytesObsvbl, bytesPublishCh := channel.NewReplayObservable[either.Bytes](ctx, 1) + + // Now that the observable is set up, publish the expected event bytes. + // Only need to send once because it's a ReplayObservable. + bytesPublishCh <- either.Success(expectedEventBytes) + + // Wait a tick for the observables to be set up. This isn't strictly + // necessary but is done to mitigate test flakiness. + time.Sleep(10 * time.Millisecond) + + return bytesObsvbl, nil + }, + ). + AnyTimes() + + return eventsQueryClient +} diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 54569e60d..18526508d 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -155,7 +155,7 @@ func (bClient *blockClient) retryPublishBlocksFactory(ctx context.Context) func( } // NB: must cast back to generic observable type to use with Map. - // client.BlocksObservable is only used to workaround gomock's lack of + // client.BlocksObservable cannot be an alias due to gomock's lack of // support for generic types. eventsBz := observable.Observable[either.Either[[]byte]](eventsBzObsvbl) blockEventFromEventBz := newEventsBytesToBlockMapFn(errCh) diff --git a/pkg/client/block/client_test.go b/pkg/client/block/client_test.go index b983ff274..b2a5515b3 100644 --- a/pkg/client/block/client_test.go +++ b/pkg/client/block/client_test.go @@ -8,17 +8,20 @@ import ( "cosmossdk.io/depinject" comettypes "github.com/cometbft/cometbft/types" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/internal/testclient" "github.com/pokt-network/poktroll/internal/testclient/testeventsquery" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/block" - eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" ) -const blockAssertionLoopTimeout = 500 * time.Millisecond +const ( + testTimeoutDuration = 100 * time.Millisecond + + // duplicates pkg/client/block/client.go's committedBlocksQuery for testing purposes + committedBlocksQuery = "tm.event='NewBlock'" +) func TestBlockClient(t *testing.T) { var ( @@ -38,19 +41,15 @@ func TestBlockClient(t *testing.T) { ctx = context.Background() ) - // Set up a mock connection and dialer which are expected to be used once. - connMock, dialerMock := testeventsquery.NewOneTimeMockConnAndDialer(t) - connMock.EXPECT().Send(gomock.Any()).Return(nil).Times(1) - // Mock the Receive method to return the expected block event. - connMock.EXPECT().Receive().DoAndReturn(func() ([]byte, error) { - blockEventJson, err := json.Marshal(expectedBlockEvent) - require.NoError(t, err) - return blockEventJson, nil - }).AnyTimes() - - // Set up events query client dependency. - dialerOpt := eventsquery.WithDialer(dialerMock) - eventsQueryClient := testeventsquery.NewLocalnetClient(t, dialerOpt) + expectedEventBz, err := json.Marshal(expectedBlockEvent) + require.NoError(t, err) + + eventsQueryClient := testeventsquery.NewAnyTimesEventsBytesEventsQueryClient( + ctx, t, + committedBlocksQuery, + expectedEventBz, + ) + deps := depinject.Supply(eventsQueryClient) // Set up block client. @@ -58,60 +57,54 @@ func TestBlockClient(t *testing.T) { require.NoError(t, err) require.NotNil(t, blockClient) - // Run LatestBlock and CommittedBlockSequence concurrently because they can - // block, leading to an unresponsive test. This function sends multiple values - // on the actualBlockCh which are all asserted against in blockAssertionLoop. - // If any of the methods under test hang, the test will time out. - var ( - actualBlockCh = make(chan client.Block, 1) - done = make(chan struct{}, 1) - ) - go func() { - // Test LatestBlock method. - actualBlock := blockClient.LatestBlock(ctx) - require.Equal(t, expectedHeight, actualBlock.Height()) - require.Equal(t, expectedHash, actualBlock.Hash()) - - // Test CommittedBlockSequence method. - blockObservable := blockClient.CommittedBlocksSequence(ctx) - require.NotNil(t, blockObservable) - - // Ensure that the observable is replayable via Last. - actualBlockCh <- blockObservable.Last(ctx, 1)[0] - - // Ensure that the observable is replayable via Subscribe. - blockObserver := blockObservable.Subscribe(ctx) - for block := range blockObserver.Ch() { - actualBlockCh <- block - break - } - - // Signal test completion - done <- struct{}{} - }() - - // blockAssertionLoop ensures that the blocks retrieved from both LatestBlock - // method and CommittedBlocksSequence method match the expected block height - // and hash. This loop waits for blocks to be sent on the actualBlockCh channel - // by the methods being tested. Once the methods are done, they send a signal on - // the "done" channel. If the blockAssertionLoop doesn't receive any block or - // the done signal within a specific timeout, it assumes something has gone wrong - // and fails the test. -blockAssertionLoop: - for { - select { - case actualBlock := <-actualBlockCh: - require.Equal(t, expectedHeight, actualBlock.Height()) - require.Equal(t, expectedHash, actualBlock.Hash()) - case <-done: - break blockAssertionLoop - case <-time.After(blockAssertionLoopTimeout): - t.Fatal("timed out waiting for block event") - } + tests := []struct { + name string + fn func() client.Block + }{ + { + name: "LatestBlock successfully returns latest block", + fn: func() client.Block { + lastBlock := blockClient.LatestBlock(ctx) + return lastBlock + }, + }, + { + name: "CommittedBlocksSequence successfully returns latest block", + fn: func() client.Block { + blockObservable := blockClient.CommittedBlocksSequence(ctx) + require.NotNil(t, blockObservable) + + // Ensure that the observable is replayable via Last. + lastBlock := blockObservable.Last(ctx, 1)[0] + require.Equal(t, expectedHeight, lastBlock.Height()) + require.Equal(t, expectedHash, lastBlock.Hash()) + + return lastBlock + }, + }, } - // Wait a tick for the observables to be set up. - time.Sleep(time.Millisecond) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var actualBlockCh = make(chan client.Block, 10) + + // Run test functions asynchronously because they can block, leading + // to an unresponsive test. If any of the methods under test hang, + // the test will time out in the select statement that follows. + go func(fn func() client.Block) { + actualBlockCh <- fn() + close(actualBlockCh) + }(tt.fn) + + select { + case actualBlock := <-actualBlockCh: + require.Equal(t, expectedHeight, actualBlock.Height()) + require.Equal(t, expectedHash, actualBlock.Hash()) + case <-time.After(testTimeoutDuration): + t.Fatal("timed out waiting for block event") + } + }) + } blockClient.Close() } From e64e26e4fd9c030e1b0cfbfc0ab95d3bef7b4b05 Mon Sep 17 00:00:00 2001 From: Dima Kniazev Date: Mon, 6 Nov 2023 15:38:41 -0800 Subject: [PATCH 2/3] [CI] Build container images (#107) * wip - need info from GitHub CI * build image as a part of main ci * troublshoot w/o test * should be a cp here * wip * more label control * install directly from github * use wget * rerun ci * troubleshoot * more information * it was git context * kill previous run if a new commit is pushed * this should work * remove buildlog * resolve conflicts * perform the tests as well * we will be allright withoug keeping the bin dir * bring back ignite version * also build on mai * refine label actions * Update .github/workflows/go.yml Co-authored-by: Daniel Olshansky * add requested changes * pocketd has been replaced with poktrolld * only change the binary name for now, take care of other pocketd instances later * Update .github/label-actions.yml Co-authored-by: Daniel Olshansky * rename pocketd with poktrolld where necessary * typofix * also use poktrolld for e2e tests --------- Co-authored-by: Daniel Olshansky --- .github/label-actions.yml | 35 ++++++++++++ .github/workflows/go.yml | 54 ++++++++++++++++++- .github/workflows/label-actions.yml | 21 ++++++++ .gitignore | 6 ++- Dockerfile.dev | 23 ++++++++ Makefile | 37 ++++++++----- Tiltfile | 6 +-- e2e/tests/node.go | 2 +- .../client/cli/tx_delegate_to_gateway.go | 2 +- .../client/cli/tx_stake_application.go | 2 +- .../client/cli/tx_undelegate_from_gateway.go | 2 +- .../client/cli/tx_unstake_application.go | 2 +- x/gateway/client/cli/tx_stake_gateway.go | 2 +- x/gateway/client/cli/tx_unstake_gateway.go | 2 +- x/supplier/client/cli/tx_stake_supplier.go | 2 +- x/supplier/client/cli/tx_unstake_supplier.go | 2 +- 16 files changed, 171 insertions(+), 29 deletions(-) create mode 100644 .github/label-actions.yml create mode 100644 .github/workflows/label-actions.yml create mode 100644 Dockerfile.dev diff --git a/.github/label-actions.yml b/.github/label-actions.yml new file mode 100644 index 000000000..b4dc1814d --- /dev/null +++ b/.github/label-actions.yml @@ -0,0 +1,35 @@ +# When `devnet-e2e-test` is added, also assign `devnet` to the PR. +devnet-e2e-test: + prs: + comment: The CI will now also run the e2e tests on devnet, which increases the time it takes to complete all CI checks. + label: + - devnet + +# When `devnet-e2e-test` is removed, also delete `devnet` from the PR. +-devnet-e2e-test: + prs: + unlabel: + - devnet + +# When `devnet` is added, also assign `push-image` to the PR. +devnet: + prs: + label: + - push-image + +# When `devnet` is removed, also delete `devnet-e2e-test` from the PR. +-devnet: + prs: + unlabel: + - devnet-e2e-test + +# Let the developer know that they need to push another commit after attaching the label to PR. +push-image: + prs: + comment: The image is going to be pushed after the next commit. If you want to run an e2e test, it is necessary to push another commit. You can use `make trigger_ci` to push an empty commit. + +# When `push-image` is removed, also delete `devnet` from the PR. +-push-image: + prs: + unlabel: + - devnet diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 1b60ddc4c..6f76334c1 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -8,11 +8,16 @@ on: branches: ["main"] pull_request: +concurrency: + group: ${{ github.head_ref || github.ref_name }} + cancel-in-progress: true + jobs: build: runs-on: ubuntu-latest steps: - name: install ignite + # If this step fails due to ignite.com failing, see #116 for a temporary workaround run: | curl https://get.ignite.com/cli! | bash ignite version @@ -39,7 +44,54 @@ jobs: run: make go_lint - name: Build - run: ignite chain build --debug --skip-proto + run: ignite chain build -v --debug --skip-proto - name: Test run: make go_test + + - name: Set up Docker Buildx + if: (github.ref == 'refs/heads/main') || (contains(github.event.pull_request.labels.*.name, 'push-image')) + uses: docker/setup-buildx-action@v3 + + - name: Docker Metadata action + if: (github.ref == 'refs/heads/main') || (contains(github.event.pull_request.labels.*.name, 'push-image')) + id: meta + uses: docker/metadata-action@v5 + env: + DOCKER_METADATA_PR_HEAD_SHA: "true" + with: + images: | + ghcr.io/pokt-network/pocketd + tags: | + type=ref,event=branch + type=ref,event=pr + type=sha + type=sha,format=long + + - name: Login to GitHub Container Registry + if: (github.ref == 'refs/heads/main') || (contains(github.event.pull_request.labels.*.name, 'push-image')) + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Copy binary to inside of the Docker context + if: (github.ref == 'refs/heads/main') || (contains(github.event.pull_request.labels.*.name, 'push-image')) + run: | + mkdir -p ./bin # Make sure the bin directory exists + cp $(go env GOPATH)/bin/poktrolld ./bin # Copy the binary to the repo's bin directory + + - name: Build and push Docker image + if: (github.ref == 'refs/heads/main') || (contains(github.event.pull_request.labels.*.name, 'push-image')) + uses: docker/build-push-action@v5 + with: + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + # NB: Uncomment below if arm64 build is needed; arm64 builds are off by default because build times are significant. + platforms: linux/amd64 #,linux/arm64 + file: Dockerfile.dev + cache-from: type=gha + cache-to: type=gha,mode=max + context: . diff --git a/.github/workflows/label-actions.yml b/.github/workflows/label-actions.yml new file mode 100644 index 000000000..caf1a31cc --- /dev/null +++ b/.github/workflows/label-actions.yml @@ -0,0 +1,21 @@ +name: 'Label Actions' + +on: + issues: + types: [labeled, unlabeled] + pull_request_target: + types: [labeled, unlabeled] + discussion: + types: [labeled, unlabeled] + +permissions: + contents: read + issues: write + pull-requests: write + discussions: write + +jobs: + action: + runs-on: ubuntu-latest + steps: + - uses: dessant/label-actions@v3 diff --git a/.gitignore b/.gitignore index aa7066b08..5dc239e58 100644 --- a/.gitignore +++ b/.gitignore @@ -22,7 +22,6 @@ go.work # Don't commit binaries bin -!bin/.keep # Before we provision the localnet, `ignite` creates the accounts, genesis, etc. for us # As many of the files are dynamic, we only preserve the config files in git history. @@ -57,4 +56,7 @@ ts-client/ **/*_mock.go # Localnet config -localnet_config.yaml \ No newline at end of file +localnet_config.yaml + +# Relase artifacts produced by `ignite chain build --release` +release diff --git a/Dockerfile.dev b/Dockerfile.dev new file mode 100644 index 000000000..2d10955e0 --- /dev/null +++ b/Dockerfile.dev @@ -0,0 +1,23 @@ +# This Dockerfile is used to build container image for development purposes. +# It intentionally contains no security features, ships with code and troubleshooting tools. + +FROM golang:1.20 as base + +RUN apt update && \ + apt-get install -y \ + ca-certificates \ + curl jq make + +# enable faster module downloading. +ENV GOPROXY https://proxy.golang.org + +COPY . /poktroll + +WORKDIR /poktroll + +RUN mv /poktroll/bin/poktrolld /usr/bin/poktrolld + +EXPOSE 8545 +EXPOSE 8546 + +ENTRYPOINT ["ignite"] diff --git a/Makefile b/Makefile index d6d3257dc..19c8fe59e 100644 --- a/Makefile +++ b/Makefile @@ -231,11 +231,11 @@ todo_this_commit: ## List all the TODOs needed to be done in this commit .PHONY: gateway_list gateway_list: ## List all the staked gateways - pocketd --home=$(POCKETD_HOME) q gateway list-gateway --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) q gateway list-gateway --node $(POCKET_NODE) .PHONY: gateway_stake gateway_stake: ## Stake tokens for the gateway specified (must specify the gateway env var) - pocketd --home=$(POCKETD_HOME) tx gateway stake-gateway 1000upokt --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) tx gateway stake-gateway 1000upokt --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE) .PHONY: gateway1_stake gateway1_stake: ## Stake gateway1 @@ -251,7 +251,7 @@ gateway3_stake: ## Stake gateway3 .PHONY: gateway_unstake gateway_unstake: ## Unstake an gateway (must specify the GATEWAY env var) - pocketd --home=$(POCKETD_HOME) tx gateway unstake-gateway --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) tx gateway unstake-gateway --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE) .PHONY: gateway1_unstake gateway1_unstake: ## Unstake gateway1 @@ -271,11 +271,11 @@ gateway3_unstake: ## Unstake gateway3 .PHONY: app_list app_list: ## List all the staked applications - pocketd --home=$(POCKETD_HOME) q application list-application --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) q application list-application --node $(POCKET_NODE) .PHONY: app_stake app_stake: ## Stake tokens for the application specified (must specify the APP and SERVICES env vars) - pocketd --home=$(POCKETD_HOME) tx application stake-application 1000upokt $(SERVICES) --keyring-backend test --from $(APP) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) tx application stake-application 1000upokt $(SERVICES) --keyring-backend test --from $(APP) --node $(POCKET_NODE) .PHONY: app1_stake app1_stake: ## Stake app1 @@ -291,7 +291,7 @@ app3_stake: ## Stake app3 .PHONY: app_unstake app_unstake: ## Unstake an application (must specify the APP env var) - pocketd --home=$(POCKETD_HOME) tx application unstake-application --keyring-backend test --from $(APP) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) tx application unstake-application --keyring-backend test --from $(APP) --node $(POCKET_NODE) .PHONY: app1_unstake app1_unstake: ## Unstake app1 @@ -307,7 +307,7 @@ app3_unstake: ## Unstake app3 .PHONY: app_delegate app_delegate: ## Delegate trust to a gateway (must specify the APP and GATEWAY_ADDR env vars). Requires the app to be staked - pocketd --home=$(POCKETD_HOME) tx application delegate-to-gateway $(GATEWAY_ADDR) --keyring-backend test --from $(APP) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) tx application delegate-to-gateway $(GATEWAY_ADDR) --keyring-backend test --from $(APP) --node $(POCKET_NODE) .PHONY: app1_delegate_gateway1 app1_delegate_gateway1: ## Delegate trust to gateway1 @@ -323,7 +323,7 @@ app3_delegate_gateway3: ## Delegate trust to gateway3 .PHONY: app_undelegate app_undelegate: ## Undelegate trust to a gateway (must specify the APP and GATEWAY_ADDR env vars). Requires the app to be staked - pocketd --home=$(POCKETD_HOME) tx application undelegate-from-gateway $(GATEWAY_ADDR) --keyring-backend test --from $(APP) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) tx application undelegate-from-gateway $(GATEWAY_ADDR) --keyring-backend test --from $(APP) --node $(POCKET_NODE) .PHONY: app1_undelegate_gateway1 app1_undelegate_gateway1: ## Undelegate trust to gateway1 @@ -343,13 +343,13 @@ app3_undelegate_gateway3: ## Undelegate trust to gateway3 .PHONY: supplier_list supplier_list: ## List all the staked supplier - pocketd --home=$(POCKETD_HOME) q supplier list-supplier --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) q supplier list-supplier --node $(POCKET_NODE) # TODO(@Olshansk, @okdas): Add more services (in addition to anvil) for apps and suppliers to stake for. # TODO_TECHDEBT: svc1, svc2 and svc3 below are only in place to make GetSession testable .PHONY: supplier_stake supplier_stake: ## Stake tokens for the supplier specified (must specify the APP env var) - pocketd --home=$(POCKETD_HOME) tx supplier stake-supplier 1000upokt "$(SERVICES)" --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) tx supplier stake-supplier 1000upokt "$(SERVICES)" --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE) .PHONY: supplier1_stake supplier1_stake: ## Stake supplier1 @@ -365,7 +365,7 @@ supplier3_stake: ## Stake supplier3 .PHONY: supplier_unstake supplier_unstake: ## Unstake an supplier (must specify the SUPPLIER env var) - pocketd --home=$(POCKETD_HOME) tx supplier unstake-supplier --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) tx supplier unstake-supplier --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE) .PHONY: supplier1_unstake supplier1_unstake: ## Unstake supplier1 @@ -386,10 +386,10 @@ supplier3_unstake: ## Unstake supplier3 .PHONY: acc_balance_query acc_balance_query: ## Query the balance of the account specified (make acc_balance_query ACC=pokt...) @echo "~~~ Balances ~~~" - pocketd --home=$(POCKETD_HOME) q bank balances $(ACC) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) q bank balances $(ACC) --node $(POCKET_NODE) @echo "~~~ Spendable Balances ~~~" @echo "Querying spendable balance for $(ACC)" - pocketd --home=$(POCKETD_HOME) q bank spendable-balances $(ACC) --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) q bank spendable-balances $(ACC) --node $(POCKET_NODE) .PHONY: acc_balance_query_module_app acc_balance_query_module_app: ## Query the balance of the network level "application" module @@ -405,7 +405,7 @@ acc_balance_query_app1: ## Query the balance of app1 .PHONY: acc_balance_total_supply acc_balance_total_supply: ## Query the total supply of the network - pocketd --home=$(POCKETD_HOME) q bank total --node $(POCKET_NODE) + poktrolld --home=$(POCKETD_HOME) q bank total --node $(POCKET_NODE) ###################### ### Ignite Helpers ### @@ -415,6 +415,15 @@ acc_balance_total_supply: ## Query the total supply of the network ignite_acc_list: ## List all the accounts in LocalNet ignite account list --keyring-dir=$(POCKETD_HOME) --keyring-backend test --address-prefix $(POCKET_ADDR_PREFIX) +################## +### CI Helpers ### +################## + +.PHONY: trigger_ci +trigger_ci: ## Trigger the CI pipeline by submitting an empty commit; See https://github.com/pokt-network/pocket/issues/900 for details + git commit --allow-empty -m "Empty commit" + git push + ##################### ### Documentation ### ##################### diff --git a/Tiltfile b/Tiltfile index 1fd0dd779..17521b14b 100644 --- a/Tiltfile +++ b/Tiltfile @@ -101,12 +101,12 @@ docker_build_with_restart( dockerfile_contents="""FROM golang:1.20.8 RUN apt-get -q update && apt-get install -qyy curl jq RUN go install github.com/go-delve/delve/cmd/dlv@latest -COPY bin/pocketd /usr/local/bin/pocketd +COPY bin/poktrolld /usr/local/bin/pocketd WORKDIR / """, - only=["./bin/pocketd"], + only=["./bin/poktrolld"], entrypoint=["/bin/sh", "/scripts/pocket.sh"], - live_update=[sync("bin/pocketd", "/usr/local/bin/pocketd")], + live_update=[sync("bin/poktrolld", "/usr/local/bin/pocketd")], ) # Run celestia and anvil nodes diff --git a/e2e/tests/node.go b/e2e/tests/node.go index 4e34fa827..e46ad2889 100644 --- a/e2e/tests/node.go +++ b/e2e/tests/node.go @@ -67,7 +67,7 @@ func (p *pocketdBin) RunCommandOnHost(rpcUrl string, args ...string) (*commandRe func (p *pocketdBin) runCmd(args ...string) (*commandResult, error) { base := []string{"--home", defaultHome} args = append(base, args...) - cmd := exec.Command("pocketd", args...) + cmd := exec.Command("poktrolld", args...) r := &commandResult{} out, err := cmd.Output() if err != nil { diff --git a/x/application/client/cli/tx_delegate_to_gateway.go b/x/application/client/cli/tx_delegate_to_gateway.go index 324f88622..ea251e6cd 100644 --- a/x/application/client/cli/tx_delegate_to_gateway.go +++ b/x/application/client/cli/tx_delegate_to_gateway.go @@ -22,7 +22,7 @@ that delegates authority to the gateway specified to sign relays requests for th act on the behalf of the application during a session. Example: -$ pocketd --home=$(POCKETD_HOME) tx application delegate-to-gateway $(GATEWAY_ADDR) --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, +$ poktrolld --home=$(POCKETD_HOME) tx application delegate-to-gateway $(GATEWAY_ADDR) --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) (err error) { gatewayAddress := args[0] diff --git a/x/application/client/cli/tx_stake_application.go b/x/application/client/cli/tx_stake_application.go index 4b077e6c2..510cfd648 100644 --- a/x/application/client/cli/tx_stake_application.go +++ b/x/application/client/cli/tx_stake_application.go @@ -27,7 +27,7 @@ func CmdStakeApplication() *cobra.Command { will stake the tokens and serviceIds and associate them with the application specified by the 'from' address. Example: -$ pocketd --home=$(POCKETD_HOME) tx application stake-application 1000upokt svc1,svc2,svc3 --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, +$ poktrolld --home=$(POCKETD_HOME) tx application stake-application 1000upokt svc1,svc2,svc3 --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, Args: cobra.ExactArgs(2), RunE: func(cmd *cobra.Command, args []string) (err error) { stakeString := args[0] diff --git a/x/application/client/cli/tx_undelegate_from_gateway.go b/x/application/client/cli/tx_undelegate_from_gateway.go index 95a770baa..308a5d8a0 100644 --- a/x/application/client/cli/tx_undelegate_from_gateway.go +++ b/x/application/client/cli/tx_undelegate_from_gateway.go @@ -22,7 +22,7 @@ that removes the authority from the gateway specified to sign relays requests fo act on the behalf of the application during a session. Example: -$ pocketd --home=$(POCKETD_HOME) tx application undelegate-from-gateway $(GATEWAY_ADDR) --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, +$ poktrolld --home=$(POCKETD_HOME) tx application undelegate-from-gateway $(GATEWAY_ADDR) --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) (err error) { gatewayAddress := args[0] diff --git a/x/application/client/cli/tx_unstake_application.go b/x/application/client/cli/tx_unstake_application.go index ebf720a82..bfbf10e32 100644 --- a/x/application/client/cli/tx_unstake_application.go +++ b/x/application/client/cli/tx_unstake_application.go @@ -22,7 +22,7 @@ func CmdUnstakeApplication() *cobra.Command { the application specified by the 'from' address. Example: -$ pocketd --home=$(POCKETD_HOME) tx application unstake-application --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, +$ poktrolld --home=$(POCKETD_HOME) tx application unstake-application --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, Args: cobra.ExactArgs(0), RunE: func(cmd *cobra.Command, args []string) (err error) { diff --git a/x/gateway/client/cli/tx_stake_gateway.go b/x/gateway/client/cli/tx_stake_gateway.go index 2104b2523..2c363b43b 100644 --- a/x/gateway/client/cli/tx_stake_gateway.go +++ b/x/gateway/client/cli/tx_stake_gateway.go @@ -21,7 +21,7 @@ func CmdStakeGateway() *cobra.Command { Long: `Stake a gateway with the provided parameters. This is a broadcast operation that will stake the tokens and associate them with the gateway specified by the 'from' address. Example: -$ pocketd --home=$(POCKETD_HOME) tx gateway stake-gateway 1000upokt --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE)`, +$ poktrolld --home=$(POCKETD_HOME) tx gateway stake-gateway 1000upokt --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE)`, Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) (err error) { clientCtx, err := client.GetClientTxContext(cmd) diff --git a/x/gateway/client/cli/tx_unstake_gateway.go b/x/gateway/client/cli/tx_unstake_gateway.go index b57fd9eb7..e417b7540 100644 --- a/x/gateway/client/cli/tx_unstake_gateway.go +++ b/x/gateway/client/cli/tx_unstake_gateway.go @@ -21,7 +21,7 @@ func CmdUnstakeGateway() *cobra.Command { Long: `Unstake a gateway. This is a broadcast operation that will unstake the gateway specified by the 'from' address. Example: -$ pocketd --home=$(POCKETD_HOME) tx gateway unstake-gateway --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE)`, +$ poktrolld --home=$(POCKETD_HOME) tx gateway unstake-gateway --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE)`, Args: cobra.ExactArgs(0), RunE: func(cmd *cobra.Command, _ []string) (err error) { clientCtx, err := client.GetClientTxContext(cmd) diff --git a/x/supplier/client/cli/tx_stake_supplier.go b/x/supplier/client/cli/tx_stake_supplier.go index f223799cf..eac4b4044 100644 --- a/x/supplier/client/cli/tx_stake_supplier.go +++ b/x/supplier/client/cli/tx_stake_supplier.go @@ -32,7 +32,7 @@ of comma separated values of the form 'service;url' where 'service' is the servi For example, an application that stakes for 'anvil' could be matched with a supplier staking for 'anvil;http://anvil:8547'. Example: -$ pocketd --home=$(POCKETD_HOME) tx supplier stake-supplier 1000upokt anvil;http://anvil:8547 --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, +$ poktrolld --home=$(POCKETD_HOME) tx supplier stake-supplier 1000upokt anvil;http://anvil:8547 --keyring-backend test --from $(APP) --node $(POCKET_NODE)`, Args: cobra.ExactArgs(2), RunE: func(cmd *cobra.Command, args []string) (err error) { stakeString := args[0] diff --git a/x/supplier/client/cli/tx_unstake_supplier.go b/x/supplier/client/cli/tx_unstake_supplier.go index 40ac4a83f..2daf7c00a 100644 --- a/x/supplier/client/cli/tx_unstake_supplier.go +++ b/x/supplier/client/cli/tx_unstake_supplier.go @@ -17,7 +17,7 @@ func CmdUnstakeSupplier() *cobra.Command { Long: `Unstake an supplier with the provided parameters. This is a broadcast operation that will unstake the supplier specified by the 'from' address. Example: -$ pocketd --home=$(POCKETD_HOME) tx supplier unstake-supplier --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE)`, +$ poktrolld --home=$(POCKETD_HOME) tx supplier unstake-supplier --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE)`, Args: cobra.ExactArgs(0), RunE: func(cmd *cobra.Command, args []string) (err error) { From 1974c8a864816eb8461f9982c0b0cd251151683d Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 7 Nov 2023 09:25:20 +0100 Subject: [PATCH 3/3] [Miner] feat: add `TxClient` (#94) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: add `TxClient` interface * chore: add option support to `ReplayObservable` * feat: add `txClient` implementation * test: `txClient` * test: tx client integration * chore: s/tx/transaction/g * chore: update pkg README.md template * wip: client pkg README * docs: fix client pkg godoc comment * fix: flakey test * chore: dial back godoc comments 😅 * chore: revise (and move to godoc.go) `testblock` & `testeventsquery` pkg godoc comment * chore: update go.mod * chore: refactor & condense godoc comments * chore: fix import paths post-update * chore: review feedback improvements * docs: update client README.md * docs: add `tx query` usage association between `txContext` & `Blockchain` * docs: add TOC * chore: review feedback improvements Co-authored-by: Daniel Olshansky * docs: improve godoc comments & client README.md --------- Co-authored-by: Daniel Olshansky --- docs/pkg/client/README.md | 147 +++++ docs/template/pkg/README.md | 21 +- go.mod | 5 +- go.sum | 3 +- internal/testclient/testblock/client.go | 73 +++ internal/testclient/testblock/godoc.go | 4 + internal/testclient/testeventsquery/client.go | 61 +- internal/testclient/testeventsquery/godoc.go | 5 + internal/testclient/testtx/context.go | 113 +--- pkg/client/godoc.go | 12 + pkg/client/interface.go | 56 +- pkg/client/services.go | 19 + pkg/client/tx/client.go | 567 ++++++++++++++++++ pkg/client/tx/client_integration_test.go | 65 ++ pkg/client/tx/client_test.go | 413 +++++++++++++ pkg/client/tx/context.go | 13 +- pkg/client/tx/encoding.go | 18 + pkg/client/tx/errors.go | 53 ++ pkg/client/tx/options.go | 22 + pkg/observable/channel/replay.go | 3 +- 20 files changed, 1517 insertions(+), 156 deletions(-) create mode 100644 docs/pkg/client/README.md create mode 100644 internal/testclient/testblock/godoc.go create mode 100644 internal/testclient/testeventsquery/godoc.go create mode 100644 pkg/client/godoc.go create mode 100644 pkg/client/services.go create mode 100644 pkg/client/tx/client.go create mode 100644 pkg/client/tx/client_integration_test.go create mode 100644 pkg/client/tx/client_test.go create mode 100644 pkg/client/tx/encoding.go create mode 100644 pkg/client/tx/errors.go create mode 100644 pkg/client/tx/options.go diff --git a/docs/pkg/client/README.md b/docs/pkg/client/README.md new file mode 100644 index 000000000..6f4032800 --- /dev/null +++ b/docs/pkg/client/README.md @@ -0,0 +1,147 @@ +# Package `client` + +## Table of Contents + +- [Overview](#overview) +- [Features](#features) +- [Architecture Overview](#architecture-overview) + - [Component Diagram Legend](#component-diagram-legend) + - [Clients Dependency Tree](#clients-dependency-tree) + - [Network Interaction](#network-interaction) +- [Installation](#installation) +- [Usage](#usage) + - [Basic Example](#basic-example) + - [Advanced Usage](#advanced-usage) + - [Configuration](#configuration) +- [API Reference](#api-reference) +- [Best Practices](#best-practices) +- [FAQ](#faq) + + +## Overview + +The `client` package exposes go APIs to facilitate interactions with the Pocket network. +It includes lower-level interfaces for working with transactions and subscribing to events generally, as well as higher-level interfaces for tracking blocks and broadcasting protocol-specific transactions. + +## Features + +| Interface | Description | +|-------------------------|----------------------------------------------------------------------------------------------------| +| **`SupplierClient`** | A high-level client for use by the "supplier" actor. | +| **`TxClient`** | A high-level client used to build, sign, and broadcast transaction from cosmos-sdk messages. | +| **`TxContext`** | Abstracts and encapsulates the transaction building, signing, encoding, and broadcasting concerns. | +| **`BlockClient`** | Exposes methods for receiving notifications about newly committed blocks. | +| **`EventsQueryClient`** | Encapsulates blockchain event subscriptions. | +| **`Connection`** | A transport agnostic communication channel for sending and receiving messages. | +| **`Dialer`** | Abstracts the establishment of connections. | + +## Architecture Overview + +```mermaid +--- +title: Component Diagram Legend +--- +flowchart + +c[Component] +d[Dependency Component] +s[[Subcomponent]] +r[Remote Component] + +c --"direct usage via #DependencyMethod()"--> d +c -."usage via network I/O".-> r +c --> s +``` + +> **Figure 1**: A legend for the component diagrams in this document. + +```mermaid +--- +title: Clients Dependency Tree +--- +flowchart + +sup[SupplierClient] +tx[TxClient] +txctx[[TxContext]] +bl[BlockClient] +evt[EventsQueryClient] +conn[[Connection]] +dial[[Dialer]] + +sup --"#SignAndBroadcast()"--> tx + +tx --"#CommittedBlocksSequence()"--> bl +tx --"#BroadcastTx"--> txctx +tx --"#EventsBytes()"--> evt +bl --"#EventsBytes()"--> evt +evt --> conn +evt --"#DialContext()"--> dial +dial --"(returns)"--> conn +``` + +> **Figure 2**: An overview which articulates the dependency relationships between the various client interfaces and their subcompnents. + +```mermaid +--- +title: Network Interaction +--- +flowchart + +txctx[[TxContext]] +conn[[Connection]] +dial[[Dialer]] + +chain[Blockchain] + +conn <-."subscribed events".-> chain +dial -."RPC subscribe".-> chain +txctx -."tx broadcast".-> chain +txctx -."tx query".-> chain +``` + +> **Figure 3**: An overview of how client subcomponents interact with the network. + +## Installation + +```bash +go get github.com/pokt-network/poktroll/pkg/client +``` + +## Usage + +### Basic Example + +```go +// TODO: Code example showcasing the use of TxClient or any other primary interface. +``` + +### Advanced Usage + +```go +// TODO: Example illustrating advanced features or edge cases of the package. +``` + +### Configuration + +- **TxClientOption**: Function type that modifies the `TxClient` allowing for flexible and optional configurations. +- **EventsQueryClientOption**: Modifies the `EventsQueryClient` to apply custom behaviors or configurations. + +## API Reference + +For the complete API details, see the [godoc](https://pkg.go.dev/github.com/pokt-network/poktroll/pkg/client). + +## Best Practices + +- **Use Abstractions**: Instead of directly communicating with blockchain platforms, leverage the provided interfaces for consistent and error-free interactions. +- **Stay Updated**: With evolving blockchain technologies, ensure to keep the package updated for any new features or security patches. + +## FAQ + +#### How does the `TxClient` interface differ from `TxContext`? + +While `TxClient` is centered around signing and broadcasting transactions, `TxContext` consolidates operational dependencies for the transaction lifecycle, like building, encoding, and querying. + +#### Can I extend or customize the provided interfaces? + +Yes, the package is designed with modularity in mind. You can either implement the interfaces based on your requirements or extend them for additional functionalities. \ No newline at end of file diff --git a/docs/template/pkg/README.md b/docs/template/pkg/README.md index 44f41885a..10fdc2755 100644 --- a/docs/template/pkg/README.md +++ b/docs/template/pkg/README.md @@ -70,12 +70,7 @@ If the package can be configured in some way, describe it here: ## API Reference -While `godoc` will provide the detailed API reference, you can highlight or briefly describe key functions, types, or methods here. - -- `FunctionOrType1()`: A short description of its purpose. -- `FunctionOrType2(param Type)`: Another brief description. - -For the complete API details, see the [godoc](https://pkg.go.dev/github.com/yourusername/yourproject/[PackageName]). +For the complete API details, see the [godoc](https://pkg.go.dev/github.com/pokt-network/poktroll/[PackageName]). ## Best Practices @@ -90,16 +85,4 @@ Answer for question 1. #### Question 2? -Answer for question 2. - -## Contributing - -Briefly describe how others can contribute to this package. Link to the main contributing guide if you have one. - -## Changelog - -For detailed release notes, see the [CHANGELOG](../CHANGELOG.md) at the root level or link to a separate CHANGELOG specific to this package. - -## License - -This package is released under the XYZ License. For more information, see the [LICENSE](../LICENSE) file at the root level. \ No newline at end of file +Answer for question 2. \ No newline at end of file diff --git a/go.mod b/go.mod index 95c298124..2d699d1cf 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( cosmossdk.io/math v1.0.1 github.com/cometbft/cometbft v0.37.2 github.com/cometbft/cometbft-db v0.8.0 - github.com/cosmos/cosmos-proto v1.0.0-beta.2 github.com/cosmos/cosmos-sdk v0.47.3 github.com/cosmos/gogoproto v1.4.10 github.com/cosmos/ibc-go/v7 v7.1.0 @@ -27,7 +26,6 @@ require ( go.uber.org/multierr v1.11.0 golang.org/x/crypto v0.12.0 golang.org/x/sync v0.3.0 - google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 google.golang.org/grpc v1.56.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -71,6 +69,7 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cosmos/btcutil v1.0.5 // indirect + github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect github.com/cosmos/gogogateway v1.2.0 // indirect github.com/cosmos/iavl v0.20.0 // indirect @@ -135,7 +134,6 @@ require ( github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-safetemp v1.0.0 // indirect - github.com/hashicorp/go-uuid v1.0.2 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect @@ -266,6 +264,7 @@ require ( gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/api v0.122.0 // indirect google.golang.org/appengine v1.6.7 // indirect + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 48ddd50c7..ef5829bd5 100644 --- a/go.sum +++ b/go.sum @@ -933,9 +933,8 @@ github.com/hashicorp/go-safetemp v1.0.0/go.mod h1:oaerMy3BhqiTbVye6QuFhFtIceqFoD github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= -github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= diff --git a/internal/testclient/testblock/client.go b/internal/testclient/testblock/client.go index 0918ee64f..ebd2ebcd7 100644 --- a/internal/testclient/testblock/client.go +++ b/internal/testclient/testblock/client.go @@ -5,14 +5,19 @@ import ( "testing" "cosmossdk.io/depinject" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/internal/mocks/mockclient" "github.com/pokt-network/poktroll/internal/testclient" "github.com/pokt-network/poktroll/internal/testclient/testeventsquery" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/observable/channel" ) +// NewLocalnetClient creates and returns a new BlockClient that's configured for +// use with the localnet sequencer. func NewLocalnetClient(ctx context.Context, t *testing.T) client.BlockClient { t.Helper() @@ -25,3 +30,71 @@ func NewLocalnetClient(ctx context.Context, t *testing.T) client.BlockClient { return bClient } + +// NewOneTimeCommittedBlocksSequenceBlockClient creates a new mock BlockClient. +// This mock BlockClient will expect a call to CommittedBlocksSequence, and +// when that call is made, it returns a new BlocksObservable that is notified of +// blocks sent on the given blocksPublishCh. +// blocksPublishCh is the channel the caller can use to publish blocks the observable. +func NewOneTimeCommittedBlocksSequenceBlockClient( + t *testing.T, + blocksPublishCh chan client.Block, +) *mockclient.MockBlockClient { + t.Helper() + + // Create a mock for the block client which expects the LatestBlock method to be called any number of times. + blockClientMock := NewAnyTimeLatestBlockBlockClient(t, nil, 0) + + // Set up the mock expectation for the CommittedBlocksSequence method. When + // the method is called, it returns a new replay observable that publishes + // blocks sent on the given blocksPublishCh. + blockClientMock.EXPECT().CommittedBlocksSequence( + gomock.AssignableToTypeOf(context.Background()), + ).DoAndReturn(func(ctx context.Context) client.BlocksObservable { + // Create a new replay observable with a replay buffer size of 1. Blocks + // are published to this observable via the provided blocksPublishCh. + withPublisherOpt := channel.WithPublisher(blocksPublishCh) + obs, _ := channel.NewReplayObservable[client.Block]( + ctx, 1, withPublisherOpt, + ) + return obs + }) + + return blockClientMock +} + +// NewAnyTimeLatestBlockBlockClient creates a mock BlockClient that expects +// calls to the LatestBlock method any number of times. When the LatestBlock +// method is called, it returns a mock Block with the provided hash and height. +func NewAnyTimeLatestBlockBlockClient( + t *testing.T, + hash []byte, + height int64, +) *mockclient.MockBlockClient { + t.Helper() + ctrl := gomock.NewController(t) + + // Create a mock block that returns the provided hash and height. + blockMock := NewAnyTimesBlock(t, hash, height) + // Create a mock block client that expects calls to LatestBlock method and + // returns the mock block. + blockClientMock := mockclient.NewMockBlockClient(ctrl) + blockClientMock.EXPECT().LatestBlock(gomock.Any()).Return(blockMock).AnyTimes() + + return blockClientMock +} + +// NewAnyTimesBlock creates a mock Block that expects calls to Height and Hash +// methods any number of times. When the methods are called, they return the +// provided height and hash respectively. +func NewAnyTimesBlock(t *testing.T, hash []byte, height int64) *mockclient.MockBlock { + t.Helper() + ctrl := gomock.NewController(t) + + // Create a mock block that returns the provided hash and height AnyTimes. + blockMock := mockclient.NewMockBlock(ctrl) + blockMock.EXPECT().Height().Return(height).AnyTimes() + blockMock.EXPECT().Hash().Return(hash).AnyTimes() + + return blockMock +} diff --git a/internal/testclient/testblock/godoc.go b/internal/testclient/testblock/godoc.go new file mode 100644 index 000000000..866bb4f70 --- /dev/null +++ b/internal/testclient/testblock/godoc.go @@ -0,0 +1,4 @@ +// Package testblock provides helper functions for constructing real (e.g. localnet) +// and mock BlockClient objects with pre-configured and/or parameterized call +// arguments, return value(s), and/or expectations thereof. Intended for use in tests. +package testblock diff --git a/internal/testclient/testeventsquery/client.go b/internal/testclient/testeventsquery/client.go index 0aa618fe9..2c68606ce 100644 --- a/internal/testclient/testeventsquery/client.go +++ b/internal/testclient/testeventsquery/client.go @@ -2,10 +2,13 @@ package testeventsquery import ( "context" + "fmt" "testing" "time" + cosmoskeyring "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/internal/mocks/mockclient" "github.com/pokt-network/poktroll/internal/testclient" @@ -15,14 +18,68 @@ import ( "github.com/pokt-network/poktroll/pkg/observable/channel" ) -// NewLocalnetClient returns a new events query client which is configured to -// connect to the localnet sequencer. +// NewLocalnetClient creates and returns a new events query client that's configured +// for use with the localnet sequencer. Any options provided are applied to the client. func NewLocalnetClient(t *testing.T, opts ...client.EventsQueryClientOption) client.EventsQueryClient { t.Helper() return eventsquery.NewEventsQueryClient(testclient.CometLocalWebsocketURL, opts...) } +// NewOneTimeEventsQuery creates a mock of the EventsQueryClient which expects +// a single call to the EventsBytes method. query is the query string which is +// expected to be received by that call. +// It returns a mock client whose event bytes method constructs a new observable. +// The caller can simulate blockchain events by sending on the value publishCh +// points to, which is set by this helper function. +func NewOneTimeEventsQuery( + ctx context.Context, + t *testing.T, + query string, + publishCh *chan<- either.Bytes, +) *mockclient.MockEventsQueryClient { + t.Helper() + ctrl := gomock.NewController(t) + + eventsQueryClient := mockclient.NewMockEventsQueryClient(ctrl) + eventsQueryClient.EXPECT().EventsBytes(gomock.Eq(ctx), gomock.Eq(query)). + DoAndReturn(func( + ctx context.Context, + query string, + ) (eventsBzObservable client.EventsBytesObservable, err error) { + eventsBzObservable, *publishCh = channel.NewObservable[either.Bytes]() + return eventsBzObservable, nil + }).Times(1) + return eventsQueryClient +} + +// NewOneTimeTxEventsQueryClient creates a mock of the Events that expects +// a single call to the EventsBytes method where the query is for transaction +// events for sender address matching that of the given key. +// The caller can simulate blockchain events by sending on the value publishCh +// points to, which is set by this helper function. +func NewOneTimeTxEventsQueryClient( + ctx context.Context, + t *testing.T, + key *cosmoskeyring.Record, + publishCh *chan<- either.Bytes, +) *mockclient.MockEventsQueryClient { + t.Helper() + + signingAddr, err := key.GetAddress() + require.NoError(t, err) + + expectedEventsQuery := fmt.Sprintf( + "tm.event='Tx' AND message.sender='%s'", + signingAddr, + ) + return NewOneTimeEventsQuery( + ctx, t, + expectedEventsQuery, + publishCh, + ) +} + // NewAnyTimesEventsBytesEventsQueryClient returns a new events query client which // is configured to return the expected event bytes when queried with the expected // query, any number of times. The returned client also expects to be closed once. diff --git a/internal/testclient/testeventsquery/godoc.go b/internal/testclient/testeventsquery/godoc.go new file mode 100644 index 000000000..0caa02997 --- /dev/null +++ b/internal/testclient/testeventsquery/godoc.go @@ -0,0 +1,5 @@ +// Package testeventsquery provides helper functions for constructing real +// (e.g. localnet) and mock EventsQueryClient objects with pre-configured and/or +// parameterized call arguments, return value(s), and/or expectations thereof. +// Intended for use in tests. +package testeventsquery diff --git a/internal/testclient/testtx/context.go b/internal/testclient/testtx/context.go index e7d1f8446..fa25494e7 100644 --- a/internal/testclient/testtx/context.go +++ b/internal/testclient/testtx/context.go @@ -30,25 +30,10 @@ import ( // correlations between these "times" values and the contexts in which the expected // methods may be called. -// NewOneTimeErrTxTimeoutTxContext creates a mock transaction context designed to simulate a specific -// timeout error scenario during transaction broadcasting. -// -// Parameters: -// - t: The testing.T instance for the current test. -// - keyring: The Cosmos SDK keyring containing the signer's cryptographic keys. -// - signingKeyName: The name of the key within the keyring to use for signing. -// - expectedTx: A pointer whose value will be set to the expected transaction -// bytes (in hexadecimal format). -// - expectedErrMsg: A pointer whose value will be set to the expected error -// message string. -// -// The function performs the following actions: -// 1. It retrieves the signer's cryptographic key from the provided keyring using the signingKeyName. -// 2. It computes the corresponding address of the signer's key. -// 3. It then formats an error message indicating that the fee payer's address does not exist. -// 4. It creates a base mock transaction context using NewBaseTxContext. -// 5. It sets up the mock behavior for the BroadcastTxSync method to return a specific preset response. -// 6. It also sets up the mock behavior for the QueryTx method to return a specific error response. +// NewOneTimeErrTxTimeoutTxContext creates a mock transaction context designed to +// simulate a specific timeout error scenario during transaction broadcasting. +// expectedErrMsg is populated with the same error message which is presented in +// the result from the QueryTx method so that it can be asserted against. func NewOneTimeErrTxTimeoutTxContext( t *testing.T, keyring cosmoskeyring.Keyring, @@ -116,24 +101,8 @@ func NewOneTimeErrTxTimeoutTxContext( // NewOneTimeErrCheckTxTxContext creates a mock transaction context to simulate // a specific error scenario during the ABCI check-tx phase (i.e., during initial // validation before the transaction is included in the block). -// -// Parameters: -// - t: The testing.T instance for the current test. -// - keyring: The Cosmos SDK keyring containing the signer's cryptographic keys. -// - signingKeyName: The name of the key within the keyring to be used for signing. -// - expectedTx: A pointer whose value will be set to the expected transaction -// bytes (in hexadecimal format). -// - expectedErrMsg: A pointer whose value will be set to the expected error -// message string. -// -// The function operates as follows: -// 1. Retrieves the signer's cryptographic key from the provided keyring based on -// the signingKeyName. -// 2. Determines the corresponding address of the signer's key. -// 3. Composes an error message suggesting that the fee payer's address is unrecognized. -// 4. Creates a base mock transaction context using the NewBaseTxContext function. -// 5. Sets up the mock behavior for the BroadcastTxSync method to return a specific -// error response related to the check phase of the transaction. +// expectedErrMsg is populated with the same error message which is presented in +// the result from the QueryTx method so that it can be asserted against. func NewOneTimeErrCheckTxTxContext( t *testing.T, keyring cosmoskeyring.Keyring, @@ -179,22 +148,7 @@ func NewOneTimeErrCheckTxTxContext( } // NewOneTimeTxTxContext creates a mock transaction context primed to respond with -// a single successful transaction response. This function facilitates testing by -// ensuring that the BroadcastTxSync method will return a specific, controlled response -// without actually broadcasting the transaction to the network. -// -// Parameters: -// - t: The testing.T instance used for the current test, typically passed from -// the calling test function. -// - keyring: The Cosmos SDK keyring containing the available cryptographic keys. -// - signingKeyName: The name of the key within the keyring used for transaction signing. -// - expectedTx: A pointer whose value will be set to the expected transaction -// bytes (in hexadecimal format). -// -// The function operates as follows: -// 1. Constructs a base mock transaction context using the NewBaseTxContext function. -// 2. Configures the mock behavior for the BroadcastTxSync method to return a pre-defined -// successful transaction response, ensuring that this behavior will only be triggered once. +// a single successful transaction response. func NewOneTimeTxTxContext( t *testing.T, keyring cosmoskeyring.Keyring, @@ -224,30 +178,11 @@ func NewOneTimeTxTxContext( return txCtxMock } -// NewBaseTxContext establishes a foundational mock transaction context with -// predefined behaviors suitable for a broad range of testing scenarios. It ensures -// that when interactions like transaction building, signing, and encoding occur -// in the test environment, they produce predictable and controlled outcomes. -// -// Parameters: -// - t: The testing.T instance used for the current test, typically passed from -// the calling test function. -// - signingKeyName: The name of the key within the keyring to be used for -// transaction signing. -// - keyring: The Cosmos SDK keyring containing the available cryptographic keys. -// - expectedTx: A pointer whose value will be set to the expected transaction -// bytes (in hexadecimal format). -// - expectedErrMsg: A pointer whose value will be set to the expected error -// message string. -// -// The function works as follows: -// 1. Invokes the NewAnyTimesTxTxContext to create a base mock transaction context. -// 2. Sets the expectation that NewTxBuilder method will be called exactly once. -// 3. Configures the mock behavior for the SignTx method to utilize the context's -// signing logic. -// 4. Overrides the EncodeTx method's behavior to intercept the encoding operation, -// capture the encoded transaction bytes, compute the transaction hash, and populate -// the expectedTx and expectedTxHash parameters accordingly. +// NewBaseTxContext creates a mock transaction context that's configured to expect +// calls to NewTxBuilder, SignTx, and EncodeTx methods, any number of times. +// EncodeTx is used to intercept the encoded transaction bytes and store them in +// the expectedTx output parameter. Each of these methods proxies to the corresponding +// method on a real transaction context. func NewBaseTxContext( t *testing.T, signingKeyName string, @@ -281,30 +216,6 @@ func NewBaseTxContext( // NewAnyTimesTxTxContext initializes a mock transaction context that's configured to allow // arbitrary calls to certain predefined interactions, primarily concerning the retrieval // of account numbers and sequences. -// -// Parameters: -// - t: The testing.T instance used for the current test, typically passed from the calling test function. -// - keyring: The Cosmos SDK keyring containing the available cryptographic keys. -// -// The function operates in the following manner: -// 1. Establishes a new gomock controller for setting up mock expectations and behaviors. -// 2. Prepares a set of flags suitable for localnet testing environments. -// 3. Sets up a mock behavior to intercept the GetAccountNumberSequence method calls, -// ensuring that whenever this method is invoked, it consistently returns an account number -// and sequence of 1, without making real queries to the underlying infrastructure. -// 4. Constructs a client context tailored for localnet testing with the provided keyring -// and the mocked account retriever. -// 5. Initializes a transaction factory from the client context and validates its integrity. -// 6. Injects the transaction factory and client context dependencies to create a new transaction context. -// 7. Creates a mock transaction context that always returns the provided keyring when the GetKeyring method is called. -// -// This setup aids tests by facilitating the creation of mock transaction contexts that have predictable -// and controlled outcomes for account number and sequence retrieval operations. -// -// Returns: -// - A mock transaction context suitable for setting additional expectations in tests. -// - A real transaction context initialized with the supplied dependencies. - func NewAnyTimesTxTxContext( t *testing.T, keyring cosmoskeyring.Keyring, diff --git a/pkg/client/godoc.go b/pkg/client/godoc.go new file mode 100644 index 000000000..66da550dd --- /dev/null +++ b/pkg/client/godoc.go @@ -0,0 +1,12 @@ +// Package client defines interfaces and types that facilitate interactions +// with blockchain functionalities, both transactional and observational. It is +// built to provide an abstraction layer for sending, receiving, and querying +// blockchain data, thereby offering a standardized way of integrating with +// various blockchain platforms. +// +// The client package leverages external libraries like cosmos-sdk and cometbft, +// but there is a preference to minimize direct dependencies on these external +// libraries, when defining interfaces, aiming for a cleaner decoupling. +// It seeks to provide a flexible and comprehensive interface layer, adaptable to +// different blockchain configurations and requirements. +package client diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 2d67c90c0..32dab250c 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -1,4 +1,5 @@ //go:generate mockgen -destination=../../internal/mocks/mockclient/events_query_client_mock.go -package=mockclient . Dialer,Connection,EventsQueryClient +//go:generate mockgen -destination=../../internal/mocks/mockclient/block_client_mock.go -package=mockclient . Block,BlockClient //go:generate mockgen -destination=../../internal/mocks/mockclient/tx_client_mock.go -package=mockclient . TxContext //go:generate mockgen -destination=../../internal/mocks/mockclient/cosmos_tx_builder_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/client TxBuilder //go:generate mockgen -destination=../../internal/mocks/mockclient/cosmos_keyring_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/crypto/keyring Keyring @@ -18,9 +19,18 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" ) +// TxClient provides a synchronous interface initiating and waiting for transactions +// derived from cosmos-sdk messages, in a cosmos-sdk based blockchain network. +type TxClient interface { + SignAndBroadcast( + ctx context.Context, + msgs ...cosmostypes.Msg, + ) either.AsyncError +} + // TxContext provides an interface which consolidates the operational dependencies -// required to facilitate the sender side of the tx lifecycle: build, sign, encode, -// broadcast, query (optional). +// required to facilitate the sender side of the transaction lifecycle: build, sign, +// encode, broadcast, and query (optional). // // TODO_IMPROVE: Avoid depending on cosmos-sdk structs or interfaces; add Pocket // interface types to substitute: @@ -29,13 +39,13 @@ import ( // - Keyring // - TxBuilder type TxContext interface { - // GetKeyring returns the associated key management mechanism for the tx context. + // GetKeyring returns the associated key management mechanism for the transaction context. GetKeyring() cosmoskeyring.Keyring - // NewTxBuilder creates and returns a new tx builder instance. + // NewTxBuilder creates and returns a new transaction builder instance. NewTxBuilder() cosmosclient.TxBuilder - // SignTx signs a tx using the specified key name. It can operate in offline mode, + // SignTx signs a transaction using the specified key name. It can operate in offline mode, // and can overwrite any existing signatures based on the provided flags. SignTx( keyName string, @@ -43,14 +53,14 @@ type TxContext interface { offline, overwriteSig bool, ) error - // EncodeTx takes a tx builder and encodes it, returning its byte representation. + // EncodeTx takes a transaction builder and encodes it, returning its byte representation. EncodeTx(txBuilder cosmosclient.TxBuilder) ([]byte, error) - // BroadcastTx broadcasts the given tx to the network. + // BroadcastTx broadcasts the given transaction to the network. BroadcastTx(txBytes []byte) (*cosmostypes.TxResponse, error) - // QueryTx retrieves a tx status based on its hash and optionally provides - // proof of the tx. + // QueryTx retrieves a transaction status based on its hash and optionally provides + // proof of the transaction. QueryTx( ctx context.Context, txHash []byte, @@ -60,6 +70,7 @@ type TxContext interface { // BlocksObservable is an observable which is notified with an either // value which contains either an error or the event message bytes. +// // TODO_HACK: The purpose of this type is to work around gomock's lack of // support for generic types. For the same reason, this type cannot be an // alias (i.e. EventsBytesObservable = observable.Observable[either.Either[[]byte]]). @@ -84,23 +95,24 @@ type Block interface { Hash() []byte } -// TODO_CONSIDERATION: the cosmos-sdk CLI code seems to use a cometbft RPC client -// which includes a `#Subscribe()` method for a similar purpose. Perhaps we could -// replace this custom websocket client with that. -// (see: https://github.com/cometbft/cometbft/blob/main/rpc/client/http/http.go#L110) -// (see: https://github.com/cosmos/cosmos-sdk/blob/main/client/rpc/tx.go#L114) -// -// NOTE: a branch which attempts this is available at: -// https://github.com/pokt-network/poktroll/pull/74 - // EventsBytesObservable is an observable which is notified with an either // value which contains either an error or the event message bytes. +// // TODO_HACK: The purpose of this type is to work around gomock's lack of // support for generic types. For the same reason, this type cannot be an // alias (i.e. EventsBytesObservable = observable.Observable[either.Bytes]). type EventsBytesObservable observable.Observable[either.Bytes] // EventsQueryClient is used to subscribe to chain event messages matching the given query, +// +// TODO_CONSIDERATION: the cosmos-sdk CLI code seems to use a cometbft RPC client +// which includes a `#Subscribe()` method for a similar purpose. Perhaps we could +// replace our custom implementation with one which wraps that. +// (see: https://github.com/cometbft/cometbft/blob/main/rpc/client/http/http.go#L110) +// (see: https://github.com/cosmos/cosmos-sdk/blob/main/client/rpc/tx.go#L114) +// +// NOTE: a branch which attempts this is available at: +// https://github.com/pokt-network/poktroll/pull/74 type EventsQueryClient interface { // EventsBytes returns an observable which is notified about chain event messages // matching the given query. It receives an either value which contains either an @@ -131,8 +143,8 @@ type Dialer interface { DialContext(ctx context.Context, urlStr string) (Connection, error) } -// EventsQueryClientOption is an interface-wide type which can be implemented to use or modify the -// query client during construction. This would likely be done in an -// implementation-specific way; e.g. using a type assertion to assign to an -// implementation struct field(s). +// EventsQueryClientOption defines a function type that modifies the EventsQueryClient. type EventsQueryClientOption func(EventsQueryClient) + +// TxClientOption defines a function type that modifies the TxClient. +type TxClientOption func(TxClient) diff --git a/pkg/client/services.go b/pkg/client/services.go new file mode 100644 index 000000000..0d2ca060d --- /dev/null +++ b/pkg/client/services.go @@ -0,0 +1,19 @@ +package client + +import ( + "fmt" + + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +// NewTestApplicationServiceConfig returns a slice of application service configs for testing. +func NewTestApplicationServiceConfig(prefix string, count int) []*sharedtypes.ApplicationServiceConfig { + appSvcCfg := make([]*sharedtypes.ApplicationServiceConfig, count) + for i, _ := range appSvcCfg { + serviceId := fmt.Sprintf("%s%d", prefix, i) + appSvcCfg[i] = &sharedtypes.ApplicationServiceConfig{ + ServiceId: &sharedtypes.ServiceId{Id: serviceId}, + } + } + return appSvcCfg +} diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go new file mode 100644 index 000000000..805443eb4 --- /dev/null +++ b/pkg/client/tx/client.go @@ -0,0 +1,567 @@ +package tx + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "sync" + + "cosmossdk.io/depinject" + abciTypes "github.com/cometbft/cometbft/abci/types" + comettypes "github.com/cometbft/cometbft/types" + cosmostypes "github.com/cosmos/cosmos-sdk/types" + "go.uber.org/multierr" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/either" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" +) + +const ( + // DefaultCommitTimeoutHeightOffset is the default number of blocks after the + // latest block (when broadcasting) that a transactions should be considered + // errored if it has not been committed. + DefaultCommitTimeoutHeightOffset = 5 + // txWithSenderAddrQueryFmt is the query used to subscribe to cometbft transactions + // events where the sender address matches the interpolated address. + // (see: https://docs.cosmos.network/v0.47/core/events#subscribing-to-events) + txWithSenderAddrQueryFmt = "tm.event='Tx' AND message.sender='%s'" +) + +var _ client.TxClient = (*txClient)(nil) + +// txClient orchestrates building, signing, broadcasting, and querying of +// transactions. It maintains a single events query subscription to its own +// transactions (via the EventsQueryClient) in order to receive notifications +// regarding their status. +// It also depends on the BlockClient as a timer, synchronized to block height, +// to facilitate transaction timeout logic. If a transaction doesn't appear to +// have been committed by commitTimeoutHeightOffset number of blocks have elapsed, +// it is considered as timed out. Upon timeout, the client queries the network for +// the last status of the transaction, which is used to derive the asynchronous +// error that's populated in the either.AsyncError. +type txClient struct { + // TODO_TECHDEBT: this should be configurable & integrated w/ viper, flags, etc. + // commitTimeoutHeightOffset is the number of blocks after the latest block + // that a transactions should be considered errored if it has not been committed. + commitTimeoutHeightOffset int64 + // signingKeyName is the name of the key in the keyring to use for signing + // transactions. + signingKeyName string + // signingAddr is the address of the signing key referenced by signingKeyName. + // It is hydrated from the keyring by calling Keyring#Key() with signingKeyName. + signingAddr cosmostypes.AccAddress + // txCtx is the transactions context which encapsulates transactions building, signing, + // broadcasting, and querying, as well as keyring access. + txCtx client.TxContext + // eventsQueryClient is the client used to subscribe to transactions events from this + // sender. It is used to receive notifications about transactions events corresponding + // to transactions which it has constructed, signed, and broadcast. + eventsQueryClient client.EventsQueryClient + // blockClient is the client used to query for the latest block height. + // It is used to implement timout logic for transactions which weren't committed. + blockClient client.BlockClient + + // txsMutex protects txErrorChans and txTimeoutPool maps. + txsMutex sync.Mutex + // txErrorChans maps tx_hash->channel which will receive an error or nil, + // and close, when the transactions with the given hash is committed. + txErrorChans txErrorChansByHash + // txTimeoutPool maps timeout_block_height->map_of_txsByHash. It + // is used to ensure that transactions error channels receive and close in the event + // that they have not already by the given timeout height. + txTimeoutPool txTimeoutPool +} + +type ( + txTimeoutPool map[height]txErrorChansByHash + txErrorChansByHash map[txHash]chan error + height = int64 + txHash = string +) + +// TxEvent is used to deserialize incoming websocket messages from +// the transactions subscription. +type TxEvent struct { + // Tx is the binary representation of the tx hash. + Tx []byte `json:"tx"` + Events []abciTypes.Event `json:"events"` +} + +// NewTxClient attempts to construct a new TxClient using the given dependencies +// and options. +// +// It performs the following steps: +// 1. Initializes a default txClient with the default commit timeout height +// offset, an empty error channel map, and an empty transaction timeout pool. +// 2. Injects the necessary dependencies using depinject. +// 3. Applies any provided options to customize the client. +// 4. Validates and sets any missing default configurations using the +// validateConfigAndSetDefaults method. +// 5. Subscribes the client to its own transactions. This step might be +// reconsidered for relocation to a potential Start() method in the future. +func NewTxClient( + ctx context.Context, + deps depinject.Config, + opts ...client.TxClientOption, +) (client.TxClient, error) { + tClient := &txClient{ + commitTimeoutHeightOffset: DefaultCommitTimeoutHeightOffset, + txErrorChans: make(txErrorChansByHash), + txTimeoutPool: make(txTimeoutPool), + } + + if err := depinject.Inject( + deps, + &tClient.txCtx, + &tClient.eventsQueryClient, + &tClient.blockClient, + ); err != nil { + return nil, err + } + + for _, opt := range opts { + opt(tClient) + } + + if err := tClient.validateConfigAndSetDefaults(); err != nil { + return nil, err + } + + // Start an events query subscription for transactions originating from this + // client's signing address. + // TODO_CONSIDERATION: move this into a #Start() method + if err := tClient.subscribeToOwnTxs(ctx); err != nil { + return nil, err + } + + // Launch a separate goroutine to handle transaction timeouts. + // TODO_CONSIDERATION: move this into a #Start() method + go tClient.goTimeoutPendingTransactions(ctx) + + return tClient, nil +} + +// SignAndBroadcast signs a set of Cosmos SDK messages, constructs a transaction, +// and broadcasts it to the network. The function performs several steps to +// ensure the messages and the resultant transaction are valid: +// +// 1. Validates each message in the provided set. +// 2. Constructs the transaction using the Cosmos SDK's transaction builder. +// 3. Calculates and sets the transaction's timeout height. +// 4. Sets a default gas limit (note: this will be made configurable in the future). +// 5. Signs the transaction. +// 6. Validates the constructed transaction. +// 7. Serializes and broadcasts the transaction. +// 8. Checks the broadcast response for errors. +// 9. If all the above steps are successful, the function registers the +// transaction as pending. +// +// If any step encounters an error, it returns an either.AsyncError populated with +// the synchronous error. If the function completes successfully, it returns an +// either.AsyncError populated with the error channel which will receive if the +// transaction results in an asynchronous error or times out. +func (tClient *txClient) SignAndBroadcast( + ctx context.Context, + msgs ...cosmostypes.Msg, +) either.AsyncError { + var validationErrs error + for i, msg := range msgs { + if err := msg.ValidateBasic(); err != nil { + validationErr := ErrInvalidMsg.Wrapf("in msg with index %d: %s", i, err) + validationErrs = multierr.Append(validationErrs, validationErr) + } + } + if validationErrs != nil { + return either.SyncErr(validationErrs) + } + + // Construct the transactions using cosmos' transactions builder. + txBuilder := tClient.txCtx.NewTxBuilder() + if err := txBuilder.SetMsgs(msgs...); err != nil { + // return synchronous error + return either.SyncErr(err) + } + + // Calculate timeout height + timeoutHeight := tClient.blockClient.LatestBlock(ctx). + Height() + tClient.commitTimeoutHeightOffset + + // TODO_TECHDEBT: this should be configurable + txBuilder.SetGasLimit(200000) + txBuilder.SetTimeoutHeight(uint64(timeoutHeight)) + + // sign transactions + err := tClient.txCtx.SignTx( + tClient.signingKeyName, + txBuilder, + false, false, + ) + if err != nil { + return either.SyncErr(err) + } + + // ensure transactions is valid + // NOTE: this makes the transactions valid; i.e. it is *REQUIRED* + if err := txBuilder.GetTx().ValidateBasic(); err != nil { + return either.SyncErr(err) + } + + // serialize transactions + txBz, err := tClient.txCtx.EncodeTx(txBuilder) + if err != nil { + return either.SyncErr(err) + } + + txResponse, err := tClient.txCtx.BroadcastTx(txBz) + if err != nil { + return either.SyncErr(err) + } + + if txResponse.Code != 0 { + return either.SyncErr(ErrCheckTx.Wrapf(txResponse.RawLog)) + } + + return tClient.addPendingTransactions(normalizeTxHashHex(txResponse.TxHash), timeoutHeight) +} + +// validateConfigAndSetDefaults ensures that the necessary configurations for the +// txClient are set, and populates any missing defaults. +// +// 1. It checks if the signing key name is set and returns an error if it's empty. +// 2. It then retrieves the key record from the keyring using the signing key name +// and checks its existence. +// 3. The address of the signing key is computed and assigned to txClient#signgingAddr. +// 4. Lastly, it ensures that commitTimeoutHeightOffset has a valid value, setting +// it to DefaultCommitTimeoutHeightOffset if it's zero or negative. +// +// Returns: +// - ErrEmptySigningKeyName if the signing key name is not provided. +// - ErrNoSuchSigningKey if the signing key is not found in the keyring. +// - ErrSigningKeyAddr if there's an issue retrieving the address for the signing key. +// - nil if validation is successful and defaults are set appropriately. +func (tClient *txClient) validateConfigAndSetDefaults() error { + if tClient.signingKeyName == "" { + return ErrEmptySigningKeyName + } + + keyRecord, err := tClient.txCtx.GetKeyring().Key(tClient.signingKeyName) + if err != nil { + return ErrNoSuchSigningKey.Wrapf("name %q: %s", tClient.signingKeyName, err) + } + signingAddr, err := keyRecord.GetAddress() + if err != nil { + return ErrSigningKeyAddr.Wrapf("name %q: %s", tClient.signingKeyName, err) + } + tClient.signingAddr = signingAddr + + if tClient.commitTimeoutHeightOffset <= 0 { + tClient.commitTimeoutHeightOffset = DefaultCommitTimeoutHeightOffset + } + return nil +} + +// addPendingTransactions registers a new pending transaction for monitoring and +// notification of asynchronous errors. It accomplishes the following: +// +// 1. Creates an error notification channel (if one doesn't already exist) and associates +// it with the provided transaction hash in the txErrorChans map. +// +// 2. Ensures that there's an initialized map of transactions by hash for the +// given timeout height in the txTimeoutPool. The same error notification channel +// is also associated with the transaction hash in this map. +// +// Both txErrorChans and txTimeoutPool store references to the same error notification +// channel for a given transaction hash. This ensures idempotency of error handling +// for any given transaction between asynchronous, transaction-specific errors and +// transaction timeout logic. +// +// Note: The error channels are buffered to prevent blocking on send operations and +// are intended to convey a single error event. +// +// Returns: +// - An either.AsyncError populated with the error notification channel for the +// provided transaction hash. +func (tClient *txClient) addPendingTransactions( + txHash string, + timeoutHeight int64, +) either.AsyncError { + tClient.txsMutex.Lock() + defer tClient.txsMutex.Unlock() + + // Initialize txTimeoutPool map if necessary. + txsByHash, ok := tClient.txTimeoutPool[timeoutHeight] + if !ok { + txsByHash = make(map[string]chan error) + tClient.txTimeoutPool[timeoutHeight] = txsByHash + } + + // Initialize txErrorChans map in txTimeoutPool map if necessary. + errCh, ok := txsByHash[txHash] + if !ok { + // NB: intentionally buffered to avoid blocking on send. Only intended + // to send/receive a single error. + errCh = make(chan error, 1) + txsByHash[txHash] = errCh + } + + // Initialize txErrorChans map if necessary. + if _, ok := tClient.txErrorChans[txHash]; !ok { + // NB: both maps hold a reference to the same channel so that we can check + // if the channel has already been closed when timing out. + tClient.txErrorChans[txHash] = errCh + } + + return either.AsyncErr(errCh) +} + +// subscribeToOwnTxs establishes an event query subscription to monitor transactions +// originating from this client's signing address. +// +// It performs the following steps: +// +// 1. Forms a query to fetch transaction events specific to the client's signing address. +// 2. Maps raw event bytes observable notifications to a new transaction event objects observable. +// 3. Handle each transaction event. +// +// Important considerations: +// There's uncertainty surrounding the potential for asynchronous errors post transaction broadcast. +// Current implementation and observations suggest that errors might be returned synchronously, +// even when using Cosmos' BroadcastTxAsync method. Further investigation is required. +// +// This function also spawns a goroutine to handle transaction timeouts via goTimeoutPendingTransactions. +// +// Parameters: +// - ctx: Context for managing the function's lifecycle and child operations. +// +// Returns: +// - An error if there's a failure during the event query or subscription process. +func (tClient *txClient) subscribeToOwnTxs(ctx context.Context) error { + // Form a query based on the client's signing address. + query := fmt.Sprintf(txWithSenderAddrQueryFmt, tClient.signingAddr) + + // Fetch transaction events matching the query. + eventsBz, err := tClient.eventsQueryClient.EventsBytes(ctx, query) + if err != nil { + return err + } + + // Convert raw event data into a stream of transaction events. + txEventsObservable := channel.Map[ + either.Bytes, either.Either[*TxEvent], + ](ctx, eventsBz, tClient.txEventFromEventBz) + txEventsObserver := txEventsObservable.Subscribe(ctx) + + // Handle transaction events asynchronously. + go tClient.goHandleTxEvents(txEventsObserver) + + return nil +} + +// goHandleTxEvents ranges over the transaction events observable, performing +// the following steps on each: +// +// 1. Normalize hexadeimal transaction hash. +// 2. Retrieves the transaction's error channel from txErrorChans. +// 3. Closes and removes it from txErrorChans. +// 4. Removes the transaction error channel from txTimeoutPool. +// +// It is intended to be called in a goroutine. +func (tClient *txClient) goHandleTxEvents( + txEventsObserver observable.Observer[either.Either[*TxEvent]], +) { + for eitherTxEvent := range txEventsObserver.Ch() { + txEvent, err := eitherTxEvent.ValueOrError() + if err != nil { + return + } + + // Convert transaction hash into its normalized hex form. + txHashHex := txHashBytesToNormalizedHex(comettypes.Tx(txEvent.Tx).Hash()) + + tClient.txsMutex.Lock() + + // Check for a corresponding error channel in the map. + txErrCh, ok := tClient.txErrorChans[txHashHex] + if !ok { + panic("Received tx event without an associated error channel.") + } + + // TODO_INVESTIGATE: it seems like it may not be possible for the + // txEvent to represent an error. Cosmos' #BroadcastTxSync() is being + // called internally, which will return an error if the transaction + // is not accepted by the mempool. + // + // It's unclear if a cosmos chain is capable of returning an async + // error for a transaction at this point; even when substituting + // #BroadcastTxAsync(), the error is returned synchronously: + // + // > error in json rpc client, with http response metadata: (Status: + // > 200 OK, Protocol HTTP/1.1). RPC error -32000 - tx added to local + // > mempool but failed to gossip: validation failed + // + // Potential parse and send transaction error on txErrCh here. + + // Close and remove from txErrChans + close(txErrCh) + delete(tClient.txErrorChans, txHashHex) + + // Remove from the txTimeoutPool. + for timeoutHeight, txErrorChans := range tClient.txTimeoutPool { + // Handled transaction isn't in this timeout height. + if _, ok := txErrorChans[txHashHex]; !ok { + continue + } + + delete(txErrorChans, txHashHex) + if len(txErrorChans) == 0 { + delete(tClient.txTimeoutPool, timeoutHeight) + } + } + + tClient.txsMutex.Unlock() + } +} + +// goTimeoutPendingTransactions monitors blocks and handles transaction timeouts. +// For each block observed, it checks if there are transactions associated with that +// block's height in the txTimeoutPool. If transactions are found, the function +// evaluates whether they have already been processed by the transaction events +// query subscription logic. If not, a timeout error is generated and sent on the +// transaction's error channel. Finally, the error channel is closed and removed +// from the txTimeoutPool. +func (tClient *txClient) goTimeoutPendingTransactions(ctx context.Context) { + // Subscribe to a sequence of committed blocks. + blockCh := tClient.blockClient.CommittedBlocksSequence(ctx).Subscribe(ctx).Ch() + + // Iterate over each incoming block. + for block := range blockCh { + select { + case <-ctx.Done(): + // Exit if the context signals done. + return + default: + } + + tClient.txsMutex.Lock() + + // Retrieve transactions associated with the current block's height. + txsByHash, ok := tClient.txTimeoutPool[block.Height()] + if !ok { + // If no transactions are found for the current block height, continue. + tClient.txsMutex.Unlock() + continue + } + + // Process each transaction for the current block height. + for txHash, txErrCh := range txsByHash { + select { + // Check if the transaction was processed by its subscription. + case err, ok := <-txErrCh: + if ok { + // Unexpected state: error channel should be closed after processing. + panic(fmt.Errorf("Expected txErrCh to be closed; received err: %w", err)) + } + // Remove the processed transaction. + delete(txsByHash, txHash) + tClient.txsMutex.Unlock() + continue + default: + } + + // Transaction was not processed by its subscription: handle timeout. + txErrCh <- tClient.getTxTimeoutError(ctx, txHash) // Send a timeout error. + close(txErrCh) // Close the error channel. + delete(txsByHash, txHash) // Remove the transaction. + } + + // Clean up the txTimeoutPool for the current block height. + delete(tClient.txTimeoutPool, block.Height()) + tClient.txsMutex.Unlock() + } +} + +// txEventFromEventBz deserializes a binary representation of a transaction event +// into a TxEvent structure. +// +// Parameters: +// - eitherEventBz: Binary data of the event, potentially encapsulating an error. +// +// Returns: +// - eitherTxEvent: The TxEvent or an encapsulated error, facilitating clear +// error management in the caller's context. +// - skip: A flag denoting if the event should be bypassed. A value of true +// suggests the event be disregarded, progressing to the succeeding message. +func (tClient *txClient) txEventFromEventBz( + eitherEventBz either.Bytes, +) (eitherTxEvent either.Either[*TxEvent], skip bool) { + + // Extract byte data from the given event. In case of failure, wrap the error + // and denote the event for skipping. + eventBz, err := eitherEventBz.ValueOrError() + if err != nil { + return either.Error[*TxEvent](err), true + } + + // Unmarshal byte data into a TxEvent object. + txEvt, err := tClient.unmarshalTxEvent(eventBz) + switch { + // If the error indicates a non-transactional event, return the TxEvent and + // signal for skipping. + case errors.Is(err, ErrNonTxEventBytes): + return either.Success(txEvt), true + // For other errors, wrap them and flag the event to be skipped. + case err != nil: + return either.Error[*TxEvent](ErrUnmarshalTx.Wrapf("%s", err)), true + } + + // For successful unmarshalling, return the TxEvent. + return either.Success(txEvt), false +} + +// unmarshalTxEvent attempts to deserialize a slice of bytes into a TxEvent. +// It checks if the given bytes correspond to a valid transaction event. +// If the resulting TxEvent has empty transaction bytes, it assumes that +// the message was not a transaction event and returns an ErrNonTxEventBytes error. +func (tClient *txClient) unmarshalTxEvent(eventBz []byte) (*TxEvent, error) { + txEvent := new(TxEvent) + + // Try to deserialize the provided bytes into a TxEvent. + if err := json.Unmarshal(eventBz, txEvent); err != nil { + return nil, err + } + + // Check if the TxEvent has empty transaction bytes, which indicates + // the message might not be a valid transaction event. + if bytes.Equal(txEvent.Tx, []byte{}) { + return nil, ErrNonTxEventBytes.Wrapf("%s", string(eventBz)) + } + + return txEvent, nil +} + +// getTxTimeoutError checks if a transaction with the specified hash has timed out. +// The function decodes the provided hexadecimal hash into bytes and queries the +// transaction using the byte hash. If any error occurs during this process, +// appropriate wrapped errors are returned for easier debugging. +func (tClient *txClient) getTxTimeoutError(ctx context.Context, txHashHex string) error { + + // Decode the provided hex hash into bytes. + txHash, err := hex.DecodeString(txHashHex) + if err != nil { + return ErrInvalidTxHash.Wrapf("%s", txHashHex) + } + + // Query the transaction using the decoded byte hash. + txResponse, err := tClient.txCtx.QueryTx(ctx, txHash, false) + if err != nil { + return ErrQueryTx.Wrapf("with hash: %s: %s", txHashHex, err) + } + + // Return a timeout error with details about the transaction. + return ErrTxTimeout.Wrapf("with hash %s: %s", txHashHex, txResponse.TxResult.Log) +} diff --git a/pkg/client/tx/client_integration_test.go b/pkg/client/tx/client_integration_test.go new file mode 100644 index 000000000..c2d5db6e5 --- /dev/null +++ b/pkg/client/tx/client_integration_test.go @@ -0,0 +1,65 @@ +//go:build integration + +package tx_test + +import ( + "context" + "testing" + + "cosmossdk.io/depinject" + "github.com/cosmos/cosmos-sdk/types" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/pkg/client/tx" + + "github.com/pokt-network/poktroll/internal/testclient/testblock" + "github.com/pokt-network/poktroll/internal/testclient/testeventsquery" + "github.com/pokt-network/poktroll/internal/testclient/testtx" + "github.com/pokt-network/poktroll/pkg/client" + apptypes "github.com/pokt-network/poktroll/x/application/types" +) + +func TestTxClient_SignAndBroadcast_Integration(t *testing.T) { + t.Skip("TODO_TECHDEBT: this test depends on some setup which is currently not implemented in this test: staked application and servicer with matching services") + + var ctx = context.Background() + + keyring, signingKey := newTestKeyringWithKey(t) + + eventsQueryClient := testeventsquery.NewLocalnetClient(t) + + _, txCtx := testtx.NewAnyTimesTxTxContext(t, keyring) + + // Construct a new mock block client because it is a required dependency. Since + // we're not exercising transactions timeouts in this test, we don't need to set any + // particular expectations on it, nor do we care about the value of blockHash + // argument. + blockClientMock := testblock.NewLocalnetClient(ctx, t) + + // Construct a new depinject config with the mocks we created above. + txClientDeps := depinject.Supply( + eventsQueryClient, + txCtx, + blockClientMock, + ) + + // Construct the transaction client. + txClient, err := tx.NewTxClient(ctx, txClientDeps, tx.WithSigningKeyName(testSigningKeyName)) + require.NoError(t, err) + + signingKeyAddr, err := signingKey.GetAddress() + require.NoError(t, err) + + // Construct a valid (arbitrary) message to sign, encode, and broadcast. + appStake := types.NewCoin("upokt", types.NewInt(1000000)) + appStakeMsg := &apptypes.MsgStakeApplication{ + Address: signingKeyAddr.String(), + Stake: &appStake, + Services: client.NewTestApplicationServiceConfig(testServiceIdPrefix, 2), + } + + // Sign and broadcast the message. + eitherErr := txClient.SignAndBroadcast(ctx, appStakeMsg) + err, _ = eitherErr.SyncOrAsyncError() + require.NoError(t, err) +} diff --git a/pkg/client/tx/client_test.go b/pkg/client/tx/client_test.go new file mode 100644 index 000000000..6cb5d33da --- /dev/null +++ b/pkg/client/tx/client_test.go @@ -0,0 +1,413 @@ +package tx_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "cosmossdk.io/depinject" + cometbytes "github.com/cometbft/cometbft/libs/bytes" + cosmoskeyring "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/cosmos/cosmos-sdk/types" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/internal/mocks/mockclient" + "github.com/pokt-network/poktroll/internal/testclient" + "github.com/pokt-network/poktroll/internal/testclient/testblock" + "github.com/pokt-network/poktroll/internal/testclient/testeventsquery" + "github.com/pokt-network/poktroll/internal/testclient/testtx" + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/tx" + "github.com/pokt-network/poktroll/pkg/either" + apptypes "github.com/pokt-network/poktroll/x/application/types" +) + +const ( + testSigningKeyName = "test_signer" + // NB: testServiceIdPrefix must not be longer than 7 characters due to + // maxServiceIdLen. + testServiceIdPrefix = "testsvc" + txCommitTimeout = 10 * time.Millisecond +) + +// TODO_TECHDEBT: add coverage for the transactions client handling an events bytes error either. + +func TestTxClient_SignAndBroadcast_Succeeds(t *testing.T) { + var ( + // expectedTx is the expected transactions bytes that will be signed and broadcast + // by the transaction client. It is computed and assigned in the + // testtx.NewOneTimeTxTxContext helper function. The same reference needs + // to be used across the expectations that are set on the transactions context mock. + expectedTx cometbytes.HexBytes + // eventsBzPublishCh is the channel that the mock events query client + // will use to publish the transactions event bytes. It is used near the end of + // the test to mock the network signaling that the transactions was committed. + eventsBzPublishCh chan<- either.Bytes + blocksPublishCh chan client.Block + ctx = context.Background() + ) + + keyring, signingKey := newTestKeyringWithKey(t) + + eventsQueryClient := testeventsquery.NewOneTimeTxEventsQueryClient( + ctx, t, signingKey, &eventsBzPublishCh, + ) + + txCtxMock := testtx.NewOneTimeTxTxContext( + t, keyring, + testSigningKeyName, + &expectedTx, + ) + + // Construct a new mock block client because it is a required dependency. + // Since we're not exercising transactions timeouts in this test, we don't need to + // set any particular expectations on it, nor do we care about the contents + // of the latest block. + blockClientMock := testblock.NewOneTimeCommittedBlocksSequenceBlockClient( + t, blocksPublishCh, + ) + + // Construct a new depinject config with the mocks we created above. + txClientDeps := depinject.Supply( + eventsQueryClient, + txCtxMock, + blockClientMock, + ) + + // Construct the transaction client. + txClient, err := tx.NewTxClient( + ctx, txClientDeps, tx.WithSigningKeyName(testSigningKeyName), + ) + require.NoError(t, err) + + signingKeyAddr, err := signingKey.GetAddress() + require.NoError(t, err) + + // Construct a valid (arbitrary) message to sign, encode, and broadcast. + appStake := types.NewCoin("upokt", types.NewInt(1000000)) + appStakeMsg := &apptypes.MsgStakeApplication{ + Address: signingKeyAddr.String(), + Stake: &appStake, + Services: client.NewTestApplicationServiceConfig(testServiceIdPrefix, 2), + } + + // Sign and broadcast the message. + eitherErr := txClient.SignAndBroadcast(ctx, appStakeMsg) + err, errCh := eitherErr.SyncOrAsyncError() + require.NoError(t, err) + + // Construct the expected transaction event bytes from the expected transaction bytes. + txEventBz, err := json.Marshal(&tx.TxEvent{Tx: expectedTx}) + require.NoError(t, err) + + // Publish the transaction event bytes to the events query client so that the transaction client + // registers the transactions as committed (i.e. removes it from the timeout pool). + eventsBzPublishCh <- either.Success[[]byte](txEventBz) + + // Assert that the error channel was closed without receiving. + select { + case err, ok := <-errCh: + require.NoError(t, err) + require.Falsef(t, ok, "expected errCh to be closed") + case <-time.After(txCommitTimeout): + t.Fatal("test timed out waiting for errCh to receive") + } +} + +func TestTxClient_NewTxClient_Error(t *testing.T) { + // Construct an empty in-memory keyring. + keyring := cosmoskeyring.NewInMemory(testclient.EncodingConfig.Marshaler) + + tests := []struct { + name string + signingKeyName string + expectedErr error + }{ + { + name: "empty signing key name", + signingKeyName: "", + expectedErr: tx.ErrEmptySigningKeyName, + }, + { + name: "signing key does not exist", + signingKeyName: "nonexistent", + expectedErr: tx.ErrNoSuchSigningKey, + }, + // TODO_TECHDEBT: add coverage for this error case + // { + // name: "failed to get address", + // testSigningKeyName: "incompatible", + // expectedErr: tx.ErrSigningKeyAddr, + // }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ( + ctrl = gomock.NewController(t) + ctx = context.Background() + ) + + // Construct a new mock events query client. Since we expect the + // NewTxClient call to fail, we don't need to set any expectations + // on this mock. + eventsQueryClient := mockclient.NewMockEventsQueryClient(ctrl) + + // Construct a new mock transactions context. + txCtxMock, _ := testtx.NewAnyTimesTxTxContext(t, keyring) + + // Construct a new mock block client. Since we expect the NewTxClient + // call to fail, we don't need to set any expectations on this mock. + blockClientMock := mockclient.NewMockBlockClient(ctrl) + + // Construct a new depinject config with the mocks we created above. + txClientDeps := depinject.Supply( + eventsQueryClient, + txCtxMock, + blockClientMock, + ) + + // Construct a signing key option using the test signing key name. + signingKeyOpt := tx.WithSigningKeyName(tt.signingKeyName) + + // Attempt to create the transactions client. + txClient, err := tx.NewTxClient(ctx, txClientDeps, signingKeyOpt) + require.ErrorIs(t, err, tt.expectedErr) + require.Nil(t, txClient) + }) + } +} + +func TestTxClient_SignAndBroadcast_SyncError(t *testing.T) { + var ( + // eventsBzPublishCh is the channel that the mock events query client + // will use to publish the transactions event bytes. It is not used in + // this test but is required to use the NewOneTimeTxEventsQueryClient + // helper. + eventsBzPublishCh chan<- either.Bytes + // blocksPublishCh is the channel that the mock block client will use + // to publish the latest block. It is not used in this test but is + // required to use the NewOneTimeCommittedBlocksSequenceBlockClient + // helper. + blocksPublishCh chan client.Block + ctx = context.Background() + ) + + keyring, signingKey := newTestKeyringWithKey(t) + + // Construct a new mock events query client. Since we expect the + // NewTxClient call to fail, we don't need to set any expectations + // on this mock. + eventsQueryClient := testeventsquery.NewOneTimeTxEventsQueryClient( + ctx, t, signingKey, &eventsBzPublishCh, + ) + + // Construct a new mock transaction context. + txCtxMock, _ := testtx.NewAnyTimesTxTxContext(t, keyring) + + // Construct a new mock block client because it is a required dependency. + // Since we're not exercising transactions timeouts in this test, we don't need to + // set any particular expectations on it, nor do we care about the contents + // of the latest block. + blockClientMock := testblock.NewOneTimeCommittedBlocksSequenceBlockClient( + t, blocksPublishCh, + ) + + // Construct a new depinject config with the mocks we created above. + txClientDeps := depinject.Supply( + eventsQueryClient, + txCtxMock, + blockClientMock, + ) + + // Construct the transaction client. + txClient, err := tx.NewTxClient( + ctx, txClientDeps, tx.WithSigningKeyName(testSigningKeyName), + ) + require.NoError(t, err) + + // Construct an invalid (arbitrary) message to sign, encode, and broadcast. + signingAddr, err := signingKey.GetAddress() + require.NoError(t, err) + appStakeMsg := &apptypes.MsgStakeApplication{ + // Providing address to avoid panic from #GetSigners(). + Address: signingAddr.String(), + Stake: nil, + // NB: explicitly omitting required fields + } + + eitherErr := txClient.SignAndBroadcast(ctx, appStakeMsg) + err, _ = eitherErr.SyncOrAsyncError() + require.ErrorIs(t, err, tx.ErrInvalidMsg) + + time.Sleep(10 * time.Millisecond) +} + +// TODO_INCOMPLETE: add coverage for async error; i.e. insufficient gas or on-chain error +func TestTxClient_SignAndBroadcast_CheckTxError(t *testing.T) { + var ( + // expectedErrMsg is the expected error message that will be returned + // by the transaction client. It is computed and assigned in the + // testtx.NewOneTimeErrCheckTxTxContext helper function. + expectedErrMsg string + // eventsBzPublishCh is the channel that the mock events query client + // will use to publish the transactions event bytes. It is used near the end of + // the test to mock the network signaling that the transactions was committed. + eventsBzPublishCh chan<- either.Bytes + blocksPublishCh chan client.Block + ctx = context.Background() + ) + + keyring, signingKey := newTestKeyringWithKey(t) + + eventsQueryClient := testeventsquery.NewOneTimeTxEventsQueryClient( + ctx, t, signingKey, &eventsBzPublishCh, + ) + + txCtxMock := testtx.NewOneTimeErrCheckTxTxContext( + t, keyring, + testSigningKeyName, + &expectedErrMsg, + ) + + // Construct a new mock block client because it is a required dependency. + // Since we're not exercising transactions timeouts in this test, we don't need to + // set any particular expectations on it, nor do we care about the contents + // of the latest block. + blockClientMock := testblock.NewOneTimeCommittedBlocksSequenceBlockClient( + t, blocksPublishCh, + ) + + // Construct a new depinject config with the mocks we created above. + txClientDeps := depinject.Supply( + eventsQueryClient, + txCtxMock, + blockClientMock, + ) + + // Construct the transaction client. + txClient, err := tx.NewTxClient(ctx, txClientDeps, tx.WithSigningKeyName(testSigningKeyName)) + require.NoError(t, err) + + signingKeyAddr, err := signingKey.GetAddress() + require.NoError(t, err) + + // Construct a valid (arbitrary) message to sign, encode, and broadcast. + appStake := types.NewCoin("upokt", types.NewInt(1000000)) + appStakeMsg := &apptypes.MsgStakeApplication{ + Address: signingKeyAddr.String(), + Stake: &appStake, + Services: client.NewTestApplicationServiceConfig(testServiceIdPrefix, 2), + } + + // Sign and broadcast the message. + eitherErr := txClient.SignAndBroadcast(ctx, appStakeMsg) + err, _ = eitherErr.SyncOrAsyncError() + require.ErrorIs(t, err, tx.ErrCheckTx) + require.ErrorContains(t, err, expectedErrMsg) +} + +func TestTxClient_SignAndBroadcast_Timeout(t *testing.T) { + var ( + // expectedErrMsg is the expected error message that will be returned + // by the transaction client. It is computed and assigned in the + // testtx.NewOneTimeErrCheckTxTxContext helper function. + expectedErrMsg string + // eventsBzPublishCh is the channel that the mock events query client + // will use to publish the transaction event bytes. It is used near the end of + // the test to mock the network signaling that the transaction was committed. + eventsBzPublishCh chan<- either.Bytes + blocksPublishCh = make(chan client.Block, tx.DefaultCommitTimeoutHeightOffset) + ctx = context.Background() + ) + + keyring, signingKey := newTestKeyringWithKey(t) + + eventsQueryClient := testeventsquery.NewOneTimeTxEventsQueryClient( + ctx, t, signingKey, &eventsBzPublishCh, + ) + + txCtxMock := testtx.NewOneTimeErrTxTimeoutTxContext( + t, keyring, + testSigningKeyName, + &expectedErrMsg, + ) + + // Construct a new mock block client because it is a required dependency. + // Since we're not exercising transaction timeouts in this test, we don't need to + // set any particular expectations on it, nor do we care about the contents + // of the latest block. + blockClientMock := testblock.NewOneTimeCommittedBlocksSequenceBlockClient( + t, blocksPublishCh, + ) + + // Construct a new depinject config with the mocks we created above. + txClientDeps := depinject.Supply( + eventsQueryClient, + txCtxMock, + blockClientMock, + ) + + // Construct the transaction client. + txClient, err := tx.NewTxClient( + ctx, txClientDeps, tx.WithSigningKeyName(testSigningKeyName), + ) + require.NoError(t, err) + + signingKeyAddr, err := signingKey.GetAddress() + require.NoError(t, err) + + // Construct a valid (arbitrary) message to sign, encode, and broadcast. + appStake := types.NewCoin("upokt", types.NewInt(1000000)) + appStakeMsg := &apptypes.MsgStakeApplication{ + Address: signingKeyAddr.String(), + Stake: &appStake, + Services: client.NewTestApplicationServiceConfig(testServiceIdPrefix, 2), + } + + // Sign and broadcast the message in a transaction. + eitherErr := txClient.SignAndBroadcast(ctx, appStakeMsg) + err, errCh := eitherErr.SyncOrAsyncError() + require.NoError(t, err) + + for i := 0; i < tx.DefaultCommitTimeoutHeightOffset; i++ { + blocksPublishCh <- testblock.NewAnyTimesBlock(t, []byte{}, int64(i+1)) + } + + // Assert that we receive the expected error type & message. + select { + case err := <-errCh: + require.ErrorIs(t, err, tx.ErrTxTimeout) + require.ErrorContains(t, err, expectedErrMsg) + // NB: wait 110% of txCommitTimeout; a bit longer than strictly necessary in + // order to mitigate flakiness. + case <-time.After(txCommitTimeout * 110 / 100): + t.Fatal("test timed out waiting for errCh to receive") + } + + // Assert that the error channel was closed. + select { + case err, ok := <-errCh: + require.Falsef(t, ok, "expected errCh to be closed") + require.NoError(t, err) + // NB: Give the error channel some time to be ready to receive in order to + // mitigate flakiness. + case <-time.After(50 * time.Millisecond): + t.Fatal("expected errCh to be closed") + } +} + +// TODO_TECHDEBT: add coverage for sending multiple messages simultaneously +func TestTxClient_SignAndBroadcast_MultipleMsgs(t *testing.T) { + t.SkipNow() +} + +// newTestKeyringWithKey creates a new in-memory keyring with a test key +// with testSigningKeyName as its name. +func newTestKeyringWithKey(t *testing.T) (cosmoskeyring.Keyring, *cosmoskeyring.Record) { + keyring := cosmoskeyring.NewInMemory(testclient.EncodingConfig.Marshaler) + key, _ := testclient.NewKey(t, testSigningKeyName, keyring) + return keyring, key +} diff --git a/pkg/client/tx/context.go b/pkg/client/tx/context.go index 5865ae526..eca32f943 100644 --- a/pkg/client/tx/context.go +++ b/pkg/client/tx/context.go @@ -22,7 +22,7 @@ type cosmosTxContext struct { // Holds cosmos-sdk client context. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client#Context) clientCtx cosmosclient.Context - // Holds the cosmos-sdk tx factory. + // Holds the cosmos-sdk transaction factory. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client/tx#Factory) txFactory cosmostx.Factory } @@ -67,7 +67,7 @@ func (txCtx cosmosTxContext) SignTx( ) } -// NewTxBuilder returns a new tx builder instance using the cosmos-sdk client tx config. +// NewTxBuilder returns a new transaction builder instance using the cosmos-sdk client transaction config. func (txCtx cosmosTxContext) NewTxBuilder() cosmosclient.TxBuilder { return txCtx.clientCtx.TxConfig.NewTxBuilder() } @@ -77,14 +77,15 @@ func (txCtx cosmosTxContext) EncodeTx(txBuilder cosmosclient.TxBuilder) ([]byte, return txCtx.clientCtx.TxConfig.TxEncoder()(txBuilder.GetTx()) } -// BroadcastTx broadcasts the given tx to the network, blocking until the check-tx -// ABCI operation completes and returns a TxResponse of the tx status at that point in time. +// BroadcastTx broadcasts the given transaction to the network, blocking until the check-tx +// ABCI operation completes and returns a TxResponse of the transaction status at that point in time. func (txCtx cosmosTxContext) BroadcastTx(txBytes []byte) (*cosmostypes.TxResponse, error) { - return txCtx.clientCtx.BroadcastTxSync(txBytes) + return txCtx.clientCtx.BroadcastTxAsync(txBytes) + //return txCtx.clientCtx.BroadcastTxSync(txBytes) } // QueryTx queries the transaction based on its hash and optionally provides proof -// of the transaction. It returns the tx query result. +// of the transaction. It returns the transaction query result. func (txCtx cosmosTxContext) QueryTx( ctx context.Context, txHash []byte, diff --git a/pkg/client/tx/encoding.go b/pkg/client/tx/encoding.go new file mode 100644 index 000000000..78612e7b7 --- /dev/null +++ b/pkg/client/tx/encoding.go @@ -0,0 +1,18 @@ +package tx + +import ( + "fmt" + "strings" +) + +// normalizeTxHashHex defines canonical and unambiguous representation for a +// transaction hash hexadecimal string; lower-case. +func normalizeTxHashHex(txHash string) string { + return strings.ToLower(txHash) +} + +// txHashBytesToNormalizedHex converts a transaction hash bytes to a normalized +// hexadecimal string representation. +func txHashBytesToNormalizedHex(txHash []byte) string { + return normalizeTxHashHex(fmt.Sprintf("%x", txHash)) +} diff --git a/pkg/client/tx/errors.go b/pkg/client/tx/errors.go new file mode 100644 index 000000000..474f2ac19 --- /dev/null +++ b/pkg/client/tx/errors.go @@ -0,0 +1,53 @@ +package tx + +import errorsmod "cosmossdk.io/errors" + +var ( + // ErrEmptySigningKeyName represents an error which indicates that the + // provided signing key name is empty or unspecified. + ErrEmptySigningKeyName = errorsmod.Register(codespace, 1, "empty signing key name") + + // ErrNoSuchSigningKey represents an error signifying that the requested + // signing key does not exist or could not be located. + ErrNoSuchSigningKey = errorsmod.Register(codespace, 2, "signing key does not exist") + + // ErrSigningKeyAddr is raised when there's a failure in retrieving the + // associated address for the provided signing key. + ErrSigningKeyAddr = errorsmod.Register(codespace, 3, "failed to get address for signing key") + + // ErrInvalidMsg signifies that there was an issue in validating the + // transaction message. This could be due to format, content, or other + // constraints imposed on the message. + ErrInvalidMsg = errorsmod.Register(codespace, 4, "failed to validate tx message") + + // ErrCheckTx indicates an error occurred during the ABCI check transaction + // process, which verifies the transaction's integrity before it is added + // to the mempool. + ErrCheckTx = errorsmod.Register(codespace, 5, "error during ABCI check tx") + + // ErrTxTimeout is raised when a transaction has taken too long to + // complete, surpassing a predefined threshold. + ErrTxTimeout = errorsmod.Register(codespace, 6, "tx timed out") + + // ErrQueryTx indicates an error occurred while trying to query for the status + // of a specific transaction, likely due to issues with the query parameters + // or the state of the blockchain network. + ErrQueryTx = errorsmod.Register(codespace, 7, "error encountered while querying for tx") + + // ErrInvalidTxHash represents an error which is triggered when the + // transaction hash provided does not adhere to the expected format or + // constraints, implying it may be corrupted or tampered with. + ErrInvalidTxHash = errorsmod.Register(codespace, 8, "invalid tx hash") + + // ErrNonTxEventBytes indicates an attempt to deserialize bytes that do not + // correspond to a transaction event. This error is triggered when the provided + // byte data isn't recognized as a valid transaction event representation. + ErrNonTxEventBytes = errorsmod.Register(codespace, 9, "attempted to deserialize non-tx event bytes") + + // ErrUnmarshalTx signals a failure in the unmarshalling process of a transaction. + // This error is triggered when the system encounters issues translating a set of + // bytes into the corresponding Tx structure or object. + ErrUnmarshalTx = errorsmod.Register(codespace, 10, "failed to unmarshal tx") + + codespace = "tx_client" +) diff --git a/pkg/client/tx/options.go b/pkg/client/tx/options.go new file mode 100644 index 000000000..34e782b6d --- /dev/null +++ b/pkg/client/tx/options.go @@ -0,0 +1,22 @@ +package tx + +import ( + "github.com/pokt-network/poktroll/pkg/client" +) + +// WithCommitTimeoutBlocks sets the timeout duration in terms of number of blocks +// for the client to wait for broadcast transactions to be committed before +// returning a timeout error. +func WithCommitTimeoutBlocks(timeout int64) client.TxClientOption { + return func(client client.TxClient) { + client.(*txClient).commitTimeoutHeightOffset = timeout + } +} + +// WithSigningKeyName sets the name of the key which should be retrieved from the +// keyring and used for signing transactions. +func WithSigningKeyName(keyName string) client.TxClientOption { + return func(client client.TxClient) { + client.(*txClient).signingKeyName = keyName + } +} diff --git a/pkg/observable/channel/replay.go b/pkg/observable/channel/replay.go index 583edb4e5..a3935543d 100644 --- a/pkg/observable/channel/replay.go +++ b/pkg/observable/channel/replay.go @@ -38,8 +38,9 @@ type replayObservable[V any] struct { func NewReplayObservable[V any]( ctx context.Context, replayBufferSize int, + opts ...option[V], ) (observable.ReplayObservable[V], chan<- V) { - obsvbl, publishCh := NewObservable[V]() + obsvbl, publishCh := NewObservable[V](opts...) return ToReplayObservable[V](ctx, replayBufferSize, obsvbl), publishCh }