Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(manager): improve unhealthy status management #1304

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 11 additions & 27 deletions block/block.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,28 @@
package block

import (
"errors"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"

errorsmod "cosmossdk.io/errors"

"github.com/dymensionxyz/dymint/types"
)

// applyBlockWithFraudHandling calls applyBlock and validateBlockBeforeApply with fraud handling.
func (m *Manager) applyBlockWithFraudHandling(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
validateWithFraud := func() error {
if m.Conf.SkipValidationHeight != block.Header.Height {
if err := m.validateBlockBeforeApply(block, commit); err != nil {
m.blockCache.Delete(block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", block.Header.Height, err)
}
}

if err := m.applyBlock(block, commit, blockMetaData); err != nil {
return fmt.Errorf("apply block: %w", err)
// validateAndApplyBlock calls validateBlockBeforeApply and applyBlock.
func (m *Manager) validateAndApplyBlock(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
if m.Conf.SkipValidationHeight != block.Header.Height {
if err := m.validateBlockBeforeApply(block, commit); err != nil {
m.blockCache.Delete(block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", block.Header.Height, err)
}

return nil
}

err := validateWithFraud()
if errors.Is(err, gerrc.ErrFault) {
// Here we handle the fault by calling the fraud handler.
// FraudHandler is an interface that defines a method to handle faults. Implement this interface to handle faults
// in specific ways. For example, once a fault is detected, it publishes a DataHealthStatus event to the
// pubsub which sets the node in a frozen state.
m.FraudHandler.HandleFault(m.Ctx, err)
if err := m.applyBlock(block, commit, blockMetaData); err != nil {
return fmt.Errorf("apply block: %w", err)
}

return err
return nil
}

// applyBlock applies the block to the store and the abci app.
Expand Down Expand Up @@ -230,7 +214,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
if cachedBlock.Block.GetRevision() != m.State.GetRevision() {
break
}
err := m.applyBlockWithFraudHandling(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
err := m.validateAndApplyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
Expand Down
15 changes: 10 additions & 5 deletions block/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

const (
ForkMonitorInterval = 15 * time.Second
ForkMessage = "rollapp fork detected. please rollback to height previous to rollapp_revision_start_height."
ForkMonitorMessage = "rollapp fork detected. please rollback to height previous to rollapp_revision_start_height."
)

// MonitorForkUpdateLoop monitors the hub for fork updates in a loop
Expand All @@ -26,8 +26,11 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {
defer ticker.Stop()

for {
if err := m.checkForkUpdate(ForkMessage); err != nil {
if err := m.checkForkUpdate(ForkMonitorMessage); err != nil {
m.logger.Error("Check for update.", err)
if errors.Is(err, ErrNonRecoverable) {
return err
}
}
select {
case <-ctx.Done():
Expand Down Expand Up @@ -63,7 +66,9 @@ func (m *Manager) checkForkUpdate(msg string) error {
if err != nil {
return err
}
m.freezeNode(fmt.Errorf("%s local_block_height: %d rollapp_revision_start_height: %d local_revision: %d rollapp_revision: %d", msg, m.State.Height(), expectedRevision.StartHeight, actualRevision, expectedRevision.Number))

err = fmt.Errorf("%s local_block_height: %d rollapp_revision_start_height: %d local_revision: %d rollapp_revision: %d", msg, m.State.Height(), expectedRevision.StartHeight, actualRevision, expectedRevision.Number)
return errors.Join(ErrNonRecoverable, err)
}

return nil
Expand Down Expand Up @@ -148,7 +153,7 @@ func (m *Manager) prepareDRSUpgradeMessages(obsoleteDRS []uint32) ([]proto.Messa
return nil, err
}

// if binary DRS is obsolete return error (to panic)
// If the binary's DRS version is obsolete, return an error.
for _, drs := range obsoleteDRS {
if drs == drsVersion {
return nil, gerrc.ErrCancelled.Wrapf("obsolete DRS version: %d", drs)
Expand Down Expand Up @@ -283,7 +288,7 @@ func (m *Manager) doForkWhenNewRevision() error {

// This should not happen: it means the revision number obtained is neither the same revision nor the next one, so we are unable to fork.
if expectedRevision.Number != m.State.GetRevision() {
panic("Inconsistent expected revision number from Hub. Unable to fork")
return fmt.Errorf("inconsistent expected revision number from Hub (%d != %d). Unable to fork", expectedRevision.Number, m.State.GetRevision())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

params are better at the end but nvm

}

// remove instruction file after fork
Expand Down
10 changes: 3 additions & 7 deletions block/fraud.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package block

import (
"context"
)

// FraudHandler is an interface that defines a method to handle faults.
// Contract: should not be blocking.
type FraudHandler interface {
// HandleFault handles a fault that occurred in the system.
// The fault is passed as an error type.
HandleFault(ctx context.Context, fault error)
HandleFault(fault error)
}

// FreezeHandler is used to handle faults coming from executing and validating blocks.
Expand All @@ -18,8 +14,8 @@ type FreezeHandler struct {
m *Manager
}

func (f FreezeHandler) HandleFault(ctx context.Context, fault error) {
f.m.freezeNode(fault)
func (f FreezeHandler) HandleFault(fault error) {
f.m.StopManager(fault)
}

func NewFreezeHandler(manager *Manager) *FreezeHandler {
Expand Down
Loading
Loading