Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Introducing Factory to Create Reducer #40

Merged
merged 3 commits into from
May 7, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 15 additions & 20 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReceiverStream;
use tonic::metadata::MetadataMap;
use tonic::{async_trait, Request, Response, Status};
use tonic::metadata::MetadataMap;

use crate::shared;

const KEY_JOIN_DELIMITER: &str = ":";
Expand All @@ -25,10 +26,7 @@ pub mod proto {
tonic::include_proto!("reduce.v1");
}

struct ReduceService<C>
where
C: ReducerCreator + Send + Sync + 'static,
{
struct ReduceService<C> {
creator: C,
}

Expand Down Expand Up @@ -173,7 +171,7 @@ impl Metadata {

/// Metadata are additional information passed into the [`Reducer::reduce`].
pub struct Metadata {
pub interval_window: IntervalWindow
pub interval_window: IntervalWindow,
}

/// Message is the response from the user's [`Reducer::reduce`].
Expand Down Expand Up @@ -239,8 +237,8 @@ fn get_window_details(request: &MetadataMap) -> (DateTime<Utc>, DateTime<Utc>) {

#[async_trait]
impl<C> proto::reduce_server::Reduce for ReduceService<C>
where
C: ReducerCreator + Send + Sync + 'static,
where
C: ReducerCreator + Send + Sync + 'static,
{
type ReduceFnStream = ReceiverStream<Result<proto::ReduceResponse, Status>>;
async fn reduce_fn(
Expand Down Expand Up @@ -307,8 +305,8 @@ where
tx.send(Ok(proto::ReduceResponse {
results: datum_responses,
}))
.await
.unwrap();
.await
.unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to tackle need unwraps later on.

}
});

Expand All @@ -323,20 +321,14 @@ where

/// gRPC server to start a reduce service
#[derive(Debug)]
pub struct Server<C>
where
C: ReducerCreator + Send + Sync + 'static,
{
pub struct Server<C> {
sock_addr: PathBuf,
max_message_size: usize,
server_info_file: PathBuf,
creator: Option<C>,
}

impl<C> Server<C>
where
C: ReducerCreator + Send + Sync + 'static,
{
impl<C> Server<C> {
/// Create a new Server with the given reduce service
pub fn new(creator: C) -> Self {
Server {
Expand Down Expand Up @@ -387,7 +379,8 @@ impl<C> Server<C>
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
F: Future<Output = ()>,
F: Future<Output=()>,
C: ReducerCreator + Send + Sync + 'static,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let creator = self.creator.take().unwrap();
Expand All @@ -404,7 +397,9 @@ impl<C> Server<C>
}

/// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the signal arrives.
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
C: ReducerCreator + Send + Sync + 'static,
{
self.start_with_shutdown(shared::shutdown_signal()).await
}
Expand Down
Loading