diff --git a/cmd/flags.go b/cmd/flags.go index ea4d1ca..6fdf32d 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -1,6 +1,7 @@ package cmd const ( - flagHome = "home" - flagFile = "file" + flagHome = "home" + flagFile = "file" + flagMetricsListenAddr = "metrics-listen-addr" ) diff --git a/cmd/start.go b/cmd/start.go index 519bc69..651bc6a 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -31,9 +31,20 @@ $ %s start 1 12 # start relaying data from specific tunnelIDs.`, appName, a tunnelIDs = append(tunnelIDs, tunnelID) } - return app.Start(cmd.Context(), tunnelIDs) + metricsListenAddrFlag, err := cmd.Flags().GetString(flagMetricsListenAddr) + if err != nil { + return err + } + + return app.Start(cmd.Context(), tunnelIDs, metricsListenAddrFlag) }, } + cmd.Flags().String( + flagMetricsListenAddr, + "", + "address to use for metrics server. By default, "+ + "will be the metrics-listen-addr parameter in the global config. ", + ) return cmd } diff --git a/go.mod b/go.mod index 29cbff7..378cff2 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/miguelmota/go-ethereum-hdwallet v0.1.2 github.com/mitchellh/mapstructure v1.5.0 github.com/pelletier/go-toml/v2 v2.2.2 + github.com/prometheus/client_golang v1.20.5 github.com/shopspring/decimal v1.4.0 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 @@ -138,7 +139,6 @@ require ( github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.20.5 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect diff --git a/internal/relayermetrics/metrics.go b/internal/relayermetrics/metrics.go new file mode 100644 index 0000000..ee887bc --- /dev/null +++ b/internal/relayermetrics/metrics.go @@ -0,0 +1,220 @@ +package relayermetrics + 
+import ( + "context" + "fmt" + "net" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" +) + +// metrics stores the Prometheus metrics instance. +var metrics *PrometheusMetrics + +// globalTelemetryEnabled indicates whether telemetry is enabled globally. +// It is set on initialization and does not change for the lifetime of the program. +var globalTelemetryEnabled bool + +type PrometheusMetrics struct { + PacketsRelayedSuccess *prometheus.CounterVec + UnrelayedPackets *prometheus.GaugeVec + TasksCount *prometheus.CounterVec + TaskExecutionTime *prometheus.SummaryVec + TunnelsPerDestinationChain *prometheus.CounterVec + ActiveTargetContractsCount prometheus.Gauge + TxsCount *prometheus.CounterVec + TxProcessTime *prometheus.SummaryVec + GasUsed *prometheus.SummaryVec +} + +func updateMetrics(updateFn func()) { + if globalTelemetryEnabled { + updateFn() + } +} + +// IncPacketsRelayedSuccess increments the count of successfully relayed packets for a specific tunnel. +func IncPacketsRelayedSuccess(tunnelID uint64) { + updateMetrics(func() { + metrics.PacketsRelayedSuccess.WithLabelValues(fmt.Sprintf("%d", tunnelID)).Inc() + }) +} + +// SetUnrelayedPackets sets the number of unrelayed packets for a specific tunnel. +func SetUnrelayedPackets(tunnelID uint64, unrelayedPackets float64) { + updateMetrics(func() { + metrics.UnrelayedPackets.WithLabelValues(fmt.Sprintf("%d", tunnelID)).Set(unrelayedPackets) + }) +} + +// IncTasksCount increments the total tasks count for a specific tunnel. +func IncTasksCount(tunnelID uint64) { + updateMetrics(func() { + metrics.TasksCount.WithLabelValues(fmt.Sprintf("%d", tunnelID)).Inc() + }) +} + +// ObserveTaskExecutionTime records the execution time of a task for a specific tunnel. 
+func ObserveTaskExecutionTime(tunnelID uint64, taskExecutionTime float64) { + updateMetrics(func() { + metrics.TaskExecutionTime.WithLabelValues(fmt.Sprintf("%d", tunnelID)).Observe(taskExecutionTime) + }) +} + +// IncTunnelsPerDestinationChain increments the total number of tunnels per specific destination chain. +func IncTunnelsPerDestinationChain(destinationChain string) { + updateMetrics(func() { + metrics.TunnelsPerDestinationChain.WithLabelValues(destinationChain).Inc() + }) +} + +// IncActiveTargetContractsCount increases the count of active target contracts. +func IncActiveTargetContractsCount() { + updateMetrics(func() { + metrics.ActiveTargetContractsCount.Inc() + }) +} + +// DecActiveTargetContractsCount decreases the count of active target contracts. +func DecActiveTargetContractsCount() { + updateMetrics(func() { + metrics.ActiveTargetContractsCount.Dec() + }) +} + +// IncTxsCount increments the transactions count metric for a specific tunnel. +func IncTxsCount(tunnelID uint64) { + updateMetrics(func() { + metrics.TxsCount.WithLabelValues(fmt.Sprintf("%d", tunnelID)).Inc() + }) +} + +// ObserveTxProcessTime tracks transaction processing time in milliseconds for a specific destination chain. +func ObserveTxProcessTime(destinationChain string, txProcessTime float64) { + updateMetrics(func() { + metrics.TxProcessTime.WithLabelValues(destinationChain).Observe(txProcessTime) + }) +} + +// ObserveGasUsed tracks gas used for each relayed transaction. 
+func ObserveGasUsed(tunnelID uint64, gasUsed uint64) { + updateMetrics(func() { + metrics.GasUsed.WithLabelValues(fmt.Sprintf("%d", tunnelID)).Observe(float64(gasUsed)) + }) +} + +func InitPrometheusMetrics() { + packetLabels := []string{"tunnel_id"} + taskLabels := []string{"tunnel_id"} + tunnelPerDestinationChainLabels := []string{"destination_chain"} + txLabels := []string{"tunnel_id"} + gasUsedLabels := []string{"tunnel_id"} + + metrics = &PrometheusMetrics{ + PacketsRelayedSuccess: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "falcon_packets_relayed_success", + Help: "Total number of packets successfully relayed from BandChain", + }, packetLabels), + UnrelayedPackets: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "falcon_unrelayed_packets", + Help: "Number of unrelayed packets (the difference between total packets from BandChain and received packets from the target chain)", + }, packetLabels), + TasksCount: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "falcon_tasks_count", + Help: "Total number of successfully executed tasks", + }, taskLabels), + TaskExecutionTime: promauto.NewSummaryVec(prometheus.SummaryOpts{ + Name: "falcon_task_execution_time", + Help: "Task execution time in milliseconds", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.9: 0.01, + 0.99: 0.001, + }, + }, taskLabels), + TunnelsPerDestinationChain: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "falcon_tunnels_per_destination_chain", + Help: "Total number of destination chains", + }, tunnelPerDestinationChainLabels), + ActiveTargetContractsCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "falcon_active_target_contracts_count", + Help: "Number of active target chain contracts", + }), + TxsCount: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "falcon_txs_count", + Help: "Total number of transactions per tunnel", + }, txLabels), + TxProcessTime: promauto.NewSummaryVec(prometheus.SummaryOpts{ + Name: "falcon_tx_process_time", + Help: 
"Transaction processing time in milliseconds", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.9: 0.01, + 0.99: 0.001, + }, + }, txLabels), + GasUsed: promauto.NewSummaryVec(prometheus.SummaryOpts{ + Name: "falcon_gas_used", + Help: "Amount of gas used per transaction", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.9: 0.01, + 0.99: 0.001, + }, + }, gasUsedLabels), + } +} + +// StartMetricsServer starts a metrics server in a background goroutine, +// accepting connections on the given listen address. +// Any HTTP logging will be written at info level to the given logger. +// The server will be forcefully shut down when ctx finishes. +func StartMetricsServer(ctx context.Context, log *zap.Logger, metricsListenAddr string) error { + ln, err := net.Listen("tcp", metricsListenAddr) + if err != nil { + log.Error( + "Failed to start metrics server; you can change the address and port using the metrics_listen_addr config setting or the --metrics-listen-addr flag", + ) + + return fmt.Errorf("failed to listen on metrics address %q: %w", metricsListenAddr, err) + } + log = log.With(zap.String("sys", "metricshttp")) + log.Info("Metrics server listening", zap.String("addr", metricsListenAddr)) + + // enable telemetry globally so the metric update helpers take effect. + globalTelemetryEnabled = true + + // initialize Prometheus metrics + InitPrometheusMetrics() + + // set up a dedicated mux that serves only the metrics endpoint. 
+ mux := http.NewServeMux() + + // serve prometheus metrics + mux.Handle("/metrics", promhttp.Handler()) + + srv := &http.Server{ + Handler: mux, + ErrorLog: zap.NewStdLog(log), + BaseContext: func(net.Listener) context.Context { + return ctx + }, + ReadHeaderTimeout: 5 * time.Second, + } + + go func() { + _ = srv.Serve(ln) + }() + + go func() { + <-ctx.Done() + srv.Close() + }() + + return nil +} diff --git a/internal/relayertest/mocks/band_chain_query.go b/internal/relayertest/mocks/band_chain_query.go index d77f786..3e11396 100644 --- a/internal/relayertest/mocks/band_chain_query.go +++ b/internal/relayertest/mocks/band_chain_query.go @@ -13,8 +13,8 @@ import ( context "context" reflect "reflect" - types "github.com/bandprotocol/falcon/internal/bandchain/bandtss" - types0 "github.com/bandprotocol/falcon/internal/bandchain/tunnel" + bandtss "github.com/bandprotocol/falcon/internal/bandchain/bandtss" + tunnel "github.com/bandprotocol/falcon/internal/bandchain/tunnel" gomock "go.uber.org/mock/gomock" grpc "google.golang.org/grpc" ) @@ -44,14 +44,14 @@ func (m *MockQueryClient) EXPECT() *MockQueryClientMockRecorder { } // Packet mocks base method. -func (m *MockQueryClient) Packet(ctx context.Context, in *types0.QueryPacketRequest, opts ...grpc.CallOption) (*types0.QueryPacketResponse, error) { +func (m *MockQueryClient) Packet(ctx context.Context, in *tunnel.QueryPacketRequest, opts ...grpc.CallOption) (*tunnel.QueryPacketResponse, error) { m.ctrl.T.Helper() varargs := []any{ctx, in} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Packet", varargs...) - ret0, _ := ret[0].(*types0.QueryPacketResponse) + ret0, _ := ret[0].(*tunnel.QueryPacketResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -64,14 +64,14 @@ func (mr *MockQueryClientMockRecorder) Packet(ctx, in any, opts ...any) *gomock. } // Signing mocks base method. 
-func (m *MockQueryClient) Signing(ctx context.Context, in *types.QuerySigningRequest, opts ...grpc.CallOption) (*types.QuerySigningResponse, error) { +func (m *MockQueryClient) Signing(ctx context.Context, in *bandtss.QuerySigningRequest, opts ...grpc.CallOption) (*bandtss.QuerySigningResponse, error) { m.ctrl.T.Helper() varargs := []any{ctx, in} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Signing", varargs...) - ret0, _ := ret[0].(*types.QuerySigningResponse) + ret0, _ := ret[0].(*bandtss.QuerySigningResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -84,14 +84,14 @@ func (mr *MockQueryClientMockRecorder) Signing(ctx, in any, opts ...any) *gomock } // Tunnel mocks base method. -func (m *MockQueryClient) Tunnel(ctx context.Context, in *types0.QueryTunnelRequest, opts ...grpc.CallOption) (*types0.QueryTunnelResponse, error) { +func (m *MockQueryClient) Tunnel(ctx context.Context, in *tunnel.QueryTunnelRequest, opts ...grpc.CallOption) (*tunnel.QueryTunnelResponse, error) { m.ctrl.T.Helper() varargs := []any{ctx, in} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Tunnel", varargs...) - ret0, _ := ret[0].(*types0.QueryTunnelResponse) + ret0, _ := ret[0].(*tunnel.QueryTunnelResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -104,14 +104,14 @@ func (mr *MockQueryClientMockRecorder) Tunnel(ctx, in any, opts ...any) *gomock. } // Tunnels mocks base method. -func (m *MockQueryClient) Tunnels(ctx context.Context, in *types0.QueryTunnelsRequest, opts ...grpc.CallOption) (*types0.QueryTunnelsResponse, error) { +func (m *MockQueryClient) Tunnels(ctx context.Context, in *tunnel.QueryTunnelsRequest, opts ...grpc.CallOption) (*tunnel.QueryTunnelsResponse, error) { m.ctrl.T.Helper() varargs := []any{ctx, in} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Tunnels", varargs...) 
- ret0, _ := ret[0].(*types0.QueryTunnelsResponse) + ret0, _ := ret[0].(*tunnel.QueryTunnelsResponse) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/internal/relayertest/testdata/custom_config.toml b/internal/relayertest/testdata/custom_config.toml index f1a5ce3..cbb30e6 100644 --- a/internal/relayertest/testdata/custom_config.toml +++ b/internal/relayertest/testdata/custom_config.toml @@ -3,6 +3,7 @@ log_level = 'info' checking_packet_interval = 60000000000 sync_tunnels_interval = 300000000000 penalty_skip_rounds = 3 +metrics_listen_addr = '' [bandchain] rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658'] diff --git a/internal/relayertest/testdata/custom_config_with_time_str.toml b/internal/relayertest/testdata/custom_config_with_time_str.toml index e1b465b..3dd98b1 100644 --- a/internal/relayertest/testdata/custom_config_with_time_str.toml +++ b/internal/relayertest/testdata/custom_config_with_time_str.toml @@ -3,6 +3,7 @@ log_level = 'info' checking_packet_interval = '1m' sync_tunnels_interval = '5m' penalty_skip_rounds = 3 +metrics_listen_addr = '' [bandchain] rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658'] diff --git a/internal/relayertest/testdata/default_config.toml b/internal/relayertest/testdata/default_config.toml index 8202fd8..d6847bf 100644 --- a/internal/relayertest/testdata/default_config.toml +++ b/internal/relayertest/testdata/default_config.toml @@ -3,6 +3,7 @@ log_level = 'info' checking_packet_interval = 60000000000 sync_tunnels_interval = 300000000000 penalty_skip_rounds = 3 +metrics_listen_addr = '' [bandchain] rpc_endpoints = ['http://localhost:26657'] diff --git a/internal/relayertest/testdata/default_with_chain_config.toml b/internal/relayertest/testdata/default_with_chain_config.toml index 54a8b48..315c003 100644 --- a/internal/relayertest/testdata/default_with_chain_config.toml +++ b/internal/relayertest/testdata/default_with_chain_config.toml @@ -3,6 +3,7 @@ log_level = 'info' 
checking_packet_interval = 60000000000 sync_tunnels_interval = 300000000000 penalty_skip_rounds = 3 +metrics_listen_addr = '' [bandchain] rpc_endpoints = ['http://localhost:26657'] diff --git a/relayer/app.go b/relayer/app.go index 4e80b31..697cb48 100644 --- a/relayer/app.go +++ b/relayer/app.go @@ -14,6 +14,7 @@ import ( "go.uber.org/zap" "github.com/bandprotocol/falcon/internal" + "github.com/bandprotocol/falcon/internal/relayermetrics" "github.com/bandprotocol/falcon/relayer/band" bandtypes "github.com/bandprotocol/falcon/relayer/band/types" "github.com/bandprotocol/falcon/relayer/chains" @@ -521,20 +522,27 @@ func (a *App) ValidatePassphrase(envPassphrase string) error { } // Start starts the tunnel relayer program. -func (a *App) Start(ctx context.Context, tunnelIDs []uint64) error { +func (a *App) Start( + ctx context.Context, + tunnelIDs []uint64, + metricsListenAddrFlag string, +) error { a.Log.Info("Starting tunnel relayer") - // query tunnels - tunnels, err := a.getTunnels(ctx, tunnelIDs) - if err != nil { - a.Log.Error("Cannot get tunnels", zap.Error(err)) - } - // validate passphrase if err := a.ValidatePassphrase(a.EnvPassphrase); err != nil { return err } + // setup metrics server + metricsListenAddr := a.Config.Global.MetricsListenAddr + if metricsListenAddrFlag != "" { + metricsListenAddr = metricsListenAddrFlag + } + if err := a.setupMetricsServer(ctx, metricsListenAddr); err != nil { + return err + } + // initialize target chain providers for chainName, chainProvider := range a.TargetChains { if err := chainProvider.LoadFreeSenders(a.HomePath, a.EnvPassphrase); err != nil { @@ -554,39 +562,17 @@ func (a *App) Start(ctx context.Context, tunnelIDs []uint64) error { } } - // initialize the tunnel relayer - tunnelRelayers := []*TunnelRelayer{} - for _, tunnel := range tunnels { - chainProvider, ok := a.TargetChains[tunnel.TargetChainID] - if !ok { - return fmt.Errorf("target chain provider not found: %s", tunnel.TargetChainID) - } - - tr := 
NewTunnelRelayer( - a.Log, - tunnel.ID, - tunnel.TargetAddress, - a.Config.Global.CheckingPacketInterval, - a.BandClient, - chainProvider, - ) - tunnelRelayers = append(tunnelRelayers, &tr) - } - // start the tunnel relayers - isSyncTunnelsAllowed := (len(tunnelIDs) == 0) scheduler := NewScheduler( a.Log, - tunnelRelayers, a.Config.Global.CheckingPacketInterval, a.Config.Global.SyncTunnelsInterval, a.Config.Global.PenaltySkipRounds, - isSyncTunnelsAllowed, a.BandClient, a.TargetChains, ) - return scheduler.Start(ctx) + return scheduler.Start(ctx, tunnelIDs) } // Relay relays the packet from the source chain to the destination chain. @@ -628,22 +614,18 @@ func (a *App) Relay(ctx context.Context, tunnelID uint64) error { return err } -// GetTunnels retrieves the list of tunnels by given tunnel IDs. If no tunnel ID is provided, -// get all tunnels -func (a *App) getTunnels(ctx context.Context, tunnelIDs []uint64) ([]bandtypes.Tunnel, error) { - if len(tunnelIDs) == 0 { - return a.BandClient.GetTunnels(ctx) - } - - tunnels := make([]bandtypes.Tunnel, 0, len(tunnelIDs)) - for _, tunnelID := range tunnelIDs { - tunnel, err := a.BandClient.GetTunnel(ctx, tunnelID) - if err != nil { - return nil, err - } - - tunnels = append(tunnels, *tunnel) +// setupMetricsServer starts the metrics server if enabled. +func (a *App) setupMetricsServer( + ctx context.Context, + metricsListenAddr string, +) error { + if metricsListenAddr == "" { + a.Log.Warn( + "Metrics server is disabled. 
It is controlled by the global config, and setting --metrics-listen-addr will override it and enable the server.", + ) + return nil } - return tunnels, nil + // start server + return relayermetrics.StartMetricsServer(ctx, a.Log, metricsListenAddr) } diff --git a/relayer/chains/evm/provider.go b/relayer/chains/evm/provider.go index 9b4f1e3..5e44584 100644 --- a/relayer/chains/evm/provider.go +++ b/relayer/chains/evm/provider.go @@ -16,6 +16,7 @@ import ( "github.com/shopspring/decimal" "go.uber.org/zap" + "github.com/bandprotocol/falcon/internal/relayermetrics" bandtypes "github.com/bandprotocol/falcon/relayer/band/types" "github.com/bandprotocol/falcon/relayer/chains" chainstypes "github.com/bandprotocol/falcon/relayer/chains/types" @@ -192,6 +193,9 @@ func (cp *EVMChainProvider) RelayPacket(ctx context.Context, packet *bandtypes.P createdAt := time.Now() + // increment the transactions count metric for the current tunnel + relayermetrics.IncTxsCount(packet.TunnelID) + log.Info( "Submitted a message; checking transaction status", zap.String("tx_hash", txHash), @@ -221,6 +225,12 @@ func (cp *EVMChainProvider) RelayPacket(ctx context.Context, packet *bandtypes.P txStatus = result.Status switch result.Status { case TX_STATUS_SUCCESS: + // track transaction processing time in milliseconds + relayermetrics.ObserveTxProcessTime(cp.ChainName, float64(time.Since(createdAt).Milliseconds())) + + // track gas used for the relayed transaction + relayermetrics.ObserveGasUsed(packet.TunnelID, result.GasUsed.Decimal.BigInt().Uint64()) + log.Info( "Packet is successfully relayed", zap.String("tx_hash", txHash), diff --git a/relayer/config.go b/relayer/config.go index c29acda..369f0ee 100644 --- a/relayer/config.go +++ b/relayer/config.go @@ -20,6 +20,7 @@ type GlobalConfig struct { CheckingPacketInterval time.Duration `mapstructure:"checking_packet_interval" toml:"checking_packet_interval"` SyncTunnelsInterval time.Duration 
`mapstructure:"sync_tunnels_interval" toml:"sync_tunnels_interval"` PenaltySkipRounds uint `mapstructure:"penalty_skip_rounds" toml:"penalty_skip_rounds"` + MetricsListenAddr string `mapstructure:"metrics_listen_addr" toml:"metrics_listen_addr"` } // Config defines the configuration for the falcon tunnel relayer. diff --git a/relayer/scheduler.go b/relayer/scheduler.go index 7e70fc6..626e498 100644 --- a/relayer/scheduler.go +++ b/relayer/scheduler.go @@ -6,51 +6,53 @@ import ( "go.uber.org/zap" + "github.com/bandprotocol/falcon/internal/relayermetrics" "github.com/bandprotocol/falcon/relayer/band" + bandtypes "github.com/bandprotocol/falcon/relayer/band/types" "github.com/bandprotocol/falcon/relayer/chains" ) // Scheduler is a struct to manage all tunnel relayers type Scheduler struct { Log *zap.Logger - TunnelRelayers []*TunnelRelayer CheckingPacketInterval time.Duration SyncTunnelsInterval time.Duration PenaltySkipRounds uint - PenaltySkipRemaining []uint - isSyncTunnelsAllowed bool - BandClient band.Client ChainProviders chains.ChainProviders + + tunnelRelayers []*TunnelRelayer + bandLatestTunnel int + penaltySkipRemaining []uint } // NewScheduler creates a new Scheduler func NewScheduler( log *zap.Logger, - tunnelRelayers []*TunnelRelayer, checkingPacketInterval time.Duration, syncTunnelsInterval time.Duration, - penaltyAttempts uint, - isSyncTunnelsAllowed bool, + penaltySkipRounds uint, bandClient band.Client, chainProviders chains.ChainProviders, ) *Scheduler { return &Scheduler{ Log: log, - TunnelRelayers: tunnelRelayers, CheckingPacketInterval: checkingPacketInterval, SyncTunnelsInterval: syncTunnelsInterval, - PenaltySkipRounds: penaltyAttempts, - PenaltySkipRemaining: make([]uint, len(tunnelRelayers)), - isSyncTunnelsAllowed: isSyncTunnelsAllowed, + PenaltySkipRounds: penaltySkipRounds, BandClient: bandClient, ChainProviders: chainProviders, + tunnelRelayers: []*TunnelRelayer{}, + bandLatestTunnel: 0, + penaltySkipRemaining: []uint{}, } } // Start 
starts all tunnel relayers -func (s *Scheduler) Start(ctx context.Context) error { +func (s *Scheduler) Start(ctx context.Context, tunnelIds []uint64) error { + s.SyncTunnels(ctx, tunnelIds) + ticker := time.NewTicker(s.CheckingPacketInterval) defer ticker.Stop() @@ -67,7 +69,9 @@ func (s *Scheduler) Start(ctx context.Context) error { return nil case <-syncTunnelTicker.C: - s.SyncTunnels(ctx) + if len(tunnelIds) == 0 { + s.SyncTunnels(ctx, tunnelIds) + } case <-ticker.C: s.Execute(ctx) } @@ -77,15 +81,15 @@ func (s *Scheduler) Start(ctx context.Context) error { // Execute executes the task for the tunnel relayer func (s *Scheduler) Execute(ctx context.Context) { // Execute the task for each tunnel relayer - for i, tr := range s.TunnelRelayers { - if s.PenaltySkipRemaining[i] > 0 { + for i, tr := range s.tunnelRelayers { + if s.penaltySkipRemaining[i] > 0 { s.Log.Debug( "Skipping tunnel execution due to penalty from previous failure.", zap.Uint64("tunnel_id", tr.TunnelID), zap.Int("relayer_id", i), - zap.Uint("penalty_skip_remaining", s.PenaltySkipRemaining[i]), + zap.Uint("penalty_skip_remaining", s.penaltySkipRemaining[i]), ) - s.PenaltySkipRemaining[i] -= 1 + s.penaltySkipRemaining[i] -= 1 continue } @@ -93,16 +97,21 @@ func (s *Scheduler) Execute(ctx context.Context) { // Execute the task, if error occurs, wait for the next round. task := NewTask(i, s.CheckingPacketInterval) go s.TriggerTunnelRelayer(ctx, task) + + // record metrics for the task execution for the current tunnel relayer + relayermetrics.IncTasksCount(tr.TunnelID) } } // TriggerTunnelRelayer triggers the tunnel relayer to check and relay the packet func (s *Scheduler) TriggerTunnelRelayer(ctx context.Context, task Task) { - tr := s.TunnelRelayers[task.RelayerID] + tr := s.tunnelRelayers[task.RelayerID] + + startExecutionTaskTime := time.Now() // Check and relay the packet, if error occurs, set the error flag. 
if isExecuting, err := tr.CheckAndRelay(ctx); err != nil && !isExecuting { - s.PenaltySkipRemaining[task.RelayerID] = s.PenaltySkipRounds + s.penaltySkipRemaining[task.RelayerID] = s.PenaltySkipRounds s.Log.Error( "Failed to execute, Penalty for the tunnel relayer", @@ -113,6 +122,12 @@ func (s *Scheduler) TriggerTunnelRelayer(ctx context.Context, task Task) { return } + // record the execution time of successful task. + relayermetrics.ObserveTaskExecutionTime( + tr.TunnelID, + float64(time.Since(startExecutionTaskTime).Milliseconds()), + ) + s.Log.Info( "Tunnel relayer finished execution", zap.Uint64("tunnel_id", tr.TunnelID), @@ -120,25 +135,20 @@ func (s *Scheduler) TriggerTunnelRelayer(ctx context.Context, task Task) { } // SyncTunnels synchronizes the Bandchain's tunnels with the latest tunnels. -func (s *Scheduler) SyncTunnels(ctx context.Context) { - if !s.isSyncTunnelsAllowed { - return - } - - s.Log.Info("Start syncing new tunnels") - tunnels, err := s.BandClient.GetTunnels(ctx) +func (s *Scheduler) SyncTunnels(ctx context.Context, tunnelIds []uint64) { + s.Log.Info("Start syncing tunnels from Bandchain") + tunnels, err := s.getTunnels(ctx, tunnelIds) if err != nil { s.Log.Error("Failed to fetch tunnels from BandChain", zap.Error(err)) return } - oldTunnelCount := len(s.TunnelRelayers) - if oldTunnelCount == len(tunnels) { + if s.bandLatestTunnel == len(tunnels) { s.Log.Info("No new tunnels to sync") return } - for i := oldTunnelCount; i < len(tunnels); i++ { + for i := s.bandLatestTunnel; i < len(tunnels); i++ { chainProvider, ok := s.ChainProviders[tunnels[i].TargetChainID] if !ok { s.Log.Warn( @@ -158,14 +168,40 @@ func (s *Scheduler) SyncTunnels(ctx context.Context) { chainProvider, ) - s.TunnelRelayers = append(s.TunnelRelayers, &tr) - s.PenaltySkipRemaining = append(s.PenaltySkipRemaining, 0) + s.tunnelRelayers = append(s.tunnelRelayers, &tr) + s.penaltySkipRemaining = append(s.penaltySkipRemaining, 0) + + // update the metric for the number of 
tunnels per destination chain + relayermetrics.IncTunnelsPerDestinationChain(tunnels[i].TargetChainID) + s.Log.Info( "New tunnel synchronized successfully", zap.String("chain_name", tunnels[i].TargetChainID), zap.Uint64("tunnel_id", tunnels[i].ID), ) } + + s.bandLatestTunnel = len(tunnels) +} + +// getTunnels retrieves the list of tunnels by given tunnel IDs. If no tunnel ID is provided, +// get all tunnels +func (s *Scheduler) getTunnels(ctx context.Context, tunnelIDs []uint64) ([]bandtypes.Tunnel, error) { + if len(tunnelIDs) == 0 { + return s.BandClient.GetTunnels(ctx) + } + + tunnels := make([]bandtypes.Tunnel, 0, len(tunnelIDs)) + for _, tunnelID := range tunnelIDs { + tunnel, err := s.BandClient.GetTunnel(ctx, tunnelID) + if err != nil { + return nil, err + } + + tunnels = append(tunnels, *tunnel) + } + + return tunnels, nil } // Task is a struct to manage the task for the tunnel relayer diff --git a/relayer/tunnel_relayer.go b/relayer/tunnel_relayer.go index 1431229..1cd8e26 100644 --- a/relayer/tunnel_relayer.go +++ b/relayer/tunnel_relayer.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" tsstypes "github.com/bandprotocol/falcon/internal/bandchain/tss" + "github.com/bandprotocol/falcon/internal/relayermetrics" "github.com/bandprotocol/falcon/relayer/band" "github.com/bandprotocol/falcon/relayer/chains" ) @@ -22,7 +23,8 @@ type TunnelRelayer struct { BandClient band.Client TargetChainProvider chains.ChainProvider - mu *sync.Mutex + isTargetChainActive bool + mu *sync.Mutex } // NewTunnelRelayer creates a new TunnelRelayer @@ -41,6 +43,7 @@ func NewTunnelRelayer( CheckingPacketInterval: checkingPacketInterval, BandClient: bandClient, TargetChainProvider: targetChainProvider, + isTargetChainActive: false, mu: &sync.Mutex{}, } } @@ -83,11 +86,29 @@ func (t *TunnelRelayer) CheckAndRelay(ctx context.Context) (isExecuting bool, er if err != nil { return false, err } + + // update the metric for unrelayed packets based on the difference between the latest sequences on 
BandChain and the target chain + relayermetrics.SetUnrelayedPackets( + t.TunnelID, + float64(tunnelBandInfo.LatestSequence-tunnelChainInfo.LatestSequence), + ) + if !tunnelChainInfo.IsActive { + // decrease active status if the tunnel was previously active + if t.isTargetChainActive { + relayermetrics.DecActiveTargetContractsCount() + t.isTargetChainActive = false + } t.Log.Info("Tunnel is not active on target chain") return false, nil } + // increase active status if the tunnel was previously inactive + if tunnelChainInfo.IsActive && !t.isTargetChainActive { + relayermetrics.IncActiveTargetContractsCount() + t.isTargetChainActive = true + } + // end process if current packet is already relayed seq := tunnelChainInfo.LatestSequence + 1 if tunnelBandInfo.LatestSequence < seq { @@ -130,6 +151,9 @@ func (t *TunnelRelayer) CheckAndRelay(ctx context.Context) (isExecuting bool, er return false, err } + // Increment the metric for successfully relayed packets + relayermetrics.IncPacketsRelayedSuccess(t.TunnelID) + t.Log.Info("Successfully relayed packet", zap.Uint64("sequence", seq)) } }