Skip to content

Commit

Permalink
Update to latest version of gocrawlhq (#156)
Browse files Browse the repository at this point in the history
* Initial HQv3 update support

* Update gocrawlhq

* fix: change error with new URLs to debug for now.

 per corentin

* update crawlhq.

* update gocrawlhq

* comment out useless error

- per corentin "it's fine"

* update gocrawlhq
  • Loading branch information
NGTmeaty authored Nov 6, 2024
1 parent fc0b683 commit 6d48952
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 21 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/gosuri/uilive v0.0.4
github.com/gosuri/uitable v0.0.4
github.com/grafov/m3u8 v0.12.0
github.com/internetarchive/gocrawlhq v1.2.14
github.com/internetarchive/gocrawlhq v1.2.19
github.com/paulbellamy/ratecounter v0.2.0
github.com/philippgille/gokv/leveldb v0.7.0
github.com/prometheus/client_golang v1.20.4
Expand Down Expand Up @@ -87,7 +87,7 @@ require (
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.25.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ github.com/internetarchive/gocrawlhq v1.2.13 h1:ALfUrWR7nRez5gWhHRJ7ZklIpGMjERGM
github.com/internetarchive/gocrawlhq v1.2.13/go.mod h1:JQIKgebFmpbxmEalNRjID3RwCxHkslt3PHAnum82KtM=
github.com/internetarchive/gocrawlhq v1.2.14 h1:g3MPMonpA6mTkCpjBvW3paeBHiH+gGgwSvkyX/lxu7s=
github.com/internetarchive/gocrawlhq v1.2.14/go.mod h1:IOHVfWsptADzh+r2J+UnSm22EB9r8TiVVeAuP9WRFoc=
github.com/internetarchive/gocrawlhq v1.2.15 h1:Llv6tvxxRUxoC9G4GsjkpbfKX0anbQUU+pwFiROlxzg=
github.com/internetarchive/gocrawlhq v1.2.15/go.mod h1:Rjkyx2ttWDG4vzXOrl7ilzdtbODJ3XSe2PkO77bxSTs=
github.com/internetarchive/gocrawlhq v1.2.16 h1:D9JJdLL8uqpHUDU3SxxcXUjQETbxnk08e9xo929xrlE=
github.com/internetarchive/gocrawlhq v1.2.16/go.mod h1:Rjkyx2ttWDG4vzXOrl7ilzdtbODJ3XSe2PkO77bxSTs=
github.com/internetarchive/gocrawlhq v1.2.17 h1:nSjFHpDp5C9Q8SrDPibC4Iiih6kpw18+2GnifJiVpO0=
github.com/internetarchive/gocrawlhq v1.2.17/go.mod h1:Rjkyx2ttWDG4vzXOrl7ilzdtbODJ3XSe2PkO77bxSTs=
github.com/internetarchive/gocrawlhq v1.2.18 h1:PPe7UqJ2NNOljn70SmUhoKdgPreeqRUk9XVrYShCn4w=
github.com/internetarchive/gocrawlhq v1.2.18/go.mod h1:Rjkyx2ttWDG4vzXOrl7ilzdtbODJ3XSe2PkO77bxSTs=
github.com/internetarchive/gocrawlhq v1.2.19 h1:bvDliaeWjt97x64bOf+rKXStQX7VE+ZON/I1FS3sQ6A=
github.com/internetarchive/gocrawlhq v1.2.19/go.mod h1:gHrdMewIi5OBWE/xEZGqSrNHyTXPbt+h+XUWpp9fZek=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down Expand Up @@ -239,6 +249,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
Expand Down
40 changes: 21 additions & 19 deletions internal/pkg/crawl/hq.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *Crawl) HQProducer() {
// is already closed, so no other goroutine can write to the slice
if len(discoveredArray) > 0 {
for {
_, err := c.HQClient.Discovered(discoveredArray, "seed", false, false)
err := c.HQClient.Add(discoveredArray, false)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{})).Error("error sending payload to crawl HQ, waiting 1s then retrying..")
time.Sleep(time.Second)
Expand All @@ -85,7 +85,7 @@ func (c *Crawl) HQProducer() {
mutex.Lock()
if (len(discoveredArray) >= int(math.Ceil(float64(c.Workers.Count)/2)) || time.Since(HQLastSent) >= time.Second*10) && len(discoveredArray) > 0 {
for {
_, err := c.HQClient.Discovered(discoveredArray, "seed", false, false)
err := c.HQClient.Add(discoveredArray, false)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{})).Error("error sending payload to crawl HQ, waiting 1s then retrying..")
time.Sleep(time.Second)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (c *Crawl) HQProducer() {
// gob's encode/decode doesn't properly support booleans
if discoveredItem.BypassSeencheck {
for {
_, err := c.HQClient.Discovered([]gocrawlhq.URL{discoveredURL}, "seed", true, false)
err := c.HQClient.Add([]gocrawlhq.URL{discoveredURL}, true)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
"bypassSeencheck": discoveredItem.BypassSeencheck,
Expand Down Expand Up @@ -177,20 +177,20 @@ func (c *Crawl) HQConsumer() {

// get batch from crawl HQ
c.HQConsumerState = "waitingOnFeed"
batch, err := c.HQClient.Feed(HQBatchSize, c.HQStrategy)
URLs, err := c.HQClient.Feed(HQBatchSize, c.HQStrategy)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
"batchSize": HQBatchSize,
"err": err,
})).Error("error getting new URLs from crawl HQ")
// c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
// "batchSize": HQBatchSize,
// "err": err,
// })).Debug("error getting new URLs from crawl HQ")
continue
}
c.HQConsumerState = "feedCompleted"

// send all URLs received in the batch to the queue
var items = make([]*queue.Item, 0, len(batch.URLs))
if len(batch.URLs) > 0 {
for _, URL := range batch.URLs {
var items = make([]*queue.Item, 0, len(URLs))
if len(URLs) > 0 {
for _, URL := range URLs {
c.HQConsumerState = "urlParse"
newURL, err := url.Parse(URL.Value)
if err != nil {
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *Crawl) HQFinisher() {

if len(finishedArray) == int(math.Ceil(float64(c.Workers.Count)/2)) {
for {
_, err := c.HQClient.Finished(finishedArray, locallyCrawledTotal)
err := c.HQClient.Delete(finishedArray, locallyCrawledTotal)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
"finishedArray": finishedArray,
Expand All @@ -265,7 +265,7 @@ func (c *Crawl) HQFinisher() {
// send remaining finished URLs
if len(finishedArray) > 0 {
for {
_, err := c.HQClient.Finished(finishedArray, locallyCrawledTotal)
err := c.HQClient.Delete(finishedArray, locallyCrawledTotal)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
"finishedArray": finishedArray,
Expand All @@ -286,10 +286,11 @@ func (c *Crawl) HQSeencheckURLs(URLs []*url.URL) (seencheckedBatch []*url.URL, e
for _, URL := range URLs {
discoveredURLs = append(discoveredURLs, gocrawlhq.URL{
Value: utils.URLToString(URL),
Type: "asset",
})
}

discoveredResponse, err := c.HQClient.Discovered(discoveredURLs, "asset", false, true)
outputURLs, err := c.HQClient.Seencheck(discoveredURLs)
if err != nil {
c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
"batchLen": len(URLs),
Expand All @@ -298,8 +299,8 @@ func (c *Crawl) HQSeencheckURLs(URLs []*url.URL) (seencheckedBatch []*url.URL, e
return seencheckedBatch, err
}

if discoveredResponse.URLs != nil {
for _, URL := range discoveredResponse.URLs {
if outputURLs != nil {
for _, URL := range outputURLs {
// the returned payload only contain new URLs to be crawled by Zeno
newURL, err := url.Parse(URL.Value)
if err != nil {
Expand All @@ -324,16 +325,17 @@ func (c *Crawl) HQSeencheckURLs(URLs []*url.URL) (seencheckedBatch []*url.URL, e
func (c *Crawl) HQSeencheckURL(URL *url.URL) (bool, error) {
discoveredURL := gocrawlhq.URL{
Value: utils.URLToString(URL),
Type: "asset",
}

discoveredResponse, err := c.HQClient.Discovered([]gocrawlhq.URL{discoveredURL}, "asset", false, true)
outputURLs, err := c.HQClient.Seencheck([]gocrawlhq.URL{discoveredURL})
if err != nil {
c.Log.Error("error sending seencheck payload to crawl HQ", "err", err, "url", utils.URLToString(URL))
return true, err // return true, don't discard the URL if there's an error
}

if discoveredResponse.URLs != nil {
for _, URL := range discoveredResponse.URLs {
if outputURLs != nil {
for _, URL := range outputURLs {
// the returned payload only contain new URLs to be crawled by Zeno
if URL.Value == discoveredURL.Value {
return true, nil
Expand Down

0 comments on commit 6d48952

Please sign in to comment.