Skip to content

Commit

Permalink
fix(manager): resume block production right when skew < max skew (#1252)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene authored Nov 26, 2024
1 parent 5c2f6f8 commit 7ef37e5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
4 changes: 2 additions & 2 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, time.Unix(0, m.LastBlockTimeInSettlement.Load()), gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", time.Unix(0, m.LastBlockTimeInSettlement.Load()))
select {
case <-ctx.Done():
return nil
Expand Down
29 changes: 16 additions & 13 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func SubmitLoopInner(
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxProduceSubmitSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped.
unsubmittedBlocksNum func() uint64,
unsubmittedBlocksBytes func() int,
batchSkewTime func() time.Duration,
maxSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped.
unsubmittedBlocksNum func() uint64, // func that returns the amount of non-submitted blocks
unsubmittedBlocksBytes func() int, // func that returns bytes from non-submitted blocks
batchSkewTime func() time.Duration, // func that returns the measured time between the last submitted block and the last produced block
maxBatchSubmitTime time.Duration, // max time to allow between batches
maxBatchSubmitBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error),
Expand All @@ -73,9 +73,8 @@ func SubmitLoopInner(

submitter.Nudge()

if maxProduceSubmitSkewTime < batchSkewTime() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
// if the time between the last produced block and the last submitted block is greater than maxSkewTime, we block here until we get a progress nudge from the submitter thread
if maxSkewTime < batchSkewTime() {
select {
case <-ctx.Done():
return nil
Expand All @@ -87,8 +86,8 @@ func SubmitLoopInner(
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production
ticker := time.NewTicker(maxBatchSubmitTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
// 'submitter': this thread actually creates and submits batches. It is woken up every batch_submit_time (in addition to on every produced block) to check whether there is anything to submit, even if no new blocks have been produced.
ticker := time.NewTicker(maxBatchSubmitTime)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -128,11 +127,15 @@ func SubmitLoopInner(
}
return err
}
ticker.Reset(maxBatchSubmitTime)
pending = uint64(unsubmittedBlocksBytes()) //nolint:gosec // bytes size is always positive
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
pending = uint64(unsubmittedBlocksBytes()) //nolint:gosec // bytes size is always positive
// after a new batch is submitted, we check the skew time to wake up the 'trigger' thread and restart block production
if batchSkewTime() < maxSkewTime {
trigger.Nudge()
}
logger.Debug("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending, "skew time", batchSkewTime())
}
trigger.Nudge()
// update pendingBytes with the byte size of non-submitted blocks after all pending batches have been submitted
pendingBytes.Store(pending)
}
})

Expand Down
2 changes: 2 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ const defaultConfigTemplate = `
block_time = "{{ .BlockManagerConfig.BlockTime }}"
# block production interval in case of no transactions ("0s" produces empty blocks)
max_idle_time = "{{ .BlockManagerConfig.MaxIdleTime }}"
# block production interval after block with no transactions
max_proof_time = "{{ .BlockManagerConfig.MaxProofTime }}"
# maximum time the node will produce blocks without submitting to SL before stopping block production
max_skew_time = "{{ .BlockManagerConfig.MaxSkewTime }}"
Expand Down

0 comments on commit 7ef37e5

Please sign in to comment.