From 6c81dbd0ffc4903981b98ba9f3c476bcd46addf8 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Wed, 4 Sep 2024 16:29:13 +0200 Subject: [PATCH] Adapt to changes of API v2 that came recently in --- databroker/src/broker.rs | 31 +++++++ databroker/src/grpc/kuksa_val_v2/val.rs | 110 ++++++++++++------------ 2 files changed, 85 insertions(+), 56 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 90c3fb86..cbeb9c8c 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -43,6 +43,37 @@ pub enum UpdateError { PermissionExpired, } +impl UpdateError { + pub fn to_status_with_code(&self, id: &i32) -> tonic::Status { + match self { + UpdateError::NotFound => tonic::Status::new( + tonic::Code::NotFound, + format!("Signal not found (id: {})", id), + ), + UpdateError::WrongType => tonic::Status::new( + tonic::Code::InvalidArgument, + format!("Wrong type provided (id: {})", id), + ), + UpdateError::OutOfBounds => tonic::Status::new( + tonic::Code::OutOfRange, + format!("Index out of bounds (id: {})", id), + ), + UpdateError::UnsupportedType => tonic::Status::new( + tonic::Code::Unimplemented, + format!("Unsupported type (id: {})", id), + ), + UpdateError::PermissionDenied => tonic::Status::new( + tonic::Code::PermissionDenied, + format!("Permission denied (id: {})", id), + ), + UpdateError::PermissionExpired => tonic::Status::new( + tonic::Code::Unauthenticated, + format!("Permission expired (id: {})", id), + ), + } + } +} + #[derive(Debug, Clone)] pub enum ReadError { NotFound, diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index 85068220..95f2061b 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -93,14 +93,21 @@ impl proto::val_server::Val for broker::DataBroker { let request = request.into_inner(); - let signal_ids = request.signal_ids; - let size = signal_ids.len(); + let signal_paths = request.signal_paths; + let size = signal_paths.len(); let mut valid_requests: HashMap> = HashMap::with_capacity(size); - for signal_id in signal_ids { + for path in signal_paths { valid_requests.insert( - match get_signal_id(Some(signal_id), &broker).await { + match get_signal( + Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Path(path)), + }), + &broker, + ) + .await + { Ok(signal_id) => signal_id, Err(err) => return Err(err), }, @@ -126,6 +133,25 @@ impl proto::val_server::Val for broker::DataBroker { } } + type SubscribeIdStream = Pin< + Box< + dyn Stream> + + Send + + Sync + + 'static, + >, + >; + + async fn subscribe_id( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) + } + async fn actuate( &self, _request: tonic::Request, @@ -278,7 +304,7 @@ impl proto::val_server::Val for broker::DataBroker { let mut updates: HashMap = HashMap::with_capacity(1); updates.insert( - match get_signal_id(request.signal_id, &broker).await { + match get_signal(request.signal_id, &broker).await { Ok(signal_id) => signal_id, Err(err) => return Err(err), }, @@ -295,18 +321,12 @@ impl proto::val_server::Val for broker::DataBroker { ); match broker.update_entries(updates).await { - Ok(()) => Ok(tonic::Response::new(proto::PublishValueResponse { - error: None, - })), + Ok(()) => Ok(tonic::Response::new(proto::PublishValueResponse {})), Err(errors) => { if errors.is_empty() { - Ok(tonic::Response::new(proto::PublishValueResponse { - error: None, - })) - } else if let Some((_, err)) = errors.first() { - Ok(tonic::Response::new(proto::PublishValueResponse { - error: Some(err.into()), - })) + Ok(tonic::Response::new(proto::PublishValueResponse {})) + } else if let Some((id, err)) = errors.first() { + Err(err.to_status_with_code(id)) } else { Err(tonic::Status::internal( "There is no error provided for the entry", @@ -573,7 +593,7 @@ async fn publish_values( } } -async fn get_signal_id( +async fn get_signal( signal_id: Option, broker: &AuthorizedAccess<'_, '_>, ) -> Result { @@ -724,9 +744,7 @@ mod tests { Ok(response) => { // Handle the successful response let publish_response = response.into_inner(); - - // Check if there is an error in the response - assert_eq!(publish_response.error, None); + assert_eq!(publish_response, proto::PublishValueResponse {}) } Err(status) => { // Handle the error from the publish_value function @@ -799,7 +817,8 @@ mod tests { let f = false; let broker = DataBroker::default(); let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); - let entry_id_1 = authorized_access + + let entry_id = authorized_access .add_entry( "test.datapoint1".to_owned(), broker::DataType::Bool, @@ -812,30 +831,8 @@ mod tests { .await .unwrap(); - let entry_id_2 = authorized_access - .add_entry( - "test.datapoint2".to_owned(), - broker::DataType::Bool, - broker::ChangeType::OnChange, - broker::EntryType::Sensor, - "Test datapoint 2".to_owned(), - None, - None, - ) - .await - .unwrap(); - let mut request = tonic::Request::new(proto::SubscribeRequest { - signal_ids: vec![ - proto::SignalId { - signal: Some(proto::signal_id::Signal::Path( - "test.datapoint1".to_string(), - )), - }, - proto::SignalId { - signal: Some(proto::signal_id::Signal::Id(entry_id_2)), - }, - ], + signal_paths: vec!["test.datapoint1".to_string()], }); request @@ -849,9 +846,9 @@ mod tests { rt.block_on(broker.subscribe(request)) }); - let mut request_1 = tonic::Request::new(proto::PublishValueRequest { + let mut request = tonic::Request::new(proto::PublishValueRequest { signal_id: Some(proto::SignalId { - signal: Some(proto::signal_id::Signal::Id(entry_id_1)), + signal: Some(proto::signal_id::Signal::Id(entry_id)), }), data_point: Some(proto::Datapoint { timestamp: None, @@ -860,15 +857,16 @@ mod tests { })), }), }); - request_1 + request .extensions_mut() .insert(permissions::ALLOW_ALL.clone()); - match broker.publish_value(request_1).await { + match broker.publish_value(request).await { Ok(response) => { // Handle the successful response let publish_response = response.into_inner(); - assert_eq!(publish_response.error, None); + // Check if there is an error in the response + assert_eq!(publish_response, proto::PublishValueResponse {}); } Err(status) => { // Handle the error from the publish_value function @@ -876,27 +874,27 @@ mod tests { } } - let mut request_2 = tonic::Request::new(proto::PublishValueRequest { + let mut request_false = tonic::Request::new(proto::PublishValueRequest { signal_id: Some(proto::SignalId { - signal: Some(proto::signal_id::Signal::Id(entry_id_2)), + signal: Some(proto::signal_id::Signal::Id(entry_id)), }), data_point: Some(proto::Datapoint { timestamp: None, value_state: Some(proto::datapoint::ValueState::Value(proto::Value { - typed_value: Some(proto::value::TypedValue::Bool(true)), + typed_value: Some(proto::value::TypedValue::Bool(false)), })), }), }); - request_2 + request_false .extensions_mut() .insert(permissions::ALLOW_ALL.clone()); - match broker.publish_value(request_2).await { + match broker.publish_value(request_false).await { Ok(response) => { // Handle the successful response let publish_response = response.into_inner(); // Check if there is an error in the response - assert_eq!(publish_response.error, None); + assert_eq!(publish_response, proto::PublishValueResponse {}); } Err(status) => { // Handle the error from the publish_value function @@ -931,12 +929,12 @@ mod tests { check_stream_next(&item, expected_entries.clone()).await; expected_entries.clear(); expected_entries.insert( - "test.datapoint2".to_string(), + "test.datapoint1".to_string(), proto::Datapoint { timestamp: None, value_state: Some(proto::datapoint::ValueState::Value( proto::Value { - typed_value: Some(proto::value::TypedValue::Bool(true)), + typed_value: Some(proto::value::TypedValue::Bool(false)), }, )), },