Skip to content

Commit

Permalink
Implement Provider Actuation
Browse files Browse the repository at this point in the history
  • Loading branch information
wba2hi committed Sep 26, 2024
1 parent c9f1533 commit 1db79ba
Show file tree
Hide file tree
Showing 6 changed files with 595 additions and 35 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ axum = { version = "0.6.20", optional = true, features = ["ws"] }
futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }
async-trait = "0.1.82"

# systemd related dependency, only relevant on linux systems
[target.'cfg(target_os = "linux")'.dependencies]
sd-notify = "0.4.1"

[features]
default = ["tls"]
default = ["tls", "dep:futures"]
tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
Expand Down
208 changes: 206 additions & 2 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub struct Database {

#[derive(Default)]
pub struct Subscriptions {
actuation_subscriptions: Vec<ActuationSubscription>,
query_subscriptions: Vec<QuerySubscription>,
change_subscriptions: Vec<ChangeSubscription>,
}
Expand Down Expand Up @@ -179,6 +180,23 @@ pub struct DataBroker {
shutdown_trigger: broadcast::Sender<()>,
}

#[async_trait::async_trait]
pub trait ActuationProvider {
async fn actuate(&self, actuation_changes: Vec<ActuationChange>) -> Result<(), tonic::Status>;
}

#[derive(Clone)]
pub struct ActuationChange {
pub id: i32,
pub data_value: DataValue,
}

pub struct ActuationSubscription {
vss_ids: Vec<i32>,
actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
permissions: Permissions,
}

pub struct QuerySubscription {
query: query::CompiledQuery,
sender: mpsc::Sender<QueryResponse>,
Expand Down Expand Up @@ -630,6 +648,10 @@ pub enum SuccessfulUpdate {
}

impl Subscriptions {
pub fn add_actuation_subscription(&mut self, subscription: ActuationSubscription) {
self.actuation_subscriptions.push(subscription);
}

pub fn add_query_subscription(&mut self, subscription: QuerySubscription) {
self.query_subscriptions.push(subscription)
}
Expand Down Expand Up @@ -679,11 +701,13 @@ impl Subscriptions {
}

pub fn clear(&mut self) {
self.actuation_subscriptions.clear();
self.query_subscriptions.clear();
self.change_subscriptions.clear();
}

pub fn cleanup(&mut self) {
// TODO how to cleanup actuation_subscriptions?
self.query_subscriptions.retain(|sub| {
if sub.sender.is_closed() {
info!("Subscriber gone: removing subscription");
Expand Down Expand Up @@ -1561,6 +1585,185 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
Err(e) => Err(QueryError::CompilationError(format!("{e:?}"))),
}
}

pub async fn provide_actuation(
&self,
vss_ids: Vec<i32>,
actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
) -> Result<(), tonic::Status> {
for vss_id in vss_ids.clone() {
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!("Can not provide actuation for vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator);
return Err(tonic::Status::permission_denied(message));
}
},
Err(error) => {
let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error);
return Err(tonic::Status::invalid_argument(message));
},
}
}

let provided_vss_ids: Vec<i32> = self.broker.subscriptions.blocking_read().actuation_subscriptions.iter().flat_map(|subscription| subscription.vss_ids.clone()).collect();
let intersection: Vec<_> = vss_ids.iter().filter(|&x| provided_vss_ids.contains(x)).collect();
if !intersection.is_empty() {
let message = format!("ActuationProvider(s) for the following vss_ids already registered: {:?}", intersection);
return Err(tonic::Status::invalid_argument(message));
}

let actuation_subscription: ActuationSubscription = ActuationSubscription {
vss_ids,
actuation_provider,
permissions: self.permissions.clone(),
};
self.broker.subscriptions.blocking_write().add_actuation_subscription(actuation_subscription);

Ok(())
}

async fn map_actuation_changes_by_vss_id(
&self,
actuation_changes: Vec<ActuationChange>
) -> HashMap<i32, Vec<ActuationChange>> {
let mut actuation_changes_per_vss_id: HashMap<i32, Vec<ActuationChange>> = HashMap::new();
for ele in actuation_changes {
let vss_id = ele.id;

let opt_vss_ids = actuation_changes_per_vss_id.get_mut(&vss_id);
match opt_vss_ids {
Some(vss_ids) => {
vss_ids.push(ele.clone());
},
None => {
let vec = vec![ele.clone()];
actuation_changes_per_vss_id.insert(vss_id, vec);
},
}
}
return actuation_changes_per_vss_id;
}

pub async fn batch_actuate(
&self,
actuation_changes: Vec<ActuationChange>
) -> Result<(), tonic::Status> {
let actuation_subscriptions = &self.broker.subscriptions.blocking_read().actuation_subscriptions;

for actuation_change in &actuation_changes {
let vss_id = actuation_change.id;
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!("Can not actuate vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator);
return Err(tonic::Status::permission_denied(message));
}
},
Err(error) => {
let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error);
return Err(tonic::Status::invalid_argument(message));
},
}
}

let actuation_changes_per_vss_id = &self.map_actuation_changes_by_vss_id(actuation_changes).await;
for ele in actuation_changes_per_vss_id {
let vss_id = ele.0.clone();
let actuation_changes = ele.1.clone();

let opt_actuation_subscription = actuation_subscriptions.iter().find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(actuation_subscription) => {
actuation_subscription.actuation_provider.actuate(actuation_changes).await?
},
None => {
let message = format!("No actuation provider available for vss_id: {}", vss_id);
return Err(tonic::Status::unavailable(message));
},
}
}

return Ok(())
}

pub async fn actuate(
&self,
vss_id: &i32,
data_value: &DataValue,
) -> Result<(), tonic::Status> {

let vss_id = vss_id.clone();
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!("Can not actuate vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator);
return Err(tonic::Status::permission_denied(message));
}
},
Err(error) => {
let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error);
return Err(tonic::Status::invalid_argument(message));
},
}

let entry_update = EntryUpdate {
path: None,
datapoint: None,
actuator_target: Some(Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: data_value.clone(),
})),
entry_type: None,
data_type: None,
description: None,
allowed: None,
unit: None,
};

let entry_updates = [(vss_id.clone(), entry_update)];
let legacy_result = self.update_entries(entry_updates).await;
if legacy_result.is_err() {
let update_errors = legacy_result.unwrap_err();
let opt_error = update_errors.get(0);
if opt_error.is_some() {
let error = opt_error.unwrap();
let message = format!("Could not set actuator target for vss_id {}: {:?}", vss_id, error.1);
warn!(message);
}
}

let guard = self.broker.subscriptions.blocking_read();
let opt_actuation_subscription = guard.actuation_subscriptions.iter().find(|subscription|subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(ref actuation_subscription) => {
actuation_subscription.actuation_provider.actuate(vec![
ActuationChange {
id: vss_id.clone(),
data_value: data_value.clone()
},
]
).await
},
None => {
let message = format!("No actuation provider found for vss_id {}", vss_id);
return Err(tonic::Status::new(tonic::Code::Unavailable, message))
}
}
}
}

impl DataBroker {
Expand Down Expand Up @@ -1588,13 +1791,14 @@ impl DataBroker {
pub fn start_housekeeping_task(&self) {
info!("Starting housekeeping task");
let subscriptions = self.subscriptions.clone();

tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));

loop {
interval.tick().await;
// Cleanup dropped subscriptions
subscriptions.write().await.cleanup();

subscriptions.write().await.cleanup(); // Cleanup dropped subscriptions
}
});
}
Expand Down
Loading

0 comments on commit 1db79ba

Please sign in to comment.