Skip to content

Commit

Permalink
Cleanup: update receiver names
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain committed Oct 30, 2022
1 parent 7b7bff9 commit 8af9c67
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 117 deletions.
86 changes: 43 additions & 43 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,33 +167,33 @@ func (r *InboxReader) DelayedBridge() *DelayedBridge {
return r.delayedBridge
}

func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
from, err := ir.getNextBlockToRead()
func (r *InboxReader) run(ctx context.Context, hadError bool) error {
from, err := r.getNextBlockToRead()
if err != nil {
return err
}
newHeaders, unsubscribe := ir.l1Reader.Subscribe(false)
newHeaders, unsubscribe := r.l1Reader.Subscribe(false)
defer unsubscribe()
blocksToFetch := uint64(ir.config().DefaultBlocksToRead)
blocksToFetch := uint64(r.config().DefaultBlocksToRead)
if hadError {
blocksToFetch = 1
}
seenBatchCount := uint64(0)
seenBatchCountStored := uint64(math.MaxUint64)
storeSeenBatchCount := func() {
if seenBatchCountStored != seenBatchCount {
atomic.StoreUint64(&ir.lastSeenBatchCount, seenBatchCount)
atomic.StoreUint64(&r.lastSeenBatchCount, seenBatchCount)
seenBatchCountStored = seenBatchCount
}
}
defer storeSeenBatchCount() // in case of error
for {

latestHeader, err := ir.l1Reader.LastHeader(ctx)
latestHeader, err := r.l1Reader.LastHeader(ctx)
if err != nil {
return err
}
config := ir.config()
config := r.config()
currentHeight := latestHeader.Number

neededBlockAdvance := config.DelayBlocks + arbmath.SaturatingUSub(config.MinBlocksToRead, 1)
Expand All @@ -218,8 +218,8 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {

if config.DelayBlocks > 0 {
currentHeight = new(big.Int).Sub(currentHeight, new(big.Int).SetUint64(config.DelayBlocks))
if currentHeight.Cmp(ir.firstMessageBlock) < 0 {
currentHeight = new(big.Int).Set(ir.firstMessageBlock)
if currentHeight.Cmp(r.firstMessageBlock) < 0 {
currentHeight = new(big.Int).Set(r.firstMessageBlock)
}
}

Expand All @@ -229,11 +229,11 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
missingSequencer := false

{
checkingDelayedCount, err := ir.delayedBridge.GetMessageCount(ctx, currentHeight)
checkingDelayedCount, err := r.delayedBridge.GetMessageCount(ctx, currentHeight)
if err != nil {
return err
}
ourLatestDelayedCount, err := ir.tracker.GetDelayedCount()
ourLatestDelayedCount, err := r.tracker.GetDelayedCount()
if err != nil {
return err
}
Expand All @@ -242,18 +242,18 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
missingDelayed = true
} else if ourLatestDelayedCount > checkingDelayedCount {
log.Info("backwards reorg of delayed messages", "from", ourLatestDelayedCount, "to", checkingDelayedCount)
err = ir.tracker.ReorgDelayedTo(checkingDelayedCount, config.HardReorg)
err = r.tracker.ReorgDelayedTo(checkingDelayedCount, config.HardReorg)
if err != nil {
return err
}
}
if checkingDelayedCount > 0 {
checkingDelayedSeqNum := checkingDelayedCount - 1
l1DelayedAcc, err := ir.delayedBridge.GetAccumulator(ctx, checkingDelayedSeqNum, currentHeight)
l1DelayedAcc, err := r.delayedBridge.GetAccumulator(ctx, checkingDelayedSeqNum, currentHeight)
if err != nil {
return err
}
dbDelayedAcc, err := ir.tracker.GetDelayedAcc(checkingDelayedSeqNum)
dbDelayedAcc, err := r.tracker.GetDelayedAcc(checkingDelayedSeqNum)
if err != nil {
return err
}
Expand All @@ -263,33 +263,33 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
}
}

seenBatchCount, err = ir.sequencerInbox.GetBatchCount(ctx, currentHeight)
seenBatchCount, err = r.sequencerInbox.GetBatchCount(ctx, currentHeight)
if err != nil {
seenBatchCount = 0
return err
}
checkingBatchCount := seenBatchCount
{
ourLatestBatchCount, err := ir.tracker.GetBatchCount()
ourLatestBatchCount, err := r.tracker.GetBatchCount()
if err != nil {
return err
}
if ourLatestBatchCount < checkingBatchCount {
checkingBatchCount = ourLatestBatchCount
missingSequencer = true
} else if ourLatestBatchCount > checkingBatchCount && config.HardReorg {
err = ir.tracker.ReorgBatchesTo(checkingBatchCount)
err = r.tracker.ReorgBatchesTo(checkingBatchCount)
if err != nil {
return err
}
}
if checkingBatchCount > 0 {
checkingBatchSeqNum := checkingBatchCount - 1
l1BatchAcc, err := ir.sequencerInbox.GetAccumulator(ctx, checkingBatchSeqNum, currentHeight)
l1BatchAcc, err := r.sequencerInbox.GetAccumulator(ctx, checkingBatchSeqNum, currentHeight)
if err != nil {
return err
}
dbBatchAcc, err := ir.tracker.GetBatchAcc(checkingBatchSeqNum)
dbBatchAcc, err := r.tracker.GetBatchAcc(checkingBatchSeqNum)
if err != nil {
return err
}
Expand All @@ -303,10 +303,10 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
// There's nothing to do
from = arbmath.BigAddByUint(currentHeight, 1)
blocksToFetch = config.DefaultBlocksToRead
ir.lastReadMutex.Lock()
ir.lastReadBlock = currentHeight.Uint64()
ir.lastReadBatchCount = checkingBatchCount
ir.lastReadMutex.Unlock()
r.lastReadMutex.Lock()
r.lastReadBlock = currentHeight.Uint64()
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
storeSeenBatchCount()
continue
}
Expand Down Expand Up @@ -335,35 +335,35 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
if to.Cmp(currentHeight) > 0 {
to.Set(currentHeight)
}
sequencerBatches, err := ir.sequencerInbox.LookupBatchesInRange(ctx, from, to)
sequencerBatches, err := r.sequencerInbox.LookupBatchesInRange(ctx, from, to)
if err != nil {
return err
}
delayedMessages, err := ir.delayedBridge.LookupMessagesInRange(ctx, from, to, func(batchNum uint64) ([]byte, error) {
delayedMessages, err := r.delayedBridge.LookupMessagesInRange(ctx, from, to, func(batchNum uint64) ([]byte, error) {
if len(sequencerBatches) > 0 && batchNum >= sequencerBatches[0].SequenceNumber {
idx := int(batchNum - sequencerBatches[0].SequenceNumber)
if idx < len(sequencerBatches) {
return sequencerBatches[idx].Serialize(ctx, ir.l1Reader.Client())
return sequencerBatches[idx].Serialize(ctx, r.l1Reader.Client())
} else {
log.Warn("missing mentioned batch in L1 message lookup", "batch", batchNum)
}
}
return ir.GetSequencerMessageBytes(ctx, batchNum)
return r.GetSequencerMessageBytes(ctx, batchNum)
})
if err != nil {
return err
}
if !ir.caughtUp && to.Cmp(currentHeight) == 0 {
if !r.caughtUp && to.Cmp(currentHeight) == 0 {
// TODO better caught up tracking
ir.caughtUp = true
ir.caughtUpChan <- true
r.caughtUp = true
r.caughtUpChan <- true
}
if len(sequencerBatches) > 0 {
missingSequencer = false
reorgingSequencer = false
firstBatch := sequencerBatches[0]
if firstBatch.SequenceNumber > 0 {
haveAcc, err := ir.tracker.GetBatchAcc(firstBatch.SequenceNumber - 1)
haveAcc, err := r.tracker.GetBatchAcc(firstBatch.SequenceNumber - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
reorgingSequencer = true
} else if err != nil {
Expand All @@ -376,7 +376,7 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
// Skip any batches we already have in the database
for len(sequencerBatches) > 0 {
batch := sequencerBatches[0]
haveAcc, err := ir.tracker.GetBatchAcc(batch.SequenceNumber)
haveAcc, err := r.tracker.GetBatchAcc(batch.SequenceNumber)
if errors.Is(err, AccumulatorNotFoundErr) {
// This batch is new
break
Expand Down Expand Up @@ -409,7 +409,7 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if beforeCount > 0 {
haveAcc, err := ir.tracker.GetDelayedAcc(beforeCount - 1)
haveAcc, err := r.tracker.GetDelayedAcc(beforeCount - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
reorgingDelayed = true
} else if err != nil {
Expand All @@ -426,7 +426,7 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {

log.Trace("looking up messages", "from", from.String(), "to", to.String(), "missingDelayed", missingDelayed, "missingSequencer", missingSequencer, "reorgingDelayed", reorgingDelayed, "reorgingSequencer", reorgingSequencer)
if !reorgingDelayed && !reorgingSequencer && (len(delayedMessages) != 0 || len(sequencerBatches) != 0) {
delayedMismatch, err := ir.addMessages(ctx, sequencerBatches, delayedMessages)
delayedMismatch, err := r.addMessages(ctx, sequencerBatches, delayedMessages)
if err != nil {
return err
}
Expand All @@ -435,15 +435,15 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
}
if len(sequencerBatches) > 0 {
readAnyBatches = true
ir.lastReadMutex.Lock()
ir.lastReadBlock = to.Uint64()
ir.lastReadBatchCount = sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1
ir.lastReadMutex.Unlock()
r.lastReadMutex.Lock()
r.lastReadBlock = to.Uint64()
r.lastReadBatchCount = sequencerBatches[len(sequencerBatches)-1].SequenceNumber + 1
r.lastReadMutex.Unlock()
storeSeenBatchCount()
}
}
if reorgingDelayed || reorgingSequencer {
from, err = ir.getPrevBlockForReorg(from)
from, err = r.getPrevBlockForReorg(from)
if err != nil {
return err
}
Expand All @@ -465,10 +465,10 @@ func (ir *InboxReader) run(ctx context.Context, hadError bool) error {
}

if !readAnyBatches {
ir.lastReadMutex.Lock()
ir.lastReadBlock = currentHeight.Uint64()
ir.lastReadBatchCount = checkingBatchCount
ir.lastReadMutex.Unlock()
r.lastReadMutex.Lock()
r.lastReadBlock = currentHeight.Uint64()
r.lastReadBatchCount = checkingBatchCount
r.lastReadMutex.Unlock()
storeSeenBatchCount()
}
}
Expand Down
8 changes: 4 additions & 4 deletions arbos/incomingmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ func (h *L1IncomingMessageHeader) Equals(other *L1IncomingMessageHeader) bool {
h.L1BaseFee == other.L1BaseFee
}

func (h *L1IncomingMessage) FillInBatchGasCost(batchFetcher FallibleBatchFetcher) error {
if batchFetcher == nil || h.Header.Kind != L1MessageType_BatchPostingReport || h.BatchGasCost != nil {
func (msg *L1IncomingMessage) FillInBatchGasCost(batchFetcher FallibleBatchFetcher) error {
if batchFetcher == nil || msg.Header.Kind != L1MessageType_BatchPostingReport || msg.BatchGasCost != nil {
return nil
}
_, _, batchHash, batchNum, _, err := parseBatchPostingReportMessageFields(bytes.NewReader(h.L2msg))
_, _, batchHash, batchNum, _, err := parseBatchPostingReportMessageFields(bytes.NewReader(msg.L2msg))
if err != nil {
return fmt.Errorf("failed to parse batch posting report: %w", err)
}
Expand All @@ -161,7 +161,7 @@ func (h *L1IncomingMessage) FillInBatchGasCost(batchFetcher FallibleBatchFetcher
return fmt.Errorf("batch fetcher returned incorrect data hash %v (wanted %v for batch %v)", gotHash, batchHash, batchNum)
}
gas := computeBatchGasCost(batchData)
h.BatchGasCost = &gas
msg.BatchGasCost = &gas
return nil
}

Expand Down
Loading

0 comments on commit 8af9c67

Please sign in to comment.