diff --git a/Makefile b/Makefile index 3851e2775..34ffc11c7 100644 --- a/Makefile +++ b/Makefile @@ -366,19 +366,20 @@ supplier_stake: ## Stake tokens for the supplier specified (must specify the APP supplier1_stake: ## Stake supplier1 (also staked in genesis) # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). - SUPPLIER=supplier1 SERVICES="anvil;http://localhost:8548,svc1;http://localhost:8081" make supplier_stake + # TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper + SUPPLIER=supplier1 SERVICES="anvil;http://localhost:8545,svc1;http://localhost:8081" make supplier_stake .PHONY: supplier2_stake supplier2_stake: ## Stake supplier2 # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). - SUPPLIER=supplier2 SERVICES="anvil;http://localhost:8548,svc2;http://localhost:8082" make supplier_stake + SUPPLIER=supplier2 SERVICES="anvil;http://localhost:8545,svc2;http://localhost:8082" make supplier_stake .PHONY: supplier3_stake supplier3_stake: ## Stake supplier3 # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). - SUPPLIER=supplier3 SERVICES="anvil;http://localhost:8548,svc3;http://localhost:8083" make supplier_stake + SUPPLIER=supplier3 SERVICES="anvil;http://localhost:8545,svc3;http://localhost:8083" make supplier_stake .PHONY: supplier_unstake supplier_unstake: ## Unstake an supplier (must specify the SUPPLIER env var) diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 375171d28..7ded373f2 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -75,6 +75,9 @@ type eventBytesToBlockMapFn = func( ) (client.Block, bool) // NewBlockClient creates a new block client from the given dependencies and cometWebsocketURL. +// +// Required dependencies: +// - client.EventsQueryClient func NewBlockClient( ctx context.Context, deps depinject.Config, diff --git a/pkg/client/events_query/client.go b/pkg/client/events_query/client.go index 88ace493f..34a7d35d2 100644 --- a/pkg/client/events_query/client.go +++ b/pkg/client/events_query/client.go @@ -62,6 +62,11 @@ func (ebc *eventsBytesAndConn) Close() { _ = ebc.conn.Close() } +// NewEventsQueryClient returns a new events query client which is used to +// subscribe to on-chain events matching the given query. +// +// Available options: +// - WithDialer func NewEventsQueryClient(cometWebsocketURL string, opts ...client.EventsQueryClientOption) client.EventsQueryClient { evtClient := &eventsQueryClient{ cometWebsocketURL: cometWebsocketURL, diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index aa2561832..39f5208e0 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -105,6 +105,15 @@ type TxEvent struct { // validateConfigAndSetDefaults method. // 5. Subscribes the client to its own transactions. This step might be // reconsidered for relocation to a potential Start() method in the future. +// +// Required dependencies: +// - client.TxContext +// - client.EventsQueryClient +// - client.BlockClient +// +// Available options: +// - WithSigningKeyName +// - WithCommitTimeoutHeightOffset func NewTxClient( ctx context.Context, deps depinject.Config, diff --git a/pkg/client/tx/context.go b/pkg/client/tx/context.go index 040731b46..2bfa0da5f 100644 --- a/pkg/client/tx/context.go +++ b/pkg/client/tx/context.go @@ -12,6 +12,7 @@ import ( authclient "github.com/cosmos/cosmos-sdk/x/auth/client" "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/relayer" ) var _ client.TxContext = (*cosmosTxContext)(nil) @@ -21,7 +22,7 @@ var _ client.TxContext = (*cosmosTxContext)(nil) type cosmosTxContext struct { // Holds cosmos-sdk client context. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client#Context) - clientCtx cosmosclient.Context + clientCtx relayer.TxClientContext // Holds the cosmos-sdk transaction factory. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client/tx#Factory) txFactory cosmostx.Factory @@ -30,6 +31,10 @@ type cosmosTxContext struct { // NewTxContext initializes a new cosmosTxContext with the given dependencies. // It uses depinject to populate its members and returns a client.TxContext // interface type. +// +// Required dependencies: +// - cosmosclient.Context +// - cosmostx.Factory func NewTxContext(deps depinject.Config) (client.TxContext, error) { txCtx := cosmosTxContext{} @@ -41,8 +46,6 @@ func NewTxContext(deps depinject.Config) (client.TxContext, error) { return nil, err } - txCtx.clientCtx = cosmosclient.Context(txCtx.clientCtx) - return txCtx, nil } @@ -62,7 +65,7 @@ func (txCtx cosmosTxContext) SignTx( ) error { return authclient.SignTx( txCtx.txFactory, - txCtx.clientCtx, + cosmosclient.Context(txCtx.clientCtx), signingKeyName, txBuilder, offline, overwriteSig, @@ -82,8 +85,8 @@ func (txCtx cosmosTxContext) EncodeTx(txBuilder cosmosclient.TxBuilder) ([]byte, // BroadcastTx broadcasts the given transaction to the network, blocking until the check-tx // ABCI operation completes and returns a TxResponse of the transaction status at that point in time. func (txCtx cosmosTxContext) BroadcastTx(txBytes []byte) (*cosmostypes.TxResponse, error) { - return txCtx.clientCtx.BroadcastTxAsync(txBytes) - //return txCtx.clientCtx.BroadcastTxSync(txBytes) + clientCtx := cosmosclient.Context(txCtx.clientCtx) + return clientCtx.BroadcastTxAsync(txBytes) } // QueryTx queries the transaction based on its hash and optionally provides proof diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 994bdb03d..a4db8c1ba 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -23,41 +23,54 @@ import ( "github.com/pokt-network/poktroll/pkg/relayer/session" ) +const omittedDefaultFlagValue = "explicitly omitting default" + var ( - flagSigningKeyName string - flagSmtStorePath string - flagSequencerNode string - flagPocketNode string + flagSigningKeyName string + flagSmtStorePath string + flagSequencerNodeUrl string + flagPocketNodeUrl string ) +type supplierFn func( + context.Context, + depinject.Config, + *cobra.Command, +) (depinject.Config, error) + func RelayerCmd() *cobra.Command { cmd := &cobra.Command{ Use: "relayminer", Short: "Run a relay miner", // TODO_TECHDEBT: add a longer long description. - Long: `Run a relay miner`, + Long: `Run a relay miner. The relay miner process configures and starts +relay servers for each service the supplier actor identified by --signing-key is +staked for (configured on-chain). Relay requests received by the relay servers +are validated and proxied to their respective service endpoints. The responses +are then signed and sent back to the requesting application. + +For each successfully served relay, the miner will hash and compare its difficulty +against an on-chain threshold. If the difficulty is sufficient, it is applicable +to relay volume and therefore rewards. Such relays are inserted into and persisted +via an SMT KV store. The miner will monitor the current block height and periodically +submit claim and proof messages according to the protocol as sessions become eligable +for such operations.`, RunE: runRelayer, } cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") // TODO_TECHDEBT: integrate these flags with the client context (i.e. cosmosflags, config, viper, etc.) - // This is simpler to do with server-side configs (see rootCmd#PersistentPreRunE). - // Will require more effort than currently justifiable. + // This is simpler to do with server-side configs (see rootCmd#PersistentPreRunE) and requires more effort than currently justifiable. cmd.Flags().StringVar(&flagSigningKeyName, "signing-key", "", "Name of the key to sign transactions") - cmd.Flags().StringVar(&flagSmtStorePath, "smt-store", "smt", "Path to the SMT KV store") + // TODO_TECHDEBT(#137): This, alongside other flags, should be part of a config file suppliers provide. + cmd.Flags().StringVar(&flagSmtStorePath, "smt-store", "smt", "Path to where the data backing SMT KV store exists on disk") // Communication flags // TODO_TECHDEBT: We're using `explicitly omitting default` so the relayer crashes if these aren't specified. // Figure out what good defaults should be post alpha. - cmd.Flags().StringVar(&flagSequencerNode, "sequencer-node", "explicitly omitting default", ": to sequencer node to submit txs") - cmd.Flags().StringVar(&flagPocketNode, "pocket-node", "explicitly omitting default", ": to full pocket node for reading data and listening for on-chain events") - cmd.Flags().String(cosmosflags.FlagNode, "explicitly omitting default", "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly") - - // Set --node flag to the --sequencer-node for the client context - err := cmd.Flags().Set(cosmosflags.FlagNode, fmt.Sprintf("tcp://%s", flagSequencerNode)) - if err != nil { - panic(err) - } + cmd.Flags().StringVar(&flagSequencerNodeUrl, "sequencer-node", "explicitly omitting default", "tcp://: to sequencer node to submit txs") + cmd.Flags().StringVar(&flagPocketNodeUrl, "pocket-node", omittedDefaultFlagValue, "tcp://: to full pocket node for reading data and listening for on-chain events") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly") return cmd } @@ -93,74 +106,103 @@ func runRelayer(cmd *cobra.Command, _ []string) error { return nil } -// setupRelayerDependencies sets up all the dependencies the relay miner needs to run: +// setupRelayerDependencies sets up all the dependencies the relay miner needs +// to run by building the dependency tree from the leaves up, incrementally +// supplying each component to an accumulating depinject.Config: // Miner, EventsQueryClient, BlockClient, cosmosclient.Context, TxFactory, TxContext, // TxClient, SupplierClient, RelayerProxy, RelayerSessionsManager. func setupRelayerDependencies( ctx context.Context, cmd *cobra.Command, ) (deps depinject.Config, err error) { - // Initizlize deps to with empty depinject config. - deps, err = supplyMiner(depinject.Configs()) - if err != nil { - return nil, err - } - - rpcQueryURL, err := getPocketNodeWebsocketURL(cmd) + pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl() if err != nil { return nil, err } - // Has no dependencies. - deps, err = supplyEventsQueryClient(deps, rpcQueryURL) - if err != nil { - return nil, err + supplierFuncs := []supplierFn{ + newSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), // leaf + newSupplyBlockClientFn(pocketNodeWebsocketUrl), + supplyMiner, // leaf + supplyQueryClientContext, // leaf + supplyTxClientContext, // leaf + supplyTxFactory, + supplyTxContext, + supplyTxClient, + supplySupplierClient, + supplyRelayerProxy, + supplyRelayerSessionsManager, } - // Depends on EventsQueryClient. - deps, err = supplyBlockClient(ctx, deps, rpcQueryURL) - if err != nil { - return nil, err + // Initialize deps to with empty depinject config. + deps = depinject.Configs() + for _, supplyFn := range supplierFuncs { + deps, err = supplyFn(ctx, deps, cmd) + if err != nil { + return nil, err + } } - // Has no dependencies - deps, err = supplyClientContext(deps, cmd) + return deps, nil +} - // Depends on clientCtx. - deps, err = supplyTxFactory(deps, cmd) - if err != nil { - return nil, err +// getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to +// connect to for subscribing to on-chain events. +func getPocketNodeWebsocketUrl() (string, error) { + if flagPocketNodeUrl == omittedDefaultFlagValue { + return "", fmt.Errorf("--pocket-node flag is required") } - // Depends on clientCtx, txFactory, EventsQueryClient, & BlockClient. - deps, err = supplyTxClient(ctx, deps) + pocketNodeURL, err := url.Parse(flagPocketNodeUrl) if err != nil { - return nil, err + return "", err } - // Depends on txClient & EventsQueryClient. - deps, err = supplySupplierClient(deps) - if err != nil { - return nil, err - } + return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil +} - // Depends on clientCtx & BlockClient. - deps, err = supplyRelayerProxy(deps, cmd) - if err != nil { - return nil, err +// newSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns +// a new depinject.Config which is supplied with the given deps and the new +// EventsQueryClient. +func newSupplyEventsQueryClientFn( + pocketNodeWebsocketUrl string, +) supplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) + + return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil } +} - // Depends on BlockClient & SupplierClient. - deps, err = supplyRelayerSessionsManager(ctx, deps) - if err != nil { - return nil, err +// newSupplyBlockClientFn returns a function with constructs a BlockClient instance +// with the given nodeURL and returns a new +// depinject.Config which is supplied with the given deps and the new +// BlockClient. +func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) supplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(blockClient)), nil } - - return deps, nil } +// supplyMiner constructs a Miner instance and returns a new depinject.Config +// which is supplied with the given deps and the new Miner. func supplyMiner( + _ context.Context, deps depinject.Config, + _ *cobra.Command, ) (depinject.Config, error) { mnr, err := miner.NewMiner() if err != nil { @@ -170,63 +212,79 @@ func supplyMiner( return depinject.Configs(deps, depinject.Supply(mnr)), nil } -func supplyEventsQueryClient(deps depinject.Config, pocketNodeWebsocketURL string) (depinject.Config, error) { - eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketURL) - - return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil -} - -func getPocketNodeWebsocketURL(cmd *cobra.Command) (string, error) { - pocketNodeURLStr, err := cmd.Flags().GetString(cosmosflags.FlagNode) +func supplyQueryClientContext( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, +) (depinject.Config, error) { + // Set --node flag to the --pocket-node for the client context + // This flag is read by cosmosclient.GetClientQueryContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, flagPocketNodeUrl) if err != nil { - return "", err + return nil, err } - pocketNodeURL, err := url.Parse(pocketNodeURLStr) + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) if err != nil { - return "", err + return nil, err } - - return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil + deps = depinject.Configs(deps, depinject.Supply( + relayer.QueryClientContext(queryClientCtx), + )) + return deps, nil } -func supplyBlockClient( - ctx context.Context, +// supplyTxClientContext constructs a cosmosclient.Context instance and returns a +// new depinject.Config which is supplied with the given deps and the new +// cosmosclient.Context. +func supplyTxClientContext( + _ context.Context, deps depinject.Config, - nodeURL string, + cmd *cobra.Command, ) (depinject.Config, error) { - blockClient, err := block.NewBlockClient(ctx, deps, nodeURL) + // Set --node flag to the --sequencer-node for this client context. + // This flag is read by cosmosclient.GetClientTxContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, flagSequencerNodeUrl) if err != nil { return nil, err } - return depinject.Configs(deps, depinject.Supply(blockClient)), nil -} - -func supplyClientContext( - deps depinject.Config, - cmd *cobra.Command, -) (depinject.Config, error) { - // NB: The implementation of #GetClientTxContext() is identical to that of - // #GetClientQueryContext(); either is sufficient for usage in both querying - // and transaction building and broadcasting. - clientCtx, err := cosmosclient.GetClientTxContext(cmd) + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + txClientCtx, err := cosmosclient.GetClientTxContext(cmd) if err != nil { return nil, err } - deps = depinject.Configs(deps, depinject.Supply(clientCtx)) + deps = depinject.Configs(deps, depinject.Supply( + relayer.TxClientContext(txClientCtx), + )) return deps, nil } +// supplyTxFactory constructs a cosmostx.Factory instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// cosmostx.Factory. func supplyTxFactory( + _ context.Context, deps depinject.Config, cmd *cobra.Command, ) (depinject.Config, error) { - var clientCtx cosmosclient.Context - if err := depinject.Inject(deps, &clientCtx); err != nil { + var txClientCtx relayer.TxClientContext + if err := depinject.Inject(deps, &txClientCtx); err != nil { return nil, err } + clientCtx := cosmosclient.Context(txClientCtx) clientFactory, err := cosmostx.NewFactoryCLI(clientCtx, cmd.Flags()) if err != nil { return nil, err @@ -235,16 +293,26 @@ func supplyTxFactory( return depinject.Configs(deps, depinject.Supply(clientFactory)), nil } -func supplyTxClient( - ctx context.Context, +func supplyTxContext( + _ context.Context, deps depinject.Config, + _ *cobra.Command, ) (depinject.Config, error) { txContext, err := tx.NewTxContext(deps) if err != nil { return nil, err } - deps = depinject.Configs(deps, depinject.Supply(txContext)) + return depinject.Configs(deps, depinject.Supply(txContext)), nil +} + +// supplyTxClient constructs a TxClient instance and returns a new +// depinject.Config which is supplied with the given deps and the new TxClient. +func supplyTxClient( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { txClient, err := tx.NewTxClient( ctx, deps, @@ -259,7 +327,14 @@ func supplyTxClient( return depinject.Configs(deps, depinject.Supply(txClient)), nil } -func supplySupplierClient(deps depinject.Config) (depinject.Config, error) { +// supplySupplierClient constructs a SupplierClient instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// SupplierClient. +func supplySupplierClient( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { supplierClient, err := supplier.NewSupplierClient( deps, supplier.WithSigningKeyName(flagSigningKeyName), @@ -271,20 +346,26 @@ func supplySupplierClient(deps depinject.Config) (depinject.Config, error) { return depinject.Configs(deps, depinject.Supply(supplierClient)), nil } +// supplyRelayerProxy constructs a RelayerProxy instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// RelayerProxy. func supplyRelayerProxy( + _ context.Context, deps depinject.Config, - cmd *cobra.Command, + _ *cobra.Command, ) (depinject.Config, error) { - // TODO_TECHDEBT: this should be populated from some relayerProxy config. + // TODO_BLOCKER:(#137): This MUST be populated via the `relayer.json` config file // TODO_TECHDEBT(#179): this hostname should be updated to match that of the // in-tilt anvil service. - anvilURL, err := url.Parse("http://localhost:8547/") + proxyServiceURL, err := url.Parse("http://localhost:8547/") if err != nil { return nil, err } + // TODO_TECHDEBT(#137, #130): Once the `relayer.json` config file is implemented an a local LLM node + // is supported, this needs to be expanded such that a single relayer can proxy to multiple services at once. proxiedServiceEndpoints := map[string]url.URL{ - "anvil": *anvilURL, + "anvil": *proxyServiceURL, } relayerProxy, err := proxy.NewRelayerProxy( @@ -299,9 +380,13 @@ func supplyRelayerProxy( return depinject.Configs(deps, depinject.Supply(relayerProxy)), nil } +// supplyRelayerSessionsManager constructs a RelayerSessionsManager instance +// and returns a new depinject.Config which is supplied with the given deps and +// the new RelayerSessionsManager. func supplyRelayerSessionsManager( ctx context.Context, deps depinject.Config, + _ *cobra.Command, ) (depinject.Config, error) { relayerSessionsManager, err := session.NewRelayerSessions( ctx, deps, diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 539361f2d..d58d8149a 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -3,6 +3,7 @@ package relayer import ( "context" + "github.com/cosmos/cosmos-sdk/client" "github.com/pokt-network/smt" "github.com/pokt-network/poktroll/pkg/observable" @@ -11,6 +12,18 @@ import ( sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) +// TxClientContext is used to distinguish a cosmosclient.Context intended for use +// in transactions from others. +// This type is intentionally not an alias in order to make this distinction clear +// to the dependency injector +type TxClientContext client.Context + +// QueryClientContext is used to distinguish a cosmosclient.Context intended for use +// in queries from others. +// This type is intentionally not an alias in order to make this distinction clear +// to the dependency injector +type QueryClientContext client.Context + // Miner is responsible for observing servedRelayObs, hashing and checking the // difficulty of each, finally publishing those with sufficient difficulty to // minedRelayObs as they are applicable for relay volume. diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 242ca0230..28f92576c 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -7,7 +7,7 @@ import ( "cosmossdk.io/depinject" ringtypes "github.com/athanorlabs/go-dleq/types" - sdkclient "github.com/cosmos/cosmos-sdk/client" + cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/crypto/keyring" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" "golang.org/x/sync/errgroup" @@ -83,7 +83,7 @@ type relayerProxy struct { ringCacheMutex *sync.RWMutex // clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies. - clientCtx sdkclient.Context + clientCtx relayer.QueryClientContext // supplierAddress is the address of the supplier that the relayer proxy is running for. supplierAddress string @@ -91,6 +91,14 @@ type relayerProxy struct { // NewRelayerProxy creates a new relayer proxy with the given dependencies or returns // an error if the dependencies fail to resolve or the options are invalid. +// +// Required dependencies: +// - cosmosclient.Context +// - client.BlockClient +// +// Available options: +// - WithSigningKeyName +// - WithProxiedServicesEndpoints func NewRelayerProxy( deps depinject.Config, opts ...relayer.RelayerProxyOption, @@ -105,16 +113,15 @@ func NewRelayerProxy( return nil, err } - rp.clientCtx = sdkclient.Context(rp.clientCtx) - + clientCtx := cosmosclient.Context(rp.clientCtx) servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]() rp.servedRelays = servedRelays rp.servedRelaysPublishCh = servedRelaysProducer - rp.accountsQuerier = accounttypes.NewQueryClient(rp.clientCtx) - rp.supplierQuerier = suppliertypes.NewQueryClient(rp.clientCtx) - rp.sessionQuerier = sessiontypes.NewQueryClient(rp.clientCtx) - rp.applicationQuerier = apptypes.NewQueryClient(rp.clientCtx) + rp.accountsQuerier = accounttypes.NewQueryClient(clientCtx) + rp.supplierQuerier = suppliertypes.NewQueryClient(clientCtx) + rp.sessionQuerier = sessiontypes.NewQueryClient(clientCtx) + rp.applicationQuerier = apptypes.NewQueryClient(clientCtx) rp.keyring = rp.clientCtx.Keyring rp.ringCache = make(map[string][]ringtypes.Point) rp.ringCacheMutex = &sync.RWMutex{} @@ -130,7 +137,7 @@ func NewRelayerProxy( return rp, nil } -// Start concurrently starts all advertised relay servers and returns an error +// Start concurrently starts all advertised relay services and returns an error // if any of them errors. // This method IS BLOCKING until all RelayServers are stopped. func (rp *relayerProxy) Start(ctx context.Context) error { diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 0a0f66aff..ef89e874c 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -45,6 +45,13 @@ type relayerSessionsManager struct { } // NewRelayerSessions creates a new relayerSessions. +// +// Required dependencies: +// - client.BlockClient +// - client.SupplierClient +// +// Available options: +// - WithStoresDirectory func NewRelayerSessions( ctx context.Context, deps depinject.Config, @@ -84,7 +91,7 @@ func NewRelayerSessions( // The session trees are piped through a series of map operations which progress // them through the claim/proof lifecycle, broadcasting transactions to the // network as necessary. -// It IS NOT blocking as map operations run in their own goroutines. +// It IS NOT BLOCKING as map operations run in their own goroutines. func (rs *relayerSessionsManager) Start(ctx context.Context) { // Map eitherMinedRelays to a new observable of an error type which is // notified if an error was encountered while attempting to add the relay to diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index 2ae0cfb52..f9f03eaa8 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -68,8 +68,8 @@ func NewSessionTree( storePath := filepath.Join(storesDirectory, sessionHeader.SessionId) // Make sure storePath does not exist when creating a new SessionTree - if _, err := os.Stat(storePath); !os.IsNotExist(err) { - return nil, ErrSessionTreeUndefinedStoresDirectory + if _, err := os.Stat(storePath); err != nil && !os.IsNotExist(err) { + return nil, ErrSessionTreeStorePathExists.Wrapf("storePath: %q", storePath) } treeStore, err := smt.NewKVStore(storePath) diff --git a/proto/pocket/service/relay.proto b/proto/pocket/service/relay.proto index 968fd8dfe..dc269fad0 100644 --- a/proto/pocket/service/relay.proto +++ b/proto/pocket/service/relay.proto @@ -40,7 +40,7 @@ message JSONRPCRequestPayload { uint32 id = 1; // Identifier established by the Client to create context for the request. string jsonrpc = 2; // Version of JSON-RPC. Must be exactly "2.0". string method = 3; // Method being invoked on the server. - // TODO_RESEARCH: Find out why the params being a map causes errors + // TODO_TECHDEBT(#126): Make params a `oneof` of a list or map per the JSON-RPC specifications // should they be a list of maps? //map params = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures repeated string params = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures diff --git a/testutil/testclient/testtx/context.go b/testutil/testclient/testtx/context.go index 35b3dfb71..60412baa7 100644 --- a/testutil/testclient/testtx/context.go +++ b/testutil/testclient/testtx/context.go @@ -17,6 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/pkg/relayer" "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/pkg/client" @@ -264,7 +265,8 @@ func NewAnyTimesTxTxContext( require.NoError(t, err) require.NotEmpty(t, txFactory) - txCtxDeps := depinject.Supply(txFactory, clientCtx) + txClientCtx := relayer.TxClientContext(clientCtx) + txCtxDeps := depinject.Supply(txFactory, txClientCtx) txCtx, err := tx.NewTxContext(txCtxDeps) require.NoError(t, err) txCtxMock := mockclient.NewMockTxContext(ctrl)