Merge branch 'main' into mvtx-adapt
kohlisid authored Dec 28, 2024
2 parents b1a749c + bb4a0de commit 30de2f1
Showing 52 changed files with 3,487 additions and 271 deletions.
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
@@ -21764,7 +21764,7 @@
"description": "BackOff specifies the parameters for the backoff strategy, controlling how delays between retries should increase."
},
"onFailure": {
"description": "OnFailure specifies the action to take when a retry fails. The default action is to retry.",
"description": "OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry.",
"type": "string"
}
},
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
@@ -21751,7 +21751,7 @@
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Backoff"
},
"onFailure": {
"description": "OnFailure specifies the action to take when a retry fails. The default action is to retry.",
"description": "OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry.",
"type": "string"
}
}
7 changes: 5 additions & 2 deletions docs/APIs.md
@@ -9077,8 +9077,11 @@ OnFailureRetryStrategy </a> </em>
<em>(Optional)</em>
<p>

OnFailure specifies the action to take when a retry fails. The default
action is to retry.
OnFailure specifies the action to take when the specified retry strategy
fails. The possible values are: 1. “retry”: start another round of
retrying the operation, 2. “fallback”: re-route the operation to a
fallback sink and 3. “drop”: drop the operation and perform no further
action. The default action is to retry.
</p>

</td>
48 changes: 32 additions & 16 deletions docs/user-guide/sinks/fallback.md
@@ -1,25 +1,32 @@
# Fallback Sink

A `Fallback` Sink functions as a `Dead Letter Queue (DLQ)` Sink and can be configured to serve as a backup when the primary sink is down,
unavailable, or under maintenance. This is particularly useful when multiple sinks are in a pipeline; if a sink fails, the resulting
back-pressure will back-propagate and stop the source vertex from reading more data. A `Fallback` Sink can be set up to prevent this from happening.
This backup sink stores data while the primary sink is offline. The stored data can be replayed once the primary sink is back online.
A `Fallback` Sink functions as a `Dead Letter Queue (DLQ)` Sink.
It can be configured to serve as a backup sink when the primary sink fails to process messages.

Note: The `fallback` field is optional.
## The Use Case

Users are required to return a fallback response from the [user-defined sink](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/) when the primary sink fails; only
then the messages will be directed to the fallback sink.
A fallback sink is useful for preventing back pressure caused by failed messages in the primary sink.

Example of a fallback response in a user-defined sink: [here](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/examples/fallback/main.go)
In a pipeline without a fallback sink, if a sink fails to process certain messages,
the failed messages can, by default, be retried indefinitely,
causing back pressure to propagate all the way back to the source vertex.
Eventually, the pipeline is blocked and no new messages are processed.
A fallback sink can be set up to prevent this from happening by storing the failed messages in a separate sink.

## CAVEATs
The `fallback` field can only be utilized when the primary sink is a `User Defined Sink.`
## Caveats

A fallback sink can only be configured when the primary sink is a user-defined sink.

## Example
## How to use

### Builtin Kafka
An example using builtin kafka as fallback sink:
To configure a fallback sink,
changes need to be made to both the pipeline specification and the user-defined sink implementation.

### Step 1 - update the specification

Add a `fallback` field to the sink configuration in the pipeline specification file.

The following example uses the built-in Kafka sink as the fallback sink.

```yaml
- name: out
@@ -34,10 +41,9 @@ An example using builtin kafka as fallback sink:
- my-broker2:19700
topic: my-topic
```
### UD Sink
An example using custom user-defined sink as fallback sink.
User Defined Sink as a fallback sink:
A fallback sink can also be a user-defined sink.
```yaml
- name: out
sink:
@@ -49,3 +55,13 @@ User Defined Sink as a fallback sink:
container:
image: my-sink:latest
```
### Step 2 - update the user-defined sink implementation
Code changes have to be made in the primary sink to generate either a **failed** response or a **fallback** response,
based on the use case.
* A **failed** response is processed according to the [retry strategy](https://numaflow.numaproj.io/user-guide/sinks/retry-strategy/). If the retry strategy's `onFailure` action is set to `fallback`, the message is directed to the fallback sink after the retries are exhausted.
* A **fallback** response bypasses the sink retry strategy. The message is directed to the fallback sink immediately, without being retried.

SDK methods to generate either a fallback or a failed response in a primary user-defined sink can be found here:
[Golang](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/types.go), [Java](https://github.com/numaproj/numaflow-java/blob/main/src/main/java/io/numaproj/numaflow/sinker/Response.java), [Python](https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/sinker/_dtypes.py)
8 changes: 5 additions & 3 deletions docs/user-guide/sinks/retry-strategy.md
@@ -1,18 +1,20 @@
# Retry Strategy

### Overview

The `RetryStrategy` is used to configure the behavior for a sink after encountering failures during a write operation.
This structure allows the user to specify how Numaflow should respond to different fail-over scenarios for Sinks, ensuring that the writing can be resilient and handle
unexpected issues efficiently.

`RetryStrategy` is applied ONLY to failed messages. To return a failed message, use the methods provided by the SDKs.
- `ResponseFailure` for [Golang](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/types.go)
- `responseFailure` for [Java](https://github.com/numaproj/numaflow-java/blob/main/src/main/java/io/numaproj/numaflow/sinker/Response.java#L40)
- `as_failure` for [Python](https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/sinker/_dtypes.py)

### Struct Explanation


`retryStrategy` is optional, and can be added to the Sink spec configurations where retry logic is necessary.



```yaml
sink:
retryStrategy:
11 changes: 11 additions & 0 deletions pkg/apis/numaflow/v1alpha1/deprecated.go
@@ -31,3 +31,14 @@ func isSidecarSupported() bool {
k8sVersion, _ := strconv.ParseFloat(v, 32)
return k8sVersion >= 1.29
}

// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
func IsPVCRetentionPolicySupported() bool {
v := os.Getenv(EnvK8sServerVersion)
if v == "" {
return true // default to true if the env var is not found
}
// e.g. 1.31
k8sVersion, _ := strconv.ParseFloat(v, 32)
return k8sVersion >= 1.27
}
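
The helper above compares the version as a floating-point number, which works for the `1.2x` and `1.3x` releases in question but would order a single-digit minor such as `1.9` above `1.27`. The following is a sketch of an alternative comparison that treats major and minor as integers. It is not part of this commit; `atLeast` is a hypothetical name, and it assumes a plain `major.minor` string with an optional `v` prefix.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// atLeast reports whether a "major.minor[.patch]" version string is greater
// than or equal to the given major/minor pair. Input that cannot be parsed
// (for example a minor with a non-numeric suffix) yields false.
func atLeast(version string, major, minor int) bool {
	parts := strings.SplitN(strings.TrimPrefix(version, "v"), ".", 3)
	if len(parts) < 2 {
		return false
	}
	gotMajor, err := strconv.Atoi(parts[0])
	if err != nil {
		return false
	}
	gotMinor, err := strconv.Atoi(parts[1])
	if err != nil {
		return false
	}
	// Compare the major component first, then the minor component.
	if gotMajor != major {
		return gotMajor > major
	}
	return gotMinor >= minor
}

func main() {
	for _, v := range []string{"1.31", "1.27", "1.26", "1.9"} {
		fmt.Printf("%s >= 1.27: %v\n", v, atLeast(v, 1, 27))
	}
}
```

Unlike the committed helper, this sketch returns false rather than true for an empty or malformed value, so a caller would still need to decide on the default separately.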
7 changes: 6 additions & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go
@@ -255,6 +255,10 @@ func (j JetStreamBufferService) GetStatefulSetSpec(req GetJetStreamStatefulSetSp
}
j.AbstractPodTemplate.ApplyToPodSpec(podSpec)
spec := appv1.StatefulSetSpec{
PersistentVolumeClaimRetentionPolicy: &appv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appv1.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: appv1.RetainPersistentVolumeClaimRetentionPolicyType,
},
PodManagementPolicy: appv1.ParallelPodManagement,
Replicas: &replicas,
ServiceName: req.ServiceName,
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service_test.go
@@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
@@ -74,6 +75,9 @@ func TestJetStreamGetStatefulSetSpec(t *testing.T) {
},
}
spec := s.GetStatefulSetSpec(req)
assert.NotNil(t, spec.PersistentVolumeClaimRetentionPolicy)
assert.Equal(t, appv1.DeletePersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted)
assert.Equal(t, appv1.RetainPersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenScaled)
assert.True(t, len(spec.VolumeClaimTemplates) > 0)
})

4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/redis_buffer_service.go
@@ -338,6 +338,10 @@ redis_exporter`},
nr.AbstractPodTemplate.ApplyToPodSpec(podSpec)

spec := appv1.StatefulSetSpec{
PersistentVolumeClaimRetentionPolicy: &appv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: appv1.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: appv1.RetainPersistentVolumeClaimRetentionPolicyType,
},
Replicas: &replicas,
ServiceName: req.ServiceName,
Selector: &metav1.LabelSelector{
4 changes: 4 additions & 0 deletions pkg/apis/numaflow/v1alpha1/redis_buffer_service_test.go
@@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
@@ -66,6 +67,9 @@ func TestRedisGetStatefulSetSpec(t *testing.T) {
},
}
spec := s.GetStatefulSetSpec(req)
assert.NotNil(t, spec.PersistentVolumeClaimRetentionPolicy)
assert.Equal(t, appv1.DeletePersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted)
assert.Equal(t, appv1.RetainPersistentVolumeClaimRetentionPolicyType, spec.PersistentVolumeClaimRetentionPolicy.WhenScaled)
assert.True(t, len(spec.VolumeClaimTemplates) > 0)
assert.True(t, len(spec.Template.Spec.InitContainers) > 0)
assert.NotNil(t, spec.Template.Spec.SecurityContext)
7 changes: 6 additions & 1 deletion pkg/apis/numaflow/v1alpha1/retry_strategy.go
@@ -36,7 +36,12 @@ type RetryStrategy struct {
// BackOff specifies the parameters for the backoff strategy, controlling how delays between retries should increase.
// +optional
BackOff *Backoff `json:"backoff,omitempty" protobuf:"bytes,1,opt,name=backoff"`
// OnFailure specifies the action to take when a retry fails. The default action is to retry.
// OnFailure specifies the action to take when the specified retry strategy fails.
// The possible values are:
// 1. "retry": start another round of retrying the operation,
// 2. "fallback": re-route the operation to a fallback sink and
// 3. "drop": drop the operation and perform no further action.
// The default action is to retry.
// +optional
// +kubebuilder:default="retry"
OnFailure *OnFailureRetryStrategy `json:"onFailure,omitempty" protobuf:"bytes,2,opt,name=onFailure"`
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/udf.go
@@ -51,6 +51,9 @@ func (in UDF) getContainers(req getContainerReq) ([]corev1.Container, []corev1.C

func (in UDF) getMainContainer(req getContainerReq) corev1.Container {
if in.GroupBy == nil {
if req.executeRustBinary {
return containerBuilder{}.init(req).command(NumaflowRustBinary).args("processor", "--type="+string(VertexTypeMapUDF), "--isbsvc-type="+string(req.isbSvcType), "--rust").build()
}
args := []string{"processor", "--type=" + string(VertexTypeMapUDF), "--isbsvc-type=" + string(req.isbSvcType)}
return containerBuilder{}.
init(req).args(args...).build()
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions pkg/reconciler/isbsvc/controller.go
@@ -41,7 +41,9 @@ import (
)

const (
finalizerName = dfv1.ControllerISBSvc
finalizerName = "numaflow.numaproj.io/" + dfv1.ControllerISBSvc
// TODO: clean up the deprecated finalizer in v1.7
deprecatedFinalizerName = dfv1.ControllerISBSvc
)

// interStepBufferReconciler reconciles an Inter-Step Buffer Service object.
@@ -97,19 +99,23 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc
log := logging.FromContext(ctx)
if !isbSvc.DeletionTimestamp.IsZero() {
log.Info("Deleting ISB Service")
if controllerutil.ContainsFinalizer(isbSvc, finalizerName) {
if controllerutil.ContainsFinalizer(isbSvc, finalizerName) || controllerutil.ContainsFinalizer(isbSvc, deprecatedFinalizerName) {
// Finalizer logic should be added here.
if err := installer.Uninstall(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder); err != nil {
log.Errorw("Failed to uninstall", zap.Error(err))
isbSvc.Status.SetPhase(dfv1.ISBSvcPhaseDeleting, err.Error())
return err
}
controllerutil.RemoveFinalizer(isbSvc, finalizerName)
controllerutil.RemoveFinalizer(isbSvc, deprecatedFinalizerName)
// Clean up metrics
_ = reconciler.ISBSvcHealth.DeleteLabelValues(isbSvc.Namespace, isbSvc.Name)
}
return nil
}
if controllerutil.ContainsFinalizer(isbSvc, deprecatedFinalizerName) { // Remove deprecated finalizer if exists
controllerutil.RemoveFinalizer(isbSvc, deprecatedFinalizerName)
}
if needsFinalizer(isbSvc) {
controllerutil.AddFinalizer(isbSvc, finalizerName)
}
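
The finalizer migration in this hunk (drop the deprecated unqualified name, then make sure the domain-qualified name is present) can be sketched on a plain string slice. This is an illustration only: it does not use controller-runtime, the literal value `isbsvc-controller` is an assumed placeholder for `dfv1.ControllerISBSvc`, and `migrate` is a hypothetical helper. Unlike the controller, which adds the finalizer only when `needsFinalizer` returns true, the sketch always adds it.

```go
package main

import "fmt"

const (
	// Assumed placeholder values; the real constants come from dfv1.
	deprecatedFinalizerName = "isbsvc-controller"
	finalizerName           = "numaflow.numaproj.io/" + deprecatedFinalizerName
)

// migrate returns a copy of the finalizer list with the deprecated name
// removed and the domain-qualified name present exactly once. Finalizers
// owned by other controllers are preserved in their original order.
func migrate(finalizers []string) []string {
	out := make([]string, 0, len(finalizers)+1)
	found := false
	for _, f := range finalizers {
		if f == deprecatedFinalizerName {
			continue // drop the deprecated, unqualified finalizer
		}
		if f == finalizerName {
			if found {
				continue // skip duplicates of the qualified finalizer
			}
			found = true
		}
		out = append(out, f)
	}
	if !found {
		out = append(out, finalizerName)
	}
	return out
}

func main() {
	before := []string{deprecatedFinalizerName, "example.com/other"}
	fmt.Println(migrate(before))
}
```

Because the function is idempotent, running it on every reconcile is safe: an object that already carries only the qualified finalizer is returned unchanged.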
6 changes: 5 additions & 1 deletion pkg/reconciler/isbsvc/installer/jetstream.go
@@ -518,7 +518,11 @@ func (r *jetStreamInstaller) createConfigMap(ctx context.Context) error {
func (r *jetStreamInstaller) Uninstall(ctx context.Context) error {
// Clean up metrics
_ = reconciler.JetStreamISBSvcReplicas.DeleteLabelValues(r.isbSvc.Namespace, r.isbSvc.Name)
return r.uninstallPVCs(ctx)
// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
if !dfv1.IsPVCRetentionPolicySupported() {
return r.uninstallPVCs(ctx)
}
return nil
}

func (r *jetStreamInstaller) uninstallPVCs(ctx context.Context) error {
6 changes: 5 additions & 1 deletion pkg/reconciler/isbsvc/installer/native_redis.go
@@ -585,7 +585,11 @@ func (r *redisInstaller) createStatefulSet(ctx context.Context) error {
func (r *redisInstaller) Uninstall(ctx context.Context) error {
// Clean up metrics
_ = reconciler.RedisISBSvcReplicas.DeleteLabelValues(r.isbSvc.Namespace, r.isbSvc.Name)
return r.uninstallPVCs(ctx)
// TODO: (k8s 1.27) Remove this once we deprecate the support for k8s < 1.27
if !dfv1.IsPVCRetentionPolicySupported() {
return r.uninstallPVCs(ctx)
}
return nil
}

func (r *redisInstaller) uninstallPVCs(ctx context.Context) error {
11 changes: 9 additions & 2 deletions pkg/reconciler/pipeline/controller.go
@@ -52,7 +52,9 @@ import (
)

const (
finalizerName = dfv1.ControllerPipeline
finalizerName = "numaflow.numaproj.io/" + dfv1.ControllerPipeline
// TODO: clean up the deprecated finalizer in v1.7
deprecatedFinalizerName = dfv1.ControllerPipeline

pauseTimestampPath = `/metadata/annotations/numaflow.numaproj.io~1pause-timestamp`
)
@@ -111,7 +113,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
log := logging.FromContext(ctx)
if !pl.DeletionTimestamp.IsZero() {
log.Info("Deleting pipeline")
if controllerutil.ContainsFinalizer(pl, finalizerName) {
if controllerutil.ContainsFinalizer(pl, finalizerName) || controllerutil.ContainsFinalizer(pl, deprecatedFinalizerName) {
if time.Now().Before(pl.DeletionTimestamp.Add(time.Duration(pl.GetTerminationGracePeriodSeconds()) * time.Second)) {
safeToDelete, err := r.safeToDelete(ctx, pl)
if err != nil {
@@ -135,6 +137,7 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (

}
controllerutil.RemoveFinalizer(pl, finalizerName)
controllerutil.RemoveFinalizer(pl, deprecatedFinalizerName)
// Clean up metrics
_ = reconciler.PipelineHealth.DeleteLabelValues(pl.Namespace, pl.Name)
// Delete corresponding vertex metrics
@@ -155,6 +158,10 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
pl.Status.InitConditions()
pl.Status.SetObservedGeneration(pl.Generation)

if controllerutil.ContainsFinalizer(pl, deprecatedFinalizerName) { // Remove deprecated finalizer if exists
controllerutil.RemoveFinalizer(pl, deprecatedFinalizerName)
}

if !controllerutil.ContainsFinalizer(pl, finalizerName) {
controllerutil.AddFinalizer(pl, finalizerName)
}
24 changes: 23 additions & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/numaflow-core/Cargo.toml
@@ -55,7 +55,7 @@ log = "0.4.22"

[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "ddd879588e11455921f1ca958ea2b3c076689293" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9ca9362ad511084501520e5a37d40cdcd0cdc9d9" }
pulsar = { version = "6.3.0", default-features = false, features = ["tokio-rustls-runtime"] }

[build-dependencies]