Skip to content

Commit

Permalink
fix: log to timeline from provisioner plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
jvmakine committed Mar 4, 2025
1 parent 49cfa9f commit e549eb8
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 14 deletions.
14 changes: 7 additions & 7 deletions backend/cron/go2proto.to.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,8 @@ func Start(
schemaClient schemaconnect.SchemaServiceClient,
timelineClient *timeline.Client,
) error {

timelineLogSink := timeline.NewLogSink(timelineClient, log.Trace)
go timelineLogSink.RunLogLoop(ctx)

logger := log.FromContext(ctx).AddSink(timelineLogSink)
ctx = log.ContextWithLogger(ctx, logger)

Expand Down
19 changes: 17 additions & 2 deletions cmd/ftl-provisioner-cloudformation/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"net/url"
"strconv"
"time"

Expand All @@ -25,6 +26,7 @@ import (
"github.com/block/ftl/internal/provisioner"
"github.com/block/ftl/internal/provisioner/executor"
"github.com/block/ftl/internal/provisioner/state"
timeline "github.com/block/ftl/internal/timelineclient"
)

const (
Expand All @@ -41,6 +43,8 @@ type Config struct {
// TODO: remove this once we have module specific security groups
DatabaseSecurityGroup string `help:"SG for databases" env:"FTL_PROVISIONER_CF_DB_SECURITY_GROUP"`
MysqlSecurityGroup string `help:"SG for mysql" env:"FTL_PROVISIONER_CF_MYSQL_SECURITY_GROUP"`

TimelineEndpoint *url.URL `help:"Endpoint for the timeline service" env:"FTL_TIMELINE_ENDPOINT"`
}

type CloudformationProvisioner struct {
Expand All @@ -54,6 +58,9 @@ type CloudformationProvisioner struct {
var _ provisionerconnect.ProvisionerPluginServiceHandler = (*CloudformationProvisioner)(nil)

func NewCloudformationProvisioner(ctx context.Context, config Config) (context.Context, *CloudformationProvisioner, error) {
logger := log.FromContext(ctx)
logger.Debugf("Creating cloudformation provisioner")

client, err := createClient(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to create cloudformation client: %w", err)
Expand All @@ -63,6 +70,12 @@ func NewCloudformationProvisioner(ctx context.Context, config Config) (context.C
return nil, nil, fmt.Errorf("failed to create secretsmanager client: %w", err)
}

timelineClient := timeline.NewClient(ctx, config.TimelineEndpoint)
timelineLogSink := timeline.NewLogSink(timelineClient, log.Info)
go timelineLogSink.RunLogLoop(ctx)
logger = logger.AddSink(timelineLogSink)
ctx = log.ContextWithLogger(ctx, logger)

return ctx, &CloudformationProvisioner{
client: client,
secrets: secrets,
Expand All @@ -76,12 +89,14 @@ func (c *CloudformationProvisioner) Ping(context.Context, *connect.Request[ftlv1
}

func (c *CloudformationProvisioner) Provision(ctx context.Context, req *connect.Request[provisionerpb.ProvisionRequest]) (*connect.Response[provisionerpb.ProvisionResponse], error) {
logger := log.FromContext(ctx)

module, err := schema.ModuleFromProto(req.Msg.DesiredModule)
if err != nil {
return nil, fmt.Errorf("failed to convert module from proto: %w", err)
}

logger := log.FromContext(ctx).Deployment(module.Runtime.Deployment.DeploymentKey).Module(module.Name)
ctx = log.ContextWithLogger(ctx, logger)

var acceptedKinds []schema.ResourceType
for _, k := range req.Msg.Kinds {
acceptedKinds = append(acceptedKinds, schema.ResourceType(k))
Expand Down
18 changes: 17 additions & 1 deletion cmd/ftl-provisioner-sandbox/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net/url"

"connectrpc.com/connect"
"github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -20,12 +21,15 @@ import (
"github.com/block/ftl/internal/provisioner"
"github.com/block/ftl/internal/provisioner/executor"
"github.com/block/ftl/internal/provisioner/state"
timeline "github.com/block/ftl/internal/timelineclient"
)

type Config struct {
MySQLCredentialsSecretARN string `help:"ARN for the secret containing mysql credentials" env:"FTL_SANDBOX_MYSQL_ARN"`
MySQLEndpoint string `help:"Endpoint for the mysql database" env:"FTL_SANDBOX_MYSQL_ENDPOINT"`
KafkaBrokers []string `help:"Brokers for the kafka cluster" env:"FTL_SANDBOX_KAFKA_BROKERS"`

TimelineEndpoint *url.URL `help:"Endpoint for the timeline service" env:"FTL_TIMELINE_ENDPOINT"`
}

type SandboxProvisioner struct {
Expand All @@ -38,11 +42,20 @@ type SandboxProvisioner struct {
var _ provisionerconnect.ProvisionerPluginServiceHandler = (*SandboxProvisioner)(nil)

func NewSandboxProvisioner(ctx context.Context, config Config) (context.Context, *SandboxProvisioner, error) {
logger := log.FromContext(ctx)
logger.Debugf("Creating sandbox provisioner")

secrets, err := createSecretsClient(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to create secretsmanager client: %w", err)
}

timelineClient := timeline.NewClient(ctx, config.TimelineEndpoint)
timelineLogSink := timeline.NewLogSink(timelineClient, log.Info)
go timelineLogSink.RunLogLoop(ctx)
logger = logger.AddSink(timelineLogSink)
ctx = log.ContextWithLogger(ctx, logger)

return ctx, &SandboxProvisioner{
secrets: secrets,
confg: &config,
Expand All @@ -59,11 +72,14 @@ func (c *SandboxProvisioner) Provision(ctx context.Context, req *connect.Request
if err != nil {
return nil, fmt.Errorf("failed to convert module from proto: %w", err)
}

logger := log.FromContext(ctx).Deployment(module.Runtime.Deployment.DeploymentKey).Module(module.Name)
ctx = log.ContextWithLogger(ctx, logger)

var acceptedKinds []schema.ResourceType
for _, k := range req.Msg.Kinds {
acceptedKinds = append(acceptedKinds, schema.ResourceType(k))
}
logger := log.FromContext(ctx).Module(module.Name)

inputStates := inputsFromSchema(ctx, module, acceptedKinds, req.Msg.DesiredModule.Name, c.confg)

Expand Down
3 changes: 1 addition & 2 deletions cmd/ftl-provisioner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ func main() {
)
cli.ProvisionerConfig.SetDefaults()

timelineClient := timeline.NewClient(context.Background(), cli.ProvisionerConfig.TimelineEndpoint)

logger := log.Configure(os.Stderr, cli.LogConfig)
ctx := log.ContextWithLogger(context.Background(), logger)
timelineClient := timeline.NewClient(ctx, cli.ProvisionerConfig.TimelineEndpoint)
err := observability.Init(ctx, false, "", "ftl-provisioner", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")

Expand Down
5 changes: 5 additions & 0 deletions internal/provisioner/executor/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/concurrency"
"github.com/block/ftl/internal/dsn"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/provisioner"
"github.com/block/ftl/internal/provisioner/state"
)
Expand All @@ -40,6 +41,8 @@ func (e *ARNSecretMySQLSetup) Prepare(_ context.Context, input state.State) erro
func (e *ARNSecretMySQLSetup) Execute(ctx context.Context) ([]state.State, error) {
rg := concurrency.ResourceGroup[state.State]{}

logger := log.FromContext(ctx)

for _, input := range e.inputs {
if input, ok := input.(state.RDSInstanceReadyMySQL); ok {
rg.Go(func() (state.State, error) {
Expand All @@ -58,6 +61,8 @@ func (e *ARNSecretMySQLSetup) Execute(ctx context.Context) ([]state.State, error
return nil, err
}

logger.Infof("MySQL database created: %s", input.ResourceID) //nolint

return state.OutputMySQL{
Module: input.Module,
ResourceID: input.ResourceID,
Expand Down
3 changes: 3 additions & 0 deletions internal/provisioner/executor/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (e *ARNSecretPostgresSetup) Prepare(_ context.Context, input state.State) e

func (e *ARNSecretPostgresSetup) Execute(ctx context.Context) ([]state.State, error) {
rg := concurrency.ResourceGroup[state.State]{}
logger := log.FromContext(ctx)

for _, input := range e.inputs {
if input, ok := input.(state.RDSInstanceReadyPostgres); ok {
Expand All @@ -66,6 +67,8 @@ func (e *ARNSecretPostgresSetup) Execute(ctx context.Context) ([]state.State, er
return nil, err
}

logger.Infof("Postgres database created: %s", input.ResourceID) //nolint

return state.OutputPostgres{
Module: input.Module,
ResourceID: input.ResourceID,
Expand Down
2 changes: 2 additions & 0 deletions internal/provisioner/executor/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (e *KafkaTopicSetup) Execute(ctx context.Context) ([]state.State, error) {
}
logger.Debugf("Output: %s", output.DebugString())
result = append(result, output)

logger.Infof("Kafka topic created: %s", topicID) //nolint
}
}
return result, nil
Expand Down

0 comments on commit e549eb8

Please sign in to comment.