From a268b6108864dfa232d69a26c7348385fa12fc7c Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Thu, 13 Jun 2024 21:51:40 -0700 Subject: [PATCH] Change Flyte CR naming scheme to better support namespace_mapping - Typically Flyte is configured so that each project / domain has its own Kubernetes namespace. Certain environments may change this behavior by using the Flyteadmin namespace_mapping setting to put all executions in fewer (or a singular) Kubernetes namespace. This is problematic because it can lead to collisions in the naming of the CR that flyteadmin generates. - This patch fixes 2 important things to make this work properly inside of Flyte: * it adds a random element to the CR name in Flyte so that the CR is named by the execution + some unique value when created by flyteadmin Without this change, an execution Foo in project A will prevent an execution Foo in project B from launching, because the name of the CR thats generated in Kubernetes *assumes* that the namespace the CRs are put into is different for project A and project B When namespace_mapping is set to a singular value, that assumption is wrong * it makes sure that when flytepropeller cleans up the CR resource that it uses Kubernetes labels to find the correct CR -- so instead of assuming that it can use the execution name, it instead uses the project, domain and execution labels - Use deterministic hash of execution id, name, project, as the FlyteWorkflow CR name. Add workflow-cr-name-hash-length to flytepropeller config. Signed-off-by: ddl-ebrown Signed-off-by: ddl-rliu --- .../pkg/workflowengine/impl/k8s_executor.go | 28 +++++++++++-- .../workflowengine/impl/k8s_executor_test.go | 27 +++++++----- .../pkg/compiler/transformers/k8s/workflow.go | 37 +++++++++++++++- .../transformers/k8s/workflow_test.go | 42 +++++++++++++++++++ .../pkg/controller/config/config.go | 2 + 5 files changed, 122 insertions(+), 14 deletions(-) diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go index d941cc8309..c7a73e9832 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go @@ -12,6 +12,7 @@ import ( execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -87,6 +88,23 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut }, nil } +const ( + // Labels that are set on the FlyteWorkflow CRD + DomainLabel = "domain" + ExecutionIDLabel = "execution-id" + ProjectLabel = "project" +) + +func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector { + return &v1.LabelSelector{ + MatchLabels: map[string]string{ + DomainLabel: executionID.GetDomain(), + ExecutionIDLabel: executionID.GetName(), + ProjectLabel: executionID.GetProject(), + }, + } +} + func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error { target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{ TargetID: data.Cluster, @@ -94,9 +112,13 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, err.Error()) } - err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{ - PropagationPolicy: &deletePropagationBackground, - }) + err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection( + ctx, + v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground}, + v1.ListOptions{ + LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)), + }, + ) // An IsNotFound error indicates the resource is already deleted. if err != nil && !k8_api_err.IsNotFound(err) { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err) diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go index a2ecb51364..bc3888667d 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go @@ -31,11 +31,11 @@ import ( var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{} type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) -type deleteCallback func(name string, options *v1.DeleteOptions) error +type deleteCollectionCallback func(*v1.DeleteOptions, *v1.ListOptions) error type FakeFlyteWorkflow struct { v1alpha12.FlyteWorkflowInterface - createCallback createCallback - deleteCallback deleteCallback + createCallback + deleteCollectionCallback } func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) { @@ -45,9 +45,9 @@ func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkfl return nil, nil } -func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.DeleteOptions) error { - if b.deleteCallback != nil { - return b.deleteCallback(name, &options) +func (b *FakeFlyteWorkflow) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + if b.deleteCollectionCallback != nil { + return b.deleteCollectionCallback(&opts, &listOpts) } return nil } @@ -280,8 +280,15 @@ func TestExecute_MiscError(t *testing.T) { func TestAbort(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { - assert.Equal(t, execID.Name, name) + fakeFlyteWorkflow.deleteCollectionCallback = func(options *v1.DeleteOptions, listOpts *v1.ListOptions) error { + selector := v1.FormatLabelSelector(&v1.LabelSelector{ + MatchLabels: map[string]string{ + DomainLabel: execID.GetDomain(), + ExecutionIDLabel: execID.GetName(), + ProjectLabel: execID.GetProject(), + }, + }) + assert.Equal(t, selector, listOpts.LabelSelector) assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground) return nil } @@ -302,7 +309,7 @@ func TestAbort(t *testing.T) { func TestAbort_Notfound(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { + fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error { return k8_api_err.NewNotFound(schema.GroupResource{ Group: "foo", Resource: "bar", @@ -325,7 +332,7 @@ func TestAbort_Notfound(t *testing.T) { func TestAbort_MiscError(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { + fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error { return errors.New("call failed") } fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface { diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 2421ddf9bb..73e518c13e 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -2,17 +2,21 @@ package k8s import ( + "context" "fmt" "hash/fnv" "strings" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common" "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytepropeller/pkg/utils" + "github.com/flyteorg/flyte/flytestdlib/logger" ) const ( @@ -30,6 +34,11 @@ const ( ShardKeyLabel = "shard-key" // The fully qualified FlyteWorkflow name WorkflowNameLabel = "workflow-name" + + // Length of hash to use as a suffix on the workflow CR name. Used when config.UseWorkflowCRNameSuffix is true. + // The workflow CR name should be at or under 63 characters long, here it is 52 + 1 + 10 = 63 + workflowCRNameHashLength = 10 + workflowCRNameSuffixFmt = "%.52s-%s" ) func requiresInputs(w *core.WorkflowTemplate) bool { @@ -159,6 +168,20 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie } } +func hashIdentifier(identifier core.Identifier) uint64 { + h := fnv.New64() + _, err := h.Write([]byte(fmt.Sprintf("%s:%s:%s", + identifier.Project, identifier.Domain, identifier.Name))) + if err != nil { + // This shouldn't occur. + logger.Errorf(context.Background(), + "failed to hash execution identifier: %+v with err: %v", identifier, err) + return 0 + } + logger.Debugf(context.Background(), "Returning hash for [%+v]: %d", identifier, h.Sum64()) + return h.Sum64() +} + // BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors. func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap, executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) { @@ -231,7 +254,19 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li errs.Collect(errors.NewWorkflowBuildError(err)) } - obj.ObjectMeta.Name = name + if config.GetConfig().UseWorkflowCRNameSuffix { + // Seed the randomness before generating the name with random suffix + hashedIdentifier := hashIdentifier(core.Identifier{ + Project: project, + Domain: domain, + Name: name, + }) + rand.Seed(int64(hashedIdentifier)) + obj.ObjectMeta.Name = fmt.Sprintf(workflowCRNameSuffixFmt, name, rand.String(workflowCRNameHashLength)) + } else { + obj.ObjectMeta.Name = name + } + obj.ObjectMeta.GenerateName = generatedName obj.ObjectMeta.Labels[ExecutionIDLabel] = label obj.ObjectMeta.Labels[ProjectLabel] = project diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go index dbb51e25eb..893f81119f 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go @@ -11,6 +11,7 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common" "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytestdlib/utils" ) @@ -251,6 +252,47 @@ func TestBuildFlyteWorkflow_withUnionInputs(t *testing.T) { assert.Equal(t, "hello", wf.Inputs.Literals["y"].GetScalar().GetUnion().GetValue().GetScalar().GetPrimitive().GetStringValue()) } +func TestBuildFlyteWorkflow_setWorkflowCRNameHashLength(t *testing.T) { + for name, tt := range map[string]struct { + useSuffix bool + expected string + }{ + "default does not use hash as workflow CR name": { + useSuffix: false, + expected: "", + }, + "use hash as workflow CR name": { + useSuffix: true, + expected: "-x6m7gswrdl", + }, + } { + t.Run(name, func(t *testing.T) { + flyteConfig := config.GetConfig() + flyteConfig.UseWorkflowCRNameSuffix = tt.useSuffix + + w := createSampleMockWorkflow() + + errors.SetConfig(errors.Config{IncludeSource: true}) + wf, err := BuildFlyteWorkflow( + &core.CompiledWorkflowClosure{ + Primary: w.GetCoreWorkflow(), + Tasks: []*core.CompiledTask{ + { + Template: &core.TaskTemplate{ + Id: &core.Identifier{Name: "ref_1"}, + }, + }, + }, + }, + nil, nil, "") + assert.Equal(t, tt.expected, wf.ObjectMeta.Name) + assert.NoError(t, err) + assert.NotNil(t, wf) + errors.SetConfig(errors.Config{}) + }) + } +} + func TestGenerateName(t *testing.T) { t.Run("Invalid params", func(t *testing.T) { _, _, _, _, _, err := generateName(nil, nil) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index a0217e186a..7700bc873c 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -120,6 +120,7 @@ var ( EventVersion: 0, DefaultParallelismBehavior: ParallelismBehaviorUnlimited, }, + UseWorkflowCRNameSuffix: false, } ) @@ -161,6 +162,7 @@ type Config struct { CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"` NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"` ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"` + UseWorkflowCRNameSuffix bool `json:"use-workflow-cr-name-suffix" pflag:",If false, the execution ID will be used as the workflow CR name. Otherwise, a hash of the execution ID, project, domain will be used as a suffix on the CR name."` } // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.