From 72782a9b0219784fb54b18a1f7451059c3a7845e Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 12 Mar 2019 01:20:39 -0700 Subject: [PATCH] add finish "slop" We try to "finish" with `KValue + slop` peers but only wait for KValue. This means we don't have to wait for timeouts. --- query.go | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/query.go b/query.go index 61f101761..2ae7946a5 100644 --- a/query.go +++ b/query.go @@ -22,6 +22,10 @@ import ( var maxQueryConcurrency = AlphaValue +// We "finish" queries with KValue + finishSlop peers but only wait for KValue +// peers. This helps us account for unreachable peers. +var finishSlop = 4 + type dhtQuery struct { dht *IpfsDHT key string // the key we're querying for @@ -192,9 +196,6 @@ func (r *dhtQueryRecurseResult) Finish(ctx context.Context) ([]peer.ID, error) { } func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context.Context, peer.ID) error) ([]peer.ID, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - // Get a sorted list of peers to query. failed := make(map[peer.ID]bool, len(r.failed)) for _, p := range r.failed { @@ -218,9 +219,10 @@ func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context. closest = kb.SortClosestPeers(closest, kb.ConvertKey(r.query.key)) // Query them. + sloppyK := KValue + finishSlop bucket := make([]peer.ID, 0, KValue) workQ := make(chan peer.ID) - resultQ := make(chan peer.ID, KValue) + resultQ := make(chan peer.ID, sloppyK) newQuery := fn != nil if !newQuery { @@ -231,8 +233,13 @@ func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context. } var wg sync.WaitGroup - wg.Add(KValue) - for i := 0; i < KValue; i++ { + defer wg.Wait() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + wg.Add(sloppyK) + for i := 0; i < sloppyK; i++ { go func() { defer wg.Done() for p := range workQ { @@ -243,6 +250,7 @@ func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context. } }() } + go func() { wg.Wait() close(resultQ) @@ -269,11 +277,17 @@ func (r *dhtQueryRecurseResult) FinishWith(ctx context.Context, fn func(context. bucket = append(bucket, successPeer) } } + close(workQ) - for p := range resultQ { + for len(bucket) < KValue { + p, ok := <-resultQ + if !ok { + break + } bucket = append(bucket, p) } + return bucket, ctx.Err() }