Skip to content

Commit

Permalink
Merge pull request #5813 from oasisprotocol/kostko/fix/rofl-submittx-…
Browse files Browse the repository at this point in the history
…timeout
  • Loading branch information
kostko authored Aug 15, 2024
2 parents 9e4d8e6 + 263c307 commit 207b0d1
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 20 deletions.
1 change: 1 addition & 0 deletions .changelog/5813.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/runtime: Add timeout for HostSubmitTx from ROFL apps
7 changes: 6 additions & 1 deletion go/runtime/host/protocol/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,12 @@ func (c *connection) workerIncoming() {
wg.Add(1)
go func() {
defer wg.Done()
c.handleMessage(ctx, &message)

// Ensure each message has its own context which is canceled at the end.
localCtx, localCancel := context.WithCancel(ctx)
defer localCancel()

c.handleMessage(localCtx, &message)
}()
}
}
Expand Down
10 changes: 8 additions & 2 deletions go/runtime/registry/host_rofl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
roflAttachRuntimeTimeout = 2 * time.Second
// roflNotifyTimeout is the maximum amount of time runtime notification handling can take.
roflNotifyTimeout = 2 * time.Second
// roflSubmitTxTimeout is the maximum amount of time that the host will wait for transaction
// inclusion into a block.
roflSubmitTxTimeout = 1 * time.Minute
// roflLocalStorageKeySeparator is the local storage key separator after component ID.
roflLocalStorageKeySeparator = ":"
)
Expand Down Expand Up @@ -178,10 +181,13 @@ func (rh *roflHostHandler) handleHostSubmitTx(
Data: rq.Data,
}

submitTxCtx, cancel := context.WithTimeout(ctx, roflSubmitTxTimeout)
defer cancel()

switch rq.Wait {
case true:
// We need to wait for transaction inclusion.
rsp, err := rh.client.SubmitTxMeta(ctx, submitRq)
rsp, err := rh.client.SubmitTxMeta(submitTxCtx, submitRq)
switch {
case err != nil:
return nil, err
Expand All @@ -201,7 +207,7 @@ func (rh *roflHostHandler) handleHostSubmitTx(
}, nil
default:
// Just submit and forget.
err := rh.client.SubmitTxNoWait(ctx, submitRq)
err := rh.client.SubmitTxNoWait(submitTxCtx, submitRq)
if err != nil {
return nil, err
}
Expand Down
91 changes: 78 additions & 13 deletions go/worker/client/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ import (
)

type pendingTx struct {
chs map[chan *api.SubmitTxResult]struct{}
}

type wantTx struct {
txHash hash.Hash
ch chan *api.SubmitTxResult
remove bool
}

// Node is a client node.
Expand Down Expand Up @@ -88,7 +93,36 @@ func (n *Node) HandleRuntimeHostEventLocked(*host.Event) {
// Nothing to do here.
}

func (n *Node) SubmitTx(ctx context.Context, tx []byte) (<-chan *api.SubmitTxResult, *protocol.Error, error) {
// SubmitTxSubscription is a subscription to a transaction submission result.
type SubmitTxSubscription struct {
txHash hash.Hash
ch chan *api.SubmitTxResult

n *Node
}

// Result returns a channel that will receive the transaction submission result once the transaction
// has been included in a block.
func (sr *SubmitTxSubscription) Result() <-chan *api.SubmitTxResult {
return sr.ch
}

// Stop notifies the client to stop watching for the transaction submission result.
func (sr *SubmitTxSubscription) Stop() {
sr.n.txCh.In() <- &wantTx{
txHash: sr.txHash,
ch: sr.ch,
remove: true,
}
}

// SubmitTx submits the transaction to the transaction pool, waits for it to be checked and returns
// a subscription that gets a notification when the transaction is included in a block.
//
// When the caller is not interested in the transaction execution result, it should call `Stop` on
// the returned subscription. Not doing so may leak resources associated with tracking the submitted
// transaction.
func (n *Node) SubmitTx(ctx context.Context, tx []byte) (*SubmitTxSubscription, *protocol.Error, error) {
// Make sure consensus is synced.
select {
case <-n.commonNode.Consensus.Synced():
Expand All @@ -107,13 +141,19 @@ func (n *Node) SubmitTx(ctx context.Context, tx []byte) (<-chan *api.SubmitTxRes
return nil, &result.Error, nil
}

txHash := hash.NewFromBytes(tx)
ch := make(chan *api.SubmitTxResult, 1)
n.txCh.In() <- &pendingTx{
txHash: hash.NewFromBytes(tx),
n.txCh.In() <- &wantTx{
txHash: txHash,
ch: ch,
}

return ch, nil, nil
sub := &SubmitTxSubscription{
txHash: txHash,
ch: ch,
n: n,
}
return sub, nil, nil
}

func (n *Node) CheckTx(ctx context.Context, tx []byte) (*protocol.CheckTxResult, error) {
Expand Down Expand Up @@ -198,14 +238,16 @@ func (n *Node) checkBlock(ctx context.Context, blk *block.Block, pending map[has
var processed []hash.Hash
for txHash, tx := range matches {
pTx := pending[txHash]
pTx.ch <- &api.SubmitTxResult{
Result: &api.SubmitTxMetaResponse{
Round: blk.Header.Round,
BatchOrder: tx.BatchOrder,
Output: tx.Output,
},
for ch := range pTx.chs {
ch <- &api.SubmitTxResult{
Result: &api.SubmitTxMetaResponse{
Round: blk.Header.Round,
BatchOrder: tx.BatchOrder,
Output: tx.Output,
},
}
close(ch)
}
close(pTx.ch)
delete(pending, txHash)
processed = append(processed, txHash)
}
Expand Down Expand Up @@ -262,8 +304,31 @@ func (n *Node) worker() {
n.logger.Info("termination requested")
return
case rtx := <-n.txCh.Out():
tx := rtx.(*pendingTx)
pending[tx.txHash] = tx
tx := rtx.(*wantTx)
existingTx, ok := pending[tx.txHash]

switch tx.remove {
case false:
// Interest in the transaction.
if !ok {
existingTx = &pendingTx{
chs: make(map[chan *api.SubmitTxResult]struct{}),
}
pending[tx.txHash] = existingTx
}

existingTx.chs[tx.ch] = struct{}{}
case true:
// Removal of interest in the transaction.
if !ok {
continue
}

delete(existingTx.chs, tx.ch)
if len(existingTx.chs) == 0 {
delete(pending, tx.txHash)
}
}
continue
case blk := <-blkCh:
blocks = append(blocks, blk.Block)
Expand Down
11 changes: 7 additions & 4 deletions go/worker/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/worker/client/committee"
)

type service struct {
w *Worker
}

func (s *service) submitTx(ctx context.Context, request *api.SubmitTxRequest) (<-chan *api.SubmitTxResult, *protocol.Error, error) {
func (s *service) submitTx(ctx context.Context, request *api.SubmitTxRequest) (*committee.SubmitTxSubscription, *protocol.Error, error) {
rt := s.w.runtimes[request.RuntimeID]
if rt == nil {
return nil, nil, api.ErrNoHostedRuntime
Expand All @@ -44,7 +45,7 @@ func (s *service) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([

// Implements api.RuntimeClient.
func (s *service) SubmitTxMeta(ctx context.Context, request *api.SubmitTxRequest) (*api.SubmitTxMetaResponse, error) {
respCh, checkTxErr, err := s.submitTx(ctx, request)
sub, checkTxErr, err := s.submitTx(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -53,6 +54,7 @@ func (s *service) SubmitTxMeta(ctx context.Context, request *api.SubmitTxRequest
CheckTxError: checkTxErr,
}, nil
}
defer sub.Stop() // Ensure subscription is stopped.

// Wait for result.
for {
Expand All @@ -63,7 +65,7 @@ func (s *service) SubmitTxMeta(ctx context.Context, request *api.SubmitTxRequest
case <-ctx.Done():
// The context we're working in was canceled, abort.
return nil, ctx.Err()
case resp, ok = <-respCh:
case resp, ok = <-sub.Result():
if !ok {
return nil, fmt.Errorf("client: channel closed unexpectedly")
}
Expand All @@ -74,13 +76,14 @@ func (s *service) SubmitTxMeta(ctx context.Context, request *api.SubmitTxRequest

// Implements api.RuntimeClient.
func (s *service) SubmitTxNoWait(ctx context.Context, request *api.SubmitTxRequest) error {
_, checkTxErr, err := s.submitTx(ctx, request)
sub, checkTxErr, err := s.submitTx(ctx, request)
if err != nil {
return err
}
if checkTxErr != nil {
return errors.WithContext(api.ErrCheckTxFailed, checkTxErr.String())
}
sub.Stop() // Ensure subscription is stopped.
return nil
}

Expand Down

0 comments on commit 207b0d1

Please sign in to comment.