Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Aug 8, 2024
1 parent 4c39efc commit 9d7cac6
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ const (
DefaultBufferLength = 30000
DefaultBufferUsageLimit = 0.8
DefaultReadBatchSize = 500
DefaultReadTimeout = 1 * time.Second

// Auto scaling
DefaultLookbackSeconds = 120 // Default lookback seconds for calculating avg rate and pending
Expand Down
40 changes: 23 additions & 17 deletions pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,7 @@ func (mv MonoVertex) GetDaemonServiceObj() *corev1.Service {
}

func (mv MonoVertex) GetDaemonDeploymentObj(req GetMonoVertexDaemonDeploymentReq) (*appv1.Deployment, error) {
mvVtxCopy := &MonoVertex{
ObjectMeta: metav1.ObjectMeta{
Namespace: mv.Namespace,
Name: mv.Name,
},
Spec: mv.Spec.WithoutReplicas(),
}
// TODO: lifecycle
// mvVtxCopy.Spec.Lifecycle = Lifecycle{}
mvVtxCopyBytes, err := json.Marshal(mvVtxCopy)
mvVtxCopyBytes, err := json.Marshal(mv.simpleCopy())
if err != nil {
return nil, fmt.Errorf("failed to marshal mono vertex spec")
}
Expand Down Expand Up @@ -270,15 +261,30 @@ func (mv MonoVertex) sidecarEnvs() []corev1.EnvVar {
}
}

func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, error) {
monoVtxCopy := &MonoVertex{
func (mv MonoVertex) simpleCopy() MonoVertex {
m := MonoVertex{
ObjectMeta: metav1.ObjectMeta{
Namespace: mv.Namespace,
Name: mv.Name,
},
Spec: mv.Spec.WithoutReplicas(),
Spec: mv.Spec.DeepCopyWithoutReplicas(),
}
monoVtxBytes, err := json.Marshal(monoVtxCopy)
if m.Spec.Limits != nil {
m.Spec.Limits = &MonoVertexLimits{}
}
if m.Spec.Limits.ReadBatchSize != nil {
m.Spec.Limits.ReadBatchSize = ptr.To[uint64](DefaultReadBatchSize)
}
if m.Spec.Limits.ReadTimeout == nil {
m.Spec.Limits.ReadTimeout = &metav1.Duration{Duration: DefaultReadTimeout}
}
// TODO: lifecycle
// mvVtxCopy.Spec.Lifecycle = Lifecycle{}
return m
}

func (mv MonoVertex) GetPodSpec(req GetMonoVertexPodSpecReq) (*corev1.PodSpec, error) {
monoVtxBytes, err := json.Marshal(mv.simpleCopy())
if err != nil {
return nil, errors.New("failed to marshal mono vertex spec")
}
Expand Down Expand Up @@ -389,7 +395,7 @@ type MonoVertexSpec struct {
DaemonTemplate *DaemonTemplate `json:"daemonTemplate,omitempty" protobuf:"bytes,11,opt,name=daemonTemplate"`
}

func (mvspec MonoVertexSpec) WithoutReplicas() MonoVertexSpec {
func (mvspec MonoVertexSpec) DeepCopyWithoutReplicas() MonoVertexSpec {
x := *mvspec.DeepCopy()
x.Replicas = ptr.To[int32](0)
return x
Expand Down Expand Up @@ -426,14 +432,14 @@ type MonoVertexLimits struct {

func (mvl MonoVertexLimits) GetReadBatchSize() uint64 {
if mvl.ReadBatchSize == nil {
return 500
return DefaultReadBatchSize
}
return *mvl.ReadBatchSize
}

func (mvl MonoVertexLimits) GetReadTimeout() time.Duration {
if mvl.ReadTimeout == nil {
return 1 * time.Second
return DefaultReadTimeout
}
return mvl.ReadTimeout.Duration
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
Namespace: v.Namespace,
Name: v.Name,
},
Spec: v.Spec.WithoutReplicas(),
Spec: v.Spec.DeepCopyWithoutReplicas(),
}
vertexBytes, err := json.Marshal(vertexCopy)
if err != nil {
Expand Down Expand Up @@ -480,7 +480,7 @@ func (v Vertex) getInitContainers(req GetVertexPodSpecReq) []corev1.Container {
return append(initContainers, v.Spec.InitContainers...)
}

func (vs VertexSpec) WithoutReplicas() VertexSpec {
func (vs VertexSpec) DeepCopyWithoutReplicas() VertexSpec {
x := *vs.DeepCopy()
x.Replicas = ptr.To[int32](0)
return x
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/vertex_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestWithoutReplicas(t *testing.T) {
s := &VertexSpec{
Replicas: ptr.To[int32](3),
}
assert.Equal(t, int32(0), *s.WithoutReplicas().Replicas)
assert.Equal(t, int32(0), *s.DeepCopyWithoutReplicas().Replicas)
}

func TestGetVertexReplicas(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
Watermark: pl.Spec.Watermark,
Replicas: &replicas,
}
hash := sharedutil.MustHash(spec.WithoutReplicas())
hash := sharedutil.MustHash(spec.DeepCopyWithoutReplicas())
obj := dfv1.Vertex{
ObjectMeta: metav1.ObjectMeta{
Namespace: pl.Namespace,
Expand Down

0 comments on commit 9d7cac6

Please sign in to comment.