Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Map Streamer #109

Merged
merged 11 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
edition = "2021"
edition = "2021"
2 changes: 1 addition & 1 deletion examples/batchmap-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl batchmap::BatchMapper for Cat {
let mut responses: Vec<BatchResponse> = Vec::new();
while let Some(datum) = input.recv().await {
let mut response = BatchResponse::from_id(datum.id);
response.append(Message::new(datum.value).keys(datum.keys.clone()));
response.append(Message::new(datum.value).with_keys(datum.keys.clone()));
responses.push(response);
}
responses
Expand Down
2 changes: 1 addition & 1 deletion examples/batchmap-flatmap/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl batchmap::BatchMapper for Flatmap {

// return the resulting parts
for part in parts {
response.append(Message::new(Vec::from(part)).keys(datum.keys.clone()));
response.append(Message::new(Vec::from(part)).with_keys(datum.keys.clone()));
}
responses.push(response);
}
Expand Down
9 changes: 9 additions & 0 deletions examples/flatmap-stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "flatmap-stream"
version = "0.1.0"
edition = "2021"

[dependencies]
tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { path = "../../numaflow" }
20 changes: 20 additions & 0 deletions examples/flatmap-stream/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM rust:1.82-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y

WORKDIR /numaflow-rs
COPY ./ ./
WORKDIR /numaflow-rs/examples/flatmap-stream

# build for release
RUN cargo build --release

# our final base
FROM debian:bullseye AS flatmap-stream

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/target/release/flatmap-stream .

# set the startup command to run your binary
CMD ["./flatmap-stream"]
20 changes: 20 additions & 0 deletions examples/flatmap-stream/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/flatmap-stream:${TAG}
DOCKER_FILE_PATH = examples/flatmap-stream/Dockerfile

.PHONY: update
update:
cargo check
cargo update

.PHONY: image
image: update
cd ../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

.PHONY: clean
clean:
-rm -rf target
29 changes: 29 additions & 0 deletions examples/flatmap-stream/manifests/simple-flatmap-stream.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-flatmap-stream
spec:
vertices:
- name: in
source:
# A self data generating source
generator:
rpu: 300
duration: 1s
keyCount: 5
value: 5
- name: cat
scale:
min: 1
udf:
container:
image: quay.io/numaio/numaflow-rs/flatmap-stream:stable
- name: out
sink:
# A simple log printing sink
log: { }
edges:
- from: in
to: cat
- from: cat
to: out
27 changes: 27 additions & 0 deletions examples/flatmap-stream/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use numaflow::mapstream;
use numaflow::mapstream::Message;
use tokio::sync::mpsc::Sender;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
mapstream::Server::new(Cat).start().await
}

struct Cat;

#[tonic::async_trait]
impl mapstream::MapStreamer for Cat {
async fn map_stream(&self, input: mapstream::MapStreamRequest, tx: Sender<Message>) {
let payload_str = String::from_utf8(input.value).unwrap_or_default();
let splits: Vec<&str> = payload_str.split(',').collect();

for split in splits {
let message = Message::new(split.as_bytes().to_vec())
.with_keys(input.keys.clone())
.with_tags(vec![]);
if tx.send(message).await.is_err() {
break;
}
}
}
}
2 changes: 1 addition & 1 deletion examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ struct Cat;
#[tonic::async_trait]
impl map::Mapper for Cat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
vec![map::Message::new(input.value).keys(input.keys.clone())]
vec![map::Message::new(input.value).with_keys(input.keys.clone())]
}
}
2 changes: 1 addition & 1 deletion examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl map::Mapper for TickGen {
})
.unwrap_or_default(),
)
.keys(input.keys.clone());
.with_keys(input.keys.clone());
vec![message]
}
}
10 changes: 6 additions & 4 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::error::Error;

use filter_impl::filter_event_time;
use numaflow::sourcetransform;
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand Down Expand Up @@ -31,19 +32,20 @@ mod filter_impl {
vec![Message::message_to_drop(input.eventtime)]
} else if input.eventtime < jan_first_2023 {
vec![Message::new(input.value, jan_first_2022)
.tags(vec![String::from("within_year_2022")])]
.with_tags(vec![String::from("within_year_2022")])]
} else {
vec![Message::new(input.value, jan_first_2023)
.tags(vec![String::from("after_year_2022")])]
.with_tags(vec![String::from("after_year_2022")])]
}
}
}

#[cfg(test)]
mod tests {
use crate::filter_impl::filter_event_time;
use chrono::{TimeZone, Utc};
use numaflow::sourcetransform::SourceTransformRequest;

use crate::filter_impl::filter_event_time;
/// Tests that events from 2022 are tagged as within the year 2022.
#[test]
fn test_filter_event_time_should_return_within_year_2022() {
Expand Down
2 changes: 1 addition & 1 deletion examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod counter {
while input.recv().await.is_some() {
counter += 1;
}
vec![Message::new(counter.to_string().into_bytes()).keys(keys.clone())]
vec![Message::new(counter.to_string().into_bytes()).with_keys(keys.clone())]
}
}
}
2 changes: 1 addition & 1 deletion examples/sideinput/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use numaflow::sideinput::{self, SideInputer};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

use numaflow::sideinput::{self, SideInputer};
use tonic::async_trait;

struct SideInputHandler {
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

pub(crate) mod simple_source {
use chrono::Utc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{collections::HashSet, sync::RwLock};
use tokio::sync::mpsc::Sender;

use chrono::Utc;
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use tokio::sync::mpsc::Sender;

/// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
Expand Down
2 changes: 1 addition & 1 deletion examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl sourcetransform::SourceTransformer for NowCat {
) -> Vec<sourcetransform::Message> {
vec![
sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.keys(input.keys.clone()),
.with_keys(input.keys.clone()),
]
}
}
38 changes: 11 additions & 27 deletions numaflow/src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@ use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use crate::error::Error;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::map as proto;
use crate::servers::map::map_server::Map;
use crate::servers::map::{MapRequest, MapResponse, ReadyResponse};
use crate::shared::{self, shutdown_signal, ContainerType};
use chrono::{DateTime, Utc};
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -18,6 +12,13 @@ use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, info};

use crate::error::Error;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::map as proto;
use crate::servers::map::map_server::Map;
use crate::servers::map::{MapRequest, MapResponse, ReadyResponse};
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
Expand Down Expand Up @@ -172,9 +173,9 @@ impl Message {
///
/// ```
/// use numaflow::batchmap::Message;
/// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]);
/// let message = Message::new(vec![1, 2, 3]).with_keys(vec!["key1".to_string(), "key2".to_string()]);
/// ```
pub fn keys(mut self, keys: Vec<String>) -> Self {
pub fn with_keys(mut self, keys: Vec<String>) -> Self {
self.keys = Some(keys);
self
}
Expand All @@ -189,29 +190,12 @@ impl Message {
///
/// ```
/// use numaflow::batchmap::Message;
/// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// let message = Message::new(vec![1, 2, 3]).with_tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// ```
pub fn tags(mut self, tags: Vec<String>) -> Self {
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags = Some(tags);
self
}

/// Replaces the value of the message.
///
/// # Arguments
///
/// * `value` - A new vector of bytes that replaces the current message value.
///
/// # Examples
///
/// ```
/// use numaflow::batchmap::Message;
/// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]);
/// ```
pub fn value(mut self, value: Vec<u8>) -> Self {
self.value = value;
self
}
}
/// The result of the call to [`BatchMapper::batchmap`] method.
pub struct BatchResponse {
Expand Down
5 changes: 4 additions & 1 deletion numaflow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ pub mod sink;
/// building [side input](https://numaflow.numaproj.io/user-guide/reference/side-inputs/)
pub mod sideinput;

/// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers.
/// batchmap is for writing the map in [batch mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#batch-map-mode) handlers.
pub mod batchmap;

/// mapstream is for writing the map in [stream mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#streaming-mode) handlers.
pub mod mapstream;

mod servers;

// Error handling on Numaflow SDKs!
Expand Down
33 changes: 10 additions & 23 deletions numaflow/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub trait Mapper {
/// impl map::Mapper for Cat {
/// async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
/// use numaflow::map::Message;
/// let message=Message::new(input.value).keys(input.keys).tags(vec![]);
/// let message=Message::new(input.value).with_keys(input.keys).with_tags(vec![]);
/// vec![message]
/// }
/// }
Expand Down Expand Up @@ -122,9 +122,9 @@ impl Message {
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]);
/// let message = Message::new(vec![1, 2, 3]).with_keys(vec!["key1".to_string(), "key2".to_string()]);
/// ```
pub fn keys(mut self, keys: Vec<String>) -> Self {
pub fn with_keys(mut self, keys: Vec<String>) -> Self {
self.keys = Some(keys);
self
}
Expand All @@ -139,29 +139,12 @@ impl Message {
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// let message = Message::new(vec![1, 2, 3]).with_tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// ```
pub fn tags(mut self, tags: Vec<String>) -> Self {
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags = Some(tags);
self
}

/// Replaces the value of the message.
///
/// # Arguments
///
/// * `value` - A new vector of bytes that replaces the current message value.
///
/// # Examples
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]);
/// ```
pub fn value(mut self, value: Vec<u8>) -> Self {
self.value = value;
self
}
}

impl From<Message> for proto::map_response::Result {
Expand Down Expand Up @@ -370,7 +353,7 @@ async fn run_map<T>(
Err(e) => {
error!("Failed to run map function: {e:?}");
error_tx
.send(Error::MapError(ErrorKind::InternalError(format!(
.send(Error::MapError(ErrorKind::UserDefinedError(format!(
"panicked: {e:?}"
))))
.await
Expand Down Expand Up @@ -538,6 +521,7 @@ impl<C> Drop for Server<C> {
// same address. UnixListener doesn't implement Drop trait, so we have to manually remove the socket file.
fn drop(&mut self) {
let _ = fs::remove_file(&self.sock_addr);
let _ = fs::remove_file(&self.server_info_file);
}
}

Expand Down Expand Up @@ -830,6 +814,9 @@ mod tests {
};
tx.send(request).await.unwrap();

let resp = stream.message().await;
assert!(resp.is_err());

// server should shut down gracefully because there was a panic in the handler.
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(10)).await;
Expand Down
Loading
Loading