Skip to content

Commit

Permalink
fix: logging replay issues (#4706)
Browse files Browse the repository at this point in the history
- Fix scope
- Read all messages from plugins
  • Loading branch information
stuartwdouglas authored Feb 27, 2025
1 parent 3ecae18 commit 65b69ea
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 12 deletions.
4 changes: 2 additions & 2 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func Start(ctx context.Context, config Config, eventSource *schemaeventsource.Ev
}

func executeJob(ctx context.Context, state *statemachine.SingleQueryHandle[struct{}, CronState, CronEvent], client routing.CallClient, job *cronJob, timelineClient *timelineclient.Client) error {
logger := log.FromContext(ctx).Scope("cron")
logger := log.FromContext(ctx).Scope("cron").Module(job.module)
logger.Debugf("Executing cron job %s", job)

view, err := state.View(ctx)
Expand Down Expand Up @@ -209,7 +209,7 @@ func executeJob(ctx context.Context, state *statemachine.SingleQueryHandle[struc
}

func callCronVerb(ctx context.Context, verbClient routing.CallClient, cronJob *cronJob) error {
logger := log.FromContext(ctx).Scope("cron")
logger := log.FromContext(ctx).Scope("cron").Module(cronJob.module)
ref := schema.Ref{Module: cronJob.module, Name: cronJob.verb.Name}
logger.Debugf("Calling cron job %s", cronJob)

Expand Down
4 changes: 3 additions & 1 deletion backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,12 @@ func (s *Service) deProvision(ctx context.Context, cs key.Changeset, modules []*
}

func (s *Service) HandleChangesetPreparing(ctx context.Context, req *schema.Changeset) error {
logger := log.FromContext(ctx)
mLogger := log.FromContext(ctx)
group := errgroup.Group{}
// TODO: Block deployments to make sure only one module is modified at a time
for _, module := range req.Modules {
logger := mLogger.Module(module.Name)
ctx := log.ContextWithLogger(ctx, logger)
moduleName := module.Name

existingModule, _ := s.currentModules.Load(moduleName)
Expand Down
8 changes: 8 additions & 0 deletions common/plugin/spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ func Spawn[Client rpc.Pingable[Req, Resp, RespPtr], Req any, Resp any, RespPtr r
}
cmd.Env = append(cmd.Env, "FTL_BIND="+pluginEndpoint.String())
cmd.Env = append(cmd.Env, "FTL_WORKING_DIR="+workingDir)
// If the log level is lower than debug we just leave it at the default.
// Otherwise we set the log level to debug so we can replay it if needed
// These messages are streamed through this processes logger, so will respect the normal log level
if logger.GetLevel() >= log.Debug {
cmd.Env = append(cmd.Env, "LOG_LEVEL=DEBUG")
} else {
cmd.Env = append(cmd.Env, "LOG_LEVEL="+logger.GetLevel().String())
}
cmd.Env = append(cmd.Env, opts.envars...)
if err = cmd.Start(); err != nil {
return nil, nil, err
Expand Down
11 changes: 11 additions & 0 deletions go-runtime/goplugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type scaffoldingContext struct {
// CreateModule generates files for a new module with the requested name
func (s *Service) CreateModule(ctx context.Context, req *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) {
logger := log.FromContext(ctx)
logger = logger.Module(req.Msg.Name)
ctx = log.ContextWithLogger(ctx, logger)
flags := req.Msg.Flags.AsMap()
projConfig := langpb.ProjectConfigFromProto(req.Msg.ProjectConfig)

Expand Down Expand Up @@ -202,6 +204,9 @@ func (s *Service) GetDependencies(ctx context.Context, req *connect.Request[lang
}

func (s *Service) GenerateStubs(ctx context.Context, req *connect.Request[langpb.GenerateStubsRequest]) (*connect.Response[langpb.GenerateStubsResponse], error) {
logger := log.FromContext(ctx)
logger = logger.Module(req.Msg.Module.Name)
ctx = log.ContextWithLogger(ctx, logger)
moduleSchema, err := schema.ValidatedModuleFromProto(req.Msg.Module)
if err != nil {
return nil, fmt.Errorf("invalid module: %w", err)
Expand All @@ -220,6 +225,9 @@ func (s *Service) GenerateStubs(ctx context.Context, req *connect.Request[langpb
}

func (s *Service) SyncStubReferences(ctx context.Context, req *connect.Request[langpb.SyncStubReferencesRequest]) (*connect.Response[langpb.SyncStubReferencesResponse], error) {
logger := log.FromContext(ctx)
logger = logger.Module(req.Msg.ModuleConfig.Name)
ctx = log.ContextWithLogger(ctx, logger)
config := langpb.ModuleConfigFromProto(req.Msg.ModuleConfig)
err := compile.SyncGeneratedStubReferences(ctx, config, req.Msg.StubsRoot, req.Msg.Modules)
if err != nil {
Expand All @@ -238,6 +246,9 @@ func (s *Service) SyncStubReferences(ctx context.Context, req *connect.Request[l
// rebuild must include the latest build context id provided by the request or subsequent BuildContextUpdated
// calls.
func (s *Service) Build(ctx context.Context, req *connect.Request[langpb.BuildRequest], stream *connect.ServerStream[langpb.BuildResponse]) error {
logger := log.FromContext(ctx)
logger = logger.Module(req.Msg.BuildContext.ModuleConfig.Name)
ctx = log.ContextWithLogger(ctx, logger)
events := make(chan updateEvent, 32)
s.updatesTopic.Subscribe(events)
defer s.updatesTopic.Unsubscribe(events)
Expand Down
21 changes: 12 additions & 9 deletions internal/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import (
"bufio"
"fmt"
"io"
"os"
"maps"
"runtime"
"time"

"github.com/alecthomas/atomic"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
"golang.org/x/exp/maps"
)

var _ Interface = (*Logger)(nil)
Expand Down Expand Up @@ -79,7 +78,12 @@ func (l Logger) GetLevel() Level {
}

func (l *Logger) Log(entry Entry) {
var mergedAttributes map[string]string
if t, ok := l.debugLogger.Get(); ok {
mergedAttributes = make(map[string]string, len(l.attributes)+len(entry.Attributes))
maps.Copy(mergedAttributes, l.attributes)
maps.Copy(mergedAttributes, entry.Attributes)
entry.Attributes = mergedAttributes
t.delegate.Log(entry)
}
if entry.Level < l.level.Load() {
Expand All @@ -90,14 +94,13 @@ func (l *Logger) Log(entry Entry) {
entry.Time = l.clock.Now().UTC()
}

mergedAttributes := make(map[string]string, len(l.attributes)+len(entry.Attributes))
maps.Copy(mergedAttributes, l.attributes)
maps.Copy(mergedAttributes, entry.Attributes)
entry.Attributes = mergedAttributes

if err := l.sink.Log(entry); err != nil {
fmt.Fprintf(os.Stderr, "ftl:log: failed to log entry: %v", err)
if mergedAttributes == nil {
mergedAttributes = make(map[string]string, len(l.attributes)+len(entry.Attributes))
maps.Copy(mergedAttributes, l.attributes)
maps.Copy(mergedAttributes, entry.Attributes)
entry.Attributes = mergedAttributes
}
_ = l.sink.Log(entry) // nolint:errcheck
}

func (l *Logger) Logf(level Level, format string, args ...interface{}) {
Expand Down

0 comments on commit 65b69ea

Please sign in to comment.