From a83e3b79e7c6e16de95a31eaaf8f386a2b2bf200 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Mon, 24 Jun 2024 18:37:27 -0400 Subject: [PATCH 01/16] feat: added state management for workers and an API route to query workers state - untested --- cmd/utils.go | 4 +- internal/pkg/crawl/api.go | 30 +++++++++ internal/pkg/crawl/capture.go | 41 ++++++------ internal/pkg/crawl/crawl.go | 101 ++++++++++++++++++++++++++++-- internal/pkg/crawl/finish.go | 3 +- internal/pkg/crawl/worker.go | 114 +++++++++++++++++++++++++++++----- 6 files changed, 252 insertions(+), 41 deletions(-) diff --git a/cmd/utils.go b/cmd/utils.go index 13498f81..24233f9d 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -10,7 +10,6 @@ import ( "github.com/internetarchive/Zeno/internal/pkg/frontier" "github.com/internetarchive/Zeno/internal/pkg/utils" "github.com/paulbellamy/ratecounter" - "github.com/remeh/sizedwaitgroup" "github.com/sirupsen/logrus" ) @@ -50,7 +49,8 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { c.JobPath = path.Join("jobs", flags.Job) c.Workers = flags.Workers - c.WorkerPool = sizedwaitgroup.New(c.Workers) + c.WorkerPool = make([]*crawl.Worker, 0) + c.WorkerStopTimeout = time.Second * 60 // Placeholder for WorkerStopTimeout c.MaxConcurrentAssets = flags.MaxConcurrentAssets c.Seencheck = flags.Seencheck diff --git a/internal/pkg/crawl/api.go b/internal/pkg/crawl/api.go index 8880f229..67bef045 100644 --- a/internal/pkg/crawl/api.go +++ b/internal/pkg/crawl/api.go @@ -2,6 +2,7 @@ package crawl import ( "os" + "strconv" "time" "github.com/gin-contrib/pprof" @@ -11,6 +12,17 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +type APIWorkersState struct { + Workers []*APIWorkerState `json:"workers"` +} + +type APIWorkerState struct { + WorkerID int `json:"worker_id"` + Status string `json:"status"` + LastError string `json:"last_error"` + Locked bool `json:"locked"` +} + func (crawl *Crawl) startAPI() { gin.SetMode(gin.ReleaseMode) gin.DefaultWriter = logInfo.Out @@ -56,6 +68,24 @@ func (crawl *Crawl) startAPI() { r.GET("/metrics", gin.WrapH(promhttp.Handler())) } + r.GET("/workers", func(c *gin.Context) { + workersState := crawl.GetWorkerState(-1) + c.JSON(200, workersState) + }) + + r.GET("/workers/:worker_id", func(c *gin.Context) { + workerID := c.Param("worker_id") + workerIDInt, err := strconv.Atoi(workerID) + if err != nil { + c.JSON(404, gin.H{ + "error": "Worker not found", + }) + return + } + workersState := crawl.GetWorkerState(workerIDInt) + c.JSON(200, workersState) + }) + err := r.Run(":" + crawl.APIPort) if err != nil { logError.Fatalf("unable to start API: %s", err.Error()) diff --git a/internal/pkg/crawl/capture.go b/internal/pkg/crawl/capture.go index 6a311366..87fc0042 100644 --- a/internal/pkg/crawl/capture.go +++ b/internal/pkg/crawl/capture.go @@ -219,7 +219,7 @@ func (c *Crawl) captureAsset(item *frontier.Item, cookies []*http.Cookie) error } // Capture capture the URL and return the outlinks -func (c *Crawl) Capture(item *frontier.Item) { +func (c *Crawl) Capture(item *frontier.Item) error { var ( resp *http.Response waitGroup sync.WaitGroup @@ -237,7 +237,7 @@ func (c *Crawl) Capture(item *frontier.Item) { req, err := http.NewRequest("GET", utils.URLToString(item.URL), nil) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while preparing GET request") - return + return err } if item.Hop > 0 && item.ParentItem != nil { @@ -307,14 +307,14 @@ func (c *Crawl) Capture(item *frontier.Item) { // Execute request resp, err = 
c.executeGET(item, req, false) if err != nil && err.Error() == "URL from redirection has already been seen" { - return + return err } else if err != nil && err.Error() == "URL is being rate limited, sending back to HQ" { c.HQProducerChannel <- frontier.NewItem(item.URL, item.ParentItem, item.Type, item.Hop, "", true) logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("URL is being rate limited, sending back to HQ") - return + return err } else if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while executing GET request") - return + return err } defer resp.Body.Close() @@ -335,7 +335,7 @@ func (c *Crawl) Capture(item *frontier.Item) { base, err := url.Parse(utils.URLToString(resp.Request.URL)) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing base URL") - return + return err } // If the response is a JSON document, we want to scrape it for links @@ -343,19 +343,19 @@ func (c *Crawl) Capture(item *frontier.Item) { jsonBody, err := io.ReadAll(resp.Body) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading JSON body") - return + return err } outlinksFromJSON, err := getURLsFromJSON(string(jsonBody)) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting URLs from JSON") - return + return err } waitGroup.Add(1) go c.queueOutlinks(utils.MakeAbsolute(item.URL, utils.StringSliceToURLSlice(outlinksFromJSON)), item, &waitGroup) - return + return err } // If the response is an XML document, we want to scrape it for links @@ -363,13 +363,13 @@ func (c *Crawl) Capture(item *frontier.Item) { xmlBody, err := io.ReadAll(resp.Body) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading XML body") - return + return err } mv, err := mxj.NewMapXml(xmlBody) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing XML body") - return + return err } for _, value := range mv.LeafValues() { @@ -390,14 +390,14 @@ func (c *Crawl) Capture(item *frontier.Item) { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading response body") } - return + return err } // Turn the response into a doc that we will scrape for outlinks and assets. 
doc, err := goquery.NewDocumentFromReader(resp.Body) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while creating goquery document") - return + return err } // Execute site-specific code on the document @@ -406,7 +406,7 @@ func (c *Crawl) Capture(item *frontier.Item) { cfstreamURLs, err := cloudflarestream.GetJSFiles(doc, base, *c.Client) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting JS files from cloudflarestream") - return + return err } // Seencheck the URLs we captured, we ignore the returned value here @@ -464,26 +464,26 @@ func (c *Crawl) Capture(item *frontier.Item) { outlinks, err := extractOutlinks(base, doc) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting outlinks") - return + return err } waitGroup.Add(1) go c.queueOutlinks(outlinks, item, &waitGroup) if c.DisableAssetsCapture { - return + return err } // Extract and capture assets assets, err := c.extractAssets(base, item, doc) if err != nil { logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting assets") - return + return err } // If we didn't find any assets, let's stop here if len(assets) == 0 { - return + return err } // If --local-seencheck is enabled, then we check if the assets are in the @@ -502,7 +502,7 @@ func (c *Crawl) Capture(item *frontier.Item) { } if len(seencheckedBatch) == 0 { - return + return err } assets = seencheckedBatch @@ -522,7 +522,7 @@ func (c *Crawl) Capture(item *frontier.Item) { } if len(assets) == 0 { - return + return err } } @@ -584,6 +584,7 @@ func (c *Crawl) Capture(item *frontier.Item) { } swg.Wait() + return err } func getURLsFromJSON(jsonString string) ([]string, error) { diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index 9897cdaa..af4f63f3 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -1,6 +1,7 @@ package crawl import ( + "fmt" "net/http" "sync" "time" @@ -11,7 +12,6 @@ import ( "github.com/internetarchive/Zeno/internal/pkg/utils" "github.com/paulbellamy/ratecounter" "github.com/prometheus/client_golang/prometheus" - "github.com/remeh/sizedwaitgroup" "github.com/sirupsen/logrus" "github.com/telanflow/cookiejar" "mvdan.cc/xurls/v2" @@ -42,8 +42,13 @@ type Crawl struct { // Frontier Frontier *frontier.Frontier + // Worker pool + WorkerMutex sync.RWMutex + WorkerPool []*Worker + WorkerStopSignal chan bool + WorkerStopTimeout time.Duration + // Crawl settings - WorkerPool sizedwaitgroup.SizedWaitGroup MaxConcurrentAssets int Client *warc.CustomHTTPClient ClientProxied *warc.CustomHTTPClient @@ -287,9 +292,11 @@ func (c *Crawl) Start() (err error) { // Fire up the desired amount of workers for i := 0; i < c.Workers; i++ { - c.WorkerPool.Add() - go c.Worker() + worker := newWorker(c) + c.WorkerPool = append(c.WorkerPool, worker) + go worker.Run() } + go c.WorkerWatcher() // Start the process responsible for printing live stats on the standard output if c.LiveStats { @@ -335,3 +342,89 @@ func (c *Crawl) Start() (err error) { return } + +func (c *Crawl) WorkerWatcher() { + var toEnd = false + + for { + select { + // Stop the workers + case <-c.WorkerStopSignal: + for _, worker := range c.WorkerPool { + worker.doneSignal <- true + } + toEnd = true + + // Check for finished workers and remove them from the pool + // End the watcher if a stop signal was received and all workers are completed + default: + c.WorkerMutex.RLock() + for i, worker := range c.WorkerPool { + 
if worker.state.status == completed {
+				// Remove the worker from the pool
+				c.WorkerMutex.RUnlock()
+				c.WorkerMutex.Lock()
+				c.WorkerPool = append(c.WorkerPool[:i], c.WorkerPool[i+1:]...)
+				c.WorkerMutex.Unlock()
+			}
+		}
+
+		if toEnd && len(c.WorkerPool) == 0 {
+			return // All workers are completed
+		}
+	}
+}
+
+func (c *Crawl) EnsureWorkersFinished() bool {
+	var workerPoolLen int
+	var timer = time.NewTimer(c.WorkerStopTimeout)
+
+	for {
+		c.WorkerMutex.RLock()
+		workerPoolLen = len(c.WorkerPool)
+		if workerPoolLen == 0 {
+			c.WorkerMutex.RUnlock()
+			return true
+		}
+		c.WorkerMutex.RUnlock()
+		select {
+		case <-timer.C:
+			c.Logger.Warning(fmt.Sprintf("[WORKERS] Timeout reached. %d workers still running", workerPoolLen))
+			return false
+		default:
+			c.Logger.Warning(fmt.Sprintf("[WORKERS] Waiting for %d workers to finish", workerPoolLen))
+			time.Sleep(time.Second * 5)
+		}
+	}
+}
+
+// GetWorkerState returns the state of a worker given its index in the worker pool
+// if the provided index is -1 then the state of all workers is returned
+func (c *Crawl) GetWorkerState(index int) interface{} {
+	if index == -1 {
+		var workersStatus = new(APIWorkersState)
+		for i, worker := range c.WorkerPool {
+			workersStatus.Workers = append(workersStatus.Workers, _getWorkerState(worker, i))
+		}
+		return workersStatus
+	}
+	if index >= len(c.WorkerPool) {
+		return nil
+	}
+	return _getWorkerState(c.WorkerPool[index], index)
+}
+
+func _getWorkerState(worker *Worker, index int) *APIWorkerState {
+	isLocked := false
+	if worker.TryLock() {
+		isLocked = true
+		worker.Unlock()
+	}
+	return &APIWorkerState{
+		WorkerID:  index,
+		Status:    worker.state.status.String(),
+		LastError: worker.state.lastError.Error(),
+		Locked:    isLocked,
+	}
+}
diff --git a/internal/pkg/crawl/finish.go b/internal/pkg/crawl/finish.go
index f11443c3..89b5e967 100644
--- a/internal/pkg/crawl/finish.go
+++ b/internal/pkg/crawl/finish.go
@@ -32,6 +32,7 @@ func (crawl *Crawl) catchFinish() {
 }
 
 func (crawl *Crawl) finish() {
+	crawl.WorkerStopSignal <- true
 	crawl.Finished.Set(true)
 
 	// First we wait for the queue reader to finish its current work,
@@ -45,7 +46,7 @@ func (crawl *Crawl) finish() {
 	close(crawl.Frontier.PullChan)
 
 	crawl.Logger.Warning("[WORKERS] Waiting for workers to finish")
-	crawl.WorkerPool.Wait()
+	crawl.EnsureWorkersFinished()
 	crawl.Logger.Warning("[WORKERS] All workers finished")
 
 	// When all workers are finished, we can safely close the HQ related channels
diff --git a/internal/pkg/crawl/worker.go b/internal/pkg/crawl/worker.go
index ddf72a1d..f168bd2d 100644
--- a/internal/pkg/crawl/worker.go
+++ b/internal/pkg/crawl/worker.go
@@ -1,8 +1,10 @@
 package crawl
 
 import (
+	"sync"
 	"time"
 
+	"github.com/internetarchive/Zeno/internal/pkg/frontier"
 	"github.com/internetarchive/Zeno/internal/pkg/utils"
 )
 
@@ -17,33 +19,117 @@ const (
 	GB = 1024 * MB
 )
 
-// Worker is the key component of a crawl, it's a background processed dispatched
+type status int
+
+const (
+	idle status = iota
+	processing
+	completed
+)
+
+func (s status) String() string {
+	statusStr := map[status]string{
+		idle:       "idle",
+		processing: "processing",
+		completed:  "completed",
+	}
+	return statusStr[s]
+}
+
+type workerState struct {
+	currentItem  *frontier.Item
+	previousItem *frontier.Item
+	status       status
+	lastError    error
+	lastSeen     time.Time
+}
+
+type Worker struct {
+	sync.Mutex
+	state           *workerState
+	doneSignal      chan bool
+	crawlParameters *Crawl
+}
+
+// Run is the key component of a crawl, it's a background process dispatched
 // when the crawl starts, it 
listens on a channel to get new URLs to archive,
 // and eventually push newly discovered URLs back in the frontier.
-func (c *Crawl) Worker() {
-	defer c.WorkerPool.Done()
-
+func (w *Worker) Run() {
 	// Start archiving the URLs!
-	for item := range c.Frontier.PullChan {
+	for item := range w.crawlParameters.Frontier.PullChan {
 		item := item
 
-		// Check if the crawl is paused
-		for c.Paused.Get() {
-			time.Sleep(time.Second)
+		// Check if the crawl is paused or needs to be stopped
+		switch {
+		case <-w.doneSignal:
+			w.Lock()
+			w.state.currentItem = nil
+			w.state.status = completed
+			return
+		default:
+			for w.crawlParameters.Paused.Get() {
+				time.Sleep(time.Second)
+			}
 		}
 
 		// If the host of the item is in the host exclusion list, we skip it
-		if utils.StringInSlice(item.Host, c.ExcludedHosts) || !c.checkIncludedHosts(item.Host) {
-			if c.UseHQ {
+		if utils.StringInSlice(item.Host, w.crawlParameters.ExcludedHosts) || !w.crawlParameters.checkIncludedHosts(item.Host) {
+			if w.crawlParameters.UseHQ {
 				// If we are using the HQ, we want to mark the item as done
-				c.HQFinishedChannel <- item
+				w.crawlParameters.HQFinishedChannel <- item
 			}
 
 			continue
 		}
 
-		c.ActiveWorkers.Incr(1)
-		c.Capture(item)
-		c.ActiveWorkers.Incr(-1)
+		// Launches the capture of the given item
+		w.Capture(item)
+	}
+}
+
+func (w *Worker) Capture(item *frontier.Item) {
+	// Locks the worker
+	w.Lock()
+	defer w.Unlock()
+
+	// Signals that the worker is processing an item
+	w.crawlParameters.ActiveWorkers.Incr(1)
+	w.state.currentItem = item
+	w.state.status = processing
+
+	// Capture the item
+	err := w.crawlParameters.Capture(item)
+	if err != nil {
+		w.state.lastError = err // set directly: calling PushLastError here would deadlock, as the worker is already locked
+	}
+
+	// Signals that the worker has finished processing the item
+	w.state.status = idle
+	w.state.currentItem = nil
+	w.state.previousItem = item
+	w.crawlParameters.ActiveWorkers.Incr(-1)
+	w.state.lastSeen = time.Now()
+}
+
+func (w *Worker) Stop() {
+	w.doneSignal <- true
+}
+
+func (w *Worker) PushLastError(err error) {
+	w.Lock()
+	w.state.lastError = err
+	w.Unlock()
+}
+
+func newWorker(crawlParameters *Crawl) *Worker {
+	return &Worker{
+		state: &workerState{
+			status:       idle,
+			previousItem: nil,
+			currentItem:  nil,
+			lastError:    nil,
+		},
+		doneSignal:      make(chan bool),
+		crawlParameters: crawlParameters,
 	}
 }

From e4879525fcd17247ba9e9759c4a5da32aee99ac4 Mon Sep 17 00:00:00 2001
From: Thomas FOUBERT
Date: Mon, 24 Jun 2024 18:42:05 -0400
Subject: [PATCH 02/16] feat: respect Worker Pool RWmutex

---
 internal/pkg/crawl/crawl.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go
index af4f63f3..a3b38564 100644
--- a/internal/pkg/crawl/crawl.go
+++ b/internal/pkg/crawl/crawl.go
@@ -402,6 +402,9 @@ func (c *Crawl) EnsureWorkersFinished() bool {
 // GetWorkerState returns the state of a worker given its index in the worker pool
 // if the provided index is -1 then the state of all workers is returned
 func (c *Crawl) GetWorkerState(index int) interface{} {
+	c.WorkerMutex.RLock()
+	defer c.WorkerMutex.RUnlock()
+
 	if index == -1 {
 		var workersStatus = new(APIWorkersState)
 		for i, worker := range c.WorkerPool {

From 0bf5cf0cf1fd98b1b2fa88d7b9798bfe1d193744 Mon Sep 17 00:00:00 2001
From: Thomas FOUBERT
Date: Sat, 29 Jun 2024 10:13:47 -0400
Subject: [PATCH 03/16] feat: small adjustments to make it production-suitable

---
 .gitignore                   |  1 +
 cmd/utils.go                 |  1 +
 internal/pkg/crawl/api.go    | 15 ++++++++++++---
 internal/pkg/crawl/crawl.go  | 33 +++++++++++++++++++++------------
 internal/pkg/crawl/worker.go |  6 ++++--
 5 files changed, 
39 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 5a81b306..2eeae776 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ jobs/* +jobs/ Zeno *.txt *.sh \ No newline at end of file diff --git a/cmd/utils.go b/cmd/utils.go index 24233f9d..f423e905 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -52,6 +52,7 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { c.WorkerPool = make([]*crawl.Worker, 0) c.WorkerStopTimeout = time.Second * 60 // Placeholder for WorkerStopTimeout c.MaxConcurrentAssets = flags.MaxConcurrentAssets + c.WorkerStopSignal = make(chan bool) c.Seencheck = flags.Seencheck c.HTTPTimeout = flags.HTTPTimeout diff --git a/internal/pkg/crawl/api.go b/internal/pkg/crawl/api.go index 67bef045..4586ebfc 100644 --- a/internal/pkg/crawl/api.go +++ b/internal/pkg/crawl/api.go @@ -17,9 +17,10 @@ type APIWorkersState struct { } type APIWorkerState struct { - WorkerID int `json:"worker_id"` + WorkerID uint `json:"worker_id"` Status string `json:"status"` LastError string `json:"last_error"` + LastSeen string `json:"last_seen"` Locked bool `json:"locked"` } @@ -73,16 +74,24 @@ func (crawl *Crawl) startAPI() { c.JSON(200, workersState) }) - r.GET("/workers/:worker_id", func(c *gin.Context) { + r.GET("/worker/:worker_id", func(c *gin.Context) { workerID := c.Param("worker_id") workerIDInt, err := strconv.Atoi(workerID) if err != nil { + c.JSON(400, gin.H{ + "error": "Unsupported worker ID", + }) + return + } + + workersState := crawl.GetWorkerState(workerIDInt) + if workersState == nil { c.JSON(404, gin.H{ "error": "Worker not found", }) return } - workersState := crawl.GetWorkerState(workerIDInt) + c.JSON(200, workersState) }) diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index a3b38564..373a1326 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -291,8 +291,8 @@ func (c *Crawl) Start() (err error) { } // Fire up the desired amount of workers - for i := 0; i < c.Workers; i++ { - worker := newWorker(c) + for i := uint(0); i < uint(c.Workers); i++ { + worker := newWorker(c, i) c.WorkerPool = append(c.WorkerPool, worker) go worker.Run() } @@ -348,7 +348,8 @@ func (c *Crawl) WorkerWatcher() { for { select { - // Stop the workers + + // Stop the workers when requested case <-c.WorkerStopSignal: for _, worker := range c.WorkerPool { worker.doneSignal <- true @@ -356,22 +357,22 @@ func (c *Crawl) WorkerWatcher() { toEnd = true // Check for finished workers and remove them from the pool - // End the watcher if a stop signal was received and all workers are completed + // End the watcher if a stop signal was received beforehand and all workers are completed default: - c.WorkerMutex.RLock() + c.WorkerMutex.Lock() for i, worker := range c.WorkerPool { if worker.state.status == completed { // Remove the worker from the pool - c.WorkerMutex.RUnlock() - c.WorkerMutex.Lock() c.WorkerPool = append(c.WorkerPool[:i], c.WorkerPool[i+1:]...) 
- c.WorkerMutex.Unlock() } + worker.id = uint(i) } if toEnd && len(c.WorkerPool) == 0 { + c.WorkerMutex.Unlock() return // All workers are completed } + c.WorkerMutex.Unlock() } } } @@ -419,15 +420,23 @@ func (c *Crawl) GetWorkerState(index int) interface{} { } func _getWorkerState(worker *Worker, index int) *APIWorkerState { - isLocked := false + lastErr := "" + isLocked := true + if worker.TryLock() { - isLocked = true + isLocked = false worker.Unlock() } + + if worker.state.lastError != nil { + lastErr = worker.state.lastError.Error() + } + return &APIWorkerState{ - WorkerID: index, + WorkerID: worker.id, Status: worker.state.status.String(), - LastError: worker.state.lastError.Error(), + LastSeen: worker.state.lastSeen.Format(time.RFC3339), + LastError: lastErr, Locked: isLocked, } } diff --git a/internal/pkg/crawl/worker.go b/internal/pkg/crawl/worker.go index f168bd2d..02d63629 100644 --- a/internal/pkg/crawl/worker.go +++ b/internal/pkg/crawl/worker.go @@ -46,6 +46,7 @@ type workerState struct { type Worker struct { sync.Mutex + id uint state *workerState doneSignal chan bool crawlParameters *Crawl @@ -60,7 +61,7 @@ func (w *Worker) Run() { item := item // Check if the crawl is paused or needs to be stopped - switch { + select { case <-w.doneSignal: w.Lock() w.state.currentItem = nil @@ -121,8 +122,9 @@ func (w *Worker) PushLastError(err error) { w.Unlock() } -func newWorker(crawlParameters *Crawl) *Worker { +func newWorker(crawlParameters *Crawl, id uint) *Worker { return &Worker{ + id: id, state: &workerState{ status: idle, previousItem: nil, From c0a610bd6929546d7645a12fb264462b27e8886f Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 29 Jun 2024 10:33:18 -0400 Subject: [PATCH 04/16] feat: add a multiHandler implementation of slog.Handler to log different levels, formats and outputs --- internal/pkg/log/log.go | 179 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 internal/pkg/log/log.go diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go new file mode 100644 index 00000000..cc1b2de2 --- /dev/null +++ b/internal/pkg/log/log.go @@ -0,0 +1,179 @@ +// Package log provides a custom logging solution with multi-output support +package log + +import ( + "context" + "log/slog" + "os" + "sync" +) + +var ( + defaultLogger *Logger + once sync.Once +) + +// Logger wraps slog.Logger to provide multi-output functionality +type Logger struct { + slogger *slog.Logger +} + +// multiHandler implements slog.Handler interface for multiple outputs +type multiHandler struct { + handlers []slog.Handler +} + +// Config holds the configuration for the logger +type Config struct { + FileOutput string + FileLevel slog.Level + StdoutLevel slog.Level +} + +// New creates a new Logger instance with the given configuration. +// It sets up handlers for stdout (text format) and file output (JSON format) if specified. +// If FileOutput is empty, only stdout logging will be enabled. 
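+//
+// For instance, a minimal setup (the same values Default uses below) looks like:
+//
+//	logger, err := New(Config{
+//		FileOutput:  "zeno.log",
+//		FileLevel:   slog.LevelInfo,
+//		StdoutLevel: slog.LevelInfo,
+//	})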
+//
+// Parameters:
+// - cfg: Config struct containing logger configuration options
+//
+// Returns:
+// - *Logger: A new Logger instance
+// - error: An error if there was a problem creating the logger (e.g., unable to open log file)
+func New(cfg Config) (*Logger, error) {
+	var handlers []slog.Handler
+
+	stdoutHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
+		Level: cfg.StdoutLevel,
+	})
+	handlers = append(handlers, stdoutHandler)
+
+	if cfg.FileOutput != "" {
+		file, err := os.OpenFile(cfg.FileOutput, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+		if err != nil {
+			return nil, err
+		}
+		jsonHandler := slog.NewJSONHandler(file, &slog.HandlerOptions{
+			Level: cfg.FileLevel,
+		})
+		handlers = append(handlers, jsonHandler)
+	}
+
+	mh := &multiHandler{handlers: handlers}
+
+	slogger := slog.New(mh)
+
+	return &Logger{slogger: slogger}, nil
+}
+
+// Default returns the default Logger instance.
+// The default logger writes to both stdout (text format) and a file named "zeno.log" (JSON format).
+// Both outputs are set to log messages at Info level and above.
+// This function uses sync.Once to ensure that the default logger is only created once.
+//
+// Returns:
+// - *Logger: The default Logger instance
+func Default() *Logger {
+	once.Do(func() {
+		logger, err := New(Config{
+			FileOutput:  "zeno.log",
+			FileLevel:   slog.LevelInfo,
+			StdoutLevel: slog.LevelInfo,
+		})
+		if err != nil {
+			panic(err)
+		}
+		defaultLogger = logger
+	})
+	return defaultLogger
+}
+
+// Info logs a message at Info level.
+// The first argument is the message to log, and subsequent arguments are key-value pairs
+// that will be included in the log entry.
+//
+// Parameters:
+// - msg: The message to log
+// - args: Optional key-value pairs to include in the log entry
+func (l *Logger) Info(msg string, args ...any) {
+	l.slogger.Info(msg, args...)
+}
+
+// Warn logs a message at Warn level.
+// The first argument is the message to log, and subsequent arguments are key-value pairs
+// that will be included in the log entry.
+//
+// Parameters:
+// - msg: The message to log
+// - args: Optional key-value pairs to include in the log entry
+func (l *Logger) Warn(msg string, args ...any) {
+	l.slogger.Warn(msg, args...)
+}
+
+// Error logs a message at Error level.
+// The first argument is the message to log, and subsequent arguments are key-value pairs
+// that will be included in the log entry.
+//
+// Parameters:
+// - msg: The message to log
+// - args: Optional key-value pairs to include in the log entry
+func (l *Logger) Error(msg string, args ...any) {
+	l.slogger.Error(msg, args...)
+}
+
+// Debug logs a message at Debug level.
+// The first argument is the message to log, and subsequent arguments are key-value pairs
+// that will be included in the log entry.
+//
+// Parameters:
+// - msg: The message to log
+// - args: Optional key-value pairs to include in the log entry
+func (l *Logger) Debug(msg string, args ...any) {
+	l.slogger.Debug(msg, args...)
+}
+
+//-------------------------------------------------------------------------------------
+// Following methods are used to implement the slog.Handler interface for multiHandler
+//-------------------------------------------------------------------------------------
+
+// This method checks if any of the underlying handlers are enabled for a given log level.
+// It's used internally to determine if a log message should be processed by a given handler.
+func (h *multiHandler) Enabled(ctx context.Context, level slog.Level) bool {
+	for _, handler := range h.handlers {
+		if handler.Enabled(ctx, level) {
+			return true
+		}
+	}
+	return false
+}
+
+// This method is responsible for passing the log record to all underlying handlers.
+// It's called internally when a log message needs to be written.
+func (h *multiHandler) Handle(ctx context.Context, r slog.Record) error {
+	for _, handler := range h.handlers {
+		if err := handler.Handle(ctx, r); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// This method creates a new handler with additional attributes.
+// It's used internally when the logger is asked to include additional context with all subsequent log messages.
+func (h *multiHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
+	handlers := make([]slog.Handler, len(h.handlers))
+	for i, handler := range h.handlers {
+		handlers[i] = handler.WithAttrs(attrs)
+	}
+	return &multiHandler{handlers: handlers}
+}
+
+// This method creates a new handler with a new group added to the attribute grouping hierarchy.
+// It's used internally when the logger is asked to group a set of attributes together.
+func (h *multiHandler) WithGroup(name string) slog.Handler {
+	handlers := make([]slog.Handler, len(h.handlers))
+	for i, handler := range h.handlers {
+		handlers[i] = handler.WithGroup(name)
+	}
+	return &multiHandler{handlers: handlers}
+}

From 89106be8b97067a77ccd7e429d5a39783cfd7c97 Mon Sep 17 00:00:00 2001
From: Thomas FOUBERT
Date: Sat, 29 Jun 2024 11:08:36 -0400
Subject: [PATCH 05/16] feat: add fatal logging and implement an io.Writer
 interface for Gin to use

---
 internal/pkg/crawl/api.go   | 12 +++++++-----
 internal/pkg/crawl/crawl.go | 10 ++++------
 internal/pkg/log/log.go     | 18 +++++++++++++++---
 internal/pkg/log/writer.go  | 29 +++++++++++++++++++++++++++++
 4 files changed, 55 insertions(+), 14 deletions(-)
 create mode 100644 internal/pkg/log/writer.go

diff --git a/internal/pkg/crawl/api.go b/internal/pkg/crawl/api.go
index 4586ebfc..e8602111 100644
--- a/internal/pkg/crawl/api.go
+++ b/internal/pkg/crawl/api.go
@@ -1,6 +1,7 @@
 package crawl
 
 import (
+	"log/slog"
 	"os"
 	"strconv"
 	"time"
@@ -26,13 +27,14 @@ type APIWorkerState struct {
 
 func (crawl *Crawl) startAPI() {
 	gin.SetMode(gin.ReleaseMode)
-	gin.DefaultWriter = logInfo.Out
+	gin.DefaultWriter = crawl.Log.Writer(slog.LevelInfo)
+	gin.DefaultErrorWriter = crawl.Log.Writer(slog.LevelError)
 
 	r := gin.Default()
 
 	pprof.Register(r)
 
-	logInfo.Info("Starting API")
+	crawl.Log.Info("Starting API")
 	r.GET("/", func(c *gin.Context) {
 		crawledSeeds := crawl.CrawledSeeds.Value()
 		crawledAssets := crawl.CrawledAssets.Value()
@@ -54,7 +56,7 @@ func (crawl *Crawl) startAPI() {
 		labels["crawljob"] = crawl.Job
 		hostname, err := os.Hostname()
 		if err != nil {
-			logWarning.Warn("Unable to retrieve hostname of machine")
+			crawl.Log.Warn("Unable to retrieve hostname of machine")
 			hostname = "unknown"
 		}
 		labels["host"] = hostname + ":" + crawl.APIPort
@@ -65,7 +67,7 @@ func (crawl *Crawl) startAPI() {
 			Help: "The total number of crawled URI",
 		})
 
-		logInfo.Info("Starting Prometheus export")
+		crawl.Log.Info("Starting Prometheus export")
 		r.GET("/metrics", gin.WrapH(promhttp.Handler()))
 	}
 
@@ -97,6 +99,6 @@ func (crawl *Crawl) startAPI() {
 
 	err := r.Run(":" + crawl.APIPort)
 	if err != nil {
-		logError.Fatalf("unable to start API: %s", err.Error())
+		crawl.Log.Fatal("unable to start API", "error", err.Error())
 	}
 }
diff --git 
a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index 373a1326..f81d6d70 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -9,6 +9,7 @@ import ( "git.archive.org/wb/gocrawlhq" "github.com/CorentinB/warc" "github.com/internetarchive/Zeno/internal/pkg/frontier" + "github.com/internetarchive/Zeno/internal/pkg/log" "github.com/internetarchive/Zeno/internal/pkg/utils" "github.com/paulbellamy/ratecounter" "github.com/prometheus/client_golang/prometheus" @@ -17,12 +18,6 @@ import ( "mvdan.cc/xurls/v2" ) -var ( - logInfo *logrus.Logger - logWarning *logrus.Logger - logError *logrus.Logger -) - // PrometheusMetrics define all the metrics exposed by the Prometheus exporter type PrometheusMetrics struct { Prefix string @@ -39,6 +34,9 @@ type Crawl struct { LiveStats bool ElasticSearchURL string + // Logger + Log *log.Logger + // Frontier Frontier *frontier.Frontier diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go index cc1b2de2..e03cd0ba 100644 --- a/internal/pkg/log/log.go +++ b/internal/pkg/log/log.go @@ -88,6 +88,17 @@ func Default() *Logger { return defaultLogger } +// Debug logs a message at Debug level. +// The first argument is the message to log, and subsequent arguments are key-value pairs +// that will be included in the log entry. +// +// Parameters: +// - msg: The message to log +// - args: Optional key-value pairs to include in the log entry +func (l *Logger) Debug(msg string, args ...any) { + l.slogger.Debug(msg, args...) +} + // Info logs a message at Info level. // The first argument is the message to log, and subsequent arguments are key-value pairs // that will be included in the log entry. @@ -121,15 +132,16 @@ func (l *Logger) Error(msg string, args ...any) { l.slogger.Error(msg, args...) } -// Debug logs a message at Debug level. +// Fatal logs a message at Fatal level and then calls os.Exit(1). // The first argument is the message to log, and subsequent arguments are key-value pairs // that will be included in the log entry. // // Parameters: // - msg: The message to log // - args: Optional key-value pairs to include in the log entry -func (l *Logger) Debug(msg string, args ...any) { - l.slogger.Debug(msg, args...) +func (l *Logger) Fatal(msg string, args ...any) { + l.slogger.Log(context.Background(), slog.LevelError, msg, args...) + os.Exit(1) } //------------------------------------------------------------------------------------- diff --git a/internal/pkg/log/writer.go b/internal/pkg/log/writer.go new file mode 100644 index 00000000..fe420aa7 --- /dev/null +++ b/internal/pkg/log/writer.go @@ -0,0 +1,29 @@ +package log + +import ( + "context" + "io" + "log/slog" +) + +// logWriter implements io.Writer interface for compatibility with Gin +type logWriter struct { + logger *Logger + level slog.Level +} + +// Write implements io.Writer interface. +// It writes the log message at the specified level. +func (w *logWriter) Write(p []byte) (n int, err error) { + w.logger.slogger.Log(context.Background(), w.level, string(p)) + return len(p), nil +} + +// Writer returns an io.Writer that logs at the specified level. +// This can be used to integrate with Gin's logging system. 
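+//
+// For example, startAPI hooks Gin up to this logger with:
+//
+//	gin.DefaultWriter = crawl.Log.Writer(slog.LevelInfo)
+//	gin.DefaultErrorWriter = crawl.Log.Writer(slog.LevelError)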
+func (l *Logger) Writer(level slog.Level) io.Writer { + return &logWriter{ + logger: l, + level: level, + } +} From 1376ad07937bf11faf1ea82c6152d92e55d38e49 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 29 Jun 2024 11:42:58 -0400 Subject: [PATCH 06/16] feat: add WithFields method to replicate logrus behaviour --- internal/pkg/log/log.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go index e03cd0ba..0213204e 100644 --- a/internal/pkg/log/log.go +++ b/internal/pkg/log/log.go @@ -144,6 +144,35 @@ func (l *Logger) Fatal(msg string, args ...any) { os.Exit(1) } +// WithFields returns a new Logger with the given fields added to all log entries. +// The fields are key-value pairs that will be included in every subsequent log entry +// made by the returned logger. +// +// This method creates a new Logger instance and does not modify the original logger. +// It can be chained with other WithFields calls or logging methods. +// +// Parameters: +// - fields: A map of key-value pairs to be included in all log entries +// +// Returns: +// - *Logger: A new Logger instance with the specified fields attached +// +// Example: +// +// logger := log.Default() +// userLogger := logger.WithFields(map[string]interface{}{ +// "user_id": 12345, +// "ip": "192.168.1.1", +// }) +// userLogger.Info("User logged in") +func (l *Logger) WithFields(fields map[string]interface{}) *Logger { + args := make([]any, 0, len(fields)*2) + for k, v := range fields { + args = append(args, k, v) + } + return &Logger{slogger: l.slogger.With(args...)} +} + //------------------------------------------------------------------------------------- // Following methods are used to implement the slog.Handler interface for multiHandler //------------------------------------------------------------------------------------- From 3134d15da1c569491130f8a5336b210fac7919ae Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 29 Jun 2024 11:46:54 -0400 Subject: [PATCH 07/16] chore: add types and function comments --- internal/pkg/crawl/api.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/pkg/crawl/api.go b/internal/pkg/crawl/api.go index e8602111..d2348009 100644 --- a/internal/pkg/crawl/api.go +++ b/internal/pkg/crawl/api.go @@ -13,10 +13,12 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +// APIWorkersState represents the state of all API workers. type APIWorkersState struct { Workers []*APIWorkerState `json:"workers"` } +// APIWorkerState represents the state of an API worker. type APIWorkerState struct { WorkerID uint `json:"worker_id"` Status string `json:"status"` @@ -25,6 +27,7 @@ type APIWorkerState struct { Locked bool `json:"locked"` } +// startAPI starts the API server for the crawl. 
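+// It exposes the crawl statistics on /, the Prometheus exporter on /metrics
+// (when enabled), and the worker states on /workers and /worker/:worker_id.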
 func (crawl *Crawl) startAPI() {
 	gin.SetMode(gin.ReleaseMode)
 	gin.DefaultWriter = crawl.Log.Writer(slog.LevelInfo)

From 10f0b50e973668de97796e973b1b6668b66ade46 Mon Sep 17 00:00:00 2001
From: Thomas FOUBERT
Date: Sat, 29 Jun 2024 12:05:45 -0400
Subject: [PATCH 08/16] feat: add a log.Entry type used by log.WithFields() to
 replicate logrus behaviour as closely as possible

---
 internal/pkg/crawl/assets.go   |  4 +-
 internal/pkg/log/withfields.go | 79 ++++++++++++++++++++++++++++++++++
 2 files changed, 81 insertions(+), 2 deletions(-)
 create mode 100644 internal/pkg/log/withfields.go

diff --git a/internal/pkg/crawl/assets.go b/internal/pkg/crawl/assets.go
index 2fc9b6d2..9c7971eb 100644
--- a/internal/pkg/crawl/assets.go
+++ b/internal/pkg/crawl/assets.go
@@ -18,7 +18,7 @@ func (c *Crawl) extractAssets(base *url.URL, item *frontier.Item, doc *goquery.D
 	if strings.Contains(base.Host, "cloudflarestream.com") {
 		cloudflarestreamURLs, err := cloudflarestream.GetSegments(base, *c.Client)
 		if err != nil {
-			logWarning.WithFields(c.genLogFields(err, item.URL, nil)).Warnln("error getting cloudflarestream segments")
+			c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Warn("error getting cloudflarestream segments")
 		}
 
 		if len(cloudflarestreamURLs) > 0 {
@@ -144,7 +144,7 @@ func (c *Crawl) extractAssets(base *url.URL, item *frontier.Item, doc *goquery.D
 			// Apply regex on the script's HTML to extract potential assets
 			outerHTML, err := goquery.OuterHtml(item)
 			if err != nil {
-				logWarning.Warning(err)
+				c.Log.Warn("crawl/assets.go:extractAssets():goquery.OuterHtml():", "error", err)
 			} else {
 				scriptLinks := utils.DedupeStrings(regexOutlinks.FindAllString(outerHTML, -1))
 				for _, scriptLink := range scriptLinks {
diff --git a/internal/pkg/log/withfields.go b/internal/pkg/log/withfields.go
new file mode 100644
index 00000000..a5d7c60e
--- /dev/null
+++ b/internal/pkg/log/withfields.go
@@ -0,0 +1,79 @@
+package log
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"os"
+	"time"
+)
+
+// WithFields returns a new log entry with the given fields.
+// The fields are key-value pairs that will be included only in the next log entry.
+//
+// This method returns a log Entry, which can be used to log a message with the specified fields.
+//
+// Parameters:
+// - fields: A map of key-value pairs to be included in the next log entry
+//
+// Returns:
+// - Entry: A log entry that can be used to log a message with the specified fields
+//
+// Example:
+//
+//	logger := log.Default()
+//	logger.WithFields(map[string]interface{}{
+//	  "user_id": 12345,
+//	  "ip": "192.168.1.1",
+//	}).Info("User logged in")
+func (l *Logger) WithFields(fields map[string]interface{}) *Entry {
+	attrs := make([]slog.Attr, 0, len(fields))
+	for k, v := range fields {
+		attrs = append(attrs, slog.Any(k, v))
+	}
+	return &Entry{logger: l, attrs: attrs}
+}
+
+// Entry is a log entry with fields.
+type Entry struct {
+	logger *Logger
+	attrs  []slog.Attr
+}
+
+func (e *Entry) log(ctx context.Context, level slog.Level, msg string, args ...any) {
+	allAttrs := append([]slog.Attr{}, e.attrs...) // copy the fields; the message is already carried by the record itself
+	for i := 0; i < len(args); i += 2 {
+		if i+1 < len(args) {
+			allAttrs = append(allAttrs, slog.Any(fmt.Sprint(args[i]), args[i+1]))
+		}
+	}
+	r := slog.NewRecord(time.Now(), level, msg, 0)
+	r.AddAttrs(allAttrs...)
+	_ = e.logger.handler.Handle(ctx, r)
+}
+
+// Info logs a message at Info level with the fields specified in WithFields.
+func (e *Entry) Info(msg string, args ...any) {
+	e.log(context.Background(), slog.LevelInfo, msg, args...)
+} + +// Warn logs a message at Warn level with the fields specified in WithFields. +func (e *Entry) Warn(msg string, args ...any) { + e.log(context.Background(), slog.LevelWarn, msg, args...) +} + +// Error logs a message at Error level with the fields specified in WithFields. +func (e *Entry) Error(msg string, args ...any) { + e.log(context.Background(), slog.LevelError, msg, args...) +} + +// Debug logs a message at Debug level with the fields specified in WithFields. +func (e *Entry) Debug(msg string, args ...any) { + e.log(context.Background(), slog.LevelDebug, msg, args...) +} + +// Fatal logs a message at Fatal level with the fields specified in WithFields, then calls os.Exit(1). +func (e *Entry) Fatal(msg string, args ...any) { + e.log(context.Background(), slog.LevelError, msg, args...) + os.Exit(1) +} From d6fff681a88971e73c652e236b409d71ddc57c5a Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 29 Jun 2024 12:08:05 -0400 Subject: [PATCH 09/16] feat: make capture.go use custom logging + slight code enhancements --- internal/pkg/crawl/capture.go | 71 +++++++++++++++++------------------ 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/internal/pkg/crawl/capture.go b/internal/pkg/crawl/capture.go index 87fc0042..f4521ed0 100644 --- a/internal/pkg/crawl/capture.go +++ b/internal/pkg/crawl/capture.go @@ -92,7 +92,7 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection return nil, err } - logError.WithFields(c.genLogFields(err, req.URL, nil)).Error("error while executing GET request, retrying") + c.Log.WithFields(c.genLogFields(err, req.URL, nil)).Error("error while executing GET request, retrying") time.Sleep(sleepTime) @@ -100,11 +100,11 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection } if resp.StatusCode == 429 { - logWarning.WithFields(c.genLogFields(err, req.URL, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, req.URL, map[string]interface{}{ "sleepTime": sleepTime.String(), "retryCount": retry, "statusCode": resp.StatusCode, - })).Debugf("we are being rate limited") + })).Info("we are being rate limited") // This ensures we aren't leaving the warc dialer hanging. // Do note, 429s are filtered out by WARC writer regardless. 
@@ -114,19 +114,17 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection // If --hq-rate-limiting-send-back is enabled, we send the URL back to HQ if c.UseHQ && c.HQRateLimitingSendBack { return nil, errors.New("URL is being rate limited, sending back to HQ") - } else { - logWarning.WithFields(c.genLogFields(err, req.URL, map[string]interface{}{ - "sleepTime": sleepTime.String(), - "retryCount": retry, - "statusCode": resp.StatusCode, - })).Warn("URL is being rate limited") } + c.Log.WithFields(c.genLogFields(err, req.URL, map[string]interface{}{ + "sleepTime": sleepTime.String(), + "retryCount": retry, + "statusCode": resp.StatusCode, + })).Warn("URL is being rate limited") continue - } else { - c.logCrawlSuccess(executionStart, resp.StatusCode, item) - break } + c.logCrawlSuccess(executionStart, resp.StatusCode, item) + break } // If a redirection is catched, then we execute the redirection @@ -236,7 +234,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Prepare GET request req, err := http.NewRequest("GET", utils.URLToString(item.URL), nil) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while preparing GET request") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while preparing GET request") return err } @@ -251,10 +249,10 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Get the API URL from the URL apiURL, err := truthsocial.GenerateAPIURL(utils.URLToString(item.URL)) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating API URL") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating API URL") } else { if apiURL == nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating API URL") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating API URL") } else { // Then we create an item apiItem := frontier.NewItem(apiURL, item, item.Type, item.Hop, item.ID, false) @@ -266,7 +264,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Grab few embeds that are needed for the playback embedURLs, err := truthsocial.EmbedURLs() if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting embed URLs") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting embed URLs") } else { for _, embedURL := range embedURLs { // Create the embed item @@ -281,10 +279,10 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Generate the highwinds URL highwindsURL, err := libsyn.GenerateHighwindsURL(utils.URLToString(item.URL)) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating libsyn URL") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating libsyn URL") } else { if highwindsURL == nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating libsyn URL") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while generating libsyn URL") } else { c.Capture(frontier.NewItem(highwindsURL, item, item.Type, item.Hop, item.ID, false)) } @@ -310,10 +308,10 @@ func (c *Crawl) Capture(item *frontier.Item) error { return err } else if err != nil && err.Error() == "URL is being rate limited, sending back to HQ" { c.HQProducerChannel <- frontier.NewItem(item.URL, item.ParentItem, item.Type, item.Hop, "", true) - logError.WithFields(c.genLogFields(err, item.URL, 
nil)).Error("URL is being rate limited, sending back to HQ") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("URL is being rate limited, sending back to HQ") return err } else if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while executing GET request") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while executing GET request") return err } defer resp.Body.Close() @@ -334,7 +332,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Store the base URL to turn relative links into absolute links later base, err := url.Parse(utils.URLToString(resp.Request.URL)) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing base URL") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing base URL") return err } @@ -342,13 +340,13 @@ func (c *Crawl) Capture(item *frontier.Item) error { if strings.Contains(resp.Header.Get("Content-Type"), "json") { jsonBody, err := io.ReadAll(resp.Body) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading JSON body") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading JSON body") return err } outlinksFromJSON, err := getURLsFromJSON(string(jsonBody)) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting URLs from JSON") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting URLs from JSON") return err } @@ -362,13 +360,13 @@ func (c *Crawl) Capture(item *frontier.Item) error { if strings.Contains(resp.Header.Get("Content-Type"), "xml") { xmlBody, err := io.ReadAll(resp.Body) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading XML body") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading XML body") return err } mv, err := mxj.NewMapXml(xmlBody) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing XML body") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing XML body") return err } @@ -387,7 +385,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Enforce reading all data from the response for WARC writing _, err := io.Copy(io.Discard, resp.Body) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading response body") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while reading response body") } return err @@ -396,7 +394,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Turn the response into a doc that we will scrape for outlinks and assets. 
doc, err := goquery.NewDocumentFromReader(resp.Body) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while creating goquery document") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while creating goquery document") return err } @@ -405,7 +403,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Look for JS files necessary for the playback of the video cfstreamURLs, err := cloudflarestream.GetJSFiles(doc, base, *c.Client) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting JS files from cloudflarestream") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while getting JS files from cloudflarestream") return err } @@ -419,7 +417,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { } else if c.UseHQ { _, err := c.HQSeencheckURLs(utils.StringSliceToURLSlice(cfstreamURLs)) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, item.URL, map[string]interface{}{ "urls": cfstreamURLs, })).Error("error while seenchecking assets via HQ") } @@ -427,7 +425,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Log the archived URLs for _, cfstreamURL := range cfstreamURLs { - logInfo.WithFields(c.genLogFields(err, cfstreamURL, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, cfstreamURL, map[string]interface{}{ "parentHop": item.Hop, "parentUrl": utils.URLToString(item.URL), "type": "asset", @@ -452,7 +450,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { if exists { baseTagValue, err := url.Parse(link) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing base tag value") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while parsing base tag value") } else { base = baseTagValue } @@ -463,7 +461,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Extract outlinks outlinks, err := extractOutlinks(base, doc) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting outlinks") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting outlinks") return err } @@ -477,7 +475,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Extract and capture assets assets, err := c.extractAssets(base, item, doc) if err != nil { - logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting assets") + c.Log.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while extracting assets") return err } @@ -496,9 +494,8 @@ func (c *Crawl) Capture(item *frontier.Item) error { found := c.seencheckURL(utils.URLToString(URL), "asset") if found { continue - } else { - seencheckedBatch = append(seencheckedBatch, URL) } + seencheckedBatch = append(seencheckedBatch, URL) } if len(seencheckedBatch) == 0 { @@ -512,7 +509,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // if HQ is down or if the request failed. So if we get an error, we just // continue with the original list of assets. 
if err != nil { - logError.WithFields(c.genLogFields(err, nil, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{ "urls": assets, "parentHop": item.Hop, "parentUrl": utils.URLToString(item.URL), @@ -569,7 +566,7 @@ func (c *Crawl) Capture(item *frontier.Item) error { // Capture the asset err = c.captureAsset(newAsset, resp.Cookies()) if err != nil { - logError.WithFields(c.genLogFields(err, &asset, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, &asset, map[string]interface{}{ "parentHop": item.Hop, "parentUrl": utils.URLToString(item.URL), "type": "asset", From f1720118f373a3221669c6e92998ed246a446978 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 29 Jun 2024 12:55:35 -0400 Subject: [PATCH 10/16] feat: reflect logging changes on the rest of the code --- internal/pkg/crawl/crawl.go | 61 +++++++----------------------------- internal/pkg/crawl/finish.go | 30 +++++++++--------- internal/pkg/crawl/hq.go | 28 ++++++++--------- internal/pkg/crawl/log.go | 2 +- internal/pkg/log/log.go | 36 ++++----------------- 5 files changed, 47 insertions(+), 110 deletions(-) diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index f81d6d70..2ee5cecf 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -50,7 +50,6 @@ type Crawl struct { MaxConcurrentAssets int Client *warc.CustomHTTPClient ClientProxied *warc.CustomHTTPClient - Logger logrus.Logger DisabledHTMLTags []string ExcludedHosts []string IncludedHosts []string @@ -137,51 +136,13 @@ func (c *Crawl) Start() (err error) { if c.CrawlTimeLimit != 0 { go func() { time.Sleep(time.Second * time.Duration(c.CrawlTimeLimit)) - logInfo.Infoln("Crawl time limit reached: attempting to finish the crawl.") + c.Log.Info("Crawl time limit reached: attempting to finish the crawl.") go c.finish() time.Sleep((time.Duration(c.MaxCrawlTimeLimit) * time.Second) - (time.Duration(c.CrawlTimeLimit) * time.Second)) - logError.Fatal("Max crawl time limit reached, exiting..") + c.Log.Fatal("Max crawl time limit reached, exiting..") }() } - // Setup logging, every day at midnight UTC a new setup - // is triggered in order to change the ES index's name - if c.ElasticSearchURL != "" { - // Goroutine loop that fetch the machine's IP address every second - go func() { - for { - ip := utils.GetOutboundIP().String() - constants.Store("ip", ip) - time.Sleep(time.Second * 10) - } - }() - - logInfo, logWarning, logError = utils.SetupLogging(c.JobPath, c.LiveStats, c.ElasticSearchURL) - - go func() { - // Get the current time in UTC and figure out when the next midnight will occur - now := time.Now().UTC() - midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) - if now.After(midnight) { - midnight = midnight.Add(24 * time.Hour) - } - - // Calculate the duration until midnight and add a little extra time to avoid calling your function just before midnight - duration := midnight.Sub(now) + time.Second*10 - - // Create a timer that will wait until midnight - timer := time.NewTimer(duration) - - // Wait for the timer to finish (which will occur at midnight) - <-timer.C - - // Call your function - logInfo, logWarning, logError = utils.SetupLogging(c.JobPath, c.LiveStats, c.ElasticSearchURL) - }() - } else { - logInfo, logWarning, logError = utils.SetupLogging(c.JobPath, c.LiveStats, c.ElasticSearchURL) - } - // Start the background process that will handle os signals // to exit Zeno, like CTRL+C go c.setupCloseHandler() @@ -192,11 +153,11 @@ func (c *Crawl) 
Start() (err error) { for log := range frontierLoggingChan { switch log.Level { case logrus.ErrorLevel: - logError.WithFields(c.genLogFields(nil, nil, log.Fields)).Error(log.Message) + c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Error(log.Message) case logrus.WarnLevel: - logWarning.WithFields(c.genLogFields(nil, nil, log.Fields)).Warn(log.Message) + c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Warn(log.Message) case logrus.InfoLevel: - logInfo.WithFields(c.genLogFields(nil, nil, log.Fields)).Info(log.Message) + c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Info(log.Message) } } }() @@ -245,7 +206,7 @@ func (c *Crawl) Start() (err error) { go func() { for err := range c.Client.ErrChan { - logError.WithFields(c.genLogFields(err, nil, nil)).Errorf("WARC HTTP client error") + c.Log.WithFields(c.genLogFields(err, nil, nil)).Error("WARC HTTP client error") } }() @@ -258,12 +219,12 @@ func (c *Crawl) Start() (err error) { c.ClientProxied, err = warc.NewWARCWritingHTTPClient(proxyHTTPClientSettings) if err != nil { - logError.Fatal("unable to init WARC writing (proxy) HTTP client") + c.Log.Fatal("unable to init WARC writing (proxy) HTTP client") } go func() { for err := range c.ClientProxied.ErrChan { - logError.WithFields(c.genLogFields(err, nil, nil)).Error("WARC HTTP client error") + c.Log.WithFields(c.genLogFields(err, nil, nil)).Error("WARC HTTP client error") } }() } @@ -282,7 +243,7 @@ func (c *Crawl) Start() (err error) { if c.CookieFile != "" { cookieJar, err := cookiejar.NewFileJar(c.CookieFile, nil) if err != nil { - logError.WithFields(c.genLogFields(err, nil, nil)).Fatal("unable to parse cookie file") + c.Log.WithFields(c.genLogFields(err, nil, nil)).Fatal("unable to parse cookie file") } c.Client.Jar = cookieJar @@ -389,10 +350,10 @@ func (c *Crawl) EnsureWorkersFinished() bool { c.WorkerMutex.RUnlock() select { case <-timer.C: - c.Logger.Warning(fmt.Sprintf("[WORKERS] Timeout reached. %d workers still running", workerPoolLen)) + c.Log.Warn(fmt.Sprintf("[WORKERS] Timeout reached. 
%d workers still running", workerPoolLen))
 			return false
 		default:
-			c.Logger.Warning(fmt.Sprintf("[WORKERS] Waiting for %d workers to finish", workerPoolLen))
+			c.Log.Warn(fmt.Sprintf("[WORKERS] Waiting for %d workers to finish", workerPoolLen))
 			time.Sleep(time.Second * 5)
 		}
 	}
diff --git a/internal/pkg/crawl/finish.go b/internal/pkg/crawl/finish.go
index 89b5e967..ae034427 100644
--- a/internal/pkg/crawl/finish.go
+++ b/internal/pkg/crawl/finish.go
@@ -45,23 +45,23 @@ func (crawl *Crawl) finish() {
 	}
 	close(crawl.Frontier.PullChan)
 
-	crawl.Logger.Warning("[WORKERS] Waiting for workers to finish")
+	crawl.Log.Warn("[WORKERS] Waiting for workers to finish")
 	crawl.EnsureWorkersFinished()
-	crawl.Logger.Warning("[WORKERS] All workers finished")
+	crawl.Log.Warn("[WORKERS] All workers finished")
 
 	// When all workers are finished, we can safely close the HQ related channels
 	if crawl.UseHQ {
-		crawl.Logger.Warning("[HQ] Waiting for finished channel to be closed")
+		crawl.Log.Warn("[HQ] Waiting for finished channel to be closed")
 		close(crawl.HQFinishedChannel)
-		crawl.Logger.Warning("[HQ] Finished channel closed")
+		crawl.Log.Warn("[HQ] Finished channel closed")
 
-		crawl.Logger.Warning("[HQ] Waiting for producer to finish")
+		crawl.Log.Warn("[HQ] Waiting for producer to finish")
 		close(crawl.HQProducerChannel)
-		crawl.Logger.Warning("[HQ] Producer finished")
+		crawl.Log.Warn("[HQ] Producer finished")
 
-		crawl.Logger.Warning("[HQ] Waiting for all functions to return")
+		crawl.Log.Warn("[HQ] Waiting for all functions to return")
 		crawl.HQChannelsWg.Wait()
-		crawl.Logger.Warning("[HQ] All functions returned")
+		crawl.Log.Warn("[HQ] All functions returned")
 	}
 
 	// Once all workers are done, it means nothing more is actively send to
@@ -73,30 +73,30 @@ func (crawl *Crawl) finish() {
 		time.Sleep(time.Second / 2)
 	}
 
-	crawl.Logger.Warning("[WARC] Closing writer(s)..")
+	crawl.Log.Warn("[WARC] Closing writer(s)..")
 	crawl.Client.Close()
 
 	if crawl.Proxy != "" {
 		crawl.ClientProxied.Close()
 	}
 
-	crawl.Logger.Warning("[WARC] Writer(s) closed")
+	crawl.Log.Warn("[WARC] Writer(s) closed")
 
 	// Closing the local queue used by the frontier
 	crawl.Frontier.Queue.Close()
-	crawl.Logger.Warning("[FRONTIER] Queue closed")
+	crawl.Log.Warn("[FRONTIER] Queue closed")
 
 	// Closing the seencheck database
 	if crawl.Seencheck {
 		crawl.Frontier.Seencheck.SeenDB.Close()
-		crawl.Logger.Warning("[SEENCHECK] Database closed")
+		crawl.Log.Warn("[SEENCHECK] Database closed")
 	}
 
 	// Dumping hosts pool and frontier stats to disk
-	crawl.Logger.Warning("[FRONTIER] Dumping hosts pool and frontier stats to " + path.Join(crawl.Frontier.JobPath, "frontier.gob"))
+	crawl.Log.Warn("[FRONTIER] Dumping hosts pool and frontier stats to " + path.Join(crawl.Frontier.JobPath, "frontier.gob"))
 	crawl.Frontier.Save()
 
-	crawl.Logger.Warning("Finished!")
+	crawl.Log.Warn("Finished!")
 
 	os.Exit(0)
 }
@@ -105,7 +105,7 @@ func (crawl *Crawl) setupCloseHandler() {
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	<-c
-	crawl.Logger.Warning("CTRL+C catched.. cleaning up and exiting.")
+	crawl.Log.Warn("CTRL+C caught.. 
cleaning up and exiting.") signal.Stop(c) crawl.finish() } diff --git a/internal/pkg/crawl/hq.go b/internal/pkg/crawl/hq.go index 446024cf..f100e5bd 100644 --- a/internal/pkg/crawl/hq.go +++ b/internal/pkg/crawl/hq.go @@ -152,7 +152,7 @@ func (c *Crawl) HQConsumer() { var HQBatchSize = int(math.Ceil(float64(c.Workers) / 2)) if c.Finished.Get() { - c.Logger.Error("crawl finished, stopping HQ consumer") + c.Log.Error("crawl finished, stopping HQ consumer") break } @@ -177,9 +177,9 @@ func (c *Crawl) HQConsumer() { // get batch from crawl HQ batch, err := c.HQClient.Feed(HQBatchSize, c.HQStrategy) if err != nil { - c.Logger.WithFields(c.genLogFields(err, nil, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{ "batchSize": HQBatchSize, - })).Errorln("error getting new URLs from crawl HQ") + })).Error("error getting new URLs from crawl HQ") } // send all URLs received in the batch to the frontier @@ -187,9 +187,9 @@ func (c *Crawl) HQConsumer() { for _, URL := range batch.URLs { newURL, err := url.Parse(URL.Value) if err != nil { - c.Logger.WithFields(c.genLogFields(err, nil, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{ "batchSize": HQBatchSize, - })).Errorln("unable to parse URL received from crawl HQ, discarding") + })).Error("unable to parse URL received from crawl HQ, discarding") continue } @@ -209,7 +209,7 @@ func (c *Crawl) HQFinisher() { for finishedItem := range c.HQFinishedChannel { if finishedItem.ID == "" { - logWarning.WithFields(c.genLogFields(nil, finishedItem.URL, nil)).Warnln("URL has no ID, discarding") + c.Log.WithFields(c.genLogFields(nil, finishedItem.URL, nil)).Warn("URL has no ID, discarding") continue } @@ -220,9 +220,9 @@ func (c *Crawl) HQFinisher() { for { _, err := c.HQClient.Finished(finishedArray, locallyCrawledTotal) if err != nil { - logError.WithFields(c.genLogFields(err, nil, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{ "finishedArray": finishedArray, - })).Errorln("error submitting finished urls to crawl HQ. retrying in one second...") + })).Error("error submitting finished urls to crawl HQ. retrying in one second...") time.Sleep(time.Second) continue } @@ -239,9 +239,9 @@ func (c *Crawl) HQFinisher() { for { _, err := c.HQClient.Finished(finishedArray, locallyCrawledTotal) if err != nil { - logError.WithFields(c.genLogFields(err, nil, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{ "finishedArray": finishedArray, - })).Errorln("error submitting finished urls to crawl HQ. retrying in one second...") + })).Error("error submitting finished urls to crawl HQ. 
retrying in one second...") time.Sleep(time.Second) continue } @@ -263,10 +263,10 @@ func (c *Crawl) HQSeencheckURLs(URLs []*url.URL) (seencheckedBatch []*url.URL, e discoveredResponse, err := c.HQClient.Discovered(discoveredURLs, "asset", false, true) if err != nil { - logError.WithFields(c.genLogFields(err, nil, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, nil, map[string]interface{}{ "batchLen": len(URLs), "urls": discoveredURLs, - })).Errorln("error sending seencheck payload to crawl HQ") + })).Error("error sending seencheck payload to crawl HQ") return seencheckedBatch, err } @@ -275,9 +275,9 @@ func (c *Crawl) HQSeencheckURLs(URLs []*url.URL) (seencheckedBatch []*url.URL, e // the returned payload only contain new URLs to be crawled by Zeno newURL, err := url.Parse(URL.Value) if err != nil { - logError.WithFields(c.genLogFields(err, URL, map[string]interface{}{ + c.Log.WithFields(c.genLogFields(err, URL, map[string]interface{}{ "batchLen": len(URLs), - })).Errorln("error parsing URL from HQ seencheck response") + })).Error("error parsing URL from HQ seencheck response") return seencheckedBatch, err } diff --git a/internal/pkg/crawl/log.go b/internal/pkg/crawl/log.go index f0e74c2a..317617af 100644 --- a/internal/pkg/crawl/log.go +++ b/internal/pkg/crawl/log.go @@ -87,5 +87,5 @@ func (c *Crawl) logCrawlSuccess(executionStart time.Time, statusCode int, item * fields["executionTime"] = time.Since(executionStart).Milliseconds() fields["url"] = utils.URLToString(item.URL) - logInfo.WithFields(fields).Info("URL archived") + c.Log.WithFields(fields).Info("URL archived") } diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go index 0213204e..410bd930 100644 --- a/internal/pkg/log/log.go +++ b/internal/pkg/log/log.go @@ -15,6 +15,7 @@ var ( // Logger wraps slog.Logger to provide multi-output functionality type Logger struct { + handler *multiHandler slogger *slog.Logger } @@ -43,11 +44,13 @@ type Config struct { func New(cfg Config) (*Logger, error) { var handlers []slog.Handler + // Create stdout handler stdoutHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: cfg.StdoutLevel, }) handlers = append(handlers, stdoutHandler) + // Create file handler if FileOutput is specified if cfg.FileOutput != "" { file, err := os.OpenFile(cfg.FileOutput, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { @@ -59,11 +62,13 @@ func New(cfg Config) (*Logger, error) { handlers = append(handlers, jsonHandler) } + // Create multi-handler mh := &multiHandler{handlers: handlers} + // Create slog.Logger slogger := slog.New(mh) - return &Logger{slogger: slogger}, nil + return &Logger{handler: mh, slogger: slogger}, nil } // Default returns the default Logger instance. @@ -144,35 +149,6 @@ func (l *Logger) Fatal(msg string, args ...any) { os.Exit(1) } -// WithFields returns a new Logger with the given fields added to all log entries. -// The fields are key-value pairs that will be included in every subsequent log entry -// made by the returned logger. -// -// This method creates a new Logger instance and does not modify the original logger. -// It can be chained with other WithFields calls or logging methods. 
-//
-// Parameters:
-//   - fields: A map of key-value pairs to be included in all log entries
-//
-// Returns:
-//   - *Logger: A new Logger instance with the specified fields attached
-//
-// Example:
-//
-//	logger := log.Default()
-//	userLogger := logger.WithFields(map[string]interface{}{
-//		"user_id": 12345,
-//		"ip":      "192.168.1.1",
-//	})
-//	userLogger.Info("User logged in")
-func (l *Logger) WithFields(fields map[string]interface{}) *Logger {
-	args := make([]any, 0, len(fields)*2)
-	for k, v := range fields {
-		args = append(args, k, v)
-	}
-	return &Logger{slogger: l.slogger.With(args...)}
-}
-
 //-------------------------------------------------------------------------------------
 // Following methods are used to implement the slog.Handler interface for multiHandler
 //-------------------------------------------------------------------------------------

From e8865192e6cec41c5b477501cded221900dbb736 Mon Sep 17 00:00:00 2001
From: Thomas FOUBERT
Date: Sat, 29 Jun 2024 13:03:00 -0400
Subject: [PATCH 11/16] feat: init the custom logger — will need to have flags
 in the future
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .gitignore   |  3 ++-
 cmd/utils.go | 13 +++++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 2eeae776..8e5c76cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,5 @@ jobs/*
 jobs/
 Zeno
 *.txt
-*.sh
\ No newline at end of file
+*.sh
+zeno.log
\ No newline at end of file
diff --git a/cmd/utils.go b/cmd/utils.go
index f423e905..768ee745 100644
--- a/cmd/utils.go
+++ b/cmd/utils.go
@@ -1,6 +1,7 @@
 package cmd
 
 import (
+	"log/slog"
 	"path"
 	"time"
 
@@ -8,6 +9,7 @@ import (
 	"github.com/internetarchive/Zeno/config"
 	"github.com/internetarchive/Zeno/internal/pkg/crawl"
 	"github.com/internetarchive/Zeno/internal/pkg/frontier"
+	"github.com/internetarchive/Zeno/internal/pkg/log"
 	"github.com/internetarchive/Zeno/internal/pkg/utils"
 	"github.com/paulbellamy/ratecounter"
 	"github.com/sirupsen/logrus"
@@ -18,6 +20,17 @@ import (
 func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl {
 	var c = new(crawl.Crawl)
 
+	// Logger
+	customLogger, err := log.New(log.Config{
+		FileOutput:  "zeno.log",
+		FileLevel:   slog.LevelDebug,
+		StdoutLevel: slog.LevelInfo,
+	})
+	if err != nil {
+		panic(err)
+	}
+	c.Log = customLogger
+
 	// Statistics counters
 	c.CrawledSeeds = new(ratecounter.Counter)
 	c.CrawledAssets = new(ratecounter.Counter)

From b7b341fbd1af9da36546fc0348414e4d3aaa9bbf Mon Sep 17 00:00:00 2001
From: Thomas FOUBERT
Date: Sat, 29 Jun 2024 16:08:28 -0400
Subject: [PATCH 12/16] chore: functions comments

---
 internal/pkg/crawl/crawl.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go
index 2ee5cecf..000c89a5 100644
--- a/internal/pkg/crawl/crawl.go
+++ b/internal/pkg/crawl/crawl.go
@@ -1,3 +1,4 @@
+// Package crawl handles all the crawling logic for Zeno
 package crawl
 
 import (
@@ -302,6 +303,8 @@ func (c *Crawl) Start() (err error) {
 	return
 }
 
+// WorkerWatcher is a background process that watches over the workers
+// and removes them from the pool when they are done
 func (c *Crawl) WorkerWatcher() {
 	var toEnd = false
 
@@ -336,6 +339,7 @@ func (c *Crawl) WorkerWatcher() {
 	}
 }
 
+// EnsureWorkersFinished waits for all workers to finish
 func (c *Crawl) EnsureWorkersFinished() bool {
 	var workerPoolLen int
 	var timer = time.NewTimer(c.WorkerStopTimeout)
 
@@ -367,18 
+371,18 @@ func (c *Crawl) GetWorkerState(index int) interface{} { if index == -1 { var workersStatus = new(APIWorkersState) - for i, worker := range c.WorkerPool { - workersStatus.Workers = append(workersStatus.Workers, _getWorkerState(worker, i)) + for _, worker := range c.WorkerPool { + workersStatus.Workers = append(workersStatus.Workers, _getWorkerState(worker)) } return workersStatus } if index >= len(c.WorkerPool) { return nil } - return _getWorkerState(c.WorkerPool[index], index) + return _getWorkerState(c.WorkerPool[index]) } -func _getWorkerState(worker *Worker, index int) *APIWorkerState { +func _getWorkerState(worker *Worker) *APIWorkerState { lastErr := "" isLocked := true From bbcd99f601144f0b21817065f88adc1c34c75e40 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 29 Jun 2024 20:52:54 -0400 Subject: [PATCH 13/16] feat: add elasticsearch logging + stuff i dont remember --- cmd/cmd.go | 20 ++++- cmd/utils.go | 19 ++-- config/config.go | 9 +- go.mod | 10 ++- go.sum | 25 +++++- internal/pkg/crawl/api.go | 3 +- internal/pkg/crawl/crawl.go | 13 ++- internal/pkg/crawl/finish.go | 3 + internal/pkg/log/elasticsearch.go | 145 ++++++++++++++++++++++++++++++ internal/pkg/log/log.go | 67 +++++++++++--- internal/pkg/log/rotate.go | 93 +++++++++++++++++++ 11 files changed, 370 insertions(+), 37 deletions(-) create mode 100644 internal/pkg/log/elasticsearch.go create mode 100644 internal/pkg/log/rotate.go diff --git a/cmd/cmd.go b/cmd/cmd.go index d99e553b..50603b11 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -300,8 +300,24 @@ var GlobalFlags = []cli.Flag{ }, &cli.StringFlag{ Name: "es-url", - Usage: "ElasticSearch URL to use for indexing crawl logs.", - Destination: &config.App.Flags.ElasticSearchURL, + Usage: "comma-separated ElasticSearch URL to use for indexing crawl logs.", + Destination: &config.App.Flags.ElasticSearchURLs, + }, + &cli.StringFlag{ + Name: "es-user", + Usage: "ElasticSearch username to use for indexing crawl logs.", + Destination: &config.App.Flags.ElasticSearchURLs, + }, + &cli.StringFlag{ + Name: "es-password", + Usage: "ElasticSearch password to use for indexing crawl logs.", + Destination: &config.App.Flags.ElasticSearchURLs, + }, + &cli.StringFlag{ + Name: "es-index-prefix", + Usage: "ElasticSearch index prefix to use for indexing crawl logs. 
Default is : `zeno-`", + Value: "zeno-", + Destination: &config.App.Flags.ElasticSearchURLs, }, &cli.StringSliceFlag{ Name: "exclude-string", diff --git a/cmd/utils.go b/cmd/utils.go index 768ee745..c3924e77 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -3,6 +3,7 @@ package cmd import ( "log/slog" "path" + "strings" "time" "github.com/google/uuid" @@ -12,7 +13,6 @@ import ( "github.com/internetarchive/Zeno/internal/pkg/log" "github.com/internetarchive/Zeno/internal/pkg/utils" "github.com/paulbellamy/ratecounter" - "github.com/sirupsen/logrus" ) // InitCrawlWithCMD takes a config.Flags struct and return a @@ -21,10 +21,18 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { var c = new(crawl.Crawl) // Logger + elasticSearchURLs := strings.Split(flags.ElasticSearchURLs, ",") customLogger, err := log.New(log.Config{ - FileOutput: "zeno.log", - FileLevel: slog.LevelDebug, - StdoutLevel: slog.LevelInfo, + FileOutput: "zeno.log", + FileLevel: slog.LevelDebug, + StdoutLevel: slog.LevelInfo, + RotateLogFile: true, + RotateElasticSearchIndex: true, + ElasticsearchConfig: &log.ElasticsearchConfig{ + Addresses: elasticSearchURLs, + Username: flags.ElasticSearchUsername, + Password: flags.ElasticSearchPassword, + }, }) if err != nil { panic(err) @@ -38,7 +46,6 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { c.URIsPerSecond = ratecounter.NewRateCounter(1 * time.Second) c.LiveStats = flags.LiveStats - c.ElasticSearchURL = flags.ElasticSearchURL // Frontier c.Frontier = new(frontier.Frontier) @@ -50,7 +57,7 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { } else { UUID, err := uuid.NewUUID() if err != nil { - logrus.Fatal(err) + c.Log.Fatal("cmd/utils.go:InitCrawlWithCMD():uuid.NewUUID()", "error", err) } c.Job = UUID.String() diff --git a/config/config.go b/config/config.go index ee111b25..f8b696f1 100644 --- a/config/config.go +++ b/config/config.go @@ -62,9 +62,12 @@ type Flags struct { DisableAssetsCapture bool CertValidation bool - CloudflareStream bool - ElasticSearchURL string - ExcludedStrings cli.StringSlice + CloudflareStream bool + ElasticSearchURLs string + ElasticSearchUsername string + ElasticSearchPassword string + ElasticSearchIndexPrefix string + ExcludedStrings cli.StringSlice } type Application struct { diff --git a/go.mod b/go.mod index c73ebb62..e0503e56 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( github.com/beeker1121/goque v2.1.0+incompatible github.com/clbanning/mxj/v2 v2.7.0 github.com/dustin/go-humanize v1.0.1 + github.com/elastic/go-elasticsearch v0.0.0 + github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/gin-contrib/pprof v1.4.0 github.com/gin-gonic/gin v1.9.1 github.com/google/uuid v1.6.0 @@ -45,9 +47,12 @@ require ( github.com/cloudflare/circl v1.3.7 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/fatih/color v1.16.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.19.0 // indirect @@ -88,10 +93,13 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // 
indirect + go.opentelemetry.io/otel v1.24.0 // indirect + go.opentelemetry.io/otel/metric v1.24.0 // indirect + go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/arch v0.7.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index e85d1e74..4e20cd38 100644 --- a/go.sum +++ b/go.sum @@ -55,6 +55,12 @@ github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+m github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= +github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= +github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= +github.com/elastic/go-elasticsearch/v8 v8.14.0 h1:1ywU8WFReLLcxE1WJqii3hTtbPUE2hc38ZK/j4mMFow= +github.com/elastic/go-elasticsearch/v8 v8.14.0/go.mod h1:WRvnlGkSuZyp83M2U8El/LGXpCjYLrvlkSgkAH4O5I4= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -71,8 +77,11 @@ github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= @@ -300,6 +309,14 @@ github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod 
h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= +go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= @@ -361,8 +378,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/internal/pkg/crawl/api.go b/internal/pkg/crawl/api.go index d2348009..98e105ba 100644 --- a/internal/pkg/crawl/api.go +++ b/internal/pkg/crawl/api.go @@ -1,6 +1,7 @@ package crawl import ( + "fmt" "log/slog" "os" "strconv" @@ -100,7 +101,7 @@ func (crawl *Crawl) startAPI() { c.JSON(200, workersState) }) - err := r.Run(":" + crawl.APIPort) + err := r.Run(fmt.Sprintf(":%s", crawl.APIPort)) if err != nil { crawl.Log.Fatal("unable to start API", "error", err.Error()) } diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index 000c89a5..608380cb 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -28,12 +28,11 @@ type PrometheusMetrics struct { // Crawl define the parameters of a crawl process type Crawl struct { *sync.Mutex - StartTime time.Time - SeedList []frontier.Item - Paused *utils.TAtomBool - Finished *utils.TAtomBool - LiveStats bool - ElasticSearchURL string + StartTime time.Time + SeedList []frontier.Item + Paused *utils.TAtomBool + Finished *utils.TAtomBool + LiveStats bool // Logger Log *log.Logger @@ -178,7 +177,7 @@ func (c *Crawl) Start() (err error) { go c.writeFrontierToDisk() // Initialize WARC writer - logrus.Info("Initializing WARC writer..") + c.Log.Info("Initializing WARC writer..") // Init WARC rotator settings rotatorSettings := c.initWARCRotatorSettings() diff --git a/internal/pkg/crawl/finish.go b/internal/pkg/crawl/finish.go index ae034427..0b85fde7 100644 --- a/internal/pkg/crawl/finish.go +++ b/internal/pkg/crawl/finish.go @@ -98,6 +98,9 @@ func (crawl *Crawl) finish() { crawl.Log.Warn("Finished!") + crawl.Log.Warn("Shutting down the logger, bai bai") + crawl.Log.Stop() + os.Exit(0) } diff --git a/internal/pkg/log/elasticsearch.go 
b/internal/pkg/log/elasticsearch.go new file mode 100644 index 00000000..722720af --- /dev/null +++ b/internal/pkg/log/elasticsearch.go @@ -0,0 +1,145 @@ +package log + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/elastic/go-elasticsearch/esapi" + "github.com/elastic/go-elasticsearch/v8" +) + +// ElasticsearchConfig holds the configuration for Elasticsearch logging +type ElasticsearchConfig struct { + Addresses []string + Username string + Password string + Index string + Level slog.Level +} + +// ElasticsearchHandler implements slog.Handler for Elasticsearch +type ElasticsearchHandler struct { + client *elasticsearch.Client + index string + level slog.Level + attrs []slog.Attr + groups []string +} + +// Handle is responsible for passing the log record to all underlying handlers. +// It's called internally when a log message needs to be written. +func (h *ElasticsearchHandler) Handle(ctx context.Context, r slog.Record) error { + if !h.Enabled(ctx, r.Level) { + return nil + } + + doc := make(map[string]interface{}) + doc["timestamp"] = r.Time.Format(time.RFC3339) + doc["level"] = r.Level.String() + doc["message"] = r.Message + doc["attrs"] = make(map[string]interface{}) + + // Add pre-defined attributes + for _, attr := range h.attrs { + doc["attrs"].(map[string]interface{})[attr.Key] = attr.Value.Any() + } + + // Add record attributes + r.Attrs(func(a slog.Attr) bool { + doc["attrs"].(map[string]interface{})[a.Key] = a.Value.Any() + return true + }) + + // Handle groups + if len(h.groups) > 0 { + current := doc["attrs"].(map[string]interface{}) + for _, group := range h.groups { + next := make(map[string]interface{}) + current[group] = next + current = next + } + } + + payload, err := json.Marshal(doc) + if err != nil { + return err + } + + req := esapi.IndexRequest{ + Index: h.index, + Body: strings.NewReader(string(payload)), + } + + res, err := req.Do(ctx, h.client) + if err != nil { + return err + } + defer res.Body.Close() + + if res.IsError() { + return fmt.Errorf("error indexing document: %s", res.String()) + } + + return nil +} + +// Enabled checks if any of the underlying handlers are enabled for a given log level. +// It's used internally to determine if a log message should be processed by a given handler +func (h *ElasticsearchHandler) Enabled(ctx context.Context, level slog.Level) bool { + _, _ = ctx, level // Ignoring context and level + return level >= h.level +} + +// WithAttrs creates a new handler with additional attributes. +// It's used internally when the logger is asked to include additional context with all subsequent log messages. +func (h *ElasticsearchHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + newHandler := *h + newHandler.attrs = append(h.attrs, attrs...) + return &newHandler +} + +// WithGroup creates a new handler with a new group added to the attribute grouping hierarchy. +// It's used internally when the logger is asked to group a set of attributes together. 
+func (h *ElasticsearchHandler) WithGroup(name string) slog.Handler { + newHandler := *h + newHandler.groups = append(h.groups, name) + return &newHandler +} + +func (h *ElasticsearchHandler) createIndex() error { + mapping := `{ + "mappings": { + "properties": { + "timestamp": {"type": "date"}, + "level": {"type": "keyword"}, + "message": {"type": "text"}, + "attrs": {"type": "object", "dynamic": true} + } + } + }` + + req := esapi.IndicesCreateRequest{ + Index: h.index, + Body: strings.NewReader(mapping), + } + + res, err := req.Do(context.Background(), h.client) + if err != nil { + return fmt.Errorf("error creating index: %w", err) + } + defer res.Body.Close() + + if res.IsError() { + // If the index already exists, that's okay + if strings.Contains(res.String(), "resource_already_exists_exception") { + return nil + } + return fmt.Errorf("error creating index: %s", res.String()) + } + + return nil +} diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go index 410bd930..6a9d212d 100644 --- a/internal/pkg/log/log.go +++ b/internal/pkg/log/log.go @@ -3,9 +3,13 @@ package log import ( "context" + "fmt" "log/slog" "os" "sync" + "time" + + "github.com/elastic/go-elasticsearch/v8" ) var ( @@ -15,8 +19,10 @@ var ( // Logger wraps slog.Logger to provide multi-output functionality type Logger struct { - handler *multiHandler - slogger *slog.Logger + handler *multiHandler + slogger *slog.Logger + mu sync.Mutex + stopRotation chan struct{} } // multiHandler implements slog.Handler interface for multiple outputs @@ -26,9 +32,12 @@ type multiHandler struct { // Config holds the configuration for the logger type Config struct { - FileOutput string - FileLevel slog.Level - StdoutLevel slog.Level + FileOutput string + FileLevel slog.Level + StdoutLevel slog.Level + RotateLogFile bool + ElasticsearchConfig *ElasticsearchConfig + RotateElasticSearchIndex bool } // New creates a new Logger instance with the given configuration. 
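For reference, a quick sketch of what the Elasticsearch handler introduced in this patch ends up indexing. Field names follow the Handle implementation in elasticsearch.go above; `logger` stands for any *Logger built by New, and the concrete values are made up:

	logger.Info("URL archived", "status", 200, "url", "https://example.com")
	// is indexed into the daily index (e.g. zeno-2024.06.29) as, roughly:
	// {"timestamp": "2024-06-29T13:03:00-04:00", "level": "INFO",
	//  "message": "URL archived", "attrs": {"status": 200, "url": "https://example.com"}}
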
@@ -56,10 +65,37 @@ func New(cfg Config) (*Logger, error) { if err != nil { return nil, err } - jsonHandler := slog.NewJSONHandler(file, &slog.HandlerOptions{ - Level: cfg.FileLevel, + fileHandler := &fileHandler{ + Handler: slog.NewJSONHandler(file, &slog.HandlerOptions{Level: cfg.FileLevel}), + filename: cfg.FileOutput, + file: file, + interval: 6 * time.Hour, + lastRotation: time.Now(), + } + handlers = append(handlers, fileHandler) + } + + // Create Elasticsearch handler if ElasticsearchConfig is specified + if cfg.ElasticsearchConfig != nil { + esClient, err := elasticsearch.NewClient(elasticsearch.Config{ + Addresses: cfg.ElasticsearchConfig.Addresses, + Username: cfg.ElasticsearchConfig.Username, + Password: cfg.ElasticsearchConfig.Password, }) - handlers = append(handlers, jsonHandler) + if err != nil { + return nil, fmt.Errorf("failed to create Elasticsearch client: %w", err) + } + esHandler := &ElasticsearchHandler{ + client: esClient, + index: fmt.Sprintf("zeno-%s", time.Now().Format("2006.01.02")), + level: cfg.ElasticsearchConfig.Level, + attrs: []slog.Attr{}, + groups: []string{}, + } + if err := esHandler.createIndex(); err != nil { + return nil, fmt.Errorf("failed to create Elasticsearch index: %w", err) + } + handlers = append(handlers, esHandler) } // Create multi-handler @@ -68,7 +104,12 @@ func New(cfg Config) (*Logger, error) { // Create slog.Logger slogger := slog.New(mh) - return &Logger{handler: mh, slogger: slogger}, nil + logger := &Logger{handler: mh, slogger: slogger} + + // Start rotation goroutine + logger.startRotation() + + return logger, nil } // Default returns the default Logger instance. @@ -153,7 +194,7 @@ func (l *Logger) Fatal(msg string, args ...any) { // Following methods are used to implement the slog.Handler interface for multiHandler //------------------------------------------------------------------------------------- -// This method checks if any of the underlying handlers are enabled for a given log level. +// Enabled checks if any of the underlying handlers are enabled for a given log level. // It's used internally to determine if a log message should be processed by a given handler func (h *multiHandler) Enabled(ctx context.Context, level slog.Level) bool { for _, handler := range h.handlers { @@ -164,7 +205,7 @@ func (h *multiHandler) Enabled(ctx context.Context, level slog.Level) bool { return false } -// This method is responsible for passing the log record to all underlying handlers. +// Handle is responsible for passing the log record to all underlying handlers. // It's called internally when a log message needs to be written. func (h *multiHandler) Handle(ctx context.Context, r slog.Record) error { for _, handler := range h.handlers { @@ -175,7 +216,7 @@ func (h *multiHandler) Handle(ctx context.Context, r slog.Record) error { return nil } -// This method creates a new handler with additional attributes. +// WithAttrs creates a new handler with additional attributes. // It's used internally when the logger is asked to include additional context with all subsequent log messages. func (h *multiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { handlers := make([]slog.Handler, len(h.handlers)) @@ -185,7 +226,7 @@ func (h *multiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { return &multiHandler{handlers: handlers} } -// This method creates a new handler with a new group added to the attribute grouping hierarchy. +// WithGroups creates a new handler with a new group added to the attribute grouping hierarchy. 
// It's used internally when the logger is asked to group a set of attributes together. func (h *multiHandler) WithGroup(name string) slog.Handler { handlers := make([]slog.Handler, len(h.handlers)) diff --git a/internal/pkg/log/rotate.go b/internal/pkg/log/rotate.go new file mode 100644 index 00000000..079c17e2 --- /dev/null +++ b/internal/pkg/log/rotate.go @@ -0,0 +1,93 @@ +package log + +import ( + "fmt" + "log/slog" + "os" + "time" +) + +// ... (previous Logger, multiHandler, logEntry, ElasticsearchHandler definitions remain the same) + +type rotateableHandler interface { + slog.Handler + Rotate() error + NextRotation() time.Time +} + +type fileHandler struct { + slog.Handler + filename string + file *os.File + interval time.Duration + lastRotation time.Time +} + +func (h *fileHandler) Rotate() error { + // ... (previous Rotate implementation remains the same) + h.lastRotation = time.Now() + return nil +} + +func (h *fileHandler) NextRotation() time.Time { + return h.lastRotation.Add(h.interval) +} + +func (h *ElasticsearchHandler) NextRotation() time.Time { + now := time.Now() + return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour) +} + +func (l *Logger) startRotation() { + l.stopRotation = make(chan struct{}) + go func() { + for { + nextRotation := l.nextRotation() + select { + case <-time.After(time.Until(nextRotation)): + l.rotate() + case <-l.stopRotation: + return + } + } + }() +} + +func (l *Logger) nextRotation() time.Time { + l.mu.Lock() + defer l.mu.Unlock() + + var earliest time.Time + for _, h := range l.handler.handlers { + if rh, ok := h.(rotateableHandler); ok { + next := rh.NextRotation() + if earliest.IsZero() || next.Before(earliest) { + earliest = next + } + } + } + return earliest +} + +func (l *Logger) rotate() { + l.mu.Lock() + defer l.mu.Unlock() + + now := time.Now() + for _, h := range l.handler.handlers { + if rh, ok := h.(rotateableHandler); ok { + if now.After(rh.NextRotation()) || now.Equal(rh.NextRotation()) { + if err := rh.Rotate(); err != nil { + fmt.Printf("Error rotating handler: %v\n", err) + } + } + } + } +} + +// Stop stops the rotation goroutine +func (l *Logger) Stop() { + close(l.stopRotation) +} + +// ... 
(rest of the code remains the same) From 859cab8ef9845f8ec7a3968d64d7c89fd58ee3c9 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 29 Jun 2024 21:05:58 -0400 Subject: [PATCH 14/16] feat: catching logging errors like es or file errors --- cmd/utils.go | 15 +++++++++++++- internal/pkg/log/log.go | 46 ++++++++++++++++++++++++++++++++++------- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/cmd/utils.go b/cmd/utils.go index c3924e77..7aa7bfa1 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -1,7 +1,9 @@ package cmd import ( + "fmt" "log/slog" + "os" "path" "strings" "time" @@ -35,10 +37,21 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { }, }) if err != nil { - panic(err) + fmt.Println(err) + os.Exit(1) } c.Log = customLogger + go func() { + errChan := c.Log.Errors() + for { + select { + case err := <-errChan: + fmt.Fprintf(os.Stderr, "Logging error: %v\n", err) + } + } + }() + // Statistics counters c.CrawledSeeds = new(ratecounter.Counter) c.CrawledAssets = new(ratecounter.Counter) diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go index 6a9d212d..0d20f916 100644 --- a/internal/pkg/log/log.go +++ b/internal/pkg/log/log.go @@ -23,6 +23,7 @@ type Logger struct { slogger *slog.Logger mu sync.Mutex stopRotation chan struct{} + errorChan chan error } // multiHandler implements slog.Handler interface for multiple outputs @@ -104,7 +105,11 @@ func New(cfg Config) (*Logger, error) { // Create slog.Logger slogger := slog.New(mh) - logger := &Logger{handler: mh, slogger: slogger} + logger := &Logger{ + handler: mh, + slogger: slogger, + errorChan: make(chan error, 10), + } // Start rotation goroutine logger.startRotation() @@ -134,6 +139,27 @@ func Default() *Logger { return defaultLogger } +// Errors returns a channel that will receive logging errors +func (l *Logger) Errors() <-chan error { + return l.errorChan +} + +func (l *Logger) log(level slog.Level, msg string, args ...any) { + // Create a new Record with the message and args + r := slog.NewRecord(time.Now(), level, msg, 0) + r.Add(args...) + + err := l.handler.Handle(context.Background(), r) + if err != nil { + select { + case l.errorChan <- err: + default: + // If the error channel is full, log to stderr as a last resort + fmt.Fprintf(os.Stderr, "Logging error: %v\n", err) + } + } +} + // Debug logs a message at Debug level. // The first argument is the message to log, and subsequent arguments are key-value pairs // that will be included in the log entry. @@ -142,7 +168,7 @@ func Default() *Logger { // - msg: The message to log // - args: Optional key-value pairs to include in the log entry func (l *Logger) Debug(msg string, args ...any) { - l.slogger.Debug(msg, args...) + l.log(slog.LevelDebug, msg, args...) } // Info logs a message at Info level. @@ -153,7 +179,7 @@ func (l *Logger) Debug(msg string, args ...any) { // - msg: The message to log // - args: Optional key-value pairs to include in the log entry func (l *Logger) Info(msg string, args ...any) { - l.slogger.Info(msg, args...) + l.log(slog.LevelInfo, msg, args...) } // Warn logs a message at Warn level. @@ -164,7 +190,7 @@ func (l *Logger) Info(msg string, args ...any) { // - msg: The message to log // - args: Optional key-value pairs to include in the log entry func (l *Logger) Warn(msg string, args ...any) { - l.slogger.Warn(msg, args...) + l.log(slog.LevelWarn, msg, args...) } // Error logs a message at Error level. 
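A minimal sketch of how a caller is expected to drain the new error channel, mirroring the goroutine added to cmd/utils.go in this patch (`logger` stands for any *Logger; imports of fmt and os elided):

	// Drain logging errors so the buffered channel (capacity 10) does not fill
	// up and force l.log to fall back to writing directly on stderr.
	go func() {
		for err := range logger.Errors() {
			fmt.Fprintf(os.Stderr, "logging error: %v\n", err)
		}
	}()
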
@@ -175,10 +201,10 @@ func (l *Logger) Warn(msg string, args ...any) { // - msg: The message to log // - args: Optional key-value pairs to include in the log entry func (l *Logger) Error(msg string, args ...any) { - l.slogger.Error(msg, args...) + l.log(slog.LevelError, msg, args...) } -// Fatal logs a message at Fatal level and then calls os.Exit(1). +// Fatal logs a message at Error level and then calls os.Exit(1). // The first argument is the message to log, and subsequent arguments are key-value pairs // that will be included in the log entry. // @@ -186,7 +212,7 @@ func (l *Logger) Error(msg string, args ...any) { // - msg: The message to log // - args: Optional key-value pairs to include in the log entry func (l *Logger) Fatal(msg string, args ...any) { - l.slogger.Log(context.Background(), slog.LevelError, msg, args...) + l.log(slog.LevelError, msg, args...) os.Exit(1) } @@ -208,11 +234,15 @@ func (h *multiHandler) Enabled(ctx context.Context, level slog.Level) bool { // Handle is responsible for passing the log record to all underlying handlers. // It's called internally when a log message needs to be written. func (h *multiHandler) Handle(ctx context.Context, r slog.Record) error { + var errs []error for _, handler := range h.handlers { if err := handler.Handle(ctx, r); err != nil { - return err + errs = append(errs, fmt.Errorf("handler error: %w", err)) } } + if len(errs) > 0 { + return fmt.Errorf("multiple handler errors: %v", errs) + } return nil } From f1de10151a04da153477f4b51581ff290e0c1215 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sat, 29 Jun 2024 21:24:39 -0400 Subject: [PATCH 15/16] feat: elastic logging is working and tested --- cmd/cmd.go | 6 +++--- cmd/utils.go | 19 ++++++++++++++----- internal/pkg/log/elasticsearch.go | 13 ++++++++----- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 50603b11..f2a39ed3 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -306,18 +306,18 @@ var GlobalFlags = []cli.Flag{ &cli.StringFlag{ Name: "es-user", Usage: "ElasticSearch username to use for indexing crawl logs.", - Destination: &config.App.Flags.ElasticSearchURLs, + Destination: &config.App.Flags.ElasticSearchUsername, }, &cli.StringFlag{ Name: "es-password", Usage: "ElasticSearch password to use for indexing crawl logs.", - Destination: &config.App.Flags.ElasticSearchURLs, + Destination: &config.App.Flags.ElasticSearchPassword, }, &cli.StringFlag{ Name: "es-index-prefix", Usage: "ElasticSearch index prefix to use for indexing crawl logs. 
Default is : `zeno-`", Value: "zeno-", - Destination: &config.App.Flags.ElasticSearchURLs, + Destination: &config.App.Flags.ElasticSearchIndexPrefix, }, &cli.StringSliceFlag{ Name: "exclude-string", diff --git a/cmd/utils.go b/cmd/utils.go index 7aa7bfa1..f255b1c0 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -23,18 +23,27 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { var c = new(crawl.Crawl) // Logger + var elasticSearchConfig *log.ElasticsearchConfig elasticSearchURLs := strings.Split(flags.ElasticSearchURLs, ",") + if elasticSearchURLs[0] == "" { + elasticSearchConfig = nil + } else { + elasticSearchConfig = &log.ElasticsearchConfig{ + Addresses: elasticSearchURLs, + Username: flags.ElasticSearchUsername, + Password: flags.ElasticSearchPassword, + IndexPrefix: flags.ElasticSearchIndexPrefix, + Level: slog.LevelDebug, + } + } + customLogger, err := log.New(log.Config{ FileOutput: "zeno.log", FileLevel: slog.LevelDebug, StdoutLevel: slog.LevelInfo, RotateLogFile: true, RotateElasticSearchIndex: true, - ElasticsearchConfig: &log.ElasticsearchConfig{ - Addresses: elasticSearchURLs, - Username: flags.ElasticSearchUsername, - Password: flags.ElasticSearchPassword, - }, + ElasticsearchConfig: elasticSearchConfig, }) if err != nil { fmt.Println(err) diff --git a/internal/pkg/log/elasticsearch.go b/internal/pkg/log/elasticsearch.go index 722720af..f414a557 100644 --- a/internal/pkg/log/elasticsearch.go +++ b/internal/pkg/log/elasticsearch.go @@ -14,11 +14,11 @@ import ( // ElasticsearchConfig holds the configuration for Elasticsearch logging type ElasticsearchConfig struct { - Addresses []string - Username string - Password string - Index string - Level slog.Level + Addresses []string + Username string + Password string + IndexPrefix string + Level slog.Level } // ElasticsearchHandler implements slog.Handler for Elasticsearch @@ -129,6 +129,9 @@ func (h *ElasticsearchHandler) createIndex() error { res, err := req.Do(context.Background(), h.client) if err != nil { + if strings.Contains(err.Error(), "EOF") { + return fmt.Errorf("error creating index: received EOF from Elasticsearch, is the server running? 
check your ES logs for more information") + } return fmt.Errorf("error creating index: %w", err) } defer res.Body.Close() From c2f492af2ce6deba2c5ac1a3e049af4360fcd0a9 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sun, 30 Jun 2024 22:20:46 -0400 Subject: [PATCH 16/16] corrected pr comments + enhanced code and logic --- cmd/cmd.go | 11 +++- cmd/utils.go | 17 ++---- config/config.go | 2 +- internal/pkg/crawl/crawl.go | 10 ++-- internal/pkg/crawl/finish.go | 3 +- internal/pkg/frontier/frontier.go | 6 +- internal/pkg/log/elasticsearch.go | 29 ++++++++++ internal/pkg/log/file.go | 52 +++++++++++++++++ internal/pkg/log/log.go | 92 ++++++++----------------------- internal/pkg/log/misc.go | 26 +++++++++ internal/pkg/log/multi_handler.go | 58 +++++++++++++++++++ internal/pkg/log/rotate.go | 41 +++----------- 12 files changed, 224 insertions(+), 123 deletions(-) create mode 100644 internal/pkg/log/file.go create mode 100644 internal/pkg/log/misc.go create mode 100644 internal/pkg/log/multi_handler.go diff --git a/cmd/cmd.go b/cmd/cmd.go index f2a39ed3..5ffb177d 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -298,6 +298,13 @@ var GlobalFlags = []cli.Flag{ Usage: "If turned on, the crawler will send back URLs that hit a rate limit to crawl HQ.", Destination: &config.App.Flags.HQRateLimitingSendBack, }, + // Logging flags + &cli.StringFlag{ + Name: "log-file-output-dir", + Usage: "Directory to write log files to.", + Value: "jobs", + Destination: &config.App.Flags.LogFileOutputDir, + }, &cli.StringFlag{ Name: "es-url", Usage: "comma-separated ElasticSearch URL to use for indexing crawl logs.", @@ -315,8 +322,8 @@ var GlobalFlags = []cli.Flag{ }, &cli.StringFlag{ Name: "es-index-prefix", - Usage: "ElasticSearch index prefix to use for indexing crawl logs. Default is : `zeno-`", - Value: "zeno-", + Usage: "ElasticSearch index prefix to use for indexing crawl logs. 
Default is : `zeno`, without `-`", + Value: "zeno", Destination: &config.App.Flags.ElasticSearchIndexPrefix, }, &cli.StringSliceFlag{ diff --git a/cmd/utils.go b/cmd/utils.go index f255b1c0..155949b3 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -37,8 +37,12 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { } } + logFileOutput := &log.Logfile{ + Dir: strings.TrimRight(flags.LogFileOutputDir, "/"), + Prefix: "zeno", + } customLogger, err := log.New(log.Config{ - FileOutput: "zeno.log", + FileOutput: logFileOutput, FileLevel: slog.LevelDebug, StdoutLevel: slog.LevelInfo, RotateLogFile: true, @@ -51,16 +55,6 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { } c.Log = customLogger - go func() { - errChan := c.Log.Errors() - for { - select { - case err := <-errChan: - fmt.Fprintf(os.Stderr, "Logging error: %v\n", err) - } - } - }() - // Statistics counters c.CrawledSeeds = new(ratecounter.Counter) c.CrawledAssets = new(ratecounter.Counter) @@ -71,6 +65,7 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { // Frontier c.Frontier = new(frontier.Frontier) + c.Frontier.Log = c.Log // If the job name isn't specified, we generate a random name if flags.Job == "" { diff --git a/config/config.go b/config/config.go index f8b696f1..112d0e18 100644 --- a/config/config.go +++ b/config/config.go @@ -62,12 +62,12 @@ type Flags struct { DisableAssetsCapture bool CertValidation bool - CloudflareStream bool ElasticSearchURLs string ElasticSearchUsername string ElasticSearchPassword string ElasticSearchIndexPrefix string ExcludedStrings cli.StringSlice + LogFileOutputDir string } type Application struct { diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index 608380cb..802dfb1d 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -201,7 +201,7 @@ func (c *Crawl) Start() (err error) { c.Client, err = warc.NewWARCWritingHTTPClient(HTTPClientSettings) if err != nil { - logrus.Fatalf("Unable to init WARC writing HTTP client: %s", err) + c.Log.Fatal("Unable to init WARC writing HTTP client", "error", err) } go func() { @@ -211,7 +211,7 @@ func (c *Crawl) Start() (err error) { }() c.Client.Timeout = time.Duration(c.HTTPTimeout) * time.Second - logrus.Infof("HTTP client timeout set to %d seconds", c.HTTPTimeout) + c.Log.Info("HTTP client timeout set", "timeout", c.HTTPTimeout) if c.Proxy != "" { proxyHTTPClientSettings := HTTPClientSettings @@ -229,7 +229,7 @@ func (c *Crawl) Start() (err error) { }() } - logrus.Info("WARC writer initialized") + c.Log.Info("WARC writer initialized") // Process responsible for slowing or pausing the crawl // when the WARC writing queue gets too big @@ -280,13 +280,13 @@ func (c *Crawl) Start() (err error) { go c.HQWebsocket() } else { // Push the seed list to the queue - logrus.Info("Pushing seeds in the local queue..") + c.Log.Info("Pushing seeds in the local queue..") for _, item := range c.SeedList { item := item c.Frontier.PushChan <- &item } c.SeedList = nil - logrus.Info("All seeds are now in queue, crawling will start") + c.Log.Info("All seeds are now in queue, crawling will start") } // Start the background process that will catch when there diff --git a/internal/pkg/crawl/finish.go b/internal/pkg/crawl/finish.go index 0b85fde7..92dc9420 100644 --- a/internal/pkg/crawl/finish.go +++ b/internal/pkg/crawl/finish.go @@ -99,7 +99,8 @@ func (crawl *Crawl) finish() { crawl.Log.Warn("Finished!") crawl.Log.Warn("Shutting down the logger, bai bai") - crawl.Log.Stop() + crawl.Log.StopRotation() + 
crawl.Log.StopErrorLog() os.Exit(0) } diff --git a/internal/pkg/frontier/frontier.go b/internal/pkg/frontier/frontier.go index a9e73c02..15525188 100644 --- a/internal/pkg/frontier/frontier.go +++ b/internal/pkg/frontier/frontier.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/beeker1121/goque" + "github.com/internetarchive/Zeno/internal/pkg/log" "github.com/internetarchive/Zeno/internal/pkg/utils" "github.com/paulbellamy/ratecounter" "github.com/philippgille/gokv/leveldb" @@ -42,6 +43,7 @@ type Frontier struct { UseSeencheck bool Seencheck *Seencheck LoggingChan chan *FrontierLogMessage + Log *log.Logger } type FrontierLogMessage struct { @@ -70,7 +72,7 @@ func (f *Frontier) Init(jobPath string, loggingChan chan *FrontierLogMessage, wo f.QueueCount = new(ratecounter.Counter) f.QueueCount.Incr(int64(f.Queue.Length())) - logrus.Info("persistent queue initialized") + f.Log.Info("persistent queue initialized") // Initialize the seencheck f.UseSeencheck = useSeencheck @@ -82,7 +84,7 @@ func (f *Frontier) Init(jobPath string, loggingChan chan *FrontierLogMessage, wo return err } - logrus.Info("seencheck initialized") + f.Log.Info("seencheck initialized") } f.FinishingQueueReader = new(utils.TAtomBool) diff --git a/internal/pkg/log/elasticsearch.go b/internal/pkg/log/elasticsearch.go index f414a557..3bac1877 100644 --- a/internal/pkg/log/elasticsearch.go +++ b/internal/pkg/log/elasticsearch.go @@ -28,6 +28,7 @@ type ElasticsearchHandler struct { level slog.Level attrs []slog.Attr groups []string + config *ElasticsearchConfig } // Handle is responsible for passing the log record to all underlying handlers. @@ -146,3 +147,31 @@ func (h *ElasticsearchHandler) createIndex() error { return nil } + +// Rotate implements the rotation for the Elasticsearch handler. +// It updates the index name to use the current date and creates the new index if it doesn't exist. 
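+// For example (illustrative dates): with IndexPrefix "zeno", a handler that
+// has been writing to zeno-2024.06.29 starts writing to zeno-2024.06.30 at
+// the first rotation after midnight, as scheduled by NextRotation below.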
+func (h *ElasticsearchHandler) Rotate() error { + newIndex := fmt.Sprintf("%s-%s", h.config.IndexPrefix, time.Now().Format("2006.01.02")) + + // If the index name hasn't changed, no need to rotate + if newIndex == h.index { + return nil + } + + // Update the index name + h.index = newIndex + + // Create the new index + err := h.createIndex() + if err != nil { + return fmt.Errorf("failed to create new Elasticsearch index during rotation: %w", err) + } + + return nil +} + +// NextRotation calculates the next rotation time, which is the start of the next day +func (h *ElasticsearchHandler) NextRotation() time.Time { + now := time.Now() + return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour) +} diff --git a/internal/pkg/log/file.go b/internal/pkg/log/file.go new file mode 100644 index 00000000..fb8527b7 --- /dev/null +++ b/internal/pkg/log/file.go @@ -0,0 +1,52 @@ +package log + +import ( + "fmt" + "log/slog" + "os" + "time" +) + +type fileHandler struct { + slog.Handler + filename string + file *os.File + rotationInterval time.Duration + lastRotation time.Time + level slog.Level + logfile *Logfile +} + +type Logfile struct { + Dir string + Prefix string +} + +func (h *fileHandler) Rotate() error { + if h.file != nil { + h.file.Close() + } + + h.filename = h.logfile.Filename() + + file, err := os.OpenFile(h.filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return fmt.Errorf("failed to open new log file: %w", err) + } + + h.file = file + h.Handler = slog.NewJSONHandler(file, &slog.HandlerOptions{ + Level: h.level, + }) + + h.lastRotation = time.Now() + return nil +} + +func (h *fileHandler) NextRotation() time.Time { + return h.lastRotation.Add(h.rotationInterval) +} + +func (s *Logfile) Filename() string { + return fmt.Sprintf("%s/%s.%s.log", s.Dir, s.Prefix, time.Now().Format("2006.01.02-15h")) +} diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go index 0d20f916..bde46a9c 100644 --- a/internal/pkg/log/log.go +++ b/internal/pkg/log/log.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "os" + "path/filepath" "sync" "time" @@ -23,17 +24,13 @@ type Logger struct { slogger *slog.Logger mu sync.Mutex stopRotation chan struct{} + stopErrorLog chan struct{} errorChan chan error } -// multiHandler implements slog.Handler interface for multiple outputs -type multiHandler struct { - handlers []slog.Handler -} - // Config holds the configuration for the logger type Config struct { - FileOutput string + FileOutput *Logfile FileLevel slog.Level StdoutLevel slog.Level RotateLogFile bool @@ -61,17 +58,24 @@ func New(cfg Config) (*Logger, error) { handlers = append(handlers, stdoutHandler) // Create file handler if FileOutput is specified - if cfg.FileOutput != "" { - file, err := os.OpenFile(cfg.FileOutput, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if cfg.FileOutput != nil { + // Create directories if they don't exist + err := os.MkdirAll(filepath.Dir(cfg.FileOutput.Filename()), 0755) + if err != nil { + return nil, err + } + + // Open log file + file, err := os.OpenFile(cfg.FileOutput.Filename(), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { return nil, err } fileHandler := &fileHandler{ - Handler: slog.NewJSONHandler(file, &slog.HandlerOptions{Level: cfg.FileLevel}), - filename: cfg.FileOutput, - file: file, - interval: 6 * time.Hour, - lastRotation: time.Now(), + Handler: slog.NewJSONHandler(file, &slog.HandlerOptions{Level: cfg.FileLevel}), + filename: cfg.FileOutput.Filename(), + file: file, + rotationInterval: 
6 * time.Hour, + lastRotation: time.Now(), } handlers = append(handlers, fileHandler) } @@ -88,10 +92,11 @@ func New(cfg Config) (*Logger, error) { } esHandler := &ElasticsearchHandler{ client: esClient, - index: fmt.Sprintf("zeno-%s", time.Now().Format("2006.01.02")), + index: fmt.Sprintf("%s-%s", cfg.ElasticsearchConfig.IndexPrefix, time.Now().Format("2006.01.02")), level: cfg.ElasticsearchConfig.Level, attrs: []slog.Attr{}, groups: []string{}, + config: cfg.ElasticsearchConfig, } if err := esHandler.createIndex(); err != nil { return nil, fmt.Errorf("failed to create Elasticsearch index: %w", err) @@ -106,9 +111,10 @@ func New(cfg Config) (*Logger, error) { slogger := slog.New(mh) logger := &Logger{ - handler: mh, - slogger: slogger, - errorChan: make(chan error, 10), + handler: mh, + slogger: slogger, + errorChan: make(chan error, 10), + stopErrorLog: make(chan struct{}), } // Start rotation goroutine @@ -127,7 +133,7 @@ func New(cfg Config) (*Logger, error) { func Default() *Logger { once.Do(func() { logger, err := New(Config{ - FileOutput: "zeno.log", + FileOutput: &Logfile{Dir: "jobs", Prefix: "zeno"}, FileLevel: slog.LevelInfo, StdoutLevel: slog.LevelInfo, }) @@ -215,53 +221,3 @@ func (l *Logger) Fatal(msg string, args ...any) { l.log(slog.LevelError, msg, args...) os.Exit(1) } - -//------------------------------------------------------------------------------------- -// Following methods are used to implement the slog.Handler interface for multiHandler -//------------------------------------------------------------------------------------- - -// Enabled checks if any of the underlying handlers are enabled for a given log level. -// It's used internally to determine if a log message should be processed by a given handler -func (h *multiHandler) Enabled(ctx context.Context, level slog.Level) bool { - for _, handler := range h.handlers { - if handler.Enabled(ctx, level) { - return true - } - } - return false -} - -// Handle is responsible for passing the log record to all underlying handlers. -// It's called internally when a log message needs to be written. -func (h *multiHandler) Handle(ctx context.Context, r slog.Record) error { - var errs []error - for _, handler := range h.handlers { - if err := handler.Handle(ctx, r); err != nil { - errs = append(errs, fmt.Errorf("handler error: %w", err)) - } - } - if len(errs) > 0 { - return fmt.Errorf("multiple handler errors: %v", errs) - } - return nil -} - -// WithAttrs creates a new handler with additional attributes. -// It's used internally when the logger is asked to include additional context with all subsequent log messages. -func (h *multiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { - handlers := make([]slog.Handler, len(h.handlers)) - for i, handler := range h.handlers { - handlers[i] = handler.WithAttrs(attrs) - } - return &multiHandler{handlers: handlers} -} - -// WithGroups creates a new handler with a new group added to the attribute grouping hierarchy. -// It's used internally when the logger is asked to group a set of attributes together. 
-func (h *multiHandler) WithGroup(name string) slog.Handler { - handlers := make([]slog.Handler, len(h.handlers)) - for i, handler := range h.handlers { - handlers[i] = handler.WithGroup(name) - } - return &multiHandler{handlers: handlers} -} diff --git a/internal/pkg/log/misc.go b/internal/pkg/log/misc.go new file mode 100644 index 00000000..d5541dd3 --- /dev/null +++ b/internal/pkg/log/misc.go @@ -0,0 +1,26 @@ +package log + +import ( + "fmt" + "os" +) + +// WatchErrors watches for errors in the logger and prints them to stderr. +func (l *Logger) WatchErrors() { + go func() { + errChan := l.Errors() + for { + select { + case <-l.stopErrorLog: + return + case err := <-errChan: + fmt.Fprintf(os.Stderr, "Logging error: %v\n", err) + } + } + }() +} + +// StopErrorLog stops the error logger. +func (l *Logger) StopErrorLog() { + close(l.stopErrorLog) +} diff --git a/internal/pkg/log/multi_handler.go b/internal/pkg/log/multi_handler.go new file mode 100644 index 00000000..9e0a9104 --- /dev/null +++ b/internal/pkg/log/multi_handler.go @@ -0,0 +1,58 @@ +package log + +import ( + "context" + "fmt" + "log/slog" +) + +// multiHandler implements slog.Handler interface for multiple outputs +type multiHandler struct { + handlers []slog.Handler +} + +// Enabled checks if any of the underlying handlers are enabled for a given log level. +// It's used internally to determine if a log message should be processed by a given handler +func (h *multiHandler) Enabled(ctx context.Context, level slog.Level) bool { + for _, handler := range h.handlers { + if handler.Enabled(ctx, level) { + return true + } + } + return false +} + +// Handle is responsible for passing the log record to all underlying handlers. +// It's called internally when a log message needs to be written. +func (h *multiHandler) Handle(ctx context.Context, r slog.Record) error { + var errs []error + for _, handler := range h.handlers { + if err := handler.Handle(ctx, r); err != nil { + errs = append(errs, fmt.Errorf("handler error: %w", err)) + } + } + if len(errs) > 0 { + return fmt.Errorf("multiple handler errors: %v", errs) + } + return nil +} + +// WithAttrs creates a new handler with additional attributes. +// It's used internally when the logger is asked to include additional context with all subsequent log messages. +func (h *multiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + handlers := make([]slog.Handler, len(h.handlers)) + for i, handler := range h.handlers { + handlers[i] = handler.WithAttrs(attrs) + } + return &multiHandler{handlers: handlers} +} + +// WithGroups creates a new handler with a new group added to the attribute grouping hierarchy. +// It's used internally when the logger is asked to group a set of attributes together. +func (h *multiHandler) WithGroup(name string) slog.Handler { + handlers := make([]slog.Handler, len(h.handlers)) + for i, handler := range h.handlers { + handlers[i] = handler.WithGroup(name) + } + return &multiHandler{handlers: handlers} +} diff --git a/internal/pkg/log/rotate.go b/internal/pkg/log/rotate.go index 079c17e2..f9f3dd7c 100644 --- a/internal/pkg/log/rotate.go +++ b/internal/pkg/log/rotate.go @@ -3,41 +3,15 @@ package log import ( "fmt" "log/slog" - "os" "time" ) -// ... 
(previous Logger, multiHandler, logEntry, ElasticsearchHandler definitions remain the same)
-
-type rotateableHandler interface {
+type rotatableHandler interface {
 	slog.Handler
 	Rotate() error
 	NextRotation() time.Time
 }
 
-type fileHandler struct {
-	slog.Handler
-	filename     string
-	file         *os.File
-	interval     time.Duration
-	lastRotation time.Time
-}
-
-func (h *fileHandler) Rotate() error {
-	// ... (previous Rotate implementation remains the same)
-	h.lastRotation = time.Now()
-	return nil
-}
-
-func (h *fileHandler) NextRotation() time.Time {
-	return h.lastRotation.Add(h.interval)
-}
-
-func (h *ElasticsearchHandler) NextRotation() time.Time {
-	now := time.Now()
-	return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour)
-}
-
 func (l *Logger) startRotation() {
 	l.stopRotation = make(chan struct{})
 	go func() {
 		for {
@@ -53,13 +27,15 @@ func (l *Logger) startRotation() {
 	}()
 }
 
+// nextRotation returns the earliest next rotation time
+// of all rotatable handlers
 func (l *Logger) nextRotation() time.Time {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 
 	var earliest time.Time
 	for _, h := range l.handler.handlers {
-		if rh, ok := h.(rotateableHandler); ok {
+		if rh, ok := h.(rotatableHandler); ok {
 			next := rh.NextRotation()
 			if earliest.IsZero() || next.Before(earliest) {
 				earliest = next
@@ -69,13 +45,14 @@ func (l *Logger) nextRotation() time.Time {
 	return earliest
 }
 
+// rotate runs Rotate on every rotatable handler that is due for rotation
 func (l *Logger) rotate() {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 
 	now := time.Now()
 	for _, h := range l.handler.handlers {
-		if rh, ok := h.(rotateableHandler); ok {
+		if rh, ok := h.(rotatableHandler); ok {
 			if now.After(rh.NextRotation()) || now.Equal(rh.NextRotation()) {
 				if err := rh.Rotate(); err != nil {
 					fmt.Printf("Error rotating handler: %v\n", err)
@@ -85,9 +62,7 @@ func (l *Logger) rotate() {
 		}
 	}
 }
 
-// Stop stops the rotation goroutine
-func (l *Logger) Stop() {
+// StopRotation stops the rotation goroutine
+func (l *Logger) StopRotation() {
 	close(l.stopRotation)
 }
-
-// ... (rest of the code remains the same)
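
Putting the pieces together, a minimal sketch of how the finished logging package is wired up end to end. Only API names introduced in the patches above are used; the Elasticsearch address and credentials are placeholders, and the import path only resolves from inside Zeno since the package is internal:

	package main

	import (
		"log/slog"

		"github.com/internetarchive/Zeno/internal/pkg/log"
	)

	func main() {
		logger, err := log.New(log.Config{
			// Rotated every 6 hours into e.g. jobs/zeno.2024.06.30-22h.log
			FileOutput:  &log.Logfile{Dir: "jobs", Prefix: "zeno"},
			FileLevel:   slog.LevelDebug,
			StdoutLevel: slog.LevelInfo,
			// Leave ElasticsearchConfig nil to skip indexing entirely
			ElasticsearchConfig: &log.ElasticsearchConfig{
				Addresses:   []string{"http://localhost:9200"}, // placeholder
				Username:    "elastic",                         // placeholder
				Password:    "changeme",                        // placeholder
				IndexPrefix: "zeno",
				Level:       slog.LevelDebug,
			},
		})
		if err != nil {
			panic(err)
		}
		logger.WatchErrors() // forward logging errors to stderr in the background

		logger.Info("crawl started", "job", "example")

		// Mirror finish(): stop the background goroutines before exiting.
		logger.StopRotation()
		logger.StopErrorLog()
	}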