WIP node: add support to recover from missing a stream update event #2091

Draft · wants to merge 2 commits into base: main
29 changes: 12 additions & 17 deletions contracts/src/river/registry/facets/stream/StreamRegistry.sol
@@ -104,23 +104,18 @@ contract StreamRegistry is IStreamRegistry, RegistryModifiers {
continue;
}

// TODO: Check is disabled to allow over-writing miniblocks
// It should be re-enabled when go code is fixed to handle this correctly.
// Check if the lastMiniblockNum is the next expected miniblock and
// the prevMiniblockHash is correct
// if (
// // stream.lastMiniblockNum + 1 != miniblock.lastMiniblockNum ||
// // stream.lastMiniblockHash != miniblock.prevMiniBlockHash
// stream.lastMiniblockNum >= miniblock.lastMiniblockNum
// ) {
// emit StreamLastMiniblockUpdateFailed(
// miniblock.streamId,
// miniblock.lastMiniblockHash,
// miniblock.lastMiniblockNum,
// RiverRegistryErrors.BAD_ARG
// );
// continue;
// }
if (
stream.lastMiniblockNum + 1 != miniblock.lastMiniblockNum ||
stream.lastMiniblockHash != miniblock.prevMiniBlockHash
) {
emit StreamLastMiniblockUpdateFailed(
miniblock.streamId,
miniblock.lastMiniblockHash,
miniblock.lastMiniblockNum,
RiverRegistryErrors.BAD_ARG
);
continue;
}

// Delete genesis miniblock bytes if the stream is moving beyond genesis
if (stream.lastMiniblockNum == 0) {
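The re-enabled check restores the registry's chaining invariant: a registration must advance the stream head by exactly one miniblock and must name the current head hash as its parent. Below is a minimal Go sketch of the same invariant as a node-side pre-check before submitting a batch; the types and helper are illustrative, not the actual contract bindings.

package registrycheck

// Illustrative stand-ins; real values come from the StreamRegistry bindings.
type streamHead struct {
	lastMiniblockNum  uint64
	lastMiniblockHash [32]byte
}

type miniblockUpdate struct {
	lastMiniblockNum  uint64
	lastMiniblockHash [32]byte
	prevMiniblockHash [32]byte
}

// isValidSuccessor mirrors the Solidity check above: the update must advance
// the head by exactly one and chain onto the current head hash.
func isValidSuccessor(head streamHead, mb miniblockUpdate) bool {
	return head.lastMiniblockNum+1 == mb.lastMiniblockNum &&
		head.lastMiniblockHash == mb.prevMiniblockHash
}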
2 changes: 1 addition & 1 deletion core/contracts/river/deploy/mock_river_registry.go

Large diffs are not rendered by default.

61 changes: 59 additions & 2 deletions core/node/events/miniblock_producer.go
@@ -120,7 +120,6 @@ type miniblockProducer struct {
}

var _ MiniblockProducer = (*miniblockProducer)(nil)
var _ TestMiniblockProducer = (*miniblockProducer)(nil)

// mbJob tracks a single miniblock production attempt for a single stream.
type mbJob struct {
@@ -631,6 +630,7 @@ func (p *miniblockProducer) submitProposalBatch(ctx context.Context, proposals [
// Only register miniblocks when it's time. If it's not time, assume registration was successful.
// This is to reduce the number of transactions and the calldata size.
var success []StreamId
var invalidProposals []StreamId
var failed []StreamId
var filteredProposals []*mbJob
freq := int64(p.cfg.Get().StreamMiniblockRegistrationFrequency)
@@ -665,9 +665,10 @@ func (p *miniblockProducer) submitProposalBatch(ctx context.Context, proposals [
}

var err error
successRegistered, failed, err := p.streamCache.Params().Registry.SetStreamLastMiniblockBatch(ctx, mbs)
successRegistered, invalid, failed, err := p.streamCache.Params().Registry.SetStreamLastMiniblockBatch(ctx, mbs)
if err == nil {
success = append(success, successRegistered...)
invalidProposals = append(invalidProposals, invalid...)
if len(failed) > 0 {
log.Errorw("processMiniblockProposalBatch: Failed to register some miniblocks", "failed", failed)
}
@@ -684,6 +685,8 @@ func (p *miniblockProducer) submitProposalBatch(ctx context.Context, proposals [
"mbFrequency", freq,
)

streamsOutOfSync := make([]*mbJob, 0, len(invalidProposals))

for _, job := range proposals {
if slices.Contains(success, job.stream.streamId) {
err := job.stream.ApplyMiniblock(ctx, job.candidate)
@@ -696,7 +699,61 @@ func (p *miniblockProducer) submitProposalBatch(ctx context.Context, proposals [
err,
)
}

p.jobDone(ctx, job)
} else if slices.Contains(invalidProposals, job.stream.streamId) {
streamsOutOfSync = append(streamsOutOfSync, job)
}
}

if err := p.promoteConfirmedCandidates(ctx, streamsOutOfSync); err != nil {
log.Errorw("processMiniblockProposalBatch: Error promoting confirmed miniblock candidates", "err", err)
}
}

// promoteConfirmedCandidates tries to promote local candidates that are registered in the Stream Registry
// but not yet applied.
func (p *miniblockProducer) promoteConfirmedCandidates(ctx context.Context, jobs []*mbJob) error {
if len(jobs) == 0 {
return nil
}

log := logging.FromCtx(ctx)
registry := p.streamCache.Params().Registry

headNum, err := p.streamCache.Params().RiverChain.Client.BlockNumber(ctx)
if err != nil {
return AsRiverError(err, Err_CANNOT_CALL_CONTRACT).
Message("Unable to determine River Chain block number")
}

for _, job := range jobs {
stream, err := registry.GetStream(ctx, job.stream.streamId, crypto.BlockNumber(headNum))
if err != nil {
log.Errorw("Unable to retrieve stream details from registry",
"streamId", job.stream.streamId, "err", err)
// Without the registry record there is no candidate to promote; mark the
// job done and skip this stream to avoid dereferencing a nil result.
p.jobDone(ctx, job)
continue
}

committedLocalCandidateRef := MiniblockRef{
Hash: stream.LastMiniblockHash,
Num: int64(stream.LastMiniblockNum),
}

if err := job.stream.promoteCandidate(ctx, &committedLocalCandidateRef); err == nil {
log.Infow("Promoted miniblock candidate",
"streamId", job.stream.streamId,
"num", committedLocalCandidateRef.Num,
"hash", committedLocalCandidateRef.Hash)
} else {
log.Errorw("Unable to promote candidate",
"streamId", job.stream.streamId,
"num", committedLocalCandidateRef.Num,
"hash", committedLocalCandidateRef.Hash,
"err", err)
}

p.jobDone(ctx, job)
}

return nil
}
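The batch path now distinguishes three outcomes per proposal: registered (apply the candidate locally), invalid (the registry is already ahead, typically because this node missed a StreamLastMiniblockUpdated event while the frequency gate skipped registration), and failed. The sketch below isolates the recovery idea with deliberately narrowed interfaces; all names are illustrative, not the real types from the events package.

package recovery

import "context"

// MiniblockRef identifies a miniblock by hash and number (illustrative).
type MiniblockRef struct {
	Hash [32]byte
	Num  int64
}

// registryReader reads the stream head recorded on-chain (illustrative).
type registryReader interface {
	GetStreamHead(ctx context.Context, streamID string) (MiniblockRef, error)
}

// localStream can promote a locally stored candidate miniblock.
type localStream interface {
	PromoteCandidate(ctx context.Context, ref MiniblockRef) error
}

// recoverOutOfSync re-reads the registry head for each rejected stream and
// promotes the local candidate the registry has already committed.
func recoverOutOfSync(ctx context.Context, reg registryReader, streams map[string]localStream) {
	for id, s := range streams {
		head, err := reg.GetStreamHead(ctx, id)
		if err != nil {
			continue // the real code logs and marks the job done
		}
		// If a candidate matching head exists locally it becomes the new
		// chain tip; otherwise promotion fails and reconciliation handles
		// the stream later.
		_ = s.PromoteCandidate(ctx, head)
	}
}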
17 changes: 10 additions & 7 deletions core/node/events/parsed_event.go
@@ -5,13 +5,12 @@ import (
"strings"

"github.com/ethereum/go-ethereum/common"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

. "github.com/river-build/river/core/node/base"
. "github.com/river-build/river/core/node/crypto"
. "github.com/river-build/river/core/node/protocol"
. "github.com/river-build/river/core/node/shared"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

type ParsedEvent struct {
@@ -33,7 +32,7 @@ func (e *ParsedEvent) GetEnvelopeBytes() ([]byte, error) {
Func("GetEnvelopeBytes")
}

func ParseEvent(envelope *Envelope) (*ParsedEvent, error) {
func ParseEventWithCachedStreamEvent(envelope *Envelope, streamEvent *StreamEvent) (*ParsedEvent, error) {
Contributor review comment:
> changes in this file seem to be not related to the rest of PR?
hash := RiverHash(envelope.Event)
if !bytes.Equal(hash[:], envelope.Hash) {
return nil, RiverError(Err_BAD_EVENT_HASH, "Bad hash provided", "computed", hash, "got", envelope.Hash)
@@ -44,8 +43,7 @@ func ParseEvent(envelope *Envelope) (*ParsedEvent, error) {
return nil, err
}

var streamEvent StreamEvent
err = proto.Unmarshal(envelope.Event, &streamEvent)
err = proto.Unmarshal(envelope.Event, streamEvent)
if err != nil {
return nil, AsRiverError(err, Err_INVALID_ARGUMENT).
Message("Failed to decode stream event from bytes").
@@ -76,7 +74,7 @@ func ParseEvent(envelope *Envelope) (*ParsedEvent, error) {
}

return &ParsedEvent{
Event: &streamEvent,
Event: streamEvent,
Envelope: envelope,
Hash: common.BytesToHash(envelope.Hash),
MiniblockRef: &MiniblockRef{
@@ -87,6 +85,11 @@ func ParseEvent(envelope *Envelope) (*ParsedEvent, error) {
}, nil
}

func ParseEvent(envelope *Envelope) (*ParsedEvent, error) {
streamEvent := new(StreamEvent)
return ParseEventWithCachedStreamEvent(envelope, streamEvent)
}

func (e *ParsedEvent) ShortDebugStr() string {
if e == nil {
return "nil"
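The split keeps ParseEvent's public signature stable while letting callers own the StreamEvent allocation. A hedged usage sketch follows; the sync.Pool is illustrative and not part of this PR. The caveat is real, though: the returned ParsedEvent retains the StreamEvent, so a pooled message must not be recycled while the ParsedEvent is still referenced.

package example

import (
	"sync"

	"github.com/river-build/river/core/node/events"
	. "github.com/river-build/river/core/node/protocol"
)

// eventPool amortizes StreamEvent allocations across many envelopes.
var eventPool = sync.Pool{
	New: func() any { return new(StreamEvent) },
}

// parsePooled is a sketch only: proto.Unmarshal resets the reused message,
// but the ParsedEvent retains it, so return it to the pool only once the
// ParsedEvent is no longer referenced.
func parsePooled(envelope *Envelope) (*events.ParsedEvent, error) {
	ev := eventPool.Get().(*StreamEvent)
	return events.ParseEventWithCachedStreamEvent(envelope, ev)
}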
7 changes: 2 additions & 5 deletions core/node/events/stream.go
@@ -353,16 +353,12 @@ func (s *streamImpl) promoteCandidate(ctx context.Context, mb *MiniblockRef) err
return s.promoteCandidateLocked(ctx, mb)
}

// promoteCandidateLocked shouldbe called with a lock held.
// promoteCandidateLocked should be called with a lock held.
func (s *streamImpl) promoteCandidateLocked(ctx context.Context, mb *MiniblockRef) error {
if s.local == nil {
return nil
}

if s.local == nil {
return nil
}

if err := s.loadInternal(ctx); err != nil {
return err
}
@@ -381,6 +377,7 @@ func (s *streamImpl) promoteCandidateLocked(ctx context.Context, mb *MiniblockRe
"lastBlockHash", s.view().LastBlock().Ref.Hash,
)
}

return nil
}

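The hunk above drops a duplicated nil-guard and fixes the comment; the underlying pattern is the usual Locked-suffix convention: the outer method takes the mutex, the Locked variant assumes it is held. A minimal sketch with illustrative names, not the real streamImpl:

package locking

import "sync"

// stream is an illustrative stand-in for streamImpl.
type stream struct {
	mu    sync.Mutex
	local *struct{ head int64 } // nil when the stream is not hosted locally
}

// promote takes the lock and delegates to the Locked variant.
func (s *stream) promote(num int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.promoteLocked(num)
}

// promoteLocked must be called with s.mu held; callers already holding the
// lock call it directly.
func (s *stream) promoteLocked(num int64) error {
	if s.local == nil {
		return nil // nothing to promote on a non-local stream
	}
	s.local.head = num
	return nil
}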
23 changes: 14 additions & 9 deletions core/node/events/stream_cache.go
@@ -10,7 +10,6 @@ import (
"github.com/gammazero/workerpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/puzpuzpuz/xsync/v3"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/contracts/river"
. "github.com/river-build/river/core/node/base"
@@ -41,6 +40,7 @@ type StreamCacheParams struct {
Metrics infra.MetricsFactory
RemoteMiniblockProvider RemoteMiniblockProvider
Scrubber Scrubber
disableCallbacks bool // for test purposes
}

type StreamCache interface {
@@ -74,9 +74,13 @@ type streamCacheImpl struct {
streamCacheRemoteGauge prometheus.Gauge

onlineSyncWorkerPool *workerpool.WorkerPool

disableCallbacks bool
}

var _ StreamCache = (*streamCacheImpl)(nil)
var (
_ StreamCache = (*streamCacheImpl)(nil)
)

func NewStreamCache(
ctx context.Context,
@@ -108,6 +112,7 @@ func NewStreamCache(
),
chainConfig: params.ChainConfig,
onlineSyncWorkerPool: workerpool.New(params.Config.StreamReconciliation.OnlineWorkerPoolSize),
disableCallbacks: params.disableCallbacks,
}
}

@@ -156,15 +161,15 @@ func (s *streamCacheImpl) Start(ctx context.Context) error {
s.appliedBlockNum.Store(uint64(s.params.AppliedBlockNum))

// Close initial worker pool after all tasks are executed.
go func() {
initialSyncWorkerPool.StopWait()
}()
go initialSyncWorkerPool.StopWait()

// TODO: add buffered channel to avoid blocking ChainMonitor
s.params.RiverChain.ChainMonitor.OnBlockWithLogs(
s.params.AppliedBlockNum+1,
s.onBlockWithLogs,
)
if !s.disableCallbacks {
s.params.RiverChain.ChainMonitor.OnBlockWithLogs(
s.params.AppliedBlockNum+1,
s.onBlockWithLogs,
)
}

go s.runCacheCleanup(ctx)

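The new disableCallbacks flag lets tests construct a cache that never subscribes to chain-monitor callbacks, so block events can be driven by hand. A self-contained mini version of the pattern; all names are illustrative:

package cache

import "context"

type blockHandler func(ctx context.Context, blockNum uint64)

// chainMonitor is an illustrative stand-in for the River chain monitor.
type chainMonitor interface {
	OnBlockWithLogs(fromBlock uint64, h blockHandler)
}

type cacheParams struct {
	monitor          chainMonitor
	appliedBlockNum  uint64
	disableCallbacks bool // for test purposes, as in the PR
}

type streamCache struct{ params *cacheParams }

// start registers the block callback unless callbacks are disabled; a test
// with disableCallbacks set calls onBlock directly to simulate blocks.
func (c *streamCache) start(ctx context.Context) {
	if !c.params.disableCallbacks {
		c.params.monitor.OnBlockWithLogs(c.params.appliedBlockNum+1, c.onBlock)
	}
}

func (c *streamCache) onBlock(ctx context.Context, blockNum uint64) {
	// fetch logs for blockNum and dispatch stream updates (elided)
}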