Skip to content

Commit

Permalink
♻️ zb: internal refactoring of address guid handling
Browse files Browse the repository at this point in the history
Connect to the target and return the associated guid.

As a target address may have multiple addresses in the future, return
the actual associated guid used.
  • Loading branch information
elmarco committed Feb 1, 2024
1 parent 079f584 commit 428fd86
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions zbus/src/connection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
async_lock::RwLock,
names::{InterfaceName, UniqueName, WellKnownName},
object_server::Interface,
Connection, Error, Executor, Guid, Result,
Connection, Error, Executor, Guid, OwnedGuid, Result,
};

use super::{
Expand Down Expand Up @@ -343,17 +343,11 @@ impl<'a> Builder<'a> {
}

async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
let mut stream = self.stream_for_target().await?;
let (mut stream, server_guid) = self.target_connect().await?;
let mut auth = match self.guid {
None => {
let guid = match self.target {
Some(Target::Address(ref addr)) => {
addr.guid().map(|guid| guid.to_owned().into())
}
_ => None,
};
// SASL Handshake
Authenticated::client(stream, guid, self.auth_mechanisms).await?
Authenticated::client(stream, server_guid, self.auth_mechanisms).await?
}
Some(guid) => {
if !self.p2p {
Expand Down Expand Up @@ -456,10 +450,10 @@ impl<'a> Builder<'a> {
}
}

async fn stream_for_target(&mut self) -> Result<BoxedSplit> {
async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>)> {
// SAFETY: `self.target` is always `Some` from the beginning and this method is only called
// once.
Ok(match self.target.take().unwrap() {
let split = match self.target.take().unwrap() {
#[cfg(not(feature = "tokio"))]
Target::UnixStream(stream) => Async::new(stream)?.into(),
#[cfg(all(unix, feature = "tokio"))]
Expand All @@ -474,18 +468,24 @@ impl<'a> Builder<'a> {
Target::VsockStream(stream) => Async::new(stream)?.into(),
#[cfg(feature = "tokio-vsock")]
Target::VsockStream(stream) => stream.into(),
Target::Address(address) => match address.connect().await? {
#[cfg(any(unix, not(feature = "tokio")))]
address::transport::Stream::Unix(stream) => stream.into(),
address::transport::Stream::Tcp(stream) => stream.into(),
#[cfg(any(
all(feature = "vsock", not(feature = "tokio")),
feature = "tokio-vsock"
))]
address::transport::Stream::Vsock(stream) => stream.into(),
},
Target::Address(address) => {
let guid = address.guid().map(|g| g.to_owned().into());
let split = match address.connect().await? {
#[cfg(any(unix, not(feature = "tokio")))]
address::transport::Stream::Unix(stream) => stream.into(),
address::transport::Stream::Tcp(stream) => stream.into(),
#[cfg(any(
all(feature = "vsock", not(feature = "tokio")),
feature = "tokio-vsock"
))]
address::transport::Stream::Vsock(stream) => stream.into(),
};
return Ok((split, guid));
}
Target::Socket(stream) => stream,
})
};

Ok((split, None))
}
}

Expand Down

0 comments on commit 428fd86

Please sign in to comment.