diff --git a/Cargo.lock b/Cargo.lock index 10f1356..4fa9285 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -504,6 +504,20 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "examples" +version = "0.1.0" +dependencies = [ + "clap", + "color-eyre", + "simd-json", + "supabase-auth", + "supabase-realtime", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "eyre" version = "0.6.12" diff --git a/Cargo.toml b/Cargo.toml index 56c9c0d..7df73b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ mod_module_files = "allow" supabase-mock = { path = "crates/supabase-mock" } supabase-auth = { path = "crates/supabase-auth" } postgrest-error = { path = "crates/postgrest-error" } +supabase-realtime = { path = "crates/supabase-realtime" } # Async futures-concurrency = "7.4" diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml new file mode 100644 index 0000000..be417b2 --- /dev/null +++ b/crates/examples/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "examples" +version = "0.1.0" +edition = "2021" + +[dependencies] +clap.workspace = true +color-eyre.workspace = true +supabase-realtime.workspace = true +supabase-auth.workspace = true +tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +simd-json.workspace = true + +[lints] +workspace = true + +[[bin]] +name = "realtime-example" +path = "src/realtime_example.rs" + +[[bin]] +name = "auth_example" +path = "src/auth_example.rs" diff --git a/crates/supabase-auth/examples/auth_example.rs b/crates/examples/src/auth_example.rs similarity index 85% rename from crates/supabase-auth/examples/auth_example.rs rename to crates/examples/src/auth_example.rs index 16e8e55..db86bff 100644 --- a/crates/supabase-auth/examples/auth_example.rs +++ b/crates/examples/src/auth_example.rs @@ -1,7 +1,8 @@ -use std::time::Duration; +use core::time::Duration; use clap::Parser; -use futures::StreamExt; +use supabase_auth::futures::StreamExt as _; +use supabase_auth::url; use tracing_subscriber::EnvFilter; #[derive(Parser, Debug)] @@ -38,8 +39,8 @@ async fn main() { EnvFilter::builder() .from_env() .unwrap() - .add_directive(format!("supabase_auth=debug").parse().unwrap()) - .add_directive(format!("auth_example=debug").parse().unwrap()), + .add_directive("supabase_auth=debug".to_owned().parse().unwrap()) + .add_directive("auth_example=debug".to_owned().parse().unwrap()), ) .init(); diff --git a/crates/supabase-realtime/examples/example1.rs b/crates/examples/src/realtime_example.rs similarity index 83% rename from crates/supabase-realtime/examples/example1.rs rename to crates/examples/src/realtime_example.rs index 1d2fc02..b732c1a 100644 --- a/crates/supabase-realtime/examples/example1.rs +++ b/crates/examples/src/realtime_example.rs @@ -1,11 +1,9 @@ -use std::borrow::Cow; -use std::time::Duration; +use core::time::Duration; use clap::Parser; -use futures::StreamExt; -use supabase_auth::redact::Secret; -use supabase_realtime::message::{phx_join, ProtocolMessage}; -use tokio_stream::wrappers::ReceiverStream; +use supabase_auth::url; +use supabase_realtime::futures::StreamExt as _; +use supabase_realtime::message::phx_join; use tracing_subscriber::EnvFilter; #[derive(Parser, Debug)] @@ -40,9 +38,9 @@ async fn main() { EnvFilter::builder() .from_env() .unwrap() - .add_directive(format!("supabase_auth=info").parse().unwrap()) - .add_directive(format!("supabase_realtime=info").parse().unwrap()) - .add_directive(format!("example1=info").parse().unwrap()), + .add_directive("supabase_auth=info".to_owned().parse().unwrap()) + .add_directive("supabase_realtime=info".to_owned().parse().unwrap()) + .add_directive("example1=info".to_owned().parse().unwrap()), ) .init(); color_eyre::install().unwrap(); @@ -70,12 +68,10 @@ async fn main() { self_item: false, ack: false, }, - presence: phx_join::PresenceConfig { - key: "".to_string(), - }, + presence: phx_join::PresenceConfig { key: String::new() }, postgres_changes: vec![phx_join::PostgrsChanges { event: phx_join::PostgresChangetEvent::All, - schema: "public".to_string(), + schema: "public".to_owned(), table: args.table, filter: args.filter, }], diff --git a/crates/supabase-auth/src/lib.rs b/crates/supabase-auth/src/lib.rs index b711a18..5565482 100644 --- a/crates/supabase-auth/src/lib.rs +++ b/crates/supabase-auth/src/lib.rs @@ -13,7 +13,6 @@ use simd_json::json; use thiserror::Error; use tokio::task::JoinSet; pub use {futures, redact, url}; - pub const SUPABASE_KEY: &str = "apikey"; #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/crates/supabase-realtime/src/connection.rs b/crates/supabase-realtime/src/connection.rs index 459b64b..616ac3c 100644 --- a/crates/supabase-realtime/src/connection.rs +++ b/crates/supabase-realtime/src/connection.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::sync::Arc; +use alloc::sync::Arc; +use core::future::Future; use bytes::Bytes; use fastwebsockets::FragmentCollector; diff --git a/crates/supabase-realtime/src/lib.rs b/crates/supabase-realtime/src/lib.rs index 6dd11d1..d6b3b5d 100644 --- a/crates/supabase-realtime/src/lib.rs +++ b/crates/supabase-realtime/src/lib.rs @@ -1,4 +1,8 @@ +extern crate alloc; + mod connection; mod error; pub mod message; pub mod realtime; + +pub use {futures, supabase_auth, url}; diff --git a/crates/supabase-realtime/src/message.rs b/crates/supabase-realtime/src/message.rs index fdc7061..8eae0f2 100644 --- a/crates/supabase-realtime/src/message.rs +++ b/crates/supabase-realtime/src/message.rs @@ -1,4 +1,4 @@ -//! Implementation of the datat types specified here: https://supabase.com/docs/guides/realtime/protocol +//! Implementation of the datat types specified here: use serde::{Deserialize, Serialize}; @@ -110,18 +110,18 @@ pub mod phx_reply { } "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::PhxReply(PhxReply::Error(ErrorReply { - reason: "Invalid JWT Token".to_string(), + reason: "Invalid JWT Token".to_owned(), })), - ref_field: Some("1".to_string()), + ref_field: Some("1".to_owned()), join_ref: None, }; let serialzed = simd_json::to_string_pretty(&expected_struct).unwrap(); dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -150,17 +150,17 @@ pub mod phx_reply { } "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::PhxReply(PhxReply::Ok(PhxReplyQuery { postgres_changes: vec![PostgresChanges { - schema: "public".to_string(), - table: "profiles".to_string(), + schema: "public".to_owned(), + table: "profiles".to_owned(), id: 31339675, - filter: Some("id=eq.83a19c16-fcd8-45d0-9710-d7b06ce6f329".to_string()), + filter: Some("id=eq.83a19c16-fcd8-45d0-9710-d7b06ce6f329".to_owned()), event: PostgresChangetEvent::All, }], })), - ref_field: Some("1".to_string()), + ref_field: Some("1".to_owned()), join_ref: None, }; @@ -168,7 +168,7 @@ pub mod phx_reply { dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -197,24 +197,24 @@ pub mod phx_reply { } "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::PhxReply(PhxReply::Ok(PhxReplyQuery { postgres_changes: vec![PostgresChanges { - schema: "public".to_string(), - table: "profiles".to_string(), + schema: "public".to_owned(), + table: "profiles".to_owned(), id: 30636876, - filter: Some("".to_string()), + filter: Some(String::new()), event: PostgresChangetEvent::All, }], })), - ref_field: Some("1".to_string()), + ref_field: Some("1".to_owned()), join_ref: None, }; let serialzed = simd_json::to_string_pretty(&expected_struct).unwrap(); dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -234,7 +234,7 @@ pub mod phx_reply { }"#; let expected_struct = ProtocolMessage { - topic: "phoenix".to_string(), + topic: "phoenix".to_owned(), payload: ProtocolPayload::PhxReply(PhxReply::Ok(PhxReplyQuery { postgres_changes: Vec::new(), })), @@ -246,7 +246,7 @@ pub mod phx_reply { dbg!(serialized); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -342,7 +342,7 @@ pub mod phx_join { "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::PhxJoin(PhxJoin { config: JoinConfig { broadcast: BroadcastConfig { @@ -350,25 +350,25 @@ pub mod phx_join { ack: false, }, presence: PresenceConfig { - key: "".to_string(), + key: String::new(), }, postgres_changes: vec![PostgrsChanges { event: PostgresChangetEvent::All, - schema: "public".to_string(), - table: "profiles".to_string(), - filter: Some("id=eq.83a19c16-fcd8-45d0-9710-d7b06ce6f329".to_string()), + schema: "public".to_owned(), + table: "profiles".to_owned(), + filter: Some("id=eq.83a19c16-fcd8-45d0-9710-d7b06ce6f329".to_owned()), }], }, - access_token: Some("your_access_token".to_string()), + access_token: Some("your_access_token".to_owned()), }), - ref_field: Some("1".to_string()), - join_ref: Some("1".to_string()), + ref_field: Some("1".to_owned()), + join_ref: Some("1".to_owned()), }; let serialzed = simd_json::to_string_pretty(&expected_struct).unwrap(); dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -398,7 +398,7 @@ pub mod presence_state { "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::PresenceState(PresenceState), ref_field: None, join_ref: None, @@ -408,7 +408,7 @@ pub mod presence_state { dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -438,7 +438,7 @@ pub mod heartbeat { "#; let expected_struct = ProtocolMessage { - topic: "phoenix".to_string(), + topic: "phoenix".to_owned(), payload: ProtocolPayload::Heartbeat(Heartbeat), ref_field: None, join_ref: None, @@ -448,7 +448,7 @@ pub mod heartbeat { dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -482,9 +482,9 @@ pub mod access_token { "#; let expected_struct = ProtocolMessage { - topic: "realtime::something::something".to_string(), + topic: "realtime::something::something".to_owned(), payload: ProtocolPayload::AccessToken(AccessToken { - access_token: "ssss".to_string(), + access_token: "ssss".to_owned(), }), ref_field: None, join_ref: None, @@ -494,7 +494,7 @@ pub mod access_token { dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -524,7 +524,7 @@ pub mod phx_close { "#; let expected_struct = ProtocolMessage { - topic: "realtime::something::something".to_string(), + topic: "realtime::something::something".to_owned(), payload: ProtocolPayload::PhxClose(PhxClose {}), ref_field: None, join_ref: None, @@ -534,7 +534,7 @@ pub mod phx_close { dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -579,12 +579,12 @@ pub mod system { "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::System(System { - channel: "db".to_string(), - extension: "postgres_changes".to_string(), - message: "{:error, \"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [event: *, filter: id=eq.83a19c16-fcd8-45d0-9710-d7b06ce6f329, schema: public, table: profiles]\"}".to_string(), - status: "error".to_string(), + channel: "db".to_owned(), + extension: "postgres_changes".to_owned(), + message: "{:error, \"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [event: *, filter: id=eq.83a19c16-fcd8-45d0-9710-d7b06ce6f329, schema: public, table: profiles]\"}".to_owned(), + status: "error".to_owned(), }), ref_field: None, join_ref: None, @@ -596,7 +596,7 @@ pub mod system { dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -618,12 +618,12 @@ pub mod system { "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::System(System { - channel: "db".to_string(), - extension: "postgres_changes".to_string(), - message: "{:error, \"Error parsing `filter` params: [\\\"\\\"]\"}".to_string(), - status: "error".to_string(), + channel: "db".to_owned(), + extension: "postgres_changes".to_owned(), + message: "{:error, \"Error parsing `filter` params: [\\\"\\\"]\"}".to_owned(), + status: "error".to_owned(), }), ref_field: None, join_ref: None, @@ -635,7 +635,7 @@ pub mod system { dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -657,12 +657,12 @@ pub mod system { "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::System(System { - channel: "db".to_string(), - extension: "postgres_changes".to_string(), - message: "Subscribed to PostgreSQL".to_string(), - status: "ok".to_string(), + channel: "db".to_owned(), + extension: "postgres_changes".to_owned(), + message: "Subscribed to PostgreSQL".to_owned(), + status: "ok".to_owned(), }), ref_field: None, join_ref: None, @@ -674,7 +674,7 @@ pub mod system { dbg!(serialzed); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); assert_eq!(deserialized_struct, expected_struct); } @@ -704,9 +704,9 @@ pub mod phx_error { "#; let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::PhxError(PhxError), - ref_field: Some("1".to_string()), + ref_field: Some("1".to_owned()), join_ref: None, }; @@ -714,7 +714,7 @@ pub mod phx_error { dbg!(serialized); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); dbg!(&deserialized_struct); assert_eq!(deserialized_struct, expected_struct); @@ -723,13 +723,11 @@ pub mod phx_error { } pub mod postgres_changes { - use std::fmt; + use alloc::fmt; use serde::de::{self, DeserializeOwned}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; - use super::*; - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct PostgresChangesPayload { @@ -826,6 +824,7 @@ pub mod postgres_changes { pub struct Buffer(pub Vec); impl Buffer { + #[must_use] pub fn into_inner(self) -> Vec { self.0 } @@ -912,6 +911,7 @@ pub mod postgres_changes { use pretty_assertions::assert_eq; use super::*; + use crate::message::{ProtocolMessage, ProtocolPayload}; #[test] fn test_postgres_changes_serialization() { @@ -947,7 +947,7 @@ pub mod postgres_changes { "#; // Parse json_data to extract raw bytes for record and old_record using simd_json - let mut json_data_bytes = json_data.to_string().into_bytes(); + let mut json_data_bytes = json_data.to_owned().into_bytes(); let json_value: simd_json::OwnedValue = simd_json::from_slice(&mut json_data_bytes).unwrap(); let data = &json_value["payload"]["data"]; @@ -959,29 +959,29 @@ pub mod postgres_changes { let old_record_bytes = simd_json::to_vec(old_record_value).unwrap(); let expected_struct = ProtocolMessage { - topic: "realtime:db".to_string(), + topic: "realtime:db".to_owned(), payload: ProtocolPayload::PostgresChanges(PostgresChangesPayload { data: Data { columns: vec![ Column { - name: "id".to_string(), - type_: "uuid".to_string(), + name: "id".to_owned(), + type_: "uuid".to_owned(), }, Column { - name: "updated_at".to_string(), - type_: "timestamptz".to_string(), + name: "updated_at".to_owned(), + type_: "timestamptz".to_owned(), }, Column { - name: "url".to_string(), - type_: "text".to_string(), + name: "url".to_owned(), + type_: "text".to_owned(), }, ], - commit_timestamp: "2024-08-25T17:00:19.009Z".to_string(), + commit_timestamp: "2024-08-25T17:00:19.009Z".to_owned(), errors: None, - old_record: Some(Buffer(old_record_bytes.clone())), - record: Some(Buffer(record_bytes.clone())), - schema: "public".to_string(), - table: "profiles".to_string(), + old_record: Some(Buffer(old_record_bytes)), + record: Some(Buffer(record_bytes)), + schema: "public".to_owned(), + table: "profiles".to_owned(), type_: PostgresDataChangeEvent::Update, }, ids: vec![38606455], @@ -991,7 +991,7 @@ pub mod postgres_changes { }; // Deserialize json_data using simd_json - let mut deserialized_bytes = json_data.to_string().into_bytes(); + let mut deserialized_bytes = json_data.to_owned().into_bytes(); let deserialized_struct: ProtocolMessage = simd_json::from_slice(&mut deserialized_bytes).unwrap(); @@ -1051,7 +1051,7 @@ pub mod postgres_changes { "#; // Parse json_data to extract raw bytes for record and old_record using simd_json - let mut json_data_bytes = json_data.to_string().into_bytes(); + let mut json_data_bytes = json_data.to_owned().into_bytes(); let json_value: simd_json::OwnedValue = simd_json::from_slice(&mut json_data_bytes).unwrap(); let data = &json_value["payload"]["data"]; @@ -1061,38 +1061,38 @@ pub mod postgres_changes { let record_bytes = simd_json::to_vec(record_value).unwrap(); let expected_struct = ProtocolMessage { - topic: "realtime:table-db-changes".to_string(), + topic: "realtime:table-db-changes".to_owned(), payload: ProtocolPayload::PostgresChanges(PostgresChangesPayload { data: Data { - table: "rooms".to_string(), + table: "rooms".to_owned(), type_: PostgresDataChangeEvent::Insert, record: Some(Buffer(record_bytes)), old_record: None, columns: vec![ Column { - name: "id".to_string(), - type_: "uuid".to_string(), + name: "id".to_owned(), + type_: "uuid".to_owned(), }, Column { - name: "created_at".to_string(), - type_: "timestamptz".to_string(), + name: "created_at".to_owned(), + type_: "timestamptz".to_owned(), }, Column { - name: "name".to_string(), - type_: "text".to_string(), + name: "name".to_owned(), + type_: "text".to_owned(), }, Column { - name: "owner_id".to_string(), - type_: "uuid".to_string(), + name: "owner_id".to_owned(), + type_: "uuid".to_owned(), }, Column { - name: "server_id".to_string(), - type_: "uuid".to_string(), + name: "server_id".to_owned(), + type_: "uuid".to_owned(), }, ], - commit_timestamp: "2024-10-19T07:55:12.926Z".to_string(), + commit_timestamp: "2024-10-19T07:55:12.926Z".to_owned(), errors: None, - schema: "public".to_string(), + schema: "public".to_owned(), }, ids: vec![60402389], }), @@ -1104,7 +1104,7 @@ pub mod postgres_changes { dbg!(serialized); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); dbg!(&deserialized_struct); pretty_assertions::assert_eq!(deserialized_struct, expected_struct); @@ -1158,7 +1158,7 @@ pub mod postgres_changes { "#; // Parse json_data to extract raw bytes for record and old_record using simd_json - let mut json_data_bytes = json_data.to_string().into_bytes(); + let mut json_data_bytes = json_data.to_owned().into_bytes(); let json_value: simd_json::OwnedValue = simd_json::from_slice(&mut json_data_bytes).unwrap(); let data = &json_value["payload"]["data"]; @@ -1168,38 +1168,38 @@ pub mod postgres_changes { let old_record_bytes = simd_json::to_vec(old_record_value).unwrap(); let expected_struct = ProtocolMessage { - topic: "realtime:table-db-changes".to_string(), + topic: "realtime:table-db-changes".to_owned(), payload: ProtocolPayload::PostgresChanges(PostgresChangesPayload { data: Data { - table: "rooms".to_string(), + table: "rooms".to_owned(), type_: PostgresDataChangeEvent::Delete, record: None, old_record: Some(Buffer(old_record_bytes)), columns: vec![ Column { - name: "id".to_string(), - type_: "uuid".to_string(), + name: "id".to_owned(), + type_: "uuid".to_owned(), }, Column { - name: "created_at".to_string(), - type_: "timestamptz".to_string(), + name: "created_at".to_owned(), + type_: "timestamptz".to_owned(), }, Column { - name: "name".to_string(), - type_: "text".to_string(), + name: "name".to_owned(), + type_: "text".to_owned(), }, Column { - name: "owner_id".to_string(), - type_: "uuid".to_string(), + name: "owner_id".to_owned(), + type_: "uuid".to_owned(), }, Column { - name: "server_id".to_string(), - type_: "uuid".to_string(), + name: "server_id".to_owned(), + type_: "uuid".to_owned(), }, ], - commit_timestamp: "2024-10-19T07:54:05.101Z".to_string(), + commit_timestamp: "2024-10-19T07:54:05.101Z".to_owned(), errors: None, - schema: "public".to_string(), + schema: "public".to_owned(), }, ids: vec![38377940], }), @@ -1211,7 +1211,7 @@ pub mod postgres_changes { dbg!(serialized); let deserialized_struct: ProtocolMessage = - simd_json::from_slice(json_data.to_string().into_bytes().as_mut_slice()).unwrap(); + simd_json::from_slice(json_data.to_owned().into_bytes().as_mut_slice()).unwrap(); dbg!(&deserialized_struct); assert_eq!(deserialized_struct, expected_struct); diff --git a/crates/supabase-realtime/src/realtime.rs b/crates/supabase-realtime/src/realtime.rs index e2c2395..e5b1e29 100644 --- a/crates/supabase-realtime/src/realtime.rs +++ b/crates/supabase-realtime/src/realtime.rs @@ -1,9 +1,9 @@ -use std::rc::Rc; -use std::task::Poll; +use alloc::rc::Rc; +use core::task::Poll; use fastwebsockets::Frame; use futures::stream::FuturesUnordered; -use futures::{FutureExt, SinkExt, Stream, StreamExt}; +use futures::{SinkExt as _, Stream, StreamExt as _}; use supabase_auth::LoginCredentials; use tokio::sync::Mutex; use tokio::time::timeout; @@ -35,10 +35,11 @@ pub struct RealtimeConnection { type RealtimeStreamType = Result; impl RealtimeConnection { - const HEARTBEAT_PERIOD: std::time::Duration = std::time::Duration::from_secs(20); + const HEARTBEAT_PERIOD: core::time::Duration = core::time::Duration::from_secs(20); const DB_UPDATE_TOPIC: &str = "realtime:table-db-changes"; - pub fn new(config: supabase_auth::SupabaseAuthConfig) -> Self { + #[must_use] + pub const fn new(config: supabase_auth::SupabaseAuthConfig) -> Self { Self { config } } @@ -80,7 +81,7 @@ impl RealtimeConnection { ref_counter += 1; join_ref_counter += 1; message::ProtocolMessage { - topic: Self::DB_UPDATE_TOPIC.to_string(), + topic: Self::DB_UPDATE_TOPIC.to_owned(), payload: message::ProtocolPayload::PhxJoin(item), ref_field: Some(ref_counter.to_string()), join_ref: Some(join_ref_counter.to_string()), @@ -95,7 +96,7 @@ impl RealtimeConnection { let interval_stream = IntervalStream::new(interval).fuse(); interval_stream .map(move |_s| message::ProtocolMessage { - topic: "phoenix".to_string(), + topic: "phoenix".to_owned(), payload: message::ProtocolPayload::Heartbeat(message::heartbeat::Heartbeat), ref_field: None, join_ref: None, @@ -108,7 +109,7 @@ impl RealtimeConnection { auth_stream .map(|item| { item.map(|item| message::ProtocolMessage { - topic: Self::DB_UPDATE_TOPIC.to_string(), + topic: Self::DB_UPDATE_TOPIC.to_owned(), payload: message::ProtocolPayload::AccessToken(AccessToken { access_token: item.access_token, }), @@ -151,7 +152,8 @@ pub struct RealtimeBaseConnection { } impl RealtimeBaseConnection { - pub fn new(url: url::Url) -> Self { + #[must_use] + pub const fn new(url: url::Url) -> Self { Self { url } } pub async fn connect + Unpin>( @@ -200,7 +202,7 @@ impl RealtimeBaseConnection { match write_futures.poll_next_unpin(cx) { Poll::Ready(Some(res)) => match res { - Ok(_) => { + Ok(()) => { tracing::debug!("Message sent successfully"); } Err(err) => { @@ -217,10 +219,10 @@ impl RealtimeBaseConnection { Poll::Ready(Some(item)) => { tracing::debug!(?item, "Received item"); cx.waker().wake_by_ref(); - return Poll::Ready(Some(Ok(item))); + Poll::Ready(Some(Ok(item))) } - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => return Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } }); Ok(stream_to_return) @@ -232,7 +234,7 @@ async fn read_from_ws( mut tx: futures::channel::mpsc::UnboundedSender, ) { tracing::info!("Starting read_from_ws task"); - let duration = std::time::Duration::from_millis(100); + let duration = core::time::Duration::from_millis(100); loop { let mut con = con.lock().await; let Ok(frame) = timeout(duration, con.read_frame()).await else {