diff --git a/.changelog/5813.bugfix.md b/.changelog/5813.bugfix.md new file mode 100644 index 00000000000..30808c72d16 --- /dev/null +++ b/.changelog/5813.bugfix.md @@ -0,0 +1 @@ +go/runtime: Add timeout for HostSubmitTx from ROFL apps diff --git a/go/runtime/host/protocol/connection.go b/go/runtime/host/protocol/connection.go index 838218f8272..70672373a63 100644 --- a/go/runtime/host/protocol/connection.go +++ b/go/runtime/host/protocol/connection.go @@ -504,7 +504,12 @@ func (c *connection) workerIncoming() { wg.Add(1) go func() { defer wg.Done() - c.handleMessage(ctx, &message) + + // Ensure each message has its own context which is canceled at the end. + localCtx, localCancel := context.WithCancel(ctx) + defer localCancel() + + c.handleMessage(localCtx, &message) }() } } diff --git a/go/runtime/registry/host_rofl.go b/go/runtime/registry/host_rofl.go index 743c2b5c021..94714e2b814 100644 --- a/go/runtime/registry/host_rofl.go +++ b/go/runtime/registry/host_rofl.go @@ -28,6 +28,9 @@ const ( roflAttachRuntimeTimeout = 2 * time.Second // roflNotifyTimeout is the maximum amount of time runtime notification handling can take. roflNotifyTimeout = 2 * time.Second + // roflSubmitTxTimeout is the maximum amount of time that the host will wait for transaction + // inclusion into a block. + roflSubmitTxTimeout = 1 * time.Minute // roflLocalStorageKeySeparator is the local storage key separator after component ID. roflLocalStorageKeySeparator = ":" ) @@ -178,10 +181,13 @@ func (rh *roflHostHandler) handleHostSubmitTx( Data: rq.Data, } + submitTxCtx, cancel := context.WithTimeout(ctx, roflSubmitTxTimeout) + defer cancel() + switch rq.Wait { case true: // We need to wait for transaction inclusion. - rsp, err := rh.client.SubmitTxMeta(ctx, submitRq) + rsp, err := rh.client.SubmitTxMeta(submitTxCtx, submitRq) switch { case err != nil: return nil, err @@ -201,7 +207,7 @@ func (rh *roflHostHandler) handleHostSubmitTx( }, nil default: // Just submit and forget. - err := rh.client.SubmitTxNoWait(ctx, submitRq) + err := rh.client.SubmitTxNoWait(submitTxCtx, submitRq) if err != nil { return nil, err } diff --git a/go/worker/client/committee/node.go b/go/worker/client/committee/node.go index 0411f28cd23..6bdac0a6069 100644 --- a/go/worker/client/committee/node.go +++ b/go/worker/client/committee/node.go @@ -24,8 +24,13 @@ import ( ) type pendingTx struct { + chs map[chan *api.SubmitTxResult]struct{} +} + +type wantTx struct { txHash hash.Hash ch chan *api.SubmitTxResult + remove bool } // Node is a client node. @@ -88,7 +93,36 @@ func (n *Node) HandleRuntimeHostEventLocked(*host.Event) { // Nothing to do here. } -func (n *Node) SubmitTx(ctx context.Context, tx []byte) (<-chan *api.SubmitTxResult, *protocol.Error, error) { +// SubmitTxSubscription is a subscription to a transaction submission result. +type SubmitTxSubscription struct { + txHash hash.Hash + ch chan *api.SubmitTxResult + + n *Node +} + +// Result returns a channel that will receive the transaction submission result once the transaction +// has been included in a block. +func (sr *SubmitTxSubscription) Result() <-chan *api.SubmitTxResult { + return sr.ch +} + +// Stop notifies the client to stop watching for the transaction submission result. +func (sr *SubmitTxSubscription) Stop() { + sr.n.txCh.In() <- &wantTx{ + txHash: sr.txHash, + ch: sr.ch, + remove: true, + } +} + +// SubmitTx submits the transaction to the transaction pool, waits for it to be checked and returns +// a subscription that gets a notification when the transaction is included in a block. +// +// When the caller is not interested in the transaction execution result, it should call `Stop` on +// the returned subscription. Not doing so may leak resources associated with tracking the submitted +// transaction. +func (n *Node) SubmitTx(ctx context.Context, tx []byte) (*SubmitTxSubscription, *protocol.Error, error) { // Make sure consensus is synced. select { case <-n.commonNode.Consensus.Synced(): @@ -107,13 +141,19 @@ func (n *Node) SubmitTx(ctx context.Context, tx []byte) (<-chan *api.SubmitTxRes return nil, &result.Error, nil } + txHash := hash.NewFromBytes(tx) ch := make(chan *api.SubmitTxResult, 1) - n.txCh.In() <- &pendingTx{ - txHash: hash.NewFromBytes(tx), + n.txCh.In() <- &wantTx{ + txHash: txHash, ch: ch, } - return ch, nil, nil + sub := &SubmitTxSubscription{ + txHash: txHash, + ch: ch, + n: n, + } + return sub, nil, nil } func (n *Node) CheckTx(ctx context.Context, tx []byte) (*protocol.CheckTxResult, error) { @@ -198,14 +238,16 @@ func (n *Node) checkBlock(ctx context.Context, blk *block.Block, pending map[has var processed []hash.Hash for txHash, tx := range matches { pTx := pending[txHash] - pTx.ch <- &api.SubmitTxResult{ - Result: &api.SubmitTxMetaResponse{ - Round: blk.Header.Round, - BatchOrder: tx.BatchOrder, - Output: tx.Output, - }, + for ch := range pTx.chs { + ch <- &api.SubmitTxResult{ + Result: &api.SubmitTxMetaResponse{ + Round: blk.Header.Round, + BatchOrder: tx.BatchOrder, + Output: tx.Output, + }, + } + close(ch) } - close(pTx.ch) delete(pending, txHash) processed = append(processed, txHash) } @@ -262,8 +304,31 @@ func (n *Node) worker() { n.logger.Info("termination requested") return case rtx := <-n.txCh.Out(): - tx := rtx.(*pendingTx) - pending[tx.txHash] = tx + tx := rtx.(*wantTx) + existingTx, ok := pending[tx.txHash] + + switch tx.remove { + case false: + // Interest in the transaction. + if !ok { + existingTx = &pendingTx{ + chs: make(map[chan *api.SubmitTxResult]struct{}), + } + pending[tx.txHash] = existingTx + } + + existingTx.chs[tx.ch] = struct{}{} + case true: + // Removal of interest in the transaction. + if !ok { + continue + } + + delete(existingTx.chs, tx.ch) + if len(existingTx.chs) == 0 { + delete(pending, tx.txHash) + } + } continue case blk := <-blkCh: blocks = append(blocks, blk.Block) diff --git a/go/worker/client/service.go b/go/worker/client/service.go index faff5eaf8df..f43d863d2ef 100644 --- a/go/worker/client/service.go +++ b/go/worker/client/service.go @@ -15,13 +15,14 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" "github.com/oasisprotocol/oasis-core/go/runtime/transaction" storage "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/client/committee" ) type service struct { w *Worker } -func (s *service) submitTx(ctx context.Context, request *api.SubmitTxRequest) (<-chan *api.SubmitTxResult, *protocol.Error, error) { +func (s *service) submitTx(ctx context.Context, request *api.SubmitTxRequest) (*committee.SubmitTxSubscription, *protocol.Error, error) { rt := s.w.runtimes[request.RuntimeID] if rt == nil { return nil, nil, api.ErrNoHostedRuntime @@ -44,7 +45,7 @@ func (s *service) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([ // Implements api.RuntimeClient. func (s *service) SubmitTxMeta(ctx context.Context, request *api.SubmitTxRequest) (*api.SubmitTxMetaResponse, error) { - respCh, checkTxErr, err := s.submitTx(ctx, request) + sub, checkTxErr, err := s.submitTx(ctx, request) if err != nil { return nil, err } @@ -53,6 +54,7 @@ func (s *service) SubmitTxMeta(ctx context.Context, request *api.SubmitTxRequest CheckTxError: checkTxErr, }, nil } + defer sub.Stop() // Ensure subscription is stopped. // Wait for result. for { @@ -63,7 +65,7 @@ func (s *service) SubmitTxMeta(ctx context.Context, request *api.SubmitTxRequest case <-ctx.Done(): // The context we're working in was canceled, abort. return nil, ctx.Err() - case resp, ok = <-respCh: + case resp, ok = <-sub.Result(): if !ok { return nil, fmt.Errorf("client: channel closed unexpectedly") } @@ -74,13 +76,14 @@ func (s *service) SubmitTxMeta(ctx context.Context, request *api.SubmitTxRequest // Implements api.RuntimeClient. func (s *service) SubmitTxNoWait(ctx context.Context, request *api.SubmitTxRequest) error { - _, checkTxErr, err := s.submitTx(ctx, request) + sub, checkTxErr, err := s.submitTx(ctx, request) if err != nil { return err } if checkTxErr != nil { return errors.WithContext(api.ErrCheckTxFailed, checkTxErr.String()) } + sub.Stop() // Ensure subscription is stopped. return nil }