Remove need for hadoop binary #82

Closed · wants to merge 2 commits
README.md (1 addition & 1 deletion)
@@ -64,7 +64,7 @@ cargo build --features token,kerberos
An object_store implementation for HDFS is provided in the [hdfs-native-object-store](./crates/hdfs-native-object-store/) crate.

## Running tests
- The tests are mostly integration tests that utilize a small Java application in `rust/minidfs/` that runs a custom `MiniDFSCluster`. To run the tests, you need to have Java, Maven, Hadoop binaries, and Kerberos tools available and on your path. Any Java version between 8 and 17 should work.
+ The tests are mostly integration tests that utilize a small Java application in `rust/minidfs/` that runs a custom `MiniDFSCluster`. To run the tests, you need to have Java, Maven, and Kerberos tools available and on your path. Any Java version between 8 and 17 should work.

```bash
cargo test -p hdfs-native --features token,kerberos,integration-test
crates/hdfs-native/minidfs/src/main/java/main/Main.java (13 additions & 0 deletions)
@@ -9,6 +9,8 @@
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -34,6 +36,8 @@

public class Main {

+ static int TEST_FILE_INTS = 64 * 1024 * 1024;
+
public static void main(String args[]) throws Exception {
Set<String> flags = new HashSet<>();
for (String arg : args) {
@@ -162,6 +166,15 @@ public static void main(String args[]) throws Exception {

hdfsConf.writeXml(new FileOutputStream("target/test/core-site.xml"));

+ if (flags.contains("testfile")) {
+ FileSystem fs = FileSystem.get(hdfsConf);
+ FSDataOutputStream os = fs.create(new Path("/testfile"));
+ for (int i=0; i < TEST_FILE_INTS; i++) {
+ os.writeInt(i);
+ }
+ os.close();
+ }
+
System.out.println("Ready!");
if (flags.contains("security")) {
System.out.println(kdc.getKrb5conf().toPath().toString());
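A note on the seeded data: Java's `DataOutputStream.writeInt` emits big-endian 32-bit values, so the file written above is byte-for-byte identical to what the removed Rust helper (deleted from `tests/common/mod.rs` below) produced with `i.to_be_bytes()`. A minimal standalone check of that equivalence:

```rust
fn main() {
    // Java: new DataOutputStream(...).writeInt(0x01020304) writes 01 02 03 04.
    // Rust's i32::to_be_bytes produces the same byte order.
    let i: i32 = 0x0102_0304;
    assert_eq!(i.to_be_bytes(), [0x01, 0x02, 0x03, 0x04]);

    // 64 * 1024 * 1024 ints at 4 bytes each -> a 256 MiB test file.
    assert_eq!(64 * 1024 * 1024 * 4, 256 * 1024 * 1024);
}
```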
crates/hdfs-native/src/minidfs.rs (2 additions & 0 deletions)
@@ -9,6 +9,7 @@ use which::which;

#[derive(PartialEq, Eq, Hash, Debug)]
pub enum DfsFeatures {
+ TESTFILE,
SECURITY,
TOKEN,
PRIVACY,
@@ -21,6 +22,7 @@ pub enum DfsFeatures {
impl DfsFeatures {
pub fn as_str(&self) -> &str {
match self {
+ DfsFeatures::TESTFILE => "testfile",
DfsFeatures::EC => "ec",
DfsFeatures::HA => "ha",
DfsFeatures::VIEWFS => "viewfs",
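`MiniDfs::with_features` passes each enabled feature's `as_str()` value through to the Java `Main`, which is how `DfsFeatures::TESTFILE` reaches the `flags.contains("testfile")` check above. A sketch of how a test opts in, mirroring the updated tests below (the comment about drop behavior is an assumption based on the `_dfs` binding convention in this PR):

```rust
use std::collections::HashSet;

use hdfs_native::minidfs::{DfsFeatures, MiniDfs};

#[tokio::test]
async fn reads_seeded_file() {
    // Bind to _dfs (not _) so the cluster handle lives until the test ends.
    let _dfs = MiniDfs::with_features(&HashSet::from([
        DfsFeatures::HA,
        DfsFeatures::TESTFILE,
    ]));
    // ... read /testfile through a Client here ...
}
```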
crates/hdfs-native/tests/common/mod.rs (0 additions & 37 deletions)
@@ -1,45 +1,8 @@
#![allow(dead_code)]
use bytes::Buf;
- use std::collections::HashSet;
- use std::io::{BufWriter, Write};
- use std::process::Command;
- use tempfile::NamedTempFile;
- use which::which;

- use hdfs_native::minidfs::{DfsFeatures, MiniDfs};

pub const TEST_FILE_INTS: usize = 64 * 1024 * 1024;

- pub fn setup(features: &HashSet<DfsFeatures>) -> MiniDfs {
- let hadoop_exc = which("hadoop").expect("Failed to find hadoop executable");
-
- let dfs = MiniDfs::with_features(features);
-
- let mut file = NamedTempFile::new_in("target/test").unwrap();
- {
- let mut writer = BufWriter::new(file.as_file_mut());
- for i in 0..TEST_FILE_INTS as i32 {
- let bytes = i.to_be_bytes();
- writer.write_all(&bytes).unwrap();
- }
- writer.flush().unwrap();
- }
-
- let status = Command::new(hadoop_exc)
- .args([
- "fs",
- "-copyFromLocal",
- "-f",
- file.path().to_str().unwrap(),
- &format!("{}/testfile", dfs.url),
- ])
- .status()
- .unwrap();
- assert!(status.success());
-
- dfs
- }

pub fn assert_bufs_equal(buf1: &impl Buf, buf2: &impl Buf, message: Option<String>) {
assert_eq!(buf1.chunk().len(), buf2.chunk().len());

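For reference, the reading side stays the same: `tests/test_read.rs` pulls `/testfile` back and verifies the integer sequence. A condensed sketch of that verification, with method names taken from the existing tests (treat the exact signatures as assumptions, not documented API):

```rust
use bytes::Buf;
use hdfs_native::{Client, Result};

const TEST_FILE_INTS: usize = 64 * 1024 * 1024;

async fn check_testfile(client: &Client) -> Result<()> {
    let reader = client.read("/testfile").await?;
    // The whole file is TEST_FILE_INTS big-endian i32s.
    let mut buf = reader.read_range(0, reader.file_length()).await?;
    for i in 0..TEST_FILE_INTS {
        // Buf::get_i32 reads big-endian, matching Java's writeInt.
        assert_eq!(buf.get_i32(), i as i32);
    }
    Ok(())
}
```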
crates/hdfs-native/tests/test_integration.rs (21 additions & 18 deletions)
@@ -3,23 +3,27 @@ mod common;

#[cfg(feature = "integration-test")]
mod test {
- use crate::common::{assert_bufs_equal, setup, TEST_FILE_INTS};
+ use crate::common::{assert_bufs_equal, TEST_FILE_INTS};
use bytes::{BufMut, BytesMut};
- use hdfs_native::{client::FileStatus, minidfs::DfsFeatures, Client, Result, WriteOptions};
+ use hdfs_native::{
+ client::FileStatus,
+ minidfs::{DfsFeatures, MiniDfs},
+ Client, Result, WriteOptions,
+ };
use serial_test::serial;
use std::collections::HashSet;

#[tokio::test]
#[serial]
async fn test_basic() {
- test_with_features(&HashSet::new()).await.unwrap();
+ test_with_features(HashSet::new()).await.unwrap();
}

#[tokio::test]
#[serial]
#[cfg(feature = "kerberos")]
async fn test_security_kerberos() {
- test_with_features(&HashSet::from([DfsFeatures::SECURITY]))
+ test_with_features(HashSet::from([DfsFeatures::SECURITY]))
.await
.unwrap();
}
@@ -28,7 +32,7 @@ mod test {
#[serial]
#[cfg(feature = "token")]
async fn test_security_token() {
- test_with_features(&HashSet::from([DfsFeatures::SECURITY, DfsFeatures::TOKEN]))
+ test_with_features(HashSet::from([DfsFeatures::SECURITY, DfsFeatures::TOKEN]))
.await
.unwrap();
}
@@ -38,7 +42,7 @@ mod test {
#[serial]
#[cfg(feature = "token")]
async fn test_privacy_token() {
- test_with_features(&HashSet::from([
+ test_with_features(HashSet::from([
DfsFeatures::SECURITY,
DfsFeatures::TOKEN,
DfsFeatures::PRIVACY,
@@ -51,18 +55,15 @@ mod test {
#[serial]
#[cfg(feature = "kerberos")]
async fn test_privacy_kerberos() {
- test_with_features(&HashSet::from([
- DfsFeatures::SECURITY,
- DfsFeatures::PRIVACY,
- ]))
- .await
- .unwrap();
+ test_with_features(HashSet::from([DfsFeatures::SECURITY, DfsFeatures::PRIVACY]))
+ .await
+ .unwrap();
}

#[tokio::test]
#[serial]
async fn test_basic_ha() {
- test_with_features(&HashSet::from([DfsFeatures::HA]))
+ test_with_features(HashSet::from([DfsFeatures::HA]))
.await
.unwrap();
}
@@ -71,7 +72,7 @@ mod test {
#[serial]
#[cfg(feature = "kerberos")]
async fn test_security_privacy_ha() {
- test_with_features(&HashSet::from([
+ test_with_features(HashSet::from([
DfsFeatures::SECURITY,
DfsFeatures::PRIVACY,
DfsFeatures::HA,
@@ -84,7 +85,7 @@ mod test {
#[serial]
#[cfg(feature = "token")]
async fn test_security_token_ha() {
- test_with_features(&HashSet::from([
+ test_with_features(HashSet::from([
DfsFeatures::SECURITY,
DfsFeatures::TOKEN,
DfsFeatures::HA,
@@ -96,15 +97,17 @@ mod test {
#[tokio::test]
#[serial]
async fn test_rbf() {
- test_with_features(&HashSet::from([DfsFeatures::RBF]))
+ test_with_features(HashSet::from([DfsFeatures::RBF]))
.await
.unwrap();
}

- pub async fn test_with_features(features: &HashSet<DfsFeatures>) -> Result<()> {
+ pub async fn test_with_features(mut features: HashSet<DfsFeatures>) -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();

- let _dfs = setup(features);
+ features.insert(DfsFeatures::TESTFILE);
+
+ let _dfs = MiniDfs::with_features(&features);
let client = Client::default();

test_file_info(&client).await?;
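The signature change above is the crux of the cleanup: `test_with_features` now takes the `HashSet` by value so it can insert `TESTFILE` itself before starting the cluster, rather than shelling out to `hadoop fs` afterwards. The pattern in miniature (helper name is illustrative):

```rust
use std::collections::HashSet;

use hdfs_native::minidfs::DfsFeatures;

// Taking ownership (HashSet, not &HashSet) lets the helper extend the set.
fn with_testfile(mut features: HashSet<DfsFeatures>) -> HashSet<DfsFeatures> {
    features.insert(DfsFeatures::TESTFILE);
    features
}
```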
crates/hdfs-native/tests/test_read.rs (6 additions & 3 deletions)
@@ -3,9 +3,12 @@ mod common;

#[cfg(feature = "integration-test")]
mod test {
- use crate::common::{setup, TEST_FILE_INTS};
+ use crate::common::TEST_FILE_INTS;
use bytes::Buf;
- use hdfs_native::{minidfs::DfsFeatures, Client, Result};
+ use hdfs_native::{
+ minidfs::{DfsFeatures, MiniDfs},
+ Client, Result,
+ };
use serial_test::serial;
use std::collections::HashSet;

@@ -14,7 +17,7 @@ mod test {
async fn test_read() -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();

- let _dfs = setup(&HashSet::from([DfsFeatures::HA]));
+ let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA, DfsFeatures::TESTFILE]));
let client = Client::default();

// Read the whole file
crates/hdfs-native/tests/test_write.rs (6 additions & 3 deletions)
@@ -3,9 +3,12 @@ mod common;

#[cfg(feature = "integration-test")]
mod test {
- use crate::common::{assert_bufs_equal, setup};
+ use crate::common::assert_bufs_equal;
use bytes::{BufMut, BytesMut};
- use hdfs_native::{minidfs::DfsFeatures, Client, Result, WriteOptions};
+ use hdfs_native::{
+ minidfs::{DfsFeatures, MiniDfs},
+ Client, Result, WriteOptions,
+ };
use serial_test::serial;
use std::collections::HashSet;

@@ -14,7 +17,7 @@ mod test {
async fn test_write() {
let _ = env_logger::builder().is_test(true).try_init();

- let _dfs = setup(&HashSet::from([DfsFeatures::HA]));
+ let _dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA]));
let client = Client::default();

test_create(&client).await.unwrap();