diff --git a/.golangci.yml b/.golangci.yml index ae62dda..65ff741 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -50,12 +50,8 @@ linters-settings: - regexpMust - appendAssign - ifElseChain - maligned: - # print struct with more effective memory layout or not, false by default - suggest-new: true nolintlint: allow-unused: false - allow-leading-space: true require-explanation: false require-specific: false revive: diff --git a/internal/relayertest/constants.go b/internal/relayertest/constants.go index e89dcb0..b13a059 100644 --- a/internal/relayertest/constants.go +++ b/internal/relayertest/constants.go @@ -21,11 +21,10 @@ var CustomCfgTextWithTimeStr string var CustomCfg = falcon.Config{ Global: falcon.GlobalConfig{ - CheckingPacketInterval: 1 * time.Minute, - SyncTunnelsInterval: 5 * time.Minute, - MaxCheckingPacketPenaltyDuration: 1 * time.Hour, - PenaltyExponentialFactor: 1.1, - LogLevel: "info", + CheckingPacketInterval: 1 * time.Minute, + SyncTunnelsInterval: 5 * time.Minute, + PenaltySkipRounds: 3, + LogLevel: "info", }, BandChain: band.Config{ RpcEndpoints: []string{"http://localhost:26657", "http://localhost:26658"}, diff --git a/internal/relayertest/testdata/custom_config.toml b/internal/relayertest/testdata/custom_config.toml index c677a19..f1a5ce3 100644 --- a/internal/relayertest/testdata/custom_config.toml +++ b/internal/relayertest/testdata/custom_config.toml @@ -2,8 +2,7 @@ log_level = 'info' checking_packet_interval = 60000000000 sync_tunnels_interval = 300000000000 -max_checking_packet_penalty_duration = 3600000000000 -penalty_exponential_factor = 1.1 +penalty_skip_rounds = 3 [bandchain] rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658'] diff --git a/internal/relayertest/testdata/custom_config_with_time_str.toml b/internal/relayertest/testdata/custom_config_with_time_str.toml index 2475f31..e1b465b 100644 --- a/internal/relayertest/testdata/custom_config_with_time_str.toml +++ b/internal/relayertest/testdata/custom_config_with_time_str.toml @@ -2,8 +2,7 @@ log_level = 'info' checking_packet_interval = '1m' sync_tunnels_interval = '5m' -max_checking_packet_penalty_duration = '1h' -penalty_exponential_factor = 1.1 +penalty_skip_rounds = 3 [bandchain] rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658'] diff --git a/internal/relayertest/testdata/default_config.toml b/internal/relayertest/testdata/default_config.toml index 2cbafd4..8202fd8 100644 --- a/internal/relayertest/testdata/default_config.toml +++ b/internal/relayertest/testdata/default_config.toml @@ -2,8 +2,7 @@ log_level = 'info' checking_packet_interval = 60000000000 sync_tunnels_interval = 300000000000 -max_checking_packet_penalty_duration = 3600000000000 -penalty_exponential_factor = 1.0 +penalty_skip_rounds = 3 [bandchain] rpc_endpoints = ['http://localhost:26657'] diff --git a/internal/relayertest/testdata/default_with_chain_config.toml b/internal/relayertest/testdata/default_with_chain_config.toml index 7f6f637..54a8b48 100644 --- a/internal/relayertest/testdata/default_with_chain_config.toml +++ b/internal/relayertest/testdata/default_with_chain_config.toml @@ -2,8 +2,7 @@ log_level = 'info' checking_packet_interval = 60000000000 sync_tunnels_interval = 300000000000 -max_checking_packet_penalty_duration = 3600000000000 -penalty_exponential_factor = 1.0 +penalty_skip_rounds = 3 [bandchain] rpc_endpoints = ['http://localhost:26657'] diff --git a/relayer/app.go b/relayer/app.go index 1e4e441..4e80b31 100644 --- a/relayer/app.go +++ b/relayer/app.go @@ -580,8 +580,7 @@ func (a *App) Start(ctx context.Context, tunnelIDs []uint64) error { tunnelRelayers, a.Config.Global.CheckingPacketInterval, a.Config.Global.SyncTunnelsInterval, - a.Config.Global.MaxCheckingPacketPenaltyDuration, - a.Config.Global.PenaltyExponentialFactor, + a.Config.Global.PenaltySkipRounds, isSyncTunnelsAllowed, a.BandClient, a.TargetChains, @@ -624,7 +623,9 @@ func (a *App) Relay(ctx context.Context, tunnelID uint64) error { chainProvider, ) - return tr.CheckAndRelay(ctx) + _, err = tr.CheckAndRelay(ctx) + + return err } // GetTunnels retrieves the list of tunnels by given tunnel IDs. If no tunnel ID is provided, diff --git a/relayer/config.go b/relayer/config.go index a0fe8c5..c29acda 100644 --- a/relayer/config.go +++ b/relayer/config.go @@ -16,11 +16,10 @@ import ( // GlobalConfig is the global configuration for the falcon tunnel relayer type GlobalConfig struct { - LogLevel string `mapstructure:"log_level" toml:"log_level"` - CheckingPacketInterval time.Duration `mapstructure:"checking_packet_interval" toml:"checking_packet_interval"` - SyncTunnelsInterval time.Duration `mapstructure:"sync_tunnels_interval" toml:"sync_tunnels_interval"` - MaxCheckingPacketPenaltyDuration time.Duration `mapstructure:"max_checking_packet_penalty_duration" toml:"max_checking_packet_penalty_duration"` - PenaltyExponentialFactor float64 `mapstructure:"penalty_exponential_factor" toml:"penalty_exponential_factor"` + LogLevel string `mapstructure:"log_level" toml:"log_level"` + CheckingPacketInterval time.Duration `mapstructure:"checking_packet_interval" toml:"checking_packet_interval"` + SyncTunnelsInterval time.Duration `mapstructure:"sync_tunnels_interval" toml:"sync_tunnels_interval"` + PenaltySkipRounds uint `mapstructure:"penalty_skip_rounds" toml:"penalty_skip_rounds"` } // Config defines the configuration for the falcon tunnel relayer. @@ -134,11 +133,10 @@ func DefaultConfig() *Config { }, TargetChains: make(map[string]chains.ChainProviderConfig), Global: GlobalConfig{ - LogLevel: "info", - CheckingPacketInterval: time.Minute, - SyncTunnelsInterval: 5 * time.Minute, - MaxCheckingPacketPenaltyDuration: time.Hour, - PenaltyExponentialFactor: 1.0, + LogLevel: "info", + CheckingPacketInterval: time.Minute, + PenaltySkipRounds: 3, + SyncTunnelsInterval: 5 * time.Minute, }, } } diff --git a/relayer/scheduler.go b/relayer/scheduler.go index 422b074..7e70fc6 100644 --- a/relayer/scheduler.go +++ b/relayer/scheduler.go @@ -10,20 +10,16 @@ import ( "github.com/bandprotocol/falcon/relayer/chains" ) -const penaltyTaskChSize = 1000 - // Scheduler is a struct to manage all tunnel relayers type Scheduler struct { - Log *zap.Logger - TunnelRelayers []*TunnelRelayer - CheckingPacketInterval time.Duration - SyncTunnelsInterval time.Duration - MaxCheckingPacketPenaltyDuration time.Duration - ExponentialFactor float64 - - isErrorOnHolds []bool + Log *zap.Logger + TunnelRelayers []*TunnelRelayer + CheckingPacketInterval time.Duration + SyncTunnelsInterval time.Duration + PenaltySkipRounds uint + + PenaltySkipRemaining []uint isSyncTunnelsAllowed bool - penaltyTaskCh chan Task BandClient band.Client ChainProviders chains.ChainProviders @@ -35,24 +31,21 @@ func NewScheduler( tunnelRelayers []*TunnelRelayer, checkingPacketInterval time.Duration, syncTunnelsInterval time.Duration, - maxCheckingPacketPenaltyDuration time.Duration, - exponentialFactor float64, + penaltyAttempts uint, isSyncTunnelsAllowed bool, bandClient band.Client, chainProviders chains.ChainProviders, ) *Scheduler { return &Scheduler{ - Log: log, - TunnelRelayers: tunnelRelayers, - CheckingPacketInterval: checkingPacketInterval, - SyncTunnelsInterval: syncTunnelsInterval, - MaxCheckingPacketPenaltyDuration: maxCheckingPacketPenaltyDuration, - ExponentialFactor: exponentialFactor, - isErrorOnHolds: make([]bool, len(tunnelRelayers)), - isSyncTunnelsAllowed: isSyncTunnelsAllowed, - penaltyTaskCh: make(chan Task, penaltyTaskChSize), - BandClient: bandClient, - ChainProviders: chainProviders, + Log: log, + TunnelRelayers: tunnelRelayers, + CheckingPacketInterval: checkingPacketInterval, + SyncTunnelsInterval: syncTunnelsInterval, + PenaltySkipRounds: penaltyAttempts, + PenaltySkipRemaining: make([]uint, len(tunnelRelayers)), + isSyncTunnelsAllowed: isSyncTunnelsAllowed, + BandClient: bandClient, + ChainProviders: chainProviders, } } @@ -77,16 +70,6 @@ func (s *Scheduler) Start(ctx context.Context) error { s.SyncTunnels(ctx) case <-ticker.C: s.Execute(ctx) - case task := <-s.penaltyTaskCh: - // Execute the task with penalty waiting period - go func(task Task) { - executeFn := func(ctx context.Context, t Task) { - s.isErrorOnHolds[task.RelayerID] = false - s.TriggerTunnelRelayer(ctx, task) - } - - task.Wait(ctx, executeFn) - }(task) } } } @@ -95,12 +78,14 @@ func (s *Scheduler) Start(ctx context.Context) error { func (s *Scheduler) Execute(ctx context.Context) { // Execute the task for each tunnel relayer for i, tr := range s.TunnelRelayers { - if s.isErrorOnHolds[i] { + if s.PenaltySkipRemaining[i] > 0 { s.Log.Debug( - "Skipping this tunnel: the operation is on hold due to error on last round.", + "Skipping tunnel execution due to penalty from previous failure.", zap.Uint64("tunnel_id", tr.TunnelID), zap.Int("relayer_id", i), + zap.Uint("penalty_skip_remaining", s.PenaltySkipRemaining[i]), ) + s.PenaltySkipRemaining[i] -= 1 continue } @@ -115,21 +100,9 @@ func (s *Scheduler) Execute(ctx context.Context) { func (s *Scheduler) TriggerTunnelRelayer(ctx context.Context, task Task) { tr := s.TunnelRelayers[task.RelayerID] - // if the tunnel relayer is executing, skip the round - if tr.IsExecuting() { - s.Log.Debug( - "Skipping this tunnel: tunnel relayer is executing on another process", - zap.Uint64("tunnel_id", tr.TunnelID), - ) - return - } - - s.Log.Info("Executing task", zap.Uint64("tunnel_id", tr.TunnelID)) - // Check and relay the packet, if error occurs, set the error flag. - if err := tr.CheckAndRelay(ctx); err != nil { - s.isErrorOnHolds[task.RelayerID] = true - newInterval := s.calculatePenaltyInterval(task.WaitingInterval) + if isExecuting, err := tr.CheckAndRelay(ctx); err != nil && !isExecuting { + s.PenaltySkipRemaining[task.RelayerID] = s.PenaltySkipRounds s.Log.Error( "Failed to execute, Penalty for the tunnel relayer", @@ -137,14 +110,9 @@ func (s *Scheduler) TriggerTunnelRelayer(ctx context.Context, task Task) { zap.Uint64("tunnel_id", tr.TunnelID), ) - newTask := NewTask(task.RelayerID, newInterval) - - s.penaltyTaskCh <- newTask return } - // If the task is successful, reset the error flag. - s.isErrorOnHolds[task.RelayerID] = false s.Log.Info( "Tunnel relayer finished execution", zap.Uint64("tunnel_id", tr.TunnelID), @@ -191,7 +159,7 @@ func (s *Scheduler) SyncTunnels(ctx context.Context) { ) s.TunnelRelayers = append(s.TunnelRelayers, &tr) - s.isErrorOnHolds = append(s.isErrorOnHolds, false) + s.PenaltySkipRemaining = append(s.PenaltySkipRemaining, 0) s.Log.Info( "New tunnel synchronized successfully", zap.String("chain_name", tunnels[i].TargetChainID), @@ -200,15 +168,6 @@ func (s *Scheduler) SyncTunnels(ctx context.Context) { } } -// calculatePenaltyInterval applies exponential backoff with a max limit -func (s *Scheduler) calculatePenaltyInterval(interval time.Duration) time.Duration { - newInterval := time.Duration(float64(interval) * s.ExponentialFactor) - if newInterval > s.MaxCheckingPacketPenaltyDuration { - newInterval = s.MaxCheckingPacketPenaltyDuration - } - return newInterval -} - // Task is a struct to manage the task for the tunnel relayer type Task struct { RelayerID int @@ -222,13 +181,3 @@ func NewTask(relayerID int, waitingInterval time.Duration) Task { WaitingInterval: waitingInterval, } } - -// Wait waits for the task to be executed -func (t Task) Wait(ctx context.Context, executeFn func(ctx context.Context, t Task)) { - select { - case <-ctx.Done(): - // Do nothing - case <-time.After(t.WaitingInterval): - executeFn(ctx, t) - } -} diff --git a/relayer/tunnel_relayer.go b/relayer/tunnel_relayer.go index cdee2c5..1431229 100644 --- a/relayer/tunnel_relayer.go +++ b/relayer/tunnel_relayer.go @@ -3,6 +3,7 @@ package relayer import ( "context" "fmt" + "sync" "time" "go.uber.org/zap" @@ -21,7 +22,7 @@ type TunnelRelayer struct { BandClient band.Client TargetChainProvider chains.ChainProvider - isExecuting bool + mu *sync.Mutex } // NewTunnelRelayer creates a new TunnelRelayer @@ -40,15 +41,22 @@ func NewTunnelRelayer( CheckingPacketInterval: checkingPacketInterval, BandClient: bandClient, TargetChainProvider: targetChainProvider, - isExecuting: false, + mu: &sync.Mutex{}, } } // CheckAndRelay checks the tunnel and relays the packet -func (t *TunnelRelayer) CheckAndRelay(ctx context.Context) (err error) { - t.isExecuting = true +func (t *TunnelRelayer) CheckAndRelay(ctx context.Context) (isExecuting bool, err error) { + if !t.mu.TryLock() { + // if the tunnel relayer is executing, skip the round + t.Log.Debug( + "Skipping this tunnel: tunnel relayer is executing on another process", + zap.Uint64("tunnel_id", t.TunnelID), + ) + return true, nil + } defer func() { - t.isExecuting = false + t.mu.Unlock() // Recover from panic if r := recover(); r != nil { @@ -60,29 +68,31 @@ func (t *TunnelRelayer) CheckAndRelay(ctx context.Context) (err error) { } }() + t.Log.Info("Executing task", zap.Uint64("tunnel_id", t.TunnelID)) + for { // Query tunnel info from BandChain tunnelBandInfo, err := t.BandClient.GetTunnel(ctx, t.TunnelID) if err != nil { t.Log.Error("Failed to get tunnel", zap.Error(err)) - return err + return false, err } // Query tunnel info from TargetChain tunnelChainInfo, err := t.TargetChainProvider.QueryTunnelInfo(ctx, t.TunnelID, t.ContractAddress) if err != nil { - return err + return false, err } if !tunnelChainInfo.IsActive { t.Log.Info("Tunnel is not active on target chain") - return nil + return false, nil } // end process if current packet is already relayed seq := tunnelChainInfo.LatestSequence + 1 if tunnelBandInfo.LatestSequence < seq { t.Log.Info("No new packet to relay", zap.Uint64("sequence", tunnelChainInfo.LatestSequence)) - return nil + return false, nil } t.Log.Info("Relaying packet", zap.Uint64("sequence", seq)) @@ -91,13 +101,14 @@ func (t *TunnelRelayer) CheckAndRelay(ctx context.Context) (err error) { packet, err := t.BandClient.GetTunnelPacket(ctx, t.TunnelID, seq) if err != nil { t.Log.Error("Failed to get packet", zap.Error(err), zap.Uint64("sequence", seq)) - return err + return false, err } // Check signing status; if it is waiting, wait for the completion of the EVM signature. // If it is not success (Failed or Undefined), return error. signing := packet.CurrentGroupSigning - if signing == nil { + if signing == nil || + signing.SigningStatus == tsstypes.SIGNING_STATUS_FALLEN { signing = packet.IncomingGroupSigning } @@ -106,23 +117,19 @@ func (t *TunnelRelayer) CheckAndRelay(ctx context.Context) (err error) { "The current packet must wait for the completion of the EVM signature", zap.Uint64("sequence", seq), ) - return nil + return false, nil } else if signing.SigningStatus != tsstypes.SIGNING_STATUS_SUCCESS { err := fmt.Errorf("signing status is not success") t.Log.Error("Failed to relay packet", zap.Error(err), zap.Uint64("sequence", seq)) - return err + return false, err } // Relay the packet to the target chain if err := t.TargetChainProvider.RelayPacket(ctx, packet); err != nil { t.Log.Error("Failed to relay packet", zap.Error(err), zap.Uint64("sequence", seq)) - return err + return false, err } t.Log.Info("Successfully relayed packet", zap.Uint64("sequence", seq)) } } - -func (t *TunnelRelayer) IsExecuting() bool { - return t.isExecuting -} diff --git a/relayer/tunnel_relayer_test.go b/relayer/tunnel_relayer_test.go index 430ce52..6c033be 100644 --- a/relayer/tunnel_relayer_test.go +++ b/relayer/tunnel_relayer_test.go @@ -85,7 +85,11 @@ func (s *TunnelRelayerTestSuite) mockQueryTunnelInfo(sequence uint64, isActive b } // Helper function to create a mock Packet. -func createMockPacket(tunnelID, sequence uint64, status tss.SigningStatus) *bandtypes.Packet { +func createMockPacket( + tunnelID, sequence uint64, + currentStatus int32, + incomingStatus int32, +) *bandtypes.Packet { signalPrices := []bandtypes.SignalPrice{ {SignalID: "signal1", Price: 100}, {SignalID: "signal2", Price: 200}, @@ -94,20 +98,33 @@ func createMockPacket(tunnelID, sequence uint64, status tss.SigningStatus) *band cmbytes.HexBytes("0x1234"), cmbytes.HexBytes("0xabcd"), ) + var currentGroupSigning *bandtypes.Signing + var incomingGroupSigning *bandtypes.Signing - signing := bandtypes.NewSigning( - 1, - cmbytes.HexBytes("0xdeadbeef"), - evmSignature, - status, - ) + if currentStatus != -1 { + currentGroupSigning = bandtypes.NewSigning( + 1, + cmbytes.HexBytes("0xdeadbeef"), + evmSignature, + tss.SigningStatus(currentStatus), + ) + } + + if incomingStatus != -1 { + incomingGroupSigning = bandtypes.NewSigning( + 1, + cmbytes.HexBytes("0xdeadbeef"), + evmSignature, + tss.SigningStatus(incomingStatus), + ) + } return bandtypes.NewPacket( tunnelID, sequence, signalPrices, - signing, - nil, + currentGroupSigning, + incomingGroupSigning, ) } @@ -126,7 +143,8 @@ func (s *TunnelRelayerTestSuite) TestCheckAndRelay() { packet := createMockPacket( s.tunnelRelayer.TunnelID, defaultTargetChainSequence+1, - tss.SIGNING_STATUS_SUCCESS, + int32(tss.SIGNING_STATUS_SUCCESS), + -1, ) s.client.EXPECT(). GetTunnelPacket(gomock.Any(), s.tunnelRelayer.TunnelID, defaultTargetChainSequence+1). @@ -186,7 +204,29 @@ func (s *TunnelRelayerTestSuite) TestCheckAndRelay() { err: fmt.Errorf("failed to get packet"), }, { - name: "signing status fallen", + name: "fallen signing status of current group but incoming group success", + preprocess: func() { + s.mockGetTunnel(defaultBandLatestSequence) + s.mockQueryTunnelInfo(defaultTargetChainSequence, true) + + packet := createMockPacket( + s.tunnelRelayer.TunnelID, + defaultTargetChainSequence+1, + int32(tss.SIGNING_STATUS_FALLEN), + int32(tss.SIGNING_STATUS_SUCCESS), + ) + s.client.EXPECT(). + GetTunnelPacket(gomock.Any(), s.tunnelRelayer.TunnelID, defaultTargetChainSequence+1). + Return(packet, nil) + s.chainProvider.EXPECT().RelayPacket(gomock.Any(), packet).Return(nil) + + // Check and relay the packet for the second time + s.mockGetTunnel(defaultBandLatestSequence) + s.mockQueryTunnelInfo(defaultTargetChainSequence+1, true) + }, + }, + { + name: "incoming group signing status fallen", preprocess: func() { s.mockGetTunnel(defaultBandLatestSequence) s.mockQueryTunnelInfo(defaultTargetChainSequence, true) @@ -194,7 +234,8 @@ func (s *TunnelRelayerTestSuite) TestCheckAndRelay() { packet := createMockPacket( s.tunnelRelayer.TunnelID, defaultTargetChainSequence+1, - tss.SIGNING_STATUS_FALLEN, + int32(tss.SIGNING_STATUS_FALLEN), + int32(tss.SIGNING_STATUS_FALLEN), ) s.client.EXPECT(). @@ -212,7 +253,8 @@ func (s *TunnelRelayerTestSuite) TestCheckAndRelay() { packet := createMockPacket( s.tunnelRelayer.TunnelID, defaultTargetChainSequence+1, - tss.SIGNING_STATUS_WAITING, + int32(tss.SIGNING_STATUS_WAITING), + -1, ) s.client.EXPECT(). @@ -230,7 +272,8 @@ func (s *TunnelRelayerTestSuite) TestCheckAndRelay() { packet := createMockPacket( s.tunnelRelayer.TunnelID, defaultTargetChainSequence+1, - tss.SIGNING_STATUS_SUCCESS, + int32(tss.SIGNING_STATUS_SUCCESS), + -1, ) s.client.EXPECT(). @@ -248,9 +291,10 @@ func (s *TunnelRelayerTestSuite) TestCheckAndRelay() { tc.preprocess() } - err := s.tunnelRelayer.CheckAndRelay(s.ctx) + isExecuting, err := s.tunnelRelayer.CheckAndRelay(s.ctx) if tc.err != nil { s.Require().ErrorContains(err, tc.err.Error()) + s.Require().False(isExecuting) } else { s.Require().NoError(err) }