Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rename pb module name #33

Merged
merged 3 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ use std::path::PathBuf;
use tokio::sync::oneshot;
use tonic::{async_trait, Request, Response, Status};

use crate::map::mapper::{
map_response, map_server, MapRequest as RPCMapRequest, MapResponse, ReadyResponse,
};
use crate::shared;

mod mapper {
mod proto {
tonic::include_proto!("map.v1");
}

Expand Down Expand Up @@ -53,24 +50,24 @@ pub trait Mapper {
}

#[async_trait]
impl<T> map_server::Map for MapService<T>
impl<T> proto::map_server::Map for MapService<T>
where
T: Mapper + Send + Sync + 'static,
{
async fn map_fn(
&self,
request: Request<RPCMapRequest>,
) -> Result<Response<MapResponse>, Status> {
request: Request<proto::MapRequest>,
) -> Result<Response<proto::MapResponse>, Status> {
let request = request.into_inner();
let result = self.handler.map(request.into()).await;

Ok(Response::new(MapResponse {
Ok(Response::new(proto::MapResponse {
results: result.into_iter().map(|msg| msg.into()).collect(),
}))
}

async fn is_ready(&self, _: Request<()>) -> Result<Response<ReadyResponse>, Status> {
Ok(Response::new(ReadyResponse { ready: true }))
async fn is_ready(&self, _: Request<()>) -> Result<Response<proto::ReadyResponse>, Status> {
Ok(Response::new(proto::ReadyResponse { ready: true }))
}
}

Expand All @@ -85,9 +82,9 @@ pub struct Message {
pub tags: Vec<String>,
}

impl From<Message> for map_response::Result {
impl From<Message> for proto::map_response::Result {
fn from(value: Message) -> Self {
map_response::Result {
proto::map_response::Result {
keys: value.keys,
value: value.value,
tags: value.tags,
Expand All @@ -107,8 +104,8 @@ pub struct MapRequest {
pub eventtime: DateTime<Utc>,
}

impl From<RPCMapRequest> for MapRequest {
fn from(value: RPCMapRequest) -> Self {
impl From<proto::MapRequest> for MapRequest {
fn from(value: proto::MapRequest) -> Self {
Self {
keys: value.keys,
value: value.value,
Expand Down Expand Up @@ -187,7 +184,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.map_svc.take().unwrap();
let map_svc = MapService { handler };
let map_svc = map_server::MapServer::new(map_svc)
let map_svc = proto::map_server::MapServer::new(map_svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

Expand Down Expand Up @@ -220,7 +217,7 @@ mod tests {
use tower::service_fn;

use crate::map;
use crate::map::mapper::map_client::MapClient;
use crate::map::proto::map_client::MapClient;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tonic::transport::Uri;
Expand Down Expand Up @@ -267,7 +264,7 @@ mod tests {
.await?;

let mut client = MapClient::new(channel);
let request = tonic::Request::new(map::mapper::MapRequest {
let request = tonic::Request::new(map::proto::MapRequest {
keys: vec!["first".into(), "second".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
Expand Down
18 changes: 8 additions & 10 deletions src/sideinput.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use tonic::{async_trait, Request, Response, Status};

use crate::sideinput::sideinputer::{side_input_server, ReadyResponse, SideInputResponse};

mod sideinputer {
mod proto {
tonic::include_proto!("sideinput.v1");
}

Expand All @@ -16,21 +14,21 @@ pub trait SideInputer {
}

#[async_trait]
impl<T> side_input_server::SideInput for SideInputService<T>
impl<T> proto::side_input_server::SideInput for SideInputService<T>
where
T: SideInputer + Send + Sync + 'static,
{
async fn retrieve_side_input(
&self,
_: Request<()>,
) -> Result<Response<SideInputResponse>, Status> {
) -> Result<Response<proto::SideInputResponse>, Status> {
let msg = self.handler.retrieve_sideinput().await;
let si = match msg {
Some(value) => SideInputResponse {
Some(value) => proto::SideInputResponse {
value,
no_broadcast: false,
},
None => SideInputResponse {
None => proto::SideInputResponse {
value: Vec::new(),
no_broadcast: true,
},
Expand All @@ -39,8 +37,8 @@ where
Ok(Response::new(si))
}

async fn is_ready(&self, _: Request<()>) -> Result<Response<ReadyResponse>, Status> {
Ok(Response::new(ReadyResponse { ready: true }))
async fn is_ready(&self, _: Request<()>) -> Result<Response<proto::ReadyResponse>, Status> {
Ok(Response::new(proto::ReadyResponse { ready: true }))
}
}

Expand All @@ -58,7 +56,7 @@ where
let si_svc = SideInputService { handler: m };

tonic::transport::Server::builder()
.add_service(side_input_server::SideInputServer::new(si_svc))
.add_service(proto::side_input_server::SideInputServer::new(si_svc))
.serve_with_incoming(listener)
.await
.map_err(Into::into)
Expand Down
35 changes: 16 additions & 19 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@ use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
use tonic::{Request, Status, Streaming};

use crate::sink::sinker_grpc::{
sink_response,
sink_server::{Sink, SinkServer},
ReadyResponse, SinkRequest as RPCSinkRequest, SinkResponse,
};

use crate::shared;

mod sinker_grpc {
mod proto {
tonic::include_proto!("sink.v1");
}

Expand Down Expand Up @@ -93,8 +87,8 @@ pub struct SinkRequest {
pub id: String,
}

impl From<RPCSinkRequest> for SinkRequest {
fn from(sr: RPCSinkRequest) -> Self {
impl From<proto::SinkRequest> for SinkRequest {
fn from(sr: proto::SinkRequest) -> Self {
Self {
keys: sr.keys,
value: sr.value,
Expand All @@ -116,7 +110,7 @@ pub struct Response {
pub err: String,
}

impl From<Response> for sink_response::Result {
impl From<Response> for proto::sink_response::Result {
fn from(r: Response) -> Self {
Self {
id: r.id,
Expand All @@ -127,14 +121,14 @@ impl From<Response> for sink_response::Result {
}

#[tonic::async_trait]
impl<T> Sink for SinkService<T>
impl<T> proto::sink_server::Sink for SinkService<T>
where
T: Sinker + Send + Sync + 'static,
{
async fn sink_fn(
&self,
request: Request<Streaming<RPCSinkRequest>>,
) -> Result<tonic::Response<SinkResponse>, Status> {
request: Request<Streaming<proto::SinkRequest>>,
) -> Result<tonic::Response<proto::SinkResponse>, Status> {
let mut stream = request.into_inner();

// TODO: what should be the idle buffer size?
Expand All @@ -160,13 +154,16 @@ where
// wait for the sink handle to respond
let responses = sink_handle.await;

Ok(tonic::Response::new(SinkResponse {
Ok(tonic::Response::new(proto::SinkResponse {
results: responses.into_iter().map(|r| r.into()).collect(),
}))
}

async fn is_ready(&self, _: Request<()>) -> Result<tonic::Response<ReadyResponse>, Status> {
Ok(tonic::Response::new(ReadyResponse { ready: true }))
async fn is_ready(
&self,
_: Request<()>,
) -> Result<tonic::Response<proto::ReadyResponse>, Status> {
Ok(tonic::Response::new(proto::ReadyResponse { ready: true }))
}
}

Expand Down Expand Up @@ -239,7 +236,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
let svc = SinkService { handler };
let svc = SinkServer::new(svc)
let svc = proto::sink_server::SinkServer::new(svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

Expand Down Expand Up @@ -272,7 +269,7 @@ mod tests {
use tower::service_fn;

use crate::sink;
use crate::sink::sinker_grpc::sink_client::SinkClient;
use crate::sink::proto::sink_client::SinkClient;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tonic::transport::Uri;
Expand Down Expand Up @@ -345,7 +342,7 @@ mod tests {
.await?;

let mut client = SinkClient::new(channel);
let request = sink::sinker_grpc::SinkRequest {
let request = sink::proto::SinkRequest {
keys: vec!["first".into(), "second".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
Expand Down
Loading