From 2287fac9a1b17179827e1c2f73f712e2d6c43832 Mon Sep 17 00:00:00 2001 From: nisdas Date: Thu, 26 Dec 2024 20:53:49 +0800 Subject: [PATCH] Add Fix to Prune Mesh Periodically --- gossipsub.go | 2 +- gossipsub_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/gossipsub.go b/gossipsub.go index 222c71ae..849f2fa1 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1605,7 +1605,7 @@ func (gs *GossipSubRouter) heartbeat() { } // do we have too many peers? - if len(peers) > gs.params.Dhi { + if len(peers) >= gs.params.Dhi { plst := peerMapToList(peers) // sort by score (but shuffle first for the case we don't use the score) diff --git a/gossipsub_test.go b/gossipsub_test.go index d515654f..93edeeca 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -3176,6 +3176,53 @@ func TestGossipsubIdontwantClear(t *testing.T) { <-ctx.Done() } +func TestGossipsubPruneMeshCorrectly(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 9) + + msgID := func(pmsg *pb.Message) string { + // silly content-based test message-ID: just use the data as whole + return base64.URLEncoding.EncodeToString(pmsg.Data) + } + + params := DefaultGossipSubParams() + params.Dhi = 8 + + psubs := make([]*PubSub, 9) + for i := 0; i < 9; i++ { + psubs[i] = getGossipsub(ctx, hosts[i], + WithGossipSubParams(params), + WithMessageIdFn(msgID)) + } + + topic := "foobar" + for _, ps := range psubs { + _, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + } + + // Connect first peer with the rest of the 8 other + // peers. + for i := 1; i < 9; i++ { + connect(t, hosts[0], hosts[i]) + } + + // Wait for 2 heartbeats to be able to prune excess peers back down to D. + totalTimeToWait := params.HeartbeatInitialDelay + 2*params.HeartbeatInterval + time.Sleep(totalTimeToWait) + + meshPeers, ok := psubs[0].rt.(*GossipSubRouter).mesh[topic] + if !ok { + t.Fatal("mesh does not exist for topic") + } + if len(meshPeers) != params.D { + t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers)) + } +} + func BenchmarkAllocDoDropRPC(b *testing.B) { gs := GossipSubRouter{tracer: &pubsubTracer{}}