Skip to content

Commit

Permalink
indexer: move deduplication functionality purely to the kvindexer (ba…
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 10, 2022
1 parent 430afb2 commit 1d160a5
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 219 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@
`[storage]` section delimiter in the generated configuration file - this has
now been fixed
- [p2p] \#9500 prevent peers who have errored being added to the peer_set (@jmalicevic)
- [indexer] \#9473 fix bug that caused the psql indexer to index empty blocks whenever one of the transactions returned a non zero code. The relevant deduplication logic has been moved within the kv indexer only (@cmwaters)
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func createAndStartIndexerService(
blockIndexer = &blockidxnull.BlockerIndexer{}
}

indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
indexerService.SetLogger(logger.With("module", "txindex"))

if err := indexerService.Start(); err != nil {
Expand Down
1 change: 0 additions & 1 deletion proxy/mocks/client_creator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions state/indexer/sink/psql/psql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"

// Register the Postgres database driver.
Expand Down Expand Up @@ -197,6 +198,55 @@ func TestIndexing(t *testing.T) {
err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
})

t.Run("IndexerService", func(t *testing.T) {
indexer := &EventSink{store: testDB(), chainID: chainID}

// event bus
eventBus := types.NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := eventBus.Stop(); err != nil {
t.Error(err)
}
})

service := txindex.NewIndexerService(indexer.TxIndexer(), indexer.BlockIndexer(), eventBus, true)
err = service.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := service.Stop(); err != nil {
t.Error(err)
}
})

// publish block with txs
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
NumTxs: int64(2),
})
require.NoError(t, err)
txResult1 := &abci.TxResult{
Height: 1,
Index: uint32(0),
Tx: types.Tx("foo"),
Result: abci.ResponseDeliverTx{Code: 0},
}
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1})
require.NoError(t, err)
txResult2 := &abci.TxResult{
Height: 1,
Index: uint32(1),
Tx: types.Tx("bar"),
Result: abci.ResponseDeliverTx{Code: 1},
}
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2})
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)
require.True(t, service.IsRunning())
})
}

func TestStop(t *testing.T) {
Expand Down
81 changes: 27 additions & 54 deletions state/txindex/indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package txindex
import (
"context"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
Expand All @@ -20,19 +19,21 @@ const (
type IndexerService struct {
service.BaseService

txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
terminateOnError bool
}

// NewIndexerService returns a new service instance.
func NewIndexerService(
txIdxr TxIndexer,
blockIdxr indexer.BlockIndexer,
eventBus *types.EventBus,
terminateOnError bool,
) *IndexerService {

is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus, terminateOnError: terminateOnError}
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
return is
}
Expand Down Expand Up @@ -74,24 +75,38 @@ func (is *IndexerService) OnStart() error {
"index", txResult.Index,
"err", err,
)

if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
}
}

if err := is.blockIdxr.Index(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Info("indexed block", "height", height)
}

batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr)
if err != nil {
is.Logger.Error("deduplicate batch", "height", height)
is.Logger.Info("indexed block exents", "height", height)
}

if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs)
is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs)
}
}
}()
Expand All @@ -104,45 +119,3 @@ func (is *IndexerService) OnStop() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}
}

// DeduplicateBatch consider the case of duplicate txs.
// if the current one under investigation is NOT OK, then we need to check
// whether there's a previously indexed tx.
// SKIP the current tx if the previously indexed record is found and successful.
func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) {
result := make([]*abci.TxResult, 0, len(ops))

// keep track of successful txs in this block in order to suppress latter ones being indexed.
var successfulTxsInThisBlock = make(map[string]struct{})

for _, txResult := range ops {
hash := types.Tx(txResult.Tx).Hash()

if txResult.Result.IsOK() {
successfulTxsInThisBlock[string(hash)] = struct{}{}
} else {
// if it already appeared in current block and was successful, skip.
if _, found := successfulTxsInThisBlock[string(hash)]; found {
continue
}

// check if this tx hash is already indexed
old, err := txIdxr.Get(hash)

// if db op errored
// Not found is not an error
if err != nil {
return nil, err
}

// if it's already indexed in an older block and was successful, skip.
if old != nil && old.Result.Code == abci.CodeTypeOK {
continue
}
}

result = append(result, txResult)
}

return result, nil
}
163 changes: 1 addition & 162 deletions state/txindex/indexer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
txIndexer := kv.NewTxIndex(store)
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))

service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
service.SetLogger(log.TestingLogger())
err = service.Start()
require.NoError(t, err)
Expand Down Expand Up @@ -79,164 +79,3 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, txResult2, res)
}

func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")

testCases := []struct {
name string
tx1 abci.TxResult
tx2 abci.TxResult
expSkip bool // do we expect the second tx to be skipped by tx indexer
}{
{"skip, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"not skip, both successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
{"not skip, both unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"skip, same block, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, same block, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := kv.NewTxIndex(db.NewMemDB())

if tc.tx1.Height != tc.tx2.Height {
// index the first tx
err := indexer.AddBatch(&txindex.Batch{
Ops: []*abci.TxResult{&tc.tx1},
})
require.NoError(t, err)

// check if the second one should be skipped.
ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer)
require.NoError(t, err)

if tc.expSkip {
require.Empty(t, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
}
} else {
// same block
ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
ops, err := txindex.DeduplicateBatch(ops, indexer)
require.NoError(t, err)
if tc.expSkip {
// the second one is skipped
require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
}
}
})
}
}
Loading

0 comments on commit 1d160a5

Please sign in to comment.