diff --git a/command/server/config/config.go b/command/server/config/config.go
index 93b372bbf3..532cf482ee 100644
--- a/command/server/config/config.go
+++ b/command/server/config/config.go
@@ -32,13 +32,17 @@ type Config struct {
 	JSONLogFormat      bool     `json:"json_log_format" yaml:"json_log_format"`
 	CorsAllowedOrigins []string `json:"cors_allowed_origins" yaml:"cors_allowed_origins"`
 
-	Relayer               bool   `json:"relayer" yaml:"relayer"`
-	NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`
+	Relayer bool `json:"relayer" yaml:"relayer"`
 
 	ConcurrentRequestsDebug uint64        `json:"concurrent_requests_debug" yaml:"concurrent_requests_debug"`
 	WebSocketReadLimit      uint64        `json:"web_socket_read_limit" yaml:"web_socket_read_limit"`
 	MetricsInterval         time.Duration `json:"metrics_interval" yaml:"metrics_interval"`
+
+	// event tracker
+	NumBlockConfirmations    uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`
+	TrackerSyncBatchSize     uint64 `json:"tracker_sync_batch_size" yaml:"tracker_sync_batch_size"`
+	TrackerBlocksToReconcile uint64 `json:"tracker_blocks_to_reconcile" yaml:"tracker_blocks_to_reconcile"`
 }
 
 // Telemetry holds the config details for metric services.
@@ -81,10 +85,6 @@ const (
 	// requests with fromBlock/toBlock values (e.g. eth_getLogs)
 	DefaultJSONRPCBlockRangeLimit uint64 = 1000
 
-	// DefaultNumBlockConfirmations minimal number of child blocks required for the parent block to be considered final
-	// on ethereum epoch lasts for 32 blocks. more details: https://www.alchemy.com/overviews/ethereum-commitment-levels
-	DefaultNumBlockConfirmations uint64 = 64
-
 	// DefaultConcurrentRequestsDebug specifies max number of allowed concurrent requests for debug endpoints
 	DefaultConcurrentRequestsDebug uint64 = 32
 
@@ -96,6 +96,25 @@ const (
 	// DefaultMetricsInterval specifies the time interval after which Prometheus metrics will be generated.
 	// A value of 0 means the metrics are disabled.
 	DefaultMetricsInterval time.Duration = time.Second * 8
+
+	// event tracker
+
+	// DefaultNumBlockConfirmations is the minimal number of child blocks required for the parent block to be considered final.
+	// On Ethereum an epoch lasts for 32 blocks. More details: https://www.alchemy.com/overviews/ethereum-commitment-levels
+	DefaultNumBlockConfirmations uint64 = 64
+
+	// DefaultTrackerSyncBatchSize is the default number of blocks fetched from the tracked chain in a single batch
+	// when the tracker is out of sync and needs to catch up.
+	DefaultTrackerSyncBatchSize uint64 = 10
+
+	// DefaultTrackerBlocksToReconcile is the default number of blocks that the tracker
+	// will sync up from the latest block on the tracked chain.
+	// If a node running the tracker was offline for days, months, or a year, it will have missed a lot of blocks.
+	// In the meantime, the rest of the nodes are expected to have collected the desired events, acted on them,
+	// and kept consensus and the relayer running.
+	// To avoid wasting time syncing all of those blocks, NumOfBlocksToReconcile tells the tracker
+	// to sync only the most recent latestBlock.Number - NumOfBlocksToReconcile blocks.
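+	// (e.g., with NumOfBlocksToReconcile = 10_000 and the tracked chain at block 2_000_000, a fresh or
+	// long-offline tracker starts syncing from block 1_990_000 instead of from its configured start block)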
+ DefaultTrackerBlocksToReconcile uint64 = 10000 ) // DefaultConfig returns the default server configuration @@ -131,10 +150,13 @@ func DefaultConfig() *Config { JSONRPCBatchRequestLimit: DefaultJSONRPCBatchRequestLimit, JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit, Relayer: false, - NumBlockConfirmations: DefaultNumBlockConfirmations, ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug, WebSocketReadLimit: DefaultWebSocketReadLimit, MetricsInterval: DefaultMetricsInterval, + // event tracker + NumBlockConfirmations: DefaultNumBlockConfirmations, + TrackerSyncBatchSize: DefaultTrackerSyncBatchSize, + TrackerBlocksToReconcile: DefaultTrackerBlocksToReconcile, } } diff --git a/command/server/params.go b/command/server/params.go index da48b929e9..85b3439d2e 100644 --- a/command/server/params.go +++ b/command/server/params.go @@ -37,13 +37,17 @@ const ( corsOriginFlag = "access-control-allow-origins" logFileLocationFlag = "log-to" - relayerFlag = "relayer" - numBlockConfirmationsFlag = "num-block-confirmations" + relayerFlag = "relayer" concurrentRequestsDebugFlag = "concurrent-requests-debug" webSocketReadLimitFlag = "websocket-read-limit" metricsIntervalFlag = "metrics-interval" + + // event tracker + numBlockConfirmationsFlag = "num-block-confirmations" + trackerSyncBatchSizeFlag = "tracker-sync-batch-size" + trackerBlocksToReconcileFlag = "tracker-blocks-to-reconcile" ) // Flags that are deprecated, but need to be preserved for @@ -185,8 +189,11 @@ func (p *serverParams) generateConfig() *server.Config { JSONLogFormat: p.rawConfig.JSONLogFormat, LogFilePath: p.logFileLocation, - Relayer: p.relayer, - NumBlockConfirmations: p.rawConfig.NumBlockConfirmations, - MetricsInterval: p.rawConfig.MetricsInterval, + Relayer: p.relayer, + MetricsInterval: p.rawConfig.MetricsInterval, + // event tracker + NumBlockConfirmations: p.rawConfig.NumBlockConfirmations, + TrackerSyncBatchSize: p.rawConfig.TrackerSyncBatchSize, + TrackerBlocksToReconcile: p.rawConfig.TrackerBlocksToReconcile, } } diff --git a/command/server/server.go b/command/server/server.go index f9c4ab4c6c..a4f2458e6c 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -214,13 +214,6 @@ func setFlags(cmd *cobra.Command) { "start the state sync relayer service (PolyBFT only)", ) - cmd.Flags().Uint64Var( - ¶ms.rawConfig.NumBlockConfirmations, - numBlockConfirmationsFlag, - defaultConfig.NumBlockConfirmations, - "minimal number of child blocks required for the parent block to be considered final", - ) - cmd.Flags().Uint64Var( ¶ms.rawConfig.ConcurrentRequestsDebug, concurrentRequestsDebugFlag, @@ -242,6 +235,38 @@ func setFlags(cmd *cobra.Command) { "the interval (in seconds) at which special metrics are generated. a value of zero means the metrics are disabled", ) + // event tracker config + cmd.Flags().Uint64Var( + ¶ms.rawConfig.NumBlockConfirmations, + numBlockConfirmationsFlag, + defaultConfig.NumBlockConfirmations, + "minimal number of child blocks required for the parent block to be considered final", + ) + + cmd.Flags().Uint64Var( + ¶ms.rawConfig.TrackerSyncBatchSize, + trackerSyncBatchSizeFlag, + defaultConfig.TrackerSyncBatchSize, + `defines a batch size of blocks that will be gotten from tracked chain, + when tracker is out of sync and needs to sync a number of blocks. 
+ (e.g., SyncBatchSize = 10, trackers last processed block is 10, latest block on tracked chain is 100, + it will get blocks 11-20, get logs from confirmed blocks of given batch, remove processed confirm logs + from memory, and continue to the next batch)`, + ) + + cmd.Flags().Uint64Var( + ¶ms.rawConfig.TrackerBlocksToReconcile, + trackerBlocksToReconcileFlag, + defaultConfig.TrackerBlocksToReconcile, + `defines how many blocks we will sync up from the latest block on tracked chain. + If a node that has tracker, was offline for days, months, a year, it will miss a lot of blocks. + In the meantime, we expect the rest of nodes to have collected the desired events and did their + logic with them, continuing consensus and relayer stuff. + In order to not waste too much unnecessary time in syncing all those blocks, with NumOfBlocksToReconcile, + we tell the tracker to sync only latestBlock.Number - NumOfBlocksToReconcile number of blocks. + If 0 is set to this flag, event tracker will sync all the blocks from tracked chain`, + ) + setLegacyFlags(cmd) setDevFlags(cmd) diff --git a/consensus/consensus.go b/consensus/consensus.go index 160faf4523..b1f63e78c2 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -84,8 +84,11 @@ type Params struct { SecretsManager secrets.SecretsManager BlockTime uint64 - NumBlockConfirmations uint64 - MetricsInterval time.Duration + MetricsInterval time.Duration + // event tracker + NumBlockConfirmations uint64 + TrackerSyncBatchSize uint64 + TrackerBlocksToReconcile uint64 } // Factory is the factory function to create a discovery consensus diff --git a/consensus/polybft/consensus_runtime.go b/consensus/polybft/consensus_runtime.go index 288a810daf..1af3078d67 100644 --- a/consensus/polybft/consensus_runtime.go +++ b/consensus/polybft/consensus_runtime.go @@ -74,16 +74,19 @@ type guardedDataDTO struct { // runtimeConfig is a struct that holds configuration data for given consensus runtime type runtimeConfig struct { - PolyBFTConfig *PolyBFTConfig - DataDir string - Key *wallet.Key - State *State - blockchain blockchainBackend - polybftBackend polybftBackend - txPool txPoolInterface - bridgeTopic topic - numBlockConfirmations uint64 - consensusConfig *consensus.Config + PolyBFTConfig *PolyBFTConfig + DataDir string + Key *wallet.Key + State *State + blockchain blockchainBackend + polybftBackend polybftBackend + txPool txPoolInterface + bridgeTopic topic + consensusConfig *consensus.Config + // event tracker + numBlockConfirmations uint64 + trackerSyncBatchSize uint64 + trackerBlocksToReconcile uint64 } // consensusRuntime is a struct that provides consensus runtime features like epoch, state and event management @@ -197,15 +200,18 @@ func (c *consensusRuntime) initStateSyncManager(logger hcf.Logger) error { logger.Named("state-sync-manager"), c.config.State, &stateSyncConfig{ - key: c.config.Key, - stateSenderAddr: stateSenderAddr, - stateSenderStartBlock: c.config.PolyBFTConfig.Bridge.EventTrackerStartBlocks[stateSenderAddr], - jsonrpcAddr: c.config.PolyBFTConfig.Bridge.JSONRPCEndpoint, - dataDir: c.config.DataDir, - topic: c.config.bridgeTopic, - maxCommitmentSize: maxCommitmentSize, + key: c.config.Key, + stateSenderAddr: stateSenderAddr, + stateSenderStartBlock: c.config.PolyBFTConfig.Bridge.EventTrackerStartBlocks[stateSenderAddr], + jsonrpcAddr: c.config.PolyBFTConfig.Bridge.JSONRPCEndpoint, + dataDir: c.config.DataDir, + topic: c.config.bridgeTopic, + maxCommitmentSize: maxCommitmentSize, + // event tracker numBlockConfirmations: 
c.config.numBlockConfirmations, blockTrackerPollInterval: c.config.PolyBFTConfig.BlockTrackerPollInterval.Duration, + trackerSyncBatchSize: c.config.trackerSyncBatchSize, + trackerBlocksToReconcile: c.config.trackerBlocksToReconcile, }, c, ) diff --git a/consensus/polybft/eventtracker/event_tracker.go b/consensus/polybft/eventtracker/event_tracker.go new file mode 100644 index 0000000000..f0e7d9f37b --- /dev/null +++ b/consensus/polybft/eventtracker/event_tracker.go @@ -0,0 +1,470 @@ +package eventtracker + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/0xPolygon/polygon-edge/helper/common" + hcf "github.com/hashicorp/go-hclog" + "github.com/umbracle/ethgo" + "github.com/umbracle/ethgo/blocktracker" +) + +// EventSubscriber is an interface that defines methods for handling tracked logs (events) from a blockchain +type EventSubscriber interface { + AddLog(log *ethgo.Log) error +} + +// BlockProvider is an interface that defines methods for retrieving blocks and logs from a blockchain +type BlockProvider interface { + GetBlockByHash(hash ethgo.Hash, full bool) (*ethgo.Block, error) + GetBlockByNumber(i ethgo.BlockNumber, full bool) (*ethgo.Block, error) + GetLogs(filter *ethgo.LogFilter) ([]*ethgo.Log, error) +} + +// EventTrackerConfig is a struct that holds configuration of a EventTracker +type EventTrackerConfig struct { + // RPCEndpoint is the full json rpc url on some node on a tracked chain + RPCEndpoint string + + // NumBlockConfirmations defines how many blocks must pass from a certain block, + // to consider that block as final on the tracked chain. + // This is very important for reorgs, and events from the given block will only be + // processed if it hits this confirmation mark. + // (e.g., NumBlockConfirmations = 3, and if the last tracked block is 10, + // events from block 10, will only be processed when we get block 13 from the tracked chain) + NumBlockConfirmations uint64 + + // SyncBatchSize defines a batch size of blocks that will be gotten from tracked chain, + // when tracker is out of sync and needs to sync a number of blocks. + // (e.g., SyncBatchSize = 10, trackers last processed block is 10, latest block on tracked chain is 100, + // it will get blocks 11-20, get logs from confirmed blocks of given batch, remove processed confirm logs + // from memory, and continue to the next batch) + SyncBatchSize uint64 + + // NumOfBlocksToReconcile defines how many blocks we will sync up from the latest block on tracked chain. + // If a node that has tracker, was offline for days, months, a year, it will miss a lot of blocks. + // In the meantime, we expect the rest of nodes to have collected the desired events and did their + // logic with them, continuing consensus and relayer stuff. + // In order to not waste too much unnecessary time in syncing all those blocks, with NumOfBlocksToReconcile, + // we tell the tracker to sync only latestBlock.Number - NumOfBlocksToReconcile number of blocks. + NumOfBlocksToReconcile uint64 + + // PollInterval defines a time interval in which tracker polls json rpc node + // for latest block on the tracked chain. 
+ PollInterval time.Duration + + // Logger is the logger instance for event tracker + Logger hcf.Logger + + // LogFilter defines which events are tracked and from which contracts on the tracked chain + LogFilter map[ethgo.Address][]ethgo.Hash + + // BlockProvider is the implementation of a provider that returns blocks and logs from tracked chain + BlockProvider BlockProvider + + // EventSubscriber is the subscriber that requires events tracked by the event tracker + EventSubscriber EventSubscriber +} + +// EventTracker represents a tracker for events on desired contracts on some chain +type EventTracker struct { + config *EventTrackerConfig + + closeCh chan struct{} + once sync.Once + + blockTracker blocktracker.BlockTrackerInterface + blockContainer *TrackerBlockContainer + + // store is the store implementation for data that tracker saves (lastProcessedBlock and logs) + store EventTrackerStore +} + +// NewEventTracker is a constructor function that creates a new instance of the EventTracker struct. +// +// Example Usage: +// +// config := &EventTrackerConfig{ +// RpcEndpoint: "http://some-json-rpc-url.com", +// NumBlockConfirmations: 10, +// SyncBatchSize: 20, +// NumOfBlocksToReconcile:10_000, +// PollInterval: 2 * time.Second, +// Logger: logger, +// Store: store, +// EventSubscriber: subscriber, +// Provider: provider, +// LogFilter: TrackerLogFilter{ +// Addresses: []ethgo.Address{addressOfSomeContract}, +// IDs: []ethgo.Hash{idHashOfSomeEvent}, +// }, +// } +// t := NewEventTracker(config, store) +// +// Inputs: +// - config (TrackerConfig): configuration of EventTracker. +// +// Outputs: +// - A new instance of the EventTracker struct. +func NewEventTracker(config *EventTrackerConfig, store EventTrackerStore, + startBlockFromGenesis uint64) (*EventTracker, error) { + lastProcessedBlock, err := store.GetLastProcessedBlock() + if err != nil { + return nil, err + } + + if lastProcessedBlock == 0 { + lastProcessedBlock = startBlockFromGenesis + + if config.NumOfBlocksToReconcile > 0 { + latestBlock, err := config.BlockProvider.GetBlockByNumber(ethgo.Latest, false) + if err != nil { + return nil, err + } + + if latestBlock.Number > config.NumOfBlocksToReconcile && + startBlockFromGenesis < latestBlock.Number-config.NumOfBlocksToReconcile { + // if this is a fresh start, and we missed too much blocks, + // then we should start syncing from + // latestBlock.Number - NumOfBlocksToReconcile + lastProcessedBlock = latestBlock.Number - config.NumOfBlocksToReconcile + } + } + } + + return &EventTracker{ + config: config, + store: store, + closeCh: make(chan struct{}), + blockTracker: blocktracker.NewJSONBlockTracker(config.BlockProvider), + blockContainer: NewTrackerBlockContainer(lastProcessedBlock), + }, nil +} + +// Close closes the EventTracker by closing the closeCh channel. +// This method is used to signal the goroutines to stop. +// +// Example Usage: +// +// tracker := NewEventTracker(config) +// tracker.Start() +// defer tracker.Close() +// +// Inputs: None +// +// Flow: +// 1. The Close() method is called on an instance of EventTracker. +// 2. The closeCh channel is closed, which signals the goroutines to stop. +// +// Outputs: None +func (e *EventTracker) Close() { + close(e.closeCh) +} + +// Start is a method in the EventTracker struct that starts the tracking of blocks +// and retrieval of logs from given blocks from the tracked chain. 
+// If the tracker was turned off (node was down) for some time, it will sync up all the missed +// blocks and logs from the last start (in regards to NumOfBlocksToReconcile field in config). +// +// Returns: +// - nil if start passes successfully. +// - An error if there is an error on startup of blocks tracking on tracked chain. +func (e *EventTracker) Start() error { + e.config.Logger.Info("Starting event tracker", + "jsonRpcEndpoint", e.config.RPCEndpoint, + "numBlockConfirmations", e.config.NumBlockConfirmations, + "pollInterval", e.config.PollInterval, + "syncBatchSize", e.config.SyncBatchSize, + "numOfBlocksToReconcile", e.config.NumOfBlocksToReconcile, + "logFilter", e.config.LogFilter, + "lastBlockProcessed", e.blockContainer.LastProcessedBlock(), + ) + + ctx, cancelFn := context.WithCancel(context.Background()) + go func() { + <-e.closeCh + cancelFn() + }() + + go common.RetryForever(ctx, time.Second, func(context.Context) error { + // sync up all missed blocks on start if it is not already sync up + if err := e.syncOnStart(); err != nil { + e.config.Logger.Error("Syncing up on start failed.", "err", err) + + return err + } + + // start the polling of blocks + err := e.blockTracker.Track(ctx, func(block *ethgo.Block) error { + return e.trackBlock(block) + }) + + if common.IsContextDone(err) { + return nil + } + + return err + }) + + return nil +} + +// trackBlock is a method in the EventTracker struct that is responsible for tracking blocks and processing their logs +// +// Inputs: +// - block: An instance of the ethgo.Block struct representing a block to track. +// +// Returns: +// - nil if tracking block passes successfully. +// - An error if there is an error on tracking given block. +func (e *EventTracker) trackBlock(block *ethgo.Block) error { + if !e.blockContainer.IsOutOfSync(block) { + e.blockContainer.AcquireWriteLock() + defer e.blockContainer.ReleaseWriteLock() + + if e.blockContainer.LastCachedBlock() < block.Number { + // we are not out of sync, it's a sequential add of new block + if err := e.blockContainer.AddBlock(block); err != nil { + return err + } + } + + // check if some blocks reached confirmation level so that we can process their logs + return e.processLogs() + } + + // we are out of sync (either we missed some blocks, or a reorg happened) + // so we get remove the old pending state and get the new one + return e.getNewState(block) +} + +// syncOnStart is a method in the EventTracker struct that is responsible +// for syncing the event tracker on startup. +// It retrieves the latest block and checks if the event tracker is out of sync. +// If it is out of sync, it calls the getNewState method to update the state. +// +// Returns: +// - nil if sync passes successfully, or no sync is done. +// - An error if there is an error retrieving blocks or logs from the external provider or saving logs to the store. +func (e *EventTracker) syncOnStart() (err error) { + var latestBlock *ethgo.Block + + e.once.Do(func() { + e.config.Logger.Info("Syncing up on start...") + latestBlock, err = e.config.BlockProvider.GetBlockByNumber(ethgo.Latest, false) + if err != nil { + return + } + + if !e.blockContainer.IsOutOfSync(latestBlock) { + e.config.Logger.Info("Everything synced up on start") + + return + } + + err = e.getNewState(latestBlock) + }) + + return err +} + +// getNewState is called if tracker is out of sync (it missed some blocks), +// or a reorg happened in the tracked chain. 
+// It acquires write lock on the block container, so that the state is not changed while it +// retrieves the new blocks (new state). +// It will clean the previously cached state (non confirmed blocks), get the new state, +// set it on the block container and process logs on the confirmed blocks on the new state +// +// Input: +// - latestBlock - latest block on the tracked chain +// +// Returns: +// - nil if there are no confirmed blocks. +// - An error if there is an error retrieving blocks or logs from the external provider or saving logs to the store. +func (e *EventTracker) getNewState(latestBlock *ethgo.Block) error { + lastProcessedBlock := e.blockContainer.LastProcessedBlock() + + e.config.Logger.Info("Getting new state, since some blocks were missed", + "lastProcessedBlock", lastProcessedBlock, "latestBlockFromRpc", latestBlock.Number) + + e.blockContainer.AcquireWriteLock() + defer e.blockContainer.ReleaseWriteLock() + + // clean old state + e.blockContainer.CleanState() + + startBlock := lastProcessedBlock + 1 + + // sanitize startBlock from which we will start polling for blocks + if e.config.NumOfBlocksToReconcile > 0 && + latestBlock.Number > e.config.NumOfBlocksToReconcile && + latestBlock.Number-e.config.NumOfBlocksToReconcile > lastProcessedBlock { + startBlock = latestBlock.Number - e.config.NumOfBlocksToReconcile + } + + // get blocks in batches + for i := startBlock; i < latestBlock.Number; i += e.config.SyncBatchSize { + end := i + e.config.SyncBatchSize - 1 + if end > latestBlock.Number { + // we go until the latest block, since we don't need to + // query for it using an rpc point, since we already have it + end = latestBlock.Number - 1 + } + + e.config.Logger.Info("Getting new state for block batch", "fromBlock", i, "toBlock", end) + + // get and add blocks in batch + for j := i; j <= end; j++ { + block, err := e.config.BlockProvider.GetBlockByNumber(ethgo.BlockNumber(j), false) + if err != nil { + e.config.Logger.Error("Getting new state for block batch failed on rpc call", + "fromBlock", i, + "toBlock", end, + "currentBlock", j, + "err", err) + + return err + } + + if err := e.blockContainer.AddBlock(block); err != nil { + return err + } + } + + // now process logs from confirmed blocks if any + if err := e.processLogs(); err != nil { + return err + } + } + + // add latest block + if err := e.blockContainer.AddBlock(latestBlock); err != nil { + return err + } + + // process logs if there are more confirmed events + if err := e.processLogs(); err != nil { + e.config.Logger.Error("Getting new state failed", + "lastProcessedBlock", lastProcessedBlock, + "latestBlockFromRpc", latestBlock.Number, + "err", err) + + return err + } + + e.config.Logger.Info("Getting new state finished", + "newLastProcessedBlock", e.blockContainer.LastProcessedBlockLocked(), + "latestBlockFromRpc", latestBlock.Number) + + return nil +} + +// ProcessLogs retrieves logs for confirmed blocks, filters them based on certain criteria, +// passes them to the subscriber, and stores them in a store. +// It also removes the processed blocks from the block container. +// +// Returns: +// - nil if there are no confirmed blocks. +// - An error if there is an error retrieving logs from the external provider or saving logs to the store. +func (e *EventTracker) processLogs() error { + confirmedBlocks := e.blockContainer.GetConfirmedBlocks(e.config.NumBlockConfirmations) + if confirmedBlocks == nil { + // no confirmed blocks, so nothing to process + e.config.Logger.Debug("No confirmed blocks. 
Nothing to process") + + return nil + } + + fromBlock := confirmedBlocks[0] + toBlock := confirmedBlocks[len(confirmedBlocks)-1] + + e.config.Logger.Debug("Processing logs for blocks", "fromBlock", fromBlock, "toBlock", toBlock) + + logs, err := e.config.BlockProvider.GetLogs(e.getLogsQuery(fromBlock, toBlock)) + if err != nil { + e.config.Logger.Error("Process logs failed on getting logs from rpc", + "fromBlock", fromBlock, + "toBlock", toBlock, + "err", err) + + return err + } + + filteredLogs := make([]*ethgo.Log, 0, len(logs)) + + for _, log := range logs { + logIDs, exist := e.config.LogFilter[log.Address] + if !exist { + continue + } + + for _, id := range logIDs { + if log.Topics[0] == id { + filteredLogs = append(filteredLogs, log) + + if err := e.config.EventSubscriber.AddLog(log); err != nil { + // we will only log this, since the store will have these logs + // and subscriber can just get what he missed from store + e.config.Logger.Error("An error occurred while passing event log to subscriber", + "err", err) + } + + break + } + } + } + + if err := e.store.InsertLastProcessedBlock(toBlock); err != nil { + e.config.Logger.Error("Process logs failed on saving last processed block", + "fromBlock", fromBlock, + "toBlock", toBlock, + "err", err) + + return err + } + + if err := e.store.InsertLogs(filteredLogs); err != nil { + e.config.Logger.Error("Process logs failed on saving logs to store", + "fromBlock", fromBlock, + "toBlock", toBlock, + "err", err) + + return err + } + + if err := e.blockContainer.RemoveBlocks(fromBlock, toBlock); err != nil { + return fmt.Errorf("could not remove processed blocks. Err: %w", err) + } + + e.config.Logger.Debug("Processing logs for blocks finished", + "fromBlock", fromBlock, + "toBlock", toBlock, + "numOfLogs", len(filteredLogs)) + + return nil +} + +// getLogsQuery is a method of the EventTracker struct that creates and returns +// a LogFilter object with the specified block range. +// +// Input: +// - from (uint64): The starting block number for the log filter. +// - to (uint64): The ending block number for the log filter. +// +// Returns: +// - filter (*ethgo.LogFilter): The created LogFilter object with the specified block range. 
+func (e *EventTracker) getLogsQuery(from, to uint64) *ethgo.LogFilter { + addresses := make([]ethgo.Address, 0, len(e.config.LogFilter)) + for a := range e.config.LogFilter { + addresses = append(addresses, a) + } + + filter := ðgo.LogFilter{Address: addresses} + filter.SetFromUint64(from) + filter.SetToUint64(to) + + return filter +} diff --git a/consensus/polybft/eventtracker/event_tracker_fuzz_test.go b/consensus/polybft/eventtracker/event_tracker_fuzz_test.go new file mode 100644 index 0000000000..2400518ca1 --- /dev/null +++ b/consensus/polybft/eventtracker/event_tracker_fuzz_test.go @@ -0,0 +1,88 @@ +package eventtracker + +import ( + "encoding/json" + "testing" + + "github.com/0xPolygon/polygon-edge/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/umbracle/ethgo" +) + +type getNewStateF struct { + Address types.Address + Number uint64 + LastProcessed uint64 + BatchSize uint64 + NumBlockConfirmations uint64 + NumBlocksToReconcile uint64 +} + +func FuzzGetNewState(f *testing.F) { + seeds := []getNewStateF{ + { + Address: types.Address(types.StringToAddress("1").Bytes()), + Number: 25, + LastProcessed: 9, + BatchSize: 5, + NumBlockConfirmations: 3, + NumBlocksToReconcile: 1000, + }, + { + Address: types.Address(types.StringToAddress("1").Bytes()), + Number: 30, + LastProcessed: 29, + BatchSize: 5, + NumBlockConfirmations: 3, + NumBlocksToReconcile: 1000, + }, + { + Address: types.Address(types.StringToAddress("2").Bytes()), + Number: 100, + LastProcessed: 10, + BatchSize: 10, + NumBlockConfirmations: 3, + NumBlocksToReconcile: 15, + }, + } + + for _, seed := range seeds { + data, err := json.Marshal(seed) + if err != nil { + return + } + + f.Add(data) + } + + f.Fuzz(func(t *testing.T, input []byte) { + var data getNewStateF + if err := json.Unmarshal(input, &data); err != nil { + t.Skip(err) + } + + providerMock := new(mockProvider) + for blockNum := data.LastProcessed + 1; blockNum <= data.Number; blockNum++ { + providerMock.On("GetBlockByNumber", ethgo.BlockNumber(blockNum), false).Return(ðgo.Block{Number: blockNum}, nil).Once() + } + + logs := []*ethgo.Log{ + createTestLogForStateSyncEvent(t, 1, 1), + createTestLogForStateSyncEvent(t, 1, 11), + createTestLogForStateSyncEvent(t, 2, 3), + } + providerMock.On("GetLogs", mock.Anything).Return(logs, nil) + + testConfig := createTestTrackerConfig(t, data.NumBlockConfirmations, data.BatchSize, data.NumBlocksToReconcile) + testConfig.BlockProvider = providerMock + + eventTracker := &EventTracker{ + config: testConfig, + blockContainer: NewTrackerBlockContainer(data.LastProcessed), + store: newTestTrackerStore(t), + } + + require.NoError(t, eventTracker.getNewState(ðgo.Block{Number: data.Number})) + }) +} diff --git a/consensus/polybft/eventtracker/event_tracker_store.go b/consensus/polybft/eventtracker/event_tracker_store.go new file mode 100644 index 0000000000..25ce1e0c36 --- /dev/null +++ b/consensus/polybft/eventtracker/event_tracker_store.go @@ -0,0 +1,491 @@ +package eventtracker + +import ( + "bytes" + "encoding/json" + "fmt" + "sync" + + "github.com/0xPolygon/polygon-edge/helper/common" + "github.com/umbracle/ethgo" + bolt "go.etcd.io/bbolt" +) + +var ( + petLastProcessedBlockKey = []byte("lastProcessedTrackerBlock") + petLastProcessedBlockBucket = []byte("lastProcessedTrackerBucket") + petLogsBucket = []byte("logs") +) + +// EventTrackerStore is an interface that defines the behavior of an event tracker store +type EventTrackerStore interface { + GetLastProcessedBlock() (uint64, 
error) + InsertLastProcessedBlock(blockNumber uint64) error + InsertLogs(logs []*ethgo.Log) error + GetLogsByBlockNumber(blockNumber uint64) ([]*ethgo.Log, error) + GetLog(blockNumber, logIndex uint64) (*ethgo.Log, error) + GetAllLogs() ([]*ethgo.Log, error) +} + +var _ EventTrackerStore = (*BoltDBEventTrackerStore)(nil) + +// BoltDBEventTrackerStore represents a store for event tracker events +type BoltDBEventTrackerStore struct { + db *bolt.DB +} + +// NewBoltDBEventTrackerStore is a constructor function that creates +// a new instance of the BoltDBEventTrackerStore struct. +// +// Example Usage: +// +// t := NewBoltDBEventTrackerStore(/edge/polybft/consensus/deposit.db) +// +// Inputs: +// - dbPath (string): Full path to the event tracker store db. +// +// Outputs: +// - A new instance of the BoltDBEventTrackerStore struct with a connection to the event tracker store db. +func NewBoltDBEventTrackerStore(dbPath string) (*BoltDBEventTrackerStore, error) { + db, err := bolt.Open(dbPath, 0666, nil) + if err != nil { + return nil, err + } + + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(petLastProcessedBlockBucket) + if err != nil { + return err + } + + _, err = tx.CreateBucketIfNotExists(petLogsBucket) + + return err + }) + if err != nil { + return nil, err + } + + return &BoltDBEventTrackerStore{db: db}, nil +} + +// GetLastProcessedBlock retrieves the last processed block number from a BoltDB database. +// +// Example Usage: +// +// store := NewBoltDBEventTrackerStore(db) +// blockNumber, err := store.GetLastProcessedBlock() +// if err != nil { +// fmt.Println("Error:", err) +// } else { +// fmt.Println("Last processed block number:", blockNumber) +// } +// +// Outputs: +// +// blockNumber: The last processed block number retrieved from the database. +// err: Any error that occurred during the database operation. +func (p *BoltDBEventTrackerStore) GetLastProcessedBlock() (uint64, error) { + var blockNumber uint64 + + err := p.db.View(func(tx *bolt.Tx) error { + value := tx.Bucket(petLastProcessedBlockBucket).Get(petLastProcessedBlockKey) + if value != nil { + blockNumber = common.EncodeBytesToUint64(value) + } + + return nil + }) + + return blockNumber, err +} + +// InsertLastProcessedBlock inserts the last processed block number into a BoltDB bucket. +// +// Inputs: +// - lastProcessedBlockNumber (uint64): The block number to be inserted into the bucket. +// +// Outputs: +// - error: An error indicating if there was a problem with the transaction or the insertion. +func (p *BoltDBEventTrackerStore) InsertLastProcessedBlock(lastProcessedBlockNumber uint64) error { + return p.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(petLastProcessedBlockBucket).Put( + petLastProcessedBlockKey, common.EncodeUint64ToBytes(lastProcessedBlockNumber)) + }) +} + +// InsertLogs inserts logs into a BoltDB database, where logs are stored by a composite key: +// - {log.BlockNumber,log.LogIndex} +// +// Example Usage: +// +// store := &BoltDBEventTrackerStore{db: boltDB} +// logs := []*ethgo.Log{log1, log2, log3} +// err := store.InsertLogs(logs) +// if err != nil { +// fmt.Println("Error inserting logs:", err) +// } +// +// Inputs: +// - logs: A slice of ethgo.Log structs representing the logs to be inserted into the database. +// +// Outputs: +// - error: If an error occurs during the insertion process, it is returned. Otherwise, nil is returned. 
+func (p *BoltDBEventTrackerStore) InsertLogs(logs []*ethgo.Log) error { + return p.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(petLogsBucket) + for _, log := range logs { + raw, err := json.Marshal(log) + if err != nil { + return err + } + + logKey := bytes.Join([][]byte{ + common.EncodeUint64ToBytes(log.BlockNumber), + common.EncodeUint64ToBytes(log.LogIndex)}, nil) + if err := bucket.Put(logKey, raw); err != nil { + return err + } + } + + return nil + }) +} + +// GetLogsByBlockNumber retrieves all logs that happened in given block from a BoltDB database. +// +// Example Usage: +// +// store := &BoltDBEventTrackerStore{db: boltDB} +// block := uint64(10) +// logs, err := store.GetLogsByBlockNumber(block) +// if err != nil { +// fmt.Println("Error getting logs for block: %d. Err: %w", block, err) +// } +// +// Inputs: +// - blockNumber (uint64): The block number for which the logs need to be retrieved. +// +// Outputs: +// - logs ([]*ethgo.Log): The logs retrieved from the database for the given block number. +// - err (error): Any error that occurred during the transaction or unmarshaling process. +func (p *BoltDBEventTrackerStore) GetLogsByBlockNumber(blockNumber uint64) ([]*ethgo.Log, error) { + var logs []*ethgo.Log + + err := p.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket(petLogsBucket).Cursor() + prefix := common.EncodeUint64ToBytes(blockNumber) + + for k, v := c.Seek(prefix); bytes.HasPrefix(k, prefix); k, v = c.Next() { + var log *ethgo.Log + if err := json.Unmarshal(v, &log); err != nil { + return err + } + + logs = append(logs, log) + } + + return nil + }) + + return logs, err +} + +// GetLog retrieves a log from the BoltDB database based on the given block number and log index. +// +// Example Usage: +// +// store := &BoltDBEventTrackerStore{db: boltDB} +// block := uint64(10) +// logIndex := uint64(1) +// log, err := store.GetLog(block, logIndex) +// if err != nil { +// fmt.Println("Error getting log of index: %d for block: %d. Err: %w", logIndex, block, err) +// } +// +// Inputs: +// - blockNumber (uint64): The block number of the desired log. +// - logIndex (uint64): The index of the desired log within the block. +// +// Outputs: +// - log (*ethgo.Log): The retrieved log from the BoltDB database. If the log does not exist, it will be nil. +// - err (error): Any error that occurred during the database operation. If no error occurred, it will be nil. +func (p *BoltDBEventTrackerStore) GetLog(blockNumber, logIndex uint64) (*ethgo.Log, error) { + var log *ethgo.Log + + err := p.db.View(func(tx *bolt.Tx) error { + logKey := bytes.Join([][]byte{ + common.EncodeUint64ToBytes(blockNumber), + common.EncodeUint64ToBytes(logIndex)}, nil) + + val := tx.Bucket(petLogsBucket).Get(logKey) + if val == nil { + return nil + } + + return json.Unmarshal(val, &log) + }) + + return log, err +} + +// GetAllLogs retrieves all logs from the logs bucket in the BoltDB database and +// returns them as a slice of ethgo.Log structs. +// +// Example Usage: +// store := NewBoltDBEventTrackerStore("path/to/db") +// logs, err := store.GetAllLogs() +// +// if err != nil { +// fmt.Println("Error:", err) +// return +// } +// +// for _, log := range logs { +// fmt.Println(log) +// } +// +// Outputs: +// The code snippet returns a slice of ethgo.Log structs (logs) and an error (err). +// The logs slice contains all the logs stored in the logs bucket in the BoltDB database. +// The error will be non-nil if there was an issue with the read transaction or unmarshaling the log structs. 
+func (p *BoltDBEventTrackerStore) GetAllLogs() ([]*ethgo.Log, error) { + var logs []*ethgo.Log + + err := p.db.View(func(tx *bolt.Tx) error { + return tx.Bucket(petLogsBucket).ForEach(func(k, v []byte) error { + var log *ethgo.Log + if err := json.Unmarshal(v, &log); err != nil { + return err + } + + logs = append(logs, log) + + return nil + }) + }) + + return logs, err +} + +// TrackerBlockContainer is a struct used to cache and manage tracked blocks from tracked chain. +// It keeps a map of block numbers to hashes, a slice of block numbers to process, +// and the last processed confirmed block number. +// It also uses a mutex to handle concurrent access to the struct +type TrackerBlockContainer struct { + numToHashMap map[uint64]ethgo.Hash + blocks []uint64 + lastProcessedConfirmedBlock uint64 + mux sync.RWMutex +} + +// NewTrackerBlockContainer is a constructor function that creates a +// new instance of the TrackerBlockContainer struct. +// +// Example Usage: +// +// t := NewTrackerBlockContainer(1) +// +// Inputs: +// - lastProcessed (uint64): The last processed block number. +// +// Outputs: +// - A new instance of the TrackerBlockContainer struct with the lastProcessedConfirmedBlock +// field set to the input lastProcessed block number and an empty numToHashMap map. +func NewTrackerBlockContainer(lastProcessed uint64) *TrackerBlockContainer { + return &TrackerBlockContainer{ + lastProcessedConfirmedBlock: lastProcessed, + numToHashMap: make(map[uint64]ethgo.Hash), + } +} + +// AcquireWriteLock acquires the write lock on the TrackerBlockContainer +func (t *TrackerBlockContainer) AcquireWriteLock() { + t.mux.Lock() +} + +// ReleaseWriteLock releases the write lock on the TrackerBlockContainer +func (t *TrackerBlockContainer) ReleaseWriteLock() { + t.mux.Unlock() +} + +// LastProcessedBlockLocked returns number of last processed block for logs +// Function acquires the read lock before accessing the lastProcessedConfirmedBlock field +func (t *TrackerBlockContainer) LastProcessedBlock() uint64 { + t.mux.RLock() + defer t.mux.RUnlock() + + return t.LastProcessedBlockLocked() +} + +// LastProcessedBlockLocked returns number of last processed block for logs +// Function assumes that the read or write lock is already acquired before accessing +// the lastProcessedConfirmedBlock field +func (t *TrackerBlockContainer) LastProcessedBlockLocked() uint64 { + return t.lastProcessedConfirmedBlock +} + +// LastCachedBlock returns the block number of the last cached block for processing +// +// Example Usage: +// +// t := NewTrackerBlockContainer(1) +// t.AddBlock(ðgo.Block{Number: 1, Hash: "hash1"}) +// t.AddBlock(ðgo.Block{Number: 2, Hash: "hash2"}) +// t.AddBlock(ðgo.Block{Number: 3, Hash: "hash3"}) +// +// lastCachedBlock := t.LastCachedBlock() +// fmt.Println(lastCachedBlock) // Output: 3 +// +// Outputs: +// +// The output is a uint64 value representing the block number of the last cached block. +func (t *TrackerBlockContainer) LastCachedBlock() uint64 { + if len(t.blocks) > 0 { + return t.blocks[len(t.blocks)-1] + } + + return 0 +} + +// AddBlock adds a new block to the tracker by storing its number and hash in the numToHashMap map +// and appending the block number to the blocks slice. +// +// Inputs: +// - block (ethgo.Block): The block to be added to the tracker cache for later processing, +// once it hits confirmation number. 
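+//
+// Returns:
+// - nil if the block was cached successfully.
+// - An error if blocks are already cached and the new block's parent is missing from the cache,
+// or its parent hash does not match the cached one (a gap or a reorg).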
+func (t *TrackerBlockContainer) AddBlock(block *ethgo.Block) error { + if hash, exists := t.numToHashMap[block.Number-1]; len(t.blocks) > 0 && (!exists || block.ParentHash != hash) { + return fmt.Errorf("no parent for block %d, or a reorg happened", block.Number) + } + + t.numToHashMap[block.Number] = block.Hash + t.blocks = append(t.blocks, block.Number) + + return nil +} + +// RemoveBlocks removes processed blocks from cached maps, +// and updates the lastProcessedConfirmedBlock variable, to the last processed block. +// +// Inputs: +// - from (uint64): The starting block number to remove. +// - last (uint64): The ending block number to remove. +// +// Returns: +// - nil if removal is successful +// - An error if from block is greater than the last, if given range of blocks was already processed and removed, +// if the last block could not be found in cached blocks, or if we are trying to do a non sequential removal +func (t *TrackerBlockContainer) RemoveBlocks(from, last uint64) error { + if from > last { + return fmt.Errorf("from block: %d, greater than last block: %d", from, last) + } + + if last < t.lastProcessedConfirmedBlock { + return fmt.Errorf("blocks until block: %d are already processed and removed", last) + } + + lastIndex := t.indexOf(last) + if lastIndex == -1 { + return fmt.Errorf("could not find last block: %d in cached blocks", last) + } + + removedBlocks := t.blocks[:lastIndex+1] + remainingBlocks := t.blocks[lastIndex+1:] + + if removedBlocks[0] != from { + return fmt.Errorf("trying to do non-sequential removal of blocks. from: %d, last: %d", from, last) + } + + for i := from; i <= last; i++ { + delete(t.numToHashMap, i) + } + + t.blocks = remainingBlocks + t.lastProcessedConfirmedBlock = last + + return nil +} + +// CleanState resets the state of the TrackerBlockContainer +// by clearing the numToHashMap map and setting the blocks slice to an empty slice +// Called when a reorg happened or we are completely out of sync +func (t *TrackerBlockContainer) CleanState() { + t.numToHashMap = make(map[uint64]ethgo.Hash) + t.blocks = make([]uint64, 0) +} + +// IsOutOfSync checks if tracker is out of sync with the tracked chain. +// Tracker is out of sync with the tracked chain if these conditions are met: +// - latest block from chain has higher number than the last tracked block +// - its parent doesn't exist in numToHash map +// - its parent hash doesn't match with the hash of the given parent block we tracked, +// meaning, a reorg happened +// +// Inputs: +// - block (ethgo.Block): The latest block of the tracked chain. +// +// Outputs: +// - outOfSync (bool): A boolean value indicating if the tracker is out of sync (true) or not (false). +func (t *TrackerBlockContainer) IsOutOfSync(block *ethgo.Block) bool { + t.mux.RLock() + defer t.mux.RUnlock() + + if block.Number == 1 { + // if the chain we are tracking just started + return false + } + + parentHash, parentExists := t.numToHashMap[block.Number-1] + + return block.Number > t.LastCachedBlock() && (!parentExists || parentHash != block.ParentHash) +} + +// GetConfirmedBlocks returns a slice of uint64 representing the block numbers of confirmed blocks. 
+// +// Example Usage: +// +// t := NewTrackerBlockContainer(2) +// t.AddBlock(ðgo.Block{Number: 1, Hash: "hash1"}) +// t.AddBlock(ðgo.Block{Number: 2, Hash: "hash2"}) +// t.AddBlock(ðgo.Block{Number: 3, Hash: "hash3"}) +// +// confirmedBlocks := t.GetConfirmedBlocks(2) +// fmt.Println(confirmedBlocks) // Output: [1] +// +// Inputs: +// - numBlockConfirmations (uint64): The number of block confirmations to consider. +// +// Flow: +// 1. Convert numBlockConfirmations to an integer numBlockConfirmationsInt. +// 2. Check if the length of t.blocks (slice of block numbers) is greater than numBlockConfirmationsInt. +// 3. If it is, return a sub-slice of t.blocks from the beginning to the length of +// t.blocks minus numBlockConfirmationsInt. +// 4. If it is not, return nil. +// +// Outputs: +// - A slice of uint64 representing the block numbers of confirmed blocks. +func (t *TrackerBlockContainer) GetConfirmedBlocks(numBlockConfirmations uint64) []uint64 { + numBlockConfirmationsInt := int(numBlockConfirmations) + if len(t.blocks) > numBlockConfirmationsInt { + return t.blocks[:len(t.blocks)-numBlockConfirmationsInt] + } + + return nil +} + +// indexOf returns the index of a given block number in the blocks slice. +// If the block number is not found, it returns -1 +func (t *TrackerBlockContainer) indexOf(block uint64) int { + index := -1 + + for i, b := range t.blocks { + if b == block { + index = i + + break + } + } + + return index +} diff --git a/consensus/polybft/eventtracker/event_tracker_store_test.go b/consensus/polybft/eventtracker/event_tracker_store_test.go new file mode 100644 index 0000000000..e52478e9c1 --- /dev/null +++ b/consensus/polybft/eventtracker/event_tracker_store_test.go @@ -0,0 +1,480 @@ +package eventtracker + +import ( + "fmt" + "os" + "path" + "testing" + "time" + + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/umbracle/ethgo" + "github.com/umbracle/ethgo/abi" +) + +// newTestState creates new instance of state used by tests. 
+func newTestTrackerStore(tb testing.TB) *BoltDBEventTrackerStore { + tb.Helper() + + dir := fmt.Sprintf("/tmp/even-tracker-temp_%v", time.Now().UTC().Format(time.RFC3339Nano)) + err := os.Mkdir(dir, 0775) + + if err != nil { + tb.Fatal(err) + } + + store, err := NewBoltDBEventTrackerStore(path.Join(dir, "tracker.db")) + if err != nil { + tb.Fatal(err) + } + + tb.Cleanup(func() { + if err := os.RemoveAll(dir); err != nil { + tb.Fatal(err) + } + }) + + return store +} + +func TestEventTrackerStore_InsertAndGetLastProcessedBlock(t *testing.T) { + t.Parallel() + + t.Run("No blocks inserted", func(t *testing.T) { + t.Parallel() + + store := newTestTrackerStore(t) + + result, err := store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, uint64(0), result) + }) + + t.Run("Has a block inserted", func(t *testing.T) { + t.Parallel() + + store := newTestTrackerStore(t) + + require.NoError(t, store.InsertLastProcessedBlock(10)) + + result, err := store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, uint64(10), result) + }) + + t.Run("Insert a bunch of blocks - only the last one should be the result", func(t *testing.T) { + t.Parallel() + + store := newTestTrackerStore(t) + + for i := uint64(0); i <= 20; i++ { + require.NoError(t, store.InsertLastProcessedBlock(i)) + } + + result, err := store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, uint64(20), result) + }) +} + +func TestEventTrackerStore_InsertAndGetLogs(t *testing.T) { + t.Parallel() + + t.Run("No logs inserted", func(t *testing.T) { + t.Parallel() + + store := newTestTrackerStore(t) + + log, err := store.GetLog(1, 1) + require.NoError(t, err) + require.Nil(t, log) + + logs, err := store.GetLogsByBlockNumber(1) + require.NoError(t, err) + require.Nil(t, logs) + }) + + t.Run("Has some logs in store, but no desired log", func(t *testing.T) { + t.Parallel() + + store := newTestTrackerStore(t) + + require.NoError(t, store.InsertLogs([]*ethgo.Log{createTestLogForStateSyncEvent(t, 1, 0)})) + + log, err := store.GetLog(1, 1) + require.NoError(t, err) + require.Nil(t, log) + }) + + t.Run("Has some logs in store, but no desired logs for specific block", func(t *testing.T) { + t.Parallel() + + store := newTestTrackerStore(t) + + require.NoError(t, store.InsertLogs([]*ethgo.Log{createTestLogForStateSyncEvent(t, 1, 0)})) + + logs, err := store.GetLogsByBlockNumber(2) + require.NoError(t, err) + require.Nil(t, logs) + }) + + t.Run("Has bunch of logs", func(t *testing.T) { + t.Parallel() + + numOfBlocks := 10 + numOfLogsPerBlock := 5 + + store := newTestTrackerStore(t) + + for i := 1; i <= numOfBlocks; i++ { + blockLogs := make([]*ethgo.Log, numOfLogsPerBlock) + for j := 0; j < numOfLogsPerBlock; j++ { + blockLogs[j] = createTestLogForStateSyncEvent(t, uint64(i), uint64(j)) + } + + require.NoError(t, store.InsertLogs(blockLogs)) + } + + // check if the num of blocks per each block matches expected values + for i := 1; i <= numOfBlocks; i++ { + logs, err := store.GetLogsByBlockNumber(uint64(i)) + require.NoError(t, err) + require.Len(t, logs, numOfLogsPerBlock) + } + + // get logs for non existing block + logs, err := store.GetLogsByBlockNumber(uint64(numOfBlocks + 1)) + require.NoError(t, err) + require.Nil(t, logs) + + // get specific logs + for i := 1; i <= numOfBlocks; i++ { + for j := 0; j < numOfLogsPerBlock; j++ { + log, err := store.GetLog(uint64(i), uint64(j)) + require.NoError(t, err) + require.NotNil(t, log) + require.Equal(t, uint64(i), log.BlockNumber) + require.Equal(t, 
uint64(j), log.LogIndex) + } + } + + // get some non existing logs + log, err := store.GetLog(1, uint64(numOfLogsPerBlock+1)) + require.NoError(t, err) + require.Nil(t, log) + + log, err = store.GetLog(uint64(numOfBlocks+1), uint64(numOfLogsPerBlock+1)) + require.NoError(t, err) + require.Nil(t, log) + }) +} + +func TestTrackerBlockContainer_GetConfirmedBlocks(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + t.Run("Number of blocks is greater than numBlockConfirmations", func(t *testing.T) { + t.Parallel() + + tbc.blocks = []uint64{1, 2, 3, 4, 5} + + numBlockConfirmations := uint64(2) + expected := []uint64{1, 2, 3} + + result := tbc.GetConfirmedBlocks(numBlockConfirmations) + + require.Equal(t, expected, result) + }) + + t.Run("Number of blocks is less or equal than numBlockConfirmations", func(t *testing.T) { + t.Parallel() + + tbc.blocks = []uint64{1, 2, 3} + numBlockConfirmations := uint64(3) + + result := tbc.GetConfirmedBlocks(numBlockConfirmations) + + require.Nil(t, result) + }) + + t.Run("numBlockConfirmations is 0", func(t *testing.T) { + t.Parallel() + + tbc.blocks = []uint64{1, 2, 3} + numBlockConfirmations := uint64(0) + + result := tbc.GetConfirmedBlocks(numBlockConfirmations) + + require.Equal(t, tbc.blocks, result) + }) + + t.Run("numBlockConfirmations is 1", func(t *testing.T) { + t.Parallel() + + tbc.blocks = []uint64{1, 2, 3} + + numBlockConfirmations := uint64(1) + expected := []uint64{1, 2} + + result := tbc.GetConfirmedBlocks(numBlockConfirmations) + + require.Equal(t, expected, result) + }) + + t.Run("No blocks cached", func(t *testing.T) { + t.Parallel() + + tbc.blocks = []uint64{} + + numBlockConfirmations := uint64(0) + + result := tbc.GetConfirmedBlocks(numBlockConfirmations) + + require.Nil(t, result) + }) +} + +func TestTrackerBlockContainer_IsOutOfSync(t *testing.T) { + t.Parallel() + + t.Run("Block number greater than last cached block and parent hash matches", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + cachedBlock := ðgo.Block{ + Number: 2, + Hash: ethgo.Hash{2}, + ParentHash: ethgo.Hash{1}, + } + require.NoError(t, tbc.AddBlock(cachedBlock)) + + latestBlock := ðgo.Block{ + Number: 3, + Hash: ethgo.Hash{3}, + ParentHash: cachedBlock.Hash, + } + + require.False(t, tbc.IsOutOfSync(latestBlock)) + }) + + t.Run("Latest block number equal to 1 (start of chain)", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + require.False(t, tbc.IsOutOfSync(ðgo.Block{Number: 1})) + }) + + t.Run("Block number greater than last cached block and parent hash does not match", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + cachedBlock := ðgo.Block{ + Number: 2, + Hash: ethgo.Hash{2}, + ParentHash: ethgo.Hash{1}, + } + require.NoError(t, tbc.AddBlock(cachedBlock)) + + latestBlock := ðgo.Block{ + Number: 3, + Hash: ethgo.Hash{3}, + ParentHash: ethgo.Hash{22}, // some other parent + } + + require.True(t, tbc.IsOutOfSync(latestBlock)) + }) + + t.Run("Block number less or equal to the last cached block", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + cachedBlock := ðgo.Block{ + Number: 2, + Hash: ethgo.Hash{2}, + ParentHash: ethgo.Hash{1}, + } + require.NoError(t, tbc.AddBlock(cachedBlock)) + + require.False(t, tbc.IsOutOfSync(cachedBlock)) + }) + + t.Run("Block number greater than the last cached block, and parent does not exist", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + cachedBlock := ðgo.Block{ + 
Number: 12, + Hash: ethgo.Hash{2}, + ParentHash: ethgo.Hash{1}, + } + require.NoError(t, tbc.AddBlock(cachedBlock)) + + require.False(t, tbc.IsOutOfSync(cachedBlock)) + }) +} + +func TestTrackerBlockContainer_RemoveBlocks(t *testing.T) { + t.Parallel() + + t.Run("Remove blocks from 'from' to 'last' index, and update lastProcessedConfirmedBlock", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + // Add some blocks to the container + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 1, Hash: ethgo.Hash{1}, ParentHash: ethgo.Hash{0}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 2, Hash: ethgo.Hash{2}, ParentHash: ethgo.Hash{1}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 3, Hash: ethgo.Hash{3}, ParentHash: ethgo.Hash{2}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 4, Hash: ethgo.Hash{4}, ParentHash: ethgo.Hash{3}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 5, Hash: ethgo.Hash{5}, ParentHash: ethgo.Hash{4}})) + + require.NoError(t, tbc.RemoveBlocks(1, 3)) + + // Check if the blocks and lastProcessedConfirmedBlock are updated correctly + require.Equal(t, []uint64{4, 5}, tbc.blocks) + require.Equal(t, uint64(3), tbc.lastProcessedConfirmedBlock) + }) + + t.Run("Remove blocks from 'from' to 'last' index, where 'from' is greater than 'last'", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + // Add some blocks to the container + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 1, Hash: ethgo.Hash{1}, ParentHash: ethgo.Hash{0}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 2, Hash: ethgo.Hash{2}, ParentHash: ethgo.Hash{1}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 3, Hash: ethgo.Hash{3}, ParentHash: ethgo.Hash{2}})) + + require.ErrorContains(t, tbc.RemoveBlocks(3, 1), "greater than last block") + }) + + t.Run("Remove blocks from 'from' to 'last' index, where given already removed", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(30) + + // Add some blocks to the container + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 21, Hash: ethgo.Hash{21}, ParentHash: ethgo.Hash{20}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 22, Hash: ethgo.Hash{22}, ParentHash: ethgo.Hash{21}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 23, Hash: ethgo.Hash{23}, ParentHash: ethgo.Hash{22}})) + + require.ErrorContains(t, tbc.RemoveBlocks(18, 19), "are already processed and removed") + }) + + t.Run("Remove blocks from 'from' to 'last' index, where last does not exist in cached blocks", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(10) + + // Add some blocks to the container + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 11, Hash: ethgo.Hash{11}, ParentHash: ethgo.Hash{10}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 12, Hash: ethgo.Hash{12}, ParentHash: ethgo.Hash{11}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 13, Hash: ethgo.Hash{13}, ParentHash: ethgo.Hash{12}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 14, Hash: ethgo.Hash{14}, ParentHash: ethgo.Hash{13}})) + + require.ErrorContains(t, tbc.RemoveBlocks(11, 15), "could not find last block") + }) + + t.Run("Remove all blocks", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + // Add some blocks to the container + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 1, Hash: ethgo.Hash{1}, ParentHash: ethgo.Hash{0}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 2, Hash: ethgo.Hash{2}, ParentHash: ethgo.Hash{1}})) + 
require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 3, Hash: ethgo.Hash{3}, ParentHash: ethgo.Hash{2}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 4, Hash: ethgo.Hash{4}, ParentHash: ethgo.Hash{3}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 5, Hash: ethgo.Hash{5}, ParentHash: ethgo.Hash{4}})) + + require.NoError(t, tbc.RemoveBlocks(1, 5)) + + require.Empty(t, tbc.blocks) + require.Equal(t, uint64(5), tbc.lastProcessedConfirmedBlock) + }) + + t.Run("Remove single block", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(0) + + // Add some blocks to the container + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 1, Hash: ethgo.Hash{1}, ParentHash: ethgo.Hash{0}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 2, Hash: ethgo.Hash{2}, ParentHash: ethgo.Hash{1}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 3, Hash: ethgo.Hash{3}, ParentHash: ethgo.Hash{2}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 4, Hash: ethgo.Hash{4}, ParentHash: ethgo.Hash{3}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 5, Hash: ethgo.Hash{5}, ParentHash: ethgo.Hash{4}})) + + require.NoError(t, tbc.RemoveBlocks(1, 1)) + + assert.Equal(t, []uint64{2, 3, 4, 5}, tbc.blocks) + assert.Equal(t, uint64(1), tbc.lastProcessedConfirmedBlock) + }) + + t.Run("Try to do non-sequential removal of blocks", func(t *testing.T) { + t.Parallel() + + tbc := NewTrackerBlockContainer(110) + + // Add some blocks to the container + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 111, Hash: ethgo.Hash{111}, ParentHash: ethgo.Hash{110}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 112, Hash: ethgo.Hash{112}, ParentHash: ethgo.Hash{111}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 113, Hash: ethgo.Hash{113}, ParentHash: ethgo.Hash{112}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 114, Hash: ethgo.Hash{114}, ParentHash: ethgo.Hash{113}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 115, Hash: ethgo.Hash{115}, ParentHash: ethgo.Hash{114}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 116, Hash: ethgo.Hash{116}, ParentHash: ethgo.Hash{115}})) + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: 117, Hash: ethgo.Hash{117}, ParentHash: ethgo.Hash{116}})) + + // Try to remove last 2 blocks without first removing first 3 + require.ErrorContains(t, tbc.RemoveBlocks(113, 115), "trying to do non-sequential removal") + }) +} + +func TestTrackerBlockContainer_AddBlockAndLastCachedBlock(t *testing.T) { + t.Parallel() + + numOfBlocks := 30 + tbc := NewTrackerBlockContainer(0) + + for i := uint64(1); i <= uint64(numOfBlocks); i++ { + require.NoError(t, tbc.AddBlock(ðgo.Block{Number: i, Hash: ethgo.Hash{byte(i)}, ParentHash: ethgo.Hash{byte(i - 1)}})) + require.Equal(t, i, tbc.LastCachedBlock()) + } + + require.Len(t, tbc.blocks, numOfBlocks) + require.Len(t, tbc.numToHashMap, numOfBlocks) + + for i := 0; i < numOfBlocks; i++ { + require.Equal(t, tbc.blocks[i], uint64(i+1)) + } +} + +func createTestLogForStateSyncEvent(t *testing.T, blockNumber, logIndex uint64) *ethgo.Log { + t.Helper() + + var transferEvent contractsapi.StateSyncedEvent + + topics := make([]ethgo.Hash, 3) + topics[0] = transferEvent.Sig() + topics[1] = ethgo.BytesToHash(types.ZeroAddress.Bytes()) + topics[2] = ethgo.BytesToHash(types.ZeroAddress.Bytes()) + encodedData, err := abi.MustNewType("tuple(string a)").Encode([]string{"data"}) + require.NoError(t, err) + + return ðgo.Log{ + BlockNumber: blockNumber, + LogIndex: logIndex, + Address: ethgo.ZeroAddress, + Topics: 
topics, + Data: encodedData, + } +} diff --git a/consensus/polybft/eventtracker/event_tracker_test.go b/consensus/polybft/eventtracker/event_tracker_test.go new file mode 100644 index 0000000000..907f7da651 --- /dev/null +++ b/consensus/polybft/eventtracker/event_tracker_test.go @@ -0,0 +1,610 @@ +package eventtracker + +import ( + "errors" + "testing" + "time" + + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/umbracle/ethgo" +) + +var _ EventSubscriber = (*mockEventSubscriber)(nil) + +type mockEventSubscriber struct { + logs []*ethgo.Log +} + +func (m *mockEventSubscriber) AddLog(log *ethgo.Log) error { + m.logs = append(m.logs, log) + + return nil +} + +var _ BlockProvider = (*mockProvider)(nil) + +type mockProvider struct { + mock.Mock + + blocks map[uint64]*ethgo.Block + logs []*ethgo.Log +} + +// GetBlockByHash implements tracker.Provider. +func (m *mockProvider) GetBlockByHash(hash ethgo.Hash, full bool) (*ethgo.Block, error) { + args := m.Called(hash, full) + + return1 := args.Get(0) + + if return1 != nil { + return return1.(*ethgo.Block), args.Error(1) //nolint:forcetypeassert + } + + return nil, args.Error(1) +} + +// GetBlockByNumber implements tracker.Provider. +func (m *mockProvider) GetBlockByNumber(i ethgo.BlockNumber, full bool) (*ethgo.Block, error) { + args := m.Called(i, full) + + if m.blocks != nil { + return m.blocks[uint64(i)], nil + } + + return1 := args.Get(0) + + if return1 != nil { + return return1.(*ethgo.Block), args.Error(1) //nolint:forcetypeassert + } + + return nil, args.Error(1) +} + +// GetLogs implements tracker.Provider. +func (m *mockProvider) GetLogs(filter *ethgo.LogFilter) ([]*ethgo.Log, error) { + args := m.Called(filter) + + if len(m.logs) > 0 { + returnLog := m.logs[0] + m.logs = m.logs[1:] + + return []*ethgo.Log{returnLog}, nil + } + + return1 := args.Get(0) + + if return1 != nil { + return return1.([]*ethgo.Log), args.Error(1) //nolint:forcetypeassert + } + + return nil, args.Error(1) +} + +func TestEventTracker_TrackBlock(t *testing.T) { + t.Parallel() + + t.Run("Add block by block - no confirmed blocks", func(t *testing.T) { + t.Parallel() + + tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), newTestTrackerStore(t), 0) + + require.NoError(t, err) + + // add some blocks, but don't go to confirmation level + for i := uint64(1); i <= tracker.config.NumBlockConfirmations; i++ { + require.NoError(t, tracker.trackBlock( + ðgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i)}, + ParentHash: ethgo.Hash{byte(i - 1)}, + })) + } + + // check that we have correct number of cached blocks + require.Len(t, tracker.blockContainer.blocks, int(tracker.config.NumBlockConfirmations)) + require.Len(t, tracker.blockContainer.numToHashMap, int(tracker.config.NumBlockConfirmations)) + + // check that we have no confirmed blocks + require.Nil(t, tracker.blockContainer.GetConfirmedBlocks(tracker.config.NumBlockConfirmations)) + + // check that the last processed block is 0, since we did not have any confirmed blocks + require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlockLocked()) + lastProcessedBlockInStore, err := tracker.store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, uint64(0), lastProcessedBlockInStore) + + // check that the last cached block is as expected + require.Equal(t, tracker.config.NumBlockConfirmations, tracker.blockContainer.LastCachedBlock()) + }) + 
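+ // NOTE: a block at height H only becomes confirmed once the tracker has seen block H+NumBlockConfirmations, + // so, unless fetching logs fails, the subtests below expect the last processed confirmed block to equal + // (latest tracked block - NumBlockConfirmations) and expect that value to be persisted in the tracker store.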
+ t.Run("Add block by block - have confirmed blocks - no logs in them", func(t *testing.T) { + t.Parallel() + + numBlockConfirmations := uint64(3) + totalNumOfPreCachedBlocks := numBlockConfirmations + 1 + numOfConfirmedBlocks := totalNumOfPreCachedBlocks - numBlockConfirmations + 1 + + // mock logs return so that no confirmed block has any logs we need + blockProviderMock := new(mockProvider) + blockProviderMock.On("GetLogs", mock.Anything).Return([]*ethgo.Log{}, nil).Once() + + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), + newTestTrackerStore(t), 0) + require.NoError(t, err) + + tracker.config.BlockProvider = blockProviderMock + + // add some blocks + var block *ethgo.Block + for i := uint64(1); i <= totalNumOfPreCachedBlocks; i++ { + block = ðgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i)}, + ParentHash: ethgo.Hash{byte(i - 1)}, + } + require.NoError(t, tracker.blockContainer.AddBlock(block)) + } + + // check that we have correct number of cached blocks + require.Len(t, tracker.blockContainer.blocks, int(totalNumOfPreCachedBlocks)) + require.Len(t, tracker.blockContainer.numToHashMap, int(totalNumOfPreCachedBlocks)) + + // track new block + latestBlock := ðgo.Block{ + Number: block.Number + 1, + Hash: ethgo.Hash{byte(block.Number + 1)}, + ParentHash: block.Hash, + } + require.NoError(t, tracker.trackBlock(latestBlock)) + + // check if the last cached block is as expected + require.Equal(t, latestBlock.Number, tracker.blockContainer.LastCachedBlock()) + // check if the last confirmed block processed is as expected + require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock()) + // check if the last confirmed block is saved in db as well + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock) + // check that in memory cache removed processed confirmed logs + expectedNumOfBlocksInCache := totalNumOfPreCachedBlocks + 1 - numOfConfirmedBlocks + require.Len(t, tracker.blockContainer.blocks, int(expectedNumOfBlocksInCache)) + require.Len(t, tracker.blockContainer.numToHashMap, int(expectedNumOfBlocksInCache)) + + for i := uint64(1); i <= numOfConfirmedBlocks; i++ { + _, exists := tracker.blockContainer.numToHashMap[i] + require.False(t, exists) + require.Equal(t, -1, tracker.blockContainer.indexOf(i)) + } + + blockProviderMock.AssertExpectations(t) + }) + + t.Run("Add block by block - have confirmed blocks with logs", func(t *testing.T) { + t.Parallel() + + numBlockConfirmations := uint64(3) + totalNumOfPreCachedBlocks := numBlockConfirmations + 1 + numOfConfirmedBlocks := totalNumOfPreCachedBlocks - numBlockConfirmations + 1 + + // mock logs return so that no confirmed block has any logs we need + logs := []*ethgo.Log{ + createTestLogForStateSyncEvent(t, 1, 1), + createTestLogForStateSyncEvent(t, 1, 11), + createTestLogForStateSyncEvent(t, 2, 3), + } + blockProviderMock := new(mockProvider) + blockProviderMock.On("GetLogs", mock.Anything).Return(logs, nil).Once() + + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), + newTestTrackerStore(t), 0) + require.NoError(t, err) + + tracker.config.BlockProvider = blockProviderMock + + // add some blocks + var block *ethgo.Block + for i := uint64(1); i <= totalNumOfPreCachedBlocks; i++ { + block = ðgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i)}, + ParentHash: ethgo.Hash{byte(i - 1)}, + } + require.NoError(t, 
tracker.blockContainer.AddBlock(block)) + } + + // check that we have correct number of cached blocks + require.Len(t, tracker.blockContainer.blocks, int(totalNumOfPreCachedBlocks)) + require.Len(t, tracker.blockContainer.numToHashMap, int(totalNumOfPreCachedBlocks)) + + // track new block + latestBlock := &ethgo.Block{ + Number: block.Number + 1, + Hash: ethgo.Hash{byte(block.Number + 1)}, + ParentHash: block.Hash, + } + require.NoError(t, tracker.trackBlock(latestBlock)) + + // check if the last cached block is as expected + require.Equal(t, latestBlock.Number, tracker.blockContainer.LastCachedBlock()) + // check if the last confirmed block processed is as expected + require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock()) + // check if the last confirmed block is saved in db as well + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock) + // check if we have logs in store + for _, log := range logs { + logFromDB, err := tracker.store.GetLog(log.BlockNumber, log.LogIndex) + require.NoError(t, err) + require.Equal(t, log.Address, logFromDB.Address) + require.Equal(t, log.BlockNumber, logFromDB.BlockNumber) + require.Equal(t, log.LogIndex, logFromDB.LogIndex) + } + // check that the in-memory cache removed the processed confirmed blocks + expectedNumOfBlocksInCache := totalNumOfPreCachedBlocks + 1 - numOfConfirmedBlocks + require.Len(t, tracker.blockContainer.blocks, int(expectedNumOfBlocksInCache)) + require.Len(t, tracker.blockContainer.numToHashMap, int(expectedNumOfBlocksInCache)) + + for i := uint64(1); i <= numOfConfirmedBlocks; i++ { + _, exists := tracker.blockContainer.numToHashMap[i] + require.False(t, exists) + require.Equal(t, -1, tracker.blockContainer.indexOf(i)) + } + + blockProviderMock.AssertExpectations(t) + }) + + t.Run("Add block by block - an error occurs on getting logs", func(t *testing.T) { + t.Parallel() + + numBlockConfirmations := uint64(3) + totalNumOfPreCachedBlocks := numBlockConfirmations + 1 + + // mock GetLogs to return an error when logs for confirmed blocks are requested + blockProviderMock := new(mockProvider) + blockProviderMock.On("GetLogs", mock.Anything).Return(nil, errors.New("some error occurred")).Once() + + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), + newTestTrackerStore(t), 0) + require.NoError(t, err) + + tracker.config.BlockProvider = blockProviderMock + + // add some blocks + var block *ethgo.Block + for i := uint64(1); i <= totalNumOfPreCachedBlocks; i++ { + block = &ethgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i)}, + ParentHash: ethgo.Hash{byte(i - 1)}, + } + require.NoError(t, tracker.blockContainer.AddBlock(block)) + } + + // check that we have correct number of cached blocks + require.Len(t, tracker.blockContainer.blocks, int(totalNumOfPreCachedBlocks)) + require.Len(t, tracker.blockContainer.numToHashMap, int(totalNumOfPreCachedBlocks)) + + // track new block + latestBlock := &ethgo.Block{ + Number: block.Number + 1, + Hash: ethgo.Hash{byte(block.Number + 1)}, + ParentHash: block.Hash, + } + require.ErrorContains(t, tracker.trackBlock(latestBlock), "some error occurred") + + // check if the last cached block is as expected + require.Equal(t, latestBlock.Number, tracker.blockContainer.LastCachedBlock()) + // check if the last confirmed block processed is as expected, in this case 0, because an error occurred + require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlock()) 
+ // check if the last confirmed block is saved in db as well + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, uint64(0), lastProcessedConfirmedBlock) + // check that in memory cache nothing got removed, and that we have the latest block as well + expectedNumOfBlocksInCache := totalNumOfPreCachedBlocks + 1 // because of the latest block + require.Len(t, tracker.blockContainer.blocks, int(expectedNumOfBlocksInCache)) + require.Len(t, tracker.blockContainer.numToHashMap, int(expectedNumOfBlocksInCache)) + + blockProviderMock.AssertExpectations(t) + }) + + t.Run("Starting tracker - sync up in batches", func(t *testing.T) { + t.Parallel() + + batchSize := uint64(4) + numBlockConfirmations := uint64(3) + numOfMissedBlocks := batchSize * 2 + + blockProviderMock := &mockProvider{blocks: make(map[uint64]*ethgo.Block)} + + // mock logs return so that no confirmed block has any logs we need + logs := []*ethgo.Log{ + createTestLogForStateSyncEvent(t, 1, 1), + createTestLogForStateSyncEvent(t, 2, 3), + createTestLogForStateSyncEvent(t, 6, 11), + } + blockProviderMock.logs = logs + // we will have three groups of confirmed blocks + // syncing blocks: 1, 2, 3, 4, 5, 6, 7, 8, 9 + // first batch of gotten blocks: 1, 2, 3, 4 - confirmed blocks: 1 + // second batch of gotten blocks: 5, 6, 7, 8 - confirmed blocks: 2, 3, 4, 5 + // process the latest block as well (block 9) - confirmed blocks: 6 + // just mock the call, it will use the provider.logs map to handle proper returns + blockProviderMock.On("GetLogs", mock.Anything).Return(nil, nil).Times(len(logs)) + // just mock the call, it will use the provider.blocks map to handle proper returns + blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks)) + + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), + newTestTrackerStore(t), 0) + require.NoError(t, err) + + tracker.config.BlockProvider = blockProviderMock + + // mock getting missed blocks + var block *ethgo.Block + for i := uint64(1); i <= numOfMissedBlocks; i++ { + block = ðgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i)}, + ParentHash: ethgo.Hash{byte(i - 1)}, + } + blockProviderMock.blocks[i] = block + } + + // check that initially we don't have anything cached + require.Len(t, tracker.blockContainer.blocks, 0) + require.Len(t, tracker.blockContainer.numToHashMap, 0) + + // track new block + latestBlock := ðgo.Block{ + Number: block.Number + 1, + Hash: ethgo.Hash{byte(block.Number + 1)}, + ParentHash: block.Hash, + } + require.NoError(t, tracker.trackBlock(latestBlock)) + + // check if the last cached block is as expected + require.Equal(t, latestBlock.Number, tracker.blockContainer.LastCachedBlock()) + // check if the last confirmed block processed is as expected + expectedLastProcessed := numOfMissedBlocks + 1 - numBlockConfirmations + require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock()) + // check if the last confirmed block is saved in db as well + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock) + // check if we have logs in store + logsFromDB, err := tracker.store.GetAllLogs() + require.NoError(t, err) + require.Len(t, logsFromDB, len(logs)) + + // check that in memory cache removed processed confirmed logs + require.Len(t, tracker.blockContainer.blocks, 
int(numOfMissedBlocks+1-expectedLastProcessed)) + require.Len(t, tracker.blockContainer.numToHashMap, int(numOfMissedBlocks+1-expectedLastProcessed)) + for i := expectedLastProcessed + 1; i <= numOfMissedBlocks+1; i++ { + _, exists := tracker.blockContainer.numToHashMap[i] + require.True(t, exists) + require.Equal(t, i, tracker.blockContainer.blocks[i-expectedLastProcessed-1]) + } + + blockProviderMock.AssertExpectations(t) + }) + + t.Run("Sync up in batches - have cached blocks - no reorgs", func(t *testing.T) { + t.Parallel() + + batchSize := uint64(4) + numBlockConfirmations := uint64(3) + numOfMissedBlocks := batchSize * 2 + numOfCachedBlocks := uint64(4) + + blockProviderMock := &mockProvider{blocks: make(map[uint64]*ethgo.Block)} + + // mock logs return so that no confirmed block has any logs we need + logs := []*ethgo.Log{ + createTestLogForStateSyncEvent(t, 1, 1), + createTestLogForStateSyncEvent(t, 2, 3), + createTestLogForStateSyncEvent(t, 6, 11), + createTestLogForStateSyncEvent(t, 10, 1), + } + blockProviderMock.logs = logs + // we will have three groups of confirmed blocks + // have cached blocks, 1, 2, 3, 4 + // cleans state + // syncing blocks: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 + // first batch of gotten blocks: 1, 2, 3, 4 - confirmed blocks: 1 + // second batch of gotten blocks: 5, 6, 7, 8 - confirmed blocks: 2, 3, 4, 5 + // third batch of gotten blocks: 9, 10, 11, 12 - confirmed blocks: 6, 7, 8, 9 + // process the latest block as well (block 13) - confirmed blocks: 10 + // just mock the call, it will use the provider.logs map to handle proper returns + blockProviderMock.On("GetLogs", mock.Anything).Return(nil, nil).Times(len(logs)) + // just mock the call, it will use the provider.blocks map to handle proper returns + blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks + numOfCachedBlocks)) + + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), + newTestTrackerStore(t), 0) + require.NoError(t, err) + + tracker.config.BlockProvider = blockProviderMock + + var block *ethgo.Block + + // add some cached blocks + for i := uint64(1); i <= numOfCachedBlocks; i++ { + block = ðgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i)}, + ParentHash: ethgo.Hash{byte(i - 1)}, + } + + blockProviderMock.blocks[i] = block + require.NoError(t, tracker.blockContainer.AddBlock(block)) + } + + // check that initially we have some cached blocks + require.Len(t, tracker.blockContainer.blocks, int(numOfCachedBlocks)) + require.Len(t, tracker.blockContainer.numToHashMap, int(numOfCachedBlocks)) + + // mock getting missed blocks + for i := numOfCachedBlocks + 1; i <= numOfMissedBlocks+numOfCachedBlocks; i++ { + block = ðgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i)}, + ParentHash: ethgo.Hash{byte(i - 1)}, + } + blockProviderMock.blocks[i] = block + } + + // track new block + latestBlock := ðgo.Block{ + Number: block.Number + 1, + Hash: ethgo.Hash{byte(block.Number + 1)}, + ParentHash: block.Hash, + } + require.NoError(t, tracker.trackBlock(latestBlock)) + + // check if the last cached block is as expected + require.Equal(t, latestBlock.Number, tracker.blockContainer.LastCachedBlock()) + // check if the last confirmed block processed is as expected + expectedLastProcessed := numOfMissedBlocks + numOfCachedBlocks + 1 - numBlockConfirmations + require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock()) + // check if the last confirmed block is saved in db as well + 
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock) + // check if we have logs in store + logsFromDB, err := tracker.store.GetAllLogs() + require.NoError(t, err) + require.Len(t, logsFromDB, len(logs)) + + // check that the in-memory cache removed the processed confirmed blocks + expectedNumOfNonProcessedBlocks := int(numOfMissedBlocks + numOfCachedBlocks + 1 - expectedLastProcessed) + require.Len(t, tracker.blockContainer.blocks, expectedNumOfNonProcessedBlocks) + require.Len(t, tracker.blockContainer.numToHashMap, expectedNumOfNonProcessedBlocks) + for i := expectedLastProcessed + 1; i <= numOfMissedBlocks+numOfCachedBlocks+1; i++ { + _, exists := tracker.blockContainer.numToHashMap[i] + require.True(t, exists) + require.Equal(t, i, tracker.blockContainer.blocks[i-expectedLastProcessed-1]) + } + + blockProviderMock.AssertExpectations(t) + }) + + t.Run("Sync up in batches - have cached blocks - a reorg happened", func(t *testing.T) { + t.Parallel() + + batchSize := uint64(4) + numBlockConfirmations := uint64(3) + numOfCachedBlocks := uint64(4) + + blockProviderMock := &mockProvider{blocks: make(map[uint64]*ethgo.Block)} + + // mock logs so that some of the confirmed blocks contain logs we need + logs := []*ethgo.Log{ + createTestLogForStateSyncEvent(t, 1, 1), + createTestLogForStateSyncEvent(t, 2, 3), + } + blockProviderMock.logs = logs + // we will have 2 groups of confirmed blocks + // have cached blocks, 1, 2, 3, 4 + // notice there was a reorg on block 5 + // cleans state + // syncing blocks: 1, 2, 3, 4, 5 + // first batch of gotten blocks: 1, 2, 3, 4 - confirmed blocks: 1 + // process the latest block as well (block 5) - confirmed blocks: 2 + // just mock the call, it will use the provider.logs map to handle proper returns + blockProviderMock.On("GetLogs", mock.Anything).Return(nil, nil).Times(len(logs)) + // just mock the call, it will use the provider.blocks map to handle proper returns + blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfCachedBlocks)) + + tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), + newTestTrackerStore(t), 0) + require.NoError(t, err) + + tracker.config.BlockProvider = blockProviderMock + + var block *ethgo.Block + + // add some cached blocks + for i := uint64(1); i <= numOfCachedBlocks; i++ { + block = &ethgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i + numOfCachedBlocks)}, + ParentHash: ethgo.Hash{byte(i + numOfCachedBlocks - 1)}, + } + require.NoError(t, tracker.blockContainer.AddBlock(block)) + } + + // check that initially we have some cached blocks + require.Len(t, tracker.blockContainer.blocks, int(numOfCachedBlocks)) + require.Len(t, tracker.blockContainer.numToHashMap, int(numOfCachedBlocks)) + + // mock getting new state + for i := uint64(1); i <= numOfCachedBlocks; i++ { + block = &ethgo.Block{ + Number: i, + Hash: ethgo.Hash{byte(i)}, + ParentHash: ethgo.Hash{byte(i - 1)}, + } + blockProviderMock.blocks[i] = block + } + + // track new block + latestBlock := &ethgo.Block{ + Number: block.Number + 1, + Hash: ethgo.Hash{byte(block.Number + 1)}, + ParentHash: block.Hash, + } + require.NoError(t, tracker.trackBlock(latestBlock)) + + // check if the last cached block is as expected + require.Equal(t, latestBlock.Number, tracker.blockContainer.LastCachedBlock()) + // check if the last confirmed block processed is as expected + expectedLastProcessed := numOfCachedBlocks + 1 
- numBlockConfirmations + require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock()) + // check if the last confirmed block is saved in db as well + lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock() + require.NoError(t, err) + require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock) + // check if we have logs in store + logsFromDB, err := tracker.store.GetAllLogs() + require.NoError(t, err) + require.Len(t, logsFromDB, len(logs)) + + // check that in memory cache removed processed confirmed logs + expectedNumOfNonProcessedBlocks := int(numOfCachedBlocks + 1 - expectedLastProcessed) + require.Len(t, tracker.blockContainer.blocks, expectedNumOfNonProcessedBlocks) + require.Len(t, tracker.blockContainer.numToHashMap, expectedNumOfNonProcessedBlocks) + for i := expectedLastProcessed + 1; i <= numOfCachedBlocks+1; i++ { + _, exists := tracker.blockContainer.numToHashMap[i] + require.True(t, exists) + require.Equal(t, i, tracker.blockContainer.blocks[i-expectedLastProcessed-1]) + } + + blockProviderMock.AssertExpectations(t) + }) +} + +func createTestTrackerConfig(t *testing.T, numBlockConfirmations, batchSize, + numOfBlocksToReconcile uint64) *EventTrackerConfig { + t.Helper() + + var stateSyncEvent contractsapi.StateSyncedEvent + + return &EventTrackerConfig{ + RPCEndpoint: "http://some-rpc-url.com", + NumBlockConfirmations: numBlockConfirmations, + SyncBatchSize: batchSize, + NumOfBlocksToReconcile: numOfBlocksToReconcile, + PollInterval: 2 * time.Second, + Logger: hclog.NewNullLogger(), + LogFilter: map[ethgo.Address][]ethgo.Hash{ + ethgo.ZeroAddress: {stateSyncEvent.Sig()}, + }, + EventSubscriber: new(mockEventSubscriber), + BlockProvider: new(mockProvider), + } +} diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 9e77c545c1..0093db3ed9 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -543,16 +543,19 @@ func (p *Polybft) Start() error { // initRuntime creates consensus runtime func (p *Polybft) initRuntime() error { runtimeConfig := &runtimeConfig{ - PolyBFTConfig: p.consensusConfig, - Key: p.key, - DataDir: p.dataDir, - State: p.state, - blockchain: p.blockchain, - polybftBackend: p, - txPool: p.txPool, - bridgeTopic: p.bridgeTopic, - numBlockConfirmations: p.config.NumBlockConfirmations, - consensusConfig: p.config.Config, + PolyBFTConfig: p.consensusConfig, + Key: p.key, + DataDir: p.dataDir, + State: p.state, + blockchain: p.blockchain, + polybftBackend: p, + txPool: p.txPool, + bridgeTopic: p.bridgeTopic, + consensusConfig: p.config.Config, + // event tracker + numBlockConfirmations: p.config.NumBlockConfirmations, + trackerSyncBatchSize: p.config.TrackerSyncBatchSize, + trackerBlocksToReconcile: p.config.TrackerBlocksToReconcile, } runtime, err := newConsensusRuntime(p.logger, runtimeConfig) diff --git a/consensus/polybft/state_sync_manager.go b/consensus/polybft/state_sync_manager.go index d29680a4f4..ae92b4cd81 100644 --- a/consensus/polybft/state_sync_manager.go +++ b/consensus/polybft/state_sync_manager.go @@ -1,7 +1,6 @@ package polybft import ( - "context" "encoding/hex" "encoding/json" "errors" @@ -12,16 +11,17 @@ import ( "github.com/0xPolygon/polygon-edge/consensus/polybft/bitmap" "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/consensus/polybft/eventtracker" polybftProto "github.com/0xPolygon/polygon-edge/consensus/polybft/proto" bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer" 
"github.com/0xPolygon/polygon-edge/consensus/polybft/validator" "github.com/0xPolygon/polygon-edge/consensus/polybft/wallet" "github.com/0xPolygon/polygon-edge/contracts" - "github.com/0xPolygon/polygon-edge/tracker" "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" "github.com/libp2p/go-libp2p/core/peer" "github.com/umbracle/ethgo" + "github.com/umbracle/ethgo/jsonrpc" bolt "go.etcd.io/bbolt" "google.golang.org/protobuf/proto" ) @@ -73,14 +73,17 @@ func (d *dummyStateSyncManager) ProcessLog(header *types.Header, // stateSyncConfig holds the configuration data of state sync manager type stateSyncConfig struct { - stateSenderAddr types.Address - stateSenderStartBlock uint64 - jsonrpcAddr string - dataDir string - topic topic - key *wallet.Key - maxCommitmentSize uint64 + stateSenderAddr types.Address + stateSenderStartBlock uint64 + jsonrpcAddr string + dataDir string + topic topic + key *wallet.Key + maxCommitmentSize uint64 + // event tracker numBlockConfirmations uint64 + trackerSyncBatchSize uint64 + trackerBlocksToReconcile uint64 blockTrackerPollInterval time.Duration } @@ -103,6 +106,7 @@ type stateSyncManager struct { nextCommittedIndex uint64 runtime Runtime + tracker *eventtracker.EventTracker } // topic is an interface for p2p message gossiping @@ -138,28 +142,46 @@ func (s *stateSyncManager) Init() error { func (s *stateSyncManager) Close() { close(s.closeCh) + + if s.tracker != nil { + s.tracker.Close() + } } -// initTracker starts a new event tracker (to receive new state sync events) +// initTracker sets up and starts the event tracker implementation func (s *stateSyncManager) initTracker() error { - ctx, cancelFn := context.WithCancel(context.Background()) - - evtTracker := tracker.NewEventTracker( - path.Join(s.config.dataDir, "/deposit.db"), - s.config.jsonrpcAddr, - ethgo.Address(s.config.stateSenderAddr), - s, - s.config.numBlockConfirmations, - s.config.stateSenderStartBlock, - s.logger, - s.config.blockTrackerPollInterval) - - go func() { - <-s.closeCh - cancelFn() - }() - - return evtTracker.Start(ctx) + store, err := eventtracker.NewBoltDBEventTrackerStore(path.Join(s.config.dataDir, "/deposit.db")) + if err != nil { + return err + } + + clt, err := jsonrpc.NewClient(s.config.jsonrpcAddr) + if err != nil { + return err + } + + var stateSyncEvent contractsapi.StateSyncedEvent + + tracker, err := eventtracker.NewEventTracker(&eventtracker.EventTrackerConfig{ + RPCEndpoint: s.config.jsonrpcAddr, + NumBlockConfirmations: s.config.numBlockConfirmations, + SyncBatchSize: s.config.trackerSyncBatchSize, + NumOfBlocksToReconcile: s.config.trackerBlocksToReconcile, + PollInterval: s.config.blockTrackerPollInterval, + Logger: s.logger, + EventSubscriber: s, + BlockProvider: clt.Eth(), + LogFilter: map[ethgo.Address][]ethgo.Hash{ + ethgo.Address(s.config.stateSenderAddr): {stateSyncEvent.Sig()}, + }, + }, store, s.config.stateSenderStartBlock) + if err != nil { + return err + } + + s.tracker = tracker + + return tracker.Start() } // initTransport subscribes to bridge topics (getting votes for commitments) diff --git a/consensus/polybft/state_sync_manager_test.go b/consensus/polybft/state_sync_manager_test.go index 4748f074ac..0088b324f7 100644 --- a/consensus/polybft/state_sync_manager_test.go +++ b/consensus/polybft/state_sync_manager_test.go @@ -5,14 +5,12 @@ import ( "math/rand" "os" "testing" - "time" "github.com/hashicorp/go-hclog" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" "github.com/umbracle/ethgo" 
"github.com/umbracle/ethgo/abi" - "github.com/umbracle/ethgo/testutil" "google.golang.org/protobuf/proto" "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" @@ -476,49 +474,6 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { }) } -func TestStateSyncerManager_EventTracker_Sync(t *testing.T) { - t.Parallel() - - vals := validator.NewTestValidators(t, 5) - s := newTestStateSyncManager(t, vals.GetValidator("0"), &mockRuntime{isActiveValidator: true}) - - server := testutil.DeployTestServer(t, nil) - - // Deploy contract - contractReceipt, err := server.SendTxn(ðgo.Transaction{ - Input: contractsapi.StateSender.Bytecode, - }) - require.NoError(t, err) - - // Create contract function call payload - encodedSyncStateData, err := (&contractsapi.SyncStateStateSenderFn{ - Receiver: types.BytesToAddress(server.Account(0).Bytes()), - Data: []byte{}, - }).EncodeAbi() - require.NoError(t, err) - - // prefill with 10 events - for i := 0; i < 10; i++ { - receipt, err := server.SendTxn(ðgo.Transaction{ - To: &contractReceipt.ContractAddress, - Input: encodedSyncStateData, - }) - require.NoError(t, err) - require.Equal(t, uint64(types.ReceiptSuccess), receipt.Status) - } - - s.config.stateSenderAddr = types.Address(contractReceipt.ContractAddress) - s.config.jsonrpcAddr = server.HTTPAddr() - - require.NoError(t, s.initTracker()) - - time.Sleep(2 * time.Second) - - events, err := s.state.StateSyncStore.getStateSyncEventsForCommitment(1, 10, nil) - require.NoError(t, err) - require.Len(t, events, 10) -} - func TestStateSyncManager_Close(t *testing.T) { t.Parallel() diff --git a/server/config.go b/server/config.go index 63240f6ddf..49c79e053f 100644 --- a/server/config.go +++ b/server/config.go @@ -44,8 +44,12 @@ type Config struct { Relayer bool - NumBlockConfirmations uint64 - MetricsInterval time.Duration + MetricsInterval time.Duration + + // event tracker + NumBlockConfirmations uint64 + TrackerSyncBatchSize uint64 + TrackerBlocksToReconcile uint64 } // Telemetry holds the config details for metric services diff --git a/server/server.go b/server/server.go index e88e3f27ca..dba5196c76 100644 --- a/server/server.go +++ b/server/server.go @@ -563,18 +563,21 @@ func (s *Server) setupConsensus() error { consensus, err := engine( &consensus.Params{ - Context: context.Background(), - Config: config, - TxPool: s.txpool, - Network: s.network, - Blockchain: s.blockchain, - Executor: s.executor, - Grpc: s.grpcServer, - Logger: s.logger, - SecretsManager: s.secretsManager, - BlockTime: uint64(blockTime.Seconds()), - NumBlockConfirmations: s.config.NumBlockConfirmations, - MetricsInterval: s.config.MetricsInterval, + Context: context.Background(), + Config: config, + TxPool: s.txpool, + Network: s.network, + Blockchain: s.blockchain, + Executor: s.executor, + Grpc: s.grpcServer, + Logger: s.logger, + SecretsManager: s.secretsManager, + BlockTime: uint64(blockTime.Seconds()), + MetricsInterval: s.config.MetricsInterval, + // event tracker + NumBlockConfirmations: s.config.NumBlockConfirmations, + TrackerSyncBatchSize: s.config.TrackerSyncBatchSize, + TrackerBlocksToReconcile: s.config.TrackerBlocksToReconcile, }, )