From 81a808a5e9a8fd9ab14bdd73d986fa91375b844d Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Sat, 11 Nov 2023 00:18:21 +0100 Subject: [PATCH] [AppGate] Implement the MVP AppGateServer (#108) Co-authored-by: h5law <53987565+h5law@users.noreply.github.com> --- cmd/pocketd/cmd/root.go | 6 + go.mod | 2 + go.sum | 4 + pkg/appgateserver/cmd/cmd.go | 147 ++++++++++ pkg/appgateserver/endpoint_selector.go | 45 +++ pkg/appgateserver/errors.go | 13 + pkg/appgateserver/jsonrpc.go | 132 +++++++++ pkg/appgateserver/options.go | 19 ++ pkg/appgateserver/relay_verifier.go | 73 +++++ pkg/appgateserver/rings.go | 146 ++++++++++ pkg/appgateserver/server.go | 284 +++++++++++++++++++ pkg/appgateserver/session.go | 47 +++ pkg/client/block/client.go | 7 +- pkg/client/gomock_reflect_3526400147/prog.go | 66 ----- pkg/relayer/proxy/errors.go | 2 + pkg/relayer/proxy/proxy.go | 13 + pkg/relayer/proxy/relay_signer.go | 23 +- pkg/relayer/proxy/relay_verifier.go | 64 +++-- pkg/relayer/proxy/rings.go | 119 ++++++++ pkg/signer/interface.go | 9 + pkg/signer/ring_signer.go | 38 +++ pkg/signer/simple_signer.go | 23 ++ x/service/types/relay.go | 23 ++ 23 files changed, 1214 insertions(+), 91 deletions(-) create mode 100644 pkg/appgateserver/cmd/cmd.go create mode 100644 pkg/appgateserver/endpoint_selector.go create mode 100644 pkg/appgateserver/errors.go create mode 100644 pkg/appgateserver/jsonrpc.go create mode 100644 pkg/appgateserver/options.go create mode 100644 pkg/appgateserver/relay_verifier.go create mode 100644 pkg/appgateserver/rings.go create mode 100644 pkg/appgateserver/server.go create mode 100644 pkg/appgateserver/session.go delete mode 100644 pkg/client/gomock_reflect_3526400147/prog.go create mode 100644 pkg/relayer/proxy/rings.go create mode 100644 pkg/signer/interface.go create mode 100644 pkg/signer/ring_signer.go create mode 100644 pkg/signer/simple_signer.go create mode 100644 x/service/types/relay.go diff --git a/cmd/pocketd/cmd/root.go b/cmd/pocketd/cmd/root.go index 5b1fb3276..cf58f2447 100644 --- a/cmd/pocketd/cmd/root.go +++ b/cmd/pocketd/cmd/root.go @@ -43,6 +43,7 @@ import ( "github.com/pokt-network/poktroll/app" appparams "github.com/pokt-network/poktroll/app/params" + appgateservercmd "github.com/pokt-network/poktroll/pkg/appgateserver/cmd" ) // NewRootCmd creates a new root command for a Cosmos SDK application @@ -148,6 +149,11 @@ func initRootCmd( txCommand(), keys.Commands(app.DefaultNodeHome), ) + + // add the appgate server command + rootCmd.AddCommand( + appgateservercmd.AppGateServerCmd(), + ) } // queryCommand returns the sub-command to send queries to the app diff --git a/go.mod b/go.mod index 51aa32938..adbedcf85 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( cosmossdk.io/depinject v1.0.0-alpha.3 cosmossdk.io/errors v1.0.0-beta.7 cosmossdk.io/math v1.0.1 + github.com/athanorlabs/go-dleq v0.1.0 github.com/cometbft/cometbft v0.37.2 github.com/cometbft/cometbft-db v0.8.0 github.com/cosmos/cosmos-proto v1.0.0-beta.2 @@ -20,6 +21,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 + github.com/noot/ring-go v0.0.0-20231019173746-6c4b33bcf03f github.com/pokt-network/smt v0.7.1 github.com/regen-network/gocuke v0.6.2 github.com/spf13/cast v1.5.1 diff --git a/go.sum b/go.sum index b554afc3b..24322a896 100644 --- a/go.sum +++ b/go.sum @@ -300,6 +300,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= github.com/ashanbrown/forbidigo v1.3.0/go.mod h1:vVW7PEdqEFqapJe95xHkTfB1+XvZXBFg8t0sG2FIxmI= github.com/ashanbrown/makezero v1.1.1/go.mod h1:i1bJLCRSCHOcOa9Y6MyF2FTfMZMFdHvxKHxgO5Z1axI= +github.com/athanorlabs/go-dleq v0.1.0 h1:0/llWZG8fz2uintMBKOiBC502zCsDA8nt8vxI73W9Qc= +github.com/athanorlabs/go-dleq v0.1.0/go.mod h1:DWry6jSD7A13MKmeZA0AX3/xBeQCXDoygX99VPwL3yU= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.25.37/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= @@ -1481,6 +1483,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/nishanths/exhaustive v0.8.1/go.mod h1:qj+zJJUgJ76tR92+25+03oYUhzF4R7/2Wk7fGTfCHmg= github.com/nishanths/predeclared v0.0.0-20190419143655-18a43bb90ffc/go.mod h1:62PewwiQTlm/7Rj+cxVYqZvDIUc+JjZq6GHAC1fsObQ= github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c= +github.com/noot/ring-go v0.0.0-20231019173746-6c4b33bcf03f h1:1+NP/H13eFAqBYrGpRkbJUWVWIO2Zr2eP7a/q0UtZVQ= +github.com/noot/ring-go v0.0.0-20231019173746-6c4b33bcf03f/go.mod h1:0t3gzoSfW2bkTce1E/Jis3MQpjiKGhAgqieFK+nkQsI= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go new file mode 100644 index 000000000..21052e270 --- /dev/null +++ b/pkg/appgateserver/cmd/cmd.go @@ -0,0 +1,147 @@ +package cmd + +import ( + "context" + "errors" + "fmt" + "log" + "net/http" + "net/url" + "os" + "os/signal" + + "cosmossdk.io/depinject" + cosmosclient "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/flags" + "github.com/spf13/cobra" + + "github.com/pokt-network/poktroll/pkg/appgateserver" + blockclient "github.com/pokt-network/poktroll/pkg/client/block" + eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" +) + +var ( + flagSigningKey string + flagSelfSigning bool + flagListeningEndpoint string + flagCometWebsocketUrl string +) + +func AppGateServerCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "appgate-server", + Short: "Starts the AppGate server", + Long: `Starts the AppGate server that listens for incoming relay requests and handles +the necessary on-chain interactions (sessions, suppliers, etc) to receive the +respective relay response. + +-- App Mode (Flag)- - +If the server is started with a defined '--self-signing' flag, it will behave +as an Application. Any incoming requests will be signed by using the private +key and ring associated with the '--signing-key' flag. + +-- Gateway Mode (Flag)-- +If the '--self-signing' flag is not provided, the server will behave as a Gateway. +It will sign relays on behalf of any Application sending it relays, provided +that the address associated with '--signing-key' has been delegated to. This is +necessary for the application<->gateway ring signature to function. + +-- App Mode (HTTP) -- +If an application doesn't provide the '--self-signing' flag, it can still send +relays to the AppGate server and function as an Application, provided that: +1. Each request contains the '?senderAddress=[address]' query parameter +2. The key associated with the '--signing-key' flag belongs to the address + provided in the request, otherwise the ring signature will not be valid.`, + Args: cobra.NoArgs, + RunE: runAppGateServer, + } + + cmd.Flags().StringVar(&flagSigningKey, "signing-key", "", "The name of the key that will be used to sign relays") + cmd.Flags().StringVar(&flagListeningEndpoint, "listening-endpoint", "http://localhost:42069", "The host and port that the appgate server will listen on") + cmd.Flags().StringVar(&flagCometWebsocketUrl, "comet-websocket-url", "ws://localhost:36657/websocket", "The URL of the comet websocket endpoint to communicate with the pocket blockchain") + cmd.Flags().BoolVar(&flagSelfSigning, "self-signing", false, "Whether the server should sign all incoming requests with its own ring (for applications)") + + cmd.Flags().String(flags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") + cmd.Flags().String(flags.FlagNode, "tcp://localhost:36657", "The URL of the comet tcp endpoint to communicate with the pocket blockchain") + + return cmd +} + +func runAppGateServer(cmd *cobra.Command, _ []string) error { + // Create a context that is canceled when the command is interrupted + ctx, cancelCtx := context.WithCancel(cmd.Context()) + defer cancelCtx() + + // Handle interrupts in a goroutine. + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt) + + // Block until we receive an interrupt or kill signal (OS-agnostic) + <-sigCh + log.Println("INFO: Interrupt signal received, shutting down...") + + // Signal goroutines to stop + cancelCtx() + }() + + // Parse the listening endpoint. + listeningUrl, err := url.Parse(flagListeningEndpoint) + if err != nil { + return fmt.Errorf("failed to parse listening endpoint: %w", err) + } + + // Setup the AppGate server dependencies. + appGateServerDeps, err := setupAppGateServerDependencies(cmd, ctx, flagCometWebsocketUrl) + if err != nil { + return fmt.Errorf("failed to setup AppGate server dependencies: %w", err) + } + + log.Println("INFO: Creating AppGate server...") + + // Create the AppGate server. + appGateServer, err := appgateserver.NewAppGateServer( + appGateServerDeps, + appgateserver.WithSigningInformation(&appgateserver.SigningInformation{ + // provide the name of the key to use for signing all incoming requests + SigningKeyName: flagSigningKey, + // provide whether the appgate server should sign all incoming requests + // with its own ring (for applications) or not (for gateways) + SelfSigning: flagSelfSigning, + }), + appgateserver.WithListeningUrl(listeningUrl), + ) + if err != nil { + return fmt.Errorf("failed to create AppGate server: %w", err) + } + + log.Printf("INFO: Starting AppGate server, listening on %s...", listeningUrl.String()) + + // Start the AppGate server. + if err := appGateServer.Start(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("failed to start app gate server: %w", err) + } else if errors.Is(err, http.ErrServerClosed) { + log.Println("INFO: AppGate server stopped") + } + + return nil +} + +func setupAppGateServerDependencies(cmd *cobra.Command, ctx context.Context, cometWebsocketUrl string) (depinject.Config, error) { + // Retrieve the client context for the chain interactions. + clientCtx := cosmosclient.GetClientContextFromCmd(cmd) + + // Create the events client. + eventsQueryClient := eventsquery.NewEventsQueryClient(flagCometWebsocketUrl) + + // Create the block client. + log.Printf("INFO: Creating block client, using comet websocket URL: %s...", flagCometWebsocketUrl) + deps := depinject.Supply(eventsQueryClient) + blockClient, err := blockclient.NewBlockClient(ctx, deps, flagCometWebsocketUrl) + if err != nil { + return nil, fmt.Errorf("failed to create block client: %w", err) + } + + // Return the dependencie config. + return depinject.Supply(clientCtx, blockClient), nil +} diff --git a/pkg/appgateserver/endpoint_selector.go b/pkg/appgateserver/endpoint_selector.go new file mode 100644 index 000000000..380c5dad5 --- /dev/null +++ b/pkg/appgateserver/endpoint_selector.go @@ -0,0 +1,45 @@ +package appgateserver + +import ( + "context" + "log" + "net/url" + + sessiontypes "github.com/pokt-network/poktroll/x/session/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +// TODO_IMPROVE: This implements a naive greedy approach that defaults to the +// first available supplier. Future optimizations (e.g. Quality-of-Service) can be introduced here. +// TODO(@h5law): Look into different endpoint selection depending on their suitability. +// getRelayerUrl gets the URL of the relayer for the given service. +func (app *appGateServer) getRelayerUrl( + ctx context.Context, + serviceId string, + rpcType sharedtypes.RPCType, + session *sessiontypes.Session, +) (supplierUrl *url.URL, supplierAddress string, err error) { + for _, supplier := range session.Suppliers { + for _, service := range supplier.Services { + // Skip services that don't match the requested serviceId. + if service.Service.Id != serviceId { + continue + } + + for _, endpoint := range service.Endpoints { + // Return the first endpoint url that matches the JSON RPC RpcType. + if endpoint.RpcType == rpcType { + supplierUrl, err := url.Parse(endpoint.Url) + if err != nil { + log.Printf("error parsing url: %s", err) + continue + } + return supplierUrl, supplier.Address, nil + } + } + } + } + + // Return an error if no relayer endpoints were found. + return nil, "", ErrAppGateNoRelayEndpoints +} diff --git a/pkg/appgateserver/errors.go b/pkg/appgateserver/errors.go new file mode 100644 index 000000000..2c8f281bd --- /dev/null +++ b/pkg/appgateserver/errors.go @@ -0,0 +1,13 @@ +package appgateserver + +import sdkerrors "cosmossdk.io/errors" + +var ( + codespace = "appgateserver" + ErrAppGateInvalidRelayResponseSignature = sdkerrors.Register(codespace, 1, "invalid relay response signature") + ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 2, "no relay endpoints found") + ErrAppGateInvalidRequestURL = sdkerrors.Register(codespace, 3, "invalid request URL") + ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 4, "missing application address") + ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 5, "missing app client signing information") + ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 6, "missing app client listening endpoint") +) diff --git a/pkg/appgateserver/jsonrpc.go b/pkg/appgateserver/jsonrpc.go new file mode 100644 index 000000000..3be01cca7 --- /dev/null +++ b/pkg/appgateserver/jsonrpc.go @@ -0,0 +1,132 @@ +package appgateserver + +import ( + "bytes" + "context" + "io" + "log" + "net/http" + + "github.com/cometbft/cometbft/crypto" + + "github.com/pokt-network/poktroll/x/service/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +// handleJSONRPCRelay handles JSON RPC relay requests. +// It does everything from preparing, signing and sending the request. +// It then blocks on the response to come back and forward it to the provided writer. +func (app *appGateServer) handleJSONRPCRelay( + ctx context.Context, + appAddress, serviceId string, + request *http.Request, + writer http.ResponseWriter, +) error { + // Read the request body bytes. + payloadBz, err := io.ReadAll(request.Body) + if err != nil { + return err + } + + // Create the relay request payload. + relayRequestPayload := &types.RelayRequest_JsonRpcPayload{} + relayRequestPayload.JsonRpcPayload.Unmarshal(payloadBz) + + session, err := app.getCurrentSession(ctx, appAddress, serviceId) + if err != nil { + return err + } + log.Printf("DEBUG: Current session ID: %s", session.SessionId) + + // Get a supplier URL and address for the given service and session. + supplierUrl, supplierAddress, err := app.getRelayerUrl(ctx, serviceId, sharedtypes.RPCType_JSON_RPC, session) + if err != nil { + return err + } + + // Create the relay request. + relayRequest := &types.RelayRequest{ + Meta: &types.RelayRequestMetadata{ + SessionHeader: session.Header, + Signature: nil, // signature added below + }, + Payload: relayRequestPayload, + } + + // Get the application's signer. + signer, err := app.getRingSingerForAppAddress(ctx, appAddress) + if err != nil { + return err + } + + // Hash and sign the request's signable bytes. + signableBz, err := relayRequest.GetSignableBytes() + if err != nil { + return err + } + + hash := crypto.Sha256(signableBz) + signature, err := signer.Sign(hash) + if err != nil { + return err + } + relayRequest.Meta.Signature = signature + + // Marshal the relay request to bytes and create a reader to be used as an HTTP request body. + relayRequestBz, err := relayRequest.Marshal() + if err != nil { + return err + } + relayRequestReader := io.NopCloser(bytes.NewReader(relayRequestBz)) + + // Create the HTTP request to send the request to the relayer. + relayHTTPRequest := &http.Request{ + Method: request.Method, + Header: request.Header, + URL: supplierUrl, + Body: relayRequestReader, + } + + // Perform the HTTP request to the relayer. + log.Printf("DEBUG: Sending signed relay request to %s", supplierUrl) + relayHTTPResponse, err := http.DefaultClient.Do(relayHTTPRequest) + if err != nil { + return err + } + + // Read the response body bytes. + relayResponseBz, err := io.ReadAll(relayHTTPResponse.Body) + if err != nil { + return err + } + + // Unmarshal the response bytes into a RelayResponse. + relayResponse := &types.RelayResponse{} + if err := relayResponse.Unmarshal(relayResponseBz); err != nil { + return err + } + + // Verify the response signature. We use the supplier address that we got from + // the getRelayerUrl function since this is the address we are expecting to sign the response. + // TODO_TECHDEBT: if the RelayResponse is an internal error response, we should not verify the signature + // as in some relayer early failures, it may not be signed by the supplier. + // TODO_IMPROVE: Add more logging & telemetry so we can get visibility and signal into + // failed responses. + log.Println("DEBUG: Verifying signed relay response from...") + if err := app.verifyResponse(ctx, supplierAddress, relayResponse); err != nil { + return err + } + + // Marshal the response payload to bytes to be sent back to the application. + var responsePayloadBz []byte + if _, err = relayResponse.Payload.MarshalTo(responsePayloadBz); err != nil { + return err + } + + // Reply with the RelayResponse payload. + if _, err := writer.Write(relayRequestBz); err != nil { + return err + } + + return nil +} diff --git a/pkg/appgateserver/options.go b/pkg/appgateserver/options.go new file mode 100644 index 000000000..fb164029b --- /dev/null +++ b/pkg/appgateserver/options.go @@ -0,0 +1,19 @@ +package appgateserver + +import ( + "net/url" +) + +// WithSigningInformation sets the signing information for the appgate server. +func WithSigningInformation(signingInfo *SigningInformation) appGateServerOption { + return func(appGateServer *appGateServer) { + appGateServer.signingInformation = signingInfo + } +} + +// WithListeningUrl sets the listening URL for the appgate server. +func WithListeningUrl(listeningUrl *url.URL) appGateServerOption { + return func(appGateServer *appGateServer) { + appGateServer.listeningEndpoint = listeningUrl + } +} diff --git a/pkg/appgateserver/relay_verifier.go b/pkg/appgateserver/relay_verifier.go new file mode 100644 index 000000000..712eda7f9 --- /dev/null +++ b/pkg/appgateserver/relay_verifier.go @@ -0,0 +1,73 @@ +package appgateserver + +import ( + "context" + + "github.com/cometbft/cometbft/crypto" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + + "github.com/pokt-network/poktroll/x/service/types" +) + +// verifyResponse verifies the relay response signature. +func (app *appGateServer) verifyResponse( + ctx context.Context, + supplierAddress string, + relayResponse *types.RelayResponse, +) error { + // Get the supplier's public key. + supplierPubKey, err := app.getSupplierPubKeyFromAddress(ctx, supplierAddress) + if err != nil { + return err + } + + // Extract the supplier's signature + supplierSignature := relayResponse.Meta.SupplierSignature + + // Get the relay response signable bytes and hash them. + responseBz, err := relayResponse.GetSignableBytes() + if err != nil { + return err + } + hash := crypto.Sha256(responseBz) + + // Verify the relay response signature. + if !supplierPubKey.VerifySignature(hash, supplierSignature) { + return ErrAppGateInvalidRelayResponseSignature + } + + return nil +} + +// getSupplierPubKeyFromAddress gets the supplier's public key from the cache or +// queries if it is not found. The public key is then cached before being returned. +func (app *appGateServer) getSupplierPubKeyFromAddress( + ctx context.Context, + supplierAddress string, +) (cryptotypes.PubKey, error) { + supplierPubKey, ok := app.supplierAccountCache[supplierAddress] + if ok { + return supplierPubKey, nil + } + + // Query for the supplier account to get the application's public key + // to verify the relay request signature. + accQueryReq := &accounttypes.QueryAccountRequest{Address: supplierAddress} + accQueryRes, err := app.accountQuerier.Account(ctx, accQueryReq) + if err != nil { + return nil, err + } + + // Unmarshal the query response into a BaseAccount. + account := new(accounttypes.BaseAccount) + if err := account.Unmarshal(accQueryRes.Account.Value); err != nil { + return nil, err + } + + fetchedPubKey := account.GetPubKey() + // Cache the retrieved public key. + app.supplierAccountCache[supplierAddress] = fetchedPubKey + + return fetchedPubKey, nil +} diff --git a/pkg/appgateserver/rings.go b/pkg/appgateserver/rings.go new file mode 100644 index 000000000..945a5e13b --- /dev/null +++ b/pkg/appgateserver/rings.go @@ -0,0 +1,146 @@ +// TODO_BLOCKER(@h5law): Move all this logic out into a shared package to avoid +// the duplication of core business logic between `pkg/relayer/proxy/rings.go` +// and `pkg/appgateserver/rings.go` +package appgateserver + +import ( + "context" + "fmt" + "log" + + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" + ringtypes "github.com/athanorlabs/go-dleq/types" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + ring "github.com/noot/ring-go" + + "github.com/pokt-network/poktroll/pkg/signer" + apptypes "github.com/pokt-network/poktroll/x/application/types" +) + +// getRingSingerForAppAddress returns the RingSinger used to sign relays. +// This method first attempts to get the points of the ring from the cache, if it +// fails it queries the application module for the points and creates the ring. +func (app *appGateServer) getRingSingerForAppAddress(ctx context.Context, appAddress string) (*signer.RingSigner, error) { + var ring *ring.Ring + var err error + + // lock the cache for reading + app.ringCacheMutex.RLock() + defer app.ringCacheMutex.RUnlock() + + // check if the ring is in the cache + points, ok := app.ringCache[appAddress] + if !ok { + // if the ring is not in the cache, get it from the application module + log.Printf("DEBUG: No ring cached for address: %s", appAddress) + ring, err = app.getRingForAppAddress(ctx, appAddress) + } else { + // if the ring is in the cache, create it from the points + log.Printf("DEBUG: Ring cached for address: %s", appAddress) + ring, err = newRingFromPoints(points) + } + if err != nil { + log.Printf("ERROR: Unable to get ring for address: %s [%v]", appAddress, err) + return nil, err + } + + // return the ring signer + return signer.NewRingSigner(ring, app.signingInformation.SigningKey), nil +} + +// getRingForAppAddress returns the RingSinger used to sign relays. It does so by fetching +// the latest information from the application module and creating the correct ring. +// This method also caches the ring's public keys for future use. +func (app *appGateServer) getRingForAppAddress(ctx context.Context, appAddress string) (*ring.Ring, error) { + points, err := app.getDelegatedPubKeysForAddress(ctx, appAddress) + if err != nil { + return nil, err + } + return newRingFromPoints(points) +} + +// newRingFromPoints creates a new ring from a slice of points on the secp256k1 curve +func newRingFromPoints(points []ringtypes.Point) (*ring.Ring, error) { + return ring.NewFixedKeyRingFromPublicKeys(ring_secp256k1.NewCurve(), points) +} + +// getDelegatedPubKeysForAddress returns the ring used to sign a message for the given +// application address, by querying the application module for it's delegated pubkeys +// and converting them to points on the secp256k1 curve in order to create the ring. +func (app *appGateServer) getDelegatedPubKeysForAddress( + ctx context.Context, + appAddress string, +) ([]ringtypes.Point, error) { + app.ringCacheMutex.RLock() + defer app.ringCacheMutex.RUnlock() + + // get the application's on chain state + req := &apptypes.QueryGetApplicationRequest{Address: appAddress} + res, err := app.applicationQuerier.Application(ctx, req) + if err != nil { + return nil, fmt.Errorf("unable to retrieve application for address: %s [%w]", appAddress, err) + } + + // create a slice of addresses for the ring + ringAddresses := make([]string, 0) + ringAddresses = append(ringAddresses, appAddress) // app address is index 0 + if len(res.Application.DelegateeGatewayAddresses) < 1 { + // add app address twice to make the ring size of mininmum 2 + // TODO_HACK: We are adding the appAddress twice because a ring + // signature requires AT LEAST two pubKeys. When the Application has + // not delegated to any gateways, we add the application's own address + // twice. This is a HACK and should be investigated as to what is the + // best approach to take in this situation. + ringAddresses = append(ringAddresses, appAddress) + } else if len(res.Application.DelegateeGatewayAddresses) > 0 { + // add the delegatee gateway addresses + ringAddresses = append(ringAddresses, res.Application.DelegateeGatewayAddresses...) + } + + // get the points on the secp256k1 curve for the addresses + points, err := app.addressesToPoints(ctx, ringAddresses) + if err != nil { + return nil, err + } + + // update the cache overwriting the previous value + app.ringCache[appAddress] = points + + // return the public key points on the secp256k1 curve + return points, nil +} + +// addressesToPoints converts a slice of addresses to a slice of points on the +// secp256k1 curve, by querying the account module for the public key for each +// address and converting them to the corresponding points on the secp256k1 curve +func (app *appGateServer) addressesToPoints(ctx context.Context, addresses []string) ([]ringtypes.Point, error) { + curve := ring_secp256k1.NewCurve() + points := make([]ringtypes.Point, len(addresses)) + for i, addr := range addresses { + pubKeyReq := &accounttypes.QueryAccountRequest{Address: addr} + pubKeyRes, err := app.accountQuerier.Account(ctx, pubKeyReq) + if err != nil { + return nil, fmt.Errorf("unable to get account for address: %s [%w]", addr, err) + } + var acc accounttypes.AccountI + reg := codectypes.NewInterfaceRegistry() + accounttypes.RegisterInterfaces(reg) + cdc := codec.NewProtoCodec(reg) + if err := cdc.UnpackAny(pubKeyRes.Account, &acc); err != nil { + return nil, fmt.Errorf("unable to deserialise account for address: %s [%w]", addr, err) + } + key := acc.GetPubKey() + if _, ok := key.(*secp256k1.PubKey); !ok { + return nil, fmt.Errorf("public key is not a secp256k1 key: got %T", key) + } + point, err := curve.DecodeToPoint(key.Bytes()) + if err != nil { + return nil, err + } + points[i] = point + } + return points, nil +} diff --git a/pkg/appgateserver/server.go b/pkg/appgateserver/server.go new file mode 100644 index 000000000..d6410021c --- /dev/null +++ b/pkg/appgateserver/server.go @@ -0,0 +1,284 @@ +package appgateserver + +import ( + "context" + "fmt" + "log" + "net/http" + "net/url" + "strings" + "sync" + + "cosmossdk.io/depinject" + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" + ringtypes "github.com/athanorlabs/go-dleq/types" + sdkclient "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + + blocktypes "github.com/pokt-network/poktroll/pkg/client" + apptypes "github.com/pokt-network/poktroll/x/application/types" + "github.com/pokt-network/poktroll/x/service/types" + sessiontypes "github.com/pokt-network/poktroll/x/session/types" +) + +type SigningInformation struct { + // SelfSigning indicates whether the server is running in self-signing mode + SelfSigning bool + + // SigningKeyName is the name of the key in the keyring that corresponds to the + // private key used to sign relay requests. + SigningKeyName string + + // SigningKey is the scalar point on the appropriate curve corresponding to the + // signer's private key, and is used to sign relay requests via a ring signature + SigningKey ringtypes.Scalar + + // AppAddress is the address of the application that the server is serving if + // If it is nil, then the application address must be included in each request via a query parameter. + AppAddress string +} + +// appGateServer is the server that listens for application requests and relays them to the supplier. +// It is responsible for maintaining the current session for the application, signing the requests, +// and verifying the response signatures. +// The appGateServer is the basis for both applications and gateways, depending on whether the application +// is running their own instance of the appGateServer or they are sending requests to a gateway running an +// instance of the appGateServer, they will need to either include the application address in the request or not. +type appGateServer struct { + // signing information holds the signing key and application address for the server + signingInformation *SigningInformation + + // ringCache is a cache of the public keys used to create the ring for a given application + // they are stored in a map of application address to a slice of points on the secp256k1 curve + // TODO(@h5law): subscribe to on-chain events to update this cache as the ring changes over time + ringCache map[string][]ringtypes.Point + ringCacheMutex *sync.RWMutex + + // clientCtx is the client context for the application. + // It is used to query for the application's account to unmarshal the supplier's account + // and get the public key to verify the relay response signature. + clientCtx sdkclient.Context + + // sessionQuerier is the querier for the session module. + // It used to get the current session for the application given a requested service. + sessionQuerier sessiontypes.QueryClient + + // sessionMu is a mutex to protect currentSession map reads and and updates. + sessionMu sync.RWMutex + + // currentSessions is the current session for the application given a block height. + // It is updated by the goListenForNewSessions goroutine. + currentSessions map[string]*sessiontypes.Session + + // accountQuerier is the querier for the account module. + // It is used to get the the supplier's public key to verify the relay response signature. + accountQuerier accounttypes.QueryClient + + // applicationQuerier is the querier for the application module. + // It is used to get the ring for a given application address. + applicationQuerier apptypes.QueryClient + + // blockClient is the client for the block module. + // It is used to get the current block height to query for the current session. + blockClient blocktypes.BlockClient + + // listeningEndpoint is the endpoint that the appGateServer will listen on. + listeningEndpoint *url.URL + + // server is the HTTP server that will be used capture application requests + // so that they can be signed and relayed to the supplier. + server *http.Server + + // accountCache is a cache of the supplier accounts that has been queried + // TODO_TECHDEBT: Add a size limit to the cache. + supplierAccountCache map[string]cryptotypes.PubKey +} + +func NewAppGateServer( + deps depinject.Config, + opts ...appGateServerOption, +) (*appGateServer, error) { + app := &appGateServer{ + ringCacheMutex: &sync.RWMutex{}, + ringCache: make(map[string][]ringtypes.Point), + currentSessions: make(map[string]*sessiontypes.Session), + supplierAccountCache: make(map[string]cryptotypes.PubKey), + } + + if err := depinject.Inject( + deps, + &app.clientCtx, + &app.blockClient, + ); err != nil { + return nil, err + } + + for _, opt := range opts { + opt(app) + } + + if err := app.validateConfig(); err != nil { + return nil, err + } + + keyRecord, err := app.clientCtx.Keyring.Key(app.signingInformation.SigningKeyName) + if err != nil { + return nil, fmt.Errorf("failed to get key from keyring: %w", err) + } + + appAddress, err := keyRecord.GetAddress() + if err != nil { + return nil, fmt.Errorf("failed to get address from key: %w", err) + } + if app.signingInformation.SelfSigning { + app.signingInformation.AppAddress = appAddress.String() + } + + // Convert the key record to a private key and return the scalar + // point on the secp256k1 curve that it corresponds to. + // If the key is not a secp256k1 key, this will return an error. + signingKey, err := recordLocalToScalar(keyRecord.GetLocal()) + if err != nil { + return nil, fmt.Errorf("failed to convert private key to scalar: %w", err) + } + app.signingInformation.SigningKey = signingKey + + app.sessionQuerier = sessiontypes.NewQueryClient(app.clientCtx) + app.accountQuerier = accounttypes.NewQueryClient(app.clientCtx) + app.applicationQuerier = apptypes.NewQueryClient(app.clientCtx) + app.server = &http.Server{Addr: app.listeningEndpoint.Host} + + return app, nil +} + +// Start starts the appgate server and blocks until the context is done +// or the server returns an error. +func (app *appGateServer) Start(ctx context.Context) error { + // Shutdown the HTTP server when the context is done. + go func() { + <-ctx.Done() + app.server.Shutdown(ctx) + }() + + // Set the HTTP handler. + app.server.Handler = app + + // Start the HTTP server. + return app.server.ListenAndServe() +} + +// Stop stops the appgate server and returns any error that occurred. +func (app *appGateServer) Stop(ctx context.Context) error { + return app.server.Shutdown(ctx) +} + +// ServeHTTP is the HTTP handler for the appgate server. +// It captures the application request, signs it, and sends it to the supplier. +// After receiving the response from the supplier, it verifies the response signature +// before returning the response to the application. +// The serviceId is extracted from the request path. +// The request's path should be of the form: +// +// "://host:port/serviceId[/other/path/segments]?senderAddr=" +// +// where the serviceId is the id of the service that the application is requesting +// and the other (possible) path segments are the JSON RPC request path. +// TODO_TECHDEBT: Revisit the requestPath above based on the SDK that'll be exposed in the future. +func (app *appGateServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + ctx := request.Context() + + // Extract the serviceId from the request path. + path := request.URL.Path + serviceId := strings.Split(path, "/")[1] + + // Determine the application address. + appAddress := app.signingInformation.AppAddress + if appAddress == "" { + appAddress = request.URL.Query().Get("senderAddr") + } + if appAddress == "" { + app.replyWithError(writer, ErrAppGateMissingAppAddress) + log.Print("ERROR: no application address provided") + } + + // TODO_TECHDEBT: Currently, there is no information about the RPC type requested. It should + // be extracted from the request and used to determine the RPC type to handle. handle*Relay() + // calls should be wrapped into a switch statement to handle different types of relays. + err := app.handleJSONRPCRelay(ctx, appAddress, serviceId, request, writer) + if err != nil { + // Reply with an error response if there was an error handling the relay. + app.replyWithError(writer, err) + log.Printf("ERROR: failed handling relay: %s", err) + return + } + + log.Print("INFO: request serviced successfully") +} + +// replyWithError replies to the application with an error response. +// TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC +// Code, Message and Data. Possibly by augmenting the passed in error with the adequate information. +func (app *appGateServer) replyWithError(writer http.ResponseWriter, err error) { + relayResponse := &types.RelayResponse{ + Payload: &types.RelayResponse_JsonRpcPayload{ + JsonRpcPayload: &types.JSONRPCResponsePayload{ + Id: make([]byte, 0), + Jsonrpc: "2.0", + Error: &types.JSONRPCResponseError{ + // Using conventional error code indicating internal server error. + Code: -32000, + Message: err.Error(), + Data: nil, + }, + }, + }, + } + + relayResponseBz, err := relayResponse.Marshal() + if err != nil { + log.Printf("ERROR: failed marshaling relay response: %s", err) + return + } + + if _, err = writer.Write(relayResponseBz); err != nil { + log.Printf("ERROR: failed writing relay response: %s", err) + return + } +} + +// validateConfig validates the appGateServer configuration. +func (app *appGateServer) validateConfig() error { + if app.signingInformation == nil { + return ErrAppGateMissingSigningInformation + } + if app.listeningEndpoint == nil { + return ErrAppGateMissingListeningEndpoint + } + return nil +} + +// recordLocalToScalar converts the private key obtained from a +// key record to a scalar point on the secp256k1 curve +func recordLocalToScalar(local *keyring.Record_Local) (ringtypes.Scalar, error) { + if local == nil { + return nil, fmt.Errorf("cannot extract private key from key record: nil") + } + priv, ok := local.PrivKey.GetCachedValue().(cryptotypes.PrivKey) + if !ok { + return nil, fmt.Errorf("cannot extract private key from key record: %T", local.PrivKey.GetCachedValue()) + } + if _, ok := priv.(*secp256k1.PrivKey); !ok { + return nil, fmt.Errorf("unexpected private key type: %T, want %T", priv, &secp256k1.PrivKey{}) + } + crv := ring_secp256k1.NewCurve() + privKey, err := crv.DecodeToScalar(priv.Bytes()) + if err != nil { + return nil, fmt.Errorf("failed to decode private key: %w", err) + } + return privKey, nil +} + +type appGateServerOption func(*appGateServer) diff --git a/pkg/appgateserver/session.go b/pkg/appgateserver/session.go new file mode 100644 index 000000000..5db25465d --- /dev/null +++ b/pkg/appgateserver/session.go @@ -0,0 +1,47 @@ +package appgateserver + +import ( + "context" + + sessiontypes "github.com/pokt-network/poktroll/x/session/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +// getCurrentSession gets the current session for the given service +// It returns the current session if it exists and is still valid, otherwise it +// queries for the latest session, caches and returns it. +func (app *appGateServer) getCurrentSession( + ctx context.Context, + appAddress, serviceId string, +) (*sessiontypes.Session, error) { + app.sessionMu.RLock() + defer app.sessionMu.RUnlock() + + latestBlock := app.blockClient.LatestBlock(ctx) + if currentSession, ok := app.currentSessions[serviceId]; ok { + sessionEndBlockHeight := currentSession.Header.SessionStartBlockHeight + currentSession.NumBlocksPerSession + + // Return the current session if it is still valid. + if latestBlock.Height() < sessionEndBlockHeight { + return currentSession, nil + } + } + + // Query for the current session. + sessionQueryReq := sessiontypes.QueryGetSessionRequest{ + ApplicationAddress: appAddress, + Service: &sharedtypes.Service{Id: serviceId}, + BlockHeight: latestBlock.Height(), + } + sessionQueryRes, err := app.sessionQuerier.GetSession(ctx, &sessionQueryReq) + if err != nil { + return nil, err + } + + session := sessionQueryRes.Session + + // Cache the current session. + app.currentSessions[serviceId] = session + + return session, nil +} diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 3fd6489a7..375171d28 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -82,8 +82,7 @@ func NewBlockClient( ) (client.BlockClient, error) { // Initialize block client bClient := &blockClient{endpointURL: cometWebsocketURL} - bClient.latestBlockObsvbls, bClient.latestBlockObsvblsReplayPublishCh = - channel.NewReplayObservable[client.BlocksObservable](ctx, latestBlockObsvblsReplayBufferSize) + bClient.latestBlockObsvbls, bClient.latestBlockObsvblsReplayPublishCh = channel.NewReplayObservable[client.BlocksObservable](ctx, latestBlockObsvblsReplayBufferSize) // Inject dependencies if err := depinject.Inject(deps, &bClient.eventsClient); err != nil { @@ -141,7 +140,9 @@ func (bClient *blockClient) goPublishBlocks(ctx context.Context) { // If we get here, the retry limit was reached and the retry loop exited. // Since this function runs in a goroutine, we can't return the error to the // caller. Instead, we panic. - panic(fmt.Errorf("BlockClient.goPublishBlocks shold never reach this spot: %w", publishErr)) + if publishErr != nil { + panic(fmt.Errorf("BlockClient.goPublishBlocks should never reach this spot: %w", publishErr)) + } } // retryPublishBlocksFactory returns a function which is intended to be passed to diff --git a/pkg/client/gomock_reflect_3526400147/prog.go b/pkg/client/gomock_reflect_3526400147/prog.go deleted file mode 100644 index 6003ba81a..000000000 --- a/pkg/client/gomock_reflect_3526400147/prog.go +++ /dev/null @@ -1,66 +0,0 @@ -package main - -import ( - "encoding/gob" - "flag" - "fmt" - "os" - "path" - "reflect" - - "github.com/golang/mock/mockgen/model" - - pkg_ "github.com/pokt-network/poktroll/pkg/client" -) - -var output = flag.String("output", "", "The output file name, or empty to use stdout.") - -func main() { - flag.Parse() - - its := []struct { - sym string - typ reflect.Type - }{ - - {"TxContext", reflect.TypeOf((*pkg_.TxContext)(nil)).Elem()}, - - {"TxClient", reflect.TypeOf((*pkg_.TxClient)(nil)).Elem()}, - } - pkg := &model.Package{ - // NOTE: This behaves contrary to documented behaviour if the - // package name is not the final component of the import path. - // The reflect package doesn't expose the package name, though. - Name: path.Base("github.com/pokt-network/poktroll/pkg/client"), - } - - for _, it := range its { - intf, err := model.InterfaceFromInterfaceType(it.typ) - if err != nil { - fmt.Fprintf(os.Stderr, "Reflection: %v\n", err) - os.Exit(1) - } - intf.Name = it.sym - pkg.Interfaces = append(pkg.Interfaces, intf) - } - - outfile := os.Stdout - if len(*output) != 0 { - var err error - outfile, err = os.Create(*output) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to open output file %q", *output) - } - defer func() { - if err := outfile.Close(); err != nil { - fmt.Fprintf(os.Stderr, "failed to close output file %q", *output) - os.Exit(1) - } - }() - } - - if err := gob.NewEncoder(outfile).Encode(pkg); err != nil { - fmt.Fprintf(os.Stderr, "gob encode: %v\n", err) - os.Exit(1) - } -} diff --git a/pkg/relayer/proxy/errors.go b/pkg/relayer/proxy/errors.go index 6a0815b58..e16d41e17 100644 --- a/pkg/relayer/proxy/errors.go +++ b/pkg/relayer/proxy/errors.go @@ -10,4 +10,6 @@ var ( ErrRelayerProxyInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid relayer proxy supplier") ErrRelayerProxyUndefinedSigningKeyName = sdkerrors.Register(codespace, 5, "undefined relayer proxy signing key name") ErrRelayerProxyUndefinedProxiedServicesEndpoints = sdkerrors.Register(codespace, 6, "undefined proxied services endpoints for relayer proxy") + ErrRelayerProxyInvalidRelayRequest = sdkerrors.Register(codespace, 7, "invalid relay request") + ErrRelayerProxyInvalidRelayResponse = sdkerrors.Register(codespace, 8, "invalid relay response") ) diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 58b9549fd..9ed38e963 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -3,8 +3,10 @@ package proxy import ( "context" "net/url" + "sync" "cosmossdk.io/depinject" + ringtypes "github.com/athanorlabs/go-dleq/types" sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/crypto/keyring" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" @@ -14,6 +16,7 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/relayer" + apptypes "github.com/pokt-network/poktroll/x/application/types" "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" suppliertypes "github.com/pokt-network/poktroll/x/supplier/types" @@ -54,6 +57,10 @@ type relayerProxy struct { // which is needed to check if the relay proxy should be serving an incoming relay request. sessionQuerier sessiontypes.QueryClient + // applicationQuerier is the querier for the application module. + // It is used to get the ring for a given application address. + applicationQuerier apptypes.QueryClient + // advertisedRelayServers is a map of the services provided by the relayer proxy. Each provided service // has the necessary information to start the server that listens for incoming relay requests and // the client that relays the request to the supported proxied service. @@ -69,6 +76,12 @@ type relayerProxy struct { // servedRelays observable can fan out the notifications to its subscribers. servedRelaysProducer chan<- *types.Relay + // ringCache is a cache of the public keys used to create the ring for a given application + // they are stored in a map of application address to a slice of points on the secp256k1 curve + // TODO(@h5law): subscribe to on-chain events to update this cache as the ring changes over time + ringCache map[string][]ringtypes.Point + ringCacheMutex *sync.RWMutex + // clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies. clientCtx sdkclient.Context diff --git a/pkg/relayer/proxy/relay_signer.go b/pkg/relayer/proxy/relay_signer.go index 5ab929cbe..ac3ec2089 100644 --- a/pkg/relayer/proxy/relay_signer.go +++ b/pkg/relayer/proxy/relay_signer.go @@ -1,8 +1,10 @@ package proxy import ( + sdkerrors "cosmossdk.io/errors" "github.com/cometbft/cometbft/crypto" + "github.com/pokt-network/poktroll/pkg/signer" "github.com/pokt-network/poktroll/x/service/types" ) @@ -12,14 +14,23 @@ import ( // that should not be responsible for signing relay responses. // See https://github.com/pokt-network/poktroll/issues/160 for a better design. func (rp *relayerProxy) SignRelayResponse(relayResponse *types.RelayResponse) error { - var responseBz []byte - _, err := relayResponse.MarshalTo(responseBz) + // create a simple signer for the request + signer := signer.NewSimpleSigner(rp.keyring, rp.signingKeyName) + + // extract and hash the relay response's signable bytes + signableBz, err := relayResponse.GetSignableBytes() if err != nil { - return err + return sdkerrors.Wrapf(ErrRelayerProxyInvalidRelayResponse, "error getting signable bytes: %v", err) } + hash := crypto.Sha256(signableBz) - hash := crypto.Sha256(responseBz) - relayResponse.Meta.SupplierSignature, _, err = rp.keyring.Sign(rp.signingKeyName, hash) + // sign the relay response + sig, err := signer.Sign(hash) + if err != nil { + return sdkerrors.Wrapf(ErrRelayerProxyInvalidRelayResponse, "error signing relay response: %v", err) + } - return err + // set the relay response's signature + relayResponse.Meta.SupplierSignature = sig + return nil } diff --git a/pkg/relayer/proxy/relay_verifier.go b/pkg/relayer/proxy/relay_verifier.go index db9cbe7dc..e64955d55 100644 --- a/pkg/relayer/proxy/relay_verifier.go +++ b/pkg/relayer/proxy/relay_verifier.go @@ -3,8 +3,10 @@ package proxy import ( "context" + sdkerrors "cosmossdk.io/errors" + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" "github.com/cometbft/cometbft/crypto" - accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + "github.com/noot/ring-go" "github.com/pokt-network/poktroll/x/service/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" @@ -17,33 +19,63 @@ func (rp *relayerProxy) VerifyRelayRequest( relayRequest *types.RelayRequest, service *sharedtypes.Service, ) error { - // Query for the application account to get the application's public key to verify the relay request signature. - applicationAddress := relayRequest.Meta.SessionHeader.ApplicationAddress - accQueryReq := &accounttypes.QueryAccountRequest{Address: applicationAddress} - accQueryRes, err := rp.accountsQuerier.Account(ctx, accQueryReq) + // extract the relay request's ring signature + signature := relayRequest.Meta.Signature + if signature == nil { + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequest, + "missing signature from relay request: %v", relayRequest, + ) + } + + ringSig := new(ring.RingSig) + if err := ringSig.Deserialize(ring_secp256k1.NewCurve(), signature); err != nil { + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequestSignature, + "error deserializing ring signature: %v", err, + ) + } + + // get the ring for the application address of the relay request + appAddress := relayRequest.Meta.SessionHeader.ApplicationAddress + appRing, err := rp.getRingForAppAddress(ctx, appAddress) if err != nil { - return err + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequest, + "error getting ring for application address %s: %v", appAddress, err, + ) } - var payloadBz []byte - if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil { - return err + // verify the ring signature against the ring + if !ringSig.Ring().Equals(appRing) { + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequestSignature, + "ring signature does not match ring for application address %s", appAddress, + ) } - hash := crypto.Sha256(payloadBz) - account := new(accounttypes.BaseAccount) - if err := account.Unmarshal(accQueryRes.Account.Value); err != nil { - return err + // get and hash the signable bytes of the relay request + signableBz, err := relayRequest.GetSignableBytes() + if err != nil { + return sdkerrors.Wrapf(ErrRelayerProxyInvalidRelayRequest, "error getting signable bytes: %v", err) } - if !account.GetPubKey().VerifySignature(hash, relayRequest.Meta.Signature) { - return ErrRelayerProxyInvalidRelayRequestSignature + hash := crypto.Sha256(signableBz) + var hash32 [32]byte + copy(hash32[:], hash) + + // verify the relay request's signature + if valid := ringSig.Verify(hash32); !valid { + return sdkerrors.Wrapf( + ErrRelayerProxyInvalidRelayRequestSignature, + "invalid ring signature", + ) } // Query for the current session to check if relayRequest sessionId matches the current session. currentBlock := rp.blockClient.LatestBlock(ctx) sessionQuery := &sessiontypes.QueryGetSessionRequest{ - ApplicationAddress: applicationAddress, + ApplicationAddress: appAddress, Service: service, BlockHeight: currentBlock.Height(), } diff --git a/pkg/relayer/proxy/rings.go b/pkg/relayer/proxy/rings.go new file mode 100644 index 000000000..59a19ae70 --- /dev/null +++ b/pkg/relayer/proxy/rings.go @@ -0,0 +1,119 @@ +// TODO_BLOCKER(@h5law): Move all this logic out into a shared package to avoid +// the duplication of core business logic between `pkg/relayer/proxy/rings.go` +// and `pkg/appgateserver/rings.go` +package proxy + +import ( + "context" + "fmt" + + ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" + ringtypes "github.com/athanorlabs/go-dleq/types" + "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" + accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" + ring "github.com/noot/ring-go" + apptypes "github.com/pokt-network/poktroll/x/application/types" +) + +// getRingForAppAddress returns the RingSinger used to sign relays. It does so by fetching +// the latest information from the application module and creating the correct ring. +// This method also caches the ring's public keys for future use. +func (rp *relayerProxy) getRingForAppAddress(ctx context.Context, appAddress string) (*ring.Ring, error) { + // lock the cache for reading + rp.ringCacheMutex.RLock() + + // check if the ring is in the cache + points, ok := rp.ringCache[appAddress] + rp.ringCacheMutex.RUnlock() // unlock the cache incase not found in cache + var err error + if !ok { + // if the ring is not in the cache, get it from the application module + points, err = rp.getDelegatedPubKeysForAddress(ctx, appAddress) + } + if err != nil { + return nil, err + } + + // create the ring from the points + return newRingFromPoints(points) +} + +// newRingFromPoints creates a new ring from a slice of points on the secp256k1 curve +func newRingFromPoints(points []ringtypes.Point) (*ring.Ring, error) { + return ring.NewFixedKeyRingFromPublicKeys(ring_secp256k1.NewCurve(), points) +} + +// getDelegatedPubKeysForAddress returns the ring used to sign a message for the given +// application address, by querying the application module for it's delegated pubkeys +// and converting them to points on the secp256k1 curve in order to create the ring. +func (rp *relayerProxy) getDelegatedPubKeysForAddress( + ctx context.Context, + appAddress string, +) ([]ringtypes.Point, error) { + rp.ringCacheMutex.RLock() + defer rp.ringCacheMutex.RUnlock() + + // get the application's on chain state + req := &apptypes.QueryGetApplicationRequest{Address: appAddress} + res, err := rp.applicationQuerier.Application(ctx, req) + if err != nil { + return nil, fmt.Errorf("unable to retrieve application for address: %s [%w]", appAddress, err) + } + + // create a slice of addresses for the ring + ringAddresses := make([]string, 0) + ringAddresses = append(ringAddresses, appAddress) // app address is index 0 + if len(res.Application.DelegateeGatewayAddresses) < 1 { + // add app address twice to make the ring size of mininmum 2 + // TODO_TECHDEBT: We are adding the appAddress twice because a ring + // signature requires AT LEAST two pubKeys. When the Application has + // not delegated to any gateways, we add the application's own address + // twice. This is a HACK and should be investigated as to what is the + // best approach to take in this situation. + ringAddresses = append(ringAddresses, appAddress) + } else if len(res.Application.DelegateeGatewayAddresses) > 0 { + // add the delegatee gateway addresses + ringAddresses = append(ringAddresses, res.Application.DelegateeGatewayAddresses...) + } + + // get the points on the secp256k1 curve for the addresses + points, err := rp.addressesToPoints(ctx, ringAddresses) + if err != nil { + return nil, err + } + + // update the cache overwriting the previous value + rp.ringCache[appAddress] = points + + // return the public key points on the secp256k1 curve + return points, nil +} + +// addressesToPoints converts a slice of addresses to a slice of points on the +// secp256k1 curve, by querying the account module for the public key for each +// address and converting them to the corresponding points on the secp256k1 curve +func (rp *relayerProxy) addressesToPoints(ctx context.Context, addresses []string) ([]ringtypes.Point, error) { + curve := ring_secp256k1.NewCurve() + points := make([]ringtypes.Point, len(addresses)) + for i, addr := range addresses { + pubKeyReq := &accounttypes.QueryAccountRequest{Address: addr} + pubKeyRes, err := rp.accountsQuerier.Account(ctx, pubKeyReq) + if err != nil { + return nil, fmt.Errorf("unable to get account for address: %s [%w]", addr, err) + } + acc := new(accounttypes.BaseAccount) + if err := acc.Unmarshal(pubKeyRes.Account.Value); err != nil { + return nil, fmt.Errorf("unable to deserialise account for address: %s [%w]", addr, err) + } + key := acc.GetPubKey() + if _, ok := key.(*secp256k1.PubKey); !ok { + return nil, fmt.Errorf("public key is not a secp256k1 key: got %T", key) + } + point, err := curve.DecodeToPoint(key.Bytes()) + if err != nil { + return nil, err + } + points[i] = point + } + return points, nil +} diff --git a/pkg/signer/interface.go b/pkg/signer/interface.go new file mode 100644 index 000000000..5d91ae42a --- /dev/null +++ b/pkg/signer/interface.go @@ -0,0 +1,9 @@ +package signer + +// Signer is an interface that abstracts the signing of a message, it is used +// to sign both relay requests and responses via one of the two implementations. +// The Signer interface expects a 32 byte message (sha256 hash) and returns a +// byte slice containing the signature or any error that occurred during signing. +type Signer interface { + Sign(msg []byte) (signature []byte, err error) +} diff --git a/pkg/signer/ring_signer.go b/pkg/signer/ring_signer.go new file mode 100644 index 000000000..de401c25f --- /dev/null +++ b/pkg/signer/ring_signer.go @@ -0,0 +1,38 @@ +package signer + +import ( + "fmt" + + ringtypes "github.com/athanorlabs/go-dleq/types" + ring "github.com/noot/ring-go" +) + +var _ Signer = (*RingSigner)(nil) + +// RingSigner is a signer implementation that uses a ring to sign messages, for +// verification the ring signature must be verified and confirmed to be using +// the expected ring. +type RingSigner struct { + ring *ring.Ring + privKey ringtypes.Scalar +} + +// NewRingSigner creates a new RingSigner instance with the ring and private key provided +func NewRingSigner(ring *ring.Ring, privKey ringtypes.Scalar) *RingSigner { + return &RingSigner{ring: ring, privKey: privKey} +} + +// Sign uses the ring and private key to sign the message provided and returns the +// serialised ring signature that can be deserialised and verified by the verifier +func (r *RingSigner) Sign(msg []byte) ([]byte, error) { + if len(msg) != 32 { + return nil, fmt.Errorf("message must be 32 bytes long, got %d", len(msg)) + } + var msg32 [32]byte + copy(msg32[:], msg) + ringSig, err := r.ring.Sign(msg32, r.privKey) + if err != nil { + return nil, fmt.Errorf("failed to sign message [%v]: %w", msg, err) + } + return ringSig.Serialize() +} diff --git a/pkg/signer/simple_signer.go b/pkg/signer/simple_signer.go new file mode 100644 index 000000000..208d56f67 --- /dev/null +++ b/pkg/signer/simple_signer.go @@ -0,0 +1,23 @@ +package signer + +import "github.com/cosmos/cosmos-sdk/crypto/keyring" + +var _ Signer = (*SimpleSigner)(nil) + +// SimpleSigner is a signer implementation that uses the local keyring to sign +// messages, for verification using the signer's corresponding public key +type SimpleSigner struct { + keyring keyring.Keyring + keyName string +} + +// NewSimpleSigner creates a new SimpleSigner instance with the keyring and keyName provided +func NewSimpleSigner(keyring keyring.Keyring, keyName string) *SimpleSigner { + return &SimpleSigner{keyring: keyring, keyName: keyName} +} + +// Sign signs the given message using the SimpleSigner's keyring and keyName +func (s *SimpleSigner) Sign(msg []byte) (signature []byte, err error) { + sig, _, err := s.keyring.Sign(s.keyName, msg[:]) + return sig, err +} diff --git a/x/service/types/relay.go b/x/service/types/relay.go new file mode 100644 index 000000000..c7b2a1894 --- /dev/null +++ b/x/service/types/relay.go @@ -0,0 +1,23 @@ +package types + +// GetSignableBytes returns the signable bytes for the relay request +// this involves setting the signature to nil and marshaling the message. +// A value receiver is used to avoid overwriting any pre-existing signature +func (req RelayRequest) GetSignableBytes() ([]byte, error) { + // set signature to nil + req.Meta.Signature = nil + + // return the marshaled message + return req.Marshal() +} + +// GetSignableBytes returns the signable bytes for the relay response +// this involves setting the signature to nil and marshaling the message. +// A value receiver is used to avoid overwriting any pre-existing signature +func (res RelayResponse) GetSignableBytes() ([]byte, error) { + // set signature to nil + res.Meta.SupplierSignature = nil + + // return the marshaled message + return res.Marshal() +}