Skip to content

Commit

Permalink
Merge pull request #5589 from oasisprotocol/andrej/feature/runtime-ge…
Browse files Browse the repository at this point in the history
…t-unconfirmed-tx-rpc

go/runtime/client: Add the GetUnconfirmedTransactions method
  • Loading branch information
abukosek authored Mar 18, 2024
2 parents 68c95bc + e901e77 commit 714162d
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 13 deletions.
5 changes: 5 additions & 0 deletions .changelog/5589.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/runtime/client: Add the GetUnconfirmedTransactions method

Similarly to GetUnconfirmedTransactions in the consensus API, this
new method returns the currently pending runtime transactions from
the runtime transaction pool.
4 changes: 4 additions & 0 deletions go/runtime/client/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type RuntimeClient interface {
// its results (outputs and emitted events).
GetTransactionsWithResults(ctx context.Context, request *GetTransactionsRequest) ([]*TransactionWithResults, error)

// GetUnconfirmedTransactions fetches all unconfirmed runtime transactions
// that are currently pending to be included in a block.
GetUnconfirmedTransactions(ctx context.Context, runtimeID common.Namespace) ([][]byte, error)

// GetEvents returns all events emitted in a given block.
GetEvents(ctx context.Context, request *GetEventsRequest) ([]*Event, error)

Expand Down
37 changes: 37 additions & 0 deletions go/runtime/client/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ var (
methodGetTransactions = serviceName.NewMethod("GetTransactions", GetTransactionsRequest{})
// methodGetTransactionsWithResults is the GetTransactionsWithResults method.
methodGetTransactionsWithResults = serviceName.NewMethod("GetTransactionsWithResults", GetTransactionsRequest{})
// methodGetUnconfirmedTransactions is the GetUnconfirmedTransactions method.
methodGetUnconfirmedTransactions = serviceName.NewMethod("GetUnconfirmedTransactions", common.Namespace{})
// methodGetEvents is the GetEvents method.
methodGetEvents = serviceName.NewMethod("GetEvents", GetEventsRequest{})
// methodQuery is the Query method.
Expand Down Expand Up @@ -86,6 +88,10 @@ var (
MethodName: methodGetTransactionsWithResults.ShortName(),
Handler: handlerGetTransactionsWithResults,
},
{
MethodName: methodGetUnconfirmedTransactions.ShortName(),
Handler: handlerGetUnconfirmedTransactions,
},
{
MethodName: methodGetEvents.ShortName(),
Handler: handlerGetEvents,
Expand Down Expand Up @@ -345,6 +351,29 @@ func handlerGetTransactionsWithResults(
return interceptor(ctx, &rq, info, handler)
}

func handlerGetUnconfirmedTransactions(
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var runtimeID common.Namespace
if err := dec(&runtimeID); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RuntimeClient).GetUnconfirmedTransactions(ctx, runtimeID)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetUnconfirmedTransactions.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RuntimeClient).GetUnconfirmedTransactions(ctx, req.(common.Namespace))
}
return interceptor(ctx, runtimeID, info, handler)
}

func handlerGetEvents(
srv interface{},
ctx context.Context,
Expand Down Expand Up @@ -495,6 +524,14 @@ func (c *runtimeClient) GetTransactionsWithResults(ctx context.Context, request
return rsp, nil
}

func (c *runtimeClient) GetUnconfirmedTransactions(ctx context.Context, runtimeID common.Namespace) ([][]byte, error) {
var rsp [][]byte
if err := c.conn.Invoke(ctx, methodGetUnconfirmedTransactions.FullName(), runtimeID, &rsp); err != nil {
return nil, err
}
return rsp, nil
}

func (c *runtimeClient) GetEvents(ctx context.Context, request *GetEventsRequest) ([]*Event, error) {
var rsp []*Event
if err := c.conn.Invoke(ctx, methodGetEvents.FullName(), request, &rsp); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion go/runtime/client/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func testSubmitTransaction(
c api.RuntimeClient,
input string,
) {
testInput := []byte(input)
// Submit a test transaction.
testInput := []byte(input)
resp, err := c.SubmitTxMeta(ctx, &api.SubmitTxRequest{Data: testInput, RuntimeID: runtimeID})

// Check if everything is in order.
Expand Down Expand Up @@ -229,6 +229,11 @@ func testQuery(
Data: []byte("test checktx request"),
})
require.NoError(t, err, "CheckTx")

// Get the number of unconfirmed transactions.
utxs, err := c.GetUnconfirmedTransactions(ctx, runtimeID)
require.NoError(t, err, "GetUnconfirmedTransactions")
require.True(t, len(utxs) == 0)
}

func testSubmitTransactionNoWait(
Expand Down
10 changes: 7 additions & 3 deletions go/runtime/txpool/local_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) {
lq.txs = keptTxs
}

func (lq *localQueue) PeekAll() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
return append(make([]*TxQueueMeta, 0, len(lq.txs)), lq.txs...)
}

func (lq *localQueue) TakeAll() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
Expand All @@ -87,9 +93,7 @@ func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata)
}

func (lq *localQueue) GetTxsToPublish() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
return append([]*TxQueueMeta(nil), lq.txs...)
return lq.PeekAll()
}

func (lq *localQueue) size() int {
Expand Down
22 changes: 13 additions & 9 deletions go/runtime/txpool/main_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newMainQueue(capacity int) *mainQueue {

func (mq *mainQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta {
txMetas := mq.inner.getPrioritizedBatch(nil, countHint)
var txs []*TxQueueMeta
txs := make([]*TxQueueMeta, 0, len(txMetas))
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec
}
Expand All @@ -107,17 +107,26 @@ func (mq *mainQueue) HandleTxsUsed(hashes []hash.Hash) {

func (mq *mainQueue) GetSchedulingExtra(offset *hash.Hash, limit uint32) []*TxQueueMeta {
txMetas := mq.inner.getPrioritizedBatch(offset, limit)
var txs []*TxQueueMeta
txs := make([]*TxQueueMeta, 0, len(txMetas))
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec
}
return txs
}

func (mq *mainQueue) PeekAll() []*TxQueueMeta {
allTxs := mq.inner.getAll()
txs := make([]*TxQueueMeta, 0, len(allTxs))
for _, tx := range allTxs {
txs = append(txs, &tx.TxQueueMeta) //nolint:gosec
}
return txs
}

func (mq *mainQueue) TakeAll() []*TxQueueMeta {
txMetas := mq.inner.getAll()
mq.inner.clear()
var txs []*TxQueueMeta
txs := make([]*TxQueueMeta, 0, len(txMetas))
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec
}
Expand All @@ -132,10 +141,5 @@ func (mq *mainQueue) OfferChecked(tx *TxQueueMeta, meta *protocol.CheckTxMetadat
}

func (mq *mainQueue) GetTxsToPublish() []*TxQueueMeta {
txMetas := mq.inner.getAll()
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec
}
return txs
return mq.PeekAll()
}
2 changes: 2 additions & 0 deletions go/runtime/txpool/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type UsableTransactionSource interface {
// HandleTxsUsed is a callback to indicate that the scheduler is done with a set of txs, by hash. For most
// implementations, remove it from internal storage.
HandleTxsUsed(hashes []hash.Hash)
// PeekAll returns all transactions without removing them.
PeekAll() []*TxQueueMeta
}

// RecheckableTransactionStore provides methods for rechecking.
Expand Down
11 changes: 11 additions & 0 deletions go/runtime/txpool/rim_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ func (rq *rimQueue) HandleTxsUsed([]hash.Hash) {
// The roothash module manages the incoming message queue on its own, so we don't do anything here.
}

func (rq *rimQueue) PeekAll() []*TxQueueMeta {
rq.l.RLock()
defer rq.l.RUnlock()

txs := make([]*TxQueueMeta, 0, len(rq.txs))
for _, tx := range rq.txs {
txs = append(txs, tx)
}
return txs
}

// Load loads transactions from roothash incoming messages.
func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) {
newTxs := map[hash.Hash]*TxQueueMeta{}
Expand Down
14 changes: 14 additions & 0 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type TransactionPool interface {

// PendingCheckSize returns the number of transactions currently pending to be checked.
PendingCheckSize() int

// GetTxs returns all transactions currently queued in the transaction pool.
GetTxs() []*TxQueueMeta
}

// RuntimeHostProvisioner is a runtime host provisioner.
Expand Down Expand Up @@ -425,6 +428,17 @@ func (t *txPool) PendingCheckSize() int {
return t.checkTxQueue.size()
}

func (t *txPool) GetTxs() []*TxQueueMeta {
t.drainLock.Lock()
defer t.drainLock.Unlock()

var txs []*TxQueueMeta
for _, q := range t.usableSources {
txs = append(txs, q.PeekAll()...)
}
return txs
}

func (t *txPool) getCurrentBlockInfo() (*runtime.BlockInfo, time.Time, error) {
t.blockInfoLock.Lock()
defer t.blockInfoLock.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions go/worker/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,24 @@ func (s *service) GetTransactionsWithResults(ctx context.Context, request *api.G
return results, nil
}

// Implements api.RuntimeClient.
func (s *service) GetUnconfirmedTransactions(_ context.Context, runtimeID common.Namespace) ([][]byte, error) {
rt := s.w.commonWorker.GetRuntime(runtimeID)
if rt == nil {
return nil, api.ErrNotFound
}

// Get currently pending transactions from the runtime's transaction pool.
pendingTxs := rt.TxPool.GetTxs()

// Copy the raw transactions to the output slice.
out := make([][]byte, 0, len(pendingTxs))
for _, tx := range pendingTxs {
out = append(out, tx.Raw())
}
return out, nil
}

// Implements api.RuntimeClient.
func (s *service) GetEvents(ctx context.Context, request *api.GetEventsRequest) ([]*api.Event, error) {
rt, err := s.w.commonWorker.RuntimeRegistry.GetRuntime(request.RuntimeID)
Expand Down

0 comments on commit 714162d

Please sign in to comment.