Skip to content

Commit

Permalink
feat(warcrimes): Server non-finalized state from finalized
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Feb 25, 2025
1 parent ff67a86 commit a2669cf
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 39 deletions.
30 changes: 30 additions & 0 deletions pkg/beacon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,28 @@ import (
"errors"
"fmt"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/checkpointz/pkg/beacon/store"
)

type YoloMode struct {
Enabled bool `yaml:"enabled" default:"false"`

RootStr string `yaml:"root"`
Epoch phase0.Epoch `yaml:"epoch"`
}

func (y *YoloMode) Root() phase0.Root {
var root phase0.Root

err := root.UnmarshalJSON([]byte(fmt.Sprintf(`"%s"`, y.RootStr)))
if err != nil {
return phase0.Root{}
}

return root
}

// Config holds configuration for running a FinalityProvider config
type Config struct {
// Mode sets the operational mode of the provider.
Expand All @@ -19,6 +38,9 @@ type Config struct {

// Cache holds configuration for the caches.
Frontend FrontendConfig `yaml:"frontend"`

// YoloMode enables a mode where we don't check for syncing nodes, we just use the first healthy node.
YoloMode YoloMode `yaml:"yolo_mode"`
}

// Cache configuration holds configuration for the caches.
Expand Down Expand Up @@ -64,6 +86,14 @@ func (c *Config) Validate() error {
return fmt.Errorf("historical_epoch_count (%d) cannot be higher than 200", c.HistoricalEpochCount)
}

if c.YoloMode.Enabled && c.YoloMode.Epoch == 0 {
return errors.New("yolo mode enabled but no epoch provided")
}

if c.YoloMode.Enabled && c.YoloMode.RootStr == "" {
return errors.New("yolo mode enabled but no root provided")
}

return nil
}

Expand Down
76 changes: 70 additions & 6 deletions pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/sirupsen/logrus"
)

const yoloMode = true

type Default struct {
log logrus.FieldLogger

Expand Down Expand Up @@ -104,16 +102,53 @@ func (d *Default) Start(ctx context.Context) error {
return err
}

if d.config.YoloMode.Enabled {
if d.config.YoloMode.Epoch == 0 {
return errors.New("yolo mode enabled but no epoch provided")
}

if d.config.YoloMode.RootStr == "" {
return errors.New("yolo mode enabled but no root provided")
}

d.head = &v1.Finality{
Finalized: &phase0.Checkpoint{
Epoch: d.config.YoloMode.Epoch,
Root: d.config.YoloMode.Root(),
},
}

d.log.WithFields(logrus.Fields{
"epoch": d.config.YoloMode.Epoch,
"root": eth.RootAsString(d.config.YoloMode.Root()),
}).Info("Yolo mode enabled, skipping health checks and serving a static beacon state. Good luck soldier.")
}

go func() {
for {
var nd *Node

var err error

if yoloMode {
if d.config.YoloMode.Enabled {
healthy, err := d.Healthy(ctx)
if err != nil {
d.log.WithError(err).Error("Waiting for a healthy node before beginning..")
time.Sleep(time.Second * 5)

continue
}

if !healthy {
d.log.Error("No healthy nodes found, waiting for a healthy node before beginning..")
time.Sleep(time.Second * 5)

continue
}

nd, err = d.nodes.Healthy(ctx).RandomNode(ctx)
if err != nil {
d.log.WithError(err).Error("Waiting for a healthy, non-syncing node before beginning..")
d.log.WithError(err).Error("Waiting for a healthy node before beginning..")
time.Sleep(time.Second * 5)

continue
Expand Down Expand Up @@ -165,6 +200,12 @@ func (d *Default) Start(ctx context.Context) error {
})

n.Beacon.OnFinalityCheckpointUpdated(ctx, func(ctx context.Context, event *beacon.FinalityCheckpointUpdated) error {
if d.config.YoloMode.Enabled {
logCtx.Info("Yolo mode enabled, skipping finality checkpoint updated")

return nil
}

logCtx.WithFields(logrus.Fields{
"epoch": event.Finality.Finalized.Epoch,
"root": fmt.Sprintf("%#x", event.Finality.Finalized.Root),
Expand Down Expand Up @@ -479,6 +520,10 @@ func (d *Default) shouldDownloadStates() bool {
}

func (d *Default) checkFinality(ctx context.Context) error {
if d.config.YoloMode.Enabled {
return nil
}

d.majorityMutex.Lock()
defer d.majorityMutex.Unlock()

Expand Down Expand Up @@ -520,7 +565,7 @@ func (d *Default) checkFinality(ctx context.Context) error {
func (d *Default) refreshSpec(ctx context.Context) error {
d.log.Debug("Fetching beacon spec")

upstream, err := d.nodes.Ready(ctx).DataProviders(ctx).RandomNode(ctx)
upstream, err := d.nodes.DataProviders(ctx).RandomNode(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -556,7 +601,26 @@ func (d *Default) checkGenesisTime(ctx context.Context) error {

d.log.Debug("Fetching genesis time")

upstream, err := d.nodes.Ready(ctx).DataProviders(ctx).RandomNode(ctx)
if d.config.YoloMode.Enabled {
upstream, err := d.nodes.DataProviders(ctx).RandomNode(ctx)
if err != nil {
return err
}

g, err := upstream.Beacon.Genesis()
if err != nil {
return err
}

// store the genesis time
d.genesis = g

d.log.Info("Fetched genesis time")

return nil
}

upstream, err := d.nodes.Healthy(ctx).DataProviders(ctx).RandomNode(ctx)
if err != nil {
return err
}
Expand Down
65 changes: 40 additions & 25 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,61 +15,70 @@ import (
)

func (d *Default) downloadServingCheckpoint(ctx context.Context, checkpoint *v1.Finality) error {
if checkpoint == nil {
return errors.New("checkpoint is nil")
root := checkpoint.Finalized.Root
epoch := checkpoint.Finalized.Epoch

if d.config.YoloMode.Enabled {
epoch = d.config.YoloMode.Epoch
root = d.config.YoloMode.Root()
}

if checkpoint.Finalized == nil {
return errors.New("finalized checkpoint is nil")
if epoch == 0 || root == (phase0.Root{}) {
return errors.New("invalid epoch or root")
}

sp, err := d.Spec()
if err != nil {
return fmt.Errorf("failed to fetch spec: %w", err)
}

fork, err := sp.ForkEpochs.CurrentFork(checkpoint.Finalized.Epoch)
fork, err := sp.ForkEpochs.CurrentFork(epoch)
if err != nil {
return fmt.Errorf("failed to get current fork: %w", err)
}

d.log.
WithField("epoch", checkpoint.Finalized.Epoch).
WithField("epoch", epoch).
WithField("fork_name", fork.Name).
Info("Downloading serving checkpoint")

upstream, err := d.nodes.
Ready(ctx).
DataProviders(ctx).
PastFinalizedCheckpoint(ctx, checkpoint). // Ensure we attempt to fetch the bundle from a node that knows about the checkpoint.
RandomNode(ctx)
nodes := d.nodes.
DataProviders(ctx)

if !d.config.YoloMode.Enabled {
nodes = nodes.PastFinalizedCheckpoint(ctx, checkpoint)
}

upstream, err := nodes.RandomNode(ctx)
if err != nil {
return perrors.Wrap(err, "no data provider node available")
}

block, err := d.fetchBundle(ctx, checkpoint.Finalized.Root, upstream)
block, err := d.fetchBundle(ctx, root, upstream)
if err != nil {
return perrors.Wrap(err, "failed to fetch bundle")
}

// Validate that everything is ok to serve.
// Lighthouse ref: https://lighthouse-book.sigmaprime.io/checkpoint-sync.html#alignment-requirements
blockSlot, err := block.Slot()
if err != nil {
return fmt.Errorf("failed to get slot from block: %w", err)
}
if !d.config.YoloMode.Enabled {
// Validate that everything is ok to serve.
// Lighthouse ref: https://lighthouse-book.sigmaprime.io/checkpoint-sync.html#alignment-requirements
blockSlot, err := block.Slot()
if err != nil {
return fmt.Errorf("failed to get slot from block: %w", err)
}

if blockSlot%sp.SlotsPerEpoch != 0 {
return fmt.Errorf("block slot is not aligned from an epoch boundary: %d", blockSlot)
if blockSlot%sp.SlotsPerEpoch != 0 {
return fmt.Errorf("block slot is not aligned from an epoch boundary: %d", blockSlot)
}
}

d.servingBundle = checkpoint
d.metrics.ObserveServingEpoch(checkpoint.Finalized.Epoch)
d.metrics.ObserveServingEpoch(epoch)

d.log.WithFields(
logrus.Fields{
"epoch": checkpoint.Finalized.Epoch,
"root": fmt.Sprintf("%#x", checkpoint.Finalized.Root),
"epoch": epoch,
"root": fmt.Sprintf("%#x", root),
},
).Info("Serving a new finalized checkpoint bundle")

Expand All @@ -95,7 +104,7 @@ func (d *Default) checkGenesis(ctx context.Context) error {

d.log.Debug("Fetching genesis state")

readyNodes := d.nodes.Ready(ctx)
readyNodes := d.nodes.Healthy(ctx)
if len(readyNodes) == 0 {
return errors.New("no nodes ready")
}
Expand All @@ -120,7 +129,7 @@ func (d *Default) checkGenesis(ctx context.Context) error {
return err
}

upstream, err := d.nodes.Ready(ctx).DataProviders(ctx).RandomNode(ctx)
upstream, err := d.nodes.DataProviders(ctx).RandomNode(ctx)
if err != nil {
return err
}
Expand All @@ -138,6 +147,12 @@ func (d *Default) checkGenesis(ctx context.Context) error {
}

func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1.Finality) error {
if d.config.YoloMode.Enabled {
d.log.Info("Yolo mode enabled, skipping historical checkpoint fetching")

return nil
}

d.historicalMutex.Lock()
defer d.historicalMutex.Unlock()

Expand Down
10 changes: 2 additions & 8 deletions pkg/beacon/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,25 +136,19 @@ func (n Nodes) NotOptimisticEL(ctx context.Context) Nodes {
}

func (n Nodes) Ready(ctx context.Context) Nodes {
if yoloMode {
return n.Healthy(ctx)
}

return n.
Healthy(ctx).
NotSyncing(ctx).
NotOptimisticEL(ctx)
}

func (n Nodes) RandomNode(ctx context.Context) (*Node, error) {
nodes := n.Ready(ctx)

if len(nodes) == 0 {
if len(n) == 0 {
return nil, errors.New("no nodes found")
}

//nolint:gosec // not critical to worry about/will probably be replaced.
return nodes[rand.Intn(len(nodes))], nil
return n[rand.Intn(len(n))], nil
}

func (n Nodes) Filter(ctx context.Context, f func(*Node) bool) Nodes {
Expand Down

0 comments on commit a2669cf

Please sign in to comment.