Skip to content

Commit

Permalink
Merge pull request #20 from matthewd0123/feature/streamer_publish
Browse files Browse the repository at this point in the history
Add publish to up-streamer rust
  • Loading branch information
PLeVasseur authored Aug 12, 2024
2 parents 06af0fa + 7c73d07 commit 15ffc8b
Show file tree
Hide file tree
Showing 32 changed files with 1,793 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ lcov.info
tarpaulin-report.html

.idea/
.vscode/launch.json
.vscode/settings.json
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ members = [
"utils/hello-world-protos",
"utils/integration-test-utils",
"up-linux-streamer",
"up-streamer",
]
"up-streamer", "subscription-cache", "utils/usubscription-static-file"]

[workspace.package]
rust-version = "1.76.0"
Expand Down
23 changes: 23 additions & 0 deletions subscription-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "subscription-cache"
rust-version.workspace = true
version.workspace = true
repository.workspace = true
homepage.workspace = true
edition.workspace = true
keywords.workspace = true
license.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { workspace = true, features = ["unstable"] }
async-trait = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
uuid = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
up-rust = { workspace = true, features = ["usubscription"] }
protobuf = { version = "3.3", features = ["with-bytes"] }
150 changes: 150 additions & 0 deletions subscription-cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Mutex;
use up_rust::core::usubscription::{
EventDeliveryConfig, FetchSubscriptionsResponse, SubscribeAttributes, SubscriberInfo,
SubscriptionStatus,
};
use up_rust::UUri;
use up_rust::{UCode, UStatus};

pub type SubscribersMap = Mutex<HashMap<String, HashSet<SubscriptionInformation>>>;

// Tracks subscription information inside the SubscriptionCache
pub struct SubscriptionInformation {
pub topic: UUri,
pub subscriber: SubscriberInfo,
pub status: SubscriptionStatus,
pub attributes: SubscribeAttributes,
pub config: EventDeliveryConfig,
}

// Will be moving this to up-rust
// Issue: https://github.com/eclipse-uprotocol/up-rust/issues/178
impl Eq for SubscriptionInformation {}

impl PartialEq for SubscriptionInformation {
fn eq(&self, other: &Self) -> bool {
self.subscriber == other.subscriber
}
}

impl Hash for SubscriptionInformation {
fn hash<H: Hasher>(&self, state: &mut H) {
self.subscriber.hash(state);
}
}

impl Clone for SubscriptionInformation {
fn clone(&self) -> Self {
Self {
topic: self.topic.clone(),
subscriber: self.subscriber.clone(),
status: self.status.clone(),
attributes: self.attributes.clone(),
config: self.config.clone(),
}
}
}

pub struct SubscriptionCache {
subscription_cache_map: SubscribersMap,
}

impl Default for SubscriptionCache {
fn default() -> Self {
Self {
subscription_cache_map: Mutex::new(HashMap::new()),
}
}
}

/// A [`SubscriptionCache`] is used to store and manage subscriptions to
/// topics. It is kept local to the streamer. The streamer will receive updates
/// from the subscription service, and update the SubscriptionCache accordingly.
impl SubscriptionCache {
pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Result<Self, UStatus> {
let mut subscription_cache_hash_map = HashMap::new();
for subscription in subscription_cache_map.subscriptions {
let topic = subscription.topic.into_option().ok_or_else(|| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to retrieve topic".to_string(),
)
})?;
let subscriber = subscription.subscriber.into_option().ok_or_else(|| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to retrieve topic".to_string(),
)
})?;
// At minimum, topic and subscriber are required to track a subscription.
// status, attributes, and config can be used either within the subscription service,
// or for tracking pending subscriptions, but they are not required for forwarding
// subscriptions across the streamer, so if not included, they will be set to default.
let status = if let Some(status) = subscription.status.into_option() {
status
} else {
println!("Unable to parse status from subscription, setting as default");
SubscriptionStatus::default()
};
let attributes = if let Some(attributes) = subscription.attributes.into_option() {
attributes
} else {
println!("Unable to parse attributes from subscription, setting as default");
SubscribeAttributes::default()
};
let config = if let Some(config) = subscription.config.into_option() {
config
} else {
println!("Unable to parse config from subscription, setting as default");
EventDeliveryConfig::default()
};
// Create new hashset if the key does not exist and insert the subscription
let subscription_information = SubscriptionInformation {
topic: topic.clone(),
subscriber: subscriber.clone(),
status,
attributes,
config,
};
let subscriber_authority_name = match subscription_information.subscriber.uri.as_ref() {
Some(uri) => uri.authority_name.clone(),
None => {
return Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to retrieve authority name",
))
}
};
subscription_cache_hash_map
.entry(subscriber_authority_name)
.or_insert_with(HashSet::new)
.insert(subscription_information);
}
Ok(Self {
subscription_cache_map: Mutex::new(subscription_cache_hash_map),
})
}

pub fn fetch_cache_entry(&self, entry: String) -> Option<HashSet<SubscriptionInformation>> {
let map = match self.subscription_cache_map.lock() {
Ok(map) => map,
Err(_) => return None,
};
map.get(&entry).cloned()
}
}
2 changes: 2 additions & 0 deletions up-linux-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ up-streamer = { path = "../up-streamer" }
up-transport-zenoh = { git = "https://github.com/eclipse-uprotocol/up-transport-zenoh-rust.git", rev = "7c839e7a94f526a82027564a609f48a79a3f4eae" }
up-transport-vsomeip = { git = "https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust.git", rev = "acbb0d0c9b8b48dd35c74f461e97151f1e922000", default-features = false }
zenoh = { version = "0.11.0-rc.3", features = ["unstable"]}
usubscription-static-file = {path = "../utils/usubscription-static-file"}
chrono = "0.4"

[dev-dependencies]
hello-world-protos = { path = "../utils/hello-world-protos" }
8 changes: 8 additions & 0 deletions up-linux-streamer/DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
// Used when initializing host transport
authority: "linux"
},
usubscription_config: {
// Lists the path to the subscription file when using static file
file_path: "./utils/usubscription-static-file/static-configs/testdata.json"
},
zenoh_transport_config: {
// Configuration file which is where zenoh config information is stored
config_file: "./up-linux-streamer/ZENOH_CONFIG.json5"
},
someip_config: {
// Determines the authority_name of the mechatronics network
// Used when initializing SOME/IP transport
Expand Down
2 changes: 2 additions & 0 deletions up-linux-streamer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Concrete implementation of a uStreamer as a binary.

Reference the `DEFAULT_CONFIG.json5` configuration file to understand configuration options.

As well, the `ZENOH_CONFIG.json5` file is used to set Zenoh configurations. By default, it is only used to set listening endpoints, but can be used with more configurations according to [Zenoh's page on it](https://zenoh.io/docs/manual/configuration/#configuration-files).

### Bundled vsomeip or bring your own

The default is to build a bundled version of vsomeip for use by the `up-transport-vsomeip` crate.
Expand Down
Loading

0 comments on commit 15ffc8b

Please sign in to comment.