From 0a542ccf3abf478ddb0527aba45c3b862db09d46 Mon Sep 17 00:00:00 2001 From: Sadiqur Rahman Date: Tue, 1 Nov 2022 20:20:32 +0600 Subject: [PATCH] fix : request selector (#233) * fixed code * removed peer string * fixed lint * fixed lint * merged update request with existing for loop --- crawler/crawl/crawl.go | 10 +++++++++- models/peer.go | 2 +- store/peerstore/mongo/mongo.go | 19 +++---------------- store/peerstore/store.go | 1 - 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/crawler/crawl/crawl.go b/crawler/crawl/crawl.go index 4c16c2b..85c6daf 100644 --- a/crawler/crawl/crawl.go +++ b/crawler/crawl/crawl.go @@ -132,6 +132,15 @@ func (c *crawler) selectPendingAndExecute(ctx context.Context) { return } for _, req := range reqs { + // update the pr, so it won't be picked again in 24 hours + // We have to update the LastUpdated field here and cannot rety on the worker to update it + // That is because the same request will be picked again when it is in worker. + req.LastUpdated = time.Now().Unix() + err = c.peerStore.Update(ctx, req) + if err != nil { + log.Error("error updating request", log.Ctx{"err": err}) + continue + } select { case <-ctx.Done(): log.Error("update selector stopped", log.Ctx{"err": ctx.Err()}) @@ -183,7 +192,6 @@ func (c *crawler) updatePeerInfo(ctx context.Context, peer *models.Peer) { } return } - peer.LastUpdated = time.Now().Unix() err := c.peerStore.Update(ctx, peer) if err != nil { log.Error("failed on updating peerstore", log.Ctx{"err": err}) diff --git a/models/peer.go b/models/peer.go index 48636d1..9cb04e2 100644 --- a/models/peer.go +++ b/models/peer.go @@ -50,7 +50,7 @@ func init() { LodestarClient: {"lodestar", "js-libp2p"}, NimbusClient: {"nimbus"}, TrinityClient: {"trinity"}, - GrandineClient: {"grandine", "rust"}, + GrandineClient: {"grandine", "rust-libp2p"}, } } diff --git a/store/peerstore/mongo/mongo.go b/store/peerstore/mongo/mongo.go index 32fcdb7..53987e1 100644 --- a/store/peerstore/mongo/mongo.go +++ b/store/peerstore/mongo/mongo.go @@ -8,13 +8,12 @@ import ( "context" "encoding/hex" "errors" - "eth2-crawler/graph/model" - "eth2-crawler/store/peerstore" "fmt" "time" + "eth2-crawler/graph/model" "eth2-crawler/models" - + "eth2-crawler/store/peerstore" "eth2-crawler/utils/config" "github.com/libp2p/go-libp2p-core/peer" @@ -30,18 +29,6 @@ type mongoStore struct { timeout time.Duration } -func (s *mongoStore) Upsert(ctx context.Context, peer *models.Peer) error { - _, err := s.View(ctx, peer.ID) - if err != nil { - if errors.Is(err, peerstore.ErrPeerNotFound) { - return s.Create(ctx, peer) - } - return err - } - - return s.Update(ctx, peer) -} - func (s *mongoStore) Create(ctx context.Context, peer *models.Peer) error { _, err := s.View(ctx, peer.ID) if err != nil { @@ -58,7 +45,7 @@ func (s *mongoStore) Update(ctx context.Context, peer *models.Peer) error { filter := bson.D{ {Key: "_id", Value: peer.ID}, } - _, err := s.coll.UpdateOne(ctx, filter, bson.D{{Key: "$set", Value: peer}}) + _, err := s.coll.ReplaceOne(ctx, filter, peer) if err != nil { return err } diff --git a/store/peerstore/store.go b/store/peerstore/store.go index c784123..ac7aab9 100644 --- a/store/peerstore/store.go +++ b/store/peerstore/store.go @@ -18,7 +18,6 @@ import ( type Provider interface { Create(ctx context.Context, peer *models.Peer) error Update(ctx context.Context, peer *models.Peer) error - Upsert(ctx context.Context, peer *models.Peer) error View(ctx context.Context, peerID peer.ID) (*models.Peer, error) Delete(ctx context.Context, peer *models.Peer) error // Todo: accept filter and find options to get limited information