Skip to content

Commit

Permalink
fix incoming tipset bucketing
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Dec 11, 2019
1 parent f16cc03 commit f141849
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 5 deletions.
6 changes: 5 additions & 1 deletion chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
}

go func() {
log.Infof("New block over pubsub: %s", blk.Cid())

start := time.Now()
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
if err != nil {
Expand All @@ -46,7 +49,8 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
return
}

log.Debugw("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom())
took := time.Since(start)
log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
if delay := time.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner)
}
Expand Down
1 change: 1 addition & 0 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
}

func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
log.Info("SYNC TIME: ", maybeHead.Cids())
ctx, span := trace.StartSpan(ctx, "chain.Sync")
defer span.End()

Expand Down
6 changes: 2 additions & 4 deletions chain/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,6 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
if types.CidArrsEqual(ts.Parents(), t.Cids()) {
return true
}
if types.CidArrsEqual(ts.Parents(), t.Parents()) {
return true
}
}
return false
}
Expand Down Expand Up @@ -283,6 +280,7 @@ func (sm *SyncManager) syncScheduler() {
}

func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
log.Info("scheduling incoming tipset sync: ", ts.Cids())
if sm.getBootstrapState() == BSStateSelected {
sm.setBootstrapState(BSStateScheduled)
sm.syncTargets <- ts
Expand All @@ -295,7 +293,7 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
break
}

if types.CidArrsEqual(ts.Parents(), acts.Cids()) || types.CidArrsEqual(ts.Parents(), acts.Parents()) {
if types.CidArrsEqual(ts.Parents(), acts.Cids()) {
// sync this next, after that sync process finishes
relatedToActiveSync = true
}
Expand Down
27 changes: 27 additions & 0 deletions chain/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestSyncManager(t *testing.T) {
b := mock.TipSet(mock.MkBlock(a, 1, 2))
c1 := mock.TipSet(mock.MkBlock(b, 1, 3))
c2 := mock.TipSet(mock.MkBlock(b, 2, 4))
c3 := mock.TipSet(mock.MkBlock(b, 3, 5))
d := mock.TipSet(mock.MkBlock(c1, 4, 5))

runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
Expand Down Expand Up @@ -120,4 +121,30 @@ func TestSyncManager(t *testing.T) {

assertGetSyncOp(t, stc, d)
})

runSyncMgrTest(t, "testSyncIncomingTipset", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
sm.SetPeerHead(ctx, "peer1", a)
assertGetSyncOp(t, stc, a)

sm.SetPeerHead(ctx, "peer2", b)
op := <-stc
op.done()

sm.SetPeerHead(ctx, "peer2", c1)
op1 := <-stc
fmt.Println("op1: ", op1.ts.Cids())

sm.SetPeerHead(ctx, "peer2", c2)
sm.SetPeerHead(ctx, "peer2", c3)

op1.done()

op2 := <-stc
fmt.Println("op2: ", op2.ts.Cids())
op2.done()

op3 := <-stc
fmt.Println("op3: ", op3.ts.Cids())
op3.done()
})
}

0 comments on commit f141849

Please sign in to comment.