Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl batch map #66

Merged
merged 17 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Description: Makefile for Rust projects

# perform a cargo fmt on all directories containing a Cargo.toml file
.PHONY: lint
# find all directories containing Cargo.toml files
DIRS := $(shell find . -type f -name Cargo.toml -exec dirname {} \; | sort -u)
$(info Included directories: $(DIRS))
lint:
@for dir in $(DIRS); do \
echo "Formatting code in $$dir"; \
cargo fmt --all --manifest-path "$$dir/Cargo.toml"; \
done

# run cargo test on the repository root
.PHONY: test
test:
cargo test --workspace
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ fn main() {
"proto/reduce.proto",
"proto/sink.proto",
"proto/sideinput.proto",
"proto/batchmap.proto",
],
&["proto"],
)
Expand Down
1 change: 1 addition & 0 deletions examples/batchmap-cat/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
14 changes: 14 additions & 0 deletions examples/batchmap-cat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "batchmap-cat"
version = "0.1.0"
edition = "2021"


[[bin]]
name = "server"
path = "src/main.rs"

[dependencies]
tonic = "0.12.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { path = "../../" }
20 changes: 20 additions & 0 deletions examples/batchmap-cat/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Build stage: compiles the batchmap-cat example from a full copy of the
# repository (the example depends on the numaflow crate via a relative path).
FROM rust:1.75-bookworm AS build

# protobuf-compiler provides protoc, presumably required by the crate's build.rs
# to compile the .proto files. The index refresh and the install run in a single
# layer so a cached, stale `apt-get update` layer is never reused together with
# a newer install step.
RUN apt-get update && apt-get install -y protobuf-compiler

WORKDIR /numaflow-rs
COPY ./ ./
WORKDIR /numaflow-rs/examples/batchmap-cat

# build for release
RUN cargo build --release

# our final base
FROM debian:bookworm AS map-cat

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/examples/batchmap-cat/target/release/server .

# set the startup command to run your binary
CMD ["./server"]
20 changes: 20 additions & 0 deletions examples/batchmap-cat/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/batchmap-cat:${TAG}
DOCKER_FILE_PATH = examples/batchmap-cat/Dockerfile

.PHONY: update
update:
cargo check
cargo update

.PHONY: image
image: update
cd ../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

.PHONY: clean
clean:
-rm -rf target
31 changes: 31 additions & 0 deletions examples/batchmap-cat/manifests/simple-batchmap-cat.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: rust-batchmap-cat
spec:
vertices:
- name: in
source:
# A self data generating source
generator:
rpu: 300
duration: 1s
keyCount: 5
value: 5
- name: cat
scale:
min: 1
udf:
container:
image: quay.io/numaio/numaflow-rs/batchmap-cat:stable
- name: out
sink:
# A simple log printing sink
log: { }
edges:
- from: in
to: cat
- from: cat
to: out


26 changes: 26 additions & 0 deletions examples/batchmap-cat/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use numaflow::batchmap;
use numaflow::batchmap::{BatchResponse, Datum, Message};

/// Entry point of the batchmap-cat example.
///
/// Wraps the `Cat` handler in a `batchmap::Server`, starts it, and returns
/// whatever result `start()` resolves to.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
batchmap::Server::new(Cat).start().await
}

/// Pass-through batch mapper: every datum it receives is echoed back unchanged.
struct Cat;

#[tonic::async_trait]
impl batchmap::BatchMapper for Cat {
    /// Drains `input` until the channel yields `None` and returns one
    /// `BatchResponse` per received datum, in arrival order.
    ///
    /// Each response is created from the datum's id and carries a single
    /// message with the datum's keys and value and no tags.
    async fn batchmap(&self, mut input: tokio::sync::mpsc::Receiver<Datum>) -> Vec<BatchResponse> {
        let mut collected: Vec<BatchResponse> = Vec::new();
        loop {
            // `recv` returning `None` ends the batch.
            let Some(datum) = input.recv().await else {
                break;
            };

            let mut reply = BatchResponse::from_id(datum.id);
            let echoed = Message {
                keys: Some(datum.keys),
                value: datum.value,
                tags: None,
            };
            reply.append(echoed);
            collected.push(reply);
        }
        collected
    }
}
8 changes: 5 additions & 3 deletions examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

mod counter {
use numaflow::reduce::{Message, ReduceRequest};
use numaflow::reduce::{Reducer, Metadata};
use numaflow::reduce::{Metadata, Reducer};
use tokio::sync::mpsc::Receiver;
use tonic::async_trait;

Expand Down Expand Up @@ -44,8 +44,10 @@ mod counter {
while input.recv().await.is_some() {
counter += 1;
}
let message = Message::new(counter.to_string().into_bytes()).tags(vec![]).keys(keys.clone());
let message = Message::new(counter.to_string().into_bytes())
.tags(vec![])
.keys(keys.clone());
vec![message]
}
}
}
}
7 changes: 4 additions & 3 deletions examples/sideinput/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use numaflow::sideinput::{self, SideInputer};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use numaflow::sideinput::{self, SideInputer};


use tonic::async_trait;

Expand Down Expand Up @@ -37,5 +36,7 @@ impl SideInputer for SideInputHandler {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
sideinput::Server::new(SideInputHandler::new()).start().await
sideinput::Server::new(SideInputHandler::new())
.start()
.await
}
2 changes: 1 addition & 1 deletion examples/sideinput/udf/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::Path;

use notify::{RecursiveMode, Result, Watcher};
use numaflow::map::{Mapper, MapRequest, Message, Server};
use numaflow::map::{MapRequest, Mapper, Message, Server};
use tokio::spawn;
use tonic::async_trait;

Expand Down
50 changes: 50 additions & 0 deletions proto/batchmap.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package batchmap.v1;

// BatchMap is the gRPC service offering a readiness check and the batch map rpc.
service BatchMap {
// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);

// BatchMapFn is a bi-directional streaming rpc which applies a
// map function to each BatchMapRequest element of the stream and then streams
// back BatchMapResponse elements.
rpc BatchMapFn(stream BatchMapRequest) returns (stream BatchMapResponse);
}

/**
 * BatchMapRequest represents a request element.
 */
message BatchMapRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
// This ID is used to uniquely identify a map request.
string id = 6;
}

/**
 * BatchMapResponse represents a response element.
 */
message BatchMapResponse {
// Result is a single output element: its keys, payload bytes and tags.
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
// Zero or more results carried by this response.
repeated Result results = 1;
// This ID links the response to the request it corresponds to.
string id = 2;
}

/**
 * ReadyResponse is the health check result returned by the IsReady rpc.
 */
message ReadyResponse {
bool ready = 1;
}
Loading
Loading