Skip to content

Commit

Permalink
[RSDK-5820] WebRTC connection improvement (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
npmenard authored Feb 22, 2024
1 parent fe9da31 commit aab08d5
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 72 deletions.
100 changes: 67 additions & 33 deletions src/rpc/dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use super::{
webrtc::{webrtc_action_with_timeout, Options},
};
use crate::gen::google;
use crate::gen::proto::rpc::v1::{
auth_service_client::AuthServiceClient, AuthenticateRequest, Credentials,
};
use crate::gen::proto::rpc::webrtc::v1::{
call_response::Stage, call_update_request::Update,
signaling_service_client::SignalingServiceClient, CallUpdateRequest,
Expand All @@ -13,12 +16,6 @@ use crate::gen::proto::rpc::webrtc::v1::{
CallRequest, IceCandidate, Metadata, RequestHeaders, Strings,
};
use crate::rpc::webrtc;
use crate::{
gen::proto::rpc::v1::{
auth_service_client::AuthServiceClient, AuthenticateRequest, Credentials,
},
rpc::webrtc::PollableAtomicBool,
};
use ::http::header::HeaderName;
use ::http::{
uri::{Authority, Parts, PathAndQuery, Scheme},
Expand All @@ -42,6 +39,7 @@ use std::{
task::{Context as TaskContext, Poll},
time::Duration,
};
use tokio::sync::{mpsc, watch};
use tonic::codegen::{http, BoxFuture};
use tonic::transport::{Body, Channel, Uri};
use tonic::{body::BoxBody, transport::ClientTlsConfig};
Expand Down Expand Up @@ -792,16 +790,21 @@ async fn maybe_connect_via_webrtc(
let sent_done_or_error = Arc::new(AtomicBool::new(false));
let uuid_lock = Arc::new(RwLock::new("".to_string()));
let uuid_for_ice_gathering_thread = uuid_lock.clone();
let is_open = Arc::new(AtomicBool::new(false));
let is_open_read = is_open.clone();

// Using an mpsc channel to report unrecoverable errors during Signaling, so we
// don't have to wait until the timeout expires before giving up on this attempt.
// The size of the channel is set to 1 since any error (or success) should terminate the function
let (is_open_s, mut is_open_r) = mpsc::channel(1);
let on_open_is_open = is_open_s.clone();

data_channel.on_open(Box::new(move || {
is_open.store(true, Ordering::Release);
let _ = on_open_is_open.try_send(None); // ignore sending errors, either an error (or success) was already sent or the operation will succeed
Box::pin(async move {})
}));

let exchange_done = Arc::new(AtomicBool::new(false));
let remote_description_set = Arc::new(AtomicBool::new(false));
let ice_done = Arc::new(AtomicBool::new(false));
let (remote_description_set_s, remote_description_set_r) = watch::channel(None);
let ice_done = Arc::new(tokio::sync::Notify::new());
let ice_done2 = ice_done.clone();

if !webrtc_options.disable_trickle_ice {
Expand All @@ -811,30 +814,44 @@ async fn maybe_connect_via_webrtc(
let sent_done_or_error2 = sent_done_or_error.clone();

let exchange_done = exchange_done.clone();
let remote_description_set = remote_description_set.clone();

let on_local_ice_candidate_failure = is_open_s.clone();
peer_connection.on_ice_candidate(Box::new(
move |ice_candidate: Option<RTCIceCandidate>| {
let remote_description_set = remote_description_set.clone();
if exchange_done.load(Ordering::Acquire) {
return Box::pin(async move {});
}
let channel = channel2.clone();
let sent_done_or_error = sent_done_or_error2.clone();
let ice_done = ice_done.clone();
let uuid_lock = uuid_lock2.clone();
let on_local_ice_candidate_failure = on_local_ice_candidate_failure.clone();
let mut remote_description_set_r = remote_description_set_r.clone();
Box::pin(async move {
let remote_description_set = PollableAtomicBool::new(remote_description_set);
if webrtc_action_with_timeout(remote_description_set)
.await
.is_err()
{
log::info!("timed out on_ice_candidate; remote description was never set");
return;
// If the value in the watch channel has not been set yet, we wait until it does.
// Afterwards Some(()) should be visible to all watcher and any watcher waiting will
// return
if remote_description_set_r.borrow().is_none() {
match webrtc_action_with_timeout(remote_description_set_r.changed()).await {
Ok(Err(e)) => {
let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
anyhow::anyhow!(
"remote description watch channel is closed with error {e}"
),
)));
}
Err(_) => {
log::info!(
"timed out on_ice_candidate; remote description was never set"
);
let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
anyhow::anyhow!("timed out waiting for remote description"),
)));
}
_ => (),
}
}

if ice_done.load(Ordering::Acquire) {
return;
}
let uuid = uuid_lock.read().unwrap().to_string();
let mut signaling_client = SignalingServiceClient::new(channel.clone());
match ice_candidate {
Expand All @@ -856,13 +873,19 @@ async fn maybe_connect_via_webrtc(
.and_then(|resp| resp.map_err(anyhow::Error::from))
{
log::error!("Error sending ice candidate: {e}");
let _ = on_local_ice_candidate_failure.try_send(Some(
Box::new(anyhow::anyhow!(
"Error sending ice candidate: {e}"
)),
));
}
}
Err(e) => log::error!("Error parsing ice candidate: {e}"),
}
}
None => {
ice_done.store(true, Ordering::Release);
// will only be executed once when gathering is finished
ice_done.notify_one();
send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
}
}
Expand Down Expand Up @@ -910,17 +933,17 @@ async fn maybe_connect_via_webrtc(
Ok(cr) => match cr {
Some(cr) => cr,
None => {
let ice_done = PollableAtomicBool::new(ice_done2);
// want to delay sending done until we either are actually done, or
// we hit a timeout
let _ = webrtc_action_with_timeout(ice_done).await;
let _ = webrtc_action_with_timeout(ice_done2.notified()).await;
let uuid = uuid.read().unwrap().to_string();
send_done_once(sent_done.clone(), &uuid, channel2.clone()).await;
break;
}
},
Err(e) => {
log::error!("Error processing call response: {e}");
let _ = is_open_s.try_send(Some(Box::new(e)));
break;
}
};
Expand All @@ -931,6 +954,7 @@ async fn maybe_connect_via_webrtc(
let uuid = uuid.read().unwrap().to_string();
let e = anyhow::anyhow!("Init received more than once");
send_error_once(sent_done.clone(), &uuid, &e, channel2.clone()).await;
let _ = is_open_s.try_send(Some(Box::new(e)));
break;
}
init_received.store(true, Ordering::Release);
Expand All @@ -949,6 +973,7 @@ async fn maybe_connect_via_webrtc(
channel2.clone(),
)
.await;
let _ = is_open_s.try_send(Some(Box::new(e)));
break;
}
};
Expand All @@ -973,10 +998,11 @@ async fn maybe_connect_via_webrtc(
channel2.clone(),
)
.await;
let _ = is_open_s.try_send(Some(Box::new(e)));
break;
}
}
remote_description_set.store(true, Ordering::Release);
let _ = remote_description_set_s.send_replace(Some(()));
if webrtc_options.disable_trickle_ice {
send_done_once(sent_done.clone(), &response.uuid, channel2.clone()).await;
break;
Expand All @@ -988,6 +1014,7 @@ async fn maybe_connect_via_webrtc(
if !init_received.load(Ordering::Acquire) {
let e = anyhow::anyhow!("Got update before init stage");
send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
let _ = is_open_s.try_send(Some(Box::new(e)));
break;
}

Expand All @@ -998,6 +1025,7 @@ async fn maybe_connect_via_webrtc(
uuid_s,
);
send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
let _ = is_open_s.try_send(Some(Box::new(e)));
break;
}
match ice_candidate_from_proto(update.candidate) {
Expand All @@ -1017,25 +1045,31 @@ async fn maybe_connect_via_webrtc(
let e = anyhow::Error::from(e);
send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone())
.await;
let _ = is_open_s.try_send(Some(Box::new(e)));
break;
}
}
Err(e) => log::error!("Error adding ice candidate: {e}"),
Err(e) => log::error!("Error parsing ice candidate: {e}"),
}
}
None => continue,
}
}
});

let is_open_read = is_open_read.clone();
let is_open = PollableAtomicBool::new(is_open_read);

// TODO (GOUT-11): create separate authorization if external_auth_addr and/or creds.Type is `Some`

// Delay returning the client channel until data channel is open, so we don't lose messages
if webrtc_action_with_timeout(is_open).await.is_err() {
return Err(anyhow::anyhow!("Timed out opening data channel."));
let is_open = webrtc_action_with_timeout(is_open_r.recv()).await;
match is_open {
Ok(is_open) => {
if let Some(Some(e)) = is_open {
return Err(anyhow::anyhow!("Couldn't connect to peer with error {e}"));
}
}
Err(_) => {
return Err(anyhow::anyhow!("Timed out opening data channel."));
}
}

exchange_done.store(true, Ordering::Release);
Expand Down
42 changes: 3 additions & 39 deletions src/rpc/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@ use bytes::Bytes;
use core::fmt;
use futures::Future;
use http::{header::HeaderName, HeaderMap, HeaderValue, Uri};
use std::{
hint,
pin::Pin,
str::FromStr,
sync::{atomic::AtomicBool, Arc},
task::{Context, Poll},
time::Duration,
};
use std::{hint, str::FromStr, sync::Arc, time::Duration};
use webrtc::{
api::{
interceptor_registry, media_engine::MediaEngine, setting_engine::SettingEngine, APIBuilder,
Expand Down Expand Up @@ -45,35 +38,6 @@ pub(crate) struct Options {
pub(crate) signaling_server_address: String,
}

// an Arc<AtomicBool> that is pollable as a future. Pending when false, Ready(()) when true
pub(crate) struct PollableAtomicBool {
// TODO(RSDK-598): expand the PollableAtomicBool to include load and store methods, and
// to keep a Waker field that awakens when the value is true.
inner: Arc<AtomicBool>,
}

impl PollableAtomicBool {
pub(crate) fn new(inner: Arc<AtomicBool>) -> Self {
Self { inner }
}
}

// implementing Future for AtomicBool allows us to use an AtomicBool in a `tokio::select!`
// statement
impl Future for PollableAtomicBool {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.load(std::sync::atomic::Ordering::Acquire) {
false => {
cx.waker().wake_by_ref();
Poll::Pending
}
true => Poll::Ready(()),
}
}
}

impl fmt::Debug for Options {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Options")
Expand Down Expand Up @@ -302,10 +266,10 @@ pub(crate) async fn action_with_timeout<T>(

tokio::select! {
res = &mut f => {
return Ok(res);
Ok(res)
}
_ = &mut timeout => {
return Err(anyhow::anyhow!("Action timed out"));
Err(anyhow::anyhow!("Action timed out"))
}
}
}
Expand Down

0 comments on commit aab08d5

Please sign in to comment.