Skip to content

Commit

Permalink
[Feature] Improve tunnel penalty logic (#31)
Browse files Browse the repository at this point in the history
* rm tunnel penalty

* rename testcase name

* change var name

* change var

* add lock

* del isExecuting

* swap return position

* change lint

* rm malign

* rm aligncheck

* del first mutex

---------

Co-authored-by: Tanut Lertwarachai <[email protected]>
  • Loading branch information
tanut32039 and Tanut Lertwarachai authored Feb 20, 2025
1 parent db08b24 commit 2982260
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 138 deletions.
4 changes: 0 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,8 @@ linters-settings:
- regexpMust
- appendAssign
- ifElseChain
maligned:
# print struct with more effective memory layout or not, false by default
suggest-new: true
nolintlint:
allow-unused: false
allow-leading-space: true
require-explanation: false
require-specific: false
revive:
Expand Down
9 changes: 4 additions & 5 deletions internal/relayertest/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ var CustomCfgTextWithTimeStr string

var CustomCfg = falcon.Config{
Global: falcon.GlobalConfig{
CheckingPacketInterval: 1 * time.Minute,
SyncTunnelsInterval: 5 * time.Minute,
MaxCheckingPacketPenaltyDuration: 1 * time.Hour,
PenaltyExponentialFactor: 1.1,
LogLevel: "info",
CheckingPacketInterval: 1 * time.Minute,
SyncTunnelsInterval: 5 * time.Minute,
PenaltySkipRounds: 3,
LogLevel: "info",
},
BandChain: band.Config{
RpcEndpoints: []string{"http://localhost:26657", "http://localhost:26658"},
Expand Down
3 changes: 1 addition & 2 deletions internal/relayertest/testdata/custom_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
log_level = 'info'
checking_packet_interval = 60000000000
sync_tunnels_interval = 300000000000
max_checking_packet_penalty_duration = 3600000000000
penalty_exponential_factor = 1.1
penalty_skip_rounds = 3

[bandchain]
rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
log_level = 'info'
checking_packet_interval = '1m'
sync_tunnels_interval = '5m'
max_checking_packet_penalty_duration = '1h'
penalty_exponential_factor = 1.1
penalty_skip_rounds = 3

[bandchain]
rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658']
Expand Down
3 changes: 1 addition & 2 deletions internal/relayertest/testdata/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
log_level = 'info'
checking_packet_interval = 60000000000
sync_tunnels_interval = 300000000000
max_checking_packet_penalty_duration = 3600000000000
penalty_exponential_factor = 1.0
penalty_skip_rounds = 3

[bandchain]
rpc_endpoints = ['http://localhost:26657']
Expand Down
3 changes: 1 addition & 2 deletions internal/relayertest/testdata/default_with_chain_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
log_level = 'info'
checking_packet_interval = 60000000000
sync_tunnels_interval = 300000000000
max_checking_packet_penalty_duration = 3600000000000
penalty_exponential_factor = 1.0
penalty_skip_rounds = 3

[bandchain]
rpc_endpoints = ['http://localhost:26657']
Expand Down
7 changes: 4 additions & 3 deletions relayer/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,7 @@ func (a *App) Start(ctx context.Context, tunnelIDs []uint64) error {
tunnelRelayers,
a.Config.Global.CheckingPacketInterval,
a.Config.Global.SyncTunnelsInterval,
a.Config.Global.MaxCheckingPacketPenaltyDuration,
a.Config.Global.PenaltyExponentialFactor,
a.Config.Global.PenaltySkipRounds,
isSyncTunnelsAllowed,
a.BandClient,
a.TargetChains,
Expand Down Expand Up @@ -624,7 +623,9 @@ func (a *App) Relay(ctx context.Context, tunnelID uint64) error {
chainProvider,
)

return tr.CheckAndRelay(ctx)
_, err = tr.CheckAndRelay(ctx)

return err
}

// GetTunnels retrieves the list of tunnels by given tunnel IDs. If no tunnel ID is provided,
Expand Down
18 changes: 8 additions & 10 deletions relayer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (

// GlobalConfig is the global configuration for the falcon tunnel relayer
type GlobalConfig struct {
LogLevel string `mapstructure:"log_level" toml:"log_level"`
CheckingPacketInterval time.Duration `mapstructure:"checking_packet_interval" toml:"checking_packet_interval"`
SyncTunnelsInterval time.Duration `mapstructure:"sync_tunnels_interval" toml:"sync_tunnels_interval"`
MaxCheckingPacketPenaltyDuration time.Duration `mapstructure:"max_checking_packet_penalty_duration" toml:"max_checking_packet_penalty_duration"`
PenaltyExponentialFactor float64 `mapstructure:"penalty_exponential_factor" toml:"penalty_exponential_factor"`
LogLevel string `mapstructure:"log_level" toml:"log_level"`
CheckingPacketInterval time.Duration `mapstructure:"checking_packet_interval" toml:"checking_packet_interval"`
SyncTunnelsInterval time.Duration `mapstructure:"sync_tunnels_interval" toml:"sync_tunnels_interval"`
PenaltySkipRounds uint `mapstructure:"penalty_skip_rounds" toml:"penalty_skip_rounds"`
}

// Config defines the configuration for the falcon tunnel relayer.
Expand Down Expand Up @@ -134,11 +133,10 @@ func DefaultConfig() *Config {
},
TargetChains: make(map[string]chains.ChainProviderConfig),
Global: GlobalConfig{
LogLevel: "info",
CheckingPacketInterval: time.Minute,
SyncTunnelsInterval: 5 * time.Minute,
MaxCheckingPacketPenaltyDuration: time.Hour,
PenaltyExponentialFactor: 1.0,
LogLevel: "info",
CheckingPacketInterval: time.Minute,
PenaltySkipRounds: 3,
SyncTunnelsInterval: 5 * time.Minute,
},
}
}
Expand Down
99 changes: 24 additions & 75 deletions relayer/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,16 @@ import (
"github.com/bandprotocol/falcon/relayer/chains"
)

const penaltyTaskChSize = 1000

// Scheduler is a struct to manage all tunnel relayers
type Scheduler struct {
Log *zap.Logger
TunnelRelayers []*TunnelRelayer
CheckingPacketInterval time.Duration
SyncTunnelsInterval time.Duration
MaxCheckingPacketPenaltyDuration time.Duration
ExponentialFactor float64

isErrorOnHolds []bool
Log *zap.Logger
TunnelRelayers []*TunnelRelayer
CheckingPacketInterval time.Duration
SyncTunnelsInterval time.Duration
PenaltySkipRounds uint

PenaltySkipRemaining []uint
isSyncTunnelsAllowed bool
penaltyTaskCh chan Task

BandClient band.Client
ChainProviders chains.ChainProviders
Expand All @@ -35,24 +31,21 @@ func NewScheduler(
tunnelRelayers []*TunnelRelayer,
checkingPacketInterval time.Duration,
syncTunnelsInterval time.Duration,
maxCheckingPacketPenaltyDuration time.Duration,
exponentialFactor float64,
penaltyAttempts uint,
isSyncTunnelsAllowed bool,
bandClient band.Client,
chainProviders chains.ChainProviders,
) *Scheduler {
return &Scheduler{
Log: log,
TunnelRelayers: tunnelRelayers,
CheckingPacketInterval: checkingPacketInterval,
SyncTunnelsInterval: syncTunnelsInterval,
MaxCheckingPacketPenaltyDuration: maxCheckingPacketPenaltyDuration,
ExponentialFactor: exponentialFactor,
isErrorOnHolds: make([]bool, len(tunnelRelayers)),
isSyncTunnelsAllowed: isSyncTunnelsAllowed,
penaltyTaskCh: make(chan Task, penaltyTaskChSize),
BandClient: bandClient,
ChainProviders: chainProviders,
Log: log,
TunnelRelayers: tunnelRelayers,
CheckingPacketInterval: checkingPacketInterval,
SyncTunnelsInterval: syncTunnelsInterval,
PenaltySkipRounds: penaltyAttempts,
PenaltySkipRemaining: make([]uint, len(tunnelRelayers)),
isSyncTunnelsAllowed: isSyncTunnelsAllowed,
BandClient: bandClient,
ChainProviders: chainProviders,
}
}

Expand All @@ -77,16 +70,6 @@ func (s *Scheduler) Start(ctx context.Context) error {
s.SyncTunnels(ctx)
case <-ticker.C:
s.Execute(ctx)
case task := <-s.penaltyTaskCh:
// Execute the task with penalty waiting period
go func(task Task) {
executeFn := func(ctx context.Context, t Task) {
s.isErrorOnHolds[task.RelayerID] = false
s.TriggerTunnelRelayer(ctx, task)
}

task.Wait(ctx, executeFn)
}(task)
}
}
}
Expand All @@ -95,12 +78,14 @@ func (s *Scheduler) Start(ctx context.Context) error {
func (s *Scheduler) Execute(ctx context.Context) {
// Execute the task for each tunnel relayer
for i, tr := range s.TunnelRelayers {
if s.isErrorOnHolds[i] {
if s.PenaltySkipRemaining[i] > 0 {
s.Log.Debug(
"Skipping this tunnel: the operation is on hold due to error on last round.",
"Skipping tunnel execution due to penalty from previous failure.",
zap.Uint64("tunnel_id", tr.TunnelID),
zap.Int("relayer_id", i),
zap.Uint("penalty_skip_remaining", s.PenaltySkipRemaining[i]),
)
s.PenaltySkipRemaining[i] -= 1

continue
}
Expand All @@ -115,36 +100,19 @@ func (s *Scheduler) Execute(ctx context.Context) {
func (s *Scheduler) TriggerTunnelRelayer(ctx context.Context, task Task) {
tr := s.TunnelRelayers[task.RelayerID]

// if the tunnel relayer is executing, skip the round
if tr.IsExecuting() {
s.Log.Debug(
"Skipping this tunnel: tunnel relayer is executing on another process",
zap.Uint64("tunnel_id", tr.TunnelID),
)
return
}

s.Log.Info("Executing task", zap.Uint64("tunnel_id", tr.TunnelID))

// Check and relay the packet, if error occurs, set the error flag.
if err := tr.CheckAndRelay(ctx); err != nil {
s.isErrorOnHolds[task.RelayerID] = true
newInterval := s.calculatePenaltyInterval(task.WaitingInterval)
if isExecuting, err := tr.CheckAndRelay(ctx); err != nil && !isExecuting {
s.PenaltySkipRemaining[task.RelayerID] = s.PenaltySkipRounds

s.Log.Error(
"Failed to execute, Penalty for the tunnel relayer",
zap.Error(err),
zap.Uint64("tunnel_id", tr.TunnelID),
)

newTask := NewTask(task.RelayerID, newInterval)

s.penaltyTaskCh <- newTask
return
}

// If the task is successful, reset the error flag.
s.isErrorOnHolds[task.RelayerID] = false
s.Log.Info(
"Tunnel relayer finished execution",
zap.Uint64("tunnel_id", tr.TunnelID),
Expand Down Expand Up @@ -191,7 +159,7 @@ func (s *Scheduler) SyncTunnels(ctx context.Context) {
)

s.TunnelRelayers = append(s.TunnelRelayers, &tr)
s.isErrorOnHolds = append(s.isErrorOnHolds, false)
s.PenaltySkipRemaining = append(s.PenaltySkipRemaining, 0)
s.Log.Info(
"New tunnel synchronized successfully",
zap.String("chain_name", tunnels[i].TargetChainID),
Expand All @@ -200,15 +168,6 @@ func (s *Scheduler) SyncTunnels(ctx context.Context) {
}
}

// calculatePenaltyInterval applies exponential backoff with a max limit.
//
// It scales interval by s.ExponentialFactor and caps the result at
// s.MaxCheckingPacketPenaltyDuration. The cap is applied while the value is
// still a float64: converting an out-of-range, infinite or NaN float64 to
// time.Duration (an int64) is implementation-dependent and may produce a
// negative duration that would slip past a post-conversion "> max" check.
func (s *Scheduler) calculatePenaltyInterval(interval time.Duration) time.Duration {
	scaled := float64(interval) * s.ExponentialFactor

	// Written as a negated "<" so that NaN (for which every comparison is
	// false) also falls back to the maximum instead of being converted.
	if !(scaled < float64(s.MaxCheckingPacketPenaltyDuration)) {
		return s.MaxCheckingPacketPenaltyDuration
	}

	return time.Duration(scaled)
}

// Task is a struct to manage the task for the tunnel relayer
type Task struct {
RelayerID int
Expand All @@ -222,13 +181,3 @@ func NewTask(relayerID int, waitingInterval time.Duration) Task {
WaitingInterval: waitingInterval,
}
}

// Wait blocks until the task's WaitingInterval has elapsed and then calls
// executeFn with ctx and the task. If ctx is done first, Wait returns without
// calling executeFn.
//
// An explicit timer is used instead of time.After so that it can be stopped
// as soon as Wait returns; with time.After a cancelled wait would keep its
// timer pending until the full interval elapsed.
func (t Task) Wait(ctx context.Context, executeFn func(ctx context.Context, t Task)) {
	timer := time.NewTimer(t.WaitingInterval)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		// Cancelled before the waiting interval elapsed: skip execution.
	case <-timer.C:
		executeFn(ctx, t)
	}
}
Loading

0 comments on commit 2982260

Please sign in to comment.