diff --git a/pool/interfaces.go b/pool/interfaces.go index 64e5316bd3..9dbec1540e 100644 --- a/pool/interfaces.go +++ b/pool/interfaces.go @@ -23,7 +23,7 @@ type storage interface { SetGasPrice(ctx context.Context, gasPrice uint64) error UpdateTxsStatus(ctx context.Context, hashes []string, newStatus TxStatus) error UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus TxStatus) error - GetPendingTxsWithLowestNonce(ctx context.Context, limit uint64) ([]*Transaction, error) + GetTxs(ctx context.Context, filterStatus TxStatus, limit uint64) ([]*Transaction, error) } type stateInterface interface { diff --git a/pool/pgpoolstorage/pgpoolstorage.go b/pool/pgpoolstorage/pgpoolstorage.go index 9b9b22dd6d..630f174d3d 100644 --- a/pool/pgpoolstorage/pgpoolstorage.go +++ b/pool/pgpoolstorage/pgpoolstorage.go @@ -174,8 +174,8 @@ func (p *PostgresPoolStorage) GetPendingTxHashesSince(ctx context.Context, since return hashes, nil } -// GetPendingTxsWithLowestNonce gets top pending txs with the lowest nonce -func (p *PostgresPoolStorage) GetPendingTxsWithLowestNonce(ctx context.Context, limit uint64) ([]*pool.Transaction, error) { +// GetTxs gets txs with the lowest nonce +func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxStatus, limit uint64) ([]*pool.Transaction, error) { query := ` SELECT encoded, @@ -209,7 +209,7 @@ func (p *PostgresPoolStorage) GetPendingTxsWithLowestNonce(ctx context.Context, nonce uint64 ) - args := []interface{}{pool.TxStatusPending, limit} + args := []interface{}{filterStatus, limit} rows, err := p.db.Query(ctx, query, args...) if errors.Is(err, pgx.ErrNoRows) { diff --git a/pool/pool_test.go b/pool/pool_test.go index 98340c29a1..6879a33324 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -316,7 +316,7 @@ func Test_GetTopPendingTxByProfitabilityAndZkCounters(t *testing.T) { } } - txs, err := p.GetPendingTxsWithLowestNonce(ctx, 10) + txs, err := p.GetTxs(ctx, pool.TxStatusPending, 10) require.NoError(t, err) // bcs it's sorted by nonce, tx with the lowest nonce is expected here assert.Equal(t, txs[0].Transaction.Nonce(), uint64(0)) diff --git a/pool/transaction.go b/pool/transaction.go index 4a7267fdab..3f305f1941 100644 --- a/pool/transaction.go +++ b/pool/transaction.go @@ -15,8 +15,8 @@ const ( TxStatusInvalid TxStatus = "invalid" // TxStatusSelected represents a tx that has been selected TxStatusSelected TxStatus = "selected" - // TxStatusPreSelected represents a tx that has been preselected for executor processing - TxStatusPreSelected TxStatus = "preselected" + // TxStatusFailed represents a tx that has been failed after processing, but can be processed in the future + TxStatusFailed TxStatus = "failed" ) // TxStatus represents the state of a tx diff --git a/sequencer/batchbuilder.go b/sequencer/batchbuilder.go index 20ba1309e6..c7236d058f 100644 --- a/sequencer/batchbuilder.go +++ b/sequencer/batchbuilder.go @@ -20,10 +20,11 @@ import ( const maxTxsPerBatch uint64 = 150 type processTxResponse struct { - processedTxs []*state.ProcessTransactionResponse - processedTxsHashes []string - unprocessedTxs map[string]*state.ProcessTransactionResponse - isBatchProcessed bool + processedTxs []*state.ProcessTransactionResponse + processedTxsHashes []string + unprocessedTxs map[string]*state.ProcessTransactionResponse + unprocessedTxsHashes []string + isBatchProcessed bool } func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) { @@ -68,11 +69,14 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) { getTxsLimit := maxTxsPerBatch - uint64(len(s.sequenceInProgress.Txs)) // get txs from the pool - pendTxs, err := s.pool.GetPendingTxsWithLowestNonce(ctx, getTxsLimit) + pendTxs, err := s.pool.GetTxs(ctx, pool.TxStatusPending, getTxsLimit) if err == pgpoolstorage.ErrNotFound || len(pendTxs) == 0 { - log.Info("there is no suitable pending tx in the pool, waiting...") - waitTick(ctx, ticker) - return + pendTxs, err = s.pool.GetTxs(ctx, pool.TxStatusFailed, getTxsLimit) + if err == pgpoolstorage.ErrNotFound || len(pendTxs) == 0 { + log.Info("there is no suitable pending or failed txs in the pool, waiting...") + waitTick(ctx, ticker) + return + } } else if err != nil { log.Errorf("failed to get pending tx, err: %w", err) return @@ -133,11 +137,17 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) { log.Infof("%d txs stored and added into the trusted state", len(processResponse.processedTxs)) // update processed txs - err = s.pool.UpdateTxsStatus(ctx, processResponse.processedTxsHashes, pool.TxStatusSelected) + s.updateTxsStatus(ctx, ticker, processResponse.processedTxsHashes, pool.TxStatusSelected) + // update unprocessed txs + s.updateTxsStatus(ctx, ticker, processResponse.unprocessedTxsHashes, pool.TxStatusFailed) +} + +func (s *Sequencer) updateTxsStatus(ctx context.Context, ticker *time.Ticker, hashes []string, status pool.TxStatus) { + err := s.pool.UpdateTxsStatus(ctx, hashes, status) for err != nil { - log.Errorf("failed to update txs state to selected, err: %w", err) + log.Errorf("failed to update txs status to %s, err: %w", status, err) waitTick(ctx, ticker) - err = s.pool.UpdateTxsStatus(ctx, processResponse.processedTxsHashes, pool.TxStatusSelected) + err = s.pool.UpdateTxsStatus(ctx, hashes, status) } } @@ -250,13 +260,14 @@ func (s *Sequencer) processTxs(ctx context.Context) (processTxResponse, error) { s.sequenceInProgress.StateRoot = processBatchResp.NewStateRoot s.sequenceInProgress.LocalExitRoot = processBatchResp.NewLocalExitRoot - processedTxs, processedTxsHashes, unprocessedTxs := state.DetermineProcessedTransactions(processBatchResp.Responses) + processedTxs, processedTxsHashes, unprocessedTxs, unprocessedTxsHashes := state.DetermineProcessedTransactions(processBatchResp.Responses) response := processTxResponse{ - processedTxs: processedTxs, - processedTxsHashes: processedTxsHashes, - unprocessedTxs: unprocessedTxs, - isBatchProcessed: processBatchResp.IsBatchProcessed, + processedTxs: processedTxs, + processedTxsHashes: processedTxsHashes, + unprocessedTxs: unprocessedTxs, + unprocessedTxsHashes: unprocessedTxsHashes, + isBatchProcessed: processBatchResp.IsBatchProcessed, } return response, nil diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index e2505a7fa7..d9d7ef87e7 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -23,7 +23,7 @@ type txPool interface { IsTxPending(ctx context.Context, hash common.Hash) (bool, error) DeleteTxsByHashes(ctx context.Context, hashes []common.Hash) error MarkReorgedTxsAsPending(ctx context.Context) error - GetPendingTxsWithLowestNonce(ctx context.Context, limit uint64) ([]*pool.Transaction, error) + GetTxs(ctx context.Context, filterStatus pool.TxStatus, limit uint64) ([]*pool.Transaction, error) } // etherman contains the methods required to interact with ethereum. diff --git a/state/state.go b/state/state.go index 7ef17b0456..f6e4fc6086 100644 --- a/state/state.go +++ b/state/state.go @@ -1028,10 +1028,11 @@ func (s *State) isContractCreation(tx *types.Transaction) bool { // returning a slice with only processed and a map unprocessed txs // respectively. func DetermineProcessedTransactions(responses []*ProcessTransactionResponse) ( - []*ProcessTransactionResponse, []string, map[string]*ProcessTransactionResponse) { + []*ProcessTransactionResponse, []string, map[string]*ProcessTransactionResponse, []string) { processedTxResponses := []*ProcessTransactionResponse{} processedTxsHashes := []string{} unprocessedTxResponses := map[string]*ProcessTransactionResponse{} + unprocessedTxsHashes := []string{} for _, response := range responses { if response.IsProcessed { processedTxResponses = append(processedTxResponses, response) @@ -1039,7 +1040,8 @@ func DetermineProcessedTransactions(responses []*ProcessTransactionResponse) ( } else { log.Infof("Tx %s has not been processed", response.TxHash) unprocessedTxResponses[response.TxHash.String()] = response + unprocessedTxsHashes = append(unprocessedTxsHashes, response.TxHash.String()) } } - return processedTxResponses, processedTxsHashes, unprocessedTxResponses + return processedTxResponses, processedTxsHashes, unprocessedTxResponses, unprocessedTxsHashes } diff --git a/state/state_test.go b/state/state_test.go index 721cf2be7d..f1c2b2f522 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -852,7 +852,7 @@ func TestDetermineProcessedTransactions(t *testing.T) { for _, tc := range tcs { tc := tc t.Run(tc.description, func(t *testing.T) { - actualProcessedTx, _, actualUnprocessedTxs := state.DetermineProcessedTransactions(tc.input) + actualProcessedTx, _, actualUnprocessedTxs, _ := state.DetermineProcessedTransactions(tc.input) require.Equal(t, tc.expectedProcessedOutput, actualProcessedTx) require.Equal(t, tc.expectedUnprocessedOutput, actualUnprocessedTxs) })