Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Provisioner logs to timeline #4776

Merged
merged 6 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions backend/cron/go2proto.to.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions backend/provisioner/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func NewDevProvisioner(postgresPort int, mysqlPort int, recreate bool) *InMemPro
}
func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, res schema.Provisioned) (*schema.RuntimeElement, error) {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).Deployment(deployment)

dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(res.ResourceID())

logger.Debugf("Provisioning mysql database: %s", dbName)
logger.Infof("Provisioning mysql database: %s", dbName) //nolint

// We assume that the DB hsas already been started when running in dev mode
mysqlDSN, err := dev.SetupMySQL(ctx, mysqlPort)
Expand Down Expand Up @@ -125,10 +125,10 @@ func ProvisionMySQLForTest(ctx context.Context, moduleName string, id string) (s

func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisionerFn {
return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, resource schema.Provisioned) (*schema.RuntimeElement, error) {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).Deployment(deployment)

dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(resource.ResourceID())
logger.Debugf("Provisioning postgres database: %s", dbName)
logger.Infof("Provisioning postgres database: %s", dbName) //nolint

// We assume that the DB has already been started when running in dev mode
postgresDSN := dsn.PostgresDSN("ftl", dsn.Port(postgresPort))
Expand Down
10 changes: 9 additions & 1 deletion backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/block/ftl/internal/key"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/schema/schemaeventsource"
timeline "github.com/block/ftl/internal/timelineclient"
)

// CommonProvisionerConfig is shared config between the production controller and development server.
Expand All @@ -35,6 +36,7 @@ type CommonProvisionerConfig struct {
type Config struct {
ControllerEndpoint *url.URL `name:"ftl-controller-endpoint" help:"Controller endpoint." env:"FTL_CONTROLLER_ENDPOINT" default:"http://127.0.0.1:8893"`
SchemaEndpoint *url.URL `help:"Schema service endpoint." env:"FTL_SCHEMA_ENDPOINT" default:"http://127.0.0.1:8897"`
TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"`
CommonProvisionerConfig
}

Expand Down Expand Up @@ -75,9 +77,13 @@ func Start(
ctx context.Context,
registry *ProvisionerRegistry,
schemaClient schemaconnect.SchemaServiceClient,
timelineClient *timeline.Client,
) error {
timelineLogSink := timeline.NewLogSink(timelineClient, log.Trace)
go timelineLogSink.RunLogLoop(ctx)
logger := log.FromContext(ctx).AddSink(timelineLogSink)
ctx = log.ContextWithLogger(ctx, logger)

logger := log.FromContext(ctx)
logger.Debugf("Starting FTL provisioner")

svc, err := New(ctx, registry, schemaClient)
Expand Down Expand Up @@ -185,6 +191,7 @@ func RegistryFromConfigFile(ctx context.Context, workingDir string, file *os.Fil

return registry, nil
}

func (s *Service) HandleChangesetPrepared(ctx context.Context, req key.Changeset) error {

_, err := s.schemaClient.CommitChangeset(ctx, connect.NewRequest(&ftlv1.CommitChangesetRequest{Changeset: req.String()}))
Expand All @@ -193,6 +200,7 @@ func (s *Service) HandleChangesetPrepared(ctx context.Context, req key.Changeset
}
return nil
}

func (s *Service) HandleChangesetCommitted(ctx context.Context, req *schema.Changeset) error {
go func() {
time.Sleep(time.Second * 5)
Expand Down
2 changes: 2 additions & 0 deletions charts/ftl/templates/provisioner-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ spec:
value: "/working"
- name: FTL_CONTROLLER_ENDPOINT
value: "http://{{ include "ftl.fullname" . }}-controller:{{ .Values.controller.port }}"
- name: FTL_TIMELINE_ENDPOINT
value: "http://{{ include "ftl.fullname" . }}-timeline:{{ .Values.timeline.port }}"
- name: FTL_SCHEMA_ENDPOINT
value: "http://{{ include "ftl.fullname" . }}-schema:{{ $.Values.schema.services.schema.port }}"
- name: LOG_LEVEL
Expand Down
21 changes: 18 additions & 3 deletions cmd/ftl-provisioner-cloudformation/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"net/url"
"strconv"
"time"

Expand All @@ -25,6 +26,7 @@ import (
"github.com/block/ftl/internal/provisioner"
"github.com/block/ftl/internal/provisioner/executor"
"github.com/block/ftl/internal/provisioner/state"
timeline "github.com/block/ftl/internal/timelineclient"
)

const (
Expand All @@ -41,6 +43,8 @@ type Config struct {
// TODO: remove this once we have module specific security groups
DatabaseSecurityGroup string `help:"SG for databases" env:"FTL_PROVISIONER_CF_DB_SECURITY_GROUP"`
MysqlSecurityGroup string `help:"SG for mysql" env:"FTL_PROVISIONER_CF_MYSQL_SECURITY_GROUP"`

TimelineEndpoint *url.URL `help:"Endpoint for the timeline service" env:"FTL_TIMELINE_ENDPOINT"`
}

type CloudformationProvisioner struct {
Expand All @@ -54,6 +58,9 @@ type CloudformationProvisioner struct {
var _ provisionerconnect.ProvisionerPluginServiceHandler = (*CloudformationProvisioner)(nil)

func NewCloudformationProvisioner(ctx context.Context, config Config) (context.Context, *CloudformationProvisioner, error) {
logger := log.FromContext(ctx)
logger.Debugf("Creating cloudformation provisioner")

client, err := createClient(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to create cloudformation client: %w", err)
Expand All @@ -63,6 +70,12 @@ func NewCloudformationProvisioner(ctx context.Context, config Config) (context.C
return nil, nil, fmt.Errorf("failed to create secretsmanager client: %w", err)
}

timelineClient := timeline.NewClient(ctx, config.TimelineEndpoint)
timelineLogSink := timeline.NewLogSink(timelineClient, log.Info)
go timelineLogSink.RunLogLoop(ctx)
logger = logger.AddSink(timelineLogSink)
ctx = log.ContextWithLogger(ctx, logger)

return ctx, &CloudformationProvisioner{
client: client,
secrets: secrets,
Expand All @@ -76,12 +89,14 @@ func (c *CloudformationProvisioner) Ping(context.Context, *connect.Request[ftlv1
}

func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect.Request[provisionerpb.ProvisionRequest]) (*connect.Response[provisionerpb.ProvisionResponse], error) {
logger := log.FromContext(ctx)

module, err := schema.ModuleFromProto(req.Msg.DesiredModule)
if err != nil {
return nil, fmt.Errorf("failed to convert module from proto: %w", err)
}

logger := log.FromContext(ctx).Deployment(module.Runtime.Deployment.DeploymentKey).Module(module.Name)
ctx = log.ContextWithLogger(ctx, logger)

var acceptedKinds []schema.ResourceType
for _, k := range req.Msg.Kinds {
acceptedKinds = append(acceptedKinds, schema.ResourceType(k))
Expand Down Expand Up @@ -117,7 +132,7 @@ func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect.
return nil, fmt.Errorf("provisioner already running: %s", stackID)
}
logger.Debugf("Starting task for module %s: %s", req.Msg.DesiredModule.Name, stackID)
task.Start(ctx, module.Name)
task.Start(ctx, module.Name, module.Runtime.Deployment.DeploymentKey)
return connect.NewResponse(&provisionerpb.ProvisionResponse{
Status: provisionerpb.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED,
ProvisioningToken: stackID,
Expand Down
6 changes: 5 additions & 1 deletion cmd/ftl-provisioner-cloudformation/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ func (c *CloudformationProvisioner) Status(ctx context.Context, req *connect.Req
// in that case, we start a new task to query the existing stack
task, loaded := c.running.LoadOrStore(token, &provisioner.Task{})
if !loaded {
task.Start(ctx, req.Msg.DesiredModule.Name)
dk, err := key.ParseDeploymentKey(token)
if err != nil {
return nil, fmt.Errorf("failed to parse deployment key: %w", err)
}
task.Start(ctx, req.Msg.DesiredModule.Name, dk)
}

if task.Err() != nil {
Expand Down
20 changes: 18 additions & 2 deletions cmd/ftl-provisioner-sandbox/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net/url"

"connectrpc.com/connect"
"github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -20,12 +21,15 @@ import (
"github.com/block/ftl/internal/provisioner"
"github.com/block/ftl/internal/provisioner/executor"
"github.com/block/ftl/internal/provisioner/state"
timeline "github.com/block/ftl/internal/timelineclient"
)

type Config struct {
MySQLCredentialsSecretARN string `help:"ARN for the secret containing mysql credentials" env:"FTL_SANDBOX_MYSQL_ARN"`
MySQLEndpoint string `help:"Endpoint for the mysql database" env:"FTL_SANDBOX_MYSQL_ENDPOINT"`
KafkaBrokers []string `help:"Brokers for the kafka cluster" env:"FTL_SANDBOX_KAFKA_BROKERS"`

TimelineEndpoint *url.URL `help:"Endpoint for the timeline service" env:"FTL_TIMELINE_ENDPOINT"`
}

type SandboxProvisioner struct {
Expand All @@ -38,11 +42,20 @@ type SandboxProvisioner struct {
var _ provisionerconnect.ProvisionerPluginServiceHandler = (*SandboxProvisioner)(nil)

func NewSandboxProvisioner(ctx context.Context, config Config) (context.Context, *SandboxProvisioner, error) {
logger := log.FromContext(ctx)
logger.Debugf("Creating sandbox provisioner")

secrets, err := createSecretsClient(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to create secretsmanager client: %w", err)
}

timelineClient := timeline.NewClient(ctx, config.TimelineEndpoint)
timelineLogSink := timeline.NewLogSink(timelineClient, log.Info)
go timelineLogSink.RunLogLoop(ctx)
logger = logger.AddSink(timelineLogSink)
ctx = log.ContextWithLogger(ctx, logger)

return ctx, &SandboxProvisioner{
secrets: secrets,
confg: &config,
Expand All @@ -59,11 +72,14 @@ func (c *SandboxProvisioner) Provision(ctx context.Context, req *connect.Request
if err != nil {
return nil, fmt.Errorf("failed to convert module from proto: %w", err)
}

logger := log.FromContext(ctx).Deployment(module.Runtime.Deployment.DeploymentKey).Module(module.Name)
ctx = log.ContextWithLogger(ctx, logger)

var acceptedKinds []schema.ResourceType
for _, k := range req.Msg.Kinds {
acceptedKinds = append(acceptedKinds, schema.ResourceType(k))
}
logger := log.FromContext(ctx).Module(module.Name)

inputStates := inputsFromSchema(ctx, module, acceptedKinds, req.Msg.DesiredModule.Name, c.confg)

Expand All @@ -88,7 +104,7 @@ func (c *SandboxProvisioner) Provision(ctx context.Context, req *connect.Request
return nil, fmt.Errorf("provisioner already running: %s", token)
}
logger.Debugf("Starting task %s", token)
task.Start(ctx, module.Name)
task.Start(ctx, module.Name, module.Runtime.Deployment.DeploymentKey)
return connect.NewResponse(&provisionerpb.ProvisionResponse{
Status: provisionerpb.ProvisionResponse_PROVISION_RESPONSE_STATUS_SUBMITTED,
ProvisioningToken: token,
Expand Down
4 changes: 3 additions & 1 deletion cmd/ftl-provisioner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/block/ftl/internal/observability"
_ "github.com/block/ftl/internal/prodinit"
"github.com/block/ftl/internal/rpc"
timeline "github.com/block/ftl/internal/timelineclient"
)

var cli struct {
Expand All @@ -38,6 +39,7 @@ func main() {

logger := log.Configure(os.Stderr, cli.LogConfig)
ctx := log.ContextWithLogger(context.Background(), logger)
timelineClient := timeline.NewClient(ctx, cli.ProvisionerConfig.TimelineEndpoint)
err := observability.Init(ctx, false, "", "ftl-provisioner", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")

Expand Down Expand Up @@ -69,6 +71,6 @@ func main() {
logger.Debugf("Registered provisioner %s as fallback for runner", runnerBinding)
}

err = provisioner.Start(ctx, registry, schemaClient)
err = provisioner.Start(ctx, registry, schemaClient, timelineClient)
kctx.FatalIfErrorf(err, "failed to start provisioner")
}
2 changes: 1 addition & 1 deletion cmd/ftl/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *serveCommonConfig) run(
}

wg.Go(func() error {
if err := provisioner.Start(provisionerCtx, provisionerRegistry, schemaClient); err != nil {
if err := provisioner.Start(provisionerCtx, provisionerRegistry, schemaClient, timelineClient); err != nil {
logger.Errorf(err, "provisionerfailed: %v", err)
return fmt.Errorf("provisionerfailed: %w", err)
}
Expand Down
7 changes: 7 additions & 0 deletions internal/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"github.com/alecthomas/atomic"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

"github.com/block/ftl/internal/key"
)

var _ Interface = (*Logger)(nil)

const scopeKey = "scope"
const moduleKey = "module"
const deploymentKey = "deployment"

type Entry struct {
Time time.Time `json:"-"`
Expand Down Expand Up @@ -54,6 +57,10 @@ func (l Logger) Module(module string) *Logger {
return l.Attrs(map[string]string{moduleKey: module})
}

func (l Logger) Deployment(deployment key.Deployment) *Logger {
return l.Attrs(map[string]string{deploymentKey: deployment.String()})
}

// Attrs creates a new logger with the given attributes.
func (l Logger) Attrs(attributes map[string]string) *Logger {
attr := make(map[string]string, len(l.attributes)+len(attributes))
Expand Down
5 changes: 5 additions & 0 deletions internal/provisioner/executor/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/concurrency"
"github.com/block/ftl/internal/dsn"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/provisioner"
"github.com/block/ftl/internal/provisioner/state"
)
Expand All @@ -40,6 +41,8 @@ func (e *ARNSecretMySQLSetup) Prepare(_ context.Context, input state.State) erro
func (e *ARNSecretMySQLSetup) Execute(ctx context.Context) ([]state.State, error) {
rg := concurrency.ResourceGroup[state.State]{}

logger := log.FromContext(ctx)

for _, input := range e.inputs {
if input, ok := input.(state.RDSInstanceReadyMySQL); ok {
rg.Go(func() (state.State, error) {
Expand All @@ -58,6 +61,8 @@ func (e *ARNSecretMySQLSetup) Execute(ctx context.Context) ([]state.State, error
return nil, err
}

logger.Infof("MySQL database created: %s", input.ResourceID) //nolint

return state.OutputMySQL{
Module: input.Module,
ResourceID: input.ResourceID,
Expand Down
3 changes: 3 additions & 0 deletions internal/provisioner/executor/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (e *ARNSecretPostgresSetup) Prepare(_ context.Context, input state.State) e

func (e *ARNSecretPostgresSetup) Execute(ctx context.Context) ([]state.State, error) {
rg := concurrency.ResourceGroup[state.State]{}
logger := log.FromContext(ctx)

for _, input := range e.inputs {
if input, ok := input.(state.RDSInstanceReadyPostgres); ok {
Expand All @@ -66,6 +67,8 @@ func (e *ARNSecretPostgresSetup) Execute(ctx context.Context) ([]state.State, er
return nil, err
}

logger.Infof("Postgres database created: %s", input.ResourceID) //nolint

return state.OutputPostgres{
Module: input.Module,
ResourceID: input.ResourceID,
Expand Down
2 changes: 2 additions & 0 deletions internal/provisioner/executor/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (e *KafkaTopicSetup) Execute(ctx context.Context) ([]state.State, error) {
}
logger.Debugf("Output: %s", output.DebugString())
result = append(result, output)

logger.Infof("Kafka topic created: %s", topicID) //nolint
}
}
return result, nil
Expand Down
Loading
Loading