add: --hq-batch-concurrency

CorentinB committed Nov 12, 2024
1 parent 5719f94, commit 76f39f7

Showing 6 changed files with 56 additions and 10 deletions.
cmd/get_hq.go: 3 changes (2 additions, 1 deletion)
@@ -50,9 +50,10 @@ func getHQCmdFlags(getHQCmd *cobra.Command) {
getHQCmd.PersistentFlags().String("hq-key", "", "Crawl HQ key.")
getHQCmd.PersistentFlags().String("hq-secret", "", "Crawl HQ secret.")
getHQCmd.PersistentFlags().String("hq-project", "", "Crawl HQ project.")
getHQCmd.PersistentFlags().Int64("hq-batch-size", 0, "Crawl HQ feeding batch size.")
getHQCmd.PersistentFlags().Bool("hq-continuous-pull", false, "If turned on, the crawler will pull URLs from Crawl HQ continuously.")
getHQCmd.PersistentFlags().String("hq-strategy", "lifo", "Crawl HQ feeding strategy.")
getHQCmd.PersistentFlags().Int64("hq-batch-size", 0, "Crawl HQ feeding batch size.")
getHQCmd.PersistentFlags().Int64("hq-batch-concurrency", 1, "Number of concurrent requests to do to get the --hq-batch-size, if batch size is 300 and batch-concurrency is 10, 30 requests will be done concurrently.")
getHQCmd.PersistentFlags().Bool("hq-rate-limiting-send-back", false, "If turned on, the crawler will send back URLs that hit a rate limit to crawl HQ.")

getHQCmd.MarkPersistentFlagRequired("hq-address")
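
The arithmetic in the new flag's help text is plain integer division: the crawler issues --hq-batch-concurrency requests, each asking for --hq-batch-size divided by --hq-batch-concurrency URLs. A minimal, runnable Go sketch of that split (splitBatch is a hypothetical helper, not part of the commit):

package main

import "fmt"

// splitBatch mirrors the integer division used in hq.go: each of the
// `concurrency` concurrent requests asks for batchSize/concurrency URLs.
func splitBatch(batchSize, concurrency int) (requests, perRequest int) {
	return concurrency, batchSize / concurrency
}

func main() {
	requests, perRequest := splitBatch(300, 10)
	fmt.Printf("%d concurrent requests of %d URLs each\n", requests, perRequest)
	// Output: 10 concurrent requests of 30 URLs each

	// Integer division drops any remainder: 100/8 -> 12 URLs per request,
	// so 96 URLs are fetched in total rather than 100.
	requests, perRequest = splitBatch(100, 8)
	fmt.Printf("%d concurrent requests of %d URLs each\n", requests, perRequest)
}
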
config/config.go: 3 changes (2 additions, 1 deletion)
@@ -31,6 +31,8 @@ type Config struct {
 	HQSecret string `mapstructure:"hq-secret"`
 	HQProject string `mapstructure:"hq-project"`
 	HQStrategy string `mapstructure:"hq-strategy"`
+	HQBatchSize int64 `mapstructure:"hq-batch-size"`
+	HQBatchConcurrency int `mapstructure:"hq-batch-concurrency"`
 	LogFileOutputDir string `mapstructure:"log-file-output-dir"`
 	ElasticSearchUsername string `mapstructure:"es-user"`
 	ElasticSearchPassword string `mapstructure:"es-password"`

@@ -54,7 +56,6 @@ type Config struct {

 	MinSpaceRequired int `mapstructure:"min-space-required"`
 	WARCPoolSize int `mapstructure:"warc-pool-size"`
 	WARCDedupeSize int `mapstructure:"warc-dedupe-size"`
-	HQBatchSize int64 `mapstructure:"hq-batch-size"`
 	KeepCookies bool `mapstructure:"keep-cookies"`
 	Headless bool `mapstructure:"headless"`
 	DisableSeencheck bool `mapstructure:"disable-seencheck"`
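
For context on how a mapstructure tag such as hq-batch-concurrency ends up populated from the flag defined above: the conventional cobra-plus-viper wiring is sketched below. The BindPFlags/Unmarshal calls are an assumption about the surrounding code, which this commit does not show:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// Config mirrors the two fields added in this commit; the mapstructure tags
// tie the struct fields to the flag names.
type Config struct {
	HQBatchSize        int64 `mapstructure:"hq-batch-size"`
	HQBatchConcurrency int   `mapstructure:"hq-batch-concurrency"`
}

func main() {
	cmd := &cobra.Command{Use: "hq"}
	cmd.PersistentFlags().Int64("hq-batch-size", 0, "Crawl HQ feeding batch size.")
	cmd.PersistentFlags().Int64("hq-batch-concurrency", 1, "Concurrent feed requests.")

	// Bind the flags into viper, then unmarshal into the struct via the
	// mapstructure tags. This is the conventional pattern; Zeno's actual
	// wiring may differ.
	if err := viper.BindPFlags(cmd.PersistentFlags()); err != nil {
		panic(err)
	}

	var cfg Config
	if err := viper.Unmarshal(&cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // flag defaults: {HQBatchSize:0 HQBatchConcurrency:1}
}
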
go.mod: 2 changes (1 addition, 1 deletion)
@@ -13,7 +13,7 @@ require (
 	github.com/gosuri/uilive v0.0.4
 	github.com/gosuri/uitable v0.0.4
 	github.com/grafov/m3u8 v0.12.0
-	github.com/internetarchive/gocrawlhq v1.2.19
+	github.com/internetarchive/gocrawlhq v1.2.20
 	github.com/paulbellamy/ratecounter v0.2.0
 	github.com/philippgille/gokv/leveldb v0.7.0
 	github.com/prometheus/client_golang v1.20.4
go.sum: 2 changes (2 additions, 0 deletions)
@@ -94,6 +94,8 @@ github.com/internetarchive/gocrawlhq v1.2.18 h1:PPe7UqJ2NNOljn70SmUhoKdgPreeqRUk
 github.com/internetarchive/gocrawlhq v1.2.18/go.mod h1:Rjkyx2ttWDG4vzXOrl7ilzdtbODJ3XSe2PkO77bxSTs=
 github.com/internetarchive/gocrawlhq v1.2.19 h1:bvDliaeWjt97x64bOf+rKXStQX7VE+ZON/I1FS3sQ6A=
 github.com/internetarchive/gocrawlhq v1.2.19/go.mod h1:gHrdMewIi5OBWE/xEZGqSrNHyTXPbt+h+XUWpp9fZek=
+github.com/internetarchive/gocrawlhq v1.2.20 h1:0mIIt9lhPacKr6L2JeISoopQ8EgzC3dISJ3ITGGbOp4=
+github.com/internetarchive/gocrawlhq v1.2.20/go.mod h1:gHrdMewIi5OBWE/xEZGqSrNHyTXPbt+h+XUWpp9fZek=
 github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
 github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
 github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
internal/pkg/crawl/config.go: 2 changes (2 additions, 0 deletions)
@@ -114,6 +114,7 @@ type Crawl struct {
 	HQKey string
 	HQSecret string
 	HQStrategy string
+	HQBatchConcurrency int
 	HQBatchSize int
 	HQContinuousPull bool
 	HQClient *gocrawlhq.Client

@@ -319,6 +320,7 @@ func GenerateCrawlConfig(config *config.Config) (*Crawl, error) {

 	c.HQSecret = config.HQSecret
 	c.HQStrategy = config.HQStrategy
 	c.HQBatchSize = int(config.HQBatchSize)
+	c.HQBatchConcurrency = config.HQBatchConcurrency
 	c.HQContinuousPull = config.HQContinuousPull
 	c.HQRateLimitingSendBack = config.HQRateLimitSendBack

internal/pkg/crawl/hq.go: 54 changes (47 additions, 7 deletions)
@@ -177,13 +177,53 @@ func (c *Crawl) HQConsumer() {

 		// get batch from crawl HQ
 		c.HQConsumerState = "waitingOnFeed"
-		URLs, err := c.HQClient.Feed(HQBatchSize, c.HQStrategy)
-		if err != nil {
-			// c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
-			// 	"batchSize": HQBatchSize,
-			// 	"err":       err,
-			// })).Debug("error getting new URLs from crawl HQ")
-			continue
+		var URLs []gocrawlhq.URL
+		var err error
+		if c.HQBatchConcurrency == 1 {
+			URLs, err = c.HQClient.Get(HQBatchSize, c.HQStrategy)
+			if err != nil {
+				// c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
+				// 	"batchSize": HQBatchSize,
+				// 	"err":       err,
+				// })).Debug("error getting new URLs from crawl HQ")
+				continue
+			}
+		} else {
+			var mu sync.Mutex
+			var wg sync.WaitGroup
+			batchSize := HQBatchSize / c.HQBatchConcurrency
+			URLsChan := make(chan []gocrawlhq.URL, c.HQBatchConcurrency)
+
+			// Start goroutines to get URLs from crawl HQ; each requests
+			// HQBatchSize / HQBatchConcurrency URLs.
+			for i := 0; i < c.HQBatchConcurrency; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					URLs, err := c.HQClient.Get(batchSize, c.HQStrategy)
+					if err != nil {
+						// c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{
+						// 	"batchSize": batchSize,
+						// 	"err":       err,
+						// })).Debug("error getting new URLs from crawl HQ")
+						return
+					}
+					URLsChan <- URLs
+				}()
+			}
+
+			// Wait for all goroutines to finish, then close the channel.
+			go func() {
+				wg.Wait()
+				close(URLsChan)
+			}()
+
+			// Collect all URLs from the channel.
+			for URLsFromChan := range URLsChan {
+				mu.Lock()
+				URLs = append(URLs, URLsFromChan...)
+				mu.Unlock()
+			}
 		}
 		c.HQConsumerState = "feedCompleted"

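
The new code path in HQConsumer is a classic fan-out/fan-in: N goroutines each fetch a share of the batch, a closer goroutine closes the channel once the WaitGroup drains, and a single loop collects the results. A self-contained sketch of the same pattern, with a hypothetical fetchBatch standing in for c.HQClient.Get:

package main

import (
	"fmt"
	"sync"
)

// fetchBatch is a hypothetical stand-in for c.HQClient.Get: it returns up to
// n URLs for the given worker.
func fetchBatch(n, worker int) ([]string, error) {
	urls := make([]string, 0, n)
	for i := 0; i < n; i++ {
		urls = append(urls, fmt.Sprintf("https://example.com/w%d/p%d", worker, i))
	}
	return urls, nil
}

func main() {
	const batchSize = 300
	const concurrency = 10
	perRequest := batchSize / concurrency // integer division, as in hq.go

	var wg sync.WaitGroup
	results := make(chan []string, concurrency)

	// Fan out: each goroutine requests its share of the batch.
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			urls, err := fetchBatch(perRequest, worker)
			if err != nil {
				return // a failed request just shrinks the batch
			}
			results <- urls
		}(i)
	}

	// Closer: once every fetch has finished, close the channel so the
	// consumer's range loop terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Fan in: a single consumer drains the channel.
	var URLs []string
	for batch := range results {
		URLs = append(URLs, batch...)
	}
	fmt.Printf("collected %d URLs\n", len(URLs)) // collected 300 URLs
}

Because only one goroutine ranges over the channel, the final append needs no lock; the mutex in the commit's version of this loop is therefore redundant, though harmless.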
