Skip to content

Commit

Permalink
Merge branch 'main' into serving-sourcer-traits
Browse files Browse the repository at this point in the history
  • Loading branch information
BulkBeing authored Jan 2, 2025
2 parents ca8cdeb + bb4a0de commit 76c2371
Show file tree
Hide file tree
Showing 15 changed files with 373 additions and 82 deletions.
11 changes: 11 additions & 0 deletions pkg/apis/numaflow/v1alpha1/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,14 @@ func isSidecarSupported() bool {
k8sVersion, _ := strconv.ParseFloat(v, 32)
return k8sVersion >= 1.29
}

// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
func IsPVCRetentionPolicySupported() bool {
v := os.Getenv(EnvK8sServerVersion)
if v == "" {
return true // default to true if the env var is not found
}
// e.g. 1.31
k8sVersion, _ := strconv.ParseFloat(v, 32)
return k8sVersion >= 1.27
}
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ func (j JetStreamBufferService) GetStatefulSetSpec(req GetJetStreamStatefulSetSp
}
j.AbstractPodTemplate.ApplyToPodSpec(podSpec)
spec := appv1.StatefulSetSpec{
PersistentVolumeClaimRetentionPolicy: &appv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appv1.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: appv1.RetainPersistentVolumeClaimRetentionPolicyType,
},
PodManagementPolicy: appv1.ParallelPodManagement,
Replicas: &replicas,
ServiceName: req.ServiceName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
Expand Down Expand Up @@ -74,6 +75,9 @@ func TestJetStreamGetStatefulSetSpec(t *testing.T) {
},
}
spec := s.GetStatefulSetSpec(req)
assert.NotNil(t, spec.PersistentVolumeClaimRetentionPolicy)
assert.Equal(t, appv1.DeletePersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted)
assert.Equal(t, appv1.RetainPersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenScaled)
assert.True(t, len(spec.VolumeClaimTemplates) > 0)
})

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/redis_buffer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ redis_exporter`},
nr.AbstractPodTemplate.ApplyToPodSpec(podSpec)

spec := appv1.StatefulSetSpec{
PersistentVolumeClaimRetentionPolicy: &appv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appv1.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: appv1.RetainPersistentVolumeClaimRetentionPolicyType,
},
Replicas: &replicas,
ServiceName: req.ServiceName,
Selector: &metav1.LabelSelector{
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/redis_buffer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
Expand Down Expand Up @@ -66,6 +67,9 @@ func TestRedisGetStatefulSetSpec(t *testing.T) {
},
}
spec := s.GetStatefulSetSpec(req)
assert.NotNil(t, spec.PersistentVolumeClaimRetentionPolicy)
assert.Equal(t, appv1.DeletePersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted)
assert.Equal(t, appv1.RetainPersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenScaled)
assert.True(t, len(spec.VolumeClaimTemplates) > 0)
assert.True(t, len(spec.Template.Spec.InitContainers) > 0)
assert.NotNil(t, spec.Template.Spec.SecurityContext)
Expand Down
10 changes: 8 additions & 2 deletions pkg/reconciler/isbsvc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import (
)

const (
finalizerName = dfv1.ControllerISBSvc
finalizerName = "numaflow.numaproj.io/" + dfv1.ControllerISBSvc
// TODO: clean up the deprecated finalizer in v1.7
deprecatedFinalizerName = dfv1.ControllerISBSvc
)

// interStepBufferReconciler reconciles an Inter-Step Buffer Service object.
Expand Down Expand Up @@ -97,19 +99,23 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc
log := logging.FromContext(ctx)
if !isbSvc.DeletionTimestamp.IsZero() {
log.Info("Deleting ISB Service")
if controllerutil.ContainsFinalizer(isbSvc, finalizerName) {
if controllerutil.ContainsFinalizer(isbSvc, finalizerName) || controllerutil.ContainsFinalizer(isbSvc, deprecatedFinalizerName) {
// Finalizer logic should be added here.
if err := installer.Uninstall(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder); err != nil {
log.Errorw("Failed to uninstall", zap.Error(err))
isbSvc.Status.SetPhase(dfv1.ISBSvcPhaseDeleting, err.Error())
return err
}
controllerutil.RemoveFinalizer(isbSvc, finalizerName)
controllerutil.RemoveFinalizer(isbSvc, deprecatedFinalizerName)
// Clean up metrics
_ = reconciler.ISBSvcHealth.DeleteLabelValues(isbSvc.Namespace, isbSvc.Name)
}
return nil
}
if controllerutil.ContainsFinalizer(isbSvc, deprecatedFinalizerName) { // Remove deprecated finalizer if exists
controllerutil.RemoveFinalizer(isbSvc, deprecatedFinalizerName)
}
if needsFinalizer(isbSvc) {
controllerutil.AddFinalizer(isbSvc, finalizerName)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/isbsvc/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,11 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
func (r *jetStreamInstaller) Uninstall(ctx context.Context) error {
// Clean up metrics
_ = reconciler.JetStreamISBSvcReplicas.DeleteLabelValues(r.isbSvc.Namespace, r.isbSvc.Name)
return r.uninstallPVCs(ctx)
// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
if !dfv1.IsPVCRetentionPolicySupported() {
return r.uninstallPVCs(ctx)
}
return nil
}

func (r *jetStreamInstaller) uninstallPVCs(ctx context.Context) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/reconciler/isbsvc/installer/native_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,11 @@ func (r *redisInstaller) createStatefulSet(ctx context.Context) error {
func (r *redisInstaller) Uninstall(ctx context.Context) error {
// Clean up metrics
_ = reconciler.RedisISBSvcReplicas.DeleteLabelValues(r.isbSvc.Namespace, r.isbSvc.Name)
return r.uninstallPVCs(ctx)
// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
if !dfv1.IsPVCRetentionPolicySupported() {
return r.uninstallPVCs(ctx)
}
return nil
}

func (r *redisInstaller) uninstallPVCs(ctx context.Context) error {
Expand Down
11 changes: 9 additions & 2 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ import (
)

const (
finalizerName = dfv1.ControllerPipeline
finalizerName = "numaflow.numaproj.io/" + dfv1.ControllerPipeline
// TODO: clean up the deprecated finalizer in v1.7
deprecatedFinalizerName = dfv1.ControllerPipeline

pauseTimestampPath = `/metadata/annotations/numaflow.numaproj.io~1pause-timestamp`
)
Expand Down Expand Up @@ -111,7 +113,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
log := logging.FromContext(ctx)
if !pl.DeletionTimestamp.IsZero() {
log.Info("Deleting pipeline")
if controllerutil.ContainsFinalizer(pl, finalizerName) {
if controllerutil.ContainsFinalizer(pl, finalizerName) || controllerutil.ContainsFinalizer(pl, deprecatedFinalizerName) {
if time.Now().Before(pl.DeletionTimestamp.Add(time.Duration(pl.GetTerminationGracePeriodSeconds()) * time.Second)) {
safeToDelete, err := r.safeToDelete(ctx, pl)
if err != nil {
Expand All @@ -135,6 +137,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (

}
controllerutil.RemoveFinalizer(pl, finalizerName)
controllerutil.RemoveFinalizer(pl, deprecatedFinalizerName)
// Clean up metrics
_ = reconciler.PipelineHealth.DeleteLabelValues(pl.Namespace, pl.Name)
// Delete corresponding vertex metrics
Expand All @@ -155,6 +158,10 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
pl.Status.InitConditions()
pl.Status.SetObservedGeneration(pl.Generation)

if controllerutil.ContainsFinalizer(pl, deprecatedFinalizerName) { // Remove deprecated finalizer if exists
controllerutil.RemoveFinalizer(pl, deprecatedFinalizerName)
}

if !controllerutil.ContainsFinalizer(pl, finalizerName) {
controllerutil.AddFinalizer(pl, finalizerName)
}
Expand Down
13 changes: 11 additions & 2 deletions ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@
"@monaco-editor/react": "^4.5.2",
"@mui/icons-material": "^5.6.2",
"@mui/material": "^5.6.3",
"@mui/x-date-pickers": "^7.21.0",
"@mui/x-date-pickers": "^7.23.2",
"@stardazed/streams-polyfill": "^2.4.0",
"@testing-library/jest-dom": "^6.1.4",
"@testing-library/react": "^14.0.0",
"@testing-library/user-event": "^14.5.1",
"@types/d3-selection": "^3.0.2",
"@types/dagre": "^0.7.47",
"@types/jest": "^27.0.1",
"@types/jquery": "^3.5.32",
"@types/lodash": "^4.14.195",
"@types/node": "^16.7.13",
"@types/react": "^18.0.0",
"@types/react-bootstrap-daterangepicker": "^7.0.0",
"@types/react-dom": "^18.0.0",
"@types/react-router-dom": "^5.3.3",
"@types/react-test-renderer": "^18.0.0",
Expand All @@ -48,15 +50,22 @@
"@visx/responsive": "^2.8.0",
"@visx/shape": "^2.4.0",
"@visx/tooltip": "^2.8.0",
"bootstrap": "^5.3.3",
"bootstrap-daterangepicker": "^3.1.0",
"d3-color": "^3.1.0",
"d3-scale": "^4.0.2",
"d3-selection": "^3.0.0",
"dagre": "^0.8.5",
"date-fns": "^4.1.0",
"dayjs": "^1.11.13",
"moment": "^2.29.4",
"jquery": "^3.7.1",
"moment": "^2.30.1",
"monaco-editor": "0.40.0",
"msw": "^0.47.4",
"react": "^18.0.0",
"react-bootstrap-daterangepicker": "^8.0.0",
"react-datetime": "^3.3.1",
"react-datetime-picker": "^6.0.1",
"react-dom": "^18.0.0",
"react-highlight-words": "^0.18.0",
"react-json-view": "^1.21.3",
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 76c2371

Please sign in to comment.