Skip to content

Commit

Permalink
perf: remove unncessary goroutine pileup of skippeer workers (#4993)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Feb 6, 2025
1 parent 0752977 commit 4940ef8
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func New(
metrics: newMetrics(),
tracer: tracer,
signer: signer,
errSkip: skippeers.NewList(),
errSkip: skippeers.NewList(time.Minute),
warmupPeriod: time.Now().Add(warmupTime),
shallowReceiptTolerance: shallowReceiptTolerance,
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
return nil, fmt.Errorf("pushsync: storage radius: %w", err)
}

skip := skippeers.NewList()
skip := skippeers.NewList(0)
defer skip.Close()

for sentErrorsLeft > 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func New(
metrics: newMetrics(),
tracer: tracer,
caching: forwarderCaching,
errSkip: skippeers.NewList(),
errSkip: skippeers.NewList(time.Minute),
}
}

Expand Down Expand Up @@ -158,7 +158,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s

v, _, err := s.singleflight.Do(ctx, flightRoute, func(ctx context.Context) (swarm.Chunk, error) {

skip := skippeers.NewList()
skip := skippeers.NewList(0)
defer skip.Close()

var preemptiveTicker <-chan time.Time
Expand Down
12 changes: 7 additions & 5 deletions pkg/skippeers/skippeers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,26 @@ type List struct {
wg sync.WaitGroup
}

func NewList() *List {
func NewList(workerWakeUpDur time.Duration) *List {
l := &List{
skip: make(map[string]map[string]int64),
durC: make(chan time.Duration),
quit: make(chan struct{}),
}

l.wg.Add(1)
go l.worker()
if workerWakeUpDur > 0 {
l.wg.Add(1)
go l.worker(workerWakeUpDur)
}

return l
}

func (l *List) worker() {
func (l *List) worker(d time.Duration) {

defer l.wg.Done()

ticker := time.NewTicker(time.Minute)
ticker := time.NewTicker(d)
defer ticker.Stop()

for {
Expand Down
41 changes: 39 additions & 2 deletions pkg/skippeers/skippeers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestPruneExpiresAfter(t *testing.T) {
t.Parallel()

skipList := skippeers.NewList()
skipList := skippeers.NewList(0)
t.Cleanup(func() { skipList.Close() })

chunk := swarm.RandAddress(t)
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestPruneExpiresAfter(t *testing.T) {
func TestPeerWait(t *testing.T) {
t.Parallel()

skipList := skippeers.NewList()
skipList := skippeers.NewList(0)
t.Cleanup(func() { skipList.Close() })

chunk1 := swarm.RandAddress(t)
Expand Down Expand Up @@ -105,3 +105,40 @@ func TestPeerWait(t *testing.T) {
t.Fatal("entry should be pruned")
}
}

func TestPruneWorker(t *testing.T) {
t.Parallel()

chunk1 := swarm.RandAddress(t)
chunk2 := swarm.RandAddress(t)
peer1 := swarm.RandAddress(t)
peer2 := swarm.RandAddress(t)

skipList := skippeers.NewList(time.Millisecond * 500)
t.Cleanup(func() { skipList.Close() })

skipList.Add(chunk1, peer1, time.Second)
if !swarm.ContainsAddress(skipList.ChunkPeers(chunk1), peer1) {
t.Fatal("peer should be in skiplist")
}

skipList.Add(chunk2, peer1, time.Second)
if !swarm.ContainsAddress(skipList.ChunkPeers(chunk2), peer1) {
t.Fatal("peer should be in skiplist")
}

skipList.Add(chunk2, peer2, time.Minute)
if !swarm.ContainsAddress(skipList.ChunkPeers(chunk2), peer1) {
t.Fatal("peer should be in skiplist")
}

time.Sleep(time.Millisecond * 1600)

if len(skipList.ChunkPeers(chunk1)) != 0 {
t.Fatal("entry should be pruned")
}

if len(skipList.ChunkPeers(chunk2)) != 1 || !swarm.ContainsAddress(skipList.ChunkPeers(chunk2), peer2) {
t.Fatal("peer2 should be in skiplist")
}
}

0 comments on commit 4940ef8

Please sign in to comment.