Skip to content

Commit

Permalink
Add message metadata to RpcHandler (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnotherDaniel authored Dec 10, 2024
1 parent a4d84a0 commit 66830f3
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
3 changes: 2 additions & 1 deletion examples/simple_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use up_rust::{
ServiceInvocationError, UPayload,
},
local_transport::LocalTransport,
LocalUriProvider, StaticUriProvider,
LocalUriProvider, StaticUriProvider, UAttributes,
};

struct EchoOperation {}
Expand All @@ -34,6 +34,7 @@ impl RequestHandler for EchoOperation {
async fn handle_request(
&self,
_resource_id: u16,
_message_attributes: &UAttributes,
request_payload: Option<UPayload>,
) -> Result<Option<UPayload>, ServiceInvocationError> {
if let Some(req_payload) = request_payload {
Expand Down
25 changes: 17 additions & 8 deletions src/communication/in_memory_rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ impl RequestListener {

debug!(ttl = request_timeout, id = %request_id, "processing RPC request");

let invocation_result_future =
request_handler_clone.handle_request(resource_id, request_payload);
let invocation_result_future = request_handler_clone.handle_request(
resource_id,
&request_message.attributes,
request_payload,
);
let outcome = tokio::time::timeout(
Duration::from_millis(request_timeout as u64),
invocation_result_future,
Expand Down Expand Up @@ -552,19 +555,24 @@ mod tests {
};
let message_id = UUID::build();
let message_id_clone = message_id.clone();
let message_source = UUri::try_from("up://localhost/A100/1/0").unwrap();
let message_source_clone = message_source.clone();

request_handler
.expect_handle_request()
.once()
.withf(|resource_id, request_payload| {
.withf(move |resource_id, message_attributes, request_payload| {
if let Some(pl) = request_payload {
let message_source = message_attributes.source.as_ref().unwrap();
let msg: StringValue = pl.extract_protobuf().unwrap();
msg.value == *"Hello" && *resource_id == 0x7000_u16
msg.value == *"Hello"
&& *resource_id == 0x7000_u16
&& *message_source == message_source_clone
} else {
false
}
})
.returning(|_resource_id, _request_payload| {
.returning(|_resource_id, _message_attributes, _request_payload| {
let response_payload = UPayload::try_from_protobuf(StringValue {
value: "Hello World".to_string(),
..Default::default()
Expand Down Expand Up @@ -597,7 +605,7 @@ mod tests {
});
let request_message = UMessageBuilder::request(
UUri::try_from("up://localhost/A200/1/7000").unwrap(),
UUri::try_from("up://localhost/A100/1/0").unwrap(),
message_source,
5_000,
)
.with_message_id(message_id)
Expand Down Expand Up @@ -625,8 +633,8 @@ mod tests {
request_handler
.expect_handle_request()
.once()
.withf(|resource_id, _request_payload| *resource_id == 0x7000_u16)
.returning(|_resource_id, _request_payload| {
.withf(|resource_id, _message_attributes, _request_payload| *resource_id == 0x7000_u16)
.returning(|_resource_id, _message_attributes, _request_payload| {
Err(ServiceInvocationError::NotFound(
"no such object".to_string(),
))
Expand Down Expand Up @@ -684,6 +692,7 @@ mod tests {
async fn handle_request(
&self,
resource_id: u16,
_message_attributes: &UAttributes,
_request_payload: Option<UPayload>,
) -> Result<Option<UPayload>, ServiceInvocationError> {
assert_eq!(resource_id, 0x7000);
Expand Down
4 changes: 3 additions & 1 deletion src/communication/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use async_trait::async_trait;
use protobuf::MessageFull;

use crate::communication::RegistrationError;
use crate::{UCode, UStatus, UUri};
use crate::{UAttributes, UCode, UStatus, UUri};

use super::{CallOptions, UPayload};

Expand Down Expand Up @@ -223,6 +223,7 @@ pub trait RequestHandler: Send + Sync {
/// # Arguments
///
/// * `resource_id` - The resource identifier of the method to invoke.
/// * `message_attributes` - Any metadata that is associated with the request message.
/// * `request_payload` - The raw payload that contains the input data for the method.
///
/// # Returns
Expand All @@ -235,6 +236,7 @@ pub trait RequestHandler: Send + Sync {
async fn handle_request(
&self,
resource_id: u16,
message_attributes: &UAttributes,
request_payload: Option<UPayload>,
) -> Result<Option<UPayload>, ServiceInvocationError>;
}
Expand Down

0 comments on commit 66830f3

Please sign in to comment.