Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: minor updates on docs and examples #87

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Rust SDK for Numaflow

This SDK provides the interface for writing [User Defined Sources](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/), [UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/)
This SDK provides the interface for writing [User Defined Sources](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/),
[User Defined Source Tranformers](https://numaflow.numaproj.io/user-guide/sources/transformer/),
[UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/)
and [User Defined Sinks](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/) in [Rust](https://www.rust-lang.org/).

> This rust crate is being actively developed and it supports
Expand All @@ -9,6 +11,4 @@ and [User Defined Sinks](https://numaflow.numaproj.io/user-guide/sinks/user-defi

## Examples

You may find examples in the [examples folder](./examples).


You may find examples in the [examples folder](./examples).
2 changes: 1 addition & 1 deletion examples/batchmap-cat/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ WORKDIR /numaflow-rs/examples/batchmap-cat
RUN cargo build --release

# our final base
FROM debian:bookworm AS map-cat
FROM debian:bookworm AS batchmap-cat

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/examples/batchmap-cat/target/release/server .
Expand Down
8 changes: 3 additions & 5 deletions examples/batchmap-cat/manifests/simple-batchmap-cat.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: rust-batchmap-cat
name: simple-batchmap-cat
spec:
vertices:
- name: in
Expand All @@ -21,11 +21,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: cat
- from: cat
to: out


to: out
6 changes: 1 addition & 5 deletions examples/batchmap-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ impl batchmap::BatchMapper for Cat {
let mut responses: Vec<BatchResponse> = Vec::new();
while let Some(datum) = input.recv().await {
let mut response = BatchResponse::from_id(datum.id);
response.append(Message {
keys: Some(datum.keys),
value: datum.value,
tags: None,
});
response.append(Message::new(datum.value).keys(datum.keys.clone()));
responses.push(response);
}
responses
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: rust-batchmap-batchmap
name: simple-batchmap-flatmap
spec:
vertices:
- name: in
Expand All @@ -21,11 +21,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: cat
- from: cat
to: out


to: out
6 changes: 1 addition & 5 deletions examples/batchmap-flatmap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ impl batchmap::BatchMapper for Flatmap {

// return the resulting parts
for part in parts {
response.append(Message {
keys: Some(datum.keys.clone()),
value: Vec::from(part),
tags: None,
});
response.append(Message::new(Vec::from(part)).keys(datum.keys.clone()));
}
responses.push(response);
}
Expand Down
8 changes: 3 additions & 5 deletions examples/map-cat/manifests/simple-map-cat.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-map-cat
name: simple-map-cat
spec:
vertices:
- name: in
Expand All @@ -21,11 +21,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: cat
- from: cat
to: out


to: out
3 changes: 1 addition & 2 deletions examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ struct Cat;
#[tonic::async_trait]
impl map::Mapper for Cat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
let message = map::Message::new(input.value).keys(input.keys).tags(vec![]);
vec![message]
vec![map::Message::new(input.value).keys(input.keys.clone())]
}
}
6 changes: 3 additions & 3 deletions examples/map-tickgen-serde/manifests/simple-map-udf.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-tickgen-serde
name: simple-tickgen-serde
spec:
vertices:
- name: in
Expand All @@ -21,9 +21,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: tickgen-value
- from: tickgen-value
to: out
to: out
5 changes: 2 additions & 3 deletions examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct ResultPayload {

#[tonic::async_trait]
impl map::Mapper for TickGen {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
async fn map(&self, input: map::MapRequest) -> Vec<Message> {
let Ok(payload) = serde_json::from_slice::<Payload>(&input.value) else {
return vec![];
};
Expand All @@ -45,8 +45,7 @@ impl map::Mapper for TickGen {
})
.unwrap_or_default(),
)
.keys(input.keys)
.tags(vec![]);
.keys(input.keys.clone());
vec![message]
}
}
18 changes: 11 additions & 7 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use filter_impl::filter_event_time;
use numaflow::sourcetransform;
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
sourcetransform::Server::new(EventTimeFilter).start().await
}

struct EventTimeFilter;

#[tonic::async_trait]
impl sourcetransform::SourceTransformer for EventTimeFilter {
/// Asynchronously transforms input messages based on their event time.
Expand Down Expand Up @@ -43,53 +46,54 @@ mod tests {
use numaflow::sourcetransform::{Message, SourceTransformRequest};
/// Tests that events from 2022 are tagged as within the year 2022.
#[test]
fn test_filter_event_time_should_return_after_year_2022() {
fn test_filter_event_time_should_return_within_year_2022() {
let time = Utc.with_ymd_and_hms(2022, 7, 2, 2, 0, 0).unwrap();

let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};

let messages = filter_event_time(source_request);
assert_eq!((&messages).len(), 1);

assert_eq!((&messages).len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "within_year_2022")
}

/// Tests that events from 2023 are tagged as after the year 2022.
#[test]
fn test_filter_event_time_should_return_within_year_2022() {
fn test_filter_event_time_should_return_after_year_2022() {
let time = Utc.with_ymd_and_hms(2023, 7, 2, 2, 0, 0).unwrap();

let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};

let messages = filter_event_time(source_request);
assert_eq!((&messages).len(), 1);

assert_eq!((&messages).len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "after_year_2022")
}

/// Tests that events before 2022 are dropped.
#[test]
fn test_filter_event_time_should_drop() {
let time = Utc.with_ymd_and_hms(2021, 7, 2, 2, 0, 0).unwrap();

let source_request = SourceTransformRequest {
keys: vec![],
value: vec![],
watermark: Default::default(),
eventtime: time,
headers: Default::default(),
};

let messages = filter_event_time(source_request);

assert_eq!((&messages).len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "U+005C__DROP__")
}
Expand Down
7 changes: 3 additions & 4 deletions examples/reduce-counter/manifests/simple-reduce.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-reduce
name: simple-reduce
spec:
vertices:
- name: in
Expand Down Expand Up @@ -33,10 +33,9 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: counter
- from: counter
to: out

to: out
5 changes: 1 addition & 4 deletions examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ mod counter {
while input.recv().await.is_some() {
counter += 1;
}
let message = Message::new(counter.to_string().into_bytes())
.tags(vec![])
.keys(keys.clone());
vec![message]
vec![Message::new(counter.to_string().into_bytes()).keys(keys.clone())]
}
}
}
4 changes: 2 additions & 2 deletions examples/sideinput/manifests/simple-sideinput.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: my-pipeline
name: simple-sideinput
spec:
sideInputs:
- name: myticker
Expand Down Expand Up @@ -32,7 +32,7 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: si-log
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-source/manifests/simple-source.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: kafka-pl
name: simple-source
spec:
vertices:
- name: in
Expand All @@ -16,7 +16,7 @@ spec:
scale:
min: 1
sink:
log: { }
log: {}
edges:
- from: in
to: out
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-log-sink
name: simple-log-sink
spec:
vertices:
- name: in
Expand All @@ -19,4 +19,4 @@ spec:
image: quay.io/numaio/numaflow-rs/sink-log:stable
edges:
- from: in
to: out
to: out
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-rust-transformer
name: simple-source-transformer
spec:
vertices:
- name: in
Expand All @@ -19,9 +19,7 @@ spec:
- name: out
sink:
# A simple log printing sink
log: { }
log: {}
edges:
- from: in
to: out


to: out
5 changes: 1 addition & 4 deletions examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ impl sourcetransform::SourceTransformer for NowCat {
&self,
input: sourcetransform::SourceTransformRequest,
) -> Vec<sourcetransform::Message> {
let message = sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.keys(input.keys)
.tags(vec![]);
vec![message]
vec![sourcetransform::Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys.clone())]
}
}
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A Rust SDK for [Numaflow]. The Rust SDK is experimental has only implemented the most important
//! features. It will support all the core features eventually. It supports [Map], [Reduce], and
//! [User Defined Sinks].
//! features.
//! It will support all the core features eventually.
//! It supports [Map], [Reduce], [User Defined Sources], [User Defined Source Transformer] and [User Defined Sinks].
//!
//! Please note that the Rust SDK is experimental and will be refactored in the future to make it more
//! idiomatic.
Expand All @@ -9,6 +10,7 @@
//! [Map]: https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/
//! [Reduce]: https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/
//! [User Defined Sources]: https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/
//! [User Defined Source Transformer]: https://numaflow.numaproj.io/user-guide/sources/transformer/
//! [User Defined Sinks]: https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/

/// start up code
Expand Down
Loading