Skip to content

Commit

Permalink
Update mapper examples
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Feb 2, 2024
1 parent 2393ff4 commit e9ac3f3
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 142 deletions.
1 change: 1 addition & 0 deletions examples/map-cat/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
2 changes: 1 addition & 1 deletion examples/map-cat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ name = "server"
path = "src/main.rs"

[dependencies]
tonic = "0.9"
tonic = "0.10.2"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
4 changes: 2 additions & 2 deletions examples/map-cat/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.70 as build
FROM rust:1.75-bookworm as build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand All @@ -16,7 +16,7 @@ COPY ./Cargo.lock ./Cargo.lock
RUN cargo build --release

# our final base
FROM rust
FROM debian:bookworm

# copy the build artifact from the build stage
COPY --from=build /examples/target/release/server .
Expand Down
42 changes: 13 additions & 29 deletions examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,20 @@
use numaflow::map::start_uds_server;
use numaflow::map;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let map_handler = cat::Cat::new();

start_uds_server(map_handler).await?;

Ok(())
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
map::Server::new(Cat).start().await
}

pub(crate) mod cat {
pub(crate) struct Cat {}

impl Cat {
pub(crate) fn new() -> Self {
Self {}
}
}

use numaflow::map;
struct Cat;

#[tonic::async_trait]
impl map::Mapper for Cat {
async fn map<T>(&self, input: T) -> Vec<map::Message>
where
T: map::Datum + Send + Sync + 'static,
{
vec![map::Message {
keys: input.keys().clone(),
value: input.value().clone(),
tags: vec![],
}]
}
#[tonic::async_trait]
impl map::Mapper for Cat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
vec![map::Message {
keys: input.keys,
value: input.value,
tags: vec![],
}]
}
}
1 change: 1 addition & 0 deletions examples/map-tickgen-serde/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
2 changes: 1 addition & 1 deletion examples/map-tickgen-serde/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "server"
path = "src/main.rs"

[dependencies]
tonic = "0.9"
tonic = "0.10.2"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0.103", features = ["derive"] }
serde_json = "1.0.103"
Expand Down
4 changes: 2 additions & 2 deletions examples/map-tickgen-serde/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.70 as build
FROM rust:1.75-bookworm as build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand All @@ -16,7 +16,7 @@ COPY ./Cargo.lock ./Cargo.lock
RUN cargo build --release

# our final base
FROM rust
FROM debian:bookworm

# copy the build artifact from the build stage
COPY --from=build /examples/target/release/server .
Expand Down
125 changes: 40 additions & 85 deletions examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,96 +1,51 @@
use numaflow::map::start_uds_server;
use numaflow::map;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let map_handler = tickgen::TickGen::new();

start_uds_server(map_handler).await?;

Ok(())
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
map::Server::new(TickGen).start().await
}

pub(crate) mod tickgen {
use chrono::{SecondsFormat, TimeZone, Utc};
use numaflow::map;
use numaflow::function::{Datum, Message, Metadata};
use serde::Serialize;
use tokio::sync::mpsc::Receiver;

pub(crate) struct TickGen {}

#[derive(serde::Deserialize)]
struct Data {
value: u64,
}

#[derive(serde::Deserialize)]
struct Payload {
#[serde(rename = "Data")]
data: Data,
#[serde(rename = "Createdts")]
created_ts: i64,
}

#[cfg(test)]
mod tests {
use super::*;
use chrono::{SecondsFormat, TimeZone, Utc};
use chrono::{SecondsFormat, TimeZone, Utc};
use serde::Serialize;

#[test]
fn deserialize() {
let input = r#"{"Data":{"value":5},"Createdts":1689723721606016637}"#;
let payload: Payload = serde_json::from_str(input).unwrap();
assert_eq!(payload.data.value, 5);
assert_eq!(payload.created_ts, 1689723721606016637);
}
struct TickGen;

#[test]
fn to_rfc3339nanos() {
let input = r#"{"Data":{"value":5},"Createdts":1689723721606016637}"#;
let payload: Payload = serde_json::from_str(input).unwrap();
assert_eq!(
Utc.timestamp_nanos(payload.created_ts)
.to_rfc3339_opts(SecondsFormat::Nanos, true),
"2023-07-18T23:42:01.606016637Z"
);
}
}
#[derive(serde::Deserialize)]
struct Data {
value: u64,
}

impl TickGen {
pub(crate) fn new() -> Self {
Self {}
}
}
#[derive(serde::Deserialize)]
struct Payload {
#[serde(rename = "Data")]
data: Data,
#[serde(rename = "Createdts")]
created_ts: i64,
}

#[derive(Serialize)]
struct ResultPayload {
value: u64,
time: String,
}
#[derive(Serialize)]
struct ResultPayload {
value: u64,
time: String,
}

#[tonic::async_trait]
impl map::Mapper for TickGen {
async fn map<T: map::Datum + Send + Sync + 'static>(
&self,
input: T,
) -> Vec<map::Message> {
let value = input.value();
if let Ok(payload) = serde_json::from_slice::<Payload>(value) {
let ts = Utc
.timestamp_nanos(payload.created_ts)
.to_rfc3339_opts(SecondsFormat::Nanos, true);
vec![map::Message {
keys: input.keys().clone(),
value: serde_json::to_vec(&ResultPayload {
value: payload.data.value,
time: ts,
})
.unwrap_or(vec![]),
tags: vec![],
}]
} else {
vec![]
}
}
#[tonic::async_trait]
impl map::Mapper for TickGen {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
let Ok(payload) = serde_json::from_slice::<Payload>(&input.value) else {
return vec![];
};
let ts = Utc
.timestamp_nanos(payload.created_ts)
.to_rfc3339_opts(SecondsFormat::Nanos, true);
vec![map::Message {
keys: input.keys,
value: serde_json::to_vec(&ResultPayload {
value: payload.data.value,
time: ts,
})
.unwrap_or_default(),
tags: vec![],
}]
}
}
2 changes: 1 addition & 1 deletion examples/sideinput-udf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ name = "server"
path = "src/main.rs"

[dependencies]
tonic = "0.9"
tonic = "0.10.2"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
chrono = "0.4.30"
Expand Down
6 changes: 4 additions & 2 deletions examples/sideinput-udf/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.70 as build
FROM rust:1.75-bookworm as build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand All @@ -16,10 +16,12 @@ COPY ./Cargo.lock ./Cargo.lock
RUN cargo build --release

# our final base
FROM rust
FROM debian:bookworm

# copy the build artifact from the build stage
COPY --from=build /examples/target/release/server .

RUN mkdir -p /var/numaflow/side-inputs

# set the startup command to run your binary
CMD ["./server"]
34 changes: 15 additions & 19 deletions examples/sideinput-udf/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use notify::{Watcher, RecursiveMode, Result};
use numaflow::map::{Mapper,Message,start_uds_server,Datum};
use notify::{RecursiveMode, Result, Watcher};
use numaflow::map::{MapRequest, Mapper, Message, Server};
use std::path::Path;
use tonic::{async_trait};
use tokio::spawn;
use tonic::async_trait;

const DIR_PATH: &str = "/var/numaflow/side-inputs";
struct UdfMapper {}

struct UdfMapper;

#[async_trait]
impl Mapper for UdfMapper {
async fn map<T: Datum + Send + Sync + 'static>(&self, _request:T) -> Vec<Message> {
async fn map(&self, _input: MapRequest) -> Vec<Message> {
let message = Message {
keys: vec![],
value: b"some_value".to_vec(),
Expand All @@ -17,30 +19,24 @@ impl Mapper for UdfMapper {
vec![message]
}
}

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
async fn main() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Spawn the file watcher task
spawn(async {
match file_watcher().await {
Ok(_) => println!("File watcher is running"),
Err(e) => println!("File watcher error: {:?}", e),
}
});

let udf_map=UdfMapper{};
start_uds_server(udf_map).await?;

Ok(())
Server::new(UdfMapper).start().await
}


async fn file_watcher() -> Result<()>{
let mut watcher = notify::recommended_watcher(|res| {
match res {
Ok(event) => println!("event: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
}
async fn file_watcher() -> Result<()> {
let mut watcher = notify::recommended_watcher(|res| match res {
Ok(event) => println!("event: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
})?;
watcher.watch(Path::new(DIR_PATH), RecursiveMode::Recursive)?;
Ok(())
}
}

0 comments on commit e9ac3f3

Please sign in to comment.