From e30f00b671df24e73274036fb6cc05c33ea93ac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Ml=C3=A1dek?= Date: Thu, 29 Aug 2024 23:44:32 +0200 Subject: [PATCH 1/5] Set workspace resolver --- Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 4988320..5a4f580 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,7 @@ [workspace] +resolver = "2" + members = [ "spotflow", "spotflow-c", From c89469e9c1564f2a177203fb41a2b8e5e8b5c087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Ml=C3=A1dek?= Date: Thu, 29 Aug 2024 23:45:21 +0200 Subject: [PATCH 2/5] Upgrade dependencies --- spotflow-c/Cargo.toml | 6 +++--- spotflow-py/Cargo.toml | 2 +- spotflow/Cargo.toml | 14 +++++++------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/spotflow-c/Cargo.toml b/spotflow-c/Cargo.toml index 1c65f7b..e755559 100644 --- a/spotflow-c/Cargo.toml +++ b/spotflow-c/Cargo.toml @@ -13,11 +13,11 @@ crate-type = ["cdylib", "staticlib"] spotflow = { path = "../spotflow", version = "0.7.0", features = ["openssl-vendored"] } anyhow = "1.0.57" chrono = "0.4.20" -simple_logger = { version = "4.3.0", features = ["stderr"] } -http = "0.2.6" +simple_logger = { version = "5.0.0", features = ["stderr"] } +http = "1.1.0" # Used because stable std does not have c_size_t libc = "0.2.121" log = "0.4.16" [build-dependencies] -cbindgen = "0.21.0" +cbindgen = "0.26.0" diff --git a/spotflow-py/Cargo.toml b/spotflow-py/Cargo.toml index 2759320..6bd906e 100644 --- a/spotflow-py/Cargo.toml +++ b/spotflow-py/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["cdylib"] [dependencies] chrono = "0.4.19" spotflow = { path = "../spotflow", version = "0.7.0", features = ["openssl-vendored"] } -http = "0.2.8" +http = "1.1.0" log = "0.4.17" pyo3 = { version = "0.19.0", features = ["extension-module", "serde", "abi3-py37"] } pyo3-log = "0.8.2" diff --git a/spotflow/Cargo.toml b/spotflow/Cargo.toml index ddaf50f..536d1df 100644 --- a/spotflow/Cargo.toml +++ b/spotflow/Cargo.toml @@ -14,20 +14,20 @@ openssl-vendored = ["openssl/vendored"] [dependencies] anyhow = "1.0.56" async-trait = "0.1.61" -brotli = "3.3.4" +brotli = "6.0.0" chrono = { version = "0.4.19", features = ["serde"] } -http = "0.2.6" -json-patch = "0.2.6" +http = "1.1.0" +json-patch = "2.0.0" log = "0.4.16" native-tls = "0.2.8" openssl = { version = "0.10.29", optional = true } rumqttc = { package = "spotflow-rumqttc-fork", version = "0.12.0", features = ["use-native-tls"], default-features = false } serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" -sqlx = { version = "0.6.2", features = ["sqlite", "chrono", "macros", "runtime-tokio-native-tls", "offline"] } +sqlx = { version = "0.7.4", features = ["sqlite", "chrono", "macros", "runtime-tokio", "tls-native-tls"] } thiserror = "1.0.30" time = "0.3.36" -tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } +tokio = { version = "1.17.0", features = ["rt", "sync", "macros", "rt-multi-thread"] } tokio-util = "0.7.1" ureq = { version = "2.4.0", features = ["json", "native-tls"], default-features = false } urlencoding = "2.1.0" @@ -35,12 +35,12 @@ uuid = { version = "1.0.0", features = ["v4"] } [build-dependencies] tokio = { version = "1.17.0", features = ["rt"] } -sqlx = { version = "0.6.2", features = ["sqlite", "runtime-tokio-native-tls"] } +sqlx = { version = "0.7.4", features = ["sqlite", "runtime-tokio-native-tls"] } [dev-dependencies] azure_core = "0.1.1" azure_identity = "0.1.1" -env_logger = "0.9.0" +env_logger = "0.11.0" azure_storage = "0.1.0" azure_storage_blobs = "0.1.0" oauth2 = "4.1.0" From c8ee54c93c89f307f91e9a7246e1a45bdf6827d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Ml=C3=A1dek?= Date: Thu, 29 Aug 2024 23:45:52 +0200 Subject: [PATCH 3/5] Setup stricter clippy --- clippy.toml | 3 +++ spotflow/src/lib.rs | 24 ++++++++++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 clippy.toml diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 0000000..d9f788f --- /dev/null +++ b/clippy.toml @@ -0,0 +1,3 @@ +allow-expect-in-tests = true +allow-unwrap-in-tests = true +doc-valid-idents = ["IoT", "SQLite", ".."] diff --git a/spotflow/src/lib.rs b/spotflow/src/lib.rs index 6df0ded..2a34762 100644 --- a/spotflow/src/lib.rs +++ b/spotflow/src/lib.rs @@ -1,3 +1,27 @@ +#![deny( + clippy::expect_used, + clippy::future_not_send, + clippy::indexing_slicing, + clippy::panic, + clippy::pedantic, + clippy::todo, + clippy::unreachable, + clippy::unwrap_used, + unsafe_code +)] +#![allow( + // These should be also fixed sooner or later. + clippy::unwrap_used, + clippy::expect_used, + clippy::todo, + clippy::unreachable, + + clippy::missing_errors_doc, + clippy::module_name_repetitions, + clippy::struct_field_names, + clippy::too_many_lines, +)] + //! This crate contains the Device SDK for the Spotflow IoT Platform. //! More information: //! From 5bdb4ac3636127a8e50315832d0ebba07b854939 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Ml=C3=A1dek?= Date: Thu, 29 Aug 2024 23:55:47 +0200 Subject: [PATCH 4/5] Add rust-toolchain.toml file --- spotflow/rust-toolchain.toml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 spotflow/rust-toolchain.toml diff --git a/spotflow/rust-toolchain.toml b/spotflow/rust-toolchain.toml new file mode 100644 index 0000000..a56a283 --- /dev/null +++ b/spotflow/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "1.80.1" From 016800519263f986f7be972bc73d18995fdb8a26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20Ml=C3=A1dek?= Date: Fri, 30 Aug 2024 00:01:01 +0200 Subject: [PATCH 5/5] Fix new clippy lints Many of those are getting rid of potential panics due to malformed connection strings, IDs and other user input. --- spotflow/src/cloud/api_core.rs | 15 ++-- spotflow/src/cloud/dps.rs | 12 +-- spotflow/src/cloud/drs.rs | 35 +++------ spotflow/src/cloud/duration_wrapper.rs | 12 ++- spotflow/src/ingress/builder.rs | 5 ++ spotflow/src/ingress/mod.rs | 6 ++ spotflow/src/iothub/eventloop.rs | 33 ++++---- spotflow/src/iothub/handlers/c2d.rs | 19 +++-- spotflow/src/iothub/handlers/direct_method.rs | 40 +++++----- spotflow/src/iothub/handlers/twins.rs | 49 ++++++------ spotflow/src/iothub/json_diff.rs | 12 +-- spotflow/src/iothub/mod.rs | 11 ++- spotflow/src/iothub/query.rs | 10 +-- spotflow/src/iothub/sender.rs | 20 ++--- spotflow/src/iothub/token_handler.rs | 8 +- spotflow/src/iothub/topics.rs | 6 +- spotflow/src/iothub/twins/mod.rs | 8 +- .../twins/update_callback_dispatcher.rs | 2 +- spotflow/src/persistence/c2d.rs | 7 +- spotflow/src/persistence/mod.rs | 23 +++--- spotflow/src/persistence/sqlite.rs | 75 ++++++++++--------- spotflow/src/persistence/sqlite_channel.rs | 19 ++--- spotflow/src/persistence/twins.rs | 5 +- spotflow/src/utils/thread.rs | 2 +- 24 files changed, 218 insertions(+), 216 deletions(-) diff --git a/spotflow/src/cloud/api_core.rs b/spotflow/src/cloud/api_core.rs index 80ef347..0d0901f 100644 --- a/spotflow/src/cloud/api_core.rs +++ b/spotflow/src/cloud/api_core.rs @@ -44,7 +44,7 @@ pub(crate) fn put( token: impl AsRef, data: impl serde::Serialize, ) -> Result { - send(http::Method::PUT, base_uri, relative_uri, token, data) + send(&http::Method::PUT, base_uri, relative_uri, token, data) } pub(crate) fn post( @@ -53,21 +53,18 @@ pub(crate) fn post( token: impl AsRef, data: impl serde::Serialize, ) -> Result { - send(http::Method::POST, base_uri, relative_uri, token, data) + send(&http::Method::POST, base_uri, relative_uri, token, data) } pub(crate) fn send( - method: http::Method, + method: &http::Method, base_uri: &Uri, relative_uri: &Uri, token: impl AsRef, data: impl serde::Serialize, ) -> Result { - let authority = match base_uri.authority() { - Some(authority) => authority, - None => { - return Err(anyhow!("Provided base URI {base_uri:?} does not contain the authority (e.g., 'api.eu1.spotflow.io').").into()) - } + let Some(authority) = base_uri.authority() else { + return Err(anyhow!("Provided base URI {base_uri:?} does not contain the authority (e.g., 'api.eu1.spotflow.io').").into()); }; let path = relative_uri.path_and_query(); @@ -89,7 +86,7 @@ pub(crate) fn send( Arc::new(native_tls::TlsConnector::new().expect("Unable to build TLS connector")); let agent = ureq::AgentBuilder::new().tls_connector(connector).build(); - let request = match method { + let request = match *method { http::Method::POST => agent.post(&uri.to_string()), http::Method::PUT => agent.put(&uri.to_string()), _ => unimplemented!("Method {} is not implemented.", method), diff --git a/spotflow/src/cloud/dps.rs b/spotflow/src/cloud/dps.rs index e3aa354..7ebca5f 100644 --- a/spotflow/src/cloud/dps.rs +++ b/spotflow/src/cloud/dps.rs @@ -134,7 +134,7 @@ impl Provisioning { })? .into_json() .context("Failed deserializing response from JSON") - .map_err(|e| e.into()) + .map_err(Into::into) } pub fn complete(&mut self, operation_id: &str) -> Result { @@ -192,7 +192,7 @@ pub fn refresh( })? .into_json() .context("Failed deserializing response from JSON") - .map_err(|e| e.into()) + .map_err(Into::into) } #[cfg(test)] @@ -226,7 +226,7 @@ mod tests { instance_uri = format!("https://{instance_uri}"); } - let instance_url: Uri = instance_uri + let instance_uri: Uri = instance_uri .parse() .expect("Invalid instance URL: '{instance_uri}'"); @@ -235,7 +235,7 @@ mod tests { ); let pt = ProvisioningToken { token: pt }; - let mut provisioning = Provisioning::new(instance_url.clone(), pt); + let mut provisioning = Provisioning::new(instance_uri.clone(), pt); provisioning.with_device_id(device_id); let init = provisioning .init() @@ -251,7 +251,7 @@ mod tests { assert!(matches!(result, Err(CompletionError::NotReady))); // Approve will normally happen out of band by manual operator calls - approve_provisioning(&instance_url, &workspace_id, &operation) + approve_provisioning(&instance_uri, &workspace_id, &operation) .expect("Unable to approve operation"); let result = provisioning.complete(&operation); @@ -275,7 +275,7 @@ mod tests { expiration: result.expiration, }; - let res = register(&instance_url, &rt).expect("Unable to register device."); + let res = register(&instance_uri, &rt).expect("Unable to register device."); println!("{res:#?}"); } diff --git a/spotflow/src/cloud/drs.rs b/spotflow/src/cloud/drs.rs index b74212e..c3211e4 100644 --- a/spotflow/src/cloud/drs.rs +++ b/spotflow/src/cloud/drs.rs @@ -44,8 +44,7 @@ impl RegistrationResponse { bail!("Cannot parse anything but Shared Access Signature."); } - let device_id = self - .connection_string + self.connection_string .split(';') .find_map(|part| { if let Some((k, v)) = part.split_once('=') { @@ -55,31 +54,20 @@ impl RegistrationResponse { } None }) - .unwrap(); - - Ok(device_id) + .context("Connection string does not contain `DeviceId`.") } pub fn workspace_id(&self) -> Result<&str> { - let iot_hub_device_id = self.iot_hub_device_id()?; - - let separator_pos = Self::get_iot_hub_device_id_separator_pos(iot_hub_device_id)?; - - Ok(&iot_hub_device_id[..separator_pos]) + Ok(self.split_iot_hub_device_id()?.0) } pub fn device_id(&self) -> Result<&str> { - let iot_hub_device_id = self.iot_hub_device_id()?; - - let separator_pos = Self::get_iot_hub_device_id_separator_pos(iot_hub_device_id)?; - - Ok(&iot_hub_device_id[separator_pos + 1..]) + Ok(self.split_iot_hub_device_id()?.1) } - fn get_iot_hub_device_id_separator_pos( - iot_hub_device_id: &str, - ) -> Result { - iot_hub_device_id.find(':').ok_or_else(|| { + fn split_iot_hub_device_id(&self) -> Result<(&str, &str), anyhow::Error> { + let iot_hub_device_id = self.iot_hub_device_id()?; + iot_hub_device_id.split_once(':').ok_or_else(|| { anyhow::anyhow!("Unknown format of IoT Hub Device ID, it does not contain a colon: '{iot_hub_device_id}'.") }) } @@ -89,8 +77,7 @@ impl RegistrationResponse { bail!("Cannot parse anything but Shared Access Signature."); } - let sas = self - .connection_string + self.connection_string .split(';') .find_map(|part| { if let Some((k, v)) = part.split_once('=') { @@ -100,9 +87,7 @@ impl RegistrationResponse { } None }) - .unwrap(); - - Ok(sas) + .context("Connection string does not contain `SharedAccessSignature`.") } } @@ -126,5 +111,5 @@ pub fn register( })? .into_json() .context("Failed deserializing response from JSON") - .map_err(|e| e.into()) + .map_err(Into::into) } diff --git a/spotflow/src/cloud/duration_wrapper.rs b/spotflow/src/cloud/duration_wrapper.rs index 1dd91c6..4b8248f 100644 --- a/spotflow/src/cloud/duration_wrapper.rs +++ b/spotflow/src/cloud/duration_wrapper.rs @@ -59,7 +59,13 @@ impl<'de> Visitor<'de> for DurationVisitor { let hours: u64; // let mut microseconds = 0; - let parts: Vec<&str> = s.split(':').collect(); + let parts: [&str; 3] = s.split(':').collect::>().try_into().map_err(|_| { + de::Error::invalid_value( + Unexpected::Str("Duration did not contain two colons."), + &self, + ) + })?; + let day_and_hour = parts[0]; let minutes: u64 = parts[1].parse().map_err(|_| { de::Error::invalid_value( @@ -131,7 +137,7 @@ mod tests { fn deser_duration() { let s = "\"8.11:55:36.3296177\""; let d: Duration = serde_json::from_str::(s).unwrap().into(); - assert_eq!(d.as_secs(), 734136); + assert_eq!(d.as_secs(), 734_136); } #[test] @@ -156,7 +162,7 @@ mod tests { } #[test] - #[should_panic] + #[should_panic(expected = "This duration cannot be deserialized.")] fn deser_duration_fail() { let s = "\"10:39\""; let _d: Duration = serde_json::from_str::(s).unwrap().into(); diff --git a/spotflow/src/ingress/builder.rs b/spotflow/src/ingress/builder.rs index 18af8a3..d51067b 100644 --- a/spotflow/src/ingress/builder.rs +++ b/spotflow/src/ingress/builder.rs @@ -122,6 +122,7 @@ impl DeviceClientBuilder { /// Hidden from the documentation because the concept of Sites and their IDs is not yet explained in the Platform documentation. #[doc(hidden)] + #[must_use] pub fn with_site_id(mut self, site_id: String) -> DeviceClientBuilder { self.site_id = Some(site_id); self @@ -132,6 +133,7 @@ impl DeviceClientBuilder { /// /// If your company uses a dedicated instance of the Platform, such as `acme.spotflow.io`, specify it here. /// The default value is `api.eu1.spotflow.io`. + #[must_use] pub fn with_instance(mut self, instance: String) -> DeviceClientBuilder { self.instance = Some(instance); self @@ -140,6 +142,7 @@ impl DeviceClientBuilder { /// Set the callback to display the details of the /// [Provisioning Operation](https://docs.spotflow.io/connect-devices/#provisioning-operation) /// when [`DeviceClientBuilder::build`] is performing [Device Provisioning](https://docs.spotflow.io/connect-devices/#device-provisioning). + #[must_use] pub fn with_display_provisioning_operation_callback( mut self, callback: Box, @@ -153,6 +156,7 @@ impl DeviceClientBuilder { /// [Device](https://docs.spotflow.io/connect-devices/#device) receives their update from the Platform. /// The [Device configuration tutorial](https://docs.spotflow.io/configure-devices/tutorial-configure-device#1-start-device) /// shows how to use this option. The callback must be `Send` and `Sync`, because it's called in a separate thread. + #[must_use] pub fn with_desired_properties_updated_callback( mut self, callback: Box, @@ -162,6 +166,7 @@ impl DeviceClientBuilder { } /// Set the source of the system signals that can request the process to stop. + #[must_use] pub fn with_signals_source(mut self, signals_src: Box) -> Self { self.signals_src = Some(signals_src); self diff --git a/spotflow/src/ingress/mod.rs b/spotflow/src/ingress/mod.rs index 86e1ea6..5c048a6 100644 --- a/spotflow/src/ingress/mod.rs +++ b/spotflow/src/ingress/mod.rs @@ -57,6 +57,7 @@ pub struct MessageContext { impl MessageContext { // Create a new instance of [`MessageContext`] with the provided [Stream Group](https://docs.spotflow.io/send-data/#stream-group) // and [Stream](https://docs.spotflow.io/send-data/#stream). + #[must_use] pub fn new(stream_group: Option, stream: Option) -> Self { Self { stream_group, @@ -67,6 +68,7 @@ impl MessageContext { /// Get the [Stream Group](https://docs.spotflow.io/send-data/#stream-group) where /// [Messages](https://docs.spotflow.io/send-data/#message) will be sent to. + #[must_use] pub fn stream_group(&self) -> Option<&str> { self.stream_group.as_deref() } @@ -79,6 +81,7 @@ impl MessageContext { /// Get the [Stream](https://docs.spotflow.io/send-data/#stream) where /// [Messages](https://docs.spotflow.io/send-data/#message) will be sent to. + #[must_use] pub fn stream(&self) -> Option<&str> { self.stream.as_deref() } @@ -90,6 +93,7 @@ impl MessageContext { } /// Get the compression to use for sending [Messages](https://docs.spotflow.io/send-data/#message). + #[must_use] pub fn compression(&self) -> Option { self.compression.clone() } @@ -338,6 +342,7 @@ impl DeviceClient { /// if their version is higher than `version`. Otherwise, return `None`. /// /// Only the latest version is returned, any versions between the last obtained one and the current one are skipped. + #[must_use] pub fn desired_properties_if_newer(&self, version: u64) -> Option { self.connection.desired_properties_if_newer(version) } @@ -393,6 +398,7 @@ impl DeviceClient { /// **Warning**: Deprecated, don't use. #[deprecated] #[doc(hidden)] + #[must_use] pub fn reported_properties(&self) -> Option { self.connection.reported_properties() } diff --git a/spotflow/src/iothub/eventloop.rs b/spotflow/src/iothub/eventloop.rs index 3d5ee97..cd31cde 100644 --- a/spotflow/src/iothub/eventloop.rs +++ b/spotflow/src/iothub/eventloop.rs @@ -38,7 +38,7 @@ impl SubscriptionTask { acked += self.receiver.recv().await.map_err(|_| anyhow!("Channel for subscribe acknowledgements was closed before startup finished, possibly because subscription failed.") )?; - log::debug!("Subscription {}/{} acknowledged", acked, self.total,) + log::debug!("Subscription {acked}/{} acknowledged", self.total); } Ok(()) @@ -141,7 +141,7 @@ impl EventLoop { pub(super) async fn run(&mut self) { loop { select! { - _ = self.cancellation.cancelled() => { + () = self.cancellation.cancelled() => { log::debug!("Stopping MQTT because of cancellation"); break; }, @@ -200,7 +200,7 @@ impl EventLoop { time: Instant::now(), }, ) { - Ok(_) => log::debug!("Requesting IoT Hub authentication refresh."), + Ok(()) => log::debug!("Requesting IoT Hub authentication refresh."), Err(e) => log::error!( "Unable to request IoT Hub authentication refresh: {e:?}" ), @@ -238,7 +238,7 @@ impl EventLoop { log::warn!( "Ignoring message received on unexpected topic {:?}", &publish.topic, - ) + ); } Packet::PubAck(ack) => { if self.pending_d2c.contains(&ack.pkid) { @@ -268,11 +268,12 @@ impl EventLoop { } Packet::UnsubAck(_) => todo!(), Packet::Connect(_) => unreachable!("Client is responsible for connection initiation"), - Packet::PubRec(_) => unreachable!("Azure IoT Hub does not support QoS 2"), - Packet::PubRel(_) => unreachable!("Azure IoT Hub does not support QoS 2"), - Packet::PubComp(_) => unreachable!("Azure IoT Hub does not support QoS 2"), - Packet::Subscribe(_) => unreachable!("Only the client can subscribe to topics"), - Packet::Unsubscribe(_) => unreachable!("Only the client can subscribe to topics"), + Packet::PubRec(_) | Packet::PubRel(_) | Packet::PubComp(_) => { + unreachable!("Azure IoT Hub does not support QoS 2") + } + Packet::Subscribe(_) | Packet::Unsubscribe(_) => { + unreachable!("Only the client can subscribe to topics") + } Packet::Disconnect => unreachable!("Only the client sends disconnect"), // Packet::ConnAck(_) => {}, // Ignore ConnAck; rumqttc will handle everything for us // Packet::PingReq => {}, @@ -297,21 +298,19 @@ impl EventLoop { // Else this is request-response type of exchange such as reported properties update // We do not care about packet IDs or anything like that } + #[allow(clippy::match_same_arms)] Outgoing::Subscribe(_) => { // We counted subscriptions beforehand so we do not want to count them now. // We cannot subscribe reliably during runtime. } Outgoing::Unsubscribe(_) => todo!(), - Outgoing::PubRec(_) => unreachable!("Azure IoT Hub does not support QoS 2"), - Outgoing::PubRel(_) => unreachable!("Azure IoT Hub does not support QoS 2"), - Outgoing::PubComp(_) => unreachable!("Azure IoT Hub does not support QoS 2"), + Outgoing::PubRec(_) | Outgoing::PubComp(_) | Outgoing::PubRel(_) => { + unreachable!("Azure IoT Hub does not support QoS 2") + } Outgoing::AwaitAck(_) => { - log::warn!("MQTT is blocking until an out-of-order message is acknowledged.") + log::warn!("MQTT is blocking until an out-of-order message is acknowledged."); } - // Outgoing::PubAck(_) => {} - // Outgoing::PingReq => {}, - // Outgoing::PingResp => {}, - _ => {} + Outgoing::PubAck(_) | Outgoing::PingReq | Outgoing::PingResp => {} } } } diff --git a/spotflow/src/iothub/handlers/c2d.rs b/spotflow/src/iothub/handlers/c2d.rs index bbd7680..73b03d4 100644 --- a/spotflow/src/iothub/handlers/c2d.rs +++ b/spotflow/src/iothub/handlers/c2d.rs @@ -40,16 +40,15 @@ impl AsyncHandler for CloudToDeviceHandler { let topic = &publish.topic; log::debug!("Received C2D message on topic {topic}"); - let properties = match query::parse(&publish.topic[self.c2d_prefix.len()..]) { - Ok(properties) => properties, - Err(e) => { - log::error!( - "Failed parsing cloud to device message topic `{}`: {:?}", - topic, - e - ); - return; - } + let Some(properties) = publish.topic.strip_prefix(&self.c2d_prefix) else { + // Ignore malformed requests + return; + }; + + let Ok(properties) = query::parse(properties).inspect_err(|e| { + log::error!("Failed parsing cloud to device message topic `{topic}`: {e:?}"); + }) else { + return; }; let msg = CloudToDeviceMessage::new( diff --git a/spotflow/src/iothub/handlers/direct_method.rs b/spotflow/src/iothub/handlers/direct_method.rs index 18550f5..4060cc7 100644 --- a/spotflow/src/iothub/handlers/direct_method.rs +++ b/spotflow/src/iothub/handlers/direct_method.rs @@ -55,7 +55,7 @@ impl DirectMethodHandler { } } Ok((status, payload)) => { - let topic = topics::response_topic(status, msg.request_id); + let topic = topics::response_topic(status, &msg.request_id); _ = client.try_publish(topic, QoS::AtLeastOnce, false, payload); } } @@ -87,27 +87,31 @@ impl Handler for DirectMethodHandler { log::debug!("Received direct method call on topic {topic}"); // Because the method name may contain slashes we need to look for `/` from the right - let topic_without_prefix = &topic[topics::METHODS_PREFIX.len()..]; - let last_slash = topic_without_prefix - .rfind('/') - .expect("Invalid topic starts like direct method call but misses fourth slash."); - let method_name = topic_without_prefix[..last_slash].to_string(); + let Some(topic_without_prefix) = topic.strip_prefix(topics::METHODS_PREFIX) else { + // Ignore malformed requests + return; + }; + let Some((method_name, rest)) = topic_without_prefix.rsplit_once('/') else { + // "Invalid topic starts like direct method call but misses fourth slash." + return; + }; + + let Some(properties) = rest.strip_prefix("?$") else { + return; + }; // Skip the last slash and the leading question mark - let properties = match query::parse(&topic_without_prefix[last_slash + 2..]) { + let properties = match query::parse(properties) { Ok(properties) => properties, Err(e) => { - log::error!("Failed parsing method call topic `{}`: {:?}", topic, e); + log::error!("Failed parsing method call topic `{topic}`: {e:?}"); return; } }; - let request_id = match properties.get("$rid") { - Some(Some(id)) => id.to_string(), - _ => { - log::error!("Request ID is missing in method call on topic `{}`", topic); - return; - } + let Some(Some(request_id)) = properties.get("$rid") else { + log::error!("Request ID is missing in method call on topic `{topic}`"); + return; }; log::debug!("Invoking method named {method_name}"); @@ -118,14 +122,14 @@ impl Handler for DirectMethodHandler { .expect("The sender is wrapped in Option only to be able to drop it explicitly") .try_send(Invocation { publish: publish.clone(), - method_name, - request_id, + method_name: method_name.to_owned(), + request_id: request_id.to_owned(), }) { Err(TrySendError::Full(invocation)) => log::warn!("Received unexpectedly many direct method calls before they could be processed. Ignoring call to {} with request ID {}.", invocation.method_name, invocation.request_id), Err(TrySendError::Disconnected(invocation)) => log::error!("Received direct method call after processor shut down. Ignoring call to {} with request ID {}.", invocation.method_name, invocation.request_id), - Ok(_) => {}, + Ok(()) => {}, } } } @@ -143,7 +147,7 @@ fn join(handle: &mut Option>) { if let Some(handle) = handle { let thread = handle.thread(); let id = thread.id(); - let name = thread.name().map(|n| n.to_string()).unwrap_or_default(); + let name = thread.name().map(ToString::to_string).unwrap_or_default(); log::trace!("Joining thread {:?} named `{}`", id, name); if let Err(cause) = handle.join() { if let Some(s) = cause.downcast_ref::<&'static str>() { diff --git a/spotflow/src/iothub/handlers/twins.rs b/spotflow/src/iothub/handlers/twins.rs index c1b1ece..f9cdcad 100644 --- a/spotflow/src/iothub/handlers/twins.rs +++ b/spotflow/src/iothub/handlers/twins.rs @@ -126,8 +126,8 @@ impl TwinsMiddleware { pub(crate) async fn process(&mut self) { loop { if let Err(e) = select!( - _ = self.cancellation.cancelled() => break, - Some(_) = self.get_twins.recv() => { + () = self.cancellation.cancelled() => break, + Some(()) = self.get_twins.recv() => { self.get_twins().await.context("Receiving complete twins failed") } Ok(msg) = self.reported_properties_updates.recv(&None) => { @@ -182,7 +182,7 @@ impl TwinsMiddleware { log::debug!("Updating reported properties with request ID {rid}"); self.mqtt_client .try_publish( - topics::patch_reported_properties(rid), + topics::patch_reported_properties(&rid), rumqttc::QoS::AtLeastOnce, false, patch.as_bytes(), @@ -206,7 +206,7 @@ impl TwinsMiddleware { log::debug!("Requesting device twins with request ID {rid}"); self.mqtt_client .try_publish( - topics::get_twins(rid), + topics::get_twins(&rid), rumqttc::QoS::AtLeastOnce, false, Vec::new(), @@ -237,16 +237,15 @@ impl TwinsMiddleware { let topic = &publish.topic; log::debug!("Received device twin desired properties update on topic {topic}"); - let mut parts = topic.split('/'); - if parts.clone().count() != 6 { - bail!("Received message on invalid topic '{}'.", topic); - } + let Ok(parts): Result<[_; 6], _> = topic.split('/').collect::>().try_into() else { + bail!("Received message on invalid topic '{topic}'."); + }; - let properties = parts - .nth(5) - .expect("Unreachable because we have checked the number of parts"); + let Some(properties) = parts[5].strip_prefix('?') else { + bail!("Received message with malformed properties '{}'.", parts[5]); + }; - let properties = query::parse(&properties[1..]).context(format!( + let properties = query::parse(properties).context(format!( "Failed parsing twin desired properties update topic `{topic}`" ))?; let version = match properties.get("$version") { @@ -285,25 +284,21 @@ impl TwinsMiddleware { let topic = &publish.topic; log::debug!("Received device twin desired properties or reported properties change result on topic {topic}"); - let mut parts = topic.split('/'); - if parts.clone().count() != 5 { - bail!("Received message on an invalid topic '{}'.", topic); - } + let Ok(parts): Result<[_; 5], _> = topic.split('/').collect::>().try_into() else { + bail!("Received message on an invalid topic '{topic}'."); + }; - // This status is currently unused. However the side effect of advancing in the parts enum is important - let status = parts - .nth(3) - .expect("Unreachable because we have checked the number of parts"); - let _status: usize = status - .parse() - .unwrap_or_else(|_| panic!("Received message on an invalid topic '{topic}'")); + let status = parts[3]; + if status.parse::().is_err() { + bail!("Received message on an invalid topic '{topic}'."); + } - let properties = parts - .next() - .expect("Unreachable because we have checked the number of parts"); + let Some(properties) = parts[4].strip_prefix('?') else { + bail!("Received message with malformed properties '{}'.", parts[4]); + }; // Skip leading question mark - let properties = query::parse(&properties[1..]).context(format!( + let properties = query::parse(properties).context(format!( "Failed parsing twin response message topic `{topic}`" ))?; diff --git a/spotflow/src/iothub/json_diff.rs b/spotflow/src/iothub/json_diff.rs index 45b80e6..986f7c0 100644 --- a/spotflow/src/iothub/json_diff.rs +++ b/spotflow/src/iothub/json_diff.rs @@ -19,16 +19,12 @@ fn diff_objects( return Ok(None); } - if !desired.is_object() { + let Some(original) = original.as_object() else { return Ok(Some(desired.clone())); - } - - if !original.is_object() { + }; + let Some(desired) = desired.as_object() else { return Ok(Some(desired.clone())); - } - - let original = original.as_object().unwrap(); - let desired = desired.as_object().unwrap(); + }; let mut result = Map::new(); for (name, desired_child) in desired { diff --git a/spotflow/src/iothub/mod.rs b/spotflow/src/iothub/mod.rs index 20ee11c..c3dfa3f 100644 --- a/spotflow/src/iothub/mod.rs +++ b/spotflow/src/iothub/mod.rs @@ -162,7 +162,11 @@ impl IotHubConnection { // Ok(Box::new(self.twins_client.as_ref().unwrap().clone())) // } pub fn twins_client(&self) -> Result { - Ok(self.twins_client.as_ref().unwrap().clone()) + Ok(self + .twins_client + .as_ref() + .ok_or(anyhow!("Connection was not established yet."))? + .clone()) } } @@ -308,7 +312,10 @@ impl (i32, Vec) + Send + Sync + RefUnwindSafe + 'sta .ok() .and_then(|o| match &*o.state.borrow() { State::Ready => None, - State::ConnectionError(e) => Some(e.clone() as Arc), + State::ConnectionError(e) => { + let cast: Arc = e.to_owned(); + Some(cast) + } }) } } diff --git a/spotflow/src/iothub/query.rs b/spotflow/src/iothub/query.rs index f6b8108..4b38e91 100644 --- a/spotflow/src/iothub/query.rs +++ b/spotflow/src/iothub/query.rs @@ -7,16 +7,14 @@ pub(crate) fn parse(query: &str) -> Result>> { let mut map = HashMap::new(); for prop in query.split('&') { - match prop.find('=') { + match prop.split_once('=') { None => { let key = decode(prop).context(format!("Unable to URL decode key {prop}"))?; map.insert(key.into_owned(), None); } - Some(pos) => { - let key = - decode(&prop[..pos]).context(format!("Unable to URL decode key {prop}"))?; - let value = decode(&prop[pos + 1..]) - .context(format!("Unable to URL decode value {prop}"))?; + Some((key, value)) => { + let key = decode(key).context(format!("Unable to URL decode key {prop}"))?; + let value = decode(value).context(format!("Unable to URL decode value {prop}"))?; map.insert(key.into_owned(), Some(value.into_owned())); } } diff --git a/spotflow/src/iothub/sender.rs b/spotflow/src/iothub/sender.rs index 75f125e..a599ea4 100644 --- a/spotflow/src/iothub/sender.rs +++ b/spotflow/src/iothub/sender.rs @@ -40,16 +40,21 @@ impl Sender { pub(super) async fn process_saved(&mut self) { loop { select!( - _ = self.cancellation.cancelled() => break, + () = self.cancellation.cancelled() => break, // At this point we panic. I don't know what else to do as this is core functionality. // In a better world I will let the user know that the SDK stopped working and they need to restart or something. // For now this should panic on our own thread (not on user's thread) and cascade to the SDK itself which will probably return Error when the user tries to send more messages. Some(msg) = self.message_queue.get_message() => self.publish_iothub(msg).await.unwrap(), - ) + ); } } async fn publish_iothub(&self, msg: DeviceMessage) -> Result<()> { + fn encode_property(key: &str, value: &str) -> String { + let value = urlencoding::encode(value); + format!("{key}={value}") + } + let id = msg .id .expect("We have a saved message without an ID. This should never happen."); @@ -121,7 +126,7 @@ impl Sender { log::trace!("Sending message {} through file upload", id); properties.push(String::from("has-externalized-payload=true")); let blob_name = loop { - match self.publish_file(content.as_ref()).await { + match self.publish_file(content.as_ref()) { Ok(name) => break name, Err(e) => log::error!("Failed uploading file: {:?}", e), } @@ -170,15 +175,10 @@ impl Sender { log::trace!("Message sent {}", id); - return Ok(()); - - fn encode_property(key: &str, value: &str) -> String { - let value = urlencoding::encode(value); - format!("{}={}", key, value) - } + Ok(()) } - async fn publish_file(&self, content: &[u8]) -> Result { + fn publish_file(&self, content: &[u8]) -> Result { let registration = self.registration_watch.borrow(); let registration = registration .as_ref() diff --git a/spotflow/src/iothub/token_handler.rs b/spotflow/src/iothub/token_handler.rs index daa2649..191e804 100644 --- a/spotflow/src/iothub/token_handler.rs +++ b/spotflow/src/iothub/token_handler.rs @@ -104,7 +104,7 @@ impl TokenHandler { }; match processing_result { - Ok(_) => break, + Ok(()) => break, Err(e) => { log::warn!("First registration has failed, waiting for 30 seconds and trying again. Error: {e:?}"); tokio::time::sleep(Duration::from_secs(30)).await; @@ -153,7 +153,7 @@ impl TokenHandler { // Wait until the next command is received or it is time to check token expiration time again // (we check it periodically to ensure that we do not miss the expiration even if the device is in sleep mode) select! { - _ = tokio::time::sleep(Duration::from_secs(60)) => {} + () = tokio::time::sleep(Duration::from_secs(60)) => {} Some(command) = self.command_receiver.recv() => self.process_command(command).await } @@ -240,7 +240,7 @@ impl TokenHandler { } RegistrationCommand::RefreshRegistration { time } => { if time >= self.last_registration_refresh_attempt { - let result = self.try_refresh_registration().await; + let result = self.try_refresh_registration(); self.last_registration_refresh_attempt = Instant::now(); if let Err(e) = result { @@ -264,7 +264,7 @@ impl TokenHandler { } } - async fn try_refresh_registration(&mut self) -> Result<()> { + fn try_refresh_registration(&mut self) -> Result<()> { log::info!("Refreshing registration to the platform"); let registration = drs::register(&self.instance_url, &self.tokens.registration_token)?; self.tokens.iothub_sas_token = Some(ConnectionToken { diff --git a/spotflow/src/iothub/topics.rs b/spotflow/src/iothub/topics.rs index cbccb56..d55c40e 100644 --- a/spotflow/src/iothub/topics.rs +++ b/spotflow/src/iothub/topics.rs @@ -10,14 +10,14 @@ pub(super) fn c2d_topic(device_id: &str) -> String { format!("devices/{device_id}/messages/devicebound/") } -pub(crate) fn response_topic(status: i32, request_id: String) -> String { +pub(crate) fn response_topic(status: i32, request_id: &str) -> String { format!("$iothub/methods/res/{status}/?$rid={request_id}") } -pub(crate) fn patch_reported_properties(rid: String) -> String { +pub(crate) fn patch_reported_properties(rid: &str) -> String { format!("$iothub/twin/PATCH/properties/reported/?$rid={rid}") } -pub(crate) fn get_twins(rid: String) -> String { +pub(crate) fn get_twins(rid: &str) -> String { format!("$iothub/twin/GET/?$rid={rid}") } diff --git a/spotflow/src/iothub/twins/mod.rs b/spotflow/src/iothub/twins/mod.rs index b999e49..51c9b74 100644 --- a/spotflow/src/iothub/twins/mod.rs +++ b/spotflow/src/iothub/twins/mod.rs @@ -100,8 +100,8 @@ impl DeviceTwin { log::debug!("Setting reported properties to version {version}"); self.reported = Some(Twin { - properties, version, + properties, }); let reported = self .reported @@ -133,8 +133,8 @@ impl DeviceTwin { log::debug!("Setting desired properties to version {version}"); self.desired = Some(Twin { - properties, version, + properties, }); let desired = self .desired @@ -142,7 +142,7 @@ impl DeviceTwin { .expect("Desired properties value has just been assigned but is missing"); while let Some(update) = self.desired_properties_updates.pop_front() { - desired.update(update)?; + desired.update(&update)?; } log::trace!("Current desired properties:\n{:#?}", desired.properties); @@ -497,7 +497,7 @@ mod tests { let update: TwinUpdate = serde_json::from_str(update).expect("Unable to deserialize update"); - twins.desired.update(update).unwrap(); + twins.desired.update(&update).unwrap(); let result = r#"{"lorem":"ipsum","ahoj":"hi","next":42}"#; let result: serde_json::Value = serde_json::from_str(result).expect("Unable to deserialize expected JSON result"); diff --git a/spotflow/src/iothub/twins/update_callback_dispatcher.rs b/spotflow/src/iothub/twins/update_callback_dispatcher.rs index 4c85a99..926e5ad 100644 --- a/spotflow/src/iothub/twins/update_callback_dispatcher.rs +++ b/spotflow/src/iothub/twins/update_callback_dispatcher.rs @@ -26,7 +26,7 @@ impl DesiredPropertiesUpdatedCallbackDispatcher { if let Err(cause) = result { let message = if let Some(s) = cause.downcast_ref::<&'static str>() { - s.to_string() + (*s).to_string() } else if let Some(s) = cause.downcast_ref::() { s.clone() } else { diff --git a/spotflow/src/persistence/c2d.rs b/spotflow/src/persistence/c2d.rs index 7ede1cf..6318fbd 100644 --- a/spotflow/src/persistence/c2d.rs +++ b/spotflow/src/persistence/c2d.rs @@ -26,7 +26,7 @@ impl Storable for CloudToDeviceMessage { SELECT last_insert_rowid() as id"#, self.content, ) - .fetch_one(&mut transaction) + .fetch_one(&mut *transaction) .await?; log::debug!("Saved C2D message with ID {}", record.id); @@ -38,7 +38,7 @@ impl Storable for CloudToDeviceMessage { k, v, ) - .execute(&mut transaction) + .execute(&mut *transaction) .await?; } @@ -90,6 +90,7 @@ impl Storable for CloudToDeviceMessage { .fetch_one(conn) .await?; - Ok(res.cnt as usize) + // `cnt` cannot be negative so this is safe. + Ok(res.cnt.try_into().unwrap_or_default()) } } diff --git a/spotflow/src/persistence/mod.rs b/spotflow/src/persistence/mod.rs index 20cb8af..7311351 100644 --- a/spotflow/src/persistence/mod.rs +++ b/spotflow/src/persistence/mod.rs @@ -163,14 +163,14 @@ pub async fn create( ) -> Result { let sqlite = SqliteStore::init(store_path, config).await?; - start(sqlite, config, cancellation_token).await + Ok(start(sqlite, config, cancellation_token)) } -async fn start( +fn start( sqlite: SqliteStore, config: &SdkConfiguration, cancellation_token: CancellationToken, -) -> Result { +) -> Store { let (message_sender, message_receiver) = mpsc::channel(100); let (latest_msg_id_sender, mut latest_msg_id_receiver) = watch::channel(-1); @@ -198,7 +198,7 @@ async fn start( for msg in messages { select!( - _ = cancellation_token.cancelled() => { + () = cancellation_token.cancelled() => { // Cancelled return; }, @@ -213,18 +213,18 @@ async fn start( } } else if *latest_msg_id_receiver.borrow_and_update() == last_id { select!( - _ = cancellation_token.cancelled() => { + () = cancellation_token.cancelled() => { // Cancelled return; }, read = latest_msg_id_receiver.changed() => { if read.is_err() { - // No more updates are comming + // No more updates are coming return; } // else we start running the loop again }, - ) + ); } } }); @@ -254,7 +254,7 @@ async fn start( inner: sqlite.clone(), }; - Ok(Store { + Store { store: sqlite, d2c_producer: producer, d2c_consumer: consumer, @@ -263,7 +263,7 @@ async fn start( c2d_producer, c2d_consumer, twins_store, - }) + } } #[derive(Debug)] @@ -291,6 +291,7 @@ pub struct CloudToDeviceMessage { } impl CloudToDeviceMessage { + #[must_use] pub fn new(content: Vec, properties: HashMap) -> Self { CloudToDeviceMessage { id: None, @@ -300,7 +301,7 @@ impl CloudToDeviceMessage { } } -#[derive(Debug, sqlx::Type)] +#[derive(Copy, Clone, Debug, sqlx::Type)] pub enum CloseOption { None, Close, @@ -308,7 +309,7 @@ pub enum CloseOption { CloseMessageOnly, } -#[derive(Debug, sqlx::Type)] +#[derive(Copy, Clone, Debug, sqlx::Type)] pub enum Compression { None, BrotliFastest, diff --git a/spotflow/src/persistence/sqlite.rs b/spotflow/src/persistence/sqlite.rs index e11cc37..bce8d49 100644 --- a/spotflow/src/persistence/sqlite.rs +++ b/spotflow/src/persistence/sqlite.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use http::Uri; use log::{debug, warn}; use sqlx::{Connection, Row, SqliteConnection}; -use std::{fs::File, ops::DerefMut, path::Path, str::FromStr, sync::Arc}; +use std::{fs::File, path::Path, str::FromStr, sync::Arc}; use tokio::sync::{Mutex, MutexGuard}; use super::{ @@ -178,7 +178,7 @@ impl SqliteStore { msg.compression as _, msg.batch_slice_id, msg.chunk_id, - ).fetch_one(conn.deref_mut()).await?; + ).fetch_one(&mut *conn).await?; Ok(record.id) } @@ -189,16 +189,17 @@ impl SqliteStore { sqlx::query_as!( DeviceMessage, r#"SELECT id AS "id?: i32", site_id, stream_group, stream, batch_id, message_id, content, close_option AS "close_option!: CloseOption", compression AS "compression!: Compression", batch_slice_id, chunk_id FROM Messages WHERE id > ? ORDER BY id LIMIT 100"#, after, - ).fetch_all(conn.deref_mut()).await.map_err(anyhow::Error::from) + ).fetch_all(&mut *conn).await.map_err(anyhow::Error::from) } pub async fn message_count(&self) -> Result { let mut conn = self.conn.lock().await; let res = sqlx::query!("SELECT COUNT(id) as cnt FROM Messages") - .fetch_one(conn.deref_mut()) + .fetch_one(&mut *conn) .await?; - Ok(res.cnt as usize) + // This is safe because the result cannot be negative. + Ok(res.cnt.try_into().unwrap_or_default()) } pub async fn remove_oldest_message(&self) -> Result<()> { @@ -206,7 +207,7 @@ impl SqliteStore { sqlx::query!( "DELETE FROM Messages WHERE id = (SELECT id FROM Messages ORDER BY id LIMIT 1)" ) - .execute(conn.deref_mut()) + .execute(&mut *conn) .await?; Ok(()) @@ -232,16 +233,16 @@ impl SqliteStore { async fn load_twin_properties(&self, twin_type: &str) -> Result> { let mut conn = self.conn.lock().await; - let res = match sqlx::query!( + let res = sqlx::query!( r#"SELECT properties FROM Twins WHERE type = ? ORDER BY id DESC LIMIT 1"#, twin_type, ) - .fetch_optional(conn.deref_mut()) + .fetch_optional(&mut *conn) .await - .context("Unable to load twin")? - { - Some(row) => row, - None => return Ok(None), + .context("Unable to load twin")?; + + let Some(res) = res else { + return Ok(None); }; Ok(Some( @@ -257,7 +258,7 @@ impl SqliteStore { twin_type, json, ) - .execute(conn.deref_mut()) + .execute(&mut *conn) .await .context(format!("Unable to save twin {twin_type} properties")) .map(|_| ()) @@ -269,7 +270,7 @@ impl SqliteStore { let mut conn = self.conn.lock().await; Ok( sqlx::query!(r#"SELECT requested_device_id FROM SdkConfiguration WHERE id = "0""#,) - .fetch_one(conn.deref_mut()) + .fetch_one(&mut *conn) .await .context("Unable to load device ID from configuration")? .requested_device_id, @@ -282,7 +283,7 @@ impl SqliteStore { r#"UPDATE SdkConfiguration SET workspace_id = ? WHERE id = "0""#, workspace_id, ) - .execute(conn.deref_mut()) + .execute(&mut *conn) .await .context("Unable to save Workspace ID to configuration")?; @@ -293,7 +294,7 @@ impl SqliteStore { let mut conn = self.conn.lock().await; Ok( sqlx::query!(r#"SELECT workspace_id FROM SdkConfiguration WHERE id = "0""#,) - .fetch_one(conn.deref_mut()) + .fetch_one(&mut *conn) .await .context("Unable to load workspace ID from configuration")? .workspace_id, @@ -306,7 +307,7 @@ impl SqliteStore { r#"UPDATE SdkConfiguration SET device_id = ? WHERE id = "0""#, device_id, ) - .execute(conn.deref_mut()) + .execute(&mut *conn) .await .context("Unable to save Device ID to configuration")?; @@ -317,7 +318,7 @@ impl SqliteStore { let mut conn = self.conn.lock().await; Ok( sqlx::query!(r#"SELECT device_id FROM SdkConfiguration WHERE id = "0""#,) - .fetch_one(conn.deref_mut()) + .fetch_one(&mut *conn) .await .context("Unable to load device ID from configuration")? .device_id, @@ -330,7 +331,7 @@ impl SqliteStore { ProvisioningToken, r#"SELECT provisioning_token AS token FROM SdkConfiguration WHERE id = "0""#, ) - .fetch_one(conn.deref_mut()) + .fetch_one(&mut *conn) .await .context("Unable to load provisioning token from configuration") } @@ -342,7 +343,7 @@ impl SqliteStore { r#"UPDATE SdkConfiguration SET provisioning_token = ? WHERE id = "0""#, token.token, ) - .execute(conn.deref_mut()) + .execute(&mut *conn) .await?; Ok(()) @@ -354,7 +355,7 @@ impl SqliteStore { RegistrationToken, r#"SELECT registration_token AS token, rt_expiration AS "expiration: DateTime" FROM SdkConfiguration WHERE id = "0""#, ) - .fetch_one(conn.deref_mut()) + .fetch_one(&mut *conn) .await .context("Unable to load registration token from configuration") } @@ -370,7 +371,7 @@ impl SqliteStore { token.token, token.expiration, ) - .execute(conn.deref_mut()) + .execute(&mut *conn) .await?; Ok(()) @@ -379,7 +380,7 @@ impl SqliteStore { pub(crate) async fn load_instance_url(&self) -> Result { let mut conn = self.conn.lock().await; let record = sqlx::query!(r#"SELECT instance_url FROM SdkConfiguration WHERE id = "0""#) - .fetch_one(conn.deref_mut()) + .fetch_one(&mut *conn) .await?; Ok(record.instance_url) @@ -538,6 +539,18 @@ async fn update_version_to_1_0_1(conn: &mut SqliteConnection) -> Result<(), anyh } async fn update_version_to_1_1_0(conn: &mut SqliteConnection) -> Result<(), anyhow::Error> { + async fn do_columns_exist(conn: &mut SqliteConnection) -> Result { + let res = sqlx::query_scalar!( + r#"SELECT COUNT(*) + FROM pragma_table_info('Messages') + WHERE name = 'batch_slice_id' OR name = 'chunk_id'"# + ) + .fetch_one(conn) + .await?; + + Ok(res == 2) + } + log::debug!("Updating database schema from version 1.0.1 to 1.1.0"); // There was an error in the code causing schema of version 1.1.0 to be marked 1.0.1, so we need to check if the @@ -557,19 +570,7 @@ async fn update_version_to_1_1_0(conn: &mut SqliteConnection) -> Result<(), anyh query.execute(conn).await?; log::debug!("Database schema updated to version 1.1.0"); - return Ok(()); - - async fn do_columns_exist(conn: &mut SqliteConnection) -> Result { - let res = sqlx::query_scalar!( - r#"SELECT COUNT(*) - FROM pragma_table_info('Messages') - WHERE name = 'batch_slice_id' OR name = 'chunk_id'"# - ) - .fetch_one(conn) - .await?; - - Ok(res == 2) - } + Ok(()) } async fn update_version_to_1_2_0( @@ -656,7 +657,7 @@ fn convert_dps_url_to_instance_url(dps_url: &str) -> Result(store: SqliteStore) -> (Sender, Receiver) { phantom: PhantomData, }, Receiver { - store: store.clone(), + store, last_saved: watch_rx, last_received: None, phantom: PhantomData, @@ -64,10 +65,10 @@ pub struct Receiver { phantom: PhantomData, } -impl Sender { +impl Sender { pub async fn send(&self, obj: &T) -> Result<()> { let mut conn = self.store.connection().await; - let id = obj.store(conn.deref_mut()).await?; + let id = obj.store(&mut conn).await?; { let last_saved = self.last_saved.lock().await; @@ -89,13 +90,13 @@ impl Sender { } } -impl Receiver { +impl Receiver { pub async fn recv(&mut self, cancellation: &Option) -> Result { let last_inserted = self.wait_new(cancellation).await?; let mut conn = self.store.connection().await; - let obj = T::load(conn.deref_mut(), self.last_received.unwrap_or(i32::MIN)) + let obj = T::load(&mut conn, self.last_received.unwrap_or(i32::MIN)) .await? .ok_or_else(|| { anyhow::anyhow!( @@ -117,7 +118,7 @@ impl Receiver { let result = if let Some(cancellation) = cancellation { select! { result = change_task => result, - _ = cancellation.cancelled() => anyhow::bail!("Task cancelled."), + () = cancellation.cancelled() => anyhow::bail!("Task cancelled."), } } else { change_task.await @@ -134,7 +135,7 @@ impl Receiver { pub async fn ack(&self, obj: &T) -> Result<()> { let mut conn = self.store.connection().await; - T::remove(conn.deref_mut(), obj.id()).await + T::remove(&mut conn, obj.id()).await } pub async fn count(&self) -> Result { diff --git a/spotflow/src/persistence/twins.rs b/spotflow/src/persistence/twins.rs index 075ef8c..a9a1685 100644 --- a/spotflow/src/persistence/twins.rs +++ b/spotflow/src/persistence/twins.rs @@ -21,7 +21,7 @@ pub struct Twin { } impl Twin { - pub fn update(&mut self, update: TwinUpdate) -> Result<()> { + pub fn update(&mut self, update: &TwinUpdate) -> Result<()> { match update.version { None => { log::debug!("Applying twin patch to automatically incremented version"); @@ -140,6 +140,7 @@ impl Storable for ReportedPropertiesUpdate { .fetch_one(conn) .await?; - Ok(res.count as usize) + // This is safe because the result cannot be negative + Ok(res.count.try_into().unwrap_or_default()) } } diff --git a/spotflow/src/utils/thread.rs b/spotflow/src/utils/thread.rs index 9f62b95..4697657 100644 --- a/spotflow/src/utils/thread.rs +++ b/spotflow/src/utils/thread.rs @@ -5,7 +5,7 @@ pub(crate) fn join(handle: &mut Option>) { if let Some(handle) = handle { let thread = handle.thread(); let id = thread.id(); - let name = thread.name().map(|n| n.to_string()).unwrap_or_default(); + let name = thread.name().map(ToString::to_string).unwrap_or_default(); log::trace!("Joining thread {:?} named `{}`", id, name); if let Err(cause) = handle.join() { if let Some(s) = cause.downcast_ref::<&'static str>() {