diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 9d71da216bd..ae38af93745 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -26,3 +26,4 @@ `[storage]` section delimiter in the generated configuration file - this has now been fixed - [p2p] \#9500 prevent peers who have errored being added to the peer_set (@jmalicevic) +- [indexer] \#9473 fix bug that caused the psql indexer to index empty blocks whenever one of the transactions returned a non zero code. The relevant deduplication logic has been moved within the kv indexer only (@cmwaters) diff --git a/node/node.go b/node/node.go index 5e6350e8ec2..5941c65f44e 100644 --- a/node/node.go +++ b/node/node.go @@ -305,7 +305,7 @@ func createAndStartIndexerService( blockIndexer = &blockidxnull.BlockerIndexer{} } - indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) + indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) indexerService.SetLogger(logger.With("module", "txindex")) if err := indexerService.Start(); err != nil { diff --git a/proxy/mocks/client_creator.go b/proxy/mocks/client_creator.go index fa0bf360593..eced0aeff60 100644 --- a/proxy/mocks/client_creator.go +++ b/proxy/mocks/client_creator.go @@ -35,7 +35,6 @@ func (_m *ClientCreator) NewABCIClient() (abcicli.Client, error) { return r0, r1 } - type mockConstructorTestingTNewClientCreator interface { mock.TestingT Cleanup(func()) diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go index b40ba395efc..6a12090f3fe 100644 --- a/state/indexer/sink/psql/psql_test.go +++ b/state/indexer/sink/psql/psql_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" // Register the Postgres database driver. @@ -197,6 +198,55 @@ func TestIndexing(t *testing.T) { err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) require.NoError(t, err) }) + + t.Run("IndexerService", func(t *testing.T) { + indexer := &EventSink{store: testDB(), chainID: chainID} + + // event bus + eventBus := types.NewEventBus() + err := eventBus.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := eventBus.Stop(); err != nil { + t.Error(err) + } + }) + + service := txindex.NewIndexerService(indexer.TxIndexer(), indexer.BlockIndexer(), eventBus, true) + err = service.Start() + require.NoError(t, err) + t.Cleanup(func() { + if err := service.Stop(); err != nil { + t.Error(err) + } + }) + + // publish block with txs + err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + Header: types.Header{Height: 1}, + NumTxs: int64(2), + }) + require.NoError(t, err) + txResult1 := &abci.TxResult{ + Height: 1, + Index: uint32(0), + Tx: types.Tx("foo"), + Result: abci.ResponseDeliverTx{Code: 0}, + } + err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1}) + require.NoError(t, err) + txResult2 := &abci.TxResult{ + Height: 1, + Index: uint32(1), + Tx: types.Tx("bar"), + Result: abci.ResponseDeliverTx{Code: 1}, + } + err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2}) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + require.True(t, service.IsRunning()) + }) } func TestStop(t *testing.T) { diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 828a63c8bbc..0e8fbb9c911 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -3,7 +3,6 @@ package txindex import ( "context" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" @@ -20,9 +19,10 @@ const ( type IndexerService struct { service.BaseService - txIdxr TxIndexer - blockIdxr indexer.BlockIndexer - eventBus *types.EventBus + txIdxr TxIndexer + blockIdxr indexer.BlockIndexer + eventBus *types.EventBus + terminateOnError bool } // NewIndexerService returns a new service instance. @@ -30,9 +30,10 @@ func NewIndexerService( txIdxr TxIndexer, blockIdxr indexer.BlockIndexer, eventBus *types.EventBus, + terminateOnError bool, ) *IndexerService { - is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus} + is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus, terminateOnError: terminateOnError} is.BaseService = *service.NewBaseService(nil, "IndexerService", is) return is } @@ -74,24 +75,38 @@ func (is *IndexerService) OnStart() error { "index", txResult.Index, "err", err, ) + + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return + } } } if err := is.blockIdxr.Index(eventDataHeader); err != nil { is.Logger.Error("failed to index block", "height", height, "err", err) + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return + } } else { - is.Logger.Info("indexed block", "height", height) - } - - batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr) - if err != nil { - is.Logger.Error("deduplicate batch", "height", height) + is.Logger.Info("indexed block exents", "height", height) } if err = is.txIdxr.AddBatch(batch); err != nil { is.Logger.Error("failed to index block txs", "height", height, "err", err) + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return + } } else { - is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs) + is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs) } } }() @@ -104,45 +119,3 @@ func (is *IndexerService) OnStop() { _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber) } } - -// DeduplicateBatch consider the case of duplicate txs. -// if the current one under investigation is NOT OK, then we need to check -// whether there's a previously indexed tx. -// SKIP the current tx if the previously indexed record is found and successful. -func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) { - result := make([]*abci.TxResult, 0, len(ops)) - - // keep track of successful txs in this block in order to suppress latter ones being indexed. - var successfulTxsInThisBlock = make(map[string]struct{}) - - for _, txResult := range ops { - hash := types.Tx(txResult.Tx).Hash() - - if txResult.Result.IsOK() { - successfulTxsInThisBlock[string(hash)] = struct{}{} - } else { - // if it already appeared in current block and was successful, skip. - if _, found := successfulTxsInThisBlock[string(hash)]; found { - continue - } - - // check if this tx hash is already indexed - old, err := txIdxr.Get(hash) - - // if db op errored - // Not found is not an error - if err != nil { - return nil, err - } - - // if it's already indexed in an older block and was successful, skip. - if old != nil && old.Result.Code == abci.CodeTypeOK { - continue - } - } - - result = append(result, txResult) - } - - return result, nil -} diff --git a/state/txindex/indexer_service_test.go b/state/txindex/indexer_service_test.go index f7070f119fd..8c7dca2ac81 100644 --- a/state/txindex/indexer_service_test.go +++ b/state/txindex/indexer_service_test.go @@ -32,7 +32,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { txIndexer := kv.NewTxIndex(store) blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events"))) - service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) + service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) service.SetLogger(log.TestingLogger()) err = service.Start() require.NoError(t, err) @@ -79,164 +79,3 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { require.NoError(t, err) require.Equal(t, txResult2, res) } - -func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) { - var mockTx = types.Tx("MOCK_TX_HASH") - - testCases := []struct { - name string - tx1 abci.TxResult - tx2 abci.TxResult - expSkip bool // do we expect the second tx to be skipped by tx indexer - }{ - {"skip, previously successful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - abci.TxResult{ - Height: 2, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - true, - }, - {"not skip, previously unsuccessful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - abci.TxResult{ - Height: 2, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - false, - }, - {"not skip, both successful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - abci.TxResult{ - Height: 2, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - false, - }, - {"not skip, both unsuccessful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - abci.TxResult{ - Height: 2, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - false, - }, - {"skip, same block, previously successful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - true, - }, - {"not skip, same block, previously unsuccessful", - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK + 1, - }, - }, - abci.TxResult{ - Height: 1, - Index: 0, - Tx: mockTx, - Result: abci.ResponseDeliverTx{ - Code: abci.CodeTypeOK, - }, - }, - false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - indexer := kv.NewTxIndex(db.NewMemDB()) - - if tc.tx1.Height != tc.tx2.Height { - // index the first tx - err := indexer.AddBatch(&txindex.Batch{ - Ops: []*abci.TxResult{&tc.tx1}, - }) - require.NoError(t, err) - - // check if the second one should be skipped. - ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer) - require.NoError(t, err) - - if tc.expSkip { - require.Empty(t, ops) - } else { - require.Equal(t, []*abci.TxResult{&tc.tx2}, ops) - } - } else { - // same block - ops := []*abci.TxResult{&tc.tx1, &tc.tx2} - ops, err := txindex.DeduplicateBatch(ops, indexer) - require.NoError(t, err) - if tc.expSkip { - // the second one is skipped - require.Equal(t, []*abci.TxResult{&tc.tx1}, ops) - } else { - require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops) - } - } - }) - } -} diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index 9ee1f9466f0..4c55c5fa76e 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -101,12 +101,30 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { // that indexed from the tx's events is a composite of the event type and the // respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. +// +// If a transaction is indexed with the same hash as a previous transaction, it will +// be overwritten unless the tx result was NOT OK and the prior result was OK i.e. +// more transactions that successfully executed overwrite transactions that failed +// or successful yet older transactions. func (txi *TxIndex) Index(result *abci.TxResult) error { b := txi.store.NewBatch() defer b.Close() hash := types.Tx(result.Tx).Hash() + if !result.Result.IsOK() { + oldResult, err := txi.Get(hash) + if err != nil { + return err + } + + // if the new transaction failed and it's already indexed in an older block and was successful + // we skip it as we want users to get the older successful transaction when they query. + if oldResult != nil && oldResult.Result.Code == abci.CodeTypeOK { + return nil + } + } + // index tx by events err := txi.indexEvents(result, hash, b) if err != nil { diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 9b15c1971ce..538807a9242 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -259,6 +259,103 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { } } +func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) { + var mockTx = types.Tx("MOCK_TX_HASH") + + testCases := []struct { + name string + tx1 *abci.TxResult + tx2 *abci.TxResult + expOverwrite bool // do we expect the second tx to overwrite the first tx + }{ + { + "don't overwrite as a non-zero code was returned and the previous tx was successful", + &abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + &abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + false, + }, + { + "overwrite as the previous tx was also unsuccessful", + &abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + &abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + true, + }, + { + "overwrite as the most recent tx was successful", + &abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + &abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + true, + }, + } + + hash := mockTx.Hash() + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + indexer := NewTxIndex(db.NewMemDB()) + + // index the first tx + err := indexer.Index(tc.tx1) + require.NoError(t, err) + + // index the same tx with different results + err = indexer.Index(tc.tx2) + require.NoError(t, err) + + res, err := indexer.Get(hash) + require.NoError(t, err) + + if tc.expOverwrite { + require.Equal(t, tc.tx2, res) + } else { + require.Equal(t, tc.tx1, res) + } + }) + } +} + func TestTxSearchMultipleTxs(t *testing.T) { indexer := NewTxIndex(db.NewMemDB()) diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 6816e8dc5dd..dd29d34df94 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -320,7 +320,7 @@ func createAndStartIndexerService( blockIndexer = &blockidxnull.BlockerIndexer{} } - indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) + indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) indexerService.SetLogger(logger.With("module", "txindex")) if err := indexerService.Start(); err != nil {