diff --git a/zbus/src/address.rs b/zbus/src/address.rs index 4737faed9..34f4c17b0 100644 --- a/zbus/src/address.rs +++ b/zbus/src/address.rs @@ -9,30 +9,10 @@ //! //! [Server addresses]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses -#[cfg(target_os = "macos")] -use crate::process::run; -#[cfg(windows)] -use crate::win32::windows_autolaunch_bus_address; use crate::{Error, Result}; -#[cfg(not(feature = "tokio"))] -use async_io::Async; #[cfg(all(unix, not(target_os = "macos")))] use nix::unistd::Uid; -#[cfg(not(feature = "tokio"))] -use std::net::{SocketAddr, TcpStream, ToSocketAddrs}; -#[cfg(all(unix, not(feature = "tokio")))] -use std::os::unix::net::UnixStream; -use std::{collections::HashMap, env, str::FromStr}; -#[cfg(feature = "tokio")] -use tokio::net::TcpStream; -#[cfg(all(unix, feature = "tokio"))] -use tokio::net::UnixStream; -#[cfg(feature = "tokio-vsock")] -use tokio_vsock::VsockStream; -#[cfg(all(windows, not(feature = "tokio")))] -use uds_windows::UnixStream; -#[cfg(all(feature = "vsock", not(feature = "tokio")))] -use vsock::VsockStream; +use std::{collections::HashMap, convert::TryFrom, env, str::FromStr}; use std::{ ffi::OsString, @@ -192,214 +172,7 @@ pub enum Address { UnixTmpDir(OsString), } -#[cfg(not(feature = "tokio"))] -#[derive(Debug)] -pub(crate) enum Stream { - Unix(Async), - Tcp(Async), - #[cfg(feature = "vsock")] - Vsock(Async), -} - -#[cfg(feature = "tokio")] -#[derive(Debug)] -pub(crate) enum Stream { - #[cfg(unix)] - Unix(UnixStream), - Tcp(TcpStream), - #[cfg(feature = "tokio-vsock")] - Vsock(VsockStream), -} - -#[cfg(not(feature = "tokio"))] -async fn connect_tcp(addr: TcpAddress) -> Result> { - let addrs = crate::Task::spawn_blocking( - move || -> Result> { - let addrs = (addr.host(), addr.port()).to_socket_addrs()?.filter(|a| { - if let Some(family) = addr.family() { - if family == TcpAddressFamily::Ipv4 { - a.is_ipv4() - } else { - a.is_ipv6() - } - } else { - true - } - }); - Ok(addrs.collect()) - }, - "connect tcp", - ) - .await - .map_err(|e| Error::Address(format!("Failed to receive TCP addresses: {e}")))?; - - // we could attempt connections in parallel? - let mut last_err = Error::Address("Failed to connect".into()); - for addr in addrs { - match Async::::connect(addr).await { - Ok(stream) => return Ok(stream), - Err(e) => last_err = e.into(), - } - } - - Err(last_err) -} - -#[cfg(feature = "tokio")] -async fn connect_tcp(addr: TcpAddress) -> Result { - TcpStream::connect((addr.host(), addr.port())) - .await - .map_err(|e| Error::InputOutput(e.into())) -} - -#[cfg(target_os = "macos")] -pub(crate) async fn macos_launchd_bus_address(env_key: &str) -> Result
{ - let output = run("launchctl", ["getenv", env_key]) - .await - .expect("failed to wait on launchctl output"); - - if !output.status.success() { - return Err(crate::Error::Address(format!( - "launchctl terminated with code: {}", - output.status - ))); - } - - let addr = String::from_utf8(output.stdout).map_err(|e| { - crate::Error::Address(format!("Unable to parse launchctl output as UTF-8: {}", e)) - })?; - - format!("unix:path={}", addr.trim()).parse() -} - impl Address { - #[cfg_attr(any(target_os = "macos", windows), async_recursion::async_recursion)] - pub(crate) async fn connect(self) -> Result { - match self { - Address::Unix(p) => { - #[cfg(not(feature = "tokio"))] - { - #[cfg(windows)] - { - let stream = crate::Task::spawn_blocking( - move || UnixStream::connect(p), - "unix stream connection", - ) - .await?; - Async::new(stream) - .map(Stream::Unix) - .map_err(|e| Error::InputOutput(e.into())) - } - - #[cfg(not(windows))] - { - Async::::connect(p) - .await - .map(Stream::Unix) - .map_err(|e| Error::InputOutput(e.into())) - } - } - - #[cfg(feature = "tokio")] - { - #[cfg(unix)] - { - UnixStream::connect(p) - .await - .map(Stream::Unix) - .map_err(|e| Error::InputOutput(e.into())) - } - - #[cfg(not(unix))] - { - let _ = p; - Err(Error::Unsupported) - } - } - } - - #[cfg(all(feature = "vsock", not(feature = "tokio")))] - Address::Vsock(addr) => { - let stream = VsockStream::connect_with_cid_port(addr.cid, addr.port)?; - Async::new(stream).map(Stream::Vsock).map_err(Into::into) - } - - #[cfg(feature = "tokio-vsock")] - Address::Vsock(addr) => VsockStream::connect(addr.cid, addr.port) - .await - .map(Stream::Vsock) - .map_err(Into::into), - - Address::Tcp(addr) => connect_tcp(addr).await.map(Stream::Tcp), - - Address::NonceTcp { addr, nonce_file } => { - let mut stream = connect_tcp(addr).await?; - - #[cfg(unix)] - let nonce_file = { - use std::os::unix::ffi::OsStrExt; - std::ffi::OsStr::from_bytes(&nonce_file) - }; - - #[cfg(windows)] - let nonce_file = std::str::from_utf8(&nonce_file) - .map_err(|_| Error::Address("nonce file path is invalid UTF-8".to_owned()))?; - - #[cfg(not(feature = "tokio"))] - { - let nonce = std::fs::read(nonce_file)?; - let mut nonce = &nonce[..]; - - while !nonce.is_empty() { - let len = stream - .write_with_mut(|s| std::io::Write::write(s, nonce)) - .await?; - nonce = &nonce[len..]; - } - } - - #[cfg(feature = "tokio")] - { - let nonce = tokio::fs::read(nonce_file).await?; - tokio::io::AsyncWriteExt::write_all(&mut stream, &nonce).await?; - } - - Ok(Stream::Tcp(stream)) - } - - #[cfg(not(windows))] - Address::Autolaunch(_) => Err(Error::Address( - "Autolaunch addresses are only supported on Windows".to_owned(), - )), - - #[cfg(windows)] - Address::Autolaunch(Some(_)) => Err(Error::Address( - "Autolaunch scopes are currently unsupported".to_owned(), - )), - - #[cfg(windows)] - Address::Autolaunch(None) => { - let addr = windows_autolaunch_bus_address()?; - addr.connect().await - } - - #[cfg(not(target_os = "macos"))] - Address::Launchd(_) => Err(Error::Address( - "Launchd addresses are only supported on macOS".to_owned(), - )), - - #[cfg(target_os = "macos")] - Address::Launchd(env) => { - let addr = macos_launchd_bus_address(&env).await?; - addr.connect().await - } - Address::UnixDir(_) | Address::UnixTmpDir(_) => { - // you can't connect to a unix:dir - Err(Error::Unsupported) - } - } - } - /// Get the address for session socket respecting the DBUS_SESSION_BUS_ADDRESS environment /// variable. If we don't recognize the value (or it's not set) we fall back to /// $XDG_RUNTIME_DIR/bus @@ -991,67 +764,4 @@ mod tests { "vsock:cid=98,port=2934", // no support for guid= yet.. ); } - - #[test] - fn connect_tcp() { - let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let port = listener.local_addr().unwrap().port(); - let addr = Address::from_str(&format!("tcp:host=localhost,port={port}")).unwrap(); - crate::utils::block_on(async { addr.connect().await }).unwrap(); - } - - #[test] - fn connect_nonce_tcp() { - struct PercentEncoded<'a>(&'a [u8]); - - impl std::fmt::Display for PercentEncoded<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - super::encode_percents(f, self.0) - } - } - - use std::io::Write; - - const TEST_COOKIE: &[u8] = b"VERILY SECRETIVE"; - - let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let port = listener.local_addr().unwrap().port(); - - let mut cookie = tempfile::NamedTempFile::new().unwrap(); - cookie.as_file_mut().write_all(TEST_COOKIE).unwrap(); - - let encoded_path = format!( - "{}", - PercentEncoded(cookie.path().to_str().unwrap().as_ref()) - ); - - let addr = Address::from_str(&format!( - "nonce-tcp:host=localhost,port={port},noncefile={encoded_path}" - )) - .unwrap(); - - let (sender, receiver) = std::sync::mpsc::sync_channel(1); - - std::thread::spawn(move || { - use std::io::Read; - - let mut client = listener.incoming().next().unwrap().unwrap(); - - let mut buf = [0u8; 16]; - client.read_exact(&mut buf).unwrap(); - - sender.send(buf == TEST_COOKIE).unwrap(); - }); - - crate::utils::block_on(addr.connect()).unwrap(); - - let saw_cookie = receiver - .recv_timeout(std::time::Duration::from_millis(100)) - .expect("nonce file content hasn't been received by server thread in time"); - - assert!( - saw_cookie, - "nonce file content has been received, but was invalid" - ); - } } diff --git a/zbus/src/lib.rs b/zbus/src/lib.rs index ae546eb4a..1c865db1a 100644 --- a/zbus/src/lib.rs +++ b/zbus/src/lib.rs @@ -43,20 +43,23 @@ pub use error::*; pub mod addr; +#[doc(hidden)] pub mod address; +#[deprecated(note = "Use `addr::DBusAddress` instead")] +#[doc(hidden)] pub use address::Address; -#[deprecated(note = "Use `address::TcpAddress` instead")] +#[deprecated(note = "Use `addr::transport::Tcp` instead")] #[doc(hidden)] pub use address::TcpAddress; -#[deprecated(note = "Use `address::TcpAddressFamily` instead")] +#[deprecated(note = "Use `addr::transport::TcpFamily` instead")] #[doc(hidden)] pub use address::TcpAddressFamily; #[cfg(any( all(feature = "vsock", not(feature = "tokio")), feature = "tokio-vsock" ))] -#[deprecated(note = "Use `address::VsockAddress` instead")] +#[deprecated(note = "Use `addr::transport::Vsock` instead")] #[doc(hidden)] pub use address::VsockAddress; diff --git a/zbus/src/win32.rs b/zbus/src/win32.rs index 655698e8d..5b3f33162 100644 --- a/zbus/src/win32.rs +++ b/zbus/src/win32.rs @@ -26,7 +26,6 @@ use winapi::{ }, }; -use crate::Address; #[cfg(not(feature = "tokio"))] use uds_windows::UnixStream; @@ -301,17 +300,6 @@ pub(crate) fn read_shm(name: &str) -> Result, crate::Error> { Ok(data.to_bytes().to_owned()) } -pub fn windows_autolaunch_bus_address() -> Result { - let mutex = Mutex::new("DBusAutolaunchMutex")?; - let _guard = mutex.lock(); - - let addr = read_shm("DBusDaemonAddressInfo")?; - let addr = String::from_utf8(addr) - .map_err(|e| crate::Error::Address(format!("Unable to parse address as UTF-8: {}", e)))?; - - addr.parse() -} - #[cfg(test)] mod tests { use super::*;