From adab652433ec15d31fc6c936c0e667ab2fffcf8d Mon Sep 17 00:00:00 2001 From: ValMobBIllich Date: Fri, 15 Nov 2024 16:35:50 +0100 Subject: [PATCH 1/2] added mqtt-entities and zenoh-mqtt streamer reference impl, moved impls to their own folder --- .gitignore | 2 + Cargo.lock | 103 +++++ Cargo.toml | 3 +- README.md | 33 +- .../Cargo.toml | 9 +- .../DEFAULT_CONFIG.json5 | 28 +- example-streamer-implementations/README.md | 153 +++++++ .../ZENOH_CONFIG.json5 | 0 .../src/bin/config/mod.rs | 14 +- .../src/bin/zenoh_mqtt.rs | 138 ++++++ .../src/bin/zenoh_someip.rs | 153 +++++++ .../subscription_data.json | 4 + .../vsomeip-configs/point_to_point.json | 14 + example-streamer-uses/Cargo.toml | 34 +- example-streamer-uses/README.md | 80 ++-- example-streamer-uses/ZENOH_CONFIG.json5 | 419 ++++++++++++++++++ example-streamer-uses/src/bin/common/mod.rs | 94 ++++ example-streamer-uses/src/bin/mqtt_client.rs | 105 +++++ .../src/bin/mqtt_publisher.rs | 94 ++++ example-streamer-uses/src/bin/mqtt_service.rs | 81 ++++ .../src/bin/mqtt_subscriber.rs | 76 ++++ .../bin/{me_client.rs => someip_client.rs} | 44 +- .../{me_publisher.rs => someip_publisher.rs} | 11 +- .../bin/{me_service.rs => someip_service.rs} | 57 +-- ...{me_subscriber.rs => someip_subscriber.rs} | 50 +-- .../src/bin/{ue_client.rs => zenoh_client.rs} | 38 +- .../{ue_publisher.rs => zenoh_publisher.rs} | 2 +- .../bin/{ue_service.rs => zenoh_service.rs} | 56 +-- .../{ue_subscriber.rs => zenoh_subscriber.rs} | 38 +- .../{mE_client.json => someip_client.json} | 0 ...E_publisher.json => someip_publisher.json} | 0 .../{mE_service.json => someip_service.json} | 0 ...subscriber.json => someip_subscriber.json} | 0 up-linux-streamer-plugin/README.md | 2 +- up-linux-streamer/README.md | 23 - up-linux-streamer/src/main.rs | 177 -------- .../vsomeip-configs/point_to_point.json | 16 - utils/mosquitto/docker-compose.yaml | 8 + utils/mosquitto/mosquitto.config | 8 + .../static-configs/testdata.json | 4 +- 40 files changed, 1656 insertions(+), 515 deletions(-) rename {up-linux-streamer => example-streamer-implementations}/Cargo.toml (88%) rename {up-linux-streamer => example-streamer-implementations}/DEFAULT_CONFIG.json5 (67%) create mode 100644 example-streamer-implementations/README.md rename {up-linux-streamer => example-streamer-implementations}/ZENOH_CONFIG.json5 (100%) rename up-linux-streamer/src/config.rs => example-streamer-implementations/src/bin/config/mod.rs (85%) create mode 100644 example-streamer-implementations/src/bin/zenoh_mqtt.rs create mode 100644 example-streamer-implementations/src/bin/zenoh_someip.rs create mode 100644 example-streamer-implementations/subscription_data.json create mode 100644 example-streamer-implementations/vsomeip-configs/point_to_point.json create mode 100644 example-streamer-uses/ZENOH_CONFIG.json5 create mode 100644 example-streamer-uses/src/bin/common/mod.rs create mode 100644 example-streamer-uses/src/bin/mqtt_client.rs create mode 100644 example-streamer-uses/src/bin/mqtt_publisher.rs create mode 100644 example-streamer-uses/src/bin/mqtt_service.rs create mode 100644 example-streamer-uses/src/bin/mqtt_subscriber.rs rename example-streamer-uses/src/bin/{me_client.rs => someip_client.rs} (73%) rename example-streamer-uses/src/bin/{me_publisher.rs => someip_publisher.rs} (92%) rename example-streamer-uses/src/bin/{me_service.rs => someip_service.rs} (59%) rename example-streamer-uses/src/bin/{me_subscriber.rs => someip_subscriber.rs} (66%) rename example-streamer-uses/src/bin/{ue_client.rs => zenoh_client.rs} (77%) rename example-streamer-uses/src/bin/{ue_publisher.rs => zenoh_publisher.rs} (98%) rename example-streamer-uses/src/bin/{ue_service.rs => zenoh_service.rs} (61%) rename example-streamer-uses/src/bin/{ue_subscriber.rs => zenoh_subscriber.rs} (73%) rename example-streamer-uses/vsomeip-configs/{mE_client.json => someip_client.json} (100%) rename example-streamer-uses/vsomeip-configs/{mE_publisher.json => someip_publisher.json} (100%) rename example-streamer-uses/vsomeip-configs/{mE_service.json => someip_service.json} (100%) rename example-streamer-uses/vsomeip-configs/{mE_subscriber.json => someip_subscriber.json} (100%) delete mode 100644 up-linux-streamer/README.md delete mode 100644 up-linux-streamer/src/main.rs delete mode 100644 up-linux-streamer/vsomeip-configs/point_to_point.json create mode 100644 utils/mosquitto/docker-compose.yaml create mode 100644 utils/mosquitto/mosquitto.config diff --git a/.gitignore b/.gitignore index 72c2c178..13cbcf48 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ tarpaulin-report.html .vscode/launch.json .vscode/settings.json reports/ + +.cargo/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 0b402928..19706d0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1163,6 +1163,7 @@ dependencies = [ "serde", "tokio", "up-rust", + "up-transport-mqtt5", "up-transport-vsomeip", "up-transport-zenoh", "zenoh", @@ -1330,6 +1331,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -2265,6 +2272,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-src" +version = "300.4.0+3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a709e02f2b4aca747929cca5ed248880847c650233cf8b8cdc48f40aaf4898a6" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.104" @@ -2273,6 +2289,7 @@ checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -2301,12 +2318,61 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" +[[package]] +name = "paho-mqtt" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8367868d51cef74c28da328ed8f60529ddd3f04dca1867dd825fcc3085a4308" +dependencies = [ + "async-channel 1.9.0", + "crossbeam-channel", + "futures", + "futures-timer", + "libc", + "log", + "paho-mqtt-sys", + "thiserror", +] + +[[package]] +name = "paho-mqtt-sys" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e482419d847af4ec43c07eed70f5f94f87dc712d267aecc91ab940944ab6bf4" +dependencies = [ + "cmake", + "openssl-sys", +] + [[package]] name = "parking" version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.6", +] + [[package]] name = "paste" version = "1.0.15" @@ -2820,6 +2886,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "redox_users" version = "0.4.6" @@ -3725,7 +3800,9 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2 0.5.7", "tokio-macros", "tracing", @@ -4019,6 +4096,7 @@ dependencies = [ "tokio", "up-rust", "up-streamer", + "up-transport-mqtt5", "up-transport-vsomeip", "up-transport-zenoh", "usubscription-static-file", @@ -4092,6 +4170,31 @@ dependencies = [ "uuid", ] +[[package]] +name = "up-transport-mqtt5" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0063dbc750c320501e17ce9d0834a616b26ec49ef6c41887f30bc2788ec25405" +dependencies = [ + "async-channel 1.9.0", + "async-trait", + "bytes", + "env_logger 0.10.2", + "futures", + "log", + "paho-mqtt", + "protobuf", + "rand", + "regex", + "serde", + "serde_json", + "tokio", + "tokio-macros", + "up-rust", + "url", + "uuid", +] + [[package]] name = "up-transport-vsomeip" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 3157f30e..b3d03200 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ members = [ "example-streamer-uses", "utils/hello-world-protos", "utils/integration-test-utils", - "up-linux-streamer", "up-linux-streamer-plugin", + "example-streamer-implementations", "up-linux-streamer-plugin", "up-streamer", "subscription-cache", "utils/usubscription-static-file"] [workspace.package] @@ -44,6 +44,7 @@ protobuf = { version = "3.3", features = ["with-bytes"] } up-rust = { version = "0.2.0", default-features = false } up-transport-zenoh = { version = "0.3.0" } up-transport-vsomeip = { git = "https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust.git", tag = "v0.3.0", default-features = false } +up-transport-mqtt5 = { version = "0.1.0" } zenoh = { version = "1.0.0", features = ["default", "plugins"] } zenoh-core = { version = "1.0.0" } zenoh-plugin-trait = { version = "1.0.0" } diff --git a/README.md b/README.md index ddbdae0b..a47e332a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# up-streamer-rust +# up-streamer-rust: what is in this repository ## up-streamer @@ -7,11 +7,17 @@ to write a uStreamer application to bridge from one transport to another. Reference its README.md for further details. -## up-linux-streamer +## example-streamer-implementations -Concrete implementation of a uStreamer as a binary. +Two concrete implementations of a uStreamer as a binary. These can be used out of the box either to try running different UStreamer setups or to directly use them in a project! -Reference its README.md for further details. +Reference the README.md there for more details. + +## example-streamer-uses + +A number of UEntity examples for SOME/IP, Zenoh and MQTT5. These can be used together with the example streamer implementations to run basic setups of either a publisher and a subscriber, or a service and a client. + +Reference the README.md there for more details. ## Building @@ -25,22 +31,21 @@ cargo build ### Also build the reference Zenoh, vsomeip streamer implementations -You'll need to use the feature flags `vsomeip-transport` and `zenoh-transport`. You then also have the option of including your own vsomeip or using one bundled in `up-transport-vsomeip`. +You'll need to use the feature flags `vsomeip-transport`, `zenoh-transport` or `mqtt-transport` depending on which implementation you want to build. You then also have the option of including your own vsomeip or using one bundled in `up-transport-vsomeip`. -For the bundled option, the following: +For the bundled option, set the following environment variables (for example in your .cargo/config.toml file): +```toml +[env] +GENERIC_CPP_STDLIB_PATH= +ARCH_SPECIFIC_CPP_STDLIB_PATH= ``` -GENERIC_CPP_STDLIB_PATH= ARCH_SPECIFIC_CPP_STDLIB_PATH= cargo build --features vsomeip-transport,bundled-vsomeip,zenoh-transport -``` - -where for reference on my machine: -``` -GENERIC_CPP_STDLIB_PATH=/usr/include/c++/13 -ARCH_SPECIFIC_CPP_STDLIB_PATH=/usr/include/x86_64-linux-gnu/c++/13 +```bash +cargo build --features vsomeip-transport,bundled-vsomeip,zenoh-transport ``` -These environment varialbes are necessary because of a workaround done in `up-transport-vsomeip` due to not being able to figure out another way to compile vsomeip without them. (If you can figure out how to avoid this, I'm all ears!) +The environment variables are necessary because of a workaround done in `up-transport-vsomeip` due to not being able to figure out another way to compile vsomeip without them. (If you can figure out how to avoid this, I'm all ears!) Please reference the documentation for [vsomeip-sys](https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust/tree/main/vsomeip-sys) for more details on: * the build requirements for vsomeip in the linked documentation in the COVESA repo diff --git a/up-linux-streamer/Cargo.toml b/example-streamer-implementations/Cargo.toml similarity index 88% rename from up-linux-streamer/Cargo.toml rename to example-streamer-implementations/Cargo.toml index f9537e1f..8c0ad367 100644 --- a/up-linux-streamer/Cargo.toml +++ b/example-streamer-implementations/Cargo.toml @@ -20,13 +20,17 @@ keywords.workspace = true license.workspace = true [[bin]] -name = "up-linux-streamer" -path = "src/main.rs" +name = "zenoh_someip" required-features = ["zenoh-transport", "vsomeip-transport"] +[[bin]] +name = "zenoh_mqtt" +required-features = ["zenoh-transport", "mqtt-transport"] + [features] default = [] zenoh-transport = ["up-transport-zenoh", "zenoh"] +mqtt-transport = ["up-transport-mqtt5"] vsomeip-transport = ["up-transport-vsomeip"] bundled-vsomeip = ["up-transport-vsomeip/bundled"] @@ -44,6 +48,7 @@ up-rust = { workspace = true } up-streamer = { path = "../up-streamer" } up-transport-zenoh = { workspace = true, optional = true } up-transport-vsomeip = { workspace = true, optional = true } +up-transport-mqtt5 = { workspace = true, optional = true } zenoh = { workspace = true, optional = true } usubscription-static-file = {path = "../utils/usubscription-static-file"} diff --git a/up-linux-streamer/DEFAULT_CONFIG.json5 b/example-streamer-implementations/DEFAULT_CONFIG.json5 similarity index 67% rename from up-linux-streamer/DEFAULT_CONFIG.json5 rename to example-streamer-implementations/DEFAULT_CONFIG.json5 index 83d97e96..473a6f9a 100644 --- a/up-linux-streamer/DEFAULT_CONFIG.json5 +++ b/example-streamer-implementations/DEFAULT_CONFIG.json5 @@ -11,7 +11,7 @@ streamer_uuri: { // Determines the authority_name of the host device // Used when initializing host transport - authority: "linux", + authority: "authority_B", // Determines the ue_id of the streamer // Used when initializing host transport ue_id: 78, @@ -21,21 +21,35 @@ }, usubscription_config: { // Lists the path to the subscription file when using static file - file_path: "./utils/usubscription-static-file/static-configs/testdata.json" + file_path: "../example-streamer-implementations/subscription_data.json" }, zenoh_transport_config: { // Configuration file which is where zenoh config information is stored - config_file: "./up-linux-streamer/ZENOH_CONFIG.json5" + config_file: "../example-streamer-implementations/ZENOH_CONFIG.json5" }, // Configurations related to the host device we are running the streamer on host_config: { // Determines which transport to initialize for the host device transport: "Zenoh", }, + mqtt_config: { + // The URL of the MQTT broker (the provided mosquitto broker runs on locahost but docker networks might complicate that) + hostname: "localhost", + // The port of the broker (unencrypted MQTT like the provided mosquitto broker typically uses 1883, encrypted uses 8883) + port: 1883, + // How many messages the broker should buffer for this connections + max_buffered_messages: 100, + // How many individual topic subscriptions are supported through this connection + max_subscriptions: 100, + // How long the connection should stay open for + session_expiry_interval: 3600, + // The username that the mqtt client gives the broker when connecting (usually not important) + username: "user" + }, someip_config: { // Determines the authority_name of the mechatronics network // Used when initializing SOME/IP transport - authority: "me_authority", + authority: "authority_A", // The vsomeip configuration file to be used when initializing the vsomeip transport // // Some guidance: @@ -46,11 +60,9 @@ // called // * the `id` field should be chosen such that it matches the ue_id of the uE on the host // device - config_file: "../../up-linux-streamer/vsomeip-configs/point_to_point.json", + config_file: "../../example-streamer-implementations/vsomeip-configs/point_to_point.json", // An ID to use for a vsomeip application which will represent all subscriptions to // publish messages output over vsomeip - default_someip_application_id_for_someip_subscriptions: 10, - // Whether to enable bridging across to the mechatronics network - enabled: true + default_someip_application_id_for_someip_subscriptions: 10 }, } diff --git a/example-streamer-implementations/README.md b/example-streamer-implementations/README.md new file mode 100644 index 00000000..ad7ba04c --- /dev/null +++ b/example-streamer-implementations/README.md @@ -0,0 +1,153 @@ +# up-linux-streamer + +These are reference implementations of a uStreamer. +We implement the Streamers so that they forward between two components, either between Zenoh and MQTT or between Zenoh and SOME/IP. +A component running with Zenoh could be a component running in an ECU, one running with MQTT could be a component running in some sort of cloud and one running SOME/IP could be a mechatronics component. +The Streamer by itself does not do much, but its designed to be run together with two of the transport examples in the example-streamer-uses folder. + +## Supported Setups + +Here are the setups that can be built with these streamers and different entities with the Zenoh, SOME/IP and MQTT transports. +Depending on which setup you want to try, run the streamer with: + +```bash +cargo run --bin --features= -- --config="DEFAULT_CONFIG.json5" +``` + +### Client-Service Setups + +In a setup with one client and one service, the service runs in the background while the client periodically makes requests to it. +Once the server receives a request it will respond with a reply. +The request message contains information on a "sink" (the URI of the service entity which it tries to reach) and a source (the URI of the client so that the service knows where to send the response to). + +For a single setup you can choose either: +- a Zenoh Client and an MQTT Service (A cars' software requesting information from the cloud) +- an MQTT Client and a Zenoh Service (A backend service trying to pull telemetry data from a running car) +- a Zenoh Client and a SOME/IP Service (The infotainment system requesting some mechatronics sensor data) +- or a SOME/IP Client and a Zenoh Service (A mechatronics component asking the infotainment system for input) + +### Publish-Subscribe Setups + +This setup is more straight forward and consists of one publisher broadcasting messages to a topic and a subscriber who listens to the topic. +Messages that the publisher sends contain a "topic" that the message will be available on. There is no response expected so publish messages do not contain the publishers URI. +The subscriber listens to one or multiple topics via a filter. + +For this setup you can also choose: +- a Zenoh Publisher and an MQTT Subscriber (A backend service getting live data from a car) +- an MQTT Publisher and a Zenoh Subscriber (A car getting over the air traffic information) +- a Zenoh Publisher and a SOME/IP Subscriber (An autoedge app pushing some configurations to a mechatronics component) +- or a SOME/IP Publisher and a Zenoh Subscriber (An infotainment app getting live data from a sensor) + +### Notification setups + +There are currently no example entities for notification type messages. These do not exist for SOME/IP but do exist for Zenoh and MQTT. It should be relatively straight forward to implement the yourself if your system needs them! + +## Understanding the Configuration Files + +Reference the `DEFAULT_CONFIG.json5` configuration file to understand the basic configuration options for the Streamer. + +The `ZENOH_CONFIG.json5` file is used to set Zenoh configurations. By default, it is only used to set listening endpoints, but can be used with more configurations according to [Zenoh's page on it](https://zenoh.io/docs/manual/configuration/#configuration-files). + +The 'static_subscriptions.json' is only needed when you set up a publish-subscribe system and can be ignored for a client-service system. +Make sure that the UURI of each pub-sub entity is present at least as a key in this json file! + +The 'vsomeip-config/point_to_point.json' is a configuration file only needed for SOME/IP implementations. The list of "services" must include the UEntity IDs of all entities running on the host-protocol (in the reference implementations that means all components running with the Zenoh transport)! The term service in this context comes from SOME/IP and should not be confused with UService entity. + +The MQTT Transport has its configuration hardcoded for now. For the settings seach the binaries for "mqtt_config" to set the MQTT brokers address and port as well as ssl-options and others. + +## Running the Streamer in a system consisting of an ECU that supports Zenoh and a cloud component that runs MQTT + +### Running the `zenoh_mqtt` binary as the Streamer instance + +To run one of the basic examples and see two entities with different transports communicate, you'll need to first run the `zenoh_mqtt` bin to bridge between the two transports in a terminal. This implementation should work out of the box with the given examples. + +First run an MQTT broker. You can use your own or use the Mosquitto instance provided in this repo. For that execute the following: + +```bash +cd ../utils/mosquitto +docker compose up +``` + +Run the linux streamer with the default configuration file from here (the example-streamer-implementation folder): + +```bash +cargo run -bin zenoh_mqtt --features="zenoh-transport mqtt-transport" -- --config="DEFAULT_CONFIG.json5" +``` + +This starts the streamer which should now be idle. As soon as a client tries to connect with the streamer, the connection will be logged. +The streamer is set to have Zenoh as its "host protocol" or "host transport". This means that the streamer lives in the same component as the Zenoh transport, and shares its authority. +In this setup "authority_B" is the authority of the Zenoh component (in this example the ECU), "authority_A" is the authority of the MQTT component (i.e. the cloud). + +### Running the Entities + +Execute the following command from the project root directory to start two of the example UEntities: + +```bash +cargo run --bin --features= +``` + +Depending on the setup you want to test, chose any of these combinations for your two UEntities: + +| Entity 1 | Entity 2 | +| --------------- | ------------- | +| mqtt_client | zenoh_service | +| mqtt_service | zenoh_client | +| mqtt_publisher | zenoh_subscriber | +| mqtt_subscriber | zenoh_publisher | + +The service and client will run forever. Every second a new request message is sent from the client via zenoh. That Zenoh message is caught and routed over MQTT to the service. The response to the request makes the same journey in reverse. + +## Running the Streamer in a system consisting of an ECU that supports Zenoh and a mechatronics component using SOME/IP + +### Running the `zenoh_someip` binary as the Streamer instance + +The other available reference implementation streams between the Zenoh and SOME/IP transports. + +For this implementation you need to add the path to your vsomeip library to the environment, for example by setting it in your .cargo/config.toml: + +```toml +[env] +VSOMEIP_INSTALL_PATH= +``` + +Run the "zenoh_someip" binary of the streamer with the default configuration file: + +```bash +cargo run -bin zenoh_someip --features="zenoh-transport vsomeip-transport bundled-someip" -- --config="DEFAULT_CONFIG.json5" +``` + +For the feature flags choose either "vsomeip-transport" or "bundled-vsomeip" depending on your vsomeip configuration. +Make sure that you have set the required environment variables for the someip transport! + +This starts the streamer which should now be idle. As soon as a client tries to connect with the streamer, the connection will be logged. +The streamer is set to have Zenoh as its "host protocol" or "host transport". This means that the streamer lives in the same component as the Zenoh transport, and shares its authority. +In this setup "authority_B" is the authority of the Zenoh component (in this example the ECU), "authority_A" is the authority of the SOME/IP component (i.e. the mechatronics component). + +### Running the Entities + +Execute the following command from the project root directory to start one of the Entities: + +```bash +cargo run -p example-streamer-uses --bin +``` + +Depending on the setup you want to test, chose any of these examples: + +| Entity 1 | Entity 2 | +| --------------- | ------------- | +| someip_client | zenoh_service | +| someip_service | zenoh_client | +| someip_publisher | zenoh_subscriber | +| someip_subscriber | zenoh_publisher | + +The two entities will run forever and exchange messages between each other. + +## Going forward from here + +If you have familiarized yourself with the streamer to this point you should be able to continue by yourself. +If the two reference implementations are not enough for your system you can consider the following next steps: + +- Create a streamer between SOME/IP and MQTT (mind that SOME/IP cannot act as the host-transport) +- Run a streamer that can forward messages between all three transports +- Implement your own custom or proprietary UTransport and connect it to one of the three officially supported ones +- Try out a system with your own UEntities. Between MQTT and Zenoh its also possible to send notification type messages diff --git a/up-linux-streamer/ZENOH_CONFIG.json5 b/example-streamer-implementations/ZENOH_CONFIG.json5 similarity index 100% rename from up-linux-streamer/ZENOH_CONFIG.json5 rename to example-streamer-implementations/ZENOH_CONFIG.json5 diff --git a/up-linux-streamer/src/config.rs b/example-streamer-implementations/src/bin/config/mod.rs similarity index 85% rename from up-linux-streamer/src/config.rs rename to example-streamer-implementations/src/bin/config/mod.rs index f722d346..f97557ad 100644 --- a/up-linux-streamer/src/config.rs +++ b/example-streamer-implementations/src/bin/config/mod.rs @@ -23,6 +23,7 @@ pub struct Config { pub(crate) zenoh_transport_config: ZenohTransportConfig, pub(crate) host_config: HostConfig, pub(crate) someip_config: SomeipConfig, + pub(crate) mqtt_config: MqttConfig, } #[derive(Deserialize, Serialize, Debug, Clone)] @@ -63,10 +64,21 @@ pub struct SomeipConfig { pub(crate) authority: String, pub(crate) config_file: PathBuf, pub(crate) default_someip_application_id_for_someip_subscriptions: u16, - pub(crate) enabled: bool, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct MqttConfig { + pub(crate) hostname: String, + pub(crate) port: u16, + pub(crate) max_buffered_messages: i32, + pub(crate) max_subscriptions: i32, + pub(crate) session_expiry_interval: i32, + pub(crate) username: String, } #[derive(Deserialize, Serialize, Debug, Clone)] pub enum HostTransport { Zenoh, + Mqtt, } diff --git a/example-streamer-implementations/src/bin/zenoh_mqtt.rs b/example-streamer-implementations/src/bin/zenoh_mqtt.rs new file mode 100644 index 00000000..2943fc71 --- /dev/null +++ b/example-streamer-implementations/src/bin/zenoh_mqtt.rs @@ -0,0 +1,138 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +mod config; + +use crate::config::Config; +use clap::Parser; +use log::{info, trace}; +use std::fs::File; +use std::io::Read; +use std::sync::Arc; +use std::thread; +use up_rust::{UCode, UStatus, UTransport, UUri, UUID}; +use up_streamer::{Endpoint, UStreamer}; +use up_transport_mqtt5::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType}; +use up_transport_zenoh::UPTransportZenoh; +use usubscription_static_file::USubscriptionStaticFile; +use zenoh::config::Config as ZenohConfig; + +#[derive(Parser)] +#[command()] +struct StreamerArgs { + #[arg(short, long, value_name = "FILE")] + config: String, +} + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + info!("Started up-linux-streamer-mqtt-zenoh"); + + // Get the config file. + let args = StreamerArgs::parse(); + let mut file = File::open(args.config) + .map_err(|e| UStatus::fail_with_code(UCode::NOT_FOUND, format!("File not found: {e:?}")))?; + let mut contents = String::new(); + file.read_to_string(&mut contents).map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to read config file: {e:?}"), + ) + })?; + + let config: Config = json5::from_str(&contents).map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to parse config file: {e:?}"), + ) + })?; + + let subscription_path = config.usubscription_config.file_path; + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); + + // Start the streamer instance. + let mut streamer = UStreamer::new( + "up-streamer", + config.up_streamer_config.message_queue_size, + usubscription, + ) + .expect("Failed to create uStreamer"); + + // In this implementation we define that the streamer lives in the same ecu component as the zenoh entity and so shares its authority name but with a different service ID. + let streamer_uuri = UUri::try_from_parts( + &config.streamer_uuri.authority, + config.streamer_uuri.ue_id, + config.streamer_uuri.ue_version_major, + 0, + ) + .expect("Unable to form streamer_uuri"); + + trace!("streamer_uuri: {streamer_uuri:#?}"); + let zenoh_config = ZenohConfig::from_file(config.zenoh_transport_config.config_file).unwrap(); + + let zenoh_transport: Arc = Arc::new( + UPTransportZenoh::new(zenoh_config, streamer_uuri) + .await + .expect("Unable to initialize Zenoh UTransport"), + ); + + // Because the streamer runs on the ecu side in this implementation, we call the zenoh endpoint the "host". + let zenoh_endpoint = Endpoint::new( + "host_endpoint", + &config.streamer_uuri.authority, + zenoh_transport.clone(), + ); + + let mqtt_config = MqttConfig { + mqtt_protocol: MqttProtocol::Mqtt, + mqtt_hostname: config.mqtt_config.hostname, + mqtt_port: config.mqtt_config.port, + max_buffered_messages: config.mqtt_config.max_buffered_messages, + max_subscriptions: config.mqtt_config.max_subscriptions, + session_expiry_interval: config.mqtt_config.session_expiry_interval, + ssl_options: None, + username: config.mqtt_config.username, + }; + + let mqtt_transport: Arc = Arc::new( + UPClientMqtt::new( + mqtt_config, + UUID::build(), + "cloud".to_string(), + UPClientMqttType::Device, + ) + .await + .expect("Could not create mqtt transport."), + ); + + // In this implementation, the mqtt entity runs in the cloud and has its own authority. + let mqtt_endpoint = Endpoint::new("cloud_endpoint", "authority_A", mqtt_transport.clone()); + + // Here we tell the streamer to forward any zenoh messages to the mqtt endpoint + streamer + .add_forwarding_rule(zenoh_endpoint.clone(), mqtt_endpoint.clone()) + .await + .expect("Could not add zenoh -> mqtt forwarding rule"); + + // And here we set up the forwarding in the other direction. + streamer + .add_forwarding_rule(mqtt_endpoint.clone(), zenoh_endpoint.clone()) + .await + .expect("Could not add mqtt -> zenoh forwarding rule"); + + thread::park(); + + Ok(()) +} diff --git a/example-streamer-implementations/src/bin/zenoh_someip.rs b/example-streamer-implementations/src/bin/zenoh_someip.rs new file mode 100644 index 00000000..d6d09055 --- /dev/null +++ b/example-streamer-implementations/src/bin/zenoh_someip.rs @@ -0,0 +1,153 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +mod config; + +use crate::config::Config; +use clap::Parser; +use log::trace; +use std::fs::File; +use std::io::Read; +use std::sync::Arc; +use std::{env, thread}; +use up_rust::{UCode, UStatus, UTransport, UUri}; +use up_streamer::{Endpoint, UStreamer}; +use up_transport_vsomeip::UPTransportVsomeip; +use up_transport_zenoh::UPTransportZenoh; +use usubscription_static_file::USubscriptionStaticFile; +use zenoh::config::Config as ZenohConfig; + +#[derive(Parser)] +#[command()] +struct StreamerArgs { + #[arg(short, long, value_name = "FILE")] + config: String, +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + let args = StreamerArgs::parse(); + + let mut file = File::open(args.config) + .map_err(|e| UStatus::fail_with_code(UCode::NOT_FOUND, format!("File not found: {e:?}")))?; + let mut contents = String::new(); + file.read_to_string(&mut contents).map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to read config file: {e:?}"), + ) + })?; + + let config: Config = json5::from_str(&contents).map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to parse config file: {e:?}"), + ) + })?; + + let subscription_path = config.usubscription_config.file_path; + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); + + // Start the streamer instance. + let mut streamer = UStreamer::new( + "up-streamer", + config.up_streamer_config.message_queue_size, + usubscription, + ) + .expect("Failed to create uStreamer"); + + let streamer_uuri = UUri::try_from_parts( + &config.streamer_uuri.authority, + config.streamer_uuri.ue_id, + config.streamer_uuri.ue_version_major, + 0, + ) + .expect("Unable to form streamer_uuri"); + + trace!("streamer_uuri: {streamer_uuri:#?}"); + + let zenoh_config = ZenohConfig::from_file(config.zenoh_transport_config.config_file).unwrap(); + + let zenoh_transport: Arc = Arc::new( + UPTransportZenoh::new(zenoh_config, streamer_uuri) + .await + .expect("Unable to initialize Zenoh UTransport"), + ); + + // Because the streamer runs on the ecu side in this implementation, we call the zenoh endpoint the "host". + let zenoh_endpoint = Endpoint::new( + "host_endpoint", + &config.streamer_uuri.authority, + zenoh_transport.clone(), + ); + + let someip_config_file_abs_path = if config.someip_config.config_file.is_relative() { + env::current_exe() + .unwrap() + .parent() + .unwrap() + .join(&config.someip_config.config_file) + } else { + config.someip_config.config_file + }; + trace!("someip_config_file_abs_path: {someip_config_file_abs_path:?}"); + if !someip_config_file_abs_path.exists() { + panic!("The specified someip config_file doesn't exist: {someip_config_file_abs_path:?}"); + } + + let host_uuri = UUri::try_from_parts( + &config.streamer_uuri.authority, + config + .someip_config + .default_someip_application_id_for_someip_subscriptions as u32, + 1, + 0, + ) + .expect("Unable to make host_uuri"); + + // There will be at most one vsomeip_transport, as there is a connection into device and a streamer + let someip_transport: Arc = Arc::new( + UPTransportVsomeip::new_with_config( + host_uuri, + &config.someip_config.authority, + &someip_config_file_abs_path, + None, + ) + .expect("Unable to initialize vsomeip UTransport"), + ); + + // In this implementation, the mqtt entity runs in the cloud and has its own authority. + let someip_endpoint = Endpoint::new( + "someip_endpoint", + &config.someip_config.authority, + someip_transport.clone(), + ); + + // Here we tell the streamer to forward any zenoh messages to the someip endpoint + streamer + .add_forwarding_rule(zenoh_endpoint.clone(), someip_endpoint.clone()) + .await + .expect("Could not add zenoh -> someip forwarding rule"); + + // And here we set up the forwarding in the other direction. + streamer + .add_forwarding_rule(someip_endpoint.clone(), zenoh_endpoint.clone()) + .await + .expect("Could not add someip -> zenoh forwarding rule"); + + thread::park(); + + Ok(()) +} diff --git a/example-streamer-implementations/subscription_data.json b/example-streamer-implementations/subscription_data.json new file mode 100644 index 00000000..ca03eaa2 --- /dev/null +++ b/example-streamer-implementations/subscription_data.json @@ -0,0 +1,4 @@ +{ + "//authority_B/3039/1/8001": ["//authority_A/5678/1/1234"], + "//authority_A/5BA0/1/8001": ["//authority_B/5678/1/1234"] +} diff --git a/example-streamer-implementations/vsomeip-configs/point_to_point.json b/example-streamer-implementations/vsomeip-configs/point_to_point.json new file mode 100644 index 00000000..bf7e0eab --- /dev/null +++ b/example-streamer-implementations/vsomeip-configs/point_to_point.json @@ -0,0 +1,14 @@ +{ + "applications": [ + { + "name": "foo", + "id": "0x1236" + } + ], + "services": [ + { + "service": "0x1236", + "instance": "0x0001" + } + ] +} diff --git a/example-streamer-uses/Cargo.toml b/example-streamer-uses/Cargo.toml index 32f1ba7e..5483aaac 100644 --- a/example-streamer-uses/Cargo.toml +++ b/example-streamer-uses/Cargo.toml @@ -20,41 +20,58 @@ keywords.workspace = true license.workspace = true [[bin]] -name = "me_client" +name = "someip_client" required-features = ["vsomeip-transport"] [[bin]] -name = "me_publisher" +name = "someip_publisher" required-features = ["vsomeip-transport"] [[bin]] -name = "me_service" +name = "someip_service" required-features = ["vsomeip-transport"] [[bin]] -name = "me_subscriber" +name = "someip_subscriber" required-features = ["vsomeip-transport"] [[bin]] -name = "ue_client" +name = "zenoh_client" required-features = ["zenoh-transport"] [[bin]] -name = "ue_publisher" +name = "zenoh_publisher" required-features = ["zenoh-transport"] [[bin]] -name = "ue_service" +name = "zenoh_service" required-features = ["zenoh-transport"] [[bin]] -name = "ue_subscriber" +name = "zenoh_subscriber" required-features = ["zenoh-transport"] +[[bin]] +name = "mqtt_client" +required-features = ["mqtt-transport"] + +[[bin]] +name = "mqtt_publisher" +required-features = ["mqtt-transport"] + +[[bin]] +name = "mqtt_service" +required-features = ["mqtt-transport"] + +[[bin]] +name = "mqtt_subscriber" +required-features = ["mqtt-transport"] + [features] zenoh-transport = ["up-transport-zenoh", "zenoh"] vsomeip-transport = ["up-transport-vsomeip"] bundled-vsomeip = ["up-transport-vsomeip/bundled"] +mqtt-transport = ["up-transport-mqtt5"] [dependencies] async-trait = { workspace = true } @@ -70,4 +87,5 @@ tokio = { workspace = true } up-rust = { workspace = true } up-transport-zenoh = { workspace = true, optional = true } up-transport-vsomeip = { workspace = true, optional = true } +up-transport-mqtt5 = { workspace = true, optional = true } zenoh = { workspace = true, optional = true } diff --git a/example-streamer-uses/README.md b/example-streamer-uses/README.md index e3615eb2..cf6595fb 100644 --- a/example-streamer-uses/README.md +++ b/example-streamer-uses/README.md @@ -1,66 +1,76 @@ -# Examples -## Running examples +# Running entities -### Running the `up-linux-streamer` +## Running a UStreamer -To run one of the examples below and see client and service communicate, you'll need to run the `up-linux-streamer` to bridge between the transports in a terminal. See `up-linux-streamer` README for details. +To run one of the examples below and see two entities communicate, you'll need to run one of the two example implementations of the UStreamer to bridge between the transports in a terminal. Check out the `example-streamer-implementation` README for details. -### Mechatronics client to high compute service +## Running two UEntities -Launch the `uE_service` example in another terminal: +To launch any of the entities choose the correct bin and give it the appropriate feature flags: -```bash -LD_LIBRARY_PATH=$LD_LIBRARY_PATH: cargo run --example uE_service -- --endpoint tcp/0.0.0.0:7445 -``` +| --bin | --features | +| ---------- | ------------- | +| mqtt_* | mqtt-transport | +| zenoh_* | zenoh-transport | +| someip* | vsomeip-transport | -In this example, the "--endpoint" flag will set the endpoint address upon which the zenoh client will listen. - -Launch the `mE_client` example in another terminal: +Combined: ```bash -LD_LIBRARY_PATH=$LD_LIBRARY_PATH: cargo run --example mE_client +cargo run --bin --features <*-transport> ``` -The service and client will run forever. Every second a new message is sent from the mE_client via vsomeip. That vsomeip message is caught and routed over Zenoh to the uE_service. The response makes the same journey in reverse. +To check which combinations of entities are possible for a test run check the `example-streamer-implementation` README. +Before running any entity that uses MQTT remember to first start up a MQTT broker, for example the mosquitto broker found in this repo: -It's intended that you see the following in the terminal running the `mE_client`: +```bash +cd ../utils/mosquitto +docker compose up +``` -> Sending Request message: -UMessage { attributes: MessageField(Some(UAttributes { id: MessageField(Some(UUID { msb: 112888656100425728, lsb: 9811577761723054400, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), type_: UMESSAGE_TYPE_REQUEST, source: MessageField(Some(UUri { authority_name: "me_authority", ue_id: 22136, ue_version_major: 1, resource_id: 0, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), sink: MessageField(Some(UUri { authority_name: "linux", ue_id: 4662, ue_version_major: 1, resource_id: 2198, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), priority: UPRIORITY_CS4, ttl: Some(1000), permission_level: None, commstatus: None, reqid: MessageField(None), token: None, traceparent: None, payload_format: UPAYLOAD_FORMAT_PROTOBUF, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), **payload: Some(b"\n\rme_client@i=3")**, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } } +# Working setup: a mechatronics client and a high compute service -This is the request message we will send _from_ the mE_client over vsomeip. +This example should help with understanding what the streamer does and how it can be used. The first part of the system that we are building here is a mechatronics component, which could be some embedded chip deep down in the belly of a car, connected to some mechanical parts like a windshield wiper. This component can (or wants to) only use SOME/IP as a transport protocol either because thats what its developers like to use or because of some hardware limitation. The second component is a high compute unit running some small Linux system that serves as the vehicles boardcomputer. The software thats running here uses Zenoh as its preferred transport protocol because a solution architect once decided that it should do that. The UStreamer comes into play as the system that allows these two components to communicate with each other while staying true to their respective protocols. -Note the payload is listed with `@i=3`. This number is incremented on each send so that we can trace the message back and forth over the `up-linux-streamer`. +Because the streamer cant run on the mechatronics chip, it runs on the high compute unit. In our setup we want the mechatronics entity to request some information from the high compute unit, so the entity type on the SOME/IP side should be a client. If the chip indeed controls the windshield wipers it might want to periodically ask the board computer to provide information about if and how much its raining. The high compute unit should respond to the chip with the latest data that it has available, so it must be an entity of the type service. -You should then see something like this in the `uE_service` terminal: +## Run the Zenoh-SOME/IP Streamer -> ServiceRequestResponder: Received a message: UMessage { attributes: MessageField(Some(UAttributes { id: MessageField(Some(UUID { msb: 112888656100622336, lsb: 10998499817480005337, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), type_: UMESSAGE_TYPE_REQUEST, source: MessageField(Some(UUri { authority_name: "me_authority", ue_id: 257, ue_version_major: 1, resource_id: 0, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), sink: MessageField(Some(UUri { authority_name: "linux", ue_id: 4662, ue_version_major: 1, resource_id: 2198, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), priority: UPRIORITY_CS4, ttl: Some(1000), permission_level: None, commstatus: None, reqid: MessageField(None), token: None, traceparent: None, payload_format: UPAYLOAD_FORMAT_UNSPECIFIED, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), **payload: Some(b"\n\rme_client@i=3")**, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } } +To launch the streamer, go to the example-streamer-implementations folder and run the binary with the correct feature flags and the provided config file in a new terminal: -Note that the message we received in the uE_service through the streamer still shows the payload with `@i=3`. +```bash +cd ../example-streamer-implementations +cargo run --bin zenoh_someip --features="zenoh-transport vsomeip-transport" -- --config='DEFAULT_CONFIG.json5' +``` -If you again reference the terminal where `mE_client` is running you should see the the response message printed: +This will start the streamer. By itself it should not do much, except for logging the vsomeip version every now and then. -> ServiceResponseListener: Received a message: UMessage { attributes: MessageField(Some(UAttributes { id: MessageField(Some(UUID { msb: 112888656101015552, lsb: 9811577761723054400, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), type_: UMESSAGE_TYPE_RESPONSE, source: MessageField(Some(UUri { authority_name: "linux", ue_id: 4662, ue_version_major: 1, resource_id: 2198, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), sink: MessageField(Some(UUri { authority_name: "me_authority", ue_id: 257, ue_version_major: 1, resource_id: 0, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), priority: UPRIORITY_CS4, ttl: None, permission_level: None, commstatus: Some(INTERNAL), reqid: MessageField(Some(UUID { msb: 112888656100425728, lsb: 9811577761723054400, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), token: None, traceparent: None, payload_format: UPAYLOAD_FORMAT_UNSPECIFIED, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } })), **payload: Some(b"\nThe response to the request: me_client@i=3")**, special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } } +## Run the mechatronics client -Note that the response also contains `@i=3` showing that we have received a response for the original request containing that in the payload. +To launch the mechatronics entity, run the binary `someip_client` example in a new terminal: -Further, you will see printed the deserialized `HelloResponse` ProtoBuf object: +```bash +cargo run --bin someip_client --features someip_transport +``` -> Here we received response: HelloResponse { message: "The response to the request: me_client@i=3", special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } } +You should see in the log that the client is trying to send messages of type UMESSAGE_TYPE_REQUEST. These messages should now also show up in the logs of your streamer terminal just after the SOME/IP client sends them. Since there is no other component they are not yet being forwarded anywhere though. -### High compute service to mechatronics client +## Run the high compute unit -Launch the `mE_service` example in another terminal: +Lastly to launch the high compute unit that has access to whatever data the mechatronics part is requesting, launch a zenoh service in yet another terminal: ```bash -LD_LIBRARY_PATH=$LD_LIBRARY_PATH: cargo run --example mE_service +cargo run --bin zenoh_service --features zenoh_transport ``` -Launch the `uE_client` example in another terminal: +Immediately after launching you should see that this entity is logging incoming requests of type UMESSAGE_TYPE_REQUEST and right after is trying to answer them with UMESSAGE_TYPE_RESPONSE messages. These should then show up again in the log of the UStreamer, and also in the log of the mechatronics SOME/IP client. -```bash -LD_LIBRARY_PATH=$LD_LIBRARY_PATH: cargo run --example uE_client -- --endpoint tcp/0.0.0.0:7444 -``` +The service and client will run forever. Every second a new request is made from the someip_client via vsomeip. That vsomeip message is caught and routed over Zenoh to the zenoh_service. The response makes the same journey in reverse. + +To better track the messages you can also check that the payload is listed with `@i=n` where n is incremented on each send so that we can trace the message back and forth over the streamer. + + +Further, you will see printed the deserialized `HelloResponse` ProtoBuf object: -We omit a detail explanation of the expected terminal output as it's a mirror of the `Mechatronics client to high compute service` heading above. +> Here we received response: HelloResponse { message: "The response to the request: someip_client@i=n", special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } } diff --git a/example-streamer-uses/ZENOH_CONFIG.json5 b/example-streamer-uses/ZENOH_CONFIG.json5 new file mode 100644 index 00000000..fb6ef030 --- /dev/null +++ b/example-streamer-uses/ZENOH_CONFIG.json5 @@ -0,0 +1,419 @@ +/// This file attempts to list and document available configuration elements. +/// For a more complete view of the configuration's structure, check out `zenoh/src/config.rs`'s `Config` structure. +/// Note that the values here are correctly typed, but may not be sensible, so copying this file to change only the parts that matter to you is not good practice. +{ + /// The identifier (as unsigned 128bit integer in hexadecimal lowercase - leading zeros are not accepted) + /// that zenoh runtime will use. + /// If not set, a random unsigned 128bit integer will be used. + /// WARNING: this id must be unique in your zenoh network. + // id: "1234567890abcdef", + + /// The node's mode (router, peer or client) +// mode: "peer", + +// /// The node's metadata (name, location, DNS name, etc.) Arbitrary JSON data not interpreted by zenohd and available in admin space @/router/ +// metadata: { +// name: "strawberry", +// location: "Penny Lane" +// }, + +// /// Which endpoints to connect to. E.g. tcp/localhost:7447. +// /// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup. +// connect: { +// endpoints: [ +// "tcp/0.0.0.0:7447" +// ], +// }, + + /// Which endpoints to listen on. E.g. tcp/localhost:7447. + /// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers, + /// peers, or client can use to establish a zenoh session. + listen: { + endpoints: [ + "tcp/0.0.0.0:7447" + ], + }, + /// Configure the scouting mechanisms and their behaviours +// scouting: { +// /// In client mode, the period dedicated to scouting for a router before failing +// timeout: 3000, +// /// In peer mode, the period dedicated to scouting remote peers before attempting other operations +// delay: 200, +// /// The multicast scouting configuration. +// multicast: { +// /// Whether multicast scouting is enabled or not +// enabled: true, +// /// The socket which should be used for multicast scouting +// address: "224.0.0.224:7446", +// /// The network interface which should be used for multicast scouting +// interface: "auto", // If not set or set to "auto" the interface if picked automatically +// /// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast. +// /// Accepts a single value or different values for router, peer and client. +// /// Each value is bit-or-like combinations of "peer", "router" and "client". +// autoconnect: { router: "", peer: "router|peer" }, +// /// Whether or not to listen for scout messages on UDP multicast and reply to them. +// listen: true, +// }, +// /// The gossip scouting configuration. +// gossip: { +// /// Whether gossip scouting is enabled or not +// enabled: true, +// /// When true, gossip scouting informations are propagated multiple hops to all nodes in the local network. +// /// When false, gossip scouting informations are only propagated to the next hop. +// /// Activating multihop gossip implies more scouting traffic and a lower scalability. +// /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have +// /// direct connectivity with each other. +// multihop: false, +// /// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip. +// /// Accepts a single value or different values for router, peer and client. +// /// Each value is bit-or-like combinations of "peer", "router" and "client". +// autoconnect: { router: "", peer: "router|peer" }, +// }, +// }, + +// /// Configuration of data messages timestamps management. +// timestamping: { +// /// Whether data messages should be timestamped if not already. +// /// Accepts a single boolean value or different values for router, peer and client. +// enabled: { router: true, peer: false, client: false }, +// /// Whether data messages with timestamps in the future should be dropped or not. +// /// If set to false (default), messages with timestamps in the future are retimestamped. +// /// Timestamps are ignored if timestamping is disabled. +// drop_future_timestamp: false, +// }, + +// /// The default timeout to apply to queries in milliseconds. +// queries_default_timeout: 10000, + +// /// The routing strategy to use and it's configuration. +// routing: { +// /// The routing strategy to use in routers and it's configuration. +// router: { +// /// When set to true a router will forward data between two peers +// /// directly connected to it if it detects that those peers are not +// /// connected to each other. +// /// The failover brokering only works if gossip discovery is enabled. +// peers_failover_brokering: true, +// }, +// /// The routing strategy to use in peers and it's configuration. +// peer: { +// /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate"). +// mode: "peer_to_peer", +// }, +// }, + + // /// The declarations aggregation strategy. + // aggregation: { + // /// A list of key-expressions for which all included subscribers will be aggregated into. + // subscribers: [ + // // key_expression + // ], + // /// A list of key-expressions for which all included publishers will be aggregated into. + // publishers: [ + // // key_expression + // ], + // }, + +// /// Configure internal transport parameters +// transport: { +// unicast: { +// /// Timeout in milliseconds when opening a link +// accept_timeout: 10000, +// /// Maximum number of zenoh session in pending state while accepting +// accept_pending: 100, +// /// Maximum number of sessions that can be simultaneously alive +// max_sessions: 1000, +// /// Maximum number of incoming links that are admitted per session +// max_links: 1, +// /// Enables the LowLatency transport +// /// This option does not make LowLatency transport mandatory, the actual implementation of transport +// /// used will depend on Establish procedure and other party's settings +// /// +// /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. +// /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to +// /// enable 'lowlatency' you need to explicitly disable 'qos'. +// lowlatency: false, +// /// Enables QoS on unicast communications. +// qos: { +// enabled: true, +// }, +// /// Enables compression on unicast communications. +// /// Compression capabilities are negotiated during session establishment. +// /// If both Zenoh nodes support compression, then compression is activated. +// compression: { +// enabled: false, +// }, +// }, +// multicast: { +// /// Enables QoS on multicast communication. +// /// Default to false for Zenoh-to-Zenoh-Pico out-of-the-box compatibility. +// qos: { +// enabled: false, +// }, +// /// Enables compression on multicast communication. +// /// Default to false for Zenoh-to-Zenoh-Pico out-of-the-box compatibility. +// compression: { +// enabled: false, +// }, +// }, +// link: { +// /// An optional whitelist of protocols to be used for accepting and opening sessions. +// /// If not configured, all the supported protocols are automatically whitelisted. +// /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream"] +// /// For example, to only enable "tls" and "quic": +// // protocols: ["tls", "quic"], +// /// Configure the zenoh TX parameters of a link +// tx: { +// /// The resolution in bits to be used for the message sequence numbers. +// /// When establishing a session with another Zenoh instance, the lowest value of the two instances will be used. +// /// Accepted values: 8bit, 16bit, 32bit, 64bit. +// sequence_number_resolution: "32bit", +// /// Link lease duration in milliseconds to announce to other zenoh nodes +// lease: 10000, +// /// Number of keep-alive messages in a link lease duration. If no data is sent, keep alive +// /// messages will be sent at the configured time interval. +// /// NOTE: In order to consider eventual packet loss and transmission latency and jitter, +// /// set the actual keep_alive timeout to one fourth of the lease time. +// /// This is in-line with the ITU-T G.8013/Y.1731 specification on continous connectivity +// /// check which considers a link as failed when no messages are received in 3.5 times the +// /// target interval. +// keep_alive: 4, +// /// Batch size in bytes is expressed as a 16bit unsigned integer. +// /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535). +// /// The default batch size value is the maximum batch size: 65535. +// batch_size: 65535, +// /// Each zenoh link has a transmission queue that can be configured +// queue: { +// /// The size of each priority queue indicates the number of batches a given queue can contain. +// /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE. +// /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE, +// /// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU. +// /// If qos is false, then only the DATA priority will be allocated. +// size: { +// control: 1, +// real_time: 1, +// interactive_high: 1, +// interactive_low: 1, +// data_high: 2, +// data: 4, +// data_low: 4, +// background: 4, +// }, +// /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. +// /// Higher values lead to a more aggressive batching but it will introduce additional latency. +// backoff: 100, +// // Number of threads dedicated to transmission +// // By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4) +// // threads: 4, +// }, +// }, +// /// Configure the zenoh RX parameters of a link +// rx: { +// /// Receiving buffer size in bytes for each link +// /// The default the rx_buffer_size value is the same as the default batch size: 65335. +// /// For very high throughput scenarios, the rx_buffer_size can be increased to accomodate +// /// more in-flight data. This is particularly relevant when dealing with large messages. +// /// E.g. for 16MiB rx_buffer_size set the value to: 16777216. +// buffer_size: 65535, +// /// Maximum size of the defragmentation buffer at receiver end. +// /// Fragmented messages that are larger than the configured size will be dropped. +// /// The default value is 1GiB. This would work in most scenarios. +// /// NOTE: reduce the value if you are operating on a memory constrained device. +// max_message_size: 1073741824, +// }, +// /// Configure TLS specific parameters +// tls: { +// /// Path to the certificate of the certificate authority used to validate either the server +// /// or the client's keys and certificates, depending on the node's mode. If not specified +// /// on router mode then the default WebPKI certificates are used instead. +// root_ca_certificate: null, +// /// Path to the TLS server private key +// server_private_key: null, +// /// Path to the TLS server public certificate +// server_certificate: null, +// /// Client authentication, if true enables mTLS (mutual authentication) +// client_auth: false, +// /// Path to the TLS client private key +// client_private_key: null, +// /// Path to the TLS client public certificate +// client_certificate: null, +// // Whether or not to use server name verification, if set to false zenoh will disregard the common names of the certificates when verifying servers. +// // This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com. If you wan't your +// // ca to verify that the server at baz.com is actually baz.com, let this be true (default). +// server_name_verification: null, +// }, +// }, +// /// Shared memory configuration +// shared_memory: { +// enabled: false, +// }, +// /// Access control configuration +// auth: { +// /// The configuration of authentification. +// /// A password implies a username is required. +// usrpwd: { +// user: null, +// password: null, +// /// The path to a file containing the user password dictionary +// dictionary_file: null, +// }, +// pubkey: { +// public_key_pem: null, +// private_key_pem: null, +// public_key_file: null, +// private_key_file: null, +// key_size: null, +// known_keys_file: null, +// }, +// }, +// }, + + /// Configure the Admin Space + /// Unstable: this configuration part works as advertised, but may change in a future release + // adminspace: { + // // read and/or write permissions on the admin space + // permissions: { + // read: true, + // write: false, + // }, + // }, + + /// + /// Plugins configurations + /// + // /// Directories where plugins configured by name should be looked for. Plugins configured by __path__ are not subject to lookup + // plugins_search_dirs: [], + // /// Plugins are only loaded if present in the configuration. When starting + // /// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace. + // plugins: { + // /// If no `__path__` is given to a plugin, zenohd will automatically search for a shared library matching the plugin's name (here, `libzenoh_plugin_rest.so` would be searched for on linux) + // + // /// Plugin settings may contain field `__config__` + // /// - If `__config__` is specified, it's content is merged into plugin configuration + // /// - Properties loaded from `__config__` file overrides existing properties + // /// - If json objects in loaded file contains `__config__` properties, they are processed recursively + // /// This is used in the 'storcge_manager' which supports subplugins, each with it's own config + // /// + // /// See below exapmle of plugin configuration using `__config__` property + // + // /// Configure the REST API plugin + // rest: { + // /// Setting this option to true allows zenohd to panic should it detect issues with this plugin. Setting it to false politely asks the plugin not to panic. + // __required__: true, // defaults to false + // /// load configuration from the file + // __config__: "./plugins/zenoh-plugin-rest/config.json5", + // /// http port to answer to rest requests + // http_port: 8000, + // }, + // + // /// Configure the storage manager plugin + // storage_manager: { + // /// When a path is present, automatic search is disabled, and zenohd will instead select the first path which manages to load. + // __path__: [ + // "./target/release/libzenoh_plugin_storage_manager.so", + // "./target/release/libzenoh_plugin_storage_manager.dylib", + // ], + // /// Directories where plugins configured by name should be looked for. Plugins configured by __path__ are not subject to lookup + // backend_search_dirs: [], + // /// The "memory" volume is always available, but you may create other volumes here, with various backends to support the actual storing. + // volumes: { + // /// An influxdb backend is also available at https://github.com/eclipse-zenoh/zenoh-backend-influxdb + // influxdb: { + // url: "https://myinfluxdb.example", + // /// Some plugins may need passwords in their configuration. + // /// To avoid leaking them through the adminspace, they may be masked behind a privacy barrier. + // /// any value held at the key "private" will not be shown in the adminspace. + // private: { + // username: "user1", + // password: "pw1", + // }, + // }, + // influxdb2: { + // /// A second backend of the same type can be spawned using `__path__`, for examples when different DBs are needed. + // backend: "influxdb", + // private: { + // username: "user2", + // password: "pw2", + // }, + // url: "https://localhost:8086", + // }, + // }, + // + // /// Configure the storages supported by the volumes + // storages: { + // demo: { + // /// Storages always need to know what set of keys they must work with. These sets are defined by a key expression. + // key_expr: "demo/memory/**", + // /// Storages also need to know which volume will be used to actually store their key-value pairs. + // /// The "memory" volume is always available, and doesn't require any per-storage options, so requesting "memory" by string is always sufficient. + // volume: "memory", + // }, + // demo2: { + // key_expr: "demo/memory2/**", + // volume: "memory", + // /// Storage manager plugin handles metadata in order to ensure convergence of distributed storages configured in Zenoh. + // /// Metadata includes the set of wild card updates and deletions (tombstones). + // /// Once the samples are guaranteed to be delivered, the metadata can be garbage collected. + // garbage_collection: { + // /// The garbage collection event will be periodic with this duration. + // /// The duration is specified in seconds. + // period: 30, + // /// Metadata older than this parameter will be garbage collected. + // /// The duration is specified in seconds. + // lifespan: 86400, + // }, + // /// If multiple storages subscribing to the same key_expr should be synchronized, declare them as replicas. + // /// In the absence of this configuration, a normal storage is initialized + // /// Note: all the samples to be stored in replicas should be timestamped + // replica_config: { + // /// Specifying the parameters is optional, by default the values provided will be used. + // /// Time interval between different synchronization attempts in seconds + // publication_interval: 5, + // /// Expected propagation delay of the network in milliseconds + // propagation_delay: 200, + // /// This is the chunk that you would like your data to be divide into in time, in milliseconds. + // /// Higher the frequency of updates, lower the delta should be chosen + // /// To be efficient, delta should be the time containing no more than 100,000 samples + // delta: 1000, + // } + // }, + // demo3: { + // key_expr: "demo/memory3/**", + // volume: "memory", + // /// A complete storage advertises itself as containing all the known keys matching the configured key expression. + // /// If not configured, complete defaults to false. + // complete: "true", + // }, + // influx_demo: { + // key_expr: "demo/influxdb/**", + // /// This prefix will be stripped of the received keys when storing. + // strip_prefix: "demo/influxdb", + // /// influxdb-backed volumes need a bit more configuration, which is passed like-so: + // volume: { + // id: "influxdb", + // db: "example", + // }, + // }, + // influx_demo2: { + // key_expr: "demo/influxdb2/**", + // strip_prefix: "demo/influxdb2", + // volume: { + // id: "influxdb2", + // db: "example", + // }, + // }, + // }, + // }, + // }, + + // /// Plugin configuration example using `__config__` property + // plugins: { + // rest: { + // __config__: "./plugins/zenoh-plugin-rest/config.json5", + // }, + // storage_manager: { + // __config__: "./plugins/zenoh-plugin-storage-manager/config.json5", + // } + // }, + +} diff --git a/example-streamer-uses/src/bin/common/mod.rs b/example-streamer-uses/src/bin/common/mod.rs new file mode 100644 index 00000000..41dc83b4 --- /dev/null +++ b/example-streamer-uses/src/bin/common/mod.rs @@ -0,0 +1,94 @@ +use async_trait::async_trait; +use hello_world_protos::{ + hello_world_service::{HelloRequest, HelloResponse}, + hello_world_topics::Timer, +}; +use log::{debug, error, info}; +use protobuf::Message; +use std::sync::Arc; +use up_rust::{UListener, UMessage, UMessageBuilder, UTransport}; + +#[allow(dead_code)] +pub(crate) struct ServiceResponseListener; + +#[async_trait] +impl UListener for ServiceResponseListener { + async fn on_receive(&self, msg: UMessage) { + info!("ServiceResponseListener: Received a message: {msg:?}"); + + let Some(payload_bytes) = msg.payload else { + panic!("No payload bytes"); + }; + + let Ok(hello_response) = HelloResponse::parse_from_bytes(&payload_bytes) else { + panic!("Unable to parse into HelloResponse"); + }; + + debug!("Here we received response: {hello_response:?}"); + } +} + +#[allow(dead_code)] +pub(crate) struct ServiceRequestResponder { + client: Arc, +} +impl ServiceRequestResponder { + #[allow(dead_code)] + pub(crate) fn new(client: Arc) -> Self { + Self { client } + } +} + +#[async_trait] +impl UListener for ServiceRequestResponder { + async fn on_receive(&self, msg: UMessage) { + info!("ServiceResponseListener: Received a message: {msg:?}"); + + let Some(payload_bytes) = msg.payload else { + panic!("No bytes available"); + }; + let hello_request = match HelloRequest::parse_from_bytes(&payload_bytes) { + Ok(hello_request) => { + debug!("hello_request: {hello_request:?}"); + hello_request + } + Err(err) => { + error!("Unable to parse HelloRequest: {err:?}"); + return; + } + }; + + let hello_response = HelloResponse { + message: format!("The response to the request: {}", hello_request.name), + ..Default::default() + }; + + let response_msg = UMessageBuilder::response_for_request(msg.attributes.as_ref().unwrap()) + .build_with_protobuf_payload(&hello_response) + .unwrap(); + info!("Sending Response message:\n{:?}", &response_msg); + self.client.send(response_msg).await.unwrap(); + } +} + +#[allow(dead_code)] +pub(crate) struct PublishReceiver; + +#[async_trait] +impl UListener for PublishReceiver { + async fn on_receive(&self, msg: UMessage) { + info!("PublishReceiver: Received a message: {msg:?}"); + + let Some(payload_bytes) = msg.payload else { + panic!("No bytes available"); + }; + match Timer::parse_from_bytes(&payload_bytes) { + Ok(timer_message) => { + debug!("timer: {timer_message:?}"); + } + Err(err) => { + error!("Unable to parse Timer Message: {err:?}"); + } + }; + } +} diff --git a/example-streamer-uses/src/bin/mqtt_client.rs b/example-streamer-uses/src/bin/mqtt_client.rs new file mode 100644 index 00000000..f882c33d --- /dev/null +++ b/example-streamer-uses/src/bin/mqtt_client.rs @@ -0,0 +1,105 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +mod common; + +use common::ServiceResponseListener; +use hello_world_protos::hello_world_service::HelloRequest; +use log::info; +use std::sync::Arc; +use std::time::Duration; +use up_rust::{UListener, UMessageBuilder, UStatus, UTransport, UUri, UUID}; +use up_transport_mqtt5::{MqttConfig, UPClientMqtt, UPClientMqttType}; + +const SERVICE_AUTHORITY: &str = "authority_B"; +const SERVICE_UE_ID: u32 = 0x1236; +const SERVICE_UE_VERSION_MAJOR: u8 = 1; +const SERVICE_RESOURCE_ID: u16 = 0x0421; + +const CLIENT_AUTHORITY: &str = "authority_A"; +const CLIENT_UE_ID: u32 = 0x4321; +const CLIENT_UE_VERSION_MAJOR: u8 = 1; +const CLIENT_RESOURCE_ID: u16 = 0; + +const REQUEST_TTL: u32 = 1000; + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + info!("Started mqtt_client."); + + // Source represents the client (specifically the topic that the client sends to) + let source = UUri::try_from_parts( + CLIENT_AUTHORITY, + CLIENT_UE_ID, + CLIENT_UE_VERSION_MAJOR, + CLIENT_RESOURCE_ID, + ) + .unwrap(); + // Sink is the destination entity which the streamer should rout our messages to. + let sink = UUri::try_from_parts( + SERVICE_AUTHORITY, + SERVICE_UE_ID, + SERVICE_UE_VERSION_MAJOR, + SERVICE_RESOURCE_ID, + ) + .unwrap(); + + let ssl_options = None; + + let mqtt_config = MqttConfig { + mqtt_protocol: up_transport_mqtt5::MqttProtocol::Mqtt, + mqtt_port: 1883, + mqtt_hostname: "localhost".to_string(), + max_buffered_messages: 100, + max_subscriptions: 100, + session_expiry_interval: 3600, + ssl_options: ssl_options, + username: "user".to_string(), + }; + + let client: Arc = Arc::new( + UPClientMqtt::new( + mqtt_config, + UUID::build(), + CLIENT_AUTHORITY.to_string(), + UPClientMqttType::Device, + ) + .await + .expect("Could not create mqtt transport."), + ); + + let service_response_listener: Arc = Arc::new(ServiceResponseListener); + client + .register_listener(&sink, Some(&source), service_response_listener) + .await?; + + let mut i = 0; + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + + let hello_request = HelloRequest { + name: format!("mqtt_client@i={}", i).to_string(), + ..Default::default() + }; + i += 1; + + let request_msg = UMessageBuilder::request(sink.clone(), source.clone(), REQUEST_TTL) + .build_with_protobuf_payload(&hello_request) + .unwrap(); + info!("Sending Request message:\n{request_msg:?}"); + + client.send(request_msg).await?; + } +} diff --git a/example-streamer-uses/src/bin/mqtt_publisher.rs b/example-streamer-uses/src/bin/mqtt_publisher.rs new file mode 100644 index 00000000..7fcd1c70 --- /dev/null +++ b/example-streamer-uses/src/bin/mqtt_publisher.rs @@ -0,0 +1,94 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use chrono::Local; +use chrono::Timelike; +use hello_world_protos::hello_world_topics::Timer; +use hello_world_protos::timeofday::TimeOfDay; +use log::info; +use std::sync::Arc; +use std::time::Duration; +use up_rust::{UMessageBuilder, UStatus, UTransport, UUri, UUID}; +use up_transport_mqtt5::{MqttConfig, UPClientMqtt, UPClientMqttType}; + +const PUB_TOPIC_AUTHORITY: &str = "authority_A"; +const PUB_TOPIC_UE_ID: u32 = 0x5BA0; +const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; +const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + info!("Started mqtt_publisher."); + + // This is the URI of the publisher entity + let source = UUri::try_from_parts( + PUB_TOPIC_AUTHORITY, + PUB_TOPIC_UE_ID, + PUB_TOPIC_UE_VERSION_MAJOR, + PUB_TOPIC_RESOURCE_ID, + ) + .unwrap(); + + let ssl_options = None; + + let mqtt_config = MqttConfig { + mqtt_protocol: up_transport_mqtt5::MqttProtocol::Mqtt, + mqtt_port: 1883, + mqtt_hostname: "localhost".to_string(), + max_buffered_messages: 100, + max_subscriptions: 100, + session_expiry_interval: 3600, + ssl_options: ssl_options, + username: "user".to_string(), + }; + + let publisher: Arc = Arc::new( + UPClientMqtt::new( + mqtt_config, + UUID::build(), + PUB_TOPIC_AUTHORITY.to_string(), + UPClientMqttType::Device, + ) + .await + .expect("Could not create mqtt transport."), + ); + + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + + let now = Local::now(); + + let time_of_day = TimeOfDay { + hours: now.hour() as i32, + minutes: now.minute() as i32, + seconds: now.second() as i32, + nanos: now.nanosecond() as i32, + ..Default::default() + }; + + let timer_message = Timer { + time: Some(time_of_day).into(), + ..Default::default() + }; + + // Publish messages signed with the source URI + let publish_msg = UMessageBuilder::publish(source.clone()) + .build_with_protobuf_payload(&timer_message) + .unwrap(); + info!("Sending Publish message:\n{publish_msg:?}"); + + publisher.send(publish_msg).await?; + } +} diff --git a/example-streamer-uses/src/bin/mqtt_service.rs b/example-streamer-uses/src/bin/mqtt_service.rs new file mode 100644 index 00000000..207a077c --- /dev/null +++ b/example-streamer-uses/src/bin/mqtt_service.rs @@ -0,0 +1,81 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +mod common; + +use common::ServiceRequestResponder; +use log::info; +use std::sync::Arc; +use std::thread; +use up_rust::{UListener, UStatus, UTransport, UUri, UUID}; +use up_transport_mqtt5::{MqttConfig, UPClientMqtt, UPClientMqttType}; + +const SERVICE_AUTHORITY: &str = "authority_A"; +const SERVICE_UE_ID: u32 = 0x4321; +const SERVICE_UE_VERSION_MAJOR: u8 = 1; +const SERVICE_RESOURCE_ID: u16 = 0x0421; + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + info!("Started mqtt_service."); + + // We set the source filter to "any" so that we process messages from all device that send some. + let source_filter = UUri::any(); + // The sink filter gets specified so that we only process messages directed at this entity. + let sink_filter = UUri::try_from_parts( + SERVICE_AUTHORITY, + SERVICE_UE_ID, + SERVICE_UE_VERSION_MAJOR, + SERVICE_RESOURCE_ID, + ) + .unwrap(); + + let ssl_options = None; + + let mqtt_config = MqttConfig { + mqtt_protocol: up_transport_mqtt5::MqttProtocol::Mqtt, + mqtt_port: 1883, + mqtt_hostname: "localhost".to_string(), + max_buffered_messages: 100, + max_subscriptions: 100, + session_expiry_interval: 3600, + ssl_options: ssl_options, + username: "user".to_string(), + }; + + let service: Arc = Arc::new( + UPClientMqtt::new( + mqtt_config, + UUID::build(), + SERVICE_AUTHORITY.to_string(), + UPClientMqttType::Device, // Todo: make sure that UPClientMqttType::Cloud also works + ) + .await + .expect("Could not create mqtt transport."), + ); + + let service_request_responder: Arc = + Arc::new(ServiceRequestResponder::new(service.clone())); + service + .register_listener( + &source_filter, + Some(&sink_filter), + service_request_responder.clone(), + ) + .await?; + + thread::park(); + Ok(()) +} diff --git a/example-streamer-uses/src/bin/mqtt_subscriber.rs b/example-streamer-uses/src/bin/mqtt_subscriber.rs new file mode 100644 index 00000000..c4c46944 --- /dev/null +++ b/example-streamer-uses/src/bin/mqtt_subscriber.rs @@ -0,0 +1,76 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +mod common; + +use common::PublishReceiver; +use log::info; +use std::sync::Arc; +use std::thread; +use up_rust::{UListener, UStatus, UTransport, UUri, UUID}; +use up_transport_mqtt5::{MqttConfig, UPClientMqtt, UPClientMqttType}; + +const PUB_TOPIC_AUTHORITY: &str = "authority_B"; +const PUB_TOPIC_UE_ID: u32 = 0x3039; +const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; +const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; + +const SUB_TOPIC_AUTHORITY: &str = "authority_A"; + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + info!("Started mqtt_subscriber."); + + // Here we define which sources we want to accept messages from + let source_filter = UUri::try_from_parts( + PUB_TOPIC_AUTHORITY, + PUB_TOPIC_UE_ID, + PUB_TOPIC_UE_VERSION_MAJOR, + PUB_TOPIC_RESOURCE_ID, + ) + .unwrap(); + + let ssl_options = None; + + let mqtt_config = MqttConfig { + mqtt_protocol: up_transport_mqtt5::MqttProtocol::Mqtt, + mqtt_port: 1883, + mqtt_hostname: "localhost".to_string(), + max_buffered_messages: 100, + max_subscriptions: 100, + session_expiry_interval: 3600, + ssl_options: ssl_options, + username: "user".to_string(), + }; + + let subscriber: Arc = Arc::new( + UPClientMqtt::new( + mqtt_config, + UUID::build(), + SUB_TOPIC_AUTHORITY.to_string(), + UPClientMqttType::Device, + ) + .await + .expect("Could not create mqtt transport."), + ); + + let publish_receiver: Arc = Arc::new(PublishReceiver); + subscriber + .register_listener(&source_filter, None, publish_receiver.clone()) + .await?; + + thread::park(); + Ok(()) +} diff --git a/example-streamer-uses/src/bin/me_client.rs b/example-streamer-uses/src/bin/someip_client.rs similarity index 73% rename from example-streamer-uses/src/bin/me_client.rs rename to example-streamer-uses/src/bin/someip_client.rs index 541098a5..cd6eaf84 100644 --- a/example-streamer-uses/src/bin/me_client.rs +++ b/example-streamer-uses/src/bin/someip_client.rs @@ -11,63 +11,45 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_trait::async_trait; -use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; -use log::trace; -use protobuf::Message; +mod common; + +use common::ServiceResponseListener; +use hello_world_protos::hello_world_service::HelloRequest; +use log::{info, trace}; use std::fs::canonicalize; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri}; +use up_rust::{UListener, UMessageBuilder, UStatus, UTransport, UUri}; use up_transport_vsomeip::UPTransportVsomeip; -const SERVICE_AUTHORITY: &str = "linux"; +const SERVICE_AUTHORITY: &str = "authority_B"; const SERVICE_UE_ID: u32 = 0x1236; const SERVICE_UE_VERSION_MAJOR: u8 = 1; -const SERVICE_RESOURCE_ID: u16 = 0x0896; +const SERVICE_RESOURCE_ID: u16 = 0x0421; -const CLIENT_AUTHORITY: &str = "me_authority"; +const CLIENT_AUTHORITY: &str = "authority_A"; const CLIENT_UE_ID: u32 = 0x5678; const CLIENT_UE_VERSION_MAJOR: u8 = 1; const CLIENT_RESOURCE_ID: u16 = 0; const REQUEST_TTL: u32 = 1000; -const REMOTE_AUTHORITY: &str = "linux"; +const REMOTE_AUTHORITY: &str = "authority_B"; fn client_uuri() -> UUri { UUri::try_from_parts(CLIENT_AUTHORITY, CLIENT_UE_ID, CLIENT_UE_VERSION_MAJOR, 0).unwrap() } -struct ServiceResponseListener; - -#[async_trait] -impl UListener for ServiceResponseListener { - async fn on_receive(&self, msg: UMessage) { - println!("ServiceResponseListener: Received a message: {msg:?}"); - - let Some(payload_bytes) = msg.payload else { - panic!("No payload bytes"); - }; - - let Ok(hello_response) = HelloResponse::parse_from_bytes(&payload_bytes) else { - panic!("Unable to parse into HelloResponse"); - }; - - println!("Here we received response: {hello_response:?}"); - } -} - #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); - println!("mE_client"); + info!("Started someip_client"); let crate_dir = env!("CARGO_MANIFEST_DIR"); // TODO: Make configurable to pass the path to the vsomeip config as a command line argument - let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/mE_client.json"); + let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/someip_client.json"); let vsomeip_config = canonicalize(vsomeip_config).ok(); trace!("vsomeip_config: {vsomeip_config:?}"); @@ -116,7 +98,7 @@ async fn main() -> Result<(), UStatus> { let request_msg = UMessageBuilder::request(sink.clone(), source.clone(), REQUEST_TTL) .build_with_protobuf_payload(&hello_request) .unwrap(); - println!("Sending Request message:\n{request_msg:?}"); + info!("Sending Request message:\n{request_msg:?}"); client.send(request_msg).await?; } diff --git a/example-streamer-uses/src/bin/me_publisher.rs b/example-streamer-uses/src/bin/someip_publisher.rs similarity index 92% rename from example-streamer-uses/src/bin/me_publisher.rs rename to example-streamer-uses/src/bin/someip_publisher.rs index f038ad46..43fed92c 100644 --- a/example-streamer-uses/src/bin/me_publisher.rs +++ b/example-streamer-uses/src/bin/someip_publisher.rs @@ -15,6 +15,7 @@ use chrono::Local; use chrono::Timelike; use hello_world_protos::hello_world_topics::Timer; use hello_world_protos::timeofday::TimeOfDay; +use log::info; use log::trace; use std::fs::canonicalize; use std::path::PathBuf; @@ -23,12 +24,12 @@ use std::time::Duration; use up_rust::{UMessageBuilder, UStatus, UTransport, UUri}; use up_transport_vsomeip::UPTransportVsomeip; -const PUB_TOPIC_AUTHORITY: &str = "me_authority"; +const PUB_TOPIC_AUTHORITY: &str = "authority_A"; const PUB_TOPIC_UE_ID: u32 = 0x5BA0; const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; -const REMOTE_AUTHORITY: &str = "linux"; +const REMOTE_AUTHORITY: &str = "authority_B"; fn publisher_uuri() -> UUri { UUri::try_from_parts( @@ -44,11 +45,11 @@ fn publisher_uuri() -> UUri { async fn main() -> Result<(), UStatus> { env_logger::init(); - println!("mE_publisher"); + info!("Started someip_publisher"); let crate_dir = env!("CARGO_MANIFEST_DIR"); // TODO: Make configurable to pass the path to the vsomeip config as a command line argument - let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/mE_publisher.json"); + let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/someip_publisher.json"); let vsomeip_config = canonicalize(vsomeip_config).ok(); trace!("vsomeip_config: {vsomeip_config:?}"); @@ -92,7 +93,7 @@ async fn main() -> Result<(), UStatus> { let publish_msg = UMessageBuilder::publish(source.clone()) .build_with_protobuf_payload(&timer_message) .unwrap(); - println!("Sending Publish message:\n{publish_msg:?}"); + info!("Sending Publish message:\n{publish_msg:?}"); publisher.send(publish_msg).await?; } diff --git a/example-streamer-uses/src/bin/me_service.rs b/example-streamer-uses/src/bin/someip_service.rs similarity index 59% rename from example-streamer-uses/src/bin/me_service.rs rename to example-streamer-uses/src/bin/someip_service.rs index 8c016d59..9d64c7bd 100644 --- a/example-streamer-uses/src/bin/me_service.rs +++ b/example-streamer-uses/src/bin/someip_service.rs @@ -11,23 +11,23 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_trait::async_trait; -use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; -use log::{error, trace}; -use protobuf::Message; +mod common; + +use common::ServiceRequestResponder; +use log::{info, trace}; use std::fs::canonicalize; use std::path::PathBuf; use std::sync::Arc; use std::thread; -use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri}; +use up_rust::{UListener, UStatus, UTransport, UUri}; use up_transport_vsomeip::UPTransportVsomeip; -const SERVICE_AUTHORITY: &str = "me_authority"; +const SERVICE_AUTHORITY: &str = "authority_A"; const SERVICE_UE_ID: u32 = 0x4321; const SERVICE_UE_VERSION_MAJOR: u8 = 1; const SERVICE_RESOURCE_ID: u16 = 0x0421; -const REMOTE_AUTHORITY: &str = "linux"; +const REMOTE_AUTHORITY: &str = "authority_B"; fn service_uuri() -> UUri { UUri::try_from_parts( @@ -39,54 +39,15 @@ fn service_uuri() -> UUri { .unwrap() } -struct ServiceRequestResponder { - client: Arc, -} -impl ServiceRequestResponder { - pub fn new(client: Arc) -> Self { - Self { client } - } -} -#[async_trait] -impl UListener for ServiceRequestResponder { - async fn on_receive(&self, msg: UMessage) { - println!("ServiceRequestResponder: Received a message: {msg:?}"); - - let Some(payload_bytes) = msg.payload else { - panic!("No bytes available"); - }; - let hello_request = match HelloRequest::parse_from_bytes(&payload_bytes) { - Ok(hello_request) => { - println!("hello_request: {hello_request:?}"); - hello_request - } - Err(err) => { - error!("Unable to parse HelloRequest: {err:?}"); - return; - } - }; - - let hello_response = HelloResponse { - message: format!("The response to the request: {}", hello_request.name), - ..Default::default() - }; - - let response_msg = UMessageBuilder::response_for_request(msg.attributes.as_ref().unwrap()) - .build_with_protobuf_payload(&hello_response) - .unwrap(); - self.client.send(response_msg).await.unwrap(); - } -} - #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); - println!("mE_service"); + info!("Started someip_service"); let crate_dir = env!("CARGO_MANIFEST_DIR"); // TODO: Make configurable to pass the path to the vsomeip config as a command line argument - let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/mE_service.json"); + let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/someip_service.json"); let vsomeip_config = canonicalize(vsomeip_config).ok(); trace!("vsomeip_config: {vsomeip_config:?}"); diff --git a/example-streamer-uses/src/bin/me_subscriber.rs b/example-streamer-uses/src/bin/someip_subscriber.rs similarity index 66% rename from example-streamer-uses/src/bin/me_subscriber.rs rename to example-streamer-uses/src/bin/someip_subscriber.rs index 5bcc0ea9..d73ec5da 100644 --- a/example-streamer-uses/src/bin/me_subscriber.rs +++ b/example-streamer-uses/src/bin/someip_subscriber.rs @@ -11,60 +11,36 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_trait::async_trait; -use hello_world_protos::hello_world_topics::Timer; -use log::{error, trace}; -use protobuf::Message; +mod common; + +use common::PublishReceiver; +use log::trace; use std::fs::canonicalize; use std::path::PathBuf; use std::sync::Arc; use std::thread; -use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; +use up_rust::{UListener, UStatus, UTransport, UUri}; use up_transport_vsomeip::UPTransportVsomeip; -const SUBSCRIBER_AUTHORITY: &str = "me_authority"; -const SUBSCRIBER_UE_ID: u32 = 0x1236; -const SUBSCRIBER_UE_VERSION_MAJOR: u8 = 1; +const SUB_TOPIC_AUTHORITY: &str = "authority_B"; +const SUB_TOPIC_UE_ID: u32 = 0x5BB0; +const SUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; -const PUB_TOPIC_AUTHORITY: &str = "linux"; +const PUB_TOPIC_AUTHORITY: &str = "authority_B"; const PUB_TOPIC_UE_ID: u32 = 0x3039; const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; fn subscriber_uuri() -> UUri { UUri::try_from_parts( - SUBSCRIBER_AUTHORITY, - SUBSCRIBER_UE_ID, - SUBSCRIBER_UE_VERSION_MAJOR, + SUB_TOPIC_AUTHORITY, + SUB_TOPIC_UE_ID, + SUB_TOPIC_UE_VERSION_MAJOR, 0, ) .unwrap() } -#[allow(dead_code)] -struct PublishReceiver; - -#[async_trait] -impl UListener for PublishReceiver { - async fn on_receive(&self, msg: UMessage) { - println!("PublishReceiver: Received a message: {msg:?}"); - - let Some(payload_bytes) = msg.payload else { - panic!("No bytes available"); - }; - match Timer::parse_from_bytes(&payload_bytes) { - Ok(timer_message) => { - println!("timer: {timer_message:?}"); - timer_message - } - Err(err) => { - error!("Unable to parse Timer Message: {err:?}"); - return; - } - }; - } -} - #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); @@ -73,7 +49,7 @@ async fn main() -> Result<(), UStatus> { let crate_dir = env!("CARGO_MANIFEST_DIR"); // TODO: Make configurable to pass the path to the vsomeip config as a command line argument - let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/mE_subscriber.json"); + let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/someip_subscriber.json"); let vsomeip_config = canonicalize(vsomeip_config).ok(); trace!("vsomeip_config: {vsomeip_config:?}"); diff --git a/example-streamer-uses/src/bin/ue_client.rs b/example-streamer-uses/src/bin/zenoh_client.rs similarity index 77% rename from example-streamer-uses/src/bin/ue_client.rs rename to example-streamer-uses/src/bin/zenoh_client.rs index 3780a1c3..3a185d2b 100644 --- a/example-streamer-uses/src/bin/ue_client.rs +++ b/example-streamer-uses/src/bin/zenoh_client.rs @@ -11,23 +11,25 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_trait::async_trait; +mod common; + use clap::Parser; -use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; -use protobuf::Message; +use common::ServiceResponseListener; +use hello_world_protos::hello_world_service::HelloRequest; +use log::{debug, info}; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri}; +use up_rust::{UListener, UMessageBuilder, UStatus, UTransport, UUri}; use up_transport_zenoh::UPTransportZenoh; use zenoh::config::{Config, EndPoint}; -const SERVICE_AUTHORITY: &str = "me_authority"; +const SERVICE_AUTHORITY: &str = "authority_A"; const SERVICE_UE_ID: u32 = 0x4321; const SERVICE_UE_VERSION_MAJOR: u8 = 1; const SERVICE_RESOURCE_ID: u16 = 0x0421; -const CLIENT_AUTHORITY: &str = "linux"; +const CLIENT_AUTHORITY: &str = "authority_B"; const CLIENT_UE_ID: u32 = 0x1236; const CLIENT_UE_VERSION_MAJOR: u8 = 1; const CLIENT_RESOURCE_ID: u16 = 0; @@ -52,32 +54,13 @@ struct Args { endpoint: String, } -struct ServiceResponseListener; - -#[async_trait] -impl UListener for ServiceResponseListener { - async fn on_receive(&self, msg: UMessage) { - println!("ServiceResponseListener: Received a message: {msg:?}"); - - let Some(payload_bytes) = msg.payload else { - panic!("No payload bytes"); - }; - - let Ok(hello_response) = HelloResponse::parse_from_bytes(&payload_bytes) else { - panic!("Unable to parse into HelloResponse"); - }; - - println!("Here we received response: {hello_response:?}"); - } -} - #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); let args = Args::parse(); - println!("uE_client"); + info!("Started zenoh_client"); let mut zenoh_config = Config::default(); @@ -134,7 +117,8 @@ async fn main() -> Result<(), UStatus> { let request_msg = UMessageBuilder::request(sink.clone(), source.clone(), REQUEST_TTL) .build_with_protobuf_payload(&hello_request) .unwrap(); - println!("Sending Request message:\n{request_msg:?}"); + debug!("Invoking URI {} with response URI {}", &sink, &source); + info!("Sending Request message:\n{:?}", &request_msg); client.send(request_msg).await?; } diff --git a/example-streamer-uses/src/bin/ue_publisher.rs b/example-streamer-uses/src/bin/zenoh_publisher.rs similarity index 98% rename from example-streamer-uses/src/bin/ue_publisher.rs rename to example-streamer-uses/src/bin/zenoh_publisher.rs index 38d1cf51..81eab92d 100644 --- a/example-streamer-uses/src/bin/ue_publisher.rs +++ b/example-streamer-uses/src/bin/zenoh_publisher.rs @@ -23,7 +23,7 @@ use up_rust::{UMessageBuilder, UStatus, UTransport, UUri}; use up_transport_zenoh::UPTransportZenoh; use zenoh::config::{Config, EndPoint}; -const PUB_TOPIC_AUTHORITY: &str = "linux"; +const PUB_TOPIC_AUTHORITY: &str = "authority_B"; const PUB_TOPIC_UE_ID: u32 = 0x3039; const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; diff --git a/example-streamer-uses/src/bin/ue_service.rs b/example-streamer-uses/src/bin/zenoh_service.rs similarity index 61% rename from example-streamer-uses/src/bin/ue_service.rs rename to example-streamer-uses/src/bin/zenoh_service.rs index a0205ce1..b1f6e15b 100644 --- a/example-streamer-uses/src/bin/ue_service.rs +++ b/example-streamer-uses/src/bin/zenoh_service.rs @@ -11,22 +11,22 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_trait::async_trait; +mod common; + use clap::Parser; -use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; -use log::error; -use protobuf::Message; +use common::ServiceRequestResponder; +use log::info; use std::str::FromStr; use std::sync::Arc; use std::thread; -use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri}; +use up_rust::{UListener, UStatus, UTransport, UUri}; use up_transport_zenoh::UPTransportZenoh; use zenoh::config::{Config, EndPoint}; -const SERVICE_AUTHORITY: &str = "linux"; +const SERVICE_AUTHORITY: &str = "authority_B"; const SERVICE_UE_ID: u32 = 0x1236; const SERVICE_UE_VERSION_MAJOR: u8 = 1; -const SERVICE_RESOURCE_ID: u16 = 0x0896; +const SERVICE_RESOURCE_ID: u16 = 0x0421; fn service_uuri() -> UUri { UUri::try_from_parts( @@ -38,15 +38,6 @@ fn service_uuri() -> UUri { .unwrap() } -struct ServiceRequestResponder { - client: Arc, -} -impl ServiceRequestResponder { - pub fn new(client: Arc) -> Self { - Self { client } - } -} - #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { @@ -55,44 +46,13 @@ struct Args { endpoint: String, } -#[async_trait] -impl UListener for ServiceRequestResponder { - async fn on_receive(&self, msg: UMessage) { - println!("ServiceRequestResponder: Received a message: {msg:?}"); - - let Some(payload_bytes) = msg.payload else { - panic!("No bytes available"); - }; - let hello_request = match HelloRequest::parse_from_bytes(&payload_bytes) { - Ok(hello_request) => { - println!("hello_request: {hello_request:?}"); - hello_request - } - Err(err) => { - error!("Unable to parse HelloRequest: {err:?}"); - return; - } - }; - - let hello_response = HelloResponse { - message: format!("The response to the request: {}", hello_request.name), - ..Default::default() - }; - - let response_msg = UMessageBuilder::response_for_request(msg.attributes.as_ref().unwrap()) - .build_with_protobuf_payload(&hello_response) - .unwrap(); - self.client.send(response_msg).await.unwrap(); - } -} - #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); let args = Args::parse(); - println!("uE_service"); + info!("Started zenoh_service"); let mut zenoh_config = Config::default(); diff --git a/example-streamer-uses/src/bin/ue_subscriber.rs b/example-streamer-uses/src/bin/zenoh_subscriber.rs similarity index 73% rename from example-streamer-uses/src/bin/ue_subscriber.rs rename to example-streamer-uses/src/bin/zenoh_subscriber.rs index 19c7aeef..243808a3 100644 --- a/example-streamer-uses/src/bin/ue_subscriber.rs +++ b/example-streamer-uses/src/bin/zenoh_subscriber.rs @@ -11,24 +11,24 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_trait::async_trait; +mod common; + use clap::Parser; -use hello_world_protos::hello_world_topics::Timer; -use log::error; -use protobuf::Message; +use common::PublishReceiver; +use log::info; use std::str::FromStr; use std::sync::Arc; use std::thread; -use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; +use up_rust::{UListener, UStatus, UTransport, UUri}; use up_transport_zenoh::UPTransportZenoh; use zenoh::config::{Config, EndPoint}; -const PUB_TOPIC_AUTHORITY: &str = "me_authority"; +const PUB_TOPIC_AUTHORITY: &str = "authority_A"; const PUB_TOPIC_UE_ID: u32 = 0x5BA0; const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; -const SUB_TOPIC_AUTHORITY: &str = "linux"; +const SUB_TOPIC_AUTHORITY: &str = "authority_B"; const SUB_TOPIC_UE_ID: u32 = 0x5BB0; const SUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; @@ -42,9 +42,6 @@ fn subscriber_uuri() -> UUri { .unwrap() } -#[allow(dead_code)] -struct PublishReceiver; - #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { @@ -53,32 +50,13 @@ struct Args { endpoint: String, } -#[async_trait] -impl UListener for PublishReceiver { - async fn on_receive(&self, msg: UMessage) { - println!("PublishReceiver: Received a message: {msg:?}"); - - let Some(payload_bytes) = msg.payload else { - panic!("No bytes available"); - }; - match Timer::parse_from_bytes(&payload_bytes) { - Ok(timer_message) => { - println!("timer: {timer_message:?}"); - } - Err(err) => { - error!("Unable to parse Timer Message: {err:?}"); - } - }; - } -} - #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); let args = Args::parse(); - println!("uE_subscriber"); + info!("Started zenoh_subscriber"); let mut zenoh_config = Config::default(); diff --git a/example-streamer-uses/vsomeip-configs/mE_client.json b/example-streamer-uses/vsomeip-configs/someip_client.json similarity index 100% rename from example-streamer-uses/vsomeip-configs/mE_client.json rename to example-streamer-uses/vsomeip-configs/someip_client.json diff --git a/example-streamer-uses/vsomeip-configs/mE_publisher.json b/example-streamer-uses/vsomeip-configs/someip_publisher.json similarity index 100% rename from example-streamer-uses/vsomeip-configs/mE_publisher.json rename to example-streamer-uses/vsomeip-configs/someip_publisher.json diff --git a/example-streamer-uses/vsomeip-configs/mE_service.json b/example-streamer-uses/vsomeip-configs/someip_service.json similarity index 100% rename from example-streamer-uses/vsomeip-configs/mE_service.json rename to example-streamer-uses/vsomeip-configs/someip_service.json diff --git a/example-streamer-uses/vsomeip-configs/mE_subscriber.json b/example-streamer-uses/vsomeip-configs/someip_subscriber.json similarity index 100% rename from example-streamer-uses/vsomeip-configs/mE_subscriber.json rename to example-streamer-uses/vsomeip-configs/someip_subscriber.json diff --git a/up-linux-streamer-plugin/README.md b/up-linux-streamer-plugin/README.md index ecd669f4..4ffc14ba 100644 --- a/up-linux-streamer-plugin/README.md +++ b/up-linux-streamer-plugin/README.md @@ -116,7 +116,7 @@ rust-version = "1.74.0" ### Make a build of Zenoh and copy to standalone folder -Within the zenoh folder: +Within the Zenoh folder: ```bash cargo build diff --git a/up-linux-streamer/README.md b/up-linux-streamer/README.md deleted file mode 100644 index 50e3b9dc..00000000 --- a/up-linux-streamer/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# up-linux-streamer - -Concrete implementation of a uStreamer as a binary. - -## Configuration - -Reference the `DEFAULT_CONFIG.json5` configuration file to understand configuration options. - -As well, the `ZENOH_CONFIG.json5` file is used to set Zenoh configurations. By default, it is only used to set listening endpoints, but can be used with more configurations according to [Zenoh's page on it](https://zenoh.io/docs/manual/configuration/#configuration-files). - -### Bundled vsomeip or bring your own - -The default is to build a bundled version of [vsomeip](https://github.com/COVESA/vsomeip) for use by the `up-transport-vsomeip` crate. - -The vsomeip library is used to communicate over [SOME/IP](https://some-ip.com/) to mechatronics devices. - -If you wish to bring your own vsomeip install, you can use the flag `--no-default-features` flag when building with `cargo build`. For more details on required environment variables when building `up-transport-vsomeip-rust`, reference the README for [vsomeip-sys](https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust/tree/main/vsomeip-sys). - -### Running the `up-linux-streamer` - -```bash -LD_LIBRARY_PATH=$LD_LIBRARY_PATH: cargo run -- --config up-linux-streamer/DEFAULT_CONFIG.json5 -``` diff --git a/up-linux-streamer/src/main.rs b/up-linux-streamer/src/main.rs deleted file mode 100644 index a4929afc..00000000 --- a/up-linux-streamer/src/main.rs +++ /dev/null @@ -1,177 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2024 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -mod config; - -use crate::config::{Config, HostTransport}; -use clap::Parser; -use log::{error, trace}; -use std::fs::File; -use std::io::Read; -use std::str::FromStr; -use std::sync::Arc; -use std::{env, thread}; -use up_rust::{UCode, UStatus, UTransport, UUri}; -use up_streamer::{Endpoint, UStreamer}; -use up_transport_vsomeip::UPTransportVsomeip; -use up_transport_zenoh::UPTransportZenoh; -use usubscription_static_file::USubscriptionStaticFile; -use zenoh::config::Config as ZenohConfig; - -#[derive(Parser)] -#[command()] -struct StreamerArgs { - #[arg(short, long, value_name = "FILE")] - config: String, -} - -#[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<(), UStatus> { - env_logger::init(); - - let args = StreamerArgs::parse(); - - let mut file = File::open(args.config) - .map_err(|e| UStatus::fail_with_code(UCode::NOT_FOUND, format!("File not found: {e:?}")))?; - let mut contents = String::new(); - file.read_to_string(&mut contents).map_err(|e| { - UStatus::fail_with_code( - UCode::INTERNAL, - format!("Unable to read config file: {e:?}"), - ) - })?; - - let config: Config = json5::from_str(&contents).map_err(|e| { - UStatus::fail_with_code( - UCode::INTERNAL, - format!("Unable to parse config file: {e:?}"), - ) - })?; - - let subscription_path = config.usubscription_config.file_path; - let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); - - let mut streamer = match UStreamer::new( - "up-linux-streamer", - config.up_streamer_config.message_queue_size, - usubscription, - ) { - Ok(streamer) => streamer, - Err(error) => panic!("Failed to create uStreamer: {}", error), - }; - - let zenoh_config = match ZenohConfig::from_file(config.zenoh_transport_config.config_file) { - Ok(config) => { - trace!("Able to read zenoh config from file"); - config - } - Err(error) => { - panic!("Unable to read zenoh config from file: {}", error); - } - }; - - let streamer_uuri = UUri::try_from_parts( - &config.streamer_uuri.authority, - config.streamer_uuri.ue_id, - config.streamer_uuri.ue_version_major, - 0, - ) - .expect("Unable to form streamer_uuri"); - - trace!("streamer_uuri: {streamer_uuri:#?}"); - let streamer_uri: String = (&streamer_uuri).into(); - // TODO: Remove this once the error reporting from UPTransportZenoh no longer "hides" - // the underlying reason for the failure on converting uri -> UUri - trace!("streamer_uri: {streamer_uri}"); - let _zenoh_internal_uuri = UUri::from_str(&streamer_uri).map_err(|e| { - let msg = format!("Unable to transform the uri to UUri, e: {e:?}"); - error!("{msg}"); - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg) - })?; - - let host_transport: Arc = Arc::new(match config.host_config.transport { - HostTransport::Zenoh => UPTransportZenoh::new(zenoh_config, streamer_uri) - .await - .expect("Unable to initialize Zenoh UTransport"), // other host transports can be added here as they become available - }); - - let host_endpoint = Endpoint::new( - "host_endpoint", - &config.streamer_uuri.authority, - host_transport.clone(), - ); - - if config.someip_config.enabled { - let someip_config_file_abs_path = if config.someip_config.config_file.is_relative() { - env::current_exe() - .unwrap() - .parent() - .unwrap() - .join(&config.someip_config.config_file) - } else { - config.someip_config.config_file - }; - trace!("someip_config_file_abs_path: {someip_config_file_abs_path:?}"); - if !someip_config_file_abs_path.exists() { - panic!( - "The specified someip config_file doesn't exist: {someip_config_file_abs_path:?}" - ); - } - - let host_uuri = UUri::try_from_parts( - &config.streamer_uuri.authority, - config - .someip_config - .default_someip_application_id_for_someip_subscriptions as u32, - 1, - 0, - ) - .expect("Unable to make host_uuri"); - - // There will be at most one vsomeip_transport, as there is a connection into device and a streamer - let someip_transport: Arc = Arc::new( - UPTransportVsomeip::new_with_config( - host_uuri, - &config.someip_config.authority, - &someip_config_file_abs_path, - None, - ) - .expect("Unable to initialize vsomeip UTransport"), - ); - - let mechatronics_endpoint = Endpoint::new( - "mechatronics_endpoint", - &config.someip_config.authority, - someip_transport.clone(), - ); - let forwarding_res = streamer - .add_forwarding_rule(mechatronics_endpoint.clone(), host_endpoint.clone()) - .await; - - if let Err(err) = forwarding_res { - panic!("Unable to add forwarding result: {err:?}"); - } - - let forwarding_res = streamer - .add_forwarding_rule(host_endpoint.clone(), mechatronics_endpoint.clone()) - .await; - - if let Err(err) = forwarding_res { - panic!("Unable to add forwarding result: {err:?}"); - } - } - - thread::park(); - - Ok(()) -} diff --git a/up-linux-streamer/vsomeip-configs/point_to_point.json b/up-linux-streamer/vsomeip-configs/point_to_point.json deleted file mode 100644 index 2e6c2dc8..00000000 --- a/up-linux-streamer/vsomeip-configs/point_to_point.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "applications" : - [ - { - "name" : "foo", - "id" : "0x1236" - } - ], - "services" : - [ - { - "service" : "0x1236", - "instance" : "0x0001" - } - ] -} diff --git a/utils/mosquitto/docker-compose.yaml b/utils/mosquitto/docker-compose.yaml new file mode 100644 index 00000000..09e6afd5 --- /dev/null +++ b/utils/mosquitto/docker-compose.yaml @@ -0,0 +1,8 @@ +services: + mosquitto: + image: eclipse-mosquitto:2.0 + volumes: + # read-only prevents the container changing file owners on the host + - ./mosquitto.conf:/mosquitto/config/mosquitto.conf:ro + ports: + - 1883:1883 diff --git a/utils/mosquitto/mosquitto.config b/utils/mosquitto/mosquitto.config new file mode 100644 index 00000000..a6d36377 --- /dev/null +++ b/utils/mosquitto/mosquitto.config @@ -0,0 +1,8 @@ +persistence false +allow_anonymous true +log_type all +log_type debug +log_dest stdout +connection_messages true +listener 1883 + diff --git a/utils/usubscription-static-file/static-configs/testdata.json b/utils/usubscription-static-file/static-configs/testdata.json index efcb9f41..ca03eaa2 100644 --- a/utils/usubscription-static-file/static-configs/testdata.json +++ b/utils/usubscription-static-file/static-configs/testdata.json @@ -1,4 +1,4 @@ { - "//linux/3039/1/8001": ["//me_authority/5678/1/1234"], - "//me_authority/5BA0/1/8001": ["//linux/5678/1/1234"] + "//authority_B/3039/1/8001": ["//authority_A/5678/1/1234"], + "//authority_A/5BA0/1/8001": ["//authority_B/5678/1/1234"] } From edb53927a595fd55198ba8129424e362791113e8 Mon Sep 17 00:00:00 2001 From: Pete LeVasseur Date: Wed, 20 Nov 2024 07:29:45 -0500 Subject: [PATCH 2/2] Test CI with mqtt-transport flag active --- .github/workflows/bundled-lint-and-test.yaml | 6 +++--- .github/workflows/unbundled-lint-and-test.yaml | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/bundled-lint-and-test.yaml b/.github/workflows/bundled-lint-and-test.yaml index b6d1d201..68b35664 100644 --- a/.github/workflows/bundled-lint-and-test.yaml +++ b/.github/workflows/bundled-lint-and-test.yaml @@ -69,12 +69,12 @@ jobs: - name: cargo clippy without Zenoh & vsomeip transports working-directory: ${{github.workspace}} run: cargo clippy --all-targets -- -W warnings -D warnings - - name: Build the project with Zenoh & vsomeip transports (and thus streamer references) + - name: Build the project with Zenoh & vsomeip & mqtt transports (and thus streamer references) working-directory: ${{github.workspace}} - run: cargo build --features vsomeip-transport,bundled-vsomeip,zenoh-transport + run: cargo build --features vsomeip-transport,bundled-vsomeip,zenoh-transport,mqtt-transport - name: cargo clippy with Zenoh & vsomeip transports (and thus streamer references) working-directory: ${{github.workspace}} - run: cargo clippy --features vsomeip-transport,bundled-vsomeip,zenoh-transport --all-targets -- -W warnings -D warnings + run: cargo clippy --features vsomeip-transport,bundled-vsomeip,zenoh-transport,mqtt-transport --all-targets -- -W warnings -D warnings - name: cargo fmt working-directory: ${{github.workspace}} run: cargo fmt -- --check diff --git a/.github/workflows/unbundled-lint-and-test.yaml b/.github/workflows/unbundled-lint-and-test.yaml index c567ad84..91236a23 100644 --- a/.github/workflows/unbundled-lint-and-test.yaml +++ b/.github/workflows/unbundled-lint-and-test.yaml @@ -166,12 +166,12 @@ jobs: - name: cargo clippy without Zenoh & vsomeip transports working-directory: ${{github.workspace}} run: cargo clippy --all-targets -- -W warnings -D warnings - - name: Build the project with Zenoh & vsomeip transports (and thus streamer references) + - name: Build the project with Zenoh & vsomeip & mqtt transports (and thus streamer references) working-directory: ${{github.workspace}} - run: cargo build --features vsomeip-transport,zenoh-transport - - name: cargo clippy with Zenoh & vsomeip transports (and thus streamer references) + run: cargo build --features vsomeip-transport,zenoh-transport,mqtt-transport + - name: cargo clippy with Zenoh & vsomeip & mqtt transports (and thus streamer references) working-directory: ${{github.workspace}} - run: cargo clippy --features vsomeip-transport,zenoh-transport --all-targets -- -W warnings -D warnings + run: cargo clippy --features vsomeip-transport,zenoh-transport,mqtt-transport --all-targets -- -W warnings -D warnings - name: cargo fmt working-directory: ${{github.workspace}} run: cargo fmt -- --check