Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Map Streamer #109

Merged
merged 11 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
edition = "2021"
edition = "2021"
9 changes: 9 additions & 0 deletions examples/flatmap-stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Manifest for the flatmap-stream example binary.
[package]
name = "flatmap-stream"
version = "0.1.0"
edition = "2021"

[dependencies]
# Provides the `#[tonic::async_trait]` attribute used on the handler impl.
tonic = "0.12.0"
# Async runtime; `macros` + `rt-multi-thread` are the features behind `#[tokio::main]`.
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
# The SDK crate is consumed from the local checkout rather than from a registry.
numaflow = { path = "../../numaflow" }
20 changes: 20 additions & 0 deletions examples/flatmap-stream/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# ---- build stage: compiles the example against the local SDK checkout ----
FROM rust:1.82-bullseye AS build

# Refresh the package index and install protoc in ONE layer. Keeping
# `apt-get update` in its own RUN lets Docker reuse a stale cached index
# for a later install step, which can fail or pull outdated packages.
RUN apt-get update && apt-get install -y protobuf-compiler

# The whole repository is copied because the example depends on the SDK
# through a relative path (../../numaflow).
WORKDIR /numaflow-rs
COPY ./ ./
WORKDIR /numaflow-rs/examples/flatmap-stream

# build for release
RUN cargo build --release

# ---- runtime stage: only the compiled binary is carried over ----
FROM debian:bullseye AS flatmap-stream

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/target/release/flatmap-stream .

# set the startup command to run your binary
CMD ["./flatmap-stream"]
20 changes: 20 additions & 0 deletions examples/flatmap-stream/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Build helpers for the flatmap-stream example image.
# TAG and PUSH use `?=`, so they can be overridden from the environment or
# the command line (e.g. `make image TAG=v1 PUSH=true`).
TAG ?= stable
PUSH ?= false
# Full image reference (repository plus tag) passed to `docker build -t` and `docker push`.
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/flatmap-stream:${TAG}
# Dockerfile path relative to the repository root, which is where `docker build` runs.
DOCKER_FILE_PATH = examples/flatmap-stream/Dockerfile

# update: runs `cargo check`, then `cargo update`.
.PHONY: update
update:
cargo check
cargo update

# image: runs `update` first, then builds the image from two directories up
# (the Dockerfile copies the whole repository), and pushes it only when
# PUSH is exactly "true".
.PHONY: image
image: update
cd ../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

# clean: removes the local `target` directory; the leading `-` tells make
# to ignore a failure of the command.
.PHONY: clean
clean:
-rm -rf target
29 changes: 29 additions & 0 deletions examples/flatmap-stream/manifests/simple-flatmap-stream.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Example pipeline wiring three vertices in a line: in -> cat -> out.
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-flatmap-stream
spec:
vertices:
# Source vertex.
- name: in
source:
# A self data generating source
generator:
# NOTE(review): presumably 300 records per `duration` window - confirm against the generator source docs.
rpu: 300
duration: 1s
keyCount: 5
value: 5
# UDF vertex running the flatmap-stream example container.
- name: cat
scale:
min: 1
udf:
container:
# Same repository and `stable` tag that the example Makefile builds by default.
image: quay.io/numaio/numaflow-rs/flatmap-stream:stable
# Sink vertex.
- name: out
sink:
# A simple log printing sink
log: { }
# Edges connect the vertices by name.
edges:
- from: in
to: cat
- from: cat
to: out
27 changes: 27 additions & 0 deletions examples/flatmap-stream/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use numaflow::mapstream;
use numaflow::mapstream::Message;
use tokio::sync::mpsc::Sender;

/// Builds a map-stream server around the [`Cat`] handler and awaits its `start` call.
///
/// Whatever `start` returns becomes the result of `main`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
mapstream::Server::new(Cat).start().await
}

/// Field-less handler type that implements `mapstream::MapStreamer` for this example.
struct Cat;

#[tonic::async_trait]
impl mapstream::MapStreamer for Cat {
    /// Splits the request payload on `,` and sends one [`Message`] per piece through `tx`.
    ///
    /// Every emitted message carries a copy of the request's keys and an empty tag list.
    /// An empty payload still produces a single empty message. Streaming stops as soon
    /// as a send on `tx` fails.
    async fn map_stream(&self, input: mapstream::MapStreamRequest, tx: Sender<Message>) {
        // Split the raw bytes rather than decoding to `String` first: `,` is a single
        // ASCII byte that never occurs inside a multi-byte UTF-8 sequence, so the pieces
        // are identical for valid UTF-8, while a non-UTF-8 payload is forwarded intact
        // instead of being silently replaced by an empty string.
        for piece in input.value.split(|byte| *byte == b',') {
            let message = Message::new(piece.to_vec())
                .keys(input.keys.clone())
                .tags(vec![]);
            // A failed send means nothing is receiving any more, so stop producing.
            if tx.send(message).await.is_err() {
                break;
            }
        }
    }
}
6 changes: 4 additions & 2 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::error::Error;

use filter_impl::filter_event_time;
use numaflow::sourcetransform;
use numaflow::sourcetransform::{Message, SourceTransformRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand Down Expand Up @@ -41,9 +42,10 @@ mod filter_impl {

#[cfg(test)]
mod tests {
use crate::filter_impl::filter_event_time;
use chrono::{TimeZone, Utc};
use numaflow::sourcetransform::SourceTransformRequest;

use crate::filter_impl::filter_event_time;
/// Tests that events from 2022 are tagged as within the year 2022.
#[test]
fn test_filter_event_time_should_return_within_year_2022() {
Expand Down
2 changes: 1 addition & 1 deletion examples/sideinput/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use numaflow::sideinput::{self, SideInputer};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

use numaflow::sideinput::{self, SideInputer};
use tonic::async_trait;

struct SideInputHandler {
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

pub(crate) mod simple_source {
use chrono::Utc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{collections::HashSet, sync::RwLock};
use tokio::sync::mpsc::Sender;

use chrono::Utc;
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use tokio::sync::mpsc::Sender;

/// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
Expand Down
13 changes: 7 additions & 6 deletions numaflow/src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@ use std::fs;
use std::path::PathBuf;
use std::sync::Arc;

use crate::error::Error;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::map as proto;
use crate::servers::map::map_server::Map;
use crate::servers::map::{MapRequest, MapResponse, ReadyResponse};
use crate::shared::{self, shutdown_signal, ContainerType};
use chrono::{DateTime, Utc};
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -18,6 +12,13 @@ use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, info};

use crate::error::Error;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::map as proto;
use crate::servers::map::map_server::Map;
use crate::servers::map::{MapRequest, MapResponse, ReadyResponse};
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
Expand Down
5 changes: 4 additions & 1 deletion numaflow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ pub mod sink;
/// building [side input](https://numaflow.numaproj.io/user-guide/reference/side-inputs/)
pub mod sideinput;

/// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers.
/// batchmap is for writing the map in [batch mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#batch-map-mode) handlers.
pub mod batchmap;

/// mapstream is for writing the map in [stream mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#streaming-mode) handlers.
pub mod mapstream;

mod servers;

// Error handling on Numaflow SDKs!
Expand Down
4 changes: 4 additions & 0 deletions numaflow/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ impl<C> Drop for Server<C> {
// same address. UnixListener doesn't implement Drop trait, so we have to manually remove the socket file.
fn drop(&mut self) {
let _ = fs::remove_file(&self.sock_addr);
let _ = fs::remove_file(&self.server_info_file);
}
}

Expand Down Expand Up @@ -830,6 +831,9 @@ mod tests {
};
tx.send(request).await.unwrap();

let resp = stream.message().await;
assert!(resp.is_err());

// server should shut down gracefully because there was a panic in the handler.
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(10)).await;
Expand Down
Loading
Loading