Skip to content

Commit

Permalink
Merge branch 'main' into release-pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
BulkBeing authored Oct 10, 2024
2 parents 3db7349 + 0870af9 commit 0a3fc26
Show file tree
Hide file tree
Showing 8 changed files with 466 additions and 201 deletions.
28 changes: 22 additions & 6 deletions proto/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package map.v1;

service Map {
// MapFn applies a function to each map request element.
rpc MapFn(MapRequest) returns (MapResponse);
rpc MapFn(stream MapRequest) returns (stream MapResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
Expand All @@ -17,12 +17,25 @@ service Map {
* MapRequest represents a request element.
*/
message MapRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}
Request request = 1;
// This ID is used to uniquely identify a map request
string id = 2;
optional Handshake handshake = 3;
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/**
Expand All @@ -35,6 +48,9 @@ message MapResponse {
repeated string tags = 3;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
}

/**
Expand Down
28 changes: 13 additions & 15 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use tokio::sync::mpsc::channel;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -118,7 +119,7 @@ pub struct Message {
}

/// Represents a message that can be modified and forwarded.
impl crate::batchmap::Message {
impl Message {
/// Creates a new message with the specified value.
///
/// This constructor initializes the message with no keys, tags, or specific event time.
Expand Down Expand Up @@ -148,11 +149,11 @@ impl crate::batchmap::Message {
/// use numaflow::batchmap::Message;
/// let dropped_message = Message::message_to_drop();
/// ```
pub fn message_to_drop() -> crate::batchmap::Message {
crate::batchmap::Message {
pub fn message_to_drop() -> Message {
Message {
keys: None,
value: vec![],
tags: Some(vec![crate::batchmap::DROP.to_string()]),
tags: Some(vec![DROP.to_string()]),
}
}

Expand Down Expand Up @@ -245,11 +246,8 @@ impl<T> BatchMap for BatchMapService<T>
where
T: BatchMapper + Send + Sync + 'static,
{
async fn is_ready(
&self,
_: Request<()>,
) -> Result<tonic::Response<proto::ReadyResponse>, Status> {
Ok(tonic::Response::new(proto::ReadyResponse { ready: true }))
async fn is_ready(&self, _: Request<()>) -> Result<Response<proto::ReadyResponse>, Status> {
Ok(Response::new(proto::ReadyResponse { ready: true }))
}

type BatchMapFnStream = ReceiverStream<Result<proto::BatchMapResponse, Status>>;
Expand All @@ -261,7 +259,7 @@ where
let mut stream = request.into_inner();

// Create a channel to send the messages to the user defined function.
let (tx, rx) = mpsc::channel::<Datum>(1);
let (tx, rx) = channel::<Datum>(1);

// Create a channel to send the response back to the grpc client.
let (grpc_response_tx, grpc_response_rx) =
Expand Down Expand Up @@ -418,9 +416,9 @@ pub struct Server<T> {
server_info_file: PathBuf,
svc: Option<T>,
}
impl<T> crate::batchmap::Server<T> {
impl<T> Server<T> {
pub fn new(batch_map_svc: T) -> Self {
crate::batchmap::Server {
Server {
sock_addr: DEFAULT_SOCK_ADDR.into(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
server_info_file: DEFAULT_SERVER_INFO_FILE.into(),
Expand Down Expand Up @@ -478,8 +476,8 @@ impl<T> crate::batchmap::Server<T> {
let cln_token = CancellationToken::new();

// Create a channel to send shutdown signal to the server to do graceful shutdown in case of non retryable errors.
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
let map_svc = crate::batchmap::BatchMapService {
let (internal_shutdown_tx, internal_shutdown_rx) = channel(1);
let map_svc = BatchMapService {
handler: Arc::new(handler),
_shutdown_tx: internal_shutdown_tx,
cancellation_token: cln_token.clone(),
Expand Down
Loading

0 comments on commit 0a3fc26

Please sign in to comment.