diff --git a/backend/cron/go2proto.to.go b/backend/cron/go2proto.to.go index 0d79fb1a94..765a0b92e4 100644 --- a/backend/cron/go2proto.to.go +++ b/backend/cron/go2proto.to.go @@ -12,7 +12,6 @@ import "github.com/alecthomas/types/result" import "time" - var _ fmt.Stringer var _ = timestamppb.Timestamp{} var _ = durationpb.Duration{} @@ -82,7 +81,7 @@ func orZero[T any](v *T) T { } func orZeroR[T any](v result.Result[*T]) result.Result[T] { - if v.Err() != nil { + if v.Err() != nil { return result.Err[T](v.Err()) } r, _ := v.Get() @@ -171,7 +170,6 @@ func unmarshallText[T any, TPtr textUnmarshallable[T]](v []byte, f TPtr) result. return result.Ok[*T](&to) } - func (x *CronState) ToProto() *destpb.CronState { if x == nil { return nil @@ -188,13 +186,15 @@ func CronStateFromProto(v *destpb.CronState) (out *CronState, err error) { } out = &CronState{} - if out.LastExecutions, err = mapValuesR(v.LastExecutions, func(v *timestamppb.Timestamp) result.Result[time.Time] { return orZeroR(result.From(setNil(ptr(v.AsTime()), v), nil)) }).Result(); err != nil { + if out.LastExecutions, err = mapValuesR(v.LastExecutions, func(v *timestamppb.Timestamp) result.Result[time.Time] { + return orZeroR(result.From(setNil(ptr(v.AsTime()), v), nil)) + }).Result(); err != nil { return nil, fmt.Errorf("LastExecutions: %w", err) } - if out.NextExecutions, err = mapValuesR(v.NextExecutions, func(v *timestamppb.Timestamp) result.Result[time.Time] { return orZeroR(result.From(setNil(ptr(v.AsTime()), v), nil)) }).Result(); err != nil { + if out.NextExecutions, err = mapValuesR(v.NextExecutions, func(v *timestamppb.Timestamp) result.Result[time.Time] { + return orZeroR(result.From(setNil(ptr(v.AsTime()), v), nil)) + }).Result(); err != nil { return nil, fmt.Errorf("NextExecutions: %w", err) } return out, nil } - - \ No newline at end of file diff --git a/backend/provisioner/dev_provisioner.go b/backend/provisioner/dev_provisioner.go index f6417ca161..8ec98e07c0 100644 --- a/backend/provisioner/dev_provisioner.go +++ b/backend/provisioner/dev_provisioner.go @@ -33,11 +33,11 @@ func NewDevProvisioner(postgresPort int, mysqlPort int, recreate bool) *InMemPro } func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn { return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, res schema.Provisioned) (*schema.RuntimeElement, error) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).Deployment(deployment) dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(res.ResourceID()) - logger.Debugf("Provisioning mysql database: %s", dbName) + logger.Infof("Provisioning mysql database: %s", dbName) //nolint // We assume that the DB hsas already been started when running in dev mode mysqlDSN, err := dev.SetupMySQL(ctx, mysqlPort) @@ -125,10 +125,10 @@ func ProvisionMySQLForTest(ctx context.Context, moduleName string, id string) (s func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisionerFn { return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, resource schema.Provisioned) (*schema.RuntimeElement, error) { - logger := log.FromContext(ctx) + logger := log.FromContext(ctx).Deployment(deployment) dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(resource.ResourceID()) - logger.Debugf("Provisioning postgres database: %s", dbName) + logger.Infof("Provisioning postgres database: %s", dbName) //nolint // We assume that the DB has already been started when running in dev mode postgresDSN := dsn.PostgresDSN("ftl", dsn.Port(postgresPort)) diff --git a/backend/provisioner/service.go b/backend/provisioner/service.go index 04ba76930b..cb2b9aa069 100644 --- a/backend/provisioner/service.go +++ b/backend/provisioner/service.go @@ -24,6 +24,7 @@ import ( "github.com/block/ftl/internal/key" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/schema/schemaeventsource" + timeline "github.com/block/ftl/internal/timelineclient" ) // CommonProvisionerConfig is shared config between the production controller and development server. @@ -35,6 +36,7 @@ type CommonProvisionerConfig struct { type Config struct { ControllerEndpoint *url.URL `name:"ftl-controller-endpoint" help:"Controller endpoint." env:"FTL_CONTROLLER_ENDPOINT" default:"http://127.0.0.1:8893"` SchemaEndpoint *url.URL `help:"Schema service endpoint." env:"FTL_SCHEMA_ENDPOINT" default:"http://127.0.0.1:8897"` + TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"` CommonProvisionerConfig } @@ -75,9 +77,13 @@ func Start( ctx context.Context, registry *ProvisionerRegistry, schemaClient schemaconnect.SchemaServiceClient, + timelineClient *timeline.Client, ) error { + timelineLogSink := timeline.NewLogSink(timelineClient, log.Trace) + go timelineLogSink.RunLogLoop(ctx) + logger := log.FromContext(ctx).AddSink(timelineLogSink) + ctx = log.ContextWithLogger(ctx, logger) - logger := log.FromContext(ctx) logger.Debugf("Starting FTL provisioner") svc, err := New(ctx, registry, schemaClient) @@ -185,6 +191,7 @@ func RegistryFromConfigFile(ctx context.Context, workingDir string, file *os.Fil return registry, nil } + func (s *Service) HandleChangesetPrepared(ctx context.Context, req key.Changeset) error { _, err := s.schemaClient.CommitChangeset(ctx, connect.NewRequest(&ftlv1.CommitChangesetRequest{Changeset: req.String()})) @@ -193,6 +200,7 @@ func (s *Service) HandleChangesetPrepared(ctx context.Context, req key.Changeset } return nil } + func (s *Service) HandleChangesetCommitted(ctx context.Context, req *schema.Changeset) error { go func() { time.Sleep(time.Second * 5) diff --git a/charts/ftl/templates/provisioner-deployment.yaml b/charts/ftl/templates/provisioner-deployment.yaml index 35345139d8..9726ae01ab 100644 --- a/charts/ftl/templates/provisioner-deployment.yaml +++ b/charts/ftl/templates/provisioner-deployment.yaml @@ -44,6 +44,8 @@ spec: value: "/working" - name: FTL_CONTROLLER_ENDPOINT value: "http://{{ include "ftl.fullname" . }}-controller:{{ .Values.controller.port }}" + - name: FTL_TIMELINE_ENDPOINT + value: "http://{{ include "ftl.fullname" . }}-timeline:{{ .Values.timeline.port }}" - name: FTL_SCHEMA_ENDPOINT value: "http://{{ include "ftl.fullname" . }}-schema:{{ $.Values.schema.services.schema.port }}" - name: LOG_LEVEL diff --git a/cmd/ftl-provisioner-cloudformation/provisioner.go b/cmd/ftl-provisioner-cloudformation/provisioner.go index 14a1dc147f..85b916c6e9 100644 --- a/cmd/ftl-provisioner-cloudformation/provisioner.go +++ b/cmd/ftl-provisioner-cloudformation/provisioner.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "net/url" "strconv" "time" @@ -25,6 +26,7 @@ import ( "github.com/block/ftl/internal/provisioner" "github.com/block/ftl/internal/provisioner/executor" "github.com/block/ftl/internal/provisioner/state" + timeline "github.com/block/ftl/internal/timelineclient" ) const ( @@ -41,6 +43,8 @@ type Config struct { // TODO: remove this once we have module specific security groups DatabaseSecurityGroup string `help:"SG for databases" env:"FTL_PROVISIONER_CF_DB_SECURITY_GROUP"` MysqlSecurityGroup string `help:"SG for mysql" env:"FTL_PROVISIONER_CF_MYSQL_SECURITY_GROUP"` + + TimelineEndpoint *url.URL `help:"Endpoint for the timeline service" env:"FTL_TIMELINE_ENDPOINT"` } type CloudformationProvisioner struct { @@ -54,6 +58,9 @@ type CloudformationProvisioner struct { var _ provisionerconnect.ProvisionerPluginServiceHandler = (*CloudformationProvisioner)(nil) func NewCloudformationProvisioner(ctx context.Context, config Config) (context.Context, *CloudformationProvisioner, error) { + logger := log.FromContext(ctx) + logger.Debugf("Creating cloudformation provisioner") + client, err := createClient(ctx) if err != nil { return nil, nil, fmt.Errorf("failed to create cloudformation client: %w", err) @@ -63,6 +70,12 @@ func NewCloudformationProvisioner(ctx context.Context, config Config) (context.C return nil, nil, fmt.Errorf("failed to create secretsmanager client: %w", err) } + timelineClient := timeline.NewClient(ctx, config.TimelineEndpoint) + timelineLogSink := timeline.NewLogSink(timelineClient, log.Info) + go timelineLogSink.RunLogLoop(ctx) + logger = logger.AddSink(timelineLogSink) + ctx = log.ContextWithLogger(ctx, logger) + return ctx, &CloudformationProvisioner{ client: client, secrets: secrets, @@ -76,12 +89,14 @@ func (c *CloudformationProvisioner) Ping(context.Context, *connect.Request[ftlv1 } func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect.Request[provisionerpb.ProvisionRequest]) (*connect.Response[provisionerpb.ProvisionResponse], error) { - logger := log.FromContext(ctx) - module, err := schema.ModuleFromProto(req.Msg.DesiredModule) if err != nil { return nil, fmt.Errorf("failed to convert module from proto: %w", err) } + + logger := log.FromContext(ctx).Deployment(module.Runtime.Deployment.DeploymentKey).Module(module.Name) + ctx = log.ContextWithLogger(ctx, logger) + var acceptedKinds []schema.ResourceType for _, k := range req.Msg.Kinds { acceptedKinds = append(acceptedKinds, schema.ResourceType(k)) @@ -117,7 +132,7 @@ func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect. return nil, fmt.Errorf("provisioner already running: %s", stackID) } logger.Debugf("Starting task for module %s: %s", req.Msg.DesiredModule.Name, stackID) - task.Start(ctx, module.Name) + task.Start(ctx, module.Name, module.Runtime.Deployment.DeploymentKey) return connect.NewResponse(&provisionerpb.ProvisionResponse{ Status: provisionerpb.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED, ProvisioningToken: stackID, diff --git a/cmd/ftl-provisioner-cloudformation/status.go b/cmd/ftl-provisioner-cloudformation/status.go index 3eafe92a27..bb4ddfe3e5 100644 --- a/cmd/ftl-provisioner-cloudformation/status.go +++ b/cmd/ftl-provisioner-cloudformation/status.go @@ -23,7 +23,11 @@ func (c *CloudformationProvisioner) Status(ctx context.Context, req *connect.Req // in that case, we start a new task to query the existing stack task, loaded := c.running.LoadOrStore(token, &provisioner.Task{}) if !loaded { - task.Start(ctx, req.Msg.DesiredModule.Name) + dk, err := key.ParseDeploymentKey(token) + if err != nil { + return nil, fmt.Errorf("failed to parse deployment key: %w", err) + } + task.Start(ctx, req.Msg.DesiredModule.Name, dk) } if task.Err() != nil { diff --git a/cmd/ftl-provisioner-sandbox/provisioner.go b/cmd/ftl-provisioner-sandbox/provisioner.go index 567ec7bc69..f73d5d8bd3 100644 --- a/cmd/ftl-provisioner-sandbox/provisioner.go +++ b/cmd/ftl-provisioner-sandbox/provisioner.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net/url" "connectrpc.com/connect" "github.com/aws/aws-sdk-go-v2/config" @@ -20,12 +21,15 @@ import ( "github.com/block/ftl/internal/provisioner" "github.com/block/ftl/internal/provisioner/executor" "github.com/block/ftl/internal/provisioner/state" + timeline "github.com/block/ftl/internal/timelineclient" ) type Config struct { MySQLCredentialsSecretARN string `help:"ARN for the secret containing mysql credentials" env:"FTL_SANDBOX_MYSQL_ARN"` MySQLEndpoint string `help:"Endpoint for the mysql database" env:"FTL_SANDBOX_MYSQL_ENDPOINT"` KafkaBrokers []string `help:"Brokers for the kafka cluster" env:"FTL_SANDBOX_KAFKA_BROKERS"` + + TimelineEndpoint *url.URL `help:"Endpoint for the timeline service" env:"FTL_TIMELINE_ENDPOINT"` } type SandboxProvisioner struct { @@ -38,11 +42,20 @@ type SandboxProvisioner struct { var _ provisionerconnect.ProvisionerPluginServiceHandler = (*SandboxProvisioner)(nil) func NewSandboxProvisioner(ctx context.Context, config Config) (context.Context, *SandboxProvisioner, error) { + logger := log.FromContext(ctx) + logger.Debugf("Creating sandbox provisioner") + secrets, err := createSecretsClient(ctx) if err != nil { return nil, nil, fmt.Errorf("failed to create secretsmanager client: %w", err) } + timelineClient := timeline.NewClient(ctx, config.TimelineEndpoint) + timelineLogSink := timeline.NewLogSink(timelineClient, log.Info) + go timelineLogSink.RunLogLoop(ctx) + logger = logger.AddSink(timelineLogSink) + ctx = log.ContextWithLogger(ctx, logger) + return ctx, &SandboxProvisioner{ secrets: secrets, confg: &config, @@ -59,11 +72,14 @@ func (c *SandboxProvisioner) Provision(ctx context.Context, req *connect.Request if err != nil { return nil, fmt.Errorf("failed to convert module from proto: %w", err) } + + logger := log.FromContext(ctx).Deployment(module.Runtime.Deployment.DeploymentKey).Module(module.Name) + ctx = log.ContextWithLogger(ctx, logger) + var acceptedKinds []schema.ResourceType for _, k := range req.Msg.Kinds { acceptedKinds = append(acceptedKinds, schema.ResourceType(k)) } - logger := log.FromContext(ctx).Module(module.Name) inputStates := inputsFromSchema(ctx, module, acceptedKinds, req.Msg.DesiredModule.Name, c.confg) @@ -88,7 +104,7 @@ func (c *SandboxProvisioner) Provision(ctx context.Context, req *connect.Request return nil, fmt.Errorf("provisioner already running: %s", token) } logger.Debugf("Starting task %s", token) - task.Start(ctx, module.Name) + task.Start(ctx, module.Name, module.Runtime.Deployment.DeploymentKey) return connect.NewResponse(&provisionerpb.ProvisionResponse{ Status: provisionerpb.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED, ProvisioningToken: token, diff --git a/cmd/ftl-provisioner/main.go b/cmd/ftl-provisioner/main.go index e97214045b..394dda424b 100644 --- a/cmd/ftl-provisioner/main.go +++ b/cmd/ftl-provisioner/main.go @@ -17,6 +17,7 @@ import ( "github.com/block/ftl/internal/observability" _ "github.com/block/ftl/internal/prodinit" "github.com/block/ftl/internal/rpc" + timeline "github.com/block/ftl/internal/timelineclient" ) var cli struct { @@ -38,6 +39,7 @@ func main() { logger := log.Configure(os.Stderr, cli.LogConfig) ctx := log.ContextWithLogger(context.Background(), logger) + timelineClient := timeline.NewClient(ctx, cli.ProvisionerConfig.TimelineEndpoint) err := observability.Init(ctx, false, "", "ftl-provisioner", ftl.Version, cli.ObservabilityConfig) kctx.FatalIfErrorf(err, "failed to initialize observability") @@ -69,6 +71,6 @@ func main() { logger.Debugf("Registered provisioner %s as fallback for runner", runnerBinding) } - err = provisioner.Start(ctx, registry, schemaClient) + err = provisioner.Start(ctx, registry, schemaClient, timelineClient) kctx.FatalIfErrorf(err, "failed to start provisioner") } diff --git a/cmd/ftl/cmd_serve.go b/cmd/ftl/cmd_serve.go index 885a16559f..009e5af55b 100644 --- a/cmd/ftl/cmd_serve.go +++ b/cmd/ftl/cmd_serve.go @@ -324,7 +324,7 @@ func (s *serveCommonConfig) run( } wg.Go(func() error { - if err := provisioner.Start(provisionerCtx, provisionerRegistry, schemaClient); err != nil { + if err := provisioner.Start(provisionerCtx, provisionerRegistry, schemaClient, timelineClient); err != nil { logger.Errorf(err, "provisionerfailed: %v", err) return fmt.Errorf("provisionerfailed: %w", err) } diff --git a/internal/log/logger.go b/internal/log/logger.go index c8725ab115..11477bf13b 100644 --- a/internal/log/logger.go +++ b/internal/log/logger.go @@ -11,12 +11,15 @@ import ( "github.com/alecthomas/atomic" "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" + + "github.com/block/ftl/internal/key" ) var _ Interface = (*Logger)(nil) const scopeKey = "scope" const moduleKey = "module" +const deploymentKey = "deployment" type Entry struct { Time time.Time `json:"-"` @@ -54,6 +57,10 @@ func (l Logger) Module(module string) *Logger { return l.Attrs(map[string]string{moduleKey: module}) } +func (l Logger) Deployment(deployment key.Deployment) *Logger { + return l.Attrs(map[string]string{deploymentKey: deployment.String()}) +} + // Attrs creates a new logger with the given attributes. func (l Logger) Attrs(attributes map[string]string) *Logger { attr := make(map[string]string, len(l.attributes)+len(attributes)) diff --git a/internal/provisioner/executor/mysql.go b/internal/provisioner/executor/mysql.go index 9b5a482147..035ae8b68b 100644 --- a/internal/provisioner/executor/mysql.go +++ b/internal/provisioner/executor/mysql.go @@ -14,6 +14,7 @@ import ( "github.com/block/ftl/common/schema" "github.com/block/ftl/internal/concurrency" "github.com/block/ftl/internal/dsn" + "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/provisioner" "github.com/block/ftl/internal/provisioner/state" ) @@ -40,6 +41,8 @@ func (e *ARNSecretMySQLSetup) Prepare(_ context.Context, input state.State) erro func (e *ARNSecretMySQLSetup) Execute(ctx context.Context) ([]state.State, error) { rg := concurrency.ResourceGroup[state.State]{} + logger := log.FromContext(ctx) + for _, input := range e.inputs { if input, ok := input.(state.RDSInstanceReadyMySQL); ok { rg.Go(func() (state.State, error) { @@ -58,6 +61,8 @@ func (e *ARNSecretMySQLSetup) Execute(ctx context.Context) ([]state.State, error return nil, err } + logger.Infof("MySQL database created: %s", input.ResourceID) //nolint + return state.OutputMySQL{ Module: input.Module, ResourceID: input.ResourceID, diff --git a/internal/provisioner/executor/postgres.go b/internal/provisioner/executor/postgres.go index d2235bf343..456ec84e1b 100644 --- a/internal/provisioner/executor/postgres.go +++ b/internal/provisioner/executor/postgres.go @@ -41,6 +41,7 @@ func (e *ARNSecretPostgresSetup) Prepare(_ context.Context, input state.State) e func (e *ARNSecretPostgresSetup) Execute(ctx context.Context) ([]state.State, error) { rg := concurrency.ResourceGroup[state.State]{} + logger := log.FromContext(ctx) for _, input := range e.inputs { if input, ok := input.(state.RDSInstanceReadyPostgres); ok { @@ -66,6 +67,8 @@ func (e *ARNSecretPostgresSetup) Execute(ctx context.Context) ([]state.State, er return nil, err } + logger.Infof("Postgres database created: %s", input.ResourceID) //nolint + return state.OutputPostgres{ Module: input.Module, ResourceID: input.ResourceID, diff --git a/internal/provisioner/executor/pubsub.go b/internal/provisioner/executor/pubsub.go index ed963006c2..8fa7ccc28c 100644 --- a/internal/provisioner/executor/pubsub.go +++ b/internal/provisioner/executor/pubsub.go @@ -91,6 +91,8 @@ func (e *KafkaTopicSetup) Execute(ctx context.Context) ([]state.State, error) { } logger.Debugf("Output: %s", output.DebugString()) result = append(result, output) + + logger.Infof("Kafka topic created: %s", topicID) //nolint } } return result, nil diff --git a/internal/provisioner/executor_test.go b/internal/provisioner/executor_test.go index af6f2382dd..cf7e299e29 100644 --- a/internal/provisioner/executor_test.go +++ b/internal/provisioner/executor_test.go @@ -53,7 +53,7 @@ func TestExecutorSelection(t *testing.T) { }, } - states, err := runner.Run(ctx, "test") + states, err := runner.Run(ctx) assert.NoError(t, err) assert.Equal(t, 2, len(states)) diff --git a/internal/provisioner/runner.go b/internal/provisioner/runner.go index 6c193e7de2..27ac44e622 100644 --- a/internal/provisioner/runner.go +++ b/internal/provisioner/runner.go @@ -8,6 +8,7 @@ import ( "github.com/alecthomas/atomic" "golang.org/x/sync/errgroup" + "github.com/block/ftl/internal/key" "github.com/block/ftl/internal/log" "github.com/block/ftl/internal/provisioner/state" ) @@ -41,8 +42,8 @@ type Runner struct { Stages []RunnerStage } -func (r *Runner) Run(ctx context.Context, module string) ([]state.State, error) { - logger := log.FromContext(ctx).Module(module) +func (r *Runner) Run(ctx context.Context) ([]state.State, error) { + logger := log.FromContext(ctx) for _, stage := range r.Stages { logger.Debugf("running stage %s", stage.Name) @@ -120,11 +121,12 @@ func (r *Runner) execute(ctx context.Context, stage *RunnerStage) ([]state.State return result, nil } -func (t *Task) Start(oldCtx context.Context, module string) { +func (t *Task) Start(oldCtx context.Context, module string, deployment key.Deployment) { ctx := context.WithoutCancel(oldCtx) - logger := log.FromContext(ctx).Module(module) + logger := log.FromContext(ctx).Module(module).Deployment(deployment) + ctx = log.ContextWithLogger(ctx, logger) go func() { - outputs, err := t.runner.Run(ctx, module) + outputs, err := t.runner.Run(ctx) if err != nil { logger.Errorf(err, "failed to execute provisioner") t.err.Store(err)