Skip to content

Commit

Permalink
Merge pull request #117 from internetarchive/feat/distribute-logger-t…
Browse files Browse the repository at this point in the history
…o-queue

Enable `log` package to distribute a stored logger to all other packages
  • Loading branch information
equals215 authored Aug 5, 2024
2 parents 691838d + 99ae01f commit f447a27
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 91 deletions.
2 changes: 1 addition & 1 deletion internal/pkg/crawl/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Worker struct {
state *workerState
doneSignal chan bool
pool *WorkerPool
logger *log.Entry
logger *log.FieldedLogger
}

// Run is the key component of a crawl, it's a background processed dispatched
Expand Down
41 changes: 33 additions & 8 deletions internal/pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/elastic/go-elasticsearch/v8"
)

var (
defaultLogger *Logger
once sync.Once
isLoggerInit *atomic.Bool
storedLogger *Logger
once sync.Once
)

// Logger wraps slog.Logger to provide multi-output functionality
Expand All @@ -45,11 +47,15 @@ type Config struct {
RotateLogFile bool
ElasticsearchConfig *ElasticsearchConfig
RotateElasticSearchIndex bool
isDefault bool
}

// New creates a new Logger instance with the given configuration.
// It sets up handlers for stdout (text format) and file output (JSON format) if specified.
// If FileOutput is empty, only stdout logging will be enabled.
// Only the first call to New will store the logger to be reused. Subsequent calls will return a new logger instance.
// Only the first call to New will rotate the logs destinations.
// Please refrain from calling New multiple times in the same program.
//
// Parameters:
// - cfg: Config struct containing logger configuration options
Expand Down Expand Up @@ -128,32 +134,51 @@ func New(cfg Config) (*Logger, error) {
stopErrorLog: make(chan struct{}),
}

// Start rotation goroutine
logger.startRotation()
if !cfg.isDefault {
once.Do(func() {
isLoggerInit = new(atomic.Bool)
storedLogger = logger
isLoggerInit.CompareAndSwap(false, true)

// Start rotation goroutine
logger.startRotation()
})
}

return logger, nil
}

// Default returns the default Logger instance.
// DefaultOrStored returns the default Logger instance or if already initialized, the logger created by first call to New().
// The default logger writes to both stdout (text format) and a file named "app.log" (JSON format).
// Both outputs are set to log messages at Info level and above.
// This function uses sync.Once to ensure that the default logger is only created once.
//
// Returns:
// - *Logger: The default Logger instance
func Default() *Logger {
// - bool: True if the logger was created by this function, false if the logger was already initialized
func DefaultOrStored() (*Logger, bool) {
var created = false
once.Do(func() {
isLoggerInit = new(atomic.Bool)
logger, err := New(Config{
FileConfig: &LogfileConfig{Dir: "jobs", Prefix: "zeno"},
FileLevel: slog.LevelInfo,
StdoutLevel: slog.LevelInfo,
isDefault: true,
})
if err != nil {
panic(err)
}
defaultLogger = logger
storedLogger = logger
created = isLoggerInit.CompareAndSwap(false, true)
})
return defaultLogger
return storedLogger, created
}

// GetStoredLogger returns the logger created by the first call to New() or DefaultOrStored().
// If the logger has not been initialized, it will return nil.
func GetStoredLogger() *Logger {
return storedLogger
}

// Errors returns a channel that will receive logging errors
Expand Down
30 changes: 15 additions & 15 deletions internal/pkg/log/withfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (
"time"
)

// WithFields returns a new log entry with the given fields.
// The fields are key-value pairs that will be included only in the next log entry.
// WithFields returns a new fielded logger with the given fields.
// The fields are key-value pairs that will be included in the given logger.
//
// This method returns a log Entry, which can be used to log a message with the specified fields.
// This method returns a log FieldedLogger, which can be used to log a message with the specified fields.
//
// Parameters:
// - fields: A map of key-value pairs to be included in the next log entry
// - fields: A map of key-value pairs to be included in the fielded logger
//
// Returns:
// - Entry: A log entry that can be used to log a message with the specified fields
// - FieldedLogger: A logger with fields that can be used to log messages with the given fields
//
// Example:
//
Expand All @@ -26,21 +26,21 @@ import (
// "user_id": 12345,
// "ip": "192.168.1.1",
// }).Info("User logged in")
func (l *Logger) WithFields(fields map[string]interface{}) *Entry {
func (l *Logger) WithFields(fields map[string]interface{}) *FieldedLogger {
attrs := make([]slog.Attr, 0, len(fields))
for k, v := range fields {
attrs = append(attrs, slog.Any(k, v))
}
return &Entry{logger: l, attrs: attrs}
return &FieldedLogger{logger: l, attrs: attrs}
}

// Entry is a log entry with fields.
type Entry struct {
// FieldedLogger is a logger with fields.
type FieldedLogger struct {
logger *Logger
attrs []slog.Attr
}

func (e *Entry) log(ctx context.Context, level slog.Level, msg string, args ...any) {
func (e *FieldedLogger) log(ctx context.Context, level slog.Level, msg string, args ...any) {
allAttrs := append(e.attrs, slog.Any("msg", msg))
for i := 0; i < len(args); i += 2 {
if i+1 < len(args) {
Expand All @@ -53,27 +53,27 @@ func (e *Entry) log(ctx context.Context, level slog.Level, msg string, args ...a
}

// Info logs a message at Info level with the fields specified in WithFields.
func (e *Entry) Info(msg string, args ...any) {
func (e *FieldedLogger) Info(msg string, args ...any) {
e.log(context.Background(), slog.LevelInfo, msg, args...)
}

// Warn logs a message at Warn level with the fields specified in WithFields.
func (e *Entry) Warn(msg string, args ...any) {
func (e *FieldedLogger) Warn(msg string, args ...any) {
e.log(context.Background(), slog.LevelWarn, msg, args...)
}

// Error logs a message at Error level with the fields specified in WithFields.
func (e *Entry) Error(msg string, args ...any) {
func (e *FieldedLogger) Error(msg string, args ...any) {
e.log(context.Background(), slog.LevelError, msg, args...)
}

// Debug logs a message at Debug level with the fields specified in WithFields.
func (e *Entry) Debug(msg string, args ...any) {
func (e *FieldedLogger) Debug(msg string, args ...any) {
e.log(context.Background(), slog.LevelDebug, msg, args...)
}

// Fatal logs a message at Fatal level with the fields specified in WithFields, then calls os.Exit(1).
func (e *Entry) Fatal(msg string, args ...any) {
func (e *FieldedLogger) Fatal(msg string, args ...any) {
e.log(context.Background(), slog.LevelError, msg, args...)
os.Exit(1)
}
38 changes: 23 additions & 15 deletions internal/pkg/queue/index/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package index
import (
"encoding/gob"
"fmt"
"log/slog"
"os"
"sync"
"sync/atomic"
"time"

"github.com/internetarchive/Zeno/internal/pkg/log"
)

var dumpFrequency = 60 // seconds
Expand Down Expand Up @@ -58,6 +59,9 @@ type IndexManager struct {
WalIoPercent int // [1, 100] limit max io percentage for WAL sync
WalMinInterval time.Duration // minimum interval **between** between after-sync and next sync
stopChan chan struct{}

// Logging
logger *log.Logger
}

// NewIndexManager creates a new IndexManager instance and loads the index from the index file.
Expand Down Expand Up @@ -94,6 +98,10 @@ func NewIndexManager(walPath, indexPath, queueDirPath string, useCommit bool) (*
stopChan: make(chan struct{}),
}

// Logger
logger, _ := log.DefaultOrStored()
im.logger = logger

// Init WAL commit if enabled
if useCommit {
im.walCommit = new(atomic.Uint64)
Expand Down Expand Up @@ -121,7 +129,7 @@ func NewIndexManager(walPath, indexPath, queueDirPath string, useCommit bool) (*
indexFile.Close()
return nil, fmt.Errorf("failed to recover from crash: %w", err)
}
fmt.Println("Recovered from crash")
im.logger.Warn("Recovered from crash")
} else {
err = im.loadIndex()
if err != nil {
Expand All @@ -142,7 +150,7 @@ func NewIndexManager(walPath, indexPath, queueDirPath string, useCommit bool) (*
return
case err := <-errChan:
if err != nil {
slog.Error("Periodic dump failed", "error", err) // No better way to log this, will wait for https://github.com/internetarchive/Zeno/issues/92
im.logger.Error("Periodic dump failed", "error", err)
}
}
}
Expand All @@ -162,14 +170,14 @@ func (im *IndexManager) unsafeWalSync() error {

func (im *IndexManager) walCommitsSyncer() {
if swaped := im.walSyncerRunning.CompareAndSwap(false, true); !swaped {
slog.Warn("another walCommitsSyncer is running")
im.logger.Warn("another walCommitsSyncer is running")
return
}
defer im.walSyncerRunning.Store(false)
defer close(im.walStopChan)

if im.WalIoPercent < 1 || im.WalIoPercent > 100 {
slog.Warn("invalid WAL_IO_PERCENT", "value", im.WalIoPercent, "setting to", 10)
im.logger.Warn("invalid WAL_IO_PERCENT", "value", im.WalIoPercent, "setting to", 10)
im.WalIoPercent = 10
}

Expand All @@ -182,7 +190,7 @@ func (im *IndexManager) walCommitsSyncer() {
}
select {
case <-im.walStopChan:
slog.Info("walCommitsSyncer performing final sync before stopping")
im.logger.Info("walCommitsSyncer performing final sync before stopping")
stopping = true
default:
}
Expand All @@ -191,7 +199,7 @@ func (im *IndexManager) walCommitsSyncer() {
if sleepTime < im.WalMinInterval {
sleepTime = im.WalMinInterval
}
slog.Debug("walCommitsSyncer sleeping", "sleepTime", sleepTime, "lastTrySyncDuration", lastTrySyncDuration)
im.logger.Debug("walCommitsSyncer sleeping", "sleepTime", sleepTime, "lastTrySyncDuration", lastTrySyncDuration)
time.Sleep(sleepTime)

start := time.Now()
Expand All @@ -201,14 +209,14 @@ func (im *IndexManager) walCommitsSyncer() {
im.Unlock()
lastTrySyncDuration = time.Since(start)
if lastTrySyncDuration > 2*time.Second {
slog.Warn("WAL sync took too long", "lastTrySyncDuration", lastTrySyncDuration)
im.logger.Warn("WAL sync took too long", "lastTrySyncDuration", lastTrySyncDuration)
}
if err != nil {
if stopping {
slog.Error("failed to sync WAL before stopping", "error", err)
im.logger.Error("failed to sync WAL before stopping", "error", err)
return // we are stopping, no need to retry
}
slog.Error("failed to sync WAL, retrying", "error", err)
im.logger.Error("failed to sync WAL, retrying", "error", err)
continue // we may infinitely retry, but it's better than losing data
}
commited := flyingCommit
Expand All @@ -219,7 +227,7 @@ func (im *IndexManager) walCommitsSyncer() {
// should never happen if listeners number is accurate.
for len(im.walCommitedNotify) > 0 {
<-im.walCommitedNotify
slog.Warn("unconsumed commited id in walCommitedNotify")
im.logger.Warn("unconsumed commited id in walCommitedNotify")
}

// Send the commited id to all listeners
Expand All @@ -243,11 +251,11 @@ func (im *IndexManager) WALCommit() uint64 {
// DO NOT call this function with im.Lock() held, it will deadlock.
func (im *IndexManager) AwaitWALCommitted(commit uint64) {
if commit == 0 {
slog.Warn("AwaitWALCommited called with commit 0")
im.logger.Warn("AwaitWALCommited called with commit 0")
return
}
if !im.walSyncerRunning.Load() {
slog.Warn("AwaitWALCommited called without Syncer running, beaware of hanging")
im.logger.Warn("AwaitWALCommited called without Syncer running, beaware of hanging")
}
if im.IsWALCommited(commit) {
return
Expand Down Expand Up @@ -425,8 +433,8 @@ func (im *IndexManager) pop(host string) (id string, position uint64, size uint6

// Close closes the index manager and performs a final dump of the index to disk.
func (im *IndexManager) Close() error {
slog.Info("Closing index manager")
defer slog.Info("Index manager closed")
im.logger.Info("Closing index manager")
defer im.logger.Info("Index manager closed")
im.dumpTicker.Stop()
im.stopChan <- struct{}{}

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/queue/index/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func (im *IndexManager) RecoverFromCrash() error {
im.Lock()
defer im.Unlock()

fmt.Println("Starting crash recovery process...")
im.logger.Warn("starting crash recovery process")

// Step 1: Load the index file into the in-memory index
if err := im.loadIndex(); err != nil {
Expand Down Expand Up @@ -48,7 +48,7 @@ func (im *IndexManager) RecoverFromCrash() error {

// Step 5: Remove old index file
if err := os.Remove(oldIndexPath); err != nil {
fmt.Printf("Warning: failed to remove old index file: %v\n", err)
im.logger.Warn("failed to remove old index file", "error", err)
}

// Step 6: Truncate and reset WAL
Expand Down
Loading

0 comments on commit f447a27

Please sign in to comment.