-
The question is: how can we debug this situation? Please let me know if anyone has any ideas on how to analyze such a bug.
If I delete the pipeline in the Numaflow Web UI and create the same pipeline again, it works fine.
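For context, the state after the restart can be inspected with commands along these lines (a sketch; the namespace and resource names match the specs further down in this thread, and the pod name is a placeholder):

```shell
# Overall status of the pipeline and the ISB service
kubectl -n inqu-analytics get pipeline numaflow-pipeline -o yaml
kubectl -n inqu-analytics get isbsvc default -o yaml

# Per-vertex pods; look for CrashLoopBackOff or restarts
kubectl -n inqu-analytics get pods

# Logs of a suspect vertex pod, including the previous (crashed) container
kubectl -n inqu-analytics logs <vertex-pod-name> --previous
```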
Replies: 6 comments 3 replies
-
Currently I am on Numaflow v1.4.0. ISB spec:

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
  name: default
  namespace: inqu-analytics
spec:
  jetstream:
    replicas: 1
    settings: |
      max_payload: 8388608
      max_memory_store: 8073741824
    version: 2.10.17
    persistence:
      volumeSize: 1Gi
      #storageClassName: local-path # Optional, will use K8s cluster default storage class if not specified
    bufferConfig: |
      # The properties of the buffers (streams) to be created in this JetStream service
      stream:
        # 0: Limits, 1: Interest, 2: WorkQueue
        retention: 2
        maxAge: 1h
        # 0: File, 1: Memory
        storage: 0
        maxMsgs: 1000000
      # The consumer properties for the created streams
      consumer:
        ackWait: 1h
        maxAckPending: 1000000
        replicas: 1
        storage: 1
      otBucket:
        maxValueSize: 0
        history: 1
        ttl: 24h
        maxBytes: 0
        storage: 1 ### <- this switches to RAM-only mode
        replicas: 1 ### <- this does not replicate data
      procBucket:
        maxValueSize: 0
        history: 1
        ttl: 24h
        maxBytes: 0
        storage: 1 ### <- this switches to RAM-only mode
        replicas: 1 ### <- this does not replicate data
```

and my pipeline:

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: numaflow-pipeline
  namespace: inqu-analytics
spec:
  limits:
    bufferMaxLength: 1000000
  vertices:
    - name: source1
      scale:
        disabled: true
      source:
        udsource:
          container:
            image: drswvs-proget1.rosen-nxt.com/docker-inqu/inqu-numa-source:0.0.58-dev # {"$imagepolicy": "flux-system:inqu-numa-source"}
            imagePullPolicy: Always
            env:
              - name: CONFIGURATION_PREFECT_BLOCK
                value: "pipeline-source1-configuration"
              - name: PREFECT_LOGGING_LEVEL
                value: "WARNING"
    - name: source2
      scale:
        disabled: true
      source:
        udsource:
          container:
            image: drswvs-proget1.rosen-nxt.com/docker-inqu/inqu-numa-source:0.0.58-dev # {"$imagepolicy": "flux-system:inqu-numa-source"}
            imagePullPolicy: Always
            env:
              - name: CONFIGURATION_PREFECT_BLOCK
                value: "pipeline-source2-configuration"
              - name: PREFECT_LOGGING_LEVEL
                value: "WARNING"
      # limits:
      #   readBatchSize:
    - name: nats-endpoint
      source:
        nats:
          url: nats:4222
          subject: analytics
          queue: analytics
          auth:
            token:
              name: numaflow-token
              key: token
        # transformer:
        #   container:
        #     image: drswvs-proget1.rosen-nxt.com/docker-inqu/inqu-numa-transform:0.0.58-feature_numaflow
        #     imagePullPolicy: Always
      # limits:
      #   readBatchSize: 1
    - name: http-endpoint
      source:
        http:
          service: true
          auth:
            token:
              name: numaflow-token
              key: token
    - name: merge
      partitions: 1
      scale:
        disabled: true
      udf:
        container:
          image: drswvs-proget1.rosen-nxt.com/docker-inqu/inqu-numa-merge:0.0.58-dev # {"$imagepolicy": "flux-system:inqu-numa-merge"}
          imagePullPolicy: Always
          env:
            - name: MAX_BATCH_SIZE
              value: "1000"
        groupBy:
          window:
            fixed:
              length: 5s
              streaming: true # set streaming to true to enable reduce streamer
          storage:
            persistentVolumeClaim:
              volumeSize: 1Gi
              accessMode: ReadWriteOnce
    - name: map
      scale:
        disabled: true
      udf:
        container:
          image: drswvs-proget1.rosen-nxt.com/docker-inqu/inqu-numa-map:0.0.58-dev # {"$imagepolicy": "flux-system:inqu-numa-map"}
          imagePullPolicy: Always
          env:
            - name: INQU_LOGGING_LEVEL
              value: "ERROR"
    - name: sink
      scale:
        disabled: true
      sink:
        udsink:
          container:
            image: drswvs-proget1.rosen-nxt.com/docker-inqu/inqu-numa-sink:0.0.58-dev # {"$imagepolicy": "flux-system:inqu-numa-sink"}
            imagePullPolicy: Always
            env:
              - name: PREFECT_LOGGING_LEVEL
                value: "WARNING"
        fallback:
          log: {}
    - name: out-log
      scale:
        disabled: true
      sink:
        log: {}
  edges:
    - from: nats-endpoint
      to: merge
      onFull: discardLatest
    - from: source1
      to: map
      onFull: discardLatest
    - from: http-endpoint
      to: map
      onFull: discardLatest
    - from: source2
      to: map
      onFull: discardLatest
    - from: merge
      to: map
      onFull: discardLatest
    - from: map
      to: sink
      conditions:
        tags:
          values:
            - dwh
      onFull: discardLatest
    - from: map
      to: out-log
      conditions:
        tags:
          values:
            - unknown
      onFull: discardLatest
  watermark:
    idleSource:
      threshold: 15s # The pipeline is considered idle if the source has not emitted any data for this threshold.
      incrementBy: 3s # If the source is found to be idle, increment the watermark by this value.
      stepInterval: 2s # If the source is idling, publish the watermark only after this interval has passed.
```
-
Can you check the status of the
-
Status of my PV and PVC after the cluster restart is:
pvc-describe
Default storage class:

```yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  annotations:
    defaultVolumeType: local
    objectset.rio.cattle.io/id: ""
    objectset.rio.cattle.io/owner-gvk: k3s.cattle.io/v1, Kind=Addon
    objectset.rio.cattle.io/owner-name: local-storage
    objectset.rio.cattle.io/owner-namespace: kube-system
    storageclass.kubernetes.io/is-default-class: "true"
  creationTimestamp: "2025-01-14T14:05:57Z"
  labels:
    objectset.rio.cattle.io/hash: 183f35c65ffbc3064603f43f1580d8c68a2dabd4
  name: local-path
  resourceVersion: "281"
  uid: 1a592580-829a-42e7-b6fb-6b36a648f492
provisioner: rancher.io/local-path
reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer
```
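Given that this class uses `reclaimPolicy: Delete` and `volumeBindingMode: WaitForFirstConsumer`, the claims and volumes that survived the restart can be listed like this (a sketch, assuming kubectl access; the claim name is a placeholder):

```shell
# PVCs in the pipeline namespace and the PVs backing them
kubectl -n inqu-analytics get pvc
kubectl get pv

# Details of a specific claim, including binding-related events
kubectl -n inqu-analytics describe pvc <claim-name>
```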
-
This seems to be a JetStream problem: when it's not running in cluster mode (single node), if a stream or K/V bucket uses RAM storage mode, that stream or bucket will be lost after a restart. What you can do is either use cluster mode (e.g. 3 replicas), or use file storage with 1 replica.
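To confirm which storage mode the streams and K/V buckets actually ended up with, something along these lines can be run with the `nats` CLI (a sketch; the service name `isbsvc-default-js-svc` is an assumption based on Numaflow's naming for an ISB named `default`, and authentication flags may be required depending on your ISB settings):

```shell
# Reach the JetStream service of the ISB from your machine
kubectl -n inqu-analytics port-forward svc/isbsvc-default-js-svc 4222:4222 &

# List streams; "nats stream info" reports a Storage field of File or Memory
nats stream ls
nats stream info <stream-name>

# Same check for the K/V buckets backing the OT/PROC stores
nats kv ls
```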
-
Thanks for your suggestions. Both suggestions have worked, and I have put the ISB settings for both of them below for other users to follow. What I do not understand is why memory storage doesn't work with a non-cluster ISB service. Is it a bug in NATS JetStream, or is there a reason behind it?

File storage with 1 replica:

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
  name: default
  namespace: inqu-analytics
spec:
  jetstream:
    replicas: 1
    persistence:
      volumeSize: 1Gi
      #storageClassName: local-path # Optional, will use K8s cluster default storage class if not specified
    bufferConfig: |
      # The properties of the buffers (streams) to be created in this JetStream service
      stream:
        # 0: File, 1: Memory
        storage: 0
      consumer:
        replicas: 1
        storage: 1 ### <- this switches to file storage
      otBucket:
        storage: 1 ### <- this switches to file storage
      procBucket:
        storage: 1 ### <- this switches to file storage
        replicas: 1 ### <- this does not replicate data
```

Cluster mode with 3 replicas:

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
  name: default
  namespace: inqu-analytics
spec:
  jetstream:
    replicas: 3 ### <- 3-replica cluster
    persistence:
      volumeSize: 1Gi
      #storageClassName: local-path # Optional, will use K8s cluster default storage class if not specified
    bufferConfig: |
      # The properties of the buffers (streams) to be created in this JetStream service
      stream:
        # 0: File, 1: Memory
        storage: 0
      consumer:
        replicas: 0
        storage: 1 ### <- this switches to file storage
      otBucket:
        storage: 0 ### <- this switches to file storage
        replicas: 1 ### <- this does not replicate data
      procBucket:
        storage: 0 ### <- this switches to file storage
        replicas: 1 ### <- this does not replicate data
```