Numaflow by Example

Welcome to the Numaflow Community! This document provides an example-by-example guide to using Numaflow.

If you haven't already, install Numaflow by following the QUICK START instructions.

The top-level abstraction in Numaflow is the Pipeline. A Pipeline consists of a set of vertices connected by edges. A vertex can be a source, a sink, or a processing vertex. The example below has a source vertex named `in` that generates messages at a specified rate, a processing vertex named `cat` that emits each input message unchanged, and a sink vertex named `out` that logs messages. Two edges connect them, one from `in` to `cat` and another from `cat` to `out`. The resulting pipeline simply copies internally generated messages to the log.

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline
spec:
  vertices:
    - name: in
      source:
        # Generate 5 messages every second
        # TODO: document what rpu stands for, the default message size and payload contents, and whether duration should be renamed to interval
        generator:
          rpu: 5
          duration: 1s
    - name: cat
      udf: # A user-defined function
        builtin: # Use a built-in function as the udf
          name: cat # passes each message through unchanged
    - name: out
      sink:
        # Output message to the stdout log
        log: {}

  # in -> cat -> out
  edges:
    - from: in
      to: cat
    - from: cat
      to: out
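
To make the vertex and edge structure concrete, here is a minimal Python sketch that models the pipeline above as a small graph. It is only an illustration: the `vertices` and `edges` variables and the helpers `check_edges` and `path_from` are hypothetical names, not part of Numaflow, and the sketch assumes each vertex has at most one outgoing edge.

```python
# Hypothetical in-memory model of the pipeline spec above; not a Numaflow API.
vertices = {"in": "source", "cat": "udf", "out": "sink"}
edges = [("in", "cat"), ("cat", "out")]


def check_edges(vertices, edges):
    """Return the edges whose endpoints are not declared as vertices."""
    return [
        (src, dst)
        for src, dst in edges
        if src not in vertices or dst not in vertices
    ]


def path_from(start, edges):
    """Follow edges from `start`, assuming at most one outgoing edge per vertex."""
    next_vertex = dict(edges)
    path = [start]
    # Stop at a vertex with no outgoing edge, or if an edge would revisit a vertex.
    while path[-1] in next_vertex and next_vertex[path[-1]] not in path:
        path.append(next_vertex[path[-1]])
    return path


print(check_edges(vertices, edges))
print(" -> ".join(path_from("in", edges)))
```

Walking from `in` visits the same three vertices, in the same order, as the `# in -> cat -> out` comment in the spec.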

Below is a simple variation on the above example that takes input from an HTTP endpoint.

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: http-pipeline
spec:
  vertices:
    - name: in
      source:
        http:
          # Whether to create a ClusterIP Service, defaults to false
          service: true
          # Optional bearer token auth
          # auth:
          #   # A secret selector pointing to the secret that contains the token
          #   token:
          #     name: my-secret
          #     key: my-key
    - name: cat
      udf:
        builtin:
          name: cat # A built-in UDF which simply cats the message
    - name: out
      sink:
        # A simple log printing sink
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: out
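
As a rough sketch of how a client might feed this pipeline, the Python snippet below builds (but does not send) a POST request for the `in` vertex. The URL shape `https://<pipeline>-<vertex>:8443/vertices/<vertex>` is an assumption that should be checked against the Numaflow HTTP source documentation, and `build_request` is a hypothetical helper, not a Numaflow API.

```python
import urllib.request


def build_request(pipeline: str, vertex: str, payload: bytes) -> urllib.request.Request:
    """Build a POST request for an HTTP source vertex.

    Assumption: the Service created by `service: true` is reachable at
    https://<pipeline>-<vertex>:8443/vertices/<vertex>.
    """
    url = f"https://{pipeline}-{vertex}:8443/vertices/{vertex}"
    return urllib.request.Request(url, data=payload, method="POST")


req = build_request("http-pipeline", "in", b"hello world")
print(req.get_method(), req.full_url)
```

Sending the request, for example with `urllib.request.urlopen(req)`, only works from a client that can reach the Service, such as a pod inside the cluster, and TLS verification may need extra handling depending on the certificate the endpoint presents.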

Let's modify the UDF in the first example to pass through only messages with an id less than 100.

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: filter-pipeline
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
    - name: filter
      udf:
        builtin:
          name: filter
          kwargs:
            expression: int(json(payload).id) < 100
    - name: out
      sink:
        log: {}
  edges:
    - from: in
      to: filter
    - from: filter
      to: out
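
To illustrate what the expression `int(json(payload).id) < 100` checks, here is a small Python sketch that applies the same test to a few sample payloads. It assumes each payload is a JSON object with an `id` field. The sample messages and the `passes_filter` helper are hypothetical, and the sketch does not reproduce how the built-in filter handles malformed payloads.

```python
import json


def passes_filter(payload: bytes) -> bool:
    """Mimic `int(json(payload).id) < 100` for a JSON payload with an `id` field."""
    return int(json.loads(payload)["id"]) < 100


# Hypothetical sample payloads; real generator messages may look different.
messages = [b'{"id": 7}', b'{"id": 100}', b'{"id": "42"}']
kept = [m for m in messages if passes_filter(m)]
print(kept)
```

Only the payloads with ids 7 and "42" are kept. The `int(...)` conversion is what lets a string id such as "42" be compared numerically.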