-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use samllvec to reduce heap usage, use inline functions and general improvements #115
base: main
Are you sure you want to change the base?
Changes from all commits
2dfb84e
4de7330
43c0e47
33e0d0b
1145279
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ | |
use crate::permissions::{PermissionError, Permissions}; | ||
pub use crate::types; | ||
|
||
use smallvec::SmallVec; | ||
|
||
use crate::query; | ||
pub use crate::types::{ChangeType, DataType, DataValue, EntryType}; | ||
|
||
|
@@ -22,7 +24,7 @@ use tokio_stream::wrappers::BroadcastStream; | |
use tokio_stream::wrappers::ReceiverStream; | ||
use tokio_stream::{Stream, StreamExt}; | ||
|
||
use std::collections::{HashMap, HashSet}; | ||
use std::collections::HashMap; | ||
use std::convert::TryFrom; | ||
use std::sync::atomic::{AtomicI32, Ordering}; | ||
use std::sync::Arc; | ||
|
@@ -112,6 +114,10 @@ pub enum Field { | |
ActuatorTarget, | ||
MetadataUnit, | ||
} | ||
#[derive(Debug, Clone)] | ||
pub struct StackVecField { | ||
pub svec: SmallVec<[Field; 3]>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we expect 3 to be the "optimal" number because if I understand correctly after 3 elements it will store it anyways in the heap right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for |
||
} | ||
|
||
#[derive(Default)] | ||
pub struct Database { | ||
|
@@ -142,7 +148,7 @@ pub struct QueryField { | |
pub struct ChangeNotification { | ||
pub id: i32, | ||
pub update: EntryUpdate, | ||
pub fields: HashSet<Field>, | ||
pub fields: StackVecField, | ||
} | ||
|
||
#[derive(Debug, Default, Clone)] | ||
|
@@ -201,7 +207,7 @@ pub struct QuerySubscription { | |
} | ||
|
||
pub struct ChangeSubscription { | ||
entries: HashMap<i32, HashSet<Field>>, | ||
entries: HashMap<i32, StackVecField>, | ||
sender: broadcast::Sender<EntryUpdates>, | ||
permissions: Permissions, | ||
} | ||
|
@@ -233,7 +239,63 @@ pub struct EntryUpdate { | |
pub unit: Option<String>, | ||
} | ||
|
||
impl StackVecField { | ||
#[inline] | ||
pub fn new() -> Self { | ||
Self { | ||
svec: SmallVec::new(), | ||
} | ||
} | ||
|
||
#[inline] | ||
pub fn push(&mut self, value: Field) { | ||
self.svec.push(value); | ||
} | ||
|
||
#[inline] | ||
pub fn contains(&self, element: &Field) -> bool { | ||
self.svec.contains(element) | ||
} | ||
|
||
#[inline] | ||
pub fn extend_from_stack(&mut self, other: &StackVecField) { | ||
self.svec.extend(other.svec.iter().cloned()); | ||
} | ||
|
||
#[inline] | ||
pub fn with_elements(elements: SmallVec<[Field; 3]>) -> Self { | ||
Self { svec: elements } | ||
} | ||
|
||
#[inline] | ||
pub fn are_disjoint(&self, other: &StackVecField) -> bool { | ||
for item in &self.svec { | ||
if other.svec.contains(item) { | ||
return false; // Found a common element | ||
} | ||
} | ||
true // No common elements found | ||
} | ||
|
||
#[inline] | ||
pub fn is_empty(&self) -> bool { | ||
self.svec.is_empty() | ||
} | ||
|
||
#[inline] | ||
pub fn iter(&self) -> impl Iterator<Item = &Field> { | ||
self.svec.iter() | ||
} | ||
} | ||
|
||
impl Default for StackVecField { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl Entry { | ||
#[inline] | ||
pub fn diff(&self, mut update: EntryUpdate) -> EntryUpdate { | ||
if let Some(datapoint) = &update.datapoint { | ||
if self.metadata.change_type != ChangeType::Continuous { | ||
|
@@ -710,16 +772,17 @@ impl Entry { | |
self.lag_datapoint = self.datapoint.clone(); | ||
} | ||
|
||
pub fn apply(&mut self, update: EntryUpdate) -> HashSet<Field> { | ||
let mut changed = HashSet::new(); | ||
#[inline] | ||
pub fn apply(&mut self, update: EntryUpdate) -> StackVecField { | ||
let mut changed = StackVecField::new(); | ||
if let Some(datapoint) = update.datapoint { | ||
self.lag_datapoint = self.datapoint.clone(); | ||
self.datapoint = datapoint; | ||
changed.insert(Field::Datapoint); | ||
changed.push(Field::Datapoint); | ||
} | ||
if let Some(actuator_target) = update.actuator_target { | ||
self.actuator_target = actuator_target; | ||
changed.insert(Field::ActuatorTarget); | ||
changed.push(Field::ActuatorTarget); | ||
} | ||
|
||
if let Some(updated_allowed) = update.allowed { | ||
|
@@ -755,10 +818,23 @@ impl Subscriptions { | |
|
||
pub async fn notify( | ||
&self, | ||
changed: Option<&HashMap<i32, HashSet<Field>>>, | ||
changed: Option<&HashMap<i32, StackVecField>>, | ||
db: &Database, | ||
) -> Result<Option<HashMap<String, ()>>, NotificationError> { | ||
let mut error = None; | ||
|
||
for sub in &self.change_subscriptions { | ||
match sub.notify(changed, db).await { | ||
Ok(_) => {} | ||
Err(err) => error = Some(err), | ||
} | ||
} | ||
|
||
//Leave method here if error is none and query_subscription is empty | ||
if error.is_none() && self.query_subscriptions.is_empty() { | ||
return Ok(None); | ||
} | ||
|
||
let mut lag_updates: HashMap<String, ()> = HashMap::new(); | ||
for sub in &self.query_subscriptions { | ||
match sub.notify(changed, db).await { | ||
|
@@ -774,13 +850,6 @@ impl Subscriptions { | |
} | ||
} | ||
|
||
for sub in &self.change_subscriptions { | ||
match sub.notify(changed, db).await { | ||
Ok(_) => {} | ||
Err(err) => error = Some(err), | ||
} | ||
} | ||
|
||
match error { | ||
Some(err) => Err(err), | ||
None => { | ||
|
@@ -837,7 +906,7 @@ impl Subscriptions { | |
impl ChangeSubscription { | ||
async fn notify( | ||
&self, | ||
changed: Option<&HashMap<i32, HashSet<Field>>>, | ||
changed: Option<&HashMap<i32, StackVecField>>, | ||
db: &Database, | ||
) -> Result<(), NotificationError> { | ||
let db_read = db.authorized_read_access(&self.permissions); | ||
|
@@ -846,7 +915,7 @@ impl ChangeSubscription { | |
let mut matches = false; | ||
for (id, changed_fields) in changed { | ||
if let Some(fields) = self.entries.get(id) { | ||
if !fields.is_disjoint(changed_fields) { | ||
if !fields.are_disjoint(changed_fields) { | ||
matches = true; | ||
break; | ||
} | ||
|
@@ -858,25 +927,25 @@ impl ChangeSubscription { | |
let mut notifications = EntryUpdates::default(); | ||
for (id, changed_fields) in changed { | ||
if let Some(fields) = self.entries.get(id) { | ||
if !fields.is_disjoint(changed_fields) { | ||
if !fields.are_disjoint(changed_fields) { | ||
match db_read.get_entry_by_id(*id) { | ||
Ok(entry) => { | ||
let mut update = EntryUpdate::default(); | ||
let mut notify_fields = HashSet::new(); | ||
let mut notify_fields = StackVecField::new(); | ||
// TODO: Perhaps make path optional | ||
update.path = Some(entry.metadata.path.clone()); | ||
if changed_fields.contains(&Field::Datapoint) | ||
&& fields.contains(&Field::Datapoint) | ||
{ | ||
update.datapoint = Some(entry.datapoint.clone()); | ||
notify_fields.insert(Field::Datapoint); | ||
notify_fields.push(Field::Datapoint); | ||
} | ||
if changed_fields.contains(&Field::ActuatorTarget) | ||
&& fields.contains(&Field::ActuatorTarget) | ||
{ | ||
update.actuator_target = | ||
Some(entry.actuator_target.clone()); | ||
notify_fields.insert(Field::ActuatorTarget); | ||
notify_fields.push(Field::ActuatorTarget); | ||
} | ||
// fill unit field always | ||
update.unit.clone_from(&entry.metadata.unit); | ||
|
@@ -922,16 +991,16 @@ impl ChangeSubscription { | |
match db_read.get_entry_by_id(*id) { | ||
Ok(entry) => { | ||
let mut update = EntryUpdate::default(); | ||
let mut notify_fields = HashSet::new(); | ||
let mut notify_fields = StackVecField::new(); | ||
// TODO: Perhaps make path optional | ||
update.path = Some(entry.metadata.path.clone()); | ||
if fields.contains(&Field::Datapoint) { | ||
update.datapoint = Some(entry.datapoint.clone()); | ||
notify_fields.insert(Field::Datapoint); | ||
notify_fields.push(Field::Datapoint); | ||
} | ||
if fields.contains(&Field::ActuatorTarget) { | ||
update.actuator_target = Some(entry.actuator_target.clone()); | ||
notify_fields.insert(Field::ActuatorTarget); | ||
notify_fields.push(Field::ActuatorTarget); | ||
} | ||
notifications.updates.push(ChangeNotification { | ||
id: *id, | ||
|
@@ -989,7 +1058,7 @@ impl QuerySubscription { | |
} | ||
fn check_if_changes_match( | ||
query: &CompiledQuery, | ||
changed_origin: Option<&HashMap<i32, HashSet<Field>>>, | ||
changed_origin: Option<&HashMap<i32, StackVecField>>, | ||
db: &DatabaseReadAccess, | ||
) -> bool { | ||
match changed_origin { | ||
|
@@ -1039,7 +1108,7 @@ impl QuerySubscription { | |
} | ||
fn generate_input( | ||
&self, | ||
changed: Option<&HashMap<i32, HashSet<Field>>>, | ||
changed: Option<&HashMap<i32, StackVecField>>, | ||
db: &DatabaseReadAccess, | ||
) -> Option<impl ExecutionInput> { | ||
let id_used_in_query = QuerySubscription::check_if_changes_match(&self.query, changed, db); | ||
|
@@ -1055,7 +1124,7 @@ impl QuerySubscription { | |
|
||
async fn notify( | ||
&self, | ||
changed: Option<&HashMap<i32, HashSet<Field>>>, | ||
changed: Option<&HashMap<i32, StackVecField>>, | ||
db: &Database, | ||
) -> Result<Option<impl query::ExecutionInput>, NotificationError> { | ||
let db_read = db.authorized_read_access(&self.permissions); | ||
|
@@ -1190,6 +1259,11 @@ impl DatabaseReadAccess<'_, '_> { | |
self.db.entries.get(&id).map(|entry| &entry.metadata) | ||
} | ||
|
||
#[inline] | ||
pub fn contains_id(&self, id: i32) -> bool { | ||
self.db.entries.contains_key(&id) | ||
} | ||
|
||
pub fn get_metadata_by_path(&self, path: &str) -> Option<&Metadata> { | ||
let id = self.db.path_to_id.get(path)?; | ||
self.get_metadata_by_id(*id) | ||
|
@@ -1208,7 +1282,7 @@ impl DatabaseWriteAccess<'_, '_> { | |
&mut self, | ||
path: &str, | ||
update: EntryUpdate, | ||
) -> Result<HashSet<Field>, UpdateError> { | ||
) -> Result<StackVecField, UpdateError> { | ||
match self.db.path_to_id.get(path) { | ||
Some(id) => self.update(*id, update), | ||
None => Err(UpdateError::NotFound), | ||
|
@@ -1228,7 +1302,7 @@ impl DatabaseWriteAccess<'_, '_> { | |
} | ||
} | ||
|
||
pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<HashSet<Field>, UpdateError> { | ||
pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<StackVecField, UpdateError> { | ||
match self.db.entries.get_mut(&id) { | ||
Some(entry) => { | ||
if update.path.is_some() | ||
|
@@ -1493,6 +1567,15 @@ impl AuthorizedAccess<'_, '_> { | |
.cloned() | ||
} | ||
|
||
pub async fn contains_id(&self, id: i32) -> bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same name as function in line 1263 intended? |
||
self.broker | ||
.database | ||
.read() | ||
.await | ||
.authorized_read_access(self.permissions) | ||
.contains_id(id) | ||
} | ||
|
||
pub async fn get_metadata_by_path(&self, path: &str) -> Option<Metadata> { | ||
self.broker | ||
.database | ||
|
@@ -1569,7 +1652,7 @@ impl AuthorizedAccess<'_, '_> { | |
|
||
let cleanup_needed = { | ||
let changed = { | ||
let mut changed = HashMap::<i32, HashSet<Field>>::new(); | ||
let mut changed = HashMap::<i32, StackVecField>::new(); | ||
for (id, update) in updates { | ||
debug!("setting id {} to {:?}", id, update); | ||
match db_write.update(id, update) { | ||
|
@@ -1631,7 +1714,7 @@ impl AuthorizedAccess<'_, '_> { | |
|
||
pub async fn subscribe( | ||
&self, | ||
valid_entries: HashMap<i32, HashSet<Field>>, | ||
valid_entries: HashMap<i32, StackVecField>, | ||
buffer_size: Option<usize>, | ||
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> { | ||
if valid_entries.is_empty() { | ||
|
@@ -4262,7 +4345,10 @@ pub mod tests { | |
|
||
let mut stream = broker | ||
.subscribe( | ||
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]), | ||
HashMap::from([( | ||
id1, | ||
StackVecField::with_elements(smallvec::smallvec![Field::Datapoint]), | ||
)]), | ||
buffer_size, | ||
) | ||
.await | ||
|
@@ -4371,7 +4457,10 @@ pub mod tests { | |
|
||
match broker | ||
.subscribe( | ||
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]), | ||
HashMap::from([( | ||
id1, | ||
StackVecField::with_elements(smallvec::smallvec![Field::Datapoint]), | ||
)]), | ||
// 1001 is just outside valid range 0-1000 | ||
Some(1001), | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume you should update the lock file as well, or?