Skip to content

Commit

Permalink
Change Flyte CR naming scheme to better support namespace_mapping
Browse files Browse the repository at this point in the history
 - Typically Flyte is configured so that each project / domain has its
   own Kubernetes namespace.

   Certain environments may change this behavior by using the Flyteadmin
   namespace_mapping setting to put all executions in fewer (or a singular)
   Kubernetes namespace. This is problematic because it can lead to
   collisions in the naming of the CR that flyteadmin generates.

 - This patch fixes 2 important things to make this work properly inside
   of Flyte:

   * it adds a random element to the CR name in Flyte so that the CR is
     named by the execution + some unique value when created by
     flyteadmin

     Without this change, an execution Foo in project A will prevent an
     execution Foo in project B from launching, because the name of the
     CR thats generated in Kubernetes *assumes* that the namespace the
     CRs are put into is different for project A and project B

     When namespace_mapping is set to a singular value, that assumption
     is wrong

   * it makes sure that when flytepropeller cleans up the CR resource
     that it uses Kubernetes labels to find the correct CR -- so instead
     of assuming that it can use the execution name, it instead uses the
     project, domain and execution labels

 - Use deterministic hash of execution id, name, project, as the FlyteWorkflow
   CR name. Add workflow-cr-name-hash-length to flytepropeller config.

Signed-off-by: ddl-ebrown <[email protected]>
Signed-off-by: ddl-rliu <[email protected]>
  • Loading branch information
ddl-rliu authored and ddl-ebrown committed Aug 28, 2024
1 parent 7136919 commit a268b61
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 14 deletions.
28 changes: 25 additions & 3 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
)
Expand Down Expand Up @@ -87,16 +88,37 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
}, nil
}

const (
// Labels that are set on the FlyteWorkflow CRD
DomainLabel = "domain"
ExecutionIDLabel = "execution-id"
ProjectLabel = "project"
)

func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector {
return &v1.LabelSelector{
MatchLabels: map[string]string{
DomainLabel: executionID.GetDomain(),
ExecutionIDLabel: executionID.GetName(),
ProjectLabel: executionID.GetProject(),
},
}
}

func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error {
target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{
TargetID: data.Cluster,
})
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, err.Error())
}
err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
PropagationPolicy: &deletePropagationBackground,
})
err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection(
ctx,
v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground},
v1.ListOptions{
LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)),
},
)
// An IsNotFound error indicates the resource is already deleted.
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
Expand Down
27 changes: 17 additions & 10 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}

type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error)
type deleteCallback func(name string, options *v1.DeleteOptions) error
type deleteCollectionCallback func(*v1.DeleteOptions, *v1.ListOptions) error
type FakeFlyteWorkflow struct {
v1alpha12.FlyteWorkflowInterface
createCallback createCallback
deleteCallback deleteCallback
createCallback
deleteCollectionCallback
}

func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
Expand All @@ -45,9 +45,9 @@ func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkfl
return nil, nil
}

func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.DeleteOptions) error {
if b.deleteCallback != nil {
return b.deleteCallback(name, &options)
func (b *FakeFlyteWorkflow) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
if b.deleteCollectionCallback != nil {
return b.deleteCollectionCallback(&opts, &listOpts)
}
return nil
}
Expand Down Expand Up @@ -280,8 +280,15 @@ func TestExecute_MiscError(t *testing.T) {

func TestAbort(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
assert.Equal(t, execID.Name, name)
fakeFlyteWorkflow.deleteCollectionCallback = func(options *v1.DeleteOptions, listOpts *v1.ListOptions) error {
selector := v1.FormatLabelSelector(&v1.LabelSelector{
MatchLabels: map[string]string{
DomainLabel: execID.GetDomain(),
ExecutionIDLabel: execID.GetName(),
ProjectLabel: execID.GetProject(),
},
})
assert.Equal(t, selector, listOpts.LabelSelector)
assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground)
return nil
}
Expand All @@ -302,7 +309,7 @@ func TestAbort(t *testing.T) {

func TestAbort_Notfound(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
return k8_api_err.NewNotFound(schema.GroupResource{
Group: "foo",
Resource: "bar",
Expand All @@ -325,7 +332,7 @@ func TestAbort_Notfound(t *testing.T) {

func TestAbort_MiscError(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
return errors.New("call failed")
}
fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
Expand Down
37 changes: 36 additions & 1 deletion flytepropeller/pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@
package k8s

import (
"context"
"fmt"
"hash/fnv"
"strings"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/utils"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

const (
Expand All @@ -30,6 +34,11 @@ const (
ShardKeyLabel = "shard-key"
// The fully qualified FlyteWorkflow name
WorkflowNameLabel = "workflow-name"

// Length of hash to use as a suffix on the workflow CR name. Used when config.UseWorkflowCRNameSuffix is true.
// The workflow CR name should be at or under 63 characters long, here it is 52 + 1 + 10 = 63
workflowCRNameHashLength = 10
workflowCRNameSuffixFmt = "%.52s-%s"
)

func requiresInputs(w *core.WorkflowTemplate) bool {
Expand Down Expand Up @@ -159,6 +168,20 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie
}
}

func hashIdentifier(identifier core.Identifier) uint64 {
h := fnv.New64()
_, err := h.Write([]byte(fmt.Sprintf("%s:%s:%s",
identifier.Project, identifier.Domain, identifier.Name)))
if err != nil {
// This shouldn't occur.
logger.Errorf(context.Background(),
"failed to hash execution identifier: %+v with err: %v", identifier, err)
return 0

Check warning on line 179 in flytepropeller/pkg/compiler/transformers/k8s/workflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/transformers/k8s/workflow.go#L177-L179

Added lines #L177 - L179 were not covered by tests
}
logger.Debugf(context.Background(), "Returning hash for [%+v]: %d", identifier, h.Sum64())
return h.Sum64()
}

// BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors.
func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap,
executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) {
Expand Down Expand Up @@ -231,7 +254,19 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
errs.Collect(errors.NewWorkflowBuildError(err))
}

obj.ObjectMeta.Name = name
if config.GetConfig().UseWorkflowCRNameSuffix {
// Seed the randomness before generating the name with random suffix
hashedIdentifier := hashIdentifier(core.Identifier{
Project: project,
Domain: domain,
Name: name,
})
rand.Seed(int64(hashedIdentifier))
obj.ObjectMeta.Name = fmt.Sprintf(workflowCRNameSuffixFmt, name, rand.String(workflowCRNameHashLength))
} else {
obj.ObjectMeta.Name = name
}

obj.ObjectMeta.GenerateName = generatedName
obj.ObjectMeta.Labels[ExecutionIDLabel] = label
obj.ObjectMeta.Labels[ProjectLabel] = project
Expand Down
42 changes: 42 additions & 0 deletions flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/utils"
)

Expand Down Expand Up @@ -251,6 +252,47 @@ func TestBuildFlyteWorkflow_withUnionInputs(t *testing.T) {
assert.Equal(t, "hello", wf.Inputs.Literals["y"].GetScalar().GetUnion().GetValue().GetScalar().GetPrimitive().GetStringValue())
}

func TestBuildFlyteWorkflow_setWorkflowCRNameHashLength(t *testing.T) {
for name, tt := range map[string]struct {
useSuffix bool
expected string
}{
"default does not use hash as workflow CR name": {
useSuffix: false,
expected: "",
},
"use hash as workflow CR name": {
useSuffix: true,
expected: "-x6m7gswrdl",
},
} {
t.Run(name, func(t *testing.T) {
flyteConfig := config.GetConfig()
flyteConfig.UseWorkflowCRNameSuffix = tt.useSuffix

w := createSampleMockWorkflow()

errors.SetConfig(errors.Config{IncludeSource: true})
wf, err := BuildFlyteWorkflow(
&core.CompiledWorkflowClosure{
Primary: w.GetCoreWorkflow(),
Tasks: []*core.CompiledTask{
{
Template: &core.TaskTemplate{
Id: &core.Identifier{Name: "ref_1"},
},
},
},
},
nil, nil, "")
assert.Equal(t, tt.expected, wf.ObjectMeta.Name)
assert.NoError(t, err)
assert.NotNil(t, wf)
errors.SetConfig(errors.Config{})
})
}
}

func TestGenerateName(t *testing.T) {
t.Run("Invalid params", func(t *testing.T) {
_, _, _, _, _, err := generateName(nil, nil)
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ var (
EventVersion: 0,
DefaultParallelismBehavior: ParallelismBehaviorUnlimited,
},
UseWorkflowCRNameSuffix: false,
}
)

Expand Down Expand Up @@ -161,6 +162,7 @@ type Config struct {
CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
UseWorkflowCRNameSuffix bool `json:"use-workflow-cr-name-suffix" pflag:",If false, the execution ID will be used as the workflow CR name. Otherwise, a hash of the execution ID, project, domain will be used as a suffix on the CR name."`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down

0 comments on commit a268b61

Please sign in to comment.