Skip to content

Commit

Permalink
Merge pull request #48 from soul9/peerstore
Browse files Browse the repository at this point in the history
Peerstore node maintenance implementation
  • Loading branch information
nictuku committed Jan 13, 2016
2 parents f9d42fe + b94748a commit 9d85e54
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 69 deletions.
124 changes: 92 additions & 32 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,23 +265,38 @@ func (d *DHT) AddNode(addr string) {
// Asks for more peers for a torrent.
func (d *DHT) getPeers(infoHash InfoHash) {
closest := d.routingTable.lookupFiltered(infoHash)
for _, r := range closest {
// *remoteNode is nil if it got filtered.
if r != nil {
d.getPeersFrom(r, infoHash)
if len(closest) == 0 {
for _, s := range strings.Split(d.config.DHTRouters, ",") {
if s != "" {
r, e := d.routingTable.getOrCreateNode("", s, d.config.UDPProto)
if e == nil {
d.getPeersFrom(r, infoHash)
}
}
}
}
for _, r := range closest {
d.getPeersFrom(r, infoHash)
}
}

// Find a DHT node.
func (d *DHT) findNode(id string) {
ih := InfoHash(id)
closest := d.routingTable.lookupFiltered(ih)
for _, r := range closest {
if r != nil {
d.findNodeFrom(r, id)
if len(closest) == 0 {
for _, s := range strings.Split(d.config.DHTRouters, ",") {
if s != "" {
r, e := d.routingTable.getOrCreateNode("", s, d.config.UDPProto)
if e == nil {
d.findNodeFrom(r, id)
}
}
}
}
for _, r := range closest {
d.findNodeFrom(r, id)
}
}

// Start launches the dht node. It starts a listener
Expand Down Expand Up @@ -326,6 +341,22 @@ func (d *DHT) initSocket() (err error) {
return nil
}


// bootstrap seeds the routing table from the configured DHT routers
// (skipping empty entries), then launches a search for our own node ID
// and refreshes peers for the locally active downloads.
func (d *DHT) bootstrap() {
	// Bootstrap the network (only if there are configured dht routers).
	for _, router := range strings.Split(d.config.DHTRouters, ",") {
		if router == "" {
			continue
		}
		d.ping(router)
		node, err := d.routingTable.getOrCreateNode("", router, d.config.UDPProto)
		if err != nil {
			continue
		}
		d.findNodeFrom(node, d.nodeId)
	}
	d.findNode(d.nodeId)
	d.getMorePeers(nil)
}

// loop is the main working section of dht.
// It bootstraps a routing table, if necessary,
// and listens for incoming DHT requests until d.Stop()
Expand All @@ -343,12 +374,7 @@ func (d *DHT) loop() {
socketChan := make(chan packetType)
go readFromSocket(d.conn, socketChan, bytesArena, d.stop)

// Bootstrap the network (only if there are configured dht routers).
if d.config.DHTRouters != "" {
for _, s := range strings.Split(d.config.DHTRouters, ",") {
d.ping(s)
}
}
d.bootstrap()

cleanupTicker := time.Tick(d.config.CleanupPeriod)
secretRotateTicker := time.Tick(secretRotatePeriod)
Expand Down Expand Up @@ -405,9 +431,7 @@ func (d *DHT) loop() {
d.peerStore.addLocalDownload(ih)
}

if d.peerStore.count(ih) < d.config.NumTargetPeers {
d.getPeers(ih)
}
d.getPeers(ih) // I might have enough peers in the peerstore, but no seeds
}

case req := <-d.nodesRequest:
Expand Down Expand Up @@ -446,8 +470,11 @@ func (d *DHT) loop() {
tokenBucket += d.config.RateLimit / 10
}
case <-cleanupTicker:
needPing := d.routingTable.cleanup(d.config.CleanupPeriod)
needPing := d.routingTable.cleanup(d.config.CleanupPeriod, d.peerStore)
go pingSlowly(d.pingRequest, needPing, d.config.CleanupPeriod, d.stop)
if d.needMoreNodes() {
d.bootstrap()
}
case node := <-d.pingRequest:
d.pingNode(node)
case <-secretRotateTicker:
Expand All @@ -469,6 +496,22 @@ func (d *DHT) needMoreNodes() bool {
return n < minNodes || n*2 < d.config.MaxNodes
}

// needMorePeers reports whether the peer store holds fewer peers
// considered alive for ih than the configured target count.
func (d *DHT) needMorePeers(ih InfoHash) bool {
	alive := d.peerStore.alive(ih)
	return alive < d.config.NumTargetPeers
}

// getMorePeers asks for additional peers for every locally active
// download that is still short of its target. When r is nil the request
// goes through the normal routing-table search; otherwise only the
// given remote node is queried.
func (d *DHT) getMorePeers(r *remoteNode) {
	for ih := range d.peerStore.localActiveDownloads {
		if !d.needMorePeers(ih) {
			continue
		}
		if r != nil {
			d.getPeersFrom(r, ih)
			continue
		}
		d.getPeers(ih)
	}
}

func (d *DHT) helloFromPeer(addr string) {
// We've got a new node id. We need to:
// - see if we know it already, skip accordingly.
Expand Down Expand Up @@ -547,7 +590,7 @@ func (d *DHT) processPacket(p packetType) {
}
node.lastResponseTime = time.Now()
node.pastQueries[r.T] = query
d.routingTable.neighborhoodUpkeep(node, d.config.UDPProto)
d.routingTable.neighborhoodUpkeep(node, d.config.UDPProto, d.peerStore)

// If this is the first host added to the routing table, attempt a
// recursive lookup of our own address, to build our neighborhood ASAP.
Expand Down Expand Up @@ -630,10 +673,17 @@ func (d *DHT) pingNode(r *remoteNode) {
}

func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
if r == nil {
return
}
totalSentGetPeers.Add(1)
ty := "get_peers"
transId := r.newQuery(ty)
r.pendingQueries[transId].ih = ih
if _, ok := r.pendingQueries[transId]; ok {
r.pendingQueries[transId].ih = ih
} else {
r.pendingQueries[transId] = &queryType{ih: ih}
}
queryArguments := map[string]interface{}{
"id": d.nodeId,
"info_hash": ih,
Expand All @@ -643,16 +693,24 @@ func (d *DHT) getPeersFrom(r *remoteNode, ih InfoHash) {
x := hashDistance(InfoHash(r.id), ih)
log.V(3).Infof("DHT sending get_peers. nodeID: %x@%v, InfoHash: %x , distance: %x", r.id, r.address, ih, x)
}
r.lastSearchTime = time.Now()
sendMsg(d.conn, r.address, query)
}

func (d *DHT) findNodeFrom(r *remoteNode, id string) {
if r == nil {
return
}
totalSentFindNode.Add(1)
ty := "find_node"
transId := r.newQuery(ty)
ih := InfoHash(id)
log.V(3).Infof("findNodeFrom adding pendingQueries transId=%v ih=%x", transId, ih)
r.pendingQueries[transId].ih = ih
if _, ok := r.pendingQueries[transId]; ok {
r.pendingQueries[transId].ih = ih
} else {
r.pendingQueries[transId] = &queryType{ih: ih}
}
queryArguments := map[string]interface{}{
"id": d.nodeId,
"target": id,
Expand All @@ -662,6 +720,7 @@ func (d *DHT) findNodeFrom(r *remoteNode, id string) {
x := hashDistance(InfoHash(r.id), ih)
log.V(3).Infof("DHT sending find_node. nodeID: %x@%v, target ID: %x , distance: %x", r.id, r.address, id, x)
}
r.lastSearchTime = time.Now()
sendMsg(d.conn, r.address, query)
}

Expand Down Expand Up @@ -770,7 +829,7 @@ func (d *DHT) nodesForInfoHash(ih InfoHash) string {
binaryHost := r.id + nettools.DottedPortToBinary(r.address.String())
if binaryHost == "" {
log.V(3).Infof("killing node with bogus address %v", r.address.String())
d.routingTable.kill(r)
d.routingTable.kill(r, d.peerStore)
} else {
n = append(n, binaryHost)
}
Expand Down Expand Up @@ -804,13 +863,16 @@ func (d *DHT) replyFindNode(addr net.UDPAddr, r responseType) {
R: r0,
}

// XXX we currently can't give out the peer contact. Probably requires
// processing announce_peer. XXX If there was a total match, that guy
// is the last.
neighbors := d.routingTable.lookup(node)
neighbors := d.routingTable.lookupFiltered(node)
if len(neighbors) < kNodes {
neighbors = append(neighbors, d.routingTable.lookup(node)...)
}
n := make([]string, 0, kNodes)
for _, r := range neighbors {
n = append(n, r.id+r.addressBinaryFormat)
n = append(n, r.id + r.addressBinaryFormat)
if len(n) == kNodes {
break
}
}
log.V(3).Infof("replyFindNode: Nodes only. Giving %d", len(n))
reply.R["nodes"] = strings.Join(n, "")
Expand Down Expand Up @@ -869,9 +931,6 @@ func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
log.V(5).Infof("DHT got reference of self for get_peers, id %x", id)
continue
}
if d.peerStore.count(query.ih) >= d.config.NumTargetPeers {
return
}

// If it's in our routing table already, ignore it.
_, addr, existed, err := d.routingTable.hostPortToNode(address, d.config.UDPProto)
Expand Down Expand Up @@ -901,8 +960,7 @@ func (d *DHT) processGetPeerResults(node *remoteNode, resp responseType) {
log.Infof("DHT: Got new node reference: %x@%v from %x@%v. Distance: %x.",
id, address, node.id, node.address, x)
}
_, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto)
if err == nil && d.peerStore.count(query.ih) < d.config.NumTargetPeers {
if _, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto); err == nil && d.needMorePeers(query.ih) {
// Re-add this request to the queue. This would in theory
// batch similar requests, because new nodes are already
// available in the routing table and will be used at the
Expand Down Expand Up @@ -977,7 +1035,8 @@ func (d *DHT) processFindNodeResults(node *remoteNode, resp responseType) {
// Includes the node in the routing table and ignores errors.
//
// Only continue the search if we really have to.
if _, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto); err != nil {
r, err := d.routingTable.getOrCreateNode(id, addr, d.config.UDPProto)
if err != nil {
log.Warningf("processFindNodeResults calling getOrCreateNode: %v. Id=%x, Address=%q", err, id, addr)
continue
}
Expand All @@ -991,6 +1050,7 @@ func (d *DHT) processFindNodeResults(node *remoteNode, resp responseType) {
// information.
}
}
d.getMorePeers(r)
}
}
}
Expand Down
12 changes: 5 additions & 7 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ import (
"github.com/nictuku/nettools"
)

func init() {
// TestDHTLocal requires contacting the same nodes multiple times, so
// shorten the retry period to make tests run faster.
searchRetryPeriod = time.Second
}

// ExampleDHT is a simple example that searches for a particular infohash and
// exits when it finds any peers. A stand-alone version can be found in the
// examples/ directory.
Expand Down Expand Up @@ -46,6 +40,8 @@ func ExampleDHT() {
tick := time.Tick(time.Second)

var infoHashPeers map[InfoHash][]string
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()
M:
for {
select {
Expand All @@ -55,7 +51,7 @@ M:
d.PeersRequest(string(infoHash), false)
case infoHashPeers = <-d.PeersRequestResults:
break M
case <-time.After(30 * time.Second):
case <-timer.C:
fmt.Printf("Could not find new peers: timed out")
return
}
Expand Down Expand Up @@ -125,6 +121,7 @@ func TestDHTLocal(t *testing.T) {
fmt.Println("Skipping TestDHTLocal")
return
}
searchRetryPeriod = time.Second
infoHash, err := DecodeInfoHash("d1c5676ae7ac98e8b19f63565905105e3c4c37a2")
if err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -165,6 +162,7 @@ func TestDHTLocal(t *testing.T) {
n1.Stop()
n2.Stop()
n3.Stop()
searchRetryPeriod = time.Second * 15
}

// Requires Internet access and can be flaky if the server or the internet is
Expand Down
3 changes: 3 additions & 0 deletions krpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (r *remoteNode) wasContactedRecently(ih InfoHash) bool {
return true
}
}
if !r.lastSearchTime.IsZero() && time.Since(r.lastSearchTime) > searchRetryPeriod {
return false
}
for _, q := range r.pastQueries {
if q.ih == ih {
return true
Expand Down
6 changes: 3 additions & 3 deletions neighborhood_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ func TestUpkeep(t *testing.T) {
// there should be no sign of them later on.
n := randNodeId()
n[0] = byte(0x3d) // Ensure long distance.
r.neighborhoodUpkeep(genremoteNode(string(n)), "udp")
r.neighborhoodUpkeep(genremoteNode(string(n)), "udp", newPeerStore(0, 0))
}

// Current state: 8 neighbors with low proximity.

// Adds 7 neighbors from the static table. They should replace the
// random ones, except for one.
for _, v := range table[1:8] {
r.neighborhoodUpkeep(genremoteNode(v.rid), "udp")
r.neighborhoodUpkeep(genremoteNode(v.rid), "udp", newPeerStore(0, 0))
}

// Current state: 7 close neighbors, one distant dude.
Expand All @@ -80,7 +80,7 @@ func TestUpkeep(t *testing.T) {
if r.boundaryNode == nil {
t.Fatalf("tried to kill nil boundary node")
}
r.kill(r.boundaryNode)
r.kill(r.boundaryNode, newPeerStore(0, 0))

// The resulting boundary neighbor should now be one from the static
// table, with high proximity.
Expand Down
Loading

0 comments on commit 9d85e54

Please sign in to comment.