Add simple benchmarks (#75)
Kimahriman authored Feb 18, 2024
1 parent 578d21e commit 83120f3
Showing 9 changed files with 215 additions and 9 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

15 changes: 14 additions & 1 deletion README.md
@@ -71,4 +71,17 @@ cargo test -p hdfs-native --features token,kerberos,integration-test
```

### Python tests
See the [Python README](./python/README.md)

## Running benchmarks
Some of the benchmarks compare performance against the JVM-based client, accessed through libhdfs via the `fs-hdfs3` crate. Because of that, some extra setup is required to run the benchmarks:

```bash
export HADOOP_CONF_DIR=$(pwd)/crates/hdfs-native/target/test
export CLASSPATH=$(hadoop classpath)
```
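(The `hadoop classpath` command prints the set of jars the Hadoop CLI uses; libhdfs runs an embedded JVM, so those jars need to be on `CLASSPATH`, and `HADOOP_CONF_DIR` needs to point at the MiniDFS configuration that the test setup writes under `target/test`.)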

Then you can run the benchmarks with:
```bash
cargo bench -p hdfs-native --features benchmark,integration-test
```
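Criterion accepts a name filter after `--`, so a single benchmark group can be run on its own; for example, to run only the RPC benchmarks:
```bash
cargo bench -p hdfs-native --features benchmark,integration-test -- rpc
```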
15 changes: 12 additions & 3 deletions crates/hdfs-native/Cargo.toml
@@ -26,7 +26,7 @@ prost-types = "0.12"
roxmltree = "0.18"
socket2 = "0.5"
thiserror = "1"
tokio = { workspace = true, features = ["rt", "net", "io-util", "macros", "sync", "time"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net", "io-util", "macros", "sync", "time"] }
url = "2"
users = { version = "0.11", default-features = false }
uuid = { version = "1", features = ["v4"] }
@@ -37,8 +37,9 @@ prost-build = { version = "0.12", optional = true }
protobuf-src = { version = "1.1", optional = true }

[dev-dependencies]
criterion = "0.5"
criterion = { version = "0.5", features = ["async_tokio", "async_futures"] }
env_logger = "0.10"
fs-hdfs3 = "0.1.12"
serial_test = "2.0.0"
tempfile = "3"
which = "4"
@@ -49,8 +50,16 @@ token = ["gsasl-sys"]

generate-protobuf = ["prost-build", "protobuf-src"]
integration-test = ["which"]
benchmark = []
benchmark = ["which"]

[[bench]]
name = "ec"
harness = false

[[bench]]
name = "io"
harness = false

[[bench]]
name = "rpc"
harness = false
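Each `[[bench]]` target sets `harness = false` so Cargo skips the default libtest harness and lets `criterion_main!` supply the binary's entry point. As a minimal illustrative sketch (not part of this commit), a Criterion bench wired up this way looks like:

```rust
use criterion::{black_box, criterion_group, criterion_main, Criterion};

// Illustrative only: benchmark a trivial closure to show the harness wiring.
fn bench(c: &mut Criterion) {
    c.bench_function("example", |b| b.iter(|| black_box(2) + black_box(2)));
}

criterion_group!(benches, bench);
criterion_main!(benches);
```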
110 changes: 110 additions & 0 deletions crates/hdfs-native/benches/io.rs
@@ -0,0 +1,110 @@
use std::collections::HashSet;

use bytes::{Buf, BufMut, BytesMut};
use criterion::*;
use hdfs::hdfs::get_hdfs;
use hdfs_native::{minidfs::MiniDfs, Client, WriteOptions};

async fn write_file(client: &Client, ints: usize) {
    let mut writer = client
        .create("/bench", WriteOptions::default())
        .await
        .unwrap();

    let mut data = BytesMut::with_capacity(ints * 4);
    for i in 0..ints {
        data.put_u32(i as u32);
    }
    writer.write(data.freeze()).await.unwrap();
    writer.close().await.unwrap();
}

fn bench(c: &mut Criterion) {
    let _ = env_logger::builder().is_test(true).try_init();

    let _dfs = MiniDfs::with_features(&HashSet::new());
    let client = Client::default();

    let ints_to_write: usize = 128 * 1024 * 1024; // 128 Mi u32 values = 512 MiB file

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async { write_file(&client, ints_to_write).await });

    let fs = get_hdfs().unwrap();

    let mut group = c.benchmark_group("read");
    group.throughput(Throughput::Bytes((ints_to_write * 4) as u64));
    group.sample_size(10);

    let reader = rt.block_on(client.read("/bench")).unwrap();
    group.bench_function("read-native", |b| {
        b.to_async(&rt).iter(|| async {
            // let reader = client.read("/bench").await.unwrap();

            reader.read_range(0, reader.file_length()).await.unwrap()
        })
    });
    group.sample_size(10);
    group.bench_function("read-libhdfs", |b| {
        b.iter(|| {
            let mut buf = BytesMut::zeroed(ints_to_write * 4);
            let mut bytes_read = 0;
            let fs = get_hdfs().unwrap();
            let reader = fs.open("/bench").unwrap();

            while bytes_read < ints_to_write * 4 {
                bytes_read += reader
                    .read(&mut buf[bytes_read..ints_to_write * 4])
                    .unwrap() as usize;
            }
            reader.close().unwrap();
            buf
        })
    });

    let mut data_to_write = BytesMut::with_capacity(ints_to_write * 4);
    for i in 0..ints_to_write {
        data_to_write.put_i32(i as i32);
    }

    let buf = data_to_write.freeze();

    drop(group);

    let mut group = c.benchmark_group("write");
    group.throughput(Throughput::Bytes((ints_to_write * 4) as u64));
    group.sample_size(10);

    group.bench_function("write-native", |b| {
        b.to_async(&rt).iter(|| async {
            let mut writer = client
                .create("/bench-write", WriteOptions::default().overwrite(true))
                .await
                .unwrap();

            writer.write(buf.clone()).await.unwrap();
            writer.close().await.unwrap();
        })
    });

    group.sample_size(10);
    group.bench_function("write-libhdfs", |b| {
        b.iter(|| {
            let mut buf = buf.clone();
            let writer = fs.create_with_overwrite("/bench-write", true).unwrap();

            while buf.remaining() > 0 {
                let written = writer.write(&buf[..]).unwrap();
                buf.advance(written as usize);
            }
            writer.close().unwrap();
        })
    });
}

criterion_group!(benches, bench);
criterion_main!(benches);
41 changes: 41 additions & 0 deletions crates/hdfs-native/benches/rpc.rs
@@ -0,0 +1,41 @@
use std::collections::HashSet;

use criterion::*;
use hdfs::hdfs::get_hdfs;
use hdfs_native::{minidfs::MiniDfs, Client, WriteOptions};

fn bench(c: &mut Criterion) {
    let _ = env_logger::builder().is_test(true).try_init();

    let _dfs = MiniDfs::with_features(&HashSet::new());
    let client = Client::default();

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        client
            .create("/bench", WriteOptions::default())
            .await
            .unwrap()
            .close()
            .await
            .unwrap();
    });

    let fs = get_hdfs().unwrap();

    let mut group = c.benchmark_group("rpc");
    group.bench_function("getFileInfo-native", |b| {
        b.to_async(&rt)
            .iter(|| async { client.get_file_info("/bench").await.unwrap() })
    });
    group.bench_function("getFileInfo-libhdfs", |b| {
        b.iter(|| fs.get_file_status("/bench").unwrap())
    });
}

criterion_group!(benches, bench);
criterion_main!(benches);
9 changes: 6 additions & 3 deletions crates/hdfs-native/src/hdfs/block_reader.rs
@@ -93,13 +93,16 @@ impl ReplicatedBlockStream
    }

    async fn next_packet(&mut self) -> Result<Option<Bytes>> {
        if self.len == 0 {
            return Ok(None);
        }
        if self.connection.is_none() {
            self.select_next_datanode().await?;
        }
        let conn = self.connection.as_mut().unwrap();

        if self.len == 0 {
            conn.send_read_success().await?;
            return Ok(None);
        }

        let packet = conn.read_packet().await?;

        let packet_offset = if self.offset > packet.header.offset_in_block as usize {
14 changes: 14 additions & 0 deletions crates/hdfs-native/src/hdfs/connection.rs
@@ -587,6 +587,20 @@ impl DatanodeConnection
        Ok(Packet::new(header, checksum, data))
    }

    pub(crate) async fn send_read_success(&mut self) -> Result<()> {
        let client_read_status = hdfs::ClientReadStatusProto {
            status: hdfs::Status::ChecksumOk as i32,
        };

        self.stream
            .write_all(&client_read_status.encode_length_delimited_to_vec())
            .await?;
        self.stream.flush().await?;
        self.stream.shutdown().await?;

        Ok(())
    }

    pub(crate) fn split(self) -> (DatanodeReader, DatanodeWriter) {
        let (reader, writer) = self.stream.into_inner().into_split();
        let reader = DatanodeReader {
2 changes: 1 addition & 1 deletion crates/hdfs-native/src/lib.rs
@@ -37,7 +37,7 @@ pub(crate) mod ec;
pub(crate) mod error;
pub mod file;
pub(crate) mod hdfs;
#[cfg(feature = "integration-test")]
#[cfg(any(feature = "integration-test", feature = "benchmark"))]
pub mod minidfs;
pub(crate) mod proto;
pub(crate) mod security;
1 change: 0 additions & 1 deletion crates/hdfs-native/src/minidfs.rs
@@ -131,7 +131,6 @@ impl MiniDfs {
        };

        env::set_var("HADOOP_CONF_DIR", "target/test");

        MiniDfs {
            process: child,
            url: url.to_string(),
