Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup: fullrt #1062

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 21 additions & 43 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

"github.com/multiformats/go-base32"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-routing-helpers/tracing"
Expand Down Expand Up @@ -97,7 +97,7 @@ type FullRT struct {
keyToPeerMap map[string]peer.ID

peerAddrsLk sync.RWMutex
peerAddrs map[peer.ID][]multiaddr.Multiaddr
peerAddrs map[peer.ID][]ma.Multiaddr

bootstrapPeers []*peer.AddrInfo

Expand Down Expand Up @@ -208,7 +208,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
keyToPeerMap: make(map[string]peer.ID),
bucketSize: dhtcfg.BucketSize,

peerAddrs: make(map[peer.ID][]multiaddr.Multiaddr),
peerAddrs: make(map[peer.ID][]ma.Multiaddr),
bootstrapPeers: bsPeers,

triggerRefresh: make(chan struct{}),
Expand All @@ -229,11 +229,6 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
return rt, nil
}

type crawlVal struct {
addrs []multiaddr.Multiaddr
key kadkey.Key
}

func (dht *FullRT) runSubscriber() {
defer dht.wg.Done()
ms, ok := dht.messageSender.(dht_pb.MessageSenderWithDisconnect)
Expand Down Expand Up @@ -310,7 +305,7 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
defer dht.wg.Done()
t := time.NewTicker(dht.crawlerInterval)

foundPeers := make(map[peer.ID]*crawlVal)
foundPeers := make(map[peer.ID][]ma.Multiaddr)
foundPeersLk := sync.Mutex{}

initialTrigger := make(chan struct{}, 1)
Expand All @@ -334,36 +329,20 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
addrs = append(addrs, dht.bootstrapPeers...)
dht.peerAddrsLk.Unlock()

for k := range foundPeers {
delete(foundPeers, k)
}
clear(foundPeers)

start := time.Now()
limitErrOnce := sync.Once{}
dht.crawler.Run(ctx, addrs,
func(p peer.ID, rtPeers []*peer.AddrInfo) {
conns := dht.h.Network().ConnsToPeer(p)
var addrs []multiaddr.Multiaddr
for _, conn := range conns {
addr := conn.RemoteMultiaddr()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't want to record the current connect addresses since remote nodes could be using upnp, and may not use the same address in the future.

we want to record the listen addresses they share with us using identify, which are stored in the host's peerstore.

addrs = append(addrs, addr)
}

if len(addrs) == 0 {
logger.Debugf("no connections to %v after successful query. keeping addresses from the peerstore", p)
addrs = dht.h.Peerstore().Addrs(p)
}

keep := kaddht.PublicRoutingTableFilter(dht, p)
if !keep {
return
}

foundPeersLk.Lock()
defer foundPeersLk.Unlock()
foundPeers[p] = &crawlVal{
addrs: addrs,
}
foundPeers[p] = dht.h.Peerstore().Addrs(p)
},
func(p peer.ID, err error) {
dialErr, ok := err.(*swarm.DialError)
Expand All @@ -382,14 +361,14 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
dur := time.Since(start)
logger.Infof("crawl took %v", dur)

peerAddrs := make(map[peer.ID][]multiaddr.Multiaddr)
peerAddrs := make(map[peer.ID][]ma.Multiaddr)
kPeerMap := make(map[string]peer.ID)
newRt := trie.New()
for peerID, foundCrawlVal := range foundPeers {
foundCrawlVal.key = kadkey.KbucketIDToKey(kb.ConvertPeerID(peerID))
peerAddrs[peerID] = foundCrawlVal.addrs
kPeerMap[string(foundCrawlVal.key)] = peerID
newRt.Add(foundCrawlVal.key)
for peerID, foundAddrs := range foundPeers {
kadKey := kadkey.KbucketIDToKey(kb.ConvertPeerID(peerID))
peerAddrs[peerID] = foundAddrs
kPeerMap[string(kadKey)] = peerID
newRt.Add(kadKey)
}

dht.peerAddrsLk.Lock()
Expand Down Expand Up @@ -427,12 +406,12 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int)
ctx, span := internal.StartSpan(ctx, "FullRT.CheckPeers", trace.WithAttributes(attribute.Int("NumPeers", len(peers))))
defer span.End()

var peerAddrs chan interface{}
var peerAddrs chan peer.AddrInfo
var total int
if len(peers) == 0 {
dht.peerAddrsLk.RLock()
total = len(dht.peerAddrs)
peerAddrs = make(chan interface{}, total)
peerAddrs = make(chan peer.AddrInfo, total)
for k, v := range dht.peerAddrs {
peerAddrs <- peer.AddrInfo{
ID: k,
Expand All @@ -443,7 +422,7 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int)
dht.peerAddrsLk.RUnlock()
} else {
total = len(peers)
peerAddrs = make(chan interface{}, total)
peerAddrs = make(chan peer.AddrInfo, total)
dht.peerAddrsLk.RLock()
for _, p := range peers {
peerAddrs <- peer.AddrInfo{
Expand All @@ -457,19 +436,18 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int)

var success uint64

workers(100, func(i interface{}) {
a := i.(peer.AddrInfo)
workers(100, func(ai peer.AddrInfo) {
dialctx, dialcancel := context.WithTimeout(ctx, time.Second*3)
if err := dht.h.Connect(dialctx, a); err == nil {
if err := dht.h.Connect(dialctx, ai); err == nil {
atomic.AddUint64(&success, 1)
}
dialcancel()
}, peerAddrs)
return int(success), total
}

func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) {
jobs := make(chan interface{})
func workers(numWorkers int, fn func(peer.AddrInfo), inputs <-chan peer.AddrInfo) {
jobs := make(chan peer.AddrInfo)
defer close(jobs)
for i := 0; i < numWorkers; i++ {
go func() {
Expand Down Expand Up @@ -1436,7 +1414,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo,
defer cancelquery()

addrsCh := make(chan *peer.AddrInfo, 1)
newAddrs := make([]multiaddr.Multiaddr, 0)
newAddrs := make([]ma.Multiaddr, 0)

wg := sync.WaitGroup{}
wg.Add(1)
Expand Down Expand Up @@ -1596,7 +1574,7 @@ func (dht *FullRT) FindLocal(id peer.ID) peer.AddrInfo {
return peer.AddrInfo{}
}

func (dht *FullRT) maybeAddAddrs(p peer.ID, addrs []multiaddr.Multiaddr, ttl time.Duration) {
func (dht *FullRT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
// Don't add addresses for self or our connected peers. We have better ones.
if p == dht.h.ID() || hasValidConnectedness(dht.h, p) {
return
Expand Down