diff --git a/.changelog/5589.feature.md b/.changelog/5589.feature.md new file mode 100644 index 00000000000..9373b5e8374 --- /dev/null +++ b/.changelog/5589.feature.md @@ -0,0 +1,5 @@ +go/runtime/client: Add the GetUnconfirmedTransactions method + +Similarly to GetUnconfirmedTransactions in the consensus API, this +new method returns the currently pending runtime transactions from +the runtime transaction pool. diff --git a/go/runtime/client/api/api.go b/go/runtime/client/api/api.go index ebc56458b55..ba40cc7a8c5 100644 --- a/go/runtime/client/api/api.go +++ b/go/runtime/client/api/api.go @@ -72,6 +72,10 @@ type RuntimeClient interface { // its results (outputs and emitted events). GetTransactionsWithResults(ctx context.Context, request *GetTransactionsRequest) ([]*TransactionWithResults, error) + // GetUnconfirmedTransactions fetches all unconfirmed runtime transactions + // that are currently pending to be included in a block. + GetUnconfirmedTransactions(ctx context.Context, runtimeID common.Namespace) ([][]byte, error) + // GetEvents returns all events emitted in a given block. GetEvents(ctx context.Context, request *GetEventsRequest) ([]*Event, error) diff --git a/go/runtime/client/api/grpc.go b/go/runtime/client/api/grpc.go index aec4359026b..7d8af5b2d6c 100644 --- a/go/runtime/client/api/grpc.go +++ b/go/runtime/client/api/grpc.go @@ -37,6 +37,8 @@ var ( methodGetTransactions = serviceName.NewMethod("GetTransactions", GetTransactionsRequest{}) // methodGetTransactionsWithResults is the GetTransactionsWithResults method. methodGetTransactionsWithResults = serviceName.NewMethod("GetTransactionsWithResults", GetTransactionsRequest{}) + // methodGetUnconfirmedTransactions is the GetUnconfirmedTransactions method. + methodGetUnconfirmedTransactions = serviceName.NewMethod("GetUnconfirmedTransactions", common.Namespace{}) // methodGetEvents is the GetEvents method. methodGetEvents = serviceName.NewMethod("GetEvents", GetEventsRequest{}) // methodQuery is the Query method. @@ -86,6 +88,10 @@ var ( MethodName: methodGetTransactionsWithResults.ShortName(), Handler: handlerGetTransactionsWithResults, }, + { + MethodName: methodGetUnconfirmedTransactions.ShortName(), + Handler: handlerGetUnconfirmedTransactions, + }, { MethodName: methodGetEvents.ShortName(), Handler: handlerGetEvents, @@ -345,6 +351,29 @@ func handlerGetTransactionsWithResults( return interceptor(ctx, &rq, info, handler) } +func handlerGetUnconfirmedTransactions( + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var runtimeID common.Namespace + if err := dec(&runtimeID); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RuntimeClient).GetUnconfirmedTransactions(ctx, runtimeID) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodGetUnconfirmedTransactions.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RuntimeClient).GetUnconfirmedTransactions(ctx, req.(common.Namespace)) + } + return interceptor(ctx, runtimeID, info, handler) +} + func handlerGetEvents( srv interface{}, ctx context.Context, @@ -495,6 +524,14 @@ func (c *runtimeClient) GetTransactionsWithResults(ctx context.Context, request return rsp, nil } +func (c *runtimeClient) GetUnconfirmedTransactions(ctx context.Context, runtimeID common.Namespace) ([][]byte, error) { + var rsp [][]byte + if err := c.conn.Invoke(ctx, methodGetUnconfirmedTransactions.FullName(), runtimeID, &rsp); err != nil { + return nil, err + } + return rsp, nil +} + func (c *runtimeClient) GetEvents(ctx context.Context, request *GetEventsRequest) ([]*Event, error) { var rsp []*Event if err := c.conn.Invoke(ctx, methodGetEvents.FullName(), request, &rsp); err != nil { diff --git a/go/runtime/client/tests/tester.go b/go/runtime/client/tests/tester.go index 4b6e38a9f9b..4a4b2c1d69f 100644 --- a/go/runtime/client/tests/tester.go +++ b/go/runtime/client/tests/tester.go @@ -60,8 +60,8 @@ func testSubmitTransaction( c api.RuntimeClient, input string, ) { - testInput := []byte(input) // Submit a test transaction. + testInput := []byte(input) resp, err := c.SubmitTxMeta(ctx, &api.SubmitTxRequest{Data: testInput, RuntimeID: runtimeID}) // Check if everything is in order. @@ -229,6 +229,11 @@ func testQuery( Data: []byte("test checktx request"), }) require.NoError(t, err, "CheckTx") + + // Get the number of unconfirmed transactions. + utxs, err := c.GetUnconfirmedTransactions(ctx, runtimeID) + require.NoError(t, err, "GetUnconfirmedTransactions") + require.True(t, len(utxs) == 0) } func testSubmitTransactionNoWait( diff --git a/go/runtime/txpool/local_queue.go b/go/runtime/txpool/local_queue.go index 2b1caedb468..beb80b3be7e 100644 --- a/go/runtime/txpool/local_queue.go +++ b/go/runtime/txpool/local_queue.go @@ -69,6 +69,12 @@ func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) { lq.txs = keptTxs } +func (lq *localQueue) PeekAll() []*TxQueueMeta { + lq.l.Lock() + defer lq.l.Unlock() + return append(make([]*TxQueueMeta, 0, len(lq.txs)), lq.txs...) +} + func (lq *localQueue) TakeAll() []*TxQueueMeta { lq.l.Lock() defer lq.l.Unlock() @@ -87,9 +93,7 @@ func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata) } func (lq *localQueue) GetTxsToPublish() []*TxQueueMeta { - lq.l.Lock() - defer lq.l.Unlock() - return append([]*TxQueueMeta(nil), lq.txs...) + return lq.PeekAll() } func (lq *localQueue) size() int { diff --git a/go/runtime/txpool/main_queue.go b/go/runtime/txpool/main_queue.go index 76bf84908ee..f1d7ce14cef 100644 --- a/go/runtime/txpool/main_queue.go +++ b/go/runtime/txpool/main_queue.go @@ -86,7 +86,7 @@ func newMainQueue(capacity int) *mainQueue { func (mq *mainQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta { txMetas := mq.inner.getPrioritizedBatch(nil, countHint) - var txs []*TxQueueMeta + txs := make([]*TxQueueMeta, 0, len(txMetas)) for _, txMeta := range txMetas { txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec } @@ -107,17 +107,26 @@ func (mq *mainQueue) HandleTxsUsed(hashes []hash.Hash) { func (mq *mainQueue) GetSchedulingExtra(offset *hash.Hash, limit uint32) []*TxQueueMeta { txMetas := mq.inner.getPrioritizedBatch(offset, limit) - var txs []*TxQueueMeta + txs := make([]*TxQueueMeta, 0, len(txMetas)) for _, txMeta := range txMetas { txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec } return txs } +func (mq *mainQueue) PeekAll() []*TxQueueMeta { + allTxs := mq.inner.getAll() + txs := make([]*TxQueueMeta, 0, len(allTxs)) + for _, tx := range allTxs { + txs = append(txs, &tx.TxQueueMeta) //nolint:gosec + } + return txs +} + func (mq *mainQueue) TakeAll() []*TxQueueMeta { txMetas := mq.inner.getAll() mq.inner.clear() - var txs []*TxQueueMeta + txs := make([]*TxQueueMeta, 0, len(txMetas)) for _, txMeta := range txMetas { txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec } @@ -132,10 +141,5 @@ func (mq *mainQueue) OfferChecked(tx *TxQueueMeta, meta *protocol.CheckTxMetadat } func (mq *mainQueue) GetTxsToPublish() []*TxQueueMeta { - txMetas := mq.inner.getAll() - var txs []*TxQueueMeta - for _, txMeta := range txMetas { - txs = append(txs, &txMeta.TxQueueMeta) //nolint:gosec - } - return txs + return mq.PeekAll() } diff --git a/go/runtime/txpool/queues.go b/go/runtime/txpool/queues.go index 2b9fb44208b..7bf10ec9508 100644 --- a/go/runtime/txpool/queues.go +++ b/go/runtime/txpool/queues.go @@ -50,6 +50,8 @@ type UsableTransactionSource interface { // HandleTxsUsed is a callback to indicate that the scheduler is done with a set of txs, by hash. For most // implementations, remove it from internal storage. HandleTxsUsed(hashes []hash.Hash) + // PeekAll returns all transactions without removing them. + PeekAll() []*TxQueueMeta } // RecheckableTransactionStore provides methods for rechecking. diff --git a/go/runtime/txpool/rim_queue.go b/go/runtime/txpool/rim_queue.go index 5ddce0eebee..18683cbe72e 100644 --- a/go/runtime/txpool/rim_queue.go +++ b/go/runtime/txpool/rim_queue.go @@ -36,6 +36,17 @@ func (rq *rimQueue) HandleTxsUsed([]hash.Hash) { // The roothash module manages the incoming message queue on its own, so we don't do anything here. } +func (rq *rimQueue) PeekAll() []*TxQueueMeta { + rq.l.RLock() + defer rq.l.RUnlock() + + txs := make([]*TxQueueMeta, 0, len(rq.txs)) + for _, tx := range rq.txs { + txs = append(txs, tx) + } + return txs +} + // Load loads transactions from roothash incoming messages. func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) { newTxs := map[hash.Hash]*TxQueueMeta{} diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index b7c81a19abd..4cbd9a48228 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -126,6 +126,9 @@ type TransactionPool interface { // PendingCheckSize returns the number of transactions currently pending to be checked. PendingCheckSize() int + + // GetTxs returns all transactions currently queued in the transaction pool. + GetTxs() []*TxQueueMeta } // RuntimeHostProvisioner is a runtime host provisioner. @@ -425,6 +428,17 @@ func (t *txPool) PendingCheckSize() int { return t.checkTxQueue.size() } +func (t *txPool) GetTxs() []*TxQueueMeta { + t.drainLock.Lock() + defer t.drainLock.Unlock() + + var txs []*TxQueueMeta + for _, q := range t.usableSources { + txs = append(txs, q.PeekAll()...) + } + return txs +} + func (t *txPool) getCurrentBlockInfo() (*runtime.BlockInfo, time.Time, error) { t.blockInfoLock.Lock() defer t.blockInfoLock.Unlock() diff --git a/go/worker/client/service.go b/go/worker/client/service.go index bc776d14f82..0a849848c82 100644 --- a/go/worker/client/service.go +++ b/go/worker/client/service.go @@ -238,6 +238,24 @@ func (s *service) GetTransactionsWithResults(ctx context.Context, request *api.G return results, nil } +// Implements api.RuntimeClient. +func (s *service) GetUnconfirmedTransactions(_ context.Context, runtimeID common.Namespace) ([][]byte, error) { + rt := s.w.commonWorker.GetRuntime(runtimeID) + if rt == nil { + return nil, api.ErrNotFound + } + + // Get currently pending transactions from the runtime's transaction pool. + pendingTxs := rt.TxPool.GetTxs() + + // Copy the raw transactions to the output slice. + out := make([][]byte, 0, len(pendingTxs)) + for _, tx := range pendingTxs { + out = append(out, tx.Raw()) + } + return out, nil +} + // Implements api.RuntimeClient. func (s *service) GetEvents(ctx context.Context, request *api.GetEventsRequest) ([]*api.Event, error) { rt, err := s.w.commonWorker.RuntimeRegistry.GetRuntime(request.RuntimeID)