Skip to content

Commit

Permalink
chore: make sidecar container change backward compatible (#2236)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Nov 22, 2024
1 parent c5afc90 commit ae05a7c
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ const (
EnvServingStoreTTL = "NUMAFLOW_SERVING_STORE_TTL"
EnvExecuteRustBinary = "NUMAFLOW_EXECUTE_RUST_BINARY"

EnvK8sServerVersion = "K8S_SERVER_VERSION"

PathVarRun = "/var/run/numaflow"
VertexMetricsPort = 2469
VertexMetricsPortName = "metrics"
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/container_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (b containerBuilder) resources(x corev1.ResourceRequirements) containerBuil
}

func (b containerBuilder) asSidecar() containerBuilder {
// TODO: (k8s 1.29) clean this up once we deprecate the support for k8s < 1.29
if !isSidecarSupported() {
return b
}
b.RestartPolicy = ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways)
return b
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/apis/numaflow/v1alpha1/deprecated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"os"
"strconv"
)

// TODO: (k8s 1.29) Remove this once we deprecate the support for k8s < 1.29
func isSidecarSupported() bool {
v := os.Getenv(EnvK8sServerVersion)
if v == "" {
return true // default to true if the env var is not found
}
// e.g. 1.31
k8sVersion, _ := strconv.ParseFloat(v, 32)
return k8sVersion >= 1.29
}
7 changes: 6 additions & 1 deletion pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,12 @@ func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, e

initContainers := []corev1.Container{}
initContainers = append(initContainers, mv.Spec.InitContainers...)
initContainers = append(initContainers, sidecarContainers...)
// TODO: (k8s 1.29) clean this up once we deprecate the support for k8s < 1.29
if isSidecarSupported() {
initContainers = append(initContainers, sidecarContainers...)
} else {
containers = append(containers, sidecarContainers...)
}

spec := &corev1.PodSpec{
Subdomain: mv.GetHeadlessServiceName(),
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/numaflow/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ func Test_GetSideInputManagerDeployments(t *testing.T) {
deployments, err := testObj.GetSideInputsManagerDeployments(testGetSideInputDeploymentReq)
assert.Nil(t, err)
assert.Equal(t, 1, len(deployments))
assert.Equal(t, 2, len(deployments[0].Spec.Template.Spec.Containers))
assert.Equal(t, 1, len(deployments[0].Spec.Template.Spec.Containers))
assert.Equal(t, 2, len(deployments[0].Spec.Template.Spec.InitContainers))
})
}

Expand Down
21 changes: 15 additions & 6 deletions pkg/apis/numaflow/v1alpha1/side_inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ func (si SideInput) getManagerDeploymentObj(pipeline Pipeline, req GetSideInputD
volumes = append(volumes, si.Volumes...)
}
volumeMounts := []corev1.VolumeMount{{Name: varVolumeName, MountPath: PathVarRun}}
numaContainer.VolumeMounts = append(numaContainer.VolumeMounts, volumeMounts...)
sidecarContainer := si.getUDContainer(req)
sidecarContainer.VolumeMounts = append(sidecarContainer.VolumeMounts, volumeMounts...)
containers := []corev1.Container{*numaContainer}
initContainers := []corev1.Container{si.getInitContainer(pipeline, req), sidecarContainer}

// TODO: (k8s 1.29) clean this up once we deprecate the support for k8s < 1.29
if !isSidecarSupported() {
initContainers = []corev1.Container{si.getInitContainer(pipeline, req)}
containers = []corev1.Container{*numaContainer, sidecarContainer}
}

deployment := &appv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: pipeline.GetSideInputsManagerDeploymentName(si.Name),
Expand All @@ -93,8 +105,8 @@ func (si SideInput) getManagerDeploymentObj(pipeline Pipeline, req GetSideInputD
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{*numaContainer, si.getUDContainer(req)},
InitContainers: []corev1.Container{si.getInitContainer(pipeline, req)},
Containers: containers,
InitContainers: initContainers,
Volumes: volumes,
},
},
Expand All @@ -103,9 +115,6 @@ func (si SideInput) getManagerDeploymentObj(pipeline Pipeline, req GetSideInputD
if x := pipeline.Spec.Templates; x != nil && x.SideInputsManagerTemplate != nil {
x.SideInputsManagerTemplate.ApplyToPodTemplateSpec(&deployment.Spec.Template)
}
for i := range deployment.Spec.Template.Spec.Containers {
deployment.Spec.Template.Spec.Containers[i].VolumeMounts = append(deployment.Spec.Template.Spec.Containers[i].VolumeMounts, volumeMounts...)
}
return deployment, nil
}

Expand Down Expand Up @@ -157,7 +166,7 @@ func (si SideInput) getUDContainer(req GetSideInputDeploymentReq) corev1.Contain
cb := containerBuilder{}.
name(CtrUdSideInput).
image(si.Container.Image).
imagePullPolicy(req.PullPolicy)
imagePullPolicy(req.PullPolicy).asSidecar()
if si.Container.ImagePullPolicy != nil {
cb = cb.imagePullPolicy(*si.Container.ImagePullPolicy)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/apis/numaflow/v1alpha1/side_inputs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
)

var (
Expand Down Expand Up @@ -65,6 +66,7 @@ func Test_getUDContainer(t *testing.T) {
}
assert.Equal(t, envs[EnvUDContainerType], UDContainerSideInputs)
assert.Equal(t, imagePullNever, c.ImagePullPolicy)
assert.Equal(t, ptr.To[corev1.ContainerRestartPolicy](corev1.ContainerRestartPolicyAlways), c.RestartPolicy)
}

func Test_getNumaContainer(t *testing.T) {
Expand Down Expand Up @@ -99,7 +101,7 @@ func Test_getManagerDeploymentObj(t *testing.T) {
deploy, err := newObj.getManagerDeploymentObj(*testPipeline, testGetSideInputDeploymentReq)
assert.NoError(t, err)
assert.NotNil(t, deploy)
assert.Equal(t, 1, len(deploy.Spec.Template.Spec.InitContainers))
assert.Equal(t, 2, len(deploy.Spec.Template.Spec.Containers))
assert.Equal(t, 2, len(deploy.Spec.Template.Spec.InitContainers))
assert.Equal(t, 1, len(deploy.Spec.Template.Spec.Containers))
assert.Equal(t, 2, len(deploy.Spec.Template.Spec.Volumes))
}
7 changes: 6 additions & 1 deletion pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,12 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
}

// Add the sidecar containers
initContainers = append(initContainers, sidecarContainers...)
// TODO: (k8s 1.29) clean this up once we deprecate the support for k8s <1.29
if isSidecarSupported() {
initContainers = append(initContainers, sidecarContainers...)
} else {
containers = append(containers, sidecarContainers...)
}

if v.IsASource() && v.Spec.Source.Serving != nil {
servingContainer, err := v.getServingContainer(req)
Expand Down
11 changes: 11 additions & 0 deletions pkg/reconciler/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package cmd

import (
"context"
"fmt"
"os"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -121,6 +123,15 @@ func Start(namespaced bool, managedNamespace string) {

kubeClient := kubernetes.NewForConfigOrDie(restConfig)

// TODO: clean up?
if svrVersion, err := kubeClient.ServerVersion(); err != nil {
logger.Fatalw("Failed to get k8s cluster server version", zap.Error(err))
} else {
k8sVersion := fmt.Sprintf("%s.%s", svrVersion.Major, svrVersion.Minor)
os.Setenv(dfv1.EnvK8sServerVersion, k8sVersion)
logger.Infof("Kubernetes server version: %s", k8sVersion)
}

// Readiness probe
if err := mgr.AddReadyzCheck("readiness", healthz.Ping); err != nil {
logger.Fatalw("Unable add a readiness check", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/isbsvc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct
if !equality.Semantic.DeepEqual(isbSvc.Finalizers, isbSvcCopy.Finalizers) {
patchYaml := "metadata:\n finalizers: [" + strings.Join(isbSvcCopy.Finalizers, ",") + "]"
patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml))
if err := r.client.Patch(ctx, isbSvc, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil {
if err := r.client.Patch(ctx, isbSvc, client.RawPatch(types.MergePatchType, patchJson)); err != nil {
return ctrl.Result{}, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (r *pipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if !equality.Semantic.DeepEqual(pl.Finalizers, plCopy.Finalizers) {
patchYaml := "metadata:\n finalizers: [" + strings.Join(plCopy.Finalizers, ",") + "]"
patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml))
if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil {
if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, patchJson)); err != nil {
return result, err
}
}
Expand Down
2 changes: 2 additions & 0 deletions test/sideinputs-e2e/testdata/map-sideinput-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ spec:
source:
http: {}
- name: si-e2e
scale:
min: 1
udf:
container:
# A map side input udf, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sideinput/examples/map_sideinput/udf
Expand Down

0 comments on commit ae05a7c

Please sign in to comment.