From 5bfe151fd89fbdb20b87b07aea854f8018ba13f7 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 10:01:27 +0100 Subject: [PATCH 01/10] chore: update anvil service port in make targets --- Makefile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 735cec340..5db5ec974 100644 --- a/Makefile +++ b/Makefile @@ -364,19 +364,19 @@ supplier_stake: ## Stake tokens for the supplier specified (must specify the APP supplier1_stake: ## Stake supplier1 (also staked in genesis) # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). - SUPPLIER=supplier1 SERVICES="anvil;http://localhost:8548,svc1;http://localhost:8081" make supplier_stake + SUPPLIER=supplier1 SERVICES="anvil;http://localhost:8545,svc1;http://localhost:8081" make supplier_stake .PHONY: supplier2_stake supplier2_stake: ## Stake supplier2 # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). - SUPPLIER=supplier2 SERVICES="anvil;http://localhost:8548,svc2;http://localhost:8082" make supplier_stake + SUPPLIER=supplier2 SERVICES="anvil;http://localhost:8545,svc2;http://localhost:8082" make supplier_stake .PHONY: supplier3_stake supplier3_stake: ## Stake supplier3 # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). - SUPPLIER=supplier3 SERVICES="anvil;http://localhost:8548,svc3;http://localhost:8083" make supplier_stake + SUPPLIER=supplier3 SERVICES="anvil;http://localhost:8545,svc3;http://localhost:8083" make supplier_stake .PHONY: supplier_unstake supplier_unstake: ## Unstake an supplier (must specify the SUPPLIER env var) From 86255b3ebd90854a719cd9165ebc42c5f76cf5b5 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 10:14:12 +0100 Subject: [PATCH 02/10] chore: review feedback improvements Co-authored-by: Daniel Olshansky --- pkg/relayer/cmd/cmd.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 994bdb03d..db9a46389 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -42,10 +42,10 @@ func RelayerCmd() *cobra.Command { cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") // TODO_TECHDEBT: integrate these flags with the client context (i.e. cosmosflags, config, viper, etc.) - // This is simpler to do with server-side configs (see rootCmd#PersistentPreRunE). - // Will require more effort than currently justifiable. + // This is simpler to do with server-side configs (see rootCmd#PersistentPreRunE) and requires more effort than currently justifiable. cmd.Flags().StringVar(&flagSigningKeyName, "signing-key", "", "Name of the key to sign transactions") - cmd.Flags().StringVar(&flagSmtStorePath, "smt-store", "smt", "Path to the SMT KV store") + // TODO_TECHDEBT(#137): This, alongside other flags, should be part of a config file suppliers provide. + cmd.Flags().StringVar(&flagSmtStorePath, "smt-store", "smt", "Path to where the data backing SMT KV store exists on disk") // Communication flags // TODO_TECHDEBT: We're using `explicitly omitting default` so the relayer crashes if these aren't specified. // Figure out what good defaults should be post alpha. From 0aaddc84fa36acd3964907154065fe2f963e2703 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 10:49:29 +0100 Subject: [PATCH 03/10] refactor: relayminer depinject helpers & godoc comments on all constructors --- pkg/client/block/client.go | 3 + pkg/client/events_query/client.go | 5 + pkg/client/tx/client.go | 9 ++ pkg/client/tx/context.go | 6 +- pkg/relayer/cmd/cmd.go | 188 +++++++++++++++++------------- pkg/relayer/proxy/proxy.go | 14 ++- pkg/relayer/session/session.go | 7 ++ 7 files changed, 149 insertions(+), 83 deletions(-) diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 375171d28..7ded373f2 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -75,6 +75,9 @@ type eventBytesToBlockMapFn = func( ) (client.Block, bool) // NewBlockClient creates a new block client from the given dependencies and cometWebsocketURL. +// +// Required dependencies: +// - client.EventsQueryClient func NewBlockClient( ctx context.Context, deps depinject.Config, diff --git a/pkg/client/events_query/client.go b/pkg/client/events_query/client.go index 88ace493f..34a7d35d2 100644 --- a/pkg/client/events_query/client.go +++ b/pkg/client/events_query/client.go @@ -62,6 +62,11 @@ func (ebc *eventsBytesAndConn) Close() { _ = ebc.conn.Close() } +// NewEventsQueryClient returns a new events query client which is used to +// subscribe to on-chain events matching the given query. +// +// Available options: +// - WithDialer func NewEventsQueryClient(cometWebsocketURL string, opts ...client.EventsQueryClientOption) client.EventsQueryClient { evtClient := &eventsQueryClient{ cometWebsocketURL: cometWebsocketURL, diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index aa2561832..39f5208e0 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -105,6 +105,15 @@ type TxEvent struct { // validateConfigAndSetDefaults method. // 5. Subscribes the client to its own transactions. This step might be // reconsidered for relocation to a potential Start() method in the future. +// +// Required dependencies: +// - client.TxContext +// - client.EventsQueryClient +// - client.BlockClient +// +// Available options: +// - WithSigningKeyName +// - WithCommitTimeoutHeightOffset func NewTxClient( ctx context.Context, deps depinject.Config, diff --git a/pkg/client/tx/context.go b/pkg/client/tx/context.go index 040731b46..01cf76382 100644 --- a/pkg/client/tx/context.go +++ b/pkg/client/tx/context.go @@ -30,6 +30,10 @@ type cosmosTxContext struct { // NewTxContext initializes a new cosmosTxContext with the given dependencies. // It uses depinject to populate its members and returns a client.TxContext // interface type. +// +// Required dependencies: +// - cosmosclient.Context +// - cosmostx.Factory func NewTxContext(deps depinject.Config) (client.TxContext, error) { txCtx := cosmosTxContext{} @@ -41,8 +45,6 @@ func NewTxContext(deps depinject.Config) (client.TxContext, error) { return nil, err } - txCtx.clientCtx = cosmosclient.Context(txCtx.clientCtx) - return txCtx, nil } diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index db9a46389..a13f41b4d 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -30,6 +30,12 @@ var ( flagPocketNode string ) +type supplierFn func( + context.Context, + depinject.Config, + *cobra.Command, +) (depinject.Config, error) + func RelayerCmd() *cobra.Command { cmd := &cobra.Command{ Use: "relayminer", @@ -44,7 +50,7 @@ func RelayerCmd() *cobra.Command { // TODO_TECHDEBT: integrate these flags with the client context (i.e. cosmosflags, config, viper, etc.) // This is simpler to do with server-side configs (see rootCmd#PersistentPreRunE) and requires more effort than currently justifiable. cmd.Flags().StringVar(&flagSigningKeyName, "signing-key", "", "Name of the key to sign transactions") - // TODO_TECHDEBT(#137): This, alongside other flags, should be part of a config file suppliers provide. + // TODO_TECHDEBT(#137): This, alongside other flags, should be part of a config file suppliers provide. cmd.Flags().StringVar(&flagSmtStorePath, "smt-store", "smt", "Path to where the data backing SMT KV store exists on disk") // Communication flags // TODO_TECHDEBT: We're using `explicitly omitting default` so the relayer crashes if these aren't specified. @@ -93,90 +99,45 @@ func runRelayer(cmd *cobra.Command, _ []string) error { return nil } -// setupRelayerDependencies sets up all the dependencies the relay miner needs to run: +// setupRelayerDependencies sets up all the dependencies the relay miner needs +// to run by building the dependency tree from the leaves up, incrementally +// supplying each component to an accumulating depinject.Config: // Miner, EventsQueryClient, BlockClient, cosmosclient.Context, TxFactory, TxContext, // TxClient, SupplierClient, RelayerProxy, RelayerSessionsManager. func setupRelayerDependencies( ctx context.Context, cmd *cobra.Command, ) (deps depinject.Config, err error) { - // Initizlize deps to with empty depinject config. - deps, err = supplyMiner(depinject.Configs()) + pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl(cmd) if err != nil { return nil, err } - rpcQueryURL, err := getPocketNodeWebsocketURL(cmd) - if err != nil { - return nil, err + supplierFuncs := []supplierFn{ + newSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), // leaf + newSupplyBlockClientFn(pocketNodeWebsocketUrl), + supplyMiner, // leaf + supplyClientContext, // leaf + supplyTxFactory, + supplyTxContext, + supplyTxClient, + supplySupplierClient, + supplyRelayerProxy, + supplyRelayerSessionsManager, } - // Has no dependencies. - deps, err = supplyEventsQueryClient(deps, rpcQueryURL) - if err != nil { - return nil, err - } - - // Depends on EventsQueryClient. - deps, err = supplyBlockClient(ctx, deps, rpcQueryURL) - if err != nil { - return nil, err - } - - // Has no dependencies - deps, err = supplyClientContext(deps, cmd) - - // Depends on clientCtx. - deps, err = supplyTxFactory(deps, cmd) - if err != nil { - return nil, err - } - - // Depends on clientCtx, txFactory, EventsQueryClient, & BlockClient. - deps, err = supplyTxClient(ctx, deps) - if err != nil { - return nil, err - } - - // Depends on txClient & EventsQueryClient. - deps, err = supplySupplierClient(deps) - if err != nil { - return nil, err - } - - // Depends on clientCtx & BlockClient. - deps, err = supplyRelayerProxy(deps, cmd) - if err != nil { - return nil, err - } - - // Depends on BlockClient & SupplierClient. - deps, err = supplyRelayerSessionsManager(ctx, deps) - if err != nil { - return nil, err + // Initialize deps to with empty depinject config. + deps = depinject.Configs() + for _, supplyFn := range supplierFuncs { + deps, err = supplyFn(ctx, deps, cmd) } return deps, nil } -func supplyMiner( - deps depinject.Config, -) (depinject.Config, error) { - mnr, err := miner.NewMiner() - if err != nil { - return nil, err - } - - return depinject.Configs(deps, depinject.Supply(mnr)), nil -} - -func supplyEventsQueryClient(deps depinject.Config, pocketNodeWebsocketURL string) (depinject.Config, error) { - eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketURL) - - return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil -} - -func getPocketNodeWebsocketURL(cmd *cobra.Command) (string, error) { +// getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to +// connect to for subscribing to on-chain events. +func getPocketNodeWebsocketUrl(cmd *cobra.Command) (string, error) { pocketNodeURLStr, err := cmd.Flags().GetString(cosmosflags.FlagNode) if err != nil { return "", err @@ -190,20 +151,62 @@ func getPocketNodeWebsocketURL(cmd *cobra.Command) (string, error) { return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil } -func supplyBlockClient( - ctx context.Context, +// newSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns +// a new depinject.Config which is supplied with the given deps and the new +// EventsQueryClient. +func newSupplyEventsQueryClientFn( + pocketNodeWebsocketUrl string, +) supplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) + + return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil + } +} + +// newSupplyBlockClientFn returns a function with constructs a BlockClient instance +// with the given nodeURL and returns a new +// depinject.Config which is supplied with the given deps and the new +// BlockClient. +func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) supplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(blockClient)), nil + } +} + +// supplyMiner constructs a Miner instance and returns a new depinject.Config +// which is supplied with the given deps and the new Miner. +func supplyMiner( + _ context.Context, deps depinject.Config, - nodeURL string, + _ *cobra.Command, ) (depinject.Config, error) { - blockClient, err := block.NewBlockClient(ctx, deps, nodeURL) + mnr, err := miner.NewMiner() if err != nil { return nil, err } - return depinject.Configs(deps, depinject.Supply(blockClient)), nil + return depinject.Configs(deps, depinject.Supply(mnr)), nil } +// supplyClientContext constructs a cosmosclient.Context instance and returns a +// new depinject.Config which is supplied with the given deps and the new +// cosmosclient.Context. func supplyClientContext( + _ context.Context, deps depinject.Config, cmd *cobra.Command, ) (depinject.Config, error) { @@ -218,7 +221,11 @@ func supplyClientContext( return deps, nil } +// supplyTxFactory constructs a cosmostx.Factory instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// cosmostx.Factory. func supplyTxFactory( + _ context.Context, deps depinject.Config, cmd *cobra.Command, ) (depinject.Config, error) { @@ -235,16 +242,26 @@ func supplyTxFactory( return depinject.Configs(deps, depinject.Supply(clientFactory)), nil } -func supplyTxClient( - ctx context.Context, +func supplyTxContext( + _ context.Context, deps depinject.Config, + _ *cobra.Command, ) (depinject.Config, error) { txContext, err := tx.NewTxContext(deps) if err != nil { return nil, err } - deps = depinject.Configs(deps, depinject.Supply(txContext)) + return depinject.Configs(deps, depinject.Supply(txContext)), nil +} + +// supplyTxClient constructs a TxClient instance and returns a new +// depinject.Config which is supplied with the given deps and the new TxClient. +func supplyTxClient( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { txClient, err := tx.NewTxClient( ctx, deps, @@ -259,7 +276,14 @@ func supplyTxClient( return depinject.Configs(deps, depinject.Supply(txClient)), nil } -func supplySupplierClient(deps depinject.Config) (depinject.Config, error) { +// supplySupplierClient constructs a SupplierClient instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// SupplierClient. +func supplySupplierClient( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { supplierClient, err := supplier.NewSupplierClient( deps, supplier.WithSigningKeyName(flagSigningKeyName), @@ -271,9 +295,13 @@ func supplySupplierClient(deps depinject.Config) (depinject.Config, error) { return depinject.Configs(deps, depinject.Supply(supplierClient)), nil } +// supplyRelayerProxy constructs a RelayerProxy instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// RelayerProxy. func supplyRelayerProxy( + _ context.Context, deps depinject.Config, - cmd *cobra.Command, + _ *cobra.Command, ) (depinject.Config, error) { // TODO_TECHDEBT: this should be populated from some relayerProxy config. // TODO_TECHDEBT(#179): this hostname should be updated to match that of the @@ -299,9 +327,13 @@ func supplyRelayerProxy( return depinject.Configs(deps, depinject.Supply(relayerProxy)), nil } +// supplyRelayerSessionsManager constructs a RelayerSessionsManager instance +// and returns a new depinject.Config which is supplied with the given deps and +// the new RelayerSessionsManager. func supplyRelayerSessionsManager( ctx context.Context, deps depinject.Config, + _ *cobra.Command, ) (depinject.Config, error) { relayerSessionsManager, err := session.NewRelayerSessions( ctx, deps, diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 242ca0230..b97ef4695 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -7,7 +7,7 @@ import ( "cosmossdk.io/depinject" ringtypes "github.com/athanorlabs/go-dleq/types" - sdkclient "github.com/cosmos/cosmos-sdk/client" + cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/crypto/keyring" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" "golang.org/x/sync/errgroup" @@ -83,7 +83,7 @@ type relayerProxy struct { ringCacheMutex *sync.RWMutex // clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies. - clientCtx sdkclient.Context + clientCtx cosmosclient.Context // supplierAddress is the address of the supplier that the relayer proxy is running for. supplierAddress string @@ -91,6 +91,14 @@ type relayerProxy struct { // NewRelayerProxy creates a new relayer proxy with the given dependencies or returns // an error if the dependencies fail to resolve or the options are invalid. +// +// Required dependencies: +// - cosmosclient.Context +// - client.BlockClient +// +// Available options: +// - WithSigningKeyName +// - WithProxiedServicesEndpoints func NewRelayerProxy( deps depinject.Config, opts ...relayer.RelayerProxyOption, @@ -105,7 +113,7 @@ func NewRelayerProxy( return nil, err } - rp.clientCtx = sdkclient.Context(rp.clientCtx) + rp.clientCtx = cosmosclient.Context(rp.clientCtx) servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]() diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 0a0f66aff..9f4441f82 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -45,6 +45,13 @@ type relayerSessionsManager struct { } // NewRelayerSessions creates a new relayerSessions. +// +// Required dependencies: +// - client.BlockClient +// - client.SupplierClient +// +// Available options: +// - WithStoresDirectory func NewRelayerSessions( ctx context.Context, deps depinject.Config, From 6ad6abebeaf7575a6f2dcd766376b4922c6af8f2 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 12:43:34 +0100 Subject: [PATCH 04/10] =?UTF-8?q?refactor:=20separate=20tx=20and=20query?= =?UTF-8?q?=20client=20contexts=20=F0=9F=99=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/client/tx/context.go | 9 +-- pkg/relayer/cmd/cmd.go | 100 ++++++++++++++++++++--------- pkg/relayer/interface.go | 13 ++++ pkg/relayer/proxy/proxy.go | 13 ++-- pkg/relayer/session/sessiontree.go | 2 +- 5 files changed, 95 insertions(+), 42 deletions(-) diff --git a/pkg/client/tx/context.go b/pkg/client/tx/context.go index 01cf76382..2bfa0da5f 100644 --- a/pkg/client/tx/context.go +++ b/pkg/client/tx/context.go @@ -12,6 +12,7 @@ import ( authclient "github.com/cosmos/cosmos-sdk/x/auth/client" "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/relayer" ) var _ client.TxContext = (*cosmosTxContext)(nil) @@ -21,7 +22,7 @@ var _ client.TxContext = (*cosmosTxContext)(nil) type cosmosTxContext struct { // Holds cosmos-sdk client context. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client#Context) - clientCtx cosmosclient.Context + clientCtx relayer.TxClientContext // Holds the cosmos-sdk transaction factory. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client/tx#Factory) txFactory cosmostx.Factory @@ -64,7 +65,7 @@ func (txCtx cosmosTxContext) SignTx( ) error { return authclient.SignTx( txCtx.txFactory, - txCtx.clientCtx, + cosmosclient.Context(txCtx.clientCtx), signingKeyName, txBuilder, offline, overwriteSig, @@ -84,8 +85,8 @@ func (txCtx cosmosTxContext) EncodeTx(txBuilder cosmosclient.TxBuilder) ([]byte, // BroadcastTx broadcasts the given transaction to the network, blocking until the check-tx // ABCI operation completes and returns a TxResponse of the transaction status at that point in time. func (txCtx cosmosTxContext) BroadcastTx(txBytes []byte) (*cosmostypes.TxResponse, error) { - return txCtx.clientCtx.BroadcastTxAsync(txBytes) - //return txCtx.clientCtx.BroadcastTxSync(txBytes) + clientCtx := cosmosclient.Context(txCtx.clientCtx) + return clientCtx.BroadcastTxAsync(txBytes) } // QueryTx queries the transaction based on its hash and optionally provides proof diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index a13f41b4d..17561d149 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -23,11 +23,13 @@ import ( "github.com/pokt-network/poktroll/pkg/relayer/session" ) +const omittedDefaultFlagValue = "explicitly omitting default" + var ( - flagSigningKeyName string - flagSmtStorePath string - flagSequencerNode string - flagPocketNode string + flagSigningKeyName string + flagSmtStorePath string + flagSequencerNodeUrl string + flagPocketNodeUrl string ) type supplierFn func( @@ -55,15 +57,9 @@ func RelayerCmd() *cobra.Command { // Communication flags // TODO_TECHDEBT: We're using `explicitly omitting default` so the relayer crashes if these aren't specified. // Figure out what good defaults should be post alpha. - cmd.Flags().StringVar(&flagSequencerNode, "sequencer-node", "explicitly omitting default", ": to sequencer node to submit txs") - cmd.Flags().StringVar(&flagPocketNode, "pocket-node", "explicitly omitting default", ": to full pocket node for reading data and listening for on-chain events") - cmd.Flags().String(cosmosflags.FlagNode, "explicitly omitting default", "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly") - - // Set --node flag to the --sequencer-node for the client context - err := cmd.Flags().Set(cosmosflags.FlagNode, fmt.Sprintf("tcp://%s", flagSequencerNode)) - if err != nil { - panic(err) - } + cmd.Flags().StringVar(&flagSequencerNodeUrl, "sequencer-node", "explicitly omitting default", "tcp://: to sequencer node to submit txs") + cmd.Flags().StringVar(&flagPocketNodeUrl, "pocket-node", omittedDefaultFlagValue, "tcp://: to full pocket node for reading data and listening for on-chain events") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly") return cmd } @@ -108,7 +104,7 @@ func setupRelayerDependencies( ctx context.Context, cmd *cobra.Command, ) (deps depinject.Config, err error) { - pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl(cmd) + pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl() if err != nil { return nil, err } @@ -116,8 +112,9 @@ func setupRelayerDependencies( supplierFuncs := []supplierFn{ newSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), // leaf newSupplyBlockClientFn(pocketNodeWebsocketUrl), - supplyMiner, // leaf - supplyClientContext, // leaf + supplyMiner, // leaf + supplyQueryClientContext, // leaf + supplyTxClientContext, // leaf supplyTxFactory, supplyTxContext, supplyTxClient, @@ -130,6 +127,9 @@ func setupRelayerDependencies( deps = depinject.Configs() for _, supplyFn := range supplierFuncs { deps, err = supplyFn(ctx, deps, cmd) + if err != nil { + return nil, err + } } return deps, nil @@ -137,13 +137,12 @@ func setupRelayerDependencies( // getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to // connect to for subscribing to on-chain events. -func getPocketNodeWebsocketUrl(cmd *cobra.Command) (string, error) { - pocketNodeURLStr, err := cmd.Flags().GetString(cosmosflags.FlagNode) - if err != nil { - return "", err +func getPocketNodeWebsocketUrl() (string, error) { + if flagPocketNodeUrl == omittedDefaultFlagValue { + return "", fmt.Errorf("--pocket-node flag is required") } - pocketNodeURL, err := url.Parse(pocketNodeURLStr) + pocketNodeURL, err := url.Parse(flagPocketNodeUrl) if err != nil { return "", err } @@ -202,22 +201,62 @@ func supplyMiner( return depinject.Configs(deps, depinject.Supply(mnr)), nil } -// supplyClientContext constructs a cosmosclient.Context instance and returns a +func supplyQueryClientContext( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, +) (depinject.Config, error) { + // Set --node flag to the --pocket-node for the client context + // This flag is read by cosmosclient.GetClientQueryContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, flagPocketNodeUrl) + if err != nil { + return nil, err + } + + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply( + relayer.QueryClientContext(queryClientCtx), + )) + return deps, nil +} + +// supplyTxClientContext constructs a cosmosclient.Context instance and returns a // new depinject.Config which is supplied with the given deps and the new // cosmosclient.Context. -func supplyClientContext( +func supplyTxClientContext( _ context.Context, deps depinject.Config, cmd *cobra.Command, ) (depinject.Config, error) { - // NB: The implementation of #GetClientTxContext() is identical to that of - // #GetClientQueryContext(); either is sufficient for usage in both querying - // and transaction building and broadcasting. - clientCtx, err := cosmosclient.GetClientTxContext(cmd) + // Set --node flag to the --sequencer-node for this client context. + // This flag is read by cosmosclient.GetClientTxContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, flagSequencerNodeUrl) + if err != nil { + return nil, err + } + + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + txClientCtx, err := cosmosclient.GetClientTxContext(cmd) if err != nil { return nil, err } - deps = depinject.Configs(deps, depinject.Supply(clientCtx)) + deps = depinject.Configs(deps, depinject.Supply( + relayer.TxClientContext(txClientCtx), + )) return deps, nil } @@ -229,11 +268,12 @@ func supplyTxFactory( deps depinject.Config, cmd *cobra.Command, ) (depinject.Config, error) { - var clientCtx cosmosclient.Context - if err := depinject.Inject(deps, &clientCtx); err != nil { + var txClientCtx relayer.TxClientContext + if err := depinject.Inject(deps, &txClientCtx); err != nil { return nil, err } + clientCtx := cosmosclient.Context(txClientCtx) clientFactory, err := cosmostx.NewFactoryCLI(clientCtx, cmd.Flags()) if err != nil { return nil, err diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 539361f2d..d58d8149a 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -3,6 +3,7 @@ package relayer import ( "context" + "github.com/cosmos/cosmos-sdk/client" "github.com/pokt-network/smt" "github.com/pokt-network/poktroll/pkg/observable" @@ -11,6 +12,18 @@ import ( sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) +// TxClientContext is used to distinguish a cosmosclient.Context intended for use +// in transactions from others. +// This type is intentionally not an alias in order to make this distinction clear +// to the dependency injector +type TxClientContext client.Context + +// QueryClientContext is used to distinguish a cosmosclient.Context intended for use +// in queries from others. +// This type is intentionally not an alias in order to make this distinction clear +// to the dependency injector +type QueryClientContext client.Context + // Miner is responsible for observing servedRelayObs, hashing and checking the // difficulty of each, finally publishing those with sufficient difficulty to // minedRelayObs as they are applicable for relay volume. diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index b97ef4695..5e168712f 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -83,7 +83,7 @@ type relayerProxy struct { ringCacheMutex *sync.RWMutex // clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies. - clientCtx cosmosclient.Context + clientCtx relayer.QueryClientContext // supplierAddress is the address of the supplier that the relayer proxy is running for. supplierAddress string @@ -113,16 +113,15 @@ func NewRelayerProxy( return nil, err } - rp.clientCtx = cosmosclient.Context(rp.clientCtx) - + clientCtx := cosmosclient.Context(rp.clientCtx) servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]() rp.servedRelays = servedRelays rp.servedRelaysPublishCh = servedRelaysProducer - rp.accountsQuerier = accounttypes.NewQueryClient(rp.clientCtx) - rp.supplierQuerier = suppliertypes.NewQueryClient(rp.clientCtx) - rp.sessionQuerier = sessiontypes.NewQueryClient(rp.clientCtx) - rp.applicationQuerier = apptypes.NewQueryClient(rp.clientCtx) + rp.accountsQuerier = accounttypes.NewQueryClient(clientCtx) + rp.supplierQuerier = suppliertypes.NewQueryClient(clientCtx) + rp.sessionQuerier = sessiontypes.NewQueryClient(clientCtx) + rp.applicationQuerier = apptypes.NewQueryClient(clientCtx) rp.keyring = rp.clientCtx.Keyring rp.ringCache = make(map[string][]ringtypes.Point) rp.ringCacheMutex = &sync.RWMutex{} diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index 2ae0cfb52..9ce6dbf32 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -68,7 +68,7 @@ func NewSessionTree( storePath := filepath.Join(storesDirectory, sessionHeader.SessionId) // Make sure storePath does not exist when creating a new SessionTree - if _, err := os.Stat(storePath); !os.IsNotExist(err) { + if _, err := os.Stat(storePath); err != nil && !os.IsNotExist(err) { return nil, ErrSessionTreeUndefinedStoresDirectory } From 88e046d74dcdea01b90c78807c08a7f48ab4b263 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 12:47:21 +0100 Subject: [PATCH 05/10] fix: sessiontree store path check --- pkg/relayer/session/sessiontree.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index 9ce6dbf32..18f9c777a 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -68,8 +68,12 @@ func NewSessionTree( storePath := filepath.Join(storesDirectory, sessionHeader.SessionId) // Make sure storePath does not exist when creating a new SessionTree - if _, err := os.Stat(storePath); err != nil && !os.IsNotExist(err) { - return nil, ErrSessionTreeUndefinedStoresDirectory + if _, err := os.Stat(storePath); err != nil { + if !os.IsNotExist(err) { + return nil, err + } + + return nil, ErrSessionTreeStorePathExists } treeStore, err := smt.NewKVStore(storePath) From 9b836919617b6f0cc760278fda4d0447105c1987 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 12:52:26 +0100 Subject: [PATCH 06/10] fix: sessiontree store path check --- pkg/relayer/session/sessiontree.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index 18f9c777a..f9f03eaa8 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -68,12 +68,8 @@ func NewSessionTree( storePath := filepath.Join(storesDirectory, sessionHeader.SessionId) // Make sure storePath does not exist when creating a new SessionTree - if _, err := os.Stat(storePath); err != nil { - if !os.IsNotExist(err) { - return nil, err - } - - return nil, ErrSessionTreeStorePathExists + if _, err := os.Stat(storePath); err != nil && !os.IsNotExist(err) { + return nil, ErrSessionTreeStorePathExists.Wrapf("storePath: %q", storePath) } treeStore, err := smt.NewKVStore(storePath) From c865eb07dcc2847591b45bc0c7b74658b1998a74 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 12:55:13 +0100 Subject: [PATCH 07/10] chore: review feedback improvements Co-authored-by: Daniel Olshansky --- pkg/relayer/cmd/cmd.go | 6 ++++-- pkg/relayer/proxy/proxy.go | 2 +- pkg/relayer/session/session.go | 2 +- proto/pocket/service/relay.proto | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 17561d149..e2264be26 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -343,14 +343,16 @@ func supplyRelayerProxy( deps depinject.Config, _ *cobra.Command, ) (depinject.Config, error) { - // TODO_TECHDEBT: this should be populated from some relayerProxy config. + // TODO_BLOCKER:(#137): This MUST be populated via the `relayer.json` config file // TODO_TECHDEBT(#179): this hostname should be updated to match that of the // in-tilt anvil service. - anvilURL, err := url.Parse("http://localhost:8547/") + proxyServiceURL, err := url.Parse("http://localhost:8547/") if err != nil { return nil, err } + // TODO_TECHDEBT(#137, #130): Once the `relayer.json` config file is implemented an a local LLM node + // is supported, this needs to be expanded such that a single relayer can proxy to multiple services at once. proxiedServiceEndpoints := map[string]url.URL{ "anvil": *anvilURL, } diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 5e168712f..28f92576c 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -137,7 +137,7 @@ func NewRelayerProxy( return rp, nil } -// Start concurrently starts all advertised relay servers and returns an error +// Start concurrently starts all advertised relay services and returns an error // if any of them errors. // This method IS BLOCKING until all RelayServers are stopped. func (rp *relayerProxy) Start(ctx context.Context) error { diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 9f4441f82..ef89e874c 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -91,7 +91,7 @@ func NewRelayerSessions( // The session trees are piped through a series of map operations which progress // them through the claim/proof lifecycle, broadcasting transactions to the // network as necessary. -// It IS NOT blocking as map operations run in their own goroutines. +// It IS NOT BLOCKING as map operations run in their own goroutines. func (rs *relayerSessionsManager) Start(ctx context.Context) { // Map eitherMinedRelays to a new observable of an error type which is // notified if an error was encountered while attempting to add the relay to diff --git a/proto/pocket/service/relay.proto b/proto/pocket/service/relay.proto index 968fd8dfe..dc269fad0 100644 --- a/proto/pocket/service/relay.proto +++ b/proto/pocket/service/relay.proto @@ -40,7 +40,7 @@ message JSONRPCRequestPayload { uint32 id = 1; // Identifier established by the Client to create context for the request. string jsonrpc = 2; // Version of JSON-RPC. Must be exactly "2.0". string method = 3; // Method being invoked on the server. - // TODO_RESEARCH: Find out why the params being a map causes errors + // TODO_TECHDEBT(#126): Make params a `oneof` of a list or map per the JSON-RPC specifications // should they be a list of maps? //map params = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures repeated string params = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures From fe9db5822f2f70f7879f82a000c26c52145ef46d Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 13:20:25 +0100 Subject: [PATCH 08/10] chore: review feedback improvements --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 5db5ec974..7a770d038 100644 --- a/Makefile +++ b/Makefile @@ -364,6 +364,7 @@ supplier_stake: ## Stake tokens for the supplier specified (must specify the APP supplier1_stake: ## Stake supplier1 (also staked in genesis) # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). + # TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper SUPPLIER=supplier1 SERVICES="anvil;http://localhost:8545,svc1;http://localhost:8081" make supplier_stake .PHONY: supplier2_stake From 8d7e577b8c8937869e785188bd29c177bde83b92 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 13:20:37 +0100 Subject: [PATCH 09/10] chore: add long command description --- pkg/relayer/cmd/cmd.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index e2264be26..a4db8c1ba 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -43,7 +43,18 @@ func RelayerCmd() *cobra.Command { Use: "relayminer", Short: "Run a relay miner", // TODO_TECHDEBT: add a longer long description. - Long: `Run a relay miner`, + Long: `Run a relay miner. The relay miner process configures and starts +relay servers for each service the supplier actor identified by --signing-key is +staked for (configured on-chain). Relay requests received by the relay servers +are validated and proxied to their respective service endpoints. The responses +are then signed and sent back to the requesting application. + +For each successfully served relay, the miner will hash and compare its difficulty +against an on-chain threshold. If the difficulty is sufficient, it is applicable +to relay volume and therefore rewards. Such relays are inserted into and persisted +via an SMT KV store. The miner will monitor the current block height and periodically +submit claim and proof messages according to the protocol as sessions become eligable +for such operations.`, RunE: runRelayer, } @@ -351,10 +362,10 @@ func supplyRelayerProxy( return nil, err } - // TODO_TECHDEBT(#137, #130): Once the `relayer.json` config file is implemented an a local LLM node + // TODO_TECHDEBT(#137, #130): Once the `relayer.json` config file is implemented an a local LLM node // is supported, this needs to be expanded such that a single relayer can proxy to multiple services at once. proxiedServiceEndpoints := map[string]url.URL{ - "anvil": *anvilURL, + "anvil": *proxyServiceURL, } relayerProxy, err := proxy.NewRelayerProxy( From f9455135a8f672a58cfefba76b2571d6a05d7f81 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 13:35:22 +0100 Subject: [PATCH 10/10] fix: supplier client test --- testutil/testclient/testtx/context.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/testutil/testclient/testtx/context.go b/testutil/testclient/testtx/context.go index 35b3dfb71..60412baa7 100644 --- a/testutil/testclient/testtx/context.go +++ b/testutil/testclient/testtx/context.go @@ -17,6 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/pkg/relayer" "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/pkg/client" @@ -264,7 +265,8 @@ func NewAnyTimesTxTxContext( require.NoError(t, err) require.NotEmpty(t, txFactory) - txCtxDeps := depinject.Supply(txFactory, clientCtx) + txClientCtx := relayer.TxClientContext(clientCtx) + txCtxDeps := depinject.Supply(txFactory, txClientCtx) txCtx, err := tx.NewTxContext(txCtxDeps) require.NoError(t, err) txCtxMock := mockclient.NewMockTxContext(ctrl)