diff --git a/pkg/exporter/consensus/jobs/beacon.go b/pkg/exporter/consensus/jobs/beacon.go new file mode 100644 index 0000000..38bcd67 --- /dev/null +++ b/pkg/exporter/consensus/jobs/beacon.go @@ -0,0 +1,282 @@ +package jobs + +import ( + "context" + "errors" + "time" + + eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +// Beacon reports Beacon information about the beacon chain. +type Beacon struct { + client eth2client.Service + log logrus.FieldLogger + Slot prometheus.GaugeVec + Transactions prometheus.GaugeVec + Slashings prometheus.GaugeVec + Attestations prometheus.GaugeVec + Deposits prometheus.GaugeVec + VoluntaryExits prometheus.GaugeVec + FinalityCheckpoints prometheus.GaugeVec + ReOrgs prometheus.Counter + ReOrgDepth prometheus.Counter + + currentVersion string +} + +const ( + NameBeacon = "beacon" +) + +// NewBeacon creates a new Beacon instance. +func NewBeaconJob(client eth2client.Service, log logrus.FieldLogger, namespace string, constLabels map[string]string) Beacon { + constLabels["module"] = NameBeacon + namespace += "_beacon" + + return Beacon{ + client: client, + log: log, + Slot: *prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "slot", + Help: "The slot number in the block.", + ConstLabels: constLabels, + }, + []string{ + "block_id", + "version", + }, + ), + Transactions: *prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "transactions", + Help: "The amount of transactions in the block.", + ConstLabels: constLabels, + }, + []string{ + "block_id", + "version", + }, + ), + Slashings: *prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "slashings", + Help: "The amount of slashings in the block.", + ConstLabels: constLabels, + }, + []string{ + "block_id", + "version", + "type", + }, + ), + Attestations: *prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "attestations", + Help: "The amount of attestations in the block.", + ConstLabels: constLabels, + }, + []string{ + "block_id", + "version", + }, + ), + Deposits: *prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "deposits", + Help: "The amount of deposits in the block.", + ConstLabels: constLabels, + }, + []string{ + "block_id", + "version", + }, + ), + VoluntaryExits: *prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "voluntary_exits", + Help: "The amount of voluntary exits in the block.", + ConstLabels: constLabels, + }, + []string{ + "block_id", + "version", + }, + ), + FinalityCheckpoints: *prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "finality_checkpoint_epochs", + Help: "That epochs of the finality checkpoints.", + ConstLabels: constLabels, + }, + []string{ + "state_id", + "checkpoint", + }, + ), + ReOrgs: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "reorg_count", + Help: "The count of reorgs.", + ConstLabels: constLabels, + }, + ), + ReOrgDepth: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "reorg_depth", + Help: "The number of reorgs.", + ConstLabels: constLabels, + }, + ), + } +} + +func (b *Beacon) Name() string { + return NameBeacon +} + +func (b *Beacon) Start(ctx context.Context) { + b.tick(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second * 5): + b.tick(ctx) + } + } +} + +func (b *Beacon) tick(ctx context.Context) { + for _, id := range []string{"head", "finalized", "justified"} { + if id != "justified" { + if err := b.GetFinality(ctx, id); err != nil { + b.log.WithError(err).Error("Failed to get finality") + } + } + + if err := b.GetSignedBeaconBlock(ctx, id); err != nil { + b.log.WithError(err).Error("Failed to get signed beacon block") + } + } +} + +func (b *Beacon) HandleEvent(ctx context.Context, event *v1.Event) { + if event.Topic == EventTopicBlock { + if err := b.GetSignedBeaconBlock(ctx, "head"); err != nil { + b.log.WithError(err).Error("Failed to get signed beacon block") + } + } + + if event.Topic == EventTopicChainReorg { + b.handleChainReorg(event) + } +} + +func (b *Beacon) handleChainReorg(event *v1.Event) { + reorg, ok := event.Data.(*v1.ChainReorgEvent) + if !ok { + return + } + + b.ReOrgs.Inc() + b.ReOrgDepth.Add(float64(reorg.Depth)) +} + +func (b *Beacon) GetSignedBeaconBlock(ctx context.Context, blockID string) error { + provider, isProvider := b.client.(eth2client.SignedBeaconBlockProvider) + if !isProvider { + return errors.New("client does not implement eth2client.SignedBeaconBlockProvider") + } + + signedBeaconBlock, err := provider.SignedBeaconBlock(ctx, blockID) + if err != nil { + return err + } + + if err := b.handleSingleBlock(blockID, signedBeaconBlock); err != nil { + return err + } + + return nil +} + +func (b *Beacon) GetFinality(ctx context.Context, stateID string) error { + provider, isProvider := b.client.(eth2client.FinalityProvider) + if !isProvider { + return errors.New("client does not implement eth2client.FinalityProvider") + } + + finality, err := provider.Finality(ctx, stateID) + if err != nil { + return err + } + + b.FinalityCheckpoints. + WithLabelValues(stateID, "previous_justified"). + Set(float64(finality.PreviousJustified.Epoch)) + + b.FinalityCheckpoints. + WithLabelValues(stateID, "justified"). + Set(float64(finality.Justified.Epoch)) + + b.FinalityCheckpoints. + WithLabelValues(stateID, "finalized"). + Set(float64(finality.Finalized.Epoch)) + + return nil +} + +func (b *Beacon) handleSingleBlock(blockID string, block *spec.VersionedSignedBeaconBlock) error { + if b.currentVersion != block.Version.String() { + b.Transactions.Reset() + b.Slashings.Reset() + b.Attestations.Reset() + b.Deposits.Reset() + b.VoluntaryExits.Reset() + + b.currentVersion = block.Version.String() + } + + var beaconBlock BeaconBlock + + switch block.Version { + case spec.DataVersionPhase0: + beaconBlock = NewBeaconBlockFromPhase0(block) + case spec.DataVersionAltair: + beaconBlock = NewBeaconBlockFromAltair(block) + case spec.DataVersionBellatrix: + beaconBlock = NewBeaconBlockFromBellatrix(block) + default: + return errors.New("received beacon block of unknown spec version") + } + + b.recordNewBeaconBlock(blockID, block.Version.String(), beaconBlock) + + return nil +} + +func (b *Beacon) recordNewBeaconBlock(blockID, version string, block BeaconBlock) { + b.Slot.WithLabelValues(blockID, version).Set(float64(block.Slot)) + b.Slashings.WithLabelValues(blockID, version, "proposer").Set(float64(block.ProposerSlashings)) + b.Slashings.WithLabelValues(blockID, version, "attester").Set(float64(block.ProposerSlashings)) + b.Attestations.WithLabelValues(blockID, version).Set(float64(block.Attestations)) + b.Deposits.WithLabelValues(blockID, version).Set(float64(block.Deposits)) + b.VoluntaryExits.WithLabelValues(blockID, version).Set(float64(block.VoluntaryExits)) + b.Transactions.WithLabelValues(blockID, version).Set(float64(block.Transactions)) +} diff --git a/pkg/exporter/consensus/jobs/beacon_block.go b/pkg/exporter/consensus/jobs/beacon_block.go new file mode 100644 index 0000000..f1dabde --- /dev/null +++ b/pkg/exporter/consensus/jobs/beacon_block.go @@ -0,0 +1,51 @@ +package jobs + +import ( + "github.com/attestantio/go-eth2-client/spec" +) + +type BeaconBlock struct { + AttesterSlashings int + ProposerSlashings int + Transactions int + Deposits int + VoluntaryExits int + Attestations int + Slot uint64 +} + +func NewBeaconBlockFromPhase0(block *spec.VersionedSignedBeaconBlock) BeaconBlock { + return BeaconBlock{ + AttesterSlashings: len(block.Phase0.Message.Body.AttesterSlashings), + ProposerSlashings: len(block.Phase0.Message.Body.ProposerSlashings), + Transactions: 0, + Deposits: len(block.Phase0.Message.Body.Deposits), + VoluntaryExits: len(block.Phase0.Message.Body.VoluntaryExits), + Attestations: len(block.Phase0.Message.Body.Attestations), + Slot: uint64(block.Phase0.Message.Slot), + } +} + +func NewBeaconBlockFromAltair(block *spec.VersionedSignedBeaconBlock) BeaconBlock { + return BeaconBlock{ + AttesterSlashings: len(block.Altair.Message.Body.AttesterSlashings), + ProposerSlashings: len(block.Altair.Message.Body.ProposerSlashings), + Transactions: 0, + Deposits: len(block.Altair.Message.Body.Deposits), + VoluntaryExits: len(block.Altair.Message.Body.VoluntaryExits), + Attestations: len(block.Altair.Message.Body.Attestations), + Slot: uint64(block.Altair.Message.Slot), + } +} + +func NewBeaconBlockFromBellatrix(block *spec.VersionedSignedBeaconBlock) BeaconBlock { + return BeaconBlock{ + AttesterSlashings: len(block.Bellatrix.Message.Body.AttesterSlashings), + ProposerSlashings: len(block.Bellatrix.Message.Body.ProposerSlashings), + Transactions: len(block.Bellatrix.Message.Body.ExecutionPayload.Transactions), + Deposits: len(block.Bellatrix.Message.Body.Deposits), + VoluntaryExits: len(block.Bellatrix.Message.Body.VoluntaryExits), + Attestations: len(block.Bellatrix.Message.Body.Attestations), + Slot: uint64(block.Bellatrix.Message.Slot), + } +} diff --git a/pkg/exporter/consensus/jobs/event.go b/pkg/exporter/consensus/jobs/event.go new file mode 100644 index 0000000..355261f --- /dev/null +++ b/pkg/exporter/consensus/jobs/event.go @@ -0,0 +1,51 @@ +package jobs + +import ( + "context" + + eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +// Event reports event counts. +type Event struct { + log logrus.FieldLogger + Count prometheus.CounterVec +} + +const ( + NameEvent = "event" +) + +// NewEvent creates a new Event instance. +func NewEventJob(client eth2client.Service, log logrus.FieldLogger, namespace string, constLabels map[string]string) Event { + constLabels["module"] = NameEvent + namespace += "_event" + + return Event{ + log: log, + Count: *prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "count", + Help: "The count of beacon events.", + ConstLabels: constLabels, + }, + []string{ + "name", + }, + ), + } +} + +func (b *Event) Name() string { + return NameEvent +} + +func (b *Event) Start(ctx context.Context) {} + +func (b *Event) HandleEvent(ctx context.Context, event *v1.Event) { + b.Count.WithLabelValues(event.Topic).Inc() +} diff --git a/pkg/exporter/consensus/jobs/forks.go b/pkg/exporter/consensus/jobs/forks.go index 50ad77d..75f0b56 100644 --- a/pkg/exporter/consensus/jobs/forks.go +++ b/pkg/exporter/consensus/jobs/forks.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cast" eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" ) // Forks reports the state of any forks (previous, active or upcoming). @@ -101,6 +102,9 @@ func (f *Forks) tick(ctx context.Context) { } } +func (f *Forks) HandleEvent(ctx context.Context, event *v1.Event) { +} + func (f *Forks) ForkEpochs(ctx context.Context) error { spec, err := f.getSpec(ctx) if err != nil { diff --git a/pkg/exporter/consensus/jobs/general.go b/pkg/exporter/consensus/jobs/general.go index 54c2fe6..d08860f 100644 --- a/pkg/exporter/consensus/jobs/general.go +++ b/pkg/exporter/consensus/jobs/general.go @@ -13,13 +13,11 @@ import ( // General reports general information about the node. type General struct { - client eth2client.Service - log logrus.FieldLogger - Slots prometheus.GaugeVec - NodeVersion prometheus.GaugeVec - ReOrgs prometheus.Counter - ReOrgDepth prometheus.Counter - FinalityCheckpoints prometheus.GaugeVec + client eth2client.Service + log logrus.FieldLogger + Slots prometheus.GaugeVec + NodeVersion prometheus.GaugeVec + ClientName prometheus.GaugeVec } const ( @@ -55,34 +53,6 @@ func NewGeneralJob(client eth2client.Service, log logrus.FieldLogger, namespace "version", }, ), - ReOrgs: prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "reorg_count", - Help: "The count of reorgs.", - ConstLabels: constLabels, - }, - ), - ReOrgDepth: prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "reorg_depth", - Help: "The number of reorgs.", - ConstLabels: constLabels, - }, - ), - FinalityCheckpoints: *prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "finality_checkpoint_epochs", - Help: "That epochs of the finality checkpoints.", - ConstLabels: constLabels, - }, - []string{ - "state_id", - "checkpoint", - }, - ), } } @@ -93,63 +63,18 @@ func (g *General) Name() string { func (g *General) Start(ctx context.Context) { g.tick(ctx) - subscribed := false - - if err := g.startSubscriptions(ctx); err == nil { - subscribed = true - } - for { select { case <-ctx.Done(): return case <-time.After(time.Second * 15): g.tick(ctx) - - if !subscribed { - if err := g.startSubscriptions(ctx); err == nil { - subscribed = true - } - } } } } -func (g *General) startSubscriptions(ctx context.Context) error { - g.log.Info("starting subscriptions") - - provider, isProvider := g.client.(eth2client.EventsProvider) - if !isProvider { - return errors.New("client does not implement eth2client.Subscriptions") - } - - topics := []string{ - "chain_reorg", - } - - if err := provider.Events(ctx, topics, g.handleEvent); err != nil { - return err - } - - return nil -} - -func (g *General) handleEvent(event *v1.Event) { - //nolint:gocritic // new subscription topics coming soon - switch event.Topic { - case "chain_reorg": - g.handleChainReorg(event) - } -} - -func (g *General) handleChainReorg(event *v1.Event) { - reorg, ok := event.Data.(*v1.ChainReorgEvent) - if !ok { - return - } +func (g *General) HandleEvent(ctx context.Context, event *v1.Event) { - g.ReOrgs.Inc() - g.ReOrgDepth.Add(float64(reorg.Depth)) } func (g *General) tick(ctx context.Context) { @@ -163,10 +88,6 @@ func (g *General) tick(ctx context.Context) { if err := g.GetBeaconSlot(ctx, checkpoint); err != nil { g.log.WithError(err).Error("Failed to get beacon slot: ", checkpoint) } - - if err := g.GetFinality(ctx, checkpoint); err != nil { - g.log.WithError(err).Error("Failed to get finality checkpoint: ", checkpoint) - } } } @@ -181,6 +102,7 @@ func (g *General) GetNodeVersion(ctx context.Context) error { return err } + g.NodeVersion.Reset() g.NodeVersion.WithLabelValues(version).Set(1) return nil @@ -217,29 +139,3 @@ func (g *General) GetBeaconSlot(ctx context.Context, identifier string) error { func (g *General) ObserveSlot(identifier string, slot uint64) { g.Slots.WithLabelValues(identifier).Set(float64(slot)) } - -func (g *General) GetFinality(ctx context.Context, stateID string) error { - provider, isProvider := g.client.(eth2client.FinalityProvider) - if !isProvider { - return errors.New("client does not implement eth2client.FinalityProvider") - } - - finality, err := provider.Finality(ctx, stateID) - if err != nil { - return err - } - - g.FinalityCheckpoints. - WithLabelValues(stateID, "previous_justified"). - Set(float64(finality.PreviousJustified.Epoch)) - - g.FinalityCheckpoints. - WithLabelValues(stateID, "justified"). - Set(float64(finality.Justified.Epoch)) - - g.FinalityCheckpoints. - WithLabelValues(stateID, "finalized"). - Set(float64(finality.Finalized.Epoch)) - - return nil -} diff --git a/pkg/exporter/consensus/jobs/spec.go b/pkg/exporter/consensus/jobs/spec.go index 6561745..2a43156 100644 --- a/pkg/exporter/consensus/jobs/spec.go +++ b/pkg/exporter/consensus/jobs/spec.go @@ -8,6 +8,7 @@ import ( "time" eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "github.com/spf13/cast" @@ -276,6 +277,10 @@ func (s *Spec) tick(ctx context.Context) { } } +func (s *Spec) HandleEvent(ctx context.Context, event *v1.Event) { + +} + func (s *Spec) GetSpec(ctx context.Context) error { provider, isProvider := s.client.(eth2client.SpecProvider) if !isProvider { diff --git a/pkg/exporter/consensus/jobs/syncstatus.go b/pkg/exporter/consensus/jobs/syncstatus.go index af23943..d00a0c8 100644 --- a/pkg/exporter/consensus/jobs/syncstatus.go +++ b/pkg/exporter/consensus/jobs/syncstatus.go @@ -6,6 +6,7 @@ import ( "time" eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -94,6 +95,9 @@ func (s *Sync) Start(ctx context.Context) { } } +func (s *Sync) HandleEvent(ctx context.Context, event *v1.Event) { +} + func (s *Sync) tick(ctx context.Context) { if err := s.GetSyncState(ctx); err != nil { s.log.WithError(err).Error("failed to get sync state") diff --git a/pkg/exporter/consensus/jobs/topics.go b/pkg/exporter/consensus/jobs/topics.go new file mode 100644 index 0000000..c50710f --- /dev/null +++ b/pkg/exporter/consensus/jobs/topics.go @@ -0,0 +1,13 @@ +package jobs + +type EventTopic string + +const ( + EventTopicBlock = "block" + EventTopicHead = "head" + EventTopicAttestation = "attestation" + EventTopicChainReorg = "chain_reorg" + EventTopicFinalizedCheckpoint = "finalized_checkpoint" + EventTopicVoluntaryExit = "voluntary_exit" + EventTopicContributionAndProof = "contribution_and_proof" +) diff --git a/pkg/exporter/consensus/metrics.go b/pkg/exporter/consensus/metrics.go index a77e8d5..c6a1056 100644 --- a/pkg/exporter/consensus/metrics.go +++ b/pkg/exporter/consensus/metrics.go @@ -2,8 +2,11 @@ package consensus import ( "context" + "errors" + "time" eth2client "github.com/attestantio/go-eth2-client" + v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/prometheus/client_golang/prometheus" "github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/jobs" "github.com/sirupsen/logrus" @@ -18,10 +21,14 @@ type Metrics interface { type metrics struct { log logrus.FieldLogger + client eth2client.Service + generalMetrics jobs.General syncMetrics jobs.Sync specMetrics jobs.Spec forkMetrics jobs.Forks + beaconMetrics jobs.Beacon + eventMetrics jobs.Event } // NewMetrics returns a new metrics object. @@ -32,17 +39,17 @@ func NewMetrics(client eth2client.Service, log logrus.FieldLogger, nodeName, nam m := &metrics{ log: log, + client: client, generalMetrics: jobs.NewGeneralJob(client, log, namespace, constLabels), specMetrics: jobs.NewSpecJob(client, log, namespace, constLabels), syncMetrics: jobs.NewSyncJob(client, log, namespace, constLabels), forkMetrics: jobs.NewForksJob(client, log, namespace, constLabels), + beaconMetrics: jobs.NewBeaconJob(client, log, namespace, constLabels), + eventMetrics: jobs.NewEventJob(client, log, namespace, constLabels), } prometheus.MustRegister(m.generalMetrics.Slots) prometheus.MustRegister(m.generalMetrics.NodeVersion) - prometheus.MustRegister(m.generalMetrics.ReOrgs) - prometheus.MustRegister(m.generalMetrics.ReOrgDepth) - prometheus.MustRegister(m.generalMetrics.FinalityCheckpoints) prometheus.MustRegister(m.syncMetrics.Percentage) prometheus.MustRegister(m.syncMetrics.EstimatedHighestSlot) @@ -79,6 +86,18 @@ func NewMetrics(client eth2client.Service, log logrus.FieldLogger, nodeName, nam prometheus.MustRegister(m.forkMetrics.Current) prometheus.MustRegister(m.forkMetrics.Activated) + prometheus.MustRegister(m.beaconMetrics.Attestations) + prometheus.MustRegister(m.beaconMetrics.Deposits) + prometheus.MustRegister(m.beaconMetrics.Slashings) + prometheus.MustRegister(m.beaconMetrics.Transactions) + prometheus.MustRegister(m.beaconMetrics.VoluntaryExits) + prometheus.MustRegister(m.beaconMetrics.Slot) + prometheus.MustRegister(m.beaconMetrics.FinalityCheckpoints) + prometheus.MustRegister(m.beaconMetrics.ReOrgs) + prometheus.MustRegister(m.beaconMetrics.ReOrgDepth) + + prometheus.MustRegister(m.eventMetrics.Count) + return m } @@ -87,4 +106,57 @@ func (m *metrics) StartAsync(ctx context.Context) { go m.specMetrics.Start(ctx) go m.syncMetrics.Start(ctx) go m.forkMetrics.Start(ctx) + go m.beaconMetrics.Start(ctx) + go m.eventMetrics.Start(ctx) + go m.subscriptionLoop(ctx) +} + +func (m *metrics) subscriptionLoop(ctx context.Context) { + subscribed := false + + for { + if !subscribed { + if err := m.startSubscriptions(ctx); err != nil { + m.log.Errorf("Failed to subscribe to eth2 node: %v", err) + } else { + subscribed = true + } + } + + time.Sleep(5 * time.Second) + } +} + +func (m *metrics) startSubscriptions(ctx context.Context) error { + m.log.Info("starting subscriptions") + + provider, isProvider := m.client.(eth2client.EventsProvider) + if !isProvider { + return errors.New("client does not implement eth2client.Subscriptions") + } + + topics := []string{} + + for key, supported := range v1.SupportedEventTopics { + if supported { + topics = append(topics, key) + } + } + + if err := provider.Events(ctx, topics, func(event *v1.Event) { + m.handleEvent(ctx, event) + }); err != nil { + return err + } + + return nil +} + +func (m *metrics) handleEvent(ctx context.Context, event *v1.Event) { + m.generalMetrics.HandleEvent(ctx, event) + m.specMetrics.HandleEvent(ctx, event) + m.syncMetrics.HandleEvent(ctx, event) + m.forkMetrics.HandleEvent(ctx, event) + m.beaconMetrics.HandleEvent(ctx, event) + m.eventMetrics.HandleEvent(ctx, event) }