From 9d7cac6b9c81e9a8bcb66327e590b9f31d51e9f6 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Thu, 8 Aug 2024 10:17:48 -0700 Subject: [PATCH] . Signed-off-by: Derek Wang --- pkg/apis/numaflow/v1alpha1/const.go | 1 + .../numaflow/v1alpha1/mono_vertex_types.go | 40 +++++++++++-------- pkg/apis/numaflow/v1alpha1/vertex_types.go | 4 +- .../numaflow/v1alpha1/vertex_types_test.go | 2 +- pkg/reconciler/pipeline/controller.go | 2 +- 5 files changed, 28 insertions(+), 21 deletions(-) diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index bf08279514..19a3274cef 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -168,6 +168,7 @@ const ( DefaultBufferLength = 30000 DefaultBufferUsageLimit = 0.8 DefaultReadBatchSize = 500 + DefaultReadTimeout = 1 * time.Second // Auto scaling DefaultLookbackSeconds = 120 // Default lookback seconds for calculating avg rate and pending diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go index 7d855e6347..1930261fb5 100644 --- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go @@ -149,16 +149,7 @@ func (mv MonoVertex) GetDaemonServiceObj() *corev1.Service { } func (mv MonoVertex) GetDaemonDeploymentObj(req GetMonoVertexDaemonDeploymentReq) (*appv1.Deployment, error) { - mvVtxCopy := &MonoVertex{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: mv.Namespace, - Name: mv.Name, - }, - Spec: mv.Spec.WithoutReplicas(), - } - // TODO: lifecycle - // mvVtxCopy.Spec.Lifecycle = Lifecycle{} - mvVtxCopyBytes, err := json.Marshal(mvVtxCopy) + mvVtxCopyBytes, err := json.Marshal(mv.simpleCopy()) if err != nil { return nil, fmt.Errorf("failed to marshal mono vertex spec") } @@ -270,15 +261,30 @@ func (mv MonoVertex) sidecarEnvs() []corev1.EnvVar { } } -func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, error) { - monoVtxCopy := &MonoVertex{ +func (mv MonoVertex) simpleCopy() MonoVertex { + m := MonoVertex{ ObjectMeta: metav1.ObjectMeta{ Namespace: mv.Namespace, Name: mv.Name, }, - Spec: mv.Spec.WithoutReplicas(), + Spec: mv.Spec.DeepCopyWithoutReplicas(), } - monoVtxBytes, err := json.Marshal(monoVtxCopy) + if m.Spec.Limits != nil { + m.Spec.Limits = &MonoVertexLimits{} + } + if m.Spec.Limits.ReadBatchSize != nil { + m.Spec.Limits.ReadBatchSize = ptr.To[uint64](DefaultReadBatchSize) + } + if m.Spec.Limits.ReadTimeout == nil { + m.Spec.Limits.ReadTimeout = &metav1.Duration{Duration: DefaultReadTimeout} + } + // TODO: lifecycle + // mvVtxCopy.Spec.Lifecycle = Lifecycle{} + return m +} + +func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, error) { + monoVtxBytes, err := json.Marshal(mv.simpleCopy()) if err != nil { return nil, errors.New("failed to marshal mono vertex spec") } @@ -389,7 +395,7 @@ type MonoVertexSpec struct { DaemonTemplate *DaemonTemplate `json:"daemonTemplate,omitempty" protobuf:"bytes,11,opt,name=daemonTemplate"` } -func (mvspec MonoVertexSpec) WithoutReplicas() MonoVertexSpec { +func (mvspec MonoVertexSpec) DeepCopyWithoutReplicas() MonoVertexSpec { x := *mvspec.DeepCopy() x.Replicas = ptr.To[int32](0) return x @@ -426,14 +432,14 @@ type MonoVertexLimits struct { func (mvl MonoVertexLimits) GetReadBatchSize() uint64 { if mvl.ReadBatchSize == nil { - return 500 + return DefaultReadBatchSize } return *mvl.ReadBatchSize } func (mvl MonoVertexLimits) GetReadTimeout() time.Duration { if mvl.ReadTimeout == nil { - return 1 * time.Second + return DefaultReadTimeout } return mvl.ReadTimeout.Duration } diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types.go b/pkg/apis/numaflow/v1alpha1/vertex_types.go index d87df76116..c93af36a98 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types.go @@ -214,7 +214,7 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) { Namespace: v.Namespace, Name: v.Name, }, - Spec: v.Spec.WithoutReplicas(), + Spec: v.Spec.DeepCopyWithoutReplicas(), } vertexBytes, err := json.Marshal(vertexCopy) if err != nil { @@ -480,7 +480,7 @@ func (v Vertex) getInitContainers(req GetVertexPodSpecReq) []corev1.Container { return append(initContainers, v.Spec.InitContainers...) } -func (vs VertexSpec) WithoutReplicas() VertexSpec { +func (vs VertexSpec) DeepCopyWithoutReplicas() VertexSpec { x := *vs.DeepCopy() x.Replicas = ptr.To[int32](0) return x diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go index 0ae463859d..a664fee1d0 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types_test.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types_test.go @@ -121,7 +121,7 @@ func TestWithoutReplicas(t *testing.T) { s := &VertexSpec{ Replicas: ptr.To[int32](3), } - assert.Equal(t, int32(0), *s.WithoutReplicas().Replicas) + assert.Equal(t, int32(0), *s.DeepCopyWithoutReplicas().Replicas) } func TestGetVertexReplicas(t *testing.T) { diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index 320eb9179b..c9d824e941 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -623,7 +623,7 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex { Watermark: pl.Spec.Watermark, Replicas: &replicas, } - hash := sharedutil.MustHash(spec.WithoutReplicas()) + hash := sharedutil.MustHash(spec.DeepCopyWithoutReplicas()) obj := dfv1.Vertex{ ObjectMeta: metav1.ObjectMeta{ Namespace: pl.Namespace,