diff --git a/crates/hdfs-native/tests/common/mod.rs b/crates/hdfs-native/tests/common/mod.rs index 9969a89..047df63 100644 --- a/crates/hdfs-native/tests/common/mod.rs +++ b/crates/hdfs-native/tests/common/mod.rs @@ -1,4 +1,5 @@ #![allow(dead_code)] +use bytes::Buf; use std::collections::HashSet; use std::io::{BufWriter, Write}; use std::process::Command; @@ -38,3 +39,17 @@ pub fn setup(features: &HashSet) -> MiniDfs { dfs } + +pub fn assert_bufs_equal(buf1: &impl Buf, buf2: &impl Buf, message: Option) { + assert_eq!(buf1.chunk().len(), buf2.chunk().len()); + + let message = message.unwrap_or_default(); + + buf1.chunk() + .iter() + .zip(buf2.chunk()) + .enumerate() + .for_each(move |(i, (b1, b2))| { + assert_eq!(b1, b2, "data is different as position {} {}", i, message) + }); +} diff --git a/crates/hdfs-native/tests/test_integration.rs b/crates/hdfs-native/tests/test_integration.rs index dc40481..8dadbbd 100644 --- a/crates/hdfs-native/tests/test_integration.rs +++ b/crates/hdfs-native/tests/test_integration.rs @@ -3,8 +3,8 @@ mod common; #[cfg(feature = "integration-test")] mod test { - use crate::common::{setup, TEST_FILE_INTS}; - use bytes::{Buf, BufMut, BytesMut}; + use crate::common::{assert_bufs_equal, setup, TEST_FILE_INTS}; + use bytes::{BufMut, BytesMut}; use hdfs_native::{client::FileStatus, minidfs::DfsFeatures, Client, Result, WriteOptions}; use serial_test::serial; use std::collections::HashSet; @@ -109,31 +109,15 @@ mod test { test_file_info(&client).await?; test_listing(&client).await?; - test_read(&client).await?; test_rename(&client).await?; test_dirs(&client).await?; - test_create(&client).await?; - test_append(&client).await?; + test_read_write(&client).await?; // We use writing to create files, so do this after test_recursive_listing(&client).await?; Ok(()) } - fn assert_bufs_equal(buf1: &impl Buf, buf2: &impl Buf, message: Option) { - assert_eq!(buf1.chunk().len(), buf2.chunk().len()); - - let message = message.unwrap_or_default(); - - buf1.chunk() - .iter() - .zip(buf2.chunk()) - .enumerate() - .for_each(move |(i, (b1, b2))| { - assert_eq!(b1, b2, "data is different as position {} {}", i, message) - }); - } - async fn test_file_info(client: &Client) -> Result<()> { let status = client.get_file_info("/testfile").await?; // Path is empty, I guess because we already know what file we just got the info for? @@ -157,33 +141,6 @@ mod test { Ok(()) } - async fn test_read(client: &Client) -> Result<()> { - // Read the whole file - let reader = client.read("/testfile").await?; - let mut buf = reader.read_range(0, TEST_FILE_INTS * 4).await?; - for i in 0..TEST_FILE_INTS as i32 { - assert_eq!(buf.get_i32(), i); - } - - // Read a single integer from the file - let mut buf = reader.read_range(TEST_FILE_INTS / 2 * 4, 4).await?; - assert_eq!(buf.get_i32(), TEST_FILE_INTS as i32 / 2); - - // Read the whole file in 1 MiB chunks - let mut offset = 0; - let mut val = 0; - while offset < TEST_FILE_INTS * 4 { - let mut buf = reader.read_range(offset, 1024 * 1024).await?; - while !buf.is_empty() { - assert_eq!(buf.get_i32(), val); - val += 1; - } - offset += 1024 * 1024; - } - - Ok(()) - } - async fn test_rename(client: &Client) -> Result<()> { client.rename("/testfile", "/testfile2", false).await?; @@ -220,7 +177,7 @@ mod test { Ok(()) } - async fn test_create(client: &Client) -> Result<()> { + async fn test_read_write(client: &Client) -> Result<()> { let write_options = WriteOptions::default().overwrite(true); // Create an empty file @@ -230,88 +187,45 @@ mod test { assert_eq!(client.get_file_info("/newfile").await?.length, 0); - // Check a small files, a file that is exactly one block, and a file slightly bigger than a block - for size_to_check in [16i32, 128 * 1024 * 1024, 130 * 1024 * 1024] { - let ints_to_write = size_to_check / 4; + let mut writer = client.create("/newfile", &write_options).await?; - let mut writer = client.create("/newfile", &write_options).await?; + let mut file_contents = BytesMut::new(); + let mut data = BytesMut::new(); + for i in 0..1024 { + file_contents.put_i32(i); + data.put_i32(i); + } - let mut data = BytesMut::with_capacity(size_to_check as usize); - for i in 0..ints_to_write { - data.put_i32(i); - } + let buf = data.freeze(); - let buf = data.freeze(); + writer.write(buf).await?; + writer.close().await?; - writer.write(buf.clone()).await?; - writer.close().await?; + assert_eq!(client.get_file_info("/newfile").await?.length, 4096); - assert_eq!( - client.get_file_info("/newfile").await?.length, - size_to_check as usize - ); + let mut reader = client.read("/newfile").await?; + let read_data = reader.read(reader.file_length()).await?; - let mut reader = client.read("/newfile").await?; - let read_data = reader.read(reader.file_length()).await?; + assert_bufs_equal(&file_contents, &read_data, None); - assert_bufs_equal( - &buf, - &read_data, - Some(format!("for size {}", size_to_check)), - ); + let mut data = BytesMut::new(); + for i in 0..1024 { + file_contents.put_i32(i); + data.put_i32(i); } - assert!(client.delete("/newfile", false).await.is_ok_and(|r| r)); - - Ok(()) - } + let buf = data.freeze(); - async fn test_append(client: &Client) -> Result<()> { - // Create an empty file - client - .create("/newfile", WriteOptions::default()) - .await? - .close() - .await?; + let mut writer = client.append("/newfile").await?; + writer.write(buf).await?; + writer.close().await?; - assert_eq!(client.get_file_info("/newfile").await?.length, 0); + let mut reader = client.read("/newfile").await?; + let read_data = reader.read(reader.file_length()).await?; - // Keep track of what should be in the file - let mut file_contents = BytesMut::new(); + assert_bufs_equal(&file_contents, &read_data, None); - // Test a few different things with each range: - - for range in [ - // Append a few bytes to an empty file - 0u32..4, - // Append a few bytes to a partial chunk - 4..8, - // Append multiple chunks to a file - 8..2048, - // Append to the file filling up the block - 2048..(128 * 1024 * 1024), - // Append some bytes to a new block - 0..511, - // Append to a chunk with only one byte missing - 512..1024, - ] { - let mut data = BytesMut::new(); - for i in range { - file_contents.put_u8((i % 256) as u8); - data.put_u8((i % 256) as u8); - } - - let buf = data.freeze(); - - let mut writer = client.append("/newfile").await?; - writer.write(buf).await?; - writer.close().await?; - - let mut reader = client.read("/newfile").await?; - let read_data = reader.read(reader.file_length()).await?; - - assert_bufs_equal(&file_contents, &read_data, None); - } + assert!(client.delete("/newfile", false).await.is_ok_and(|r| r)); Ok(()) } diff --git a/crates/hdfs-native/tests/test_read.rs b/crates/hdfs-native/tests/test_read.rs new file mode 100644 index 0000000..fad48da --- /dev/null +++ b/crates/hdfs-native/tests/test_read.rs @@ -0,0 +1,45 @@ +#[cfg(feature = "integration-test")] +mod common; + +#[cfg(feature = "integration-test")] +mod test { + use crate::common::{setup, TEST_FILE_INTS}; + use bytes::Buf; + use hdfs_native::{minidfs::DfsFeatures, Client, Result}; + use serial_test::serial; + use std::collections::HashSet; + + #[tokio::test] + #[serial] + async fn test_read() -> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let _dfs = setup(&HashSet::from([DfsFeatures::HA])); + let client = Client::default(); + + // Read the whole file + let reader = client.read("/testfile").await?; + let mut buf = reader.read_range(0, TEST_FILE_INTS * 4).await?; + for i in 0..TEST_FILE_INTS as i32 { + assert_eq!(buf.get_i32(), i); + } + + // Read a single integer from the file + let mut buf = reader.read_range(TEST_FILE_INTS / 2 * 4, 4).await?; + assert_eq!(buf.get_i32(), TEST_FILE_INTS as i32 / 2); + + // Read the whole file in 1 MiB chunks + let mut offset = 0; + let mut val = 0; + while offset < TEST_FILE_INTS * 4 { + let mut buf = reader.read_range(offset, 1024 * 1024).await?; + while !buf.is_empty() { + assert_eq!(buf.get_i32(), val); + val += 1; + } + offset += 1024 * 1024; + } + + Ok(()) + } +} diff --git a/crates/hdfs-native/tests/test_write.rs b/crates/hdfs-native/tests/test_write.rs new file mode 100644 index 0000000..93f1267 --- /dev/null +++ b/crates/hdfs-native/tests/test_write.rs @@ -0,0 +1,119 @@ +#[cfg(feature = "integration-test")] +mod common; + +#[cfg(feature = "integration-test")] +mod test { + use crate::common::{assert_bufs_equal, setup}; + use bytes::{BufMut, BytesMut}; + use hdfs_native::{minidfs::DfsFeatures, Client, Result, WriteOptions}; + use serial_test::serial; + use std::collections::HashSet; + + #[tokio::test] + #[serial] + async fn test_write() { + let _ = env_logger::builder().is_test(true).try_init(); + + let _dfs = setup(&HashSet::from([DfsFeatures::HA])); + let client = Client::default(); + + test_create(&client).await.unwrap(); + test_append(&client).await.unwrap(); + } + + async fn test_create(client: &Client) -> Result<()> { + let write_options = WriteOptions::default().overwrite(true); + + // Create an empty file + let mut writer = client.create("/newfile", &write_options).await?; + + writer.close().await?; + + assert_eq!(client.get_file_info("/newfile").await?.length, 0); + + // Check a small files, a file that is exactly one block, and a file slightly bigger than a block + for size_to_check in [16i32, 128 * 1024 * 1024, 130 * 1024 * 1024] { + let ints_to_write = size_to_check / 4; + + let mut writer = client.create("/newfile", &write_options).await?; + + let mut data = BytesMut::with_capacity(size_to_check as usize); + for i in 0..ints_to_write { + data.put_i32(i); + } + + let buf = data.freeze(); + + writer.write(buf.clone()).await?; + writer.close().await?; + + assert_eq!( + client.get_file_info("/newfile").await?.length, + size_to_check as usize + ); + + let mut reader = client.read("/newfile").await?; + let read_data = reader.read(reader.file_length()).await?; + + assert_bufs_equal( + &buf, + &read_data, + Some(format!("for size {}", size_to_check)), + ); + } + + assert!(client.delete("/newfile", false).await.is_ok_and(|r| r)); + + Ok(()) + } + + async fn test_append(client: &Client) -> Result<()> { + // Create an empty file + client + .create("/newfile", WriteOptions::default()) + .await? + .close() + .await?; + + assert_eq!(client.get_file_info("/newfile").await?.length, 0); + + // Keep track of what should be in the file + let mut file_contents = BytesMut::new(); + + // Test a few different things with each range: + + for range in [ + // Append a few bytes to an empty file + 0u32..4, + // Append a few bytes to a partial chunk + 4..8, + // Append multiple chunks to a file + 8..2048, + // Append to the file filling up the block + 2048..(128 * 1024 * 1024), + // Append some bytes to a new block + 0..511, + // Append to a chunk with only one byte missing + 512..1024, + ] { + let mut data = BytesMut::new(); + for i in range { + file_contents.put_u8((i % 256) as u8); + data.put_u8((i % 256) as u8); + } + + let buf = data.freeze(); + + let mut writer = client.append("/newfile").await?; + writer.write(buf).await?; + writer.close().await?; + + let mut reader = client.read("/newfile").await?; + let read_data = reader.read(reader.file_length()).await?; + + assert_bufs_equal(&file_contents, &read_data, None); + } + + Ok(()) + } +}