Bump object store version to 0.8 (#64)
Kimahriman authored Jan 8, 2024
1 parent 6654633 commit d082faa
Showing 5 changed files with 62 additions and 62 deletions.
51 changes: 12 additions & 39 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/hdfs-native-object-store/Cargo.toml
@@ -16,7 +16,7 @@ bytes = { workspace = true }
chrono = { version = "0.4" }
futures = { workspace = true }
hdfs-native = { path = "../hdfs-native", version = "0.6" }
object_store = { version = "0.7", features = ["cloud"] }
object_store = { version = "0.8", features = ["cloud"] }
thiserror = "1"
tokio = { workspace = true, features = ["rt", "net", "io-util", "macros", "sync", "time"] }

34 changes: 21 additions & 13 deletions crates/hdfs-native-object-store/src/lib.rs
@@ -32,7 +32,7 @@ use object_store::{
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
Result,
PutMode, PutOptions, PutResult, Result,
};
use tokio::io::AsyncWrite;

@@ -101,7 +101,17 @@ impl ObjectStore for HdfsObjectStore {
///
/// To make the operation atomic, we write to a temporary file ".{filename}.tmp" and rename
/// on a successful write.
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
let overwrite = match opts.mode {
PutMode::Create => false,
PutMode::Overwrite => true,
PutMode::Update(_) => {
return Err(object_store::Error::NotSupported {
source: "Update mode not supported".to_string().into(),
})
}
};

let final_file_path = make_absolute_file(location);
let path_buf = PathBuf::from(&final_file_path);

@@ -121,13 +131,6 @@ impl ObjectStore for HdfsObjectStore {
.to_object_store_err()?
.to_string();

// First we need to check if the tmp file exists so we know whether to overwrite
let overwrite = match self.client.get_file_info(&tmp_filename).await {
Ok(_) => true,
Err(HdfsError::FileNotFound(_)) => false,
Err(e) => Err(e).to_object_store_err()?,
};

let write_options = WriteOptions {
overwrite,
..Default::default()
@@ -142,11 +145,14 @@
writer.close().await.to_object_store_err()?;

self.client
.rename(&tmp_filename, &final_file_path, true)
.rename(&tmp_filename, &final_file_path, overwrite)
.await
.to_object_store_err()?;

Ok(())
Ok(PutResult {
e_tag: None,
version: None,
})
}

/// Uses the [PutPart] trait to implement an asynchronous writer. We can't actually upload
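
For context, here is a usage sketch (not part of this commit) of how a caller might exercise the new `put_opts` path. The store setup and file path are hypothetical, and the `AlreadyExists` behavior relies on the error mapping added further down in this commit.

```rust
use bytes::Bytes;
use object_store::{path::Path, ObjectStore, PutMode, PutOptions};

// Hypothetical helper: create a file only if it does not already exist.
async fn create_if_absent(store: &dyn ObjectStore, data: Bytes) -> object_store::Result<()> {
    let location = Path::from("/app/config.json"); // hypothetical path
    let opts = PutOptions {
        mode: PutMode::Create, // fail instead of overwriting an existing file
        ..Default::default()
    };
    match store.put_opts(&location, data, opts).await {
        Ok(result) => {
            // HDFS exposes neither etags nor versions, so both fields come back None.
            assert!(result.e_tag.is_none() && result.version.is_none());
            Ok(())
        }
        // Raised when the final rename finds an existing file (see the proxy change below).
        Err(object_store::Error::AlreadyExists { .. }) => Ok(()),
        Err(e) => Err(e),
    }
}
```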
@@ -263,6 +269,7 @@ impl ObjectStore for HdfsObjectStore {
),
size: status.length,
e_tag: None,
version: None,
})
}

@@ -290,7 +297,7 @@ impl ObjectStore for HdfsObjectStore {
/// `foo/bar_baz/x`.
///
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
let status_stream = self
.client
.list_status_iter(
@@ -309,7 +316,7 @@
})
.map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));

Ok(Box::pin(status_stream))
Box::pin(status_stream)
}
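
Another usage sketch (assumed, not from this commit): under object_store 0.8, `list` is a plain method returning a stream, so the extra `.await?` required by 0.7 goes away. The prefix below is hypothetical.

```rust
use futures::TryStreamExt;
use object_store::{path::Path, ObjectStore};

async fn print_listing(store: &dyn ObjectStore) -> object_store::Result<()> {
    let mut objects = store
        .list(Some(&Path::from("/data"))) // hypothetical prefix
        .try_collect::<Vec<_>>()
        .await?;
    // The stream's ordering is not guaranteed, so sort before relying on it.
    objects.sort_by_key(|meta| meta.location.to_string());
    for meta in objects {
        println!("{} ({} bytes)", meta.location, meta.size);
    }
    Ok(())
}
```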

/// List objects with the given prefix and an implementation specific
@@ -513,5 +520,6 @@ fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
),
size: status.length,
e_tag: None,
version: None,
})
}
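
For reference, a minimal sketch (placeholder values only) of the `ObjectMeta` shape that `get_object_meta` now has to produce, since object_store 0.8 adds a `version` field:

```rust
use chrono::Utc;
use object_store::{path::Path, ObjectMeta};

fn placeholder_meta() -> ObjectMeta {
    ObjectMeta {
        location: Path::from("/testfile"),
        last_modified: Utc::now(), // the real helper presumably derives this from the FileStatus
        size: 4,
        e_tag: None,
        version: None, // new field in object_store 0.8; HDFS has no object versions
    }
}
```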
28 changes: 20 additions & 8 deletions crates/hdfs-native-object-store/tests/test_object_store.rs
@@ -6,6 +6,7 @@ mod test {
use bytes::{Buf, BufMut, Bytes, BytesMut};
use hdfs_native::{minidfs::DfsFeatures, Client};
use hdfs_native_object_store::{HdfsErrorConvert, HdfsObjectStore};
use object_store::{PutMode, PutOptions};
use serial_test::serial;
use std::collections::HashSet;

@@ -46,18 +47,14 @@
use object_store::{path::Path, ObjectMeta, ObjectStore};

let list: Vec<object_store::Result<ObjectMeta>> =
store.list(Some(&Path::from("/"))).await?.collect().await;
store.list(Some(&Path::from("/"))).collect().await;

assert_eq!(list.len(), 1);
assert_eq!(list[0].as_ref().unwrap().location, Path::from("/testfile"));

// Listing of a prefix that doesn't exist should return an empty result, not an error
assert_eq!(
store
.list(Some(&Path::from("/doesnt/exist")))
.await?
.count()
.await,
store.list(Some(&Path::from("/doesnt/exist"))).count().await,
0
);

@@ -114,12 +111,27 @@
store.put(&Path::from("/newfile"), Bytes::new()).await?;
store.head(&Path::from("/newfile")).await?;

// PutMode = Create should fail for existing file
match store
.put_opts(
&Path::from("/newfile"),
Bytes::new(),
PutOptions {
mode: PutMode::Create,
..Default::default()
},
)
.await
{
Err(object_store::Error::AlreadyExists { .. }) => (),
Err(e) => panic!("Wrong error was thrown for put without overwrite: {:?}", e),
Ok(_) => panic!("No error was thrown for put without overwrite for existing file"),
}

// Check a small file, a file that is exactly one block, and a file slightly bigger than a block
for size_to_check in [16i32, 128 * 1024 * 1024, 130 * 1024 * 1024] {
let ints_to_write = size_to_check / 4;

// let mut writer = client.create("/newfile", write_options.clone()).await?;

let mut data = BytesMut::with_capacity(size_to_check as usize);
for i in 0..ints_to_write {
data.put_i32(i);
9 changes: 8 additions & 1 deletion crates/hdfs-native/src/hdfs/proxy.rs
@@ -155,7 +155,7 @@ impl NameServiceProxy {
// RPCError indicates the call was successfully attempted but had an error, so should be returned immediately
Err(HdfsError::RPCError(exception, msg)) if !Self::is_retriable(&exception) => {
warn!("{}: {}", exception, msg);
return Err(HdfsError::RPCError(exception, msg));
return Err(Self::convert_rpc_error(exception, msg));
}
Err(_) if attempts >= self.proxy_connections.len() - 1 => return result,
Err(e) => {
@@ -167,4 +167,11 @@
attempts += 1;
}
}

fn convert_rpc_error(exception: String, msg: String) -> HdfsError {
match exception.as_ref() {
"org.apache.hadoop.fs.FileAlreadyExistsException" => HdfsError::AlreadyExists(msg),
_ => HdfsError::RPCError(exception, msg),
}
}
}
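
A sketch of how the new `HdfsError::AlreadyExists` variant would typically be translated so that the `PutMode::Create` test above observes `object_store::Error::AlreadyExists`. This is an assumption for illustration; the actual conversion lives in the object store crate's error-translation code, which this commit does not show, and the import path is assumed.

```rust
use hdfs_native::HdfsError;

// Hypothetical translation helper, not the crate's actual implementation.
fn to_object_store_error(err: HdfsError, path: &str) -> object_store::Error {
    match err {
        HdfsError::AlreadyExists(msg) => object_store::Error::AlreadyExists {
            path: path.to_string(),
            source: msg.into(),
        },
        other => object_store::Error::Generic {
            store: "HdfsObjectStore",
            source: other.to_string().into(),
        },
    }
}
```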
