Skip to content

Commit

Permalink
Add progress logger
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-ogrady committed Sep 29, 2020
1 parent 6af5313 commit aa37ee4
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 14 deletions.
4 changes: 4 additions & 0 deletions cmd/check_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func runCheckDataCmd(cmd *cobra.Command, args []string) {
return tester.LogMemoryLoop(ctx)
})

g.Go(func() error {
return dataTester.StartProgressLogger(ctx)
})

sigListeners := []context.CancelFunc{cancel}
go handleSignals(&sigListeners)

Expand Down
62 changes: 49 additions & 13 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,6 @@ func (l *Logger) LogDataStats(ctx context.Context) error {
return nil
}

elapsedTime, err := l.CounterStorage.Get(ctx, TimeElapsedCounter)
if err != nil {
return fmt.Errorf("%w cannot get elapsed time", err)
}

if elapsedTime.Sign() == 0 { // wait for at least some elapsed time
return nil
}

blocksPerSecond := new(big.Int).Div(blocks, elapsedTime)

orphans, err := l.CounterStorage.Get(ctx, storage.OrphanCounter)
if err != nil {
return fmt.Errorf("%w cannot get orphan counter", err)
Expand All @@ -147,10 +136,9 @@ func (l *Logger) LogDataStats(ctx context.Context) error {
}

statsMessage := fmt.Sprintf(
"[STATS] Blocks: %s (Orphaned: %s, Rate: %s/sec) Transactions: %s Operations: %s",
"[STATS] Blocks: %s (Orphaned: %s) Transactions: %s Operations: %s",
blocks.String(),
orphans.String(),
blocksPerSecond.String(),
txs.String(),
ops.String(),
)
Expand Down Expand Up @@ -181,6 +169,54 @@ func (l *Logger) LogDataStats(ctx context.Context) error {
return nil
}

// LogTipEstimate logs information about the remaining blocks to sync.
func (l *Logger) LogTipEstimate(ctx context.Context, tipIndex int64) error {
blocks, err := l.CounterStorage.Get(ctx, storage.BlockCounter)
if err != nil {
return fmt.Errorf("%w cannot get block counter", err)
}

if blocks.Sign() == 0 { // wait for at least 1 block to be processed
return nil
}

orphans, err := l.CounterStorage.Get(ctx, storage.OrphanCounter)
if err != nil {
return fmt.Errorf("%w cannot get orphan counter", err)
}

adjustedBlocks := blocks.Int64() - orphans.Int64()
if tipIndex-adjustedBlocks <= 0 { // return if no blocks to sync
return nil
}

elapsedTime, err := l.CounterStorage.Get(ctx, TimeElapsedCounter)
if err != nil {
return fmt.Errorf("%w cannot get elapsed time", err)
}

if elapsedTime.Sign() == 0 { // wait for at least some elapsed time
return nil
}

blocksPerSecond := new(big.Float).Quo(new(big.Float).SetInt64(adjustedBlocks), new(big.Float).SetInt(elapsedTime))
blocksPerSecondFloat, _ := blocksPerSecond.Float64()
blocksSynced := new(big.Float).Quo(new(big.Float).SetInt64(adjustedBlocks), new(big.Float).SetInt64(tipIndex))
blocksSyncedFloat, _ := blocksSynced.Float64()

statsMessage := fmt.Sprintf(
"[PROGRESS] Blocks Synced: %d/%d (Completed: %f%%, Rate: %f/second) Time Remaining: %s",
adjustedBlocks,
tipIndex,
blocksSyncedFloat*utils.OneHundred,
blocksPerSecondFloat,
utils.TimeToTip(blocksPerSecondFloat, adjustedBlocks, tipIndex),
)

color.Cyan(statsMessage)
return nil
}

// LogConstructionStats logs all construction values in CounterStorage.
func (l *Logger) LogConstructionStats(ctx context.Context, inflightTransactions int) error {
transactionsCreated, err := l.CounterStorage.Get(ctx, storage.TransactionsCreatedCounter)
Expand Down
27 changes: 26 additions & 1 deletion pkg/tester/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,24 @@ func (t *DataTester) StartPeriodicLogger(
// Print stats one last time before exiting
_ = t.logger.LogDataStats(ctx)

return ctx.Err()
case <-tc.C:
_ = t.logger.LogDataStats(ctx)
}
}
}

// StartProgressLogger priunts out periodic
// estimates of sync duration if we are behind tip.
func (t *DataTester) StartProgressLogger(
ctx context.Context,
) error {
tc := time.NewTicker(PeriodicLoggingFrequency)
defer tc.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tc.C:
// Update the elapsed time in counter storage so that
Expand All @@ -364,7 +382,14 @@ func (t *DataTester) StartPeriodicLogger(
logger.TimeElapsedCounter,
big.NewInt(periodicLoggingSeconds),
)
_ = t.logger.LogDataStats(ctx)

status, fetchErr := t.fetcher.NetworkStatusRetry(ctx, t.network, nil)
if fetchErr != nil {
log.Printf("%v: unable to get network status\n", fetchErr.Err)
continue
}

_ = t.logger.LogTipEstimate(ctx, status.CurrentBlockIdentifier.Index)
}
}
}
Expand Down

0 comments on commit aa37ee4

Please sign in to comment.