Skip to content

Commit

Permalink
Loosen locking around RPC calls for better multi-threading (#148)
Browse files: browse the repository at this point in the history
* Loosen locking around RPC calls for better multi-threading

* Add benchmark for parallel RPC
  • Loading branch information
Kimahriman authored Nov 2, 2024
1 parent c4fedda commit 32510e9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 35 deletions.
18 changes: 18 additions & 0 deletions rust/benches/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashSet;

use criterion::*;
use futures::future::join_all;
use hdfs_native::{minidfs::MiniDfs, Client, WriteOptions};

fn bench(c: &mut Criterion) {
Expand Down Expand Up @@ -34,6 +35,23 @@ fn bench(c: &mut Criterion) {
let fs = hdfs::hdfs::get_hdfs().unwrap();
b.iter(|| fs.get_file_status("/bench").unwrap())
});

group.sampling_mode(SamplingMode::Flat);
group.bench_function("getFileInfo-parallel", |b| {
b.to_async(&rt).iter_batched(
|| {
(0..100)
.map(|_| client.get_file_info("/bench"))
.collect::<Vec<_>>()
},
|futures| async {
for result in join_all(futures).await {
result.unwrap();
}
},
BatchSize::SmallInput,
)
});
}

criterion_group!(benches, bench);
Expand Down
8 changes: 6 additions & 2 deletions rust/src/hdfs/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ impl RpcConnection {
Ok(())
}

pub(crate) async fn call(&self, method_name: &str, message: &[u8]) -> Result<Bytes> {
pub(crate) async fn call(
&self,
method_name: &str,
message: &[u8],
) -> Result<oneshot::Receiver<Result<Bytes>>> {
let call_id = self.get_next_call_id();
let conn_header = self.get_connection_header(call_id, 0);

Expand All @@ -284,7 +288,7 @@ impl RpcConnection {
self.write_messages(&[&conn_header_buf, &header_buf, message])
.await?;

receiver.await.unwrap()
Ok(receiver)
}
}

Expand Down
61 changes: 28 additions & 33 deletions rust/src/hdfs/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const OBSERVER_RETRY_EXCEPTION: &str = "org.apache.hadoop.ipc.ObserverRetryOnAct
#[derive(Debug)]
struct ProxyConnection {
url: String,
inner: Option<RpcConnection>,
inner: Arc<tokio::sync::Mutex<Option<RpcConnection>>>,
alignment_context: Arc<Mutex<AlignmentContext>>,
nameservice: Option<String>,
}
Expand All @@ -37,37 +37,42 @@ impl ProxyConnection {
) -> Self {
ProxyConnection {
url,
inner: None,
inner: Arc::new(tokio::sync::Mutex::new(None)),
alignment_context,
nameservice,
}
}

async fn get_connection(&mut self) -> Result<&RpcConnection> {
if self.inner.is_none() || !self.inner.as_ref().unwrap().is_alive() {
self.inner = Some(
RpcConnection::connect(
&self.url,
self.alignment_context.clone(),
self.nameservice.as_deref(),
)
.await?,
);
}
Ok(self.inner.as_ref().unwrap())
}
async fn call(&self, method_name: &str, message: &[u8]) -> Result<Bytes> {
let receiver = {
let mut connection = self.inner.lock().await;
match &mut *connection {
Some(c) if c.is_alive() => (),
c => {
*c = Some(
RpcConnection::connect(
&self.url,
self.alignment_context.clone(),
self.nameservice.as_deref(),
)
.await?,
);
}
}

async fn call(&mut self, method_name: &str, message: &[u8]) -> Result<Bytes> {
self.get_connection()
.await?
.call(method_name, message)
.await
connection
.as_ref()
.unwrap()
.call(method_name, message)
.await?
};
receiver.await.unwrap()
}
}

#[derive(Debug)]
pub(crate) struct NameServiceProxy {
proxy_connections: Vec<Arc<tokio::sync::Mutex<ProxyConnection>>>,
proxy_connections: Vec<ProxyConnection>,
current_index: AtomicUsize,
msycned: AtomicBool,
}
Expand All @@ -80,22 +85,14 @@ impl NameServiceProxy {

let proxy_connections = if let Some(port) = nameservice.port() {
let url = format!("{}:{}", nameservice.host_str().unwrap(), port);
vec![Arc::new(tokio::sync::Mutex::new(ProxyConnection::new(
url,
alignment_context.clone(),
None,
)))]
vec![ProxyConnection::new(url, alignment_context.clone(), None)]
} else if let Some(host) = nameservice.host_str() {
// TODO: Add check for no configured namenodes
config
.get_urls_for_nameservice(host)?
.into_iter()
.map(|url| {
Arc::new(tokio::sync::Mutex::new(ProxyConnection::new(
url,
alignment_context.clone(),
Some(host.to_string()),
)))
ProxyConnection::new(url, alignment_context.clone(), Some(host.to_string()))
})
.collect()
} else {
Expand Down Expand Up @@ -142,8 +139,6 @@ impl NameServiceProxy {
let mut attempts = 0;
loop {
let result = self.proxy_connections[proxy_index]
.lock()
.await
.call(method_name, &message)
.await;

Expand Down

0 comments on commit 32510e9

Please sign in to comment.