Skip to content

Commit

Permalink
feat(manager): stop applying blocks after node set unhealthy (#1194)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene authored Nov 8, 2024
1 parent 2299320 commit 2c15921
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 34 deletions.
4 changes: 1 addition & 3 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ func (m *Manager) applyBlockWithFraudHandling(block *types.Block, commit *types.
// in specific ways. For example, once a fault is detected, it publishes a DataHealthStatus event to the
// pubsub which sets the node in a frozen state.
m.FraudHandler.HandleFault(context.Background(), err)

return err
}

return nil
return err
}

// applyBlock applies the block to the store and the abci app.
Expand Down
5 changes: 1 addition & 4 deletions block/fraud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package block

import (
"context"

"github.com/dymensionxyz/dymint/node/events"
uevent "github.com/dymensionxyz/dymint/utils/event"
)

// FraudHandler is an interface that defines a method to handle faults.
Expand All @@ -22,7 +19,7 @@ type FreezeHandler struct {
}

func (f FreezeHandler) HandleFault(ctx context.Context, fault error) {
uevent.MustPublish(ctx, f.m.Pubsub, &events.DataHealthStatus{Error: fault}, events.HealthStatusList)
f.m.freezeNode(ctx, fault)
}

func NewFreezeHandler(manager *Manager) *FreezeHandler {
Expand Down
15 changes: 14 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (

"github.com/dymensionxyz/dymint/da/registry"
"github.com/dymensionxyz/dymint/indexers/txindex"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"
"github.com/dymensionxyz/dymint/version"

"github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -247,7 +249,11 @@ func (m *Manager) Start(ctx context.Context) error {

// Start the settlement sync loop in the background
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SettlementSyncLoop(ctx)
err := m.SettlementSyncLoop(ctx)
if err != nil {
m.freezeNode(context.Background(), err)
}
return nil
})

// Monitor sequencer set updates
Expand Down Expand Up @@ -371,3 +377,10 @@ func (m *Manager) setDA(daconfig string, dalcKV store.KV, logger log.Logger) err
func (m *Manager) setFraudHandler(handler *FreezeHandler) {
m.FraudHandler = handler
}

func (m *Manager) freezeNode(ctx context.Context, err error) {
uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
if m.RunMode == RunModeFullNode {
m.unsubscribeFullNodeEvents(ctx)
}
}
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ func TestDAFetch(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]}).Once()
app.On("Info", mock.Anything).Return(abci.ResponseInfo{
LastBlockHeight: int64(batch.EndHeight()),
LastBlockHeight: int64(batch.EndHeight() + 1),
LastBlockAppHash: commitHash[:],
})

Expand Down
39 changes: 32 additions & 7 deletions block/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"golang.org/x/sync/errgroup"
)

const (
syncLoop = "syncLoop"
validateLoop = "validateLoop"
p2pGossipLoop = "applyGossipedBlocksLoop"
p2pBlocksyncLoop = "applyBlockSyncBlocksLoop"
)

// setFraudHandler sets the fraud handler for the block manager.
func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "full node")
Expand All @@ -26,13 +33,7 @@ func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {
return m.SettlementValidateLoop(ctx)
})

// Subscribe to new (or finalized) state updates events.
go uevent.MustSubscribe(ctx, m.Pubsub, "syncLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "validateLoop", settlement.EventQueryNewSettlementBatchFinalized, m.onNewStateUpdateFinalized, m.logger)

// Subscribe to P2P received blocks events (used for P2P syncing).
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.OnReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.OnReceivedBlock, m.logger)
m.subscribeFullNodeEvents(ctx)

return nil
}
Expand Down Expand Up @@ -75,3 +76,27 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {

return nil
}

func (m *Manager) subscribeFullNodeEvents(ctx context.Context) {
// Subscribe to new (or finalized) state updates events.
go uevent.MustSubscribe(ctx, m.Pubsub, syncLoop, settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, validateLoop, settlement.EventQueryNewSettlementBatchFinalized, m.onNewStateUpdateFinalized, m.logger)

// Subscribe to P2P received blocks events (used for P2P syncing).
go uevent.MustSubscribe(ctx, m.Pubsub, p2pGossipLoop, p2p.EventQueryNewGossipedBlock, m.OnReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, p2pBlocksyncLoop, p2p.EventQueryNewBlockSyncBlock, m.OnReceivedBlock, m.logger)
}

func (m *Manager) unsubscribeFullNodeEvents(ctx context.Context) {
// unsubscribe for specific event (clientId)
unsubscribe := func(clientId string) {
err := m.Pubsub.UnsubscribeAll(ctx, clientId)
if err != nil {
m.logger.Error("Unsubscribe", "clientId", clientId, "error", err)
}
}
unsubscribe(syncLoop)
unsubscribe(validateLoop)
unsubscribe(p2pGossipLoop)
unsubscribe(p2pBlocksyncLoop)
}
4 changes: 1 addition & 3 deletions block/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"
"fmt"

"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
uevent "github.com/dymensionxyz/dymint/utils/event"
"github.com/tendermint/tendermint/libs/pubsub"
)

Expand Down Expand Up @@ -60,7 +58,7 @@ func (m *Manager) OnReceivedBlock(event pubsub.Message) {

err := m.attemptApplyCachedBlocks()
if err != nil {
uevent.MustPublish(context.TODO(), m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.freezeNode(context.Background(), err)
m.logger.Error("Attempt apply cached blocks.", "err", err)
}
}
Expand Down
1 change: 1 addition & 0 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (m *Manager) applyLocalBlock(height uint64) error {
if err != nil {
return fmt.Errorf("load block: %w", gerrc.ErrNotFound)
}

commit, err := m.Store.LoadCommit(height)
if err != nil {
return fmt.Errorf("load commit: %w", gerrc.ErrNotFound)
Expand Down
24 changes: 9 additions & 15 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/settlement"
uevent "github.com/dymensionxyz/dymint/utils/event"
)

// onNewStateUpdate will update the last submitted height and will update sequencers list from SL. After, it triggers syncing or validation, depending whether it needs to sync first or only validate.
Expand Down Expand Up @@ -71,18 +69,13 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {

settlementBatch, err := m.SLClient.GetBatchAtHeight(m.State.NextHeight())
if err != nil {
return fmt.Errorf("retrieve batch: %w", err)
return fmt.Errorf("retrieve SL batch err: %w", err)
}
m.logger.Info("Retrieved state update from SL.", "state_index", settlementBatch.StateIndex)

err = m.ApplyBatchFromSL(settlementBatch.Batch)
if err != nil {
m.logger.Error("process next DA batch", "err", err)
}

// if height havent been updated, we are stuck
if m.State.NextHeight() == currH {
return fmt.Errorf("stuck at height %d", currH)
return fmt.Errorf("process next DA batch. err:%w", err)
}

m.logger.Info("Synced from DA", "store height", m.State.Height(), "target height", m.LastSettlementHeight.Load())
Expand All @@ -92,16 +85,17 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {

err = m.attemptApplyCachedBlocks()
if err != nil {
uevent.MustPublish(context.TODO(), m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.logger.Error("Attempt apply cached blocks.", "err", err)
return fmt.Errorf("Attempt apply cached blocks. err:%w", err)
}

}

m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSettlementHeight.Load())

// nudge to signal to any listens that we're currently synced with the last settlement height we've seen so far
m.syncedFromSettlement.Nudge()
// avoid notifying as synced in case if fails before
if m.State.Height() == m.LastSettlementHeight.Load() {
m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSettlementHeight.Load())
// nudge to signal to any listens that we're currently synced with the last settlement height we've seen so far
m.syncedFromSettlement.Nudge()
}

}
}
Expand Down

0 comments on commit 2c15921

Please sign in to comment.