Skip to content

Commit

Permalink
feat: Map Streamer (#109)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Dec 20, 2024
1 parent b237ec6 commit c9806a0
Show file tree
Hide file tree
Showing 24 changed files with 1,042 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
edition = "2021"
edition = "2021"
2 changes: 1 addition & 1 deletion examples/batchmap-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl batchmap::BatchMapper for Cat {
let mut responses: Vec<BatchResponse> = Vec::new();
while let Some(datum) = input.recv().await {
let mut response = BatchResponse::from_id(datum.id);
response.append(Message::new(datum.value).keys(datum.keys.clone()));
response.append(Message::new(datum.value).with_keys(datum.keys.clone()));
responses.push(response);
}
responses
Expand Down
2 changes: 1 addition & 1 deletion examples/batchmap-flatmap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl batchmap::BatchMapper for Flatmap {

// return the resulting parts
for part in parts {
response.append(Message::new(Vec::from(part)).keys(datum.keys.clone()));
response.append(Message::new(Vec::from(part)).with_keys(datum.keys.clone()));
}
responses.push(response);
}
Expand Down
9 changes: 9 additions & 0 deletions examples/flatmap-stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "flatmap-stream"
version = "0.1.0"
edition = "2021"

[dependencies]
tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { path = "../../numaflow" }
20 changes: 20 additions & 0 deletions examples/flatmap-stream/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# ---- build stage: compiles the example inside the full repository checkout ----
FROM rust:1.82-bullseye AS build

# Refresh the package index and install protobuf-compiler in a single layer.
# Keeping `apt-get update` in its own RUN lets Docker reuse a stale cached index
# for a later install step, which can fail or pull outdated packages.
RUN apt-get update && apt-get install protobuf-compiler -y

# The whole build context is copied in because the example depends on the
# in-repo `numaflow` crate via a relative path.
WORKDIR /numaflow-rs
COPY ./ ./
WORKDIR /numaflow-rs/examples/flatmap-stream

# build for release
RUN cargo build --release

# ---- runtime stage: slim image carrying only the compiled binary ----
# our final base
FROM debian:bullseye AS flatmap-stream

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/target/release/flatmap-stream .

# set the startup command to run your binary
CMD ["./flatmap-stream"]
20 changes: 20 additions & 0 deletions examples/flatmap-stream/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Image tag; defaults to `stable` and can be overridden, e.g. `make image TAG=v0.1.0`.
TAG ?= stable
# Set PUSH=true to push the image after it has been built.
PUSH ?= false
# Full image reference (repository plus tag) used for both build and push.
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/flatmap-stream:${TAG}
# Dockerfile location, relative to the directory `docker build` runs from (two levels up).
DOCKER_FILE_PATH = examples/flatmap-stream/Dockerfile

# Type-check the example, then update the dependency versions recorded in the lockfile.
.PHONY: update
update:
cargo check
cargo update

# Build the image with ../../ as the build context; push only when PUSH is exactly "true".
.PHONY: image
image: update
cd ../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

# Remove the local build output; the leading `-` tells make to ignore a failing rm.
.PHONY: clean
clean:
-rm -rf target
29 changes: 29 additions & 0 deletions examples/flatmap-stream/manifests/simple-flatmap-stream.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-flatmap-stream
spec:
vertices:
- name: in
source:
# A self data generating source
generator:
rpu: 300
duration: 1s
keyCount: 5
value: 5
- name: cat
scale:
min: 1
udf:
container:
image: quay.io/numaio/numaflow-rs/flatmap-stream:stable
- name: out
sink:
# A simple log printing sink
log: { }
edges:
- from: in
to: cat
- from: cat
to: out
27 changes: 27 additions & 0 deletions examples/flatmap-stream/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use numaflow::mapstream;
use numaflow::mapstream::Message;
use tokio::sync::mpsc::Sender;

/// Entry point: builds a `mapstream::Server` around the [`Cat`] handler and awaits `start()`.
///
/// Whatever `start()` resolves to is returned as the result of the program.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
mapstream::Server::new(Cat).start().await
}

/// Field-less handler type for this example; its `MapStreamer` implementation
/// splits each payload on commas and streams the pieces out one by one.
struct Cat;

#[tonic::async_trait]
impl mapstream::MapStreamer for Cat {
    /// Splits the request payload on `,` and sends every piece on `tx` as its own
    /// [`Message`], each carrying a copy of the request keys and an empty tag list.
    ///
    /// A payload that is not valid UTF-8 is handled as an empty string, so it
    /// yields a single empty message. Streaming stops at the first failed send.
    async fn map_stream(&self, input: mapstream::MapStreamRequest, tx: Sender<Message>) {
        let text = String::from_utf8(input.value).unwrap_or_default();

        for piece in text.split(',') {
            let outgoing = Message::new(piece.as_bytes().to_vec())
                .with_keys(input.keys.clone())
                .with_tags(vec![]);

            // A failed send means nobody is receiving any more; producing
            // further pieces would be wasted work.
            if tx.send(outgoing).await.is_err() {
                return;
            }
        }
    }
}
2 changes: 1 addition & 1 deletion examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ struct Cat;
#[tonic::async_trait]
impl map::Mapper for Cat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
vec![map::Message::new(input.value).keys(input.keys.clone())]
vec![map::Message::new(input.value).with_keys(input.keys.clone())]
}
}
2 changes: 1 addition & 1 deletion examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl map::Mapper for TickGen {
})
.unwrap_or_default(),
)
.keys(input.keys.clone());
.with_keys(input.keys.clone());
vec![message]
}
}
10 changes: 6 additions & 4 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::error::Error;

use filter_impl::filter_event_time;
use numaflow::sourcetransform;
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand Down Expand Up @@ -31,19 +32,20 @@ mod filter_impl {
vec![Message::message_to_drop(input.eventtime)]
} else if input.eventtime < jan_first_2023 {
vec![Message::new(input.value, jan_first_2022)
.tags(vec![String::from("within_year_2022")])]
.with_tags(vec![String::from("within_year_2022")])]
} else {
vec![Message::new(input.value, jan_first_2023)
.tags(vec![String::from("after_year_2022")])]
.with_tags(vec![String::from("after_year_2022")])]
}
}
}

#[cfg(test)]
mod tests {
use crate::filter_impl::filter_event_time;
use chrono::{TimeZone, Utc};
use numaflow::sourcetransform::SourceTransformRequest;

use crate::filter_impl::filter_event_time;
/// Tests that events from 2022 are tagged as within the year 2022.
#[test]
fn test_filter_event_time_should_return_within_year_2022() {
Expand Down
2 changes: 1 addition & 1 deletion examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod counter {
while input.recv().await.is_some() {
counter += 1;
}
vec![Message::new(counter.to_string().into_bytes()).keys(keys.clone())]
vec![Message::new(counter.to_string().into_bytes()).with_keys(keys.clone())]
}
}
}
2 changes: 1 addition & 1 deletion examples/sideinput/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use numaflow::sideinput::{self, SideInputer};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

use numaflow::sideinput::{self, SideInputer};
use tonic::async_trait;

struct SideInputHandler {
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

pub(crate) mod simple_source {
use chrono::Utc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{collections::HashSet, sync::RwLock};
use tokio::sync::mpsc::Sender;

use chrono::Utc;
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use tokio::sync::mpsc::Sender;

/// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
Expand Down
2 changes: 1 addition & 1 deletion examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl sourcetransform::SourceTransformer for NowCat {
) -> Vec<sourcetransform::Message> {
vec![
sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.keys(input.keys.clone()),
.with_keys(input.keys.clone()),
]
}
}
38 changes: 11 additions & 27 deletions numaflow/src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@ use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use crate::error::Error;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::map as proto;
use crate::servers::map::map_server::Map;
use crate::servers::map::{MapRequest, MapResponse, ReadyResponse};
use crate::shared::{self, shutdown_signal, ContainerType};
use chrono::{DateTime, Utc};
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -18,6 +12,13 @@ use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, info};

use crate::error::Error;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::map as proto;
use crate::servers::map::map_server::Map;
use crate::servers::map::{MapRequest, MapResponse, ReadyResponse};
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
Expand Down Expand Up @@ -172,9 +173,9 @@ impl Message {
///
/// ```
/// use numaflow::batchmap::Message;
/// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]);
/// let message = Message::new(vec![1, 2, 3]).with_keys(vec!["key1".to_string(), "key2".to_string()]);
/// ```
pub fn keys(mut self, keys: Vec<String>) -> Self {
pub fn with_keys(mut self, keys: Vec<String>) -> Self {
self.keys = Some(keys);
self
}
Expand All @@ -189,29 +190,12 @@ impl Message {
///
/// ```
/// use numaflow::batchmap::Message;
/// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// let message = Message::new(vec![1, 2, 3]).with_tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// ```
pub fn tags(mut self, tags: Vec<String>) -> Self {
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags = Some(tags);
self
}

/// Replaces the value of the message.
///
/// # Arguments
///
/// * `value` - A new vector of bytes that replaces the current message value.
///
/// # Examples
///
/// ```
/// use numaflow::batchmap::Message;
/// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]);
/// ```
pub fn value(mut self, value: Vec<u8>) -> Self {
self.value = value;
self
}
}
/// The result of the call to [`BatchMapper::batchmap`] method.
pub struct BatchResponse {
Expand Down
5 changes: 4 additions & 1 deletion numaflow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ pub mod sink;
/// building [side input](https://numaflow.numaproj.io/user-guide/reference/side-inputs/)
pub mod sideinput;

/// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers.
/// batchmap is for writing the map in [batch mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#batch-map-mode) handlers.
pub mod batchmap;

/// mapstream is for writing the map in [stream mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#streaming-mode) handlers.
pub mod mapstream;

mod servers;

// Error handling on Numaflow SDKs!
Expand Down
33 changes: 10 additions & 23 deletions numaflow/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub trait Mapper {
/// impl map::Mapper for Cat {
/// async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
/// use numaflow::map::Message;
/// let message=Message::new(input.value).keys(input.keys).tags(vec![]);
/// let message=Message::new(input.value).with_keys(input.keys).with_tags(vec![]);
/// vec![message]
/// }
/// }
Expand Down Expand Up @@ -122,9 +122,9 @@ impl Message {
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]);
/// let message = Message::new(vec![1, 2, 3]).with_keys(vec!["key1".to_string(), "key2".to_string()]);
/// ```
pub fn keys(mut self, keys: Vec<String>) -> Self {
pub fn with_keys(mut self, keys: Vec<String>) -> Self {
self.keys = Some(keys);
self
}
Expand All @@ -139,29 +139,12 @@ impl Message {
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// let message = Message::new(vec![1, 2, 3]).with_tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// ```
pub fn tags(mut self, tags: Vec<String>) -> Self {
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags = Some(tags);
self
}

/// Replaces the value of the message.
///
/// # Arguments
///
/// * `value` - A new vector of bytes that replaces the current message value.
///
/// # Examples
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]);
/// ```
pub fn value(mut self, value: Vec<u8>) -> Self {
self.value = value;
self
}
}

impl From<Message> for proto::map_response::Result {
Expand Down Expand Up @@ -370,7 +353,7 @@ async fn run_map<T>(
Err(e) => {
error!("Failed to run map function: {e:?}");
error_tx
.send(Error::MapError(ErrorKind::InternalError(format!(
.send(Error::MapError(ErrorKind::UserDefinedError(format!(
"panicked: {e:?}"
))))
.await
Expand Down Expand Up @@ -538,6 +521,7 @@ impl<C> Drop for Server<C> {
// same address. UnixListener doesn't implement Drop trait, so we have to manually remove the socket file.
fn drop(&mut self) {
let _ = fs::remove_file(&self.sock_addr);
let _ = fs::remove_file(&self.server_info_file);
}
}

Expand Down Expand Up @@ -830,6 +814,9 @@ mod tests {
};
tx.send(request).await.unwrap();

let resp = stream.message().await;
assert!(resp.is_err());

// server should shut down gracefully because there was a panic in the handler.
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(10)).await;
Expand Down
Loading

0 comments on commit c9806a0

Please sign in to comment.