Skip to content

Commit

Permalink
added filtering of the pending txs (0xPolygonHermez#1151)
Browse files Browse the repository at this point in the history
* added filtering of the pending txs

* fixed state_test.go

* Discard TX that generates an OOC error if it is the first one in a batch

* deleted Discarding of TX that generates an OOC error if it is the first one in a batch
  • Loading branch information
Mikelle authored Sep 15, 2022
1 parent c839cab commit 75a5cb6
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type storage interface {
SetGasPrice(ctx context.Context, gasPrice uint64) error
UpdateTxsStatus(ctx context.Context, hashes []string, newStatus TxStatus) error
UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus TxStatus) error
GetPendingTxsWithLowestNonce(ctx context.Context, limit uint64) ([]*Transaction, error)
GetTxs(ctx context.Context, filterStatus TxStatus, limit uint64) ([]*Transaction, error)
}

type stateInterface interface {
Expand Down
6 changes: 3 additions & 3 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func (p *PostgresPoolStorage) GetPendingTxHashesSince(ctx context.Context, since
return hashes, nil
}

// GetPendingTxsWithLowestNonce gets top pending txs with the lowest nonce
func (p *PostgresPoolStorage) GetPendingTxsWithLowestNonce(ctx context.Context, limit uint64) ([]*pool.Transaction, error) {
// GetTxs gets txs with the lowest nonce
func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxStatus, limit uint64) ([]*pool.Transaction, error) {
query := `
SELECT
encoded,
Expand Down Expand Up @@ -209,7 +209,7 @@ func (p *PostgresPoolStorage) GetPendingTxsWithLowestNonce(ctx context.Context,
nonce uint64
)

args := []interface{}{pool.TxStatusPending, limit}
args := []interface{}{filterStatus, limit}

rows, err := p.db.Query(ctx, query, args...)
if errors.Is(err, pgx.ErrNoRows) {
Expand Down
2 changes: 1 addition & 1 deletion pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func Test_GetTopPendingTxByProfitabilityAndZkCounters(t *testing.T) {
}
}

txs, err := p.GetPendingTxsWithLowestNonce(ctx, 10)
txs, err := p.GetTxs(ctx, pool.TxStatusPending, 10)
require.NoError(t, err)
// bcs it's sorted by nonce, tx with the lowest nonce is expected here
assert.Equal(t, txs[0].Transaction.Nonce(), uint64(0))
Expand Down
4 changes: 2 additions & 2 deletions pool/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ const (
TxStatusInvalid TxStatus = "invalid"
// TxStatusSelected represents a tx that has been selected
TxStatusSelected TxStatus = "selected"
// TxStatusPreSelected represents a tx that has been preselected for executor processing
TxStatusPreSelected TxStatus = "preselected"
// TxStatusFailed represents a tx that has been failed after processing, but can be processed in the future
TxStatusFailed TxStatus = "failed"
)

// TxStatus represents the state of a tx
Expand Down
43 changes: 27 additions & 16 deletions sequencer/batchbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
const maxTxsPerBatch uint64 = 150

type processTxResponse struct {
processedTxs []*state.ProcessTransactionResponse
processedTxsHashes []string
unprocessedTxs map[string]*state.ProcessTransactionResponse
isBatchProcessed bool
processedTxs []*state.ProcessTransactionResponse
processedTxsHashes []string
unprocessedTxs map[string]*state.ProcessTransactionResponse
unprocessedTxsHashes []string
isBatchProcessed bool
}

func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
Expand Down Expand Up @@ -68,11 +69,14 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
getTxsLimit := maxTxsPerBatch - uint64(len(s.sequenceInProgress.Txs))

// get txs from the pool
pendTxs, err := s.pool.GetPendingTxsWithLowestNonce(ctx, getTxsLimit)
pendTxs, err := s.pool.GetTxs(ctx, pool.TxStatusPending, getTxsLimit)
if err == pgpoolstorage.ErrNotFound || len(pendTxs) == 0 {
log.Info("there is no suitable pending tx in the pool, waiting...")
waitTick(ctx, ticker)
return
pendTxs, err = s.pool.GetTxs(ctx, pool.TxStatusFailed, getTxsLimit)
if err == pgpoolstorage.ErrNotFound || len(pendTxs) == 0 {
log.Info("there is no suitable pending or failed txs in the pool, waiting...")
waitTick(ctx, ticker)
return
}
} else if err != nil {
log.Errorf("failed to get pending tx, err: %w", err)
return
Expand Down Expand Up @@ -133,11 +137,17 @@ func (s *Sequencer) tryToProcessTx(ctx context.Context, ticker *time.Ticker) {
log.Infof("%d txs stored and added into the trusted state", len(processResponse.processedTxs))

// update processed txs
err = s.pool.UpdateTxsStatus(ctx, processResponse.processedTxsHashes, pool.TxStatusSelected)
s.updateTxsStatus(ctx, ticker, processResponse.processedTxsHashes, pool.TxStatusSelected)
// update unprocessed txs
s.updateTxsStatus(ctx, ticker, processResponse.unprocessedTxsHashes, pool.TxStatusFailed)
}

func (s *Sequencer) updateTxsStatus(ctx context.Context, ticker *time.Ticker, hashes []string, status pool.TxStatus) {
err := s.pool.UpdateTxsStatus(ctx, hashes, status)
for err != nil {
log.Errorf("failed to update txs state to selected, err: %w", err)
log.Errorf("failed to update txs status to %s, err: %w", status, err)
waitTick(ctx, ticker)
err = s.pool.UpdateTxsStatus(ctx, processResponse.processedTxsHashes, pool.TxStatusSelected)
err = s.pool.UpdateTxsStatus(ctx, hashes, status)
}
}

Expand Down Expand Up @@ -250,13 +260,14 @@ func (s *Sequencer) processTxs(ctx context.Context) (processTxResponse, error) {
s.sequenceInProgress.StateRoot = processBatchResp.NewStateRoot
s.sequenceInProgress.LocalExitRoot = processBatchResp.NewLocalExitRoot

processedTxs, processedTxsHashes, unprocessedTxs := state.DetermineProcessedTransactions(processBatchResp.Responses)
processedTxs, processedTxsHashes, unprocessedTxs, unprocessedTxsHashes := state.DetermineProcessedTransactions(processBatchResp.Responses)

response := processTxResponse{
processedTxs: processedTxs,
processedTxsHashes: processedTxsHashes,
unprocessedTxs: unprocessedTxs,
isBatchProcessed: processBatchResp.IsBatchProcessed,
processedTxs: processedTxs,
processedTxsHashes: processedTxsHashes,
unprocessedTxs: unprocessedTxs,
unprocessedTxsHashes: unprocessedTxsHashes,
isBatchProcessed: processBatchResp.IsBatchProcessed,
}

return response, nil
Expand Down
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type txPool interface {
IsTxPending(ctx context.Context, hash common.Hash) (bool, error)
DeleteTxsByHashes(ctx context.Context, hashes []common.Hash) error
MarkReorgedTxsAsPending(ctx context.Context) error
GetPendingTxsWithLowestNonce(ctx context.Context, limit uint64) ([]*pool.Transaction, error)
GetTxs(ctx context.Context, filterStatus pool.TxStatus, limit uint64) ([]*pool.Transaction, error)
}

// etherman contains the methods required to interact with ethereum.
Expand Down
6 changes: 4 additions & 2 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,18 +1028,20 @@ func (s *State) isContractCreation(tx *types.Transaction) bool {
// returning a slice with only processed and a map unprocessed txs
// respectively.
func DetermineProcessedTransactions(responses []*ProcessTransactionResponse) (
[]*ProcessTransactionResponse, []string, map[string]*ProcessTransactionResponse) {
[]*ProcessTransactionResponse, []string, map[string]*ProcessTransactionResponse, []string) {
processedTxResponses := []*ProcessTransactionResponse{}
processedTxsHashes := []string{}
unprocessedTxResponses := map[string]*ProcessTransactionResponse{}
unprocessedTxsHashes := []string{}
for _, response := range responses {
if response.IsProcessed {
processedTxResponses = append(processedTxResponses, response)
processedTxsHashes = append(processedTxsHashes, response.TxHash.String())
} else {
log.Infof("Tx %s has not been processed", response.TxHash)
unprocessedTxResponses[response.TxHash.String()] = response
unprocessedTxsHashes = append(unprocessedTxsHashes, response.TxHash.String())
}
}
return processedTxResponses, processedTxsHashes, unprocessedTxResponses
return processedTxResponses, processedTxsHashes, unprocessedTxResponses, unprocessedTxsHashes
}
2 changes: 1 addition & 1 deletion state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ func TestDetermineProcessedTransactions(t *testing.T) {
for _, tc := range tcs {
tc := tc
t.Run(tc.description, func(t *testing.T) {
actualProcessedTx, _, actualUnprocessedTxs := state.DetermineProcessedTransactions(tc.input)
actualProcessedTx, _, actualUnprocessedTxs, _ := state.DetermineProcessedTransactions(tc.input)
require.Equal(t, tc.expectedProcessedOutput, actualProcessedTx)
require.Equal(t, tc.expectedUnprocessedOutput, actualUnprocessedTxs)
})
Expand Down

0 comments on commit 75a5cb6

Please sign in to comment.