diff --git a/internal/pkg/crawl/worker.go b/internal/pkg/crawl/worker.go
index f81efac1..a38958bb 100644
--- a/internal/pkg/crawl/worker.go
+++ b/internal/pkg/crawl/worker.go
@@ -54,7 +54,7 @@ type Worker struct {
 	state      *workerState
 	doneSignal chan bool
 	pool       *WorkerPool
-	logger     *log.Entry
+	logger     *log.FieldedLogger
 }
 
 // Run is the key component of a crawl, it's a background processed dispatched
diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go
index d53ee707..e5b56b61 100644
--- a/internal/pkg/log/log.go
+++ b/internal/pkg/log/log.go
@@ -16,14 +16,16 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/elastic/go-elasticsearch/v8"
 )
 
 var (
-	defaultLogger *Logger
-	once sync.Once
+	isLoggerInit *atomic.Bool
+	storedLogger *Logger
+	once sync.Once
 )
 
 // Logger wraps slog.Logger to provide multi-output functionality
@@ -45,11 +47,15 @@ type Config struct {
 	RotateLogFile bool
 	ElasticsearchConfig *ElasticsearchConfig
 	RotateElasticSearchIndex bool
+	isDefault bool
 }
 
 // New creates a new Logger instance with the given configuration.
 // It sets up handlers for stdout (text format) and file output (JSON format) if specified.
 // If FileOutput is empty, only stdout logging will be enabled.
+// Only the first call to New will store the logger to be reused. Subsequent calls will return a new logger instance.
+// Only the first call to New will rotate the log destinations.
+// Please refrain from calling New multiple times in the same program.
 //
 // Parameters:
 // - cfg: Config struct containing logger configuration options
@@ -128,32 +134,51 @@ func New(cfg Config) (*Logger, error) {
 		stopErrorLog: make(chan struct{}),
 	}
 
-	// Start rotation goroutine
-	logger.startRotation()
+	if !cfg.isDefault {
+		once.Do(func() {
+			isLoggerInit = new(atomic.Bool)
+			storedLogger = logger
+			isLoggerInit.CompareAndSwap(false, true)
+
+			// Start rotation goroutine
+			logger.startRotation()
+		})
+	}
 
 	return logger, nil
 }
 
-// Default returns the default Logger instance.
+// DefaultOrStored returns the default Logger instance or, if already initialized, the logger created by the first call to New().
 // The default logger writes to both stdout (text format) and a file named "app.log" (JSON format).
 // Both outputs are set to log messages at Info level and above.
 // This function uses sync.Once to ensure that the default logger is only created once.
 //
 // Returns:
 // - *Logger: The default Logger instance
-func Default() *Logger {
+// - bool: True if the logger was created by this function, false if the logger was already initialized
+func DefaultOrStored() (*Logger, bool) {
+	var created = false
 	once.Do(func() {
+		isLoggerInit = new(atomic.Bool)
 		logger, err := New(Config{
 			FileConfig: &LogfileConfig{Dir: "jobs", Prefix: "zeno"},
 			FileLevel: slog.LevelInfo,
 			StdoutLevel: slog.LevelInfo,
+			isDefault: true,
 		})
 		if err != nil {
 			panic(err)
 		}
-		defaultLogger = logger
+		storedLogger = logger
+		created = isLoggerInit.CompareAndSwap(false, true)
 	})
 
-	return defaultLogger
+	return storedLogger, created
+}
+
+// GetStoredLogger returns the logger created by the first call to New() or DefaultOrStored().
+// If the logger has not been initialized, it will return nil.
+func GetStoredLogger() *Logger {
+	return storedLogger
 }
 
 // Errors returns a channel that will receive logging errors
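Taken together, the log.go changes turn the package-level logger into a write-once singleton: the first non-default call to New stores the logger and starts rotation, DefaultOrStored lazily builds a default logger when nothing has been stored yet, and GetStoredLogger hands back whatever was stored (or nil). Because New, DefaultOrStored and the default path all share the same sync.Once, the intended order is to call New once at startup, before any component asks for the stored logger. Below is a minimal sketch of that call pattern using only the API visible in this diff; the main package and log messages are illustrative, not part of the change:

```go
package main

import (
	"log/slog"

	"github.com/internetarchive/Zeno/internal/pkg/log"
)

func main() {
	// First (and ideally only) call to New: per the new once.Do block,
	// this logger is stored for reuse and log rotation is started.
	logger, err := log.New(log.Config{
		FileConfig:  &log.LogfileConfig{Dir: "jobs", Prefix: "zeno"},
		FileLevel:   slog.LevelInfo,
		StdoutLevel: slog.LevelInfo,
	})
	if err != nil {
		panic(err)
	}
	logger.Info("starting up")

	// Later, from any package: reuse the stored instance if it exists...
	if stored := log.GetStoredLogger(); stored != nil {
		stored.Info("reusing stored logger")
	}

	// ...or take the stored-or-default logger; the bool reports whether
	// this call had to create it.
	l, created := log.DefaultOrStored()
	l.Info("got a logger", "created", created)
}
```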
diff --git a/internal/pkg/log/withfields.go b/internal/pkg/log/withfields.go
index a5d7c60e..350ba018 100644
--- a/internal/pkg/log/withfields.go
+++ b/internal/pkg/log/withfields.go
@@ -8,16 +8,16 @@ import (
 	"time"
 )
 
-// WithFields returns a new log entry with the given fields.
-// The fields are key-value pairs that will be included only in the next log entry.
+// WithFields returns a new fielded logger with the given fields.
+// The fields are key-value pairs that will be included in every entry logged through the returned logger.
 //
-// This method returns a log Entry, which can be used to log a message with the specified fields.
+// This method returns a log FieldedLogger, which can be used to log a message with the specified fields.
 //
 // Parameters:
-// - fields: A map of key-value pairs to be included in the next log entry
+// - fields: A map of key-value pairs to be included in the fielded logger
 //
 // Returns:
-// - Entry: A log entry that can be used to log a message with the specified fields
+// - FieldedLogger: A logger that attaches the given fields to every message it logs
 //
 // Example:
 //
@@ -26,21 +26,21 @@ import (
 //	  "user_id": 12345,
 //	  "ip": "192.168.1.1",
 //	}).Info("User logged in")
-func (l *Logger) WithFields(fields map[string]interface{}) *Entry {
+func (l *Logger) WithFields(fields map[string]interface{}) *FieldedLogger {
 	attrs := make([]slog.Attr, 0, len(fields))
 	for k, v := range fields {
 		attrs = append(attrs, slog.Any(k, v))
 	}
-	return &Entry{logger: l, attrs: attrs}
+	return &FieldedLogger{logger: l, attrs: attrs}
 }
 
-// Entry is a log entry with fields.
-type Entry struct {
+// FieldedLogger is a logger with fields.
+type FieldedLogger struct {
 	logger *Logger
 	attrs []slog.Attr
 }
 
-func (e *Entry) log(ctx context.Context, level slog.Level, msg string, args ...any) {
+func (e *FieldedLogger) log(ctx context.Context, level slog.Level, msg string, args ...any) {
 	allAttrs := append(e.attrs, slog.Any("msg", msg))
 	for i := 0; i < len(args); i += 2 {
 		if i+1 < len(args) {
@@ -53,27 +53,27 @@ func (e *Entry) log(ctx context.Context, level slog.Level, msg string, args ...a
 }
 
 // Info logs a message at Info level with the fields specified in WithFields.
-func (e *Entry) Info(msg string, args ...any) {
+func (e *FieldedLogger) Info(msg string, args ...any) {
 	e.log(context.Background(), slog.LevelInfo, msg, args...)
 }
 
 // Warn logs a message at Warn level with the fields specified in WithFields.
-func (e *Entry) Warn(msg string, args ...any) {
+func (e *FieldedLogger) Warn(msg string, args ...any) {
 	e.log(context.Background(), slog.LevelWarn, msg, args...)
 }
 
 // Error logs a message at Error level with the fields specified in WithFields.
-func (e *Entry) Error(msg string, args ...any) {
+func (e *FieldedLogger) Error(msg string, args ...any) {
 	e.log(context.Background(), slog.LevelError, msg, args...)
 }
 
 // Debug logs a message at Debug level with the fields specified in WithFields.
-func (e *Entry) Debug(msg string, args ...any) {
+func (e *FieldedLogger) Debug(msg string, args ...any) {
 	e.log(context.Background(), slog.LevelDebug, msg, args...)
 }
 
 // Fatal logs a message at Fatal level with the fields specified in WithFields, then calls os.Exit(1).
-func (e *Entry) Fatal(msg string, args ...any) {
+func (e *FieldedLogger) Fatal(msg string, args ...any) {
 	e.log(context.Background(), slog.LevelError, msg, args...)
 	os.Exit(1)
 }
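The rename from Entry to FieldedLogger matches how the type is actually used (see the worker.go hunk above): a value returned by WithFields is kept around and reused across many log calls, not consumed by a single one. A short, hedged sketch of that usage; the field names and messages are assumptions for illustration, not taken from the diff:

```go
package main

import "github.com/internetarchive/Zeno/internal/pkg/log"

// newWorkerLogger shows the pattern the Worker struct now relies on: build a
// *log.FieldedLogger once, then log through it so every entry carries the
// same fields. The "module"/"worker" keys are hypothetical.
func newWorkerLogger(base *log.Logger, workerID int) *log.FieldedLogger {
	return base.WithFields(map[string]interface{}{
		"module": "worker",
		"worker": workerID,
	})
}

func main() {
	base, _ := log.DefaultOrStored()
	wl := newWorkerLogger(base, 42)

	// Extra key/value pairs can still be passed per call, on top of the
	// fields baked in by WithFields.
	wl.Info("processing item", "url", "https://example.com/")
	wl.Error("fetch failed", "url", "https://example.com/missing", "status", 404)
}
```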
diff --git a/internal/pkg/queue/index/manager.go b/internal/pkg/queue/index/manager.go
index c6d08fee..14a6da74 100644
--- a/internal/pkg/queue/index/manager.go
+++ b/internal/pkg/queue/index/manager.go
@@ -3,11 +3,12 @@ package index
 import (
 	"encoding/gob"
 	"fmt"
-	"log/slog"
 	"os"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/internetarchive/Zeno/internal/pkg/log"
 )
 
 var dumpFrequency = 60 // seconds
@@ -58,6 +59,9 @@ type IndexManager struct {
 	WalIoPercent int // [1, 100] limit max io percentage for WAL sync
 	WalMinInterval time.Duration // minimum interval **between** between after-sync and next sync
 	stopChan chan struct{}
+
+	// Logging
+	logger *log.Logger
 }
 
 // NewIndexManager creates a new IndexManager instance and loads the index from the index file.
@@ -94,6 +98,10 @@ func NewIndexManager(walPath, indexPath, queueDirPath string, useCommit bool) (*
 		stopChan: make(chan struct{}),
 	}
 
+	// Logger
+	logger, _ := log.DefaultOrStored()
+	im.logger = logger
+
 	// Init WAL commit if enabled
 	if useCommit {
 		im.walCommit = new(atomic.Uint64)
@@ -121,7 +129,7 @@ func NewIndexManager(walPath, indexPath, queueDirPath string, useCommit bool) (*
 			indexFile.Close()
 			return nil, fmt.Errorf("failed to recover from crash: %w", err)
 		}
-		fmt.Println("Recovered from crash")
+		im.logger.Warn("Recovered from crash")
 	} else {
 		err = im.loadIndex()
 		if err != nil {
@@ -142,7 +150,7 @@ func NewIndexManager(walPath, indexPath, queueDirPath string, useCommit bool) (*
 				return
 			case err := <-errChan:
 				if err != nil {
-					slog.Error("Periodic dump failed", "error", err) // No better way to log this, will wait for https://github.com/internetarchive/Zeno/issues/92
+					im.logger.Error("Periodic dump failed", "error", err)
 				}
 			}
 		}
@@ -162,14 +170,14 @@ func (im *IndexManager) unsafeWalSync() error {
 
 func (im *IndexManager) walCommitsSyncer() {
 	if swaped := im.walSyncerRunning.CompareAndSwap(false, true); !swaped {
-		slog.Warn("another walCommitsSyncer is running")
+		im.logger.Warn("another walCommitsSyncer is running")
 		return
 	}
 	defer im.walSyncerRunning.Store(false)
 	defer close(im.walStopChan)
 
 	if im.WalIoPercent < 1 || im.WalIoPercent > 100 {
-		slog.Warn("invalid WAL_IO_PERCENT", "value", im.WalIoPercent, "setting to", 10)
+		im.logger.Warn("invalid WAL_IO_PERCENT", "value", im.WalIoPercent, "setting to", 10)
 		im.WalIoPercent = 10
 	}
 
@@ -182,7 +190,7 @@ func (im *IndexManager) walCommitsSyncer() {
 		}
 		select {
 		case <-im.walStopChan:
-			slog.Info("walCommitsSyncer performing final sync before stopping")
+			im.logger.Info("walCommitsSyncer performing final sync before stopping")
 			stopping = true
 		default:
 		}
@@ -191,7 +199,7 @@ func (im *IndexManager) walCommitsSyncer() {
 		if sleepTime < im.WalMinInterval {
 			sleepTime = im.WalMinInterval
 		}
-		slog.Debug("walCommitsSyncer sleeping", "sleepTime", sleepTime, "lastTrySyncDuration", lastTrySyncDuration)
+		im.logger.Debug("walCommitsSyncer sleeping", "sleepTime", sleepTime, "lastTrySyncDuration", lastTrySyncDuration)
 		time.Sleep(sleepTime)
 
 		start := time.Now()
@@ -201,14 +209,14 @@ func (im *IndexManager) walCommitsSyncer() {
 		im.Unlock()
 		lastTrySyncDuration = time.Since(start)
 		if lastTrySyncDuration > 2*time.Second {
-			slog.Warn("WAL sync took too long", "lastTrySyncDuration", lastTrySyncDuration)
+			im.logger.Warn("WAL sync took too long", "lastTrySyncDuration", lastTrySyncDuration)
 		}
 		if err != nil {
 			if stopping {
-				slog.Error("failed to sync WAL before stopping", "error", err)
+				im.logger.Error("failed to sync WAL before stopping", "error", err)
 				return // we are stopping, no need to retry
 			}
-			slog.Error("failed to sync WAL, retrying", "error", err)
+			im.logger.Error("failed to sync WAL, retrying", "error", err)
 			continue // we may infinitely retry, but it's better than losing data
 		}
 		commited := flyingCommit
@@ -219,7 +227,7 @@ func (im *IndexManager) walCommitsSyncer() {
 		// should never happen if listeners number is accurate.
 		for len(im.walCommitedNotify) > 0 {
 			<-im.walCommitedNotify
-			slog.Warn("unconsumed commited id in walCommitedNotify")
+			im.logger.Warn("unconsumed commited id in walCommitedNotify")
 		}
 
 		// Send the commited id to all listeners
@@ -243,11 +251,11 @@ func (im *IndexManager) WALCommit() uint64 {
 // DO NOT call this function with im.Lock() held, it will deadlock.
 func (im *IndexManager) AwaitWALCommitted(commit uint64) {
 	if commit == 0 {
-		slog.Warn("AwaitWALCommited called with commit 0")
+		im.logger.Warn("AwaitWALCommited called with commit 0")
 		return
 	}
 	if !im.walSyncerRunning.Load() {
-		slog.Warn("AwaitWALCommited called without Syncer running, beaware of hanging")
+		im.logger.Warn("AwaitWALCommited called without Syncer running, beaware of hanging")
 	}
 	if im.IsWALCommited(commit) {
 		return
@@ -425,8 +433,8 @@ func (im *IndexManager) pop(host string) (id string, position uint64, size uint6
 
 // Close closes the index manager and performs a final dump of the index to disk.
 func (im *IndexManager) Close() error {
-	slog.Info("Closing index manager")
-	defer slog.Info("Index manager closed")
+	im.logger.Info("Closing index manager")
+	defer im.logger.Info("Index manager closed")
 
 	im.dumpTicker.Stop()
 	im.stopChan <- struct{}{}
diff --git a/internal/pkg/queue/index/recovery.go b/internal/pkg/queue/index/recovery.go
index 70acd2f6..a0ee771f 100644
--- a/internal/pkg/queue/index/recovery.go
+++ b/internal/pkg/queue/index/recovery.go
@@ -10,7 +10,7 @@ func (im *IndexManager) RecoverFromCrash() error {
 	im.Lock()
 	defer im.Unlock()
 
-	fmt.Println("Starting crash recovery process...")
+	im.logger.Warn("starting crash recovery process")
 
 	// Step 1: Load the index file into the in-memory index
 	if err := im.loadIndex(); err != nil {
@@ -48,7 +48,7 @@ func (im *IndexManager) RecoverFromCrash() error {
 
 	// Step 5: Remove old index file
 	if err := os.Remove(oldIndexPath); err != nil {
-		fmt.Printf("Warning: failed to remove old index file: %v\n", err)
+		im.logger.Warn("failed to remove old index file", "error", err)
 	}
 
 	// Step 6: Truncate and reset WAL
diff --git a/internal/pkg/queue/index/recovery_test.go b/internal/pkg/queue/index/recovery_test.go
index 6abfe803..f5bb1829 100644
--- a/internal/pkg/queue/index/recovery_test.go
+++ b/internal/pkg/queue/index/recovery_test.go
@@ -5,9 +5,10 @@ import (
 	"os"
 	"path"
 	"strconv"
-	"sync/atomic"
 	"testing"
 	"time"
+
+	"github.com/internetarchive/Zeno/internal/pkg/log"
 )
 
 func Test_Recovery(t *testing.T) {
@@ -32,34 +33,29 @@ func Test_Recovery(t *testing.T) {
 	}
 
 	im := &IndexManager{
-		hostIndex: newIndex(),
-		walFile: walFile,
-		indexFile: indexFile,
-		walEncoder: gob.NewEncoder(walFile),
-		walDecoder: gob.NewDecoder(walFile),
-		indexEncoder: gob.NewEncoder(indexFile),
-		indexDecoder: gob.NewDecoder(indexFile),
-		dumpTicker: time.NewTicker(time.Duration(dumpFrequency) * time.Second),
-		lastDumpTime: time.Now(),
-		walCommit: new(atomic.Uint64),
-		walCommited: new(atomic.Uint64),
-		walNotifyListeners: new(atomic.Int64),
-		WalIoPercent: 100,
-		WalMinInterval: time.Duration(0),
-		walCommitedNotify: make(chan uint64),
-		walStopChan: make(chan struct{}),
-	}
-	go im.walCommitsSyncer()
+		hostIndex: newIndex(),
+		walFile: walFile,
+		indexFile: indexFile,
+		walEncoder: gob.NewEncoder(walFile),
+		walDecoder: gob.NewDecoder(walFile),
+		indexEncoder: gob.NewEncoder(indexFile),
+		indexDecoder: gob.NewDecoder(indexFile),
+		dumpTicker: time.NewTicker(time.Duration(dumpFrequency) * time.Second),
+		lastDumpTime: time.Now(),
+		useCommit: false,
+	}
+
+	// Logger
+	logger, _ := log.DefaultOrStored()
+	im.logger = logger
 
 	// Add entries to the index
-	var commit uint64 = 0
 	for i := 0; i < 1000; i++ {
-		commit, err = im.Add("example.com", "id"+strconv.Itoa(i), uint64(i*200), uint64(200))
+		_, err := im.Add("example.com", "id"+strconv.Itoa(i), uint64(i*200), uint64(200))
 		if err != nil {
 			t.Fatalf("failed to add entry to index: %v", err)
 		}
 	}
-	im.AwaitWALCommitted(commit)
 
 	im.Lock()
@@ -109,8 +105,12 @@ func Test_Recovery(t *testing.T) {
 		indexDecoder: gob.NewDecoder(indexFile),
 		dumpTicker: time.NewTicker(time.Duration(dumpFrequency) * time.Second),
 		lastDumpTime: time.Now(),
+		useCommit: false,
 	}
 
+	// Logger
+	im.logger = logger
+
 	err = im.RecoverFromCrash()
 	if err != nil {
 		t.Fatalf("failed to recover from crash: %v", err)
@@ -142,36 +142,29 @@ func Test_RecoveryAfterOneIndexDumpAndWALNotEmpty(t *testing.T) {
 	}
 
 	im := &IndexManager{
-		hostIndex: newIndex(),
-		walFile: walFile,
-		indexFile: indexFile,
-		walEncoder: gob.NewEncoder(walFile),
-		walDecoder: gob.NewDecoder(walFile),
-		indexEncoder: gob.NewEncoder(indexFile),
-		indexDecoder: gob.NewDecoder(indexFile),
-		dumpTicker: time.NewTicker(time.Duration(dumpFrequency) * time.Second),
-		walCommit: new(atomic.Uint64),
-		walCommited: new(atomic.Uint64),
-		walNotifyListeners: new(atomic.Int64),
-		walCommitedNotify: make(chan uint64),
-		WalIoPercent: 100,
-		WalMinInterval: time.Duration(0),
-		lastDumpTime: time.Now(),
-	}
-	go im.walCommitsSyncer()
+		hostIndex: newIndex(),
+		walFile: walFile,
+		indexFile: indexFile,
+		walEncoder: gob.NewEncoder(walFile),
+		walDecoder: gob.NewDecoder(walFile),
+		indexEncoder: gob.NewEncoder(indexFile),
+		indexDecoder: gob.NewDecoder(indexFile),
+		dumpTicker: time.NewTicker(time.Duration(dumpFrequency) * time.Second),
+		lastDumpTime: time.Now(),
+		useCommit: false,
+	}
+
+	// Logger
+	logger, _ := log.DefaultOrStored()
+	im.logger = logger
 
 	// Add entries to the index
-	t.Log("Adding entries to index")
-	var commit uint64 = 0
 	for i := 0; i < 50; i++ {
-		commit, err = im.Add("example.com", "id"+strconv.Itoa(i), uint64(i*200), uint64(200))
+		_, err := im.Add("example.com", "id"+strconv.Itoa(i), uint64(i*200), uint64(200))
 		if err != nil {
 			t.Fatalf("failed to add entry to index: %v", err)
 		}
 	}
-	t.Log("Awaiting WAL commit")
-	im.AwaitWALCommitted(commit)
-	t.Log("WAL commit received")
 
 	// Perform a disk dump
 	err = im.performDump()
@@ -181,12 +174,11 @@ func Test_RecoveryAfterOneIndexDumpAndWALNotEmpty(t *testing.T) {
 
 	// Add more entries to the index
 	for i := 50; i < 100; i++ {
-		commit, err = im.Add("example.com", "id"+strconv.Itoa(i), uint64(i*200), uint64(200))
+		_, err := im.Add("example.com", "id"+strconv.Itoa(i), uint64(i*200), uint64(200))
 		if err != nil {
 			t.Fatalf("failed to add entry to index: %v", err)
 		}
 	}
-	im.AwaitWALCommitted(commit)
 
 	im.Lock()
@@ -236,9 +228,11 @@ func Test_RecoveryAfterOneIndexDumpAndWALNotEmpty(t *testing.T) {
 		indexDecoder: gob.NewDecoder(indexFile),
 		dumpTicker: time.NewTicker(time.Duration(dumpFrequency) * time.Second),
 		lastDumpTime: time.Now(),
+		useCommit: false,
 	}
 
-	t.Log("Recovering from crash")
+	// Logger
+	im.logger = logger
 
 	err = im.RecoverFromCrash()
 	if err != nil {
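manager.go, recovery.go and the tests all adopt the same wiring: drop the package-level slog calls, add a logger field to the struct, and fill it from DefaultOrStored right after construction. A condensed sketch of that constructor pattern follows, with a hypothetical component type standing in for IndexManager and PersistentGroupedQueue:

```go
package example

import "github.com/internetarchive/Zeno/internal/pkg/log"

// component stands in for IndexManager / PersistentGroupedQueue.
type component struct {
	logger *log.Logger
}

func newComponent() *component {
	c := &component{}

	// DefaultOrStored returns the logger stored by the first call to log.New,
	// or lazily creates the default one, so c.logger is never nil. The bool
	// (ignored here, as in the diff) reports whether this call created it.
	logger, _ := log.DefaultOrStored()
	c.logger = logger

	c.logger.Info("component initialized")
	return c
}
```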
diff --git a/internal/pkg/queue/queue.go b/internal/pkg/queue/queue.go
index dba341ca..6ed7b7ec 100644
--- a/internal/pkg/queue/queue.go
+++ b/internal/pkg/queue/queue.go
@@ -4,13 +4,13 @@ import (
 	"encoding/gob"
 	"errors"
 	"fmt"
-	"log/slog"
 	"net/url"
 	"os"
 	"path"
 	"sync"
 	"sync/atomic"
 
+	"github.com/internetarchive/Zeno/internal/pkg/log"
 	"github.com/internetarchive/Zeno/internal/pkg/queue/index"
 	"github.com/internetarchive/Zeno/internal/pkg/utils"
 )
@@ -43,7 +43,7 @@ type PersistentGroupedQueue struct {
 	batchEnqueueOp func(...*Item) error
 	dequeueOp func() (*Item, error)
 
-	logger *slog.Logger
+	logger *log.Logger
 }
 
 type Item struct {
@@ -105,6 +105,10 @@ func NewPersistentGroupedQueue(queueDirPath string, useHandover bool, useCommit
 		},
 	}
 
+	// Logging
+	logger, _ := log.DefaultOrStored()
+	q.logger = logger
+
 	// Set the queue as not paused and current host to 0
 	// Set the queue as not empty considering that it might have items at the beginning when resuming, the first dequeue will update it to true if needed
 	q.Empty.Set(false)
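Because *log.Logger keeps the slog-style level methods with key/value pairs, swapping the queue's field from *slog.Logger to *log.Logger leaves call sites mechanically unchanged apart from the receiver. A hedged sketch of what such a call site could look like; the method, message and keys are hypothetical, not taken from the diff:

```go
package example

import (
	"fmt"

	"github.com/internetarchive/Zeno/internal/pkg/log"
)

// exampleQueue stands in for PersistentGroupedQueue.
type exampleQueue struct {
	logger *log.Logger
}

// enqueueExample is a hypothetical method showing the unchanged logging style.
func (q *exampleQueue) enqueueExample(host string) error {
	if host == "" {
		err := fmt.Errorf("empty host")
		q.logger.Error("failed to enqueue item", "host", host, "error", err)
		return err
	}
	q.logger.Debug("item enqueued", "host", host)
	return nil
}
```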