diff --git a/dht.go b/dht.go
index 653e453..cc651d8 100644
--- a/dht.go
+++ b/dht.go
@@ -265,23 +265,38 @@ func (d *DHT) AddNode(addr string) {
 // Asks for more peers for a torrent.
 func (d *DHT) getPeers(infoHash InfoHash) {
 	closest := d.routingTable.lookupFiltered(infoHash)
-	for _, r := range closest {
-		// *remoteNode is nil if it got filtered.
-		if r != nil {
-			d.getPeersFrom(r, infoHash)
+	if len(closest) == 0 {
+		for _, s := range strings.Split(d.config.DHTRouters, ",") {
+			if s != "" {
+				r, e := d.routingTable.getOrCreateNode("", s, d.config.UDPProto)
+				if e == nil {
+					d.getPeersFrom(r, infoHash)
+				}
+			}
 		}
 	}
+	for _, r := range closest {
+		d.getPeersFrom(r, infoHash)
+	}
 }
 
 // Find a DHT node.
 func (d *DHT) findNode(id string) {
 	ih := InfoHash(id)
 	closest := d.routingTable.lookupFiltered(ih)
-	for _, r := range closest {
-		if r != nil {
-			d.findNodeFrom(r, id)
+	if len(closest) == 0 {
+		for _, s := range strings.Split(d.config.DHTRouters, ",") {
+			if s != "" {
+				r, e := d.routingTable.getOrCreateNode("", s, d.config.UDPProto)
+				if e == nil {
+					d.findNodeFrom(r, id)
+				}
+			}
 		}
 	}
+	for _, r := range closest {
+		d.findNodeFrom(r, id)
+	}
 }
 
 // Start launches the dht node. It starts a listener
@@ -326,6 +341,22 @@ func (d *DHT) initSocket() (err error) {
 	return nil
 }
 
+// bootstrap (re)seeds the routing table from the configured DHT routers, if
+// any, and restarts the searches for all active downloads.
+func (d *DHT) bootstrap() {
+	for _, s := range strings.Split(d.config.DHTRouters, ",") {
+		if s != "" {
+			d.ping(s)
+			r, e := d.routingTable.getOrCreateNode("", s, d.config.UDPProto)
+			if e == nil {
+				d.findNodeFrom(r, d.nodeId)
+			}
+		}
+	}
+	d.findNode(d.nodeId)
+	d.getMorePeers(nil)
+}
+
 // loop is the main working section of dht.
 // It bootstraps a routing table, if necessary,
 // and listens for incoming DHT requests until d.Stop()
@@ -343,12 +374,7 @@ func (d *DHT) loop() {
 	socketChan := make(chan packetType)
 	go readFromSocket(d.conn, socketChan, bytesArena, d.stop)
 
-	// Bootstrap the network (only if there are configured dht routers).
-	if d.config.DHTRouters != "" {
-		for _, s := range strings.Split(d.config.DHTRouters, ",") {
-			d.ping(s)
-		}
-	}
+	d.bootstrap()
 
 	cleanupTicker := time.Tick(d.config.CleanupPeriod)
 	secretRotateTicker := time.Tick(secretRotatePeriod)
@@ -405,9 +431,7 @@ func (d *DHT) loop() {
 					d.peerStore.addLocalDownload(ih)
 				}
-				if d.peerStore.count(ih) < d.config.NumTargetPeers {
-					d.getPeers(ih)
-				}
+				d.getPeers(ih) // We may have plenty of peers stored already, but none of them may be seeding.
 			}
 		case req := <-d.nodesRequest:
@@ -446,8 +470,11 @@ func (d *DHT) loop() {
 				tokenBucket += d.config.RateLimit / 10
 			}
 		case <-cleanupTicker:
-			needPing := d.routingTable.cleanup(d.config.CleanupPeriod)
+			needPing := d.routingTable.cleanup(d.config.CleanupPeriod, d.peerStore)
 			go pingSlowly(d.pingRequest, needPing, d.config.CleanupPeriod, d.stop)
+			if d.needMoreNodes() {
+				d.bootstrap()
+			}
 		case node := <-d.pingRequest:
 			d.pingNode(node)
 		case <-secretRotateTicker:
@@ -469,6 +496,22 @@ func (d *DHT) needMoreNodes() bool {
 	return n < minNodes || n*2 < d.config.MaxNodes
 }
 
+// needMorePeers reports whether we still know fewer live peers for ih than
+// the configured target.
+func (d *DHT) needMorePeers(ih InfoHash) bool {
+	return d.peerStore.alive(ih) < d.config.NumTargetPeers
+}
+
+// getMorePeers requests peers for every locally active download that is
+// still short of its target. If r is nil, the closest known nodes are asked.
+func (d *DHT) getMorePeers(r *remoteNode) {
+	for ih := range d.peerStore.localActiveDownloads {
+		if d.needMorePeers(ih) {
+			if r == nil {
+				d.getPeers(ih)
+			} else {
+				d.getPeersFrom(r, ih)
+			}
+		}
+	}
+}
+
 func (d *DHT) helloFromPeer(addr string) {
 	// We've got a new node id. We need to:
 	// - see if we know it already, skip accordingly.
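
Note: the re-bootstrap behaviour introduced above hinges on two small checks. The cleanup ticker calls bootstrap() again whenever needMoreNodes() reports that the routing table is underpopulated, and getMorePeers() keeps a search going while alive(ih) is below NumTargetPeers. The standalone sketch below only models those two threshold formulas, copied from the hunks above; it is not part of the patch, and the constant values are made-up stand-ins rather than the library's defaults.

    package main

    import "fmt"

    const (
        minNodes       = 16  // hypothetical lower bound on routing table size
        maxNodes       = 500 // hypothetical routing table capacity
        numTargetPeers = 5   // hypothetical target of live peers per infohash
    )

    // needMoreNodes mirrors the cleanup-ticker check: re-bootstrap when the
    // table is nearly empty or far below capacity.
    func needMoreNodes(tableSize int) bool {
        return tableSize < minNodes || tableSize*2 < maxNodes
    }

    // needMorePeers mirrors the per-infohash check: keep searching while the
    // number of peers still believed alive is under the target.
    func needMorePeers(alivePeers int) bool {
        return alivePeers < numTargetPeers
    }

    func main() {
        for _, n := range []int{10, 249, 250, 400} {
            fmt.Printf("table size %3d -> re-bootstrap? %v\n", n, needMoreNodes(n))
        }
        for _, p := range []int{0, 4, 5} {
            fmt.Printf("alive peers %d -> keep searching? %v\n", p, needMorePeers(p))
        }
    }
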
@@ -547,7 +590,7 @@ func (d *DHT) processPacket(p packetType) {
 			}
 			node.lastResponseTime = time.Now()
 			node.pastQueries[r.T] = query
-			d.routingTable.neighborhoodUpkeep(node, d.config.UDPProto)
+			d.routingTable.neighborhoodUpkeep(node, d.config.UDPProto, d.peerStore)
 
 			// If this is the first host added to the routing table, attempt a
 			// recursive lookup of our own address, to build our neighborhood ASAP.
@@ -630,10 +673,17 @@ func (d *DHT) pingNode(r *remoteNode) {
 }
 
 func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
+	if r == nil {
+		return
+	}
 	totalSentGetPeers.Add(1)
 	ty := "get_peers"
 	transId := r.newQuery(ty)
-	r.pendingQueries[transId].ih = ih
+	if _, ok := r.pendingQueries[transId]; ok {
+		r.pendingQueries[transId].ih = ih
+	} else {
+		r.pendingQueries[transId] = &queryType{ih: ih}
+	}
 	queryArguments := map[string]interface{}{
 		"id":        d.nodeId,
 		"info_hash": ih,
@@ -643,16 +693,24 @@ func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
 		x := hashDistance(InfoHash(r.id), ih)
 		log.V(3).Infof("DHT sending get_peers. nodeID: %x@%v, InfoHash: %x , distance: %x", r.id, r.address, ih, x)
 	}
+	r.lastSearchTime = time.Now()
 	sendMsg(d.conn, r.address, query)
 }
 
 func (d *DHT) findNodeFrom(r *remoteNode, id string) {
+	if r == nil {
+		return
+	}
 	totalSentFindNode.Add(1)
 	ty := "find_node"
 	transId := r.newQuery(ty)
 	ih := InfoHash(id)
 	log.V(3).Infof("findNodeFrom adding pendingQueries transId=%v ih=%x", transId, ih)
-	r.pendingQueries[transId].ih = ih
+	if _, ok := r.pendingQueries[transId]; ok {
+		r.pendingQueries[transId].ih = ih
+	} else {
+		r.pendingQueries[transId] = &queryType{ih: ih}
+	}
 	queryArguments := map[string]interface{}{
 		"id":     d.nodeId,
 		"target": id,
@@ -662,6 +720,7 @@ func (d *DHT) findNodeFrom(r *remoteNode, id string) {
 		x := hashDistance(InfoHash(r.id), ih)
 		log.V(3).Infof("DHT sending find_node. nodeID: %x@%v, target ID: %x , distance: %x", r.id, r.address, id, x)
 	}
+	r.lastSearchTime = time.Now()
 	sendMsg(d.conn, r.address, query)
 }
 
@@ -770,7 +829,7 @@ func (d *DHT) nodesForInfoHash(ih InfoHash) string {
 		binaryHost := r.id + nettools.DottedPortToBinary(r.address.String())
 		if binaryHost == "" {
 			log.V(3).Infof("killing node with bogus address %v", r.address.String())
-			d.routingTable.kill(r)
+			d.routingTable.kill(r, d.peerStore)
 		} else {
 			n = append(n, binaryHost)
 		}
@@ -804,13 +863,16 @@ func (d *DHT) replyFindNode(addr net.UDPAddr, r responseType) {
 		R: r0,
 	}
 
-	// XXX we currently can't give out the peer contact. Probably requires
-	// processing announce_peer. XXX If there was a total match, that guy
-	// is the last.
-	neighbors := d.routingTable.lookup(node)
+	neighbors := d.routingTable.lookupFiltered(node)
+	if len(neighbors) < kNodes {
+		neighbors = append(neighbors, d.routingTable.lookup(node)...)
+	}
 	n := make([]string, 0, kNodes)
 	for _, r := range neighbors {
-		n = append(n, r.id+r.addressBinaryFormat)
+		n = append(n, r.id + r.addressBinaryFormat)
+		if len(n) == kNodes {
+			break
+		}
 	}
 	log.V(3).Infof("replyFindNode: Nodes only. Giving %d", len(n))
 	reply.R["nodes"] = strings.Join(n, "")
@@ -869,9 +931,6 @@ func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
 				log.V(5).Infof("DHT got reference of self for get_peers, id %x", id)
 				continue
 			}
-			if d.peerStore.count(query.ih) >= d.config.NumTargetPeers {
-				return
-			}
 			// If it's in our routing table already, ignore it.
 			_, addr, existed, err := d.routingTable.hostPortToNode(address, d.config.UDPProto)
@@ -901,8 +960,7 @@ func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
 				log.Infof("DHT: Got new node reference: %x@%v from %x@%v. Distance: %x.",
 					id, address, node.id, node.address, x)
 			}
-			_, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto)
-			if err == nil && d.peerStore.count(query.ih) < d.config.NumTargetPeers {
+			if _, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto); err == nil && d.needMorePeers(query.ih) {
 				// Re-add this request to the queue. This would in theory
 				// batch similar requests, because new nodes are already
 				// available in the routing table and will be used at the
@@ -977,7 +1035,8 @@ func (d *DHT) processFindNodeResults(node *remoteNode, resp responseType) {
 			// Includes the node in the routing table and ignores errors.
 			//
 			// Only continue the search if we really have to.
-			if _, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto); err != nil {
+			r, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto)
+			if err != nil {
 				log.Warningf("processFindNodeResults calling getOrCreateNode: %v. Id=%x, Address=%q", err, id, addr)
 				continue
 			}
@@ -991,6 +1050,7 @@ func (d *DHT) processFindNodeResults(node *remoteNode, resp responseType) {
 					// information.
 				}
 			}
+			d.getMorePeers(r)
 		}
 	}
 }
diff --git a/dht_test.go b/dht_test.go
index af00938..3993ad4 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -13,12 +13,6 @@ import (
 	"github.com/nictuku/nettools"
 )
 
-func init() {
-	// TestDHTLocal requires contacting the same nodes multiple times, so
-	// shorten the retry period to make tests run faster.
-	searchRetryPeriod = time.Second
-}
-
 // ExampleDHT is a simple example that searches for a particular infohash and
 // exits when it finds any peers. A stand-alone version can be found in the
 // examples/ directory.
@@ -46,6 +40,8 @@ func ExampleDHT() {
 	tick := time.Tick(time.Second)
 
 	var infoHashPeers map[InfoHash][]string
+	timer := time.NewTimer(30 * time.Second)
+	defer timer.Stop()
 M:
 	for {
 		select {
@@ -55,7 +51,7 @@ M:
 			d.PeersRequest(string(infoHash), false)
 		case infoHashPeers = <-d.PeersRequestResults:
 			break M
-		case <-time.After(30 * time.Second):
+		case <-timer.C:
 			fmt.Printf("Could not find new peers: timed out")
 			return
 		}
@@ -125,6 +121,7 @@ func TestDHTLocal(t *testing.T) {
 		fmt.Println("Skipping TestDHTLocal")
 		return
 	}
+	searchRetryPeriod = time.Second
 	infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
 	if err != nil {
 		t.Fatalf(err.Error())
@@ -165,6 +162,7 @@ func TestDHTLocal(t *testing.T) {
 	n1.Stop()
 	n2.Stop()
 	n3.Stop()
+	searchRetryPeriod = time.Second * 15
 }
 
 // Requires Internet access and can be flaky if the server or the internet is
diff --git a/krpc.go b/krpc.go
index 5705ceb..9b25bee 100644
--- a/krpc.go
+++ b/krpc.go
@@ -123,6 +123,9 @@ func (r *remoteNode) wasContactedRecently(ih InfoHash) bool {
 			return true
 		}
 	}
+	if !r.lastSearchTime.IsZero() && time.Since(r.lastSearchTime) > searchRetryPeriod {
+		return false
+	}
 	for _, q := range r.pastQueries {
 		if q.ih == ih {
 			return true
diff --git a/neighborhood_test.go b/neighborhood_test.go
index 8653fbb..a1c2c31 100644
--- a/neighborhood_test.go
+++ b/neighborhood_test.go
@@ -51,7 +51,7 @@ func TestUpkeep(t *testing.T) {
 		// there should be no sign of them later on.
 		n := randNodeId()
 		n[0] = byte(0x3d) // Ensure long distance.
-		r.neighborhoodUpkeep(genremoteNode(string(n)), "udp")
+		r.neighborhoodUpkeep(genremoteNode(string(n)), "udp", newPeerStore(0, 0))
 	}
 
 	// Current state: 8 neighbors with low proximity.
@@ -59,7 +59,7 @@ func TestUpkeep(t *testing.T) {
 	// Adds 7 neighbors from the static table. They should replace the
 	// random ones, except for one.
 	for _, v := range table[1:8] {
-		r.neighborhoodUpkeep(genremoteNode(v.rid), "udp")
+		r.neighborhoodUpkeep(genremoteNode(v.rid), "udp", newPeerStore(0, 0))
 	}
 
 	// Current state: 7 close neighbors, one distant dude.
@@ -80,7 +80,7 @@ func TestUpkeep(t *testing.T) {
 	if r.boundaryNode == nil {
 		t.Fatalf("tried to kill nil boundary node")
 	}
-	r.kill(r.boundaryNode)
+	r.kill(r.boundaryNode, newPeerStore(0, 0))
 
 	// The resulting boundary neighbor should now be one from the static
 	// table, with high proximity.
diff --git a/peer_store.go b/peer_store.go
index 0251da7..e091424 100644
--- a/peer_store.go
+++ b/peer_store.go
@@ -1,7 +1,5 @@
 package dht
 
-// TODO: Cleanup stale peer contacts.
-
 import (
 	"container/ring"
 
@@ -24,11 +22,31 @@ func (p *peerContactsSet) next() []string {
 		count = len(p.set)
 	}
 	x := make([]string, 0, count)
-	var next *ring.Ring
-	for i := 0; i < count; i++ {
-		next = p.ring.Next()
-		x = append(x, next.Value.(string))
-		p.ring = next
+	xx := make(map[string]bool) // A map makes it easy to dedupe contacts.
+	// First pass: prefer contacts that are still marked as alive.
+	for range p.set {
+		p.ring = p.ring.Move(1)
+		nid := p.ring.Value.(string)
+		if p.set[nid] && !xx[nid] {
+			xx[nid] = true
+		}
+		if len(xx) >= count {
+			break
+		}
+	}
+
+	// Second pass: fill any remainder with contacts marked as dead.
+	if len(xx) < count {
+		for range p.set {
+			p.ring = p.ring.Move(1)
+			nid := p.ring.Value.(string)
+			if xx[nid] {
+				continue
+			}
+			xx[nid] = true
+			if len(xx) >= count {
+				break
+			}
+		}
+	}
+	for id := range xx {
+		x = append(x, id)
 	}
 	return x
 }
@@ -53,11 +71,59 @@ func (p *peerContactsSet) put(peerContact string) bool {
 	return true
 }
 
+// drop removes the given contact from the set and returns its id. If the argument
+// is empty, it first drops a dead contact if one exists, otherwise the next contact in the ring.
+func (p *peerContactsSet) drop(peerContact string) string {
+	if peerContact == "" {
+		if c := p.dropDead(); c != "" {
+			return c
+		}
+		return p.drop(p.ring.Next().Value.(string))
+	}
+	for i := 0; i < p.ring.Len(); i++ {
+		if p.ring.Next().Value.(string) == peerContact {
+			dn := p.ring.Unlink(1).Value.(string)
+			delete(p.set, dn)
+			return dn
+		}
+		p.ring = p.ring.Next()
+	}
+	return ""
+}
+
+// dropDead drops the first contact marked as dead and returns its id, or "" if none is dead.
+func (p *peerContactsSet) dropDead() string {
+	for i := 0; i < p.ring.Len(); i++ {
+		if !p.set[p.ring.Next().Value.(string)] {
+			dn := p.ring.Unlink(1).Value.(string)
+			delete(p.set, dn)
+			return dn
+		}
+		p.ring = p.ring.Next()
+	}
+	return ""
+}
+
+// kill marks the contact as dead, so it no longer counts as alive and becomes
+// a candidate for eviction.
+func (p *peerContactsSet) kill(peerContact string) {
+	if alive := p.set[peerContact]; alive {
+		p.set[peerContact] = false
+	}
+}
+
 // Size is the number of contacts known for an infohash.
 func (p *peerContactsSet) Size() int {
 	return len(p.set)
 }
 
+// Alive returns the number of contacts that are still marked as alive.
+func (p *peerContactsSet) Alive() int {
+	alive := 0
+	for c := range p.set {
+		if p.set[c] {
+			alive++
+		}
+	}
+	return alive
+}
+
 func newPeerStore(maxInfoHashes, maxInfoHashPeers int) *peerStore {
 	return &peerStore{
 		infoHashPeers: lru.New(maxInfoHashes),
@@ -92,7 +158,15 @@ func (h *peerStore) count(ih InfoHash) int {
 	if peers == nil {
 		return 0
 	}
-	return len(peers.set)
+	return peers.Size()
+}
+
+// alive returns the number of peer contacts for ih that are still marked as alive.
+func (h *peerStore) alive(ih InfoHash) int {
+	peers := h.get(ih)
+	if peers == nil {
+		return 0
+	}
+	return peers.Alive()
 }
 
 // peerContacts returns a random set of 8 peers for the ih InfoHash.
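
Note: the peerContactsSet changes above keep dead contacts in the ring but flag them as false in the set map, so a full set can evict a dead contact before touching a live one. The standalone sketch below shows the same container/ring plus map pattern in isolation; it is not part of the patch, and the type name and sample addresses are illustrative only.

    package main

    import (
        "container/ring"
        "fmt"
    )

    type contacts struct {
        ring  *ring.Ring      // iteration order for handing contacts out
        alive map[string]bool // contact -> still believed reachable?
    }

    func newContacts(ids ...string) *contacts {
        c := &contacts{ring: ring.New(len(ids)), alive: make(map[string]bool)}
        for _, id := range ids {
            c.ring.Value = id
            c.ring = c.ring.Next()
            c.alive[id] = true
        }
        return c
    }

    // kill marks a contact as dead; it stays in the ring until evicted.
    func (c *contacts) kill(id string) {
        if c.alive[id] {
            c.alive[id] = false
        }
    }

    // evictDead removes the first dead contact it finds and returns it, or "".
    func (c *contacts) evictDead() string {
        for i := 0; i < c.ring.Len(); i++ {
            if !c.alive[c.ring.Next().Value.(string)] {
                dead := c.ring.Unlink(1).Value.(string)
                delete(c.alive, dead)
                return dead
            }
            c.ring = c.ring.Next()
        }
        return ""
    }

    func main() {
        c := newContacts("1.2.3.4:6881", "5.6.7.8:6881", "9.9.9.9:6881")
        c.kill("5.6.7.8:6881")
        fmt.Println(c.evictDead()) // 5.6.7.8:6881, the dead contact goes first
        fmt.Println(c.evictDead()) // "", the remaining contacts are all alive
    }
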
@@ -113,11 +187,13 @@ func (h *peerStore) addContact(ih InfoHash, peerContact string) bool {
 		var okType bool
 		peers, okType = p.(*peerContactsSet)
 		if okType && peers != nil {
-			if len(peers.set) >= h.maxInfoHashPeers {
-				// Already tracking too many peers for this infohash.
-				// TODO: Use a circular buffer and discard
-				// other contacts.
-				return false
+			if peers.Size() >= h.maxInfoHashPeers {
+				if _, ok := peers.set[peerContact]; ok {
+					return false
+				}
+				if peers.drop("") == "" {
+					return false
+				}
 			}
 			h.infoHashPeers.Add(string(ih), peers)
 			return peers.put(peerContact)
@@ -129,6 +205,17 @@ func (h *peerStore) addContact(ih InfoHash, peerContact string) bool {
 	return peers.put(peerContact)
 }
 
+func (h *peerStore) killContact(peerContact string) {
+	if h == nil {
+		return
+	}
+	for ih := range h.localActiveDownloads {
+		if p := h.get(ih); p != nil {
+			p.kill(peerContact)
+		}
+	}
+}
+
 func (h *peerStore) addLocalDownload(ih InfoHash) {
 	h.localActiveDownloads[ih] = true
 }
diff --git a/routing_table.go b/routing_table.go
index b703183..50039de 100644
--- a/routing_table.go
+++ b/routing_table.go
@@ -168,7 +168,7 @@ func (r *routingTable) getOrCreateNode(id string, hostPort string, proto string)
 	return node, r.insert(node, proto)
 }
 
-func (r *routingTable) kill(n *remoteNode) {
+func (r *routingTable) kill(n *remoteNode, p *peerStore) {
 	delete(r.addresses, n.address.String())
 	r.nTree.cut(InfoHash(n.id), 0)
 	totalKilledNodes.Add(1)
@@ -176,6 +176,7 @@ func (r *routingTable) kill(n *remoteNode) {
 	if r.boundaryNode != nil && n.id == r.boundaryNode.id {
 		r.resetNeighborhoodBoundary()
 	}
+	p.killContact(nettools.BinaryToDottedPort(n.addressBinaryFormat))
 }
 
 func (r *routingTable) resetNeighborhoodBoundary() {
@@ -190,19 +191,19 @@ func (r *routingTable) resetNeighborhoodBoundary() {
 
 }
 
-func (r *routingTable) cleanup(cleanupPeriod time.Duration) (needPing []*remoteNode) {
+func (r *routingTable) cleanup(cleanupPeriod time.Duration, p *peerStore) (needPing []*remoteNode) {
 	needPing = make([]*remoteNode, 0, 10)
 	t0 := time.Now()
 	// Needs some serious optimization.
 	for addr, n := range r.addresses {
 		if addr != n.address.String() {
 			log.V(3).Infof("cleanup: node address mismatches: %v != %v. Deleting node", addr, n.address.String())
-			r.kill(n)
+			r.kill(n, p)
 			continue
 		}
 		if addr == "" {
 			log.V(3).Infof("cleanup: found empty address for node %x. Deleting node", n.id)
-			r.kill(n)
+			r.kill(n, p)
 			continue
 		}
 		if n.reachable {
@@ -210,9 +211,9 @@ func (r *routingTable) cleanup(cleanupPeriod time.Duration) (needPing []*remoteN
 			goto PING
 		}
 		// Tolerate 2 cleanup cycles.
-		if time.Since(n.lastResponseTime) > cleanupPeriod*2+(time.Minute) {
+		if time.Since(n.lastResponseTime) > cleanupPeriod*2+(cleanupPeriod/15) {
 			log.V(4).Infof("DHT: Old node seen %v ago. Deleting", time.Since(n.lastResponseTime))
-			r.kill(n)
+			r.kill(n, p)
 			continue
 		}
 		if time.Since(n.lastResponseTime).Nanoseconds() < cleanupPeriod.Nanoseconds()/2 {
@@ -222,10 +223,10 @@ func (r *routingTable) cleanup(cleanupPeriod time.Duration) (needPing []*remoteN
 
 		} else {
 			// Not reachable.
-			if len(n.pendingQueries) > 2 {
+			if len(n.pendingQueries) > maxNodePendingQueries {
 				// Didn't reply to 2 consecutive queries.
 				log.V(4).Infof("DHT: Node never replied to ping. Deleting. %v", n.address)
%v", n.address) - r.kill(n) + r.kill(n, p) continue } } @@ -243,13 +244,13 @@ func (r *routingTable) cleanup(cleanupPeriod time.Duration) (needPing []*remoteN // neighborhoodUpkeep will update the routingtable if the node n is closer than // the 8 nodes in our neighborhood, by replacing the least close one // (boundary). n.id is assumed to have length 20. -func (r *routingTable) neighborhoodUpkeep(n *remoteNode, proto string) { +func (r *routingTable) neighborhoodUpkeep(n *remoteNode, proto string, p *peerStore) { if r.boundaryNode == nil { - r.addNewNeighbor(n, false, proto) + r.addNewNeighbor(n, false, proto, p) return } if r.length() < kNodes { - r.addNewNeighbor(n, false, proto) + r.addNewNeighbor(n, false, proto, p) return } cmp := commonBits(r.nodeId, n.id) @@ -258,19 +259,19 @@ func (r *routingTable) neighborhoodUpkeep(n *remoteNode, proto string) { return } if cmp > r.proximity { - r.addNewNeighbor(n, true, proto) + r.addNewNeighbor(n, true, proto, p) return } } -func (r *routingTable) addNewNeighbor(n *remoteNode, displaceBoundary bool, proto string) { +func (r *routingTable) addNewNeighbor(n *remoteNode, displaceBoundary bool, proto string, p *peerStore) { if err := r.insert(n, proto); err != nil { log.V(3).Infof("addNewNeighbor error: %v", err) return } if displaceBoundary && r.boundaryNode != nil { // This will also take care of setting a new boundary. - r.kill(r.boundaryNode) + r.kill(r.boundaryNode, p) } else { r.resetNeighborhoodBoundary() }