Skip to content

Commit

Permalink
feat(integration/object_store): implement get_opts and put_opts
Browse files Browse the repository at this point in the history
  • Loading branch information
meteorgan committed Jan 12, 2025
1 parent 02381c5 commit 3d5a5a5
Show file tree
Hide file tree
Showing 4 changed files with 520 additions and 92 deletions.
222 changes: 132 additions & 90 deletions integrations/object_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@

use std::fmt::{self, Debug, Display, Formatter};
use std::future::IntoFuture;
use std::ops::Range;
use std::io;
use std::sync::Arc;

use crate::utils::*;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use object_store::path::Path;
use object_store::GetResult;
use object_store::GetResultPayload;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
Expand All @@ -39,6 +36,8 @@ use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::{GetOptions, UploadPart};
use object_store::{GetRange, GetResultPayload};
use object_store::{GetResult, PutMode};
use opendal::Buffer;
use opendal::Writer;
use opendal::{Operator, OperatorInfo};
Expand Down Expand Up @@ -104,6 +103,11 @@ impl OpendalStore {
inner: op,
}
}

/// Get the Operator info.
pub fn info(&self) -> &OperatorInfo {
self.info.as_ref()
}
}

impl Debug for OpendalStore {
Expand Down Expand Up @@ -138,29 +142,47 @@ impl From<Operator> for OpendalStore {

#[async_trait]
impl ObjectStore for OpendalStore {
async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result<PutResult> {
self.inner
.write(location.as_ref(), Buffer::from_iter(bytes.into_iter()))
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(PutResult {
e_tag: None,
version: None,
})
}

async fn put_opts(
&self,
_location: &Path,
_bytes: PutPayload,
_opts: PutOptions,
location: &Path,
bytes: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"put_opts is not implemented so far",
)),
let mut future_write = self
.inner
.write_with(location.as_ref(), Buffer::from_iter(bytes.into_iter()));
let opts_mode = opts.mode.clone();
match opts.mode {
PutMode::Overwrite => {}
PutMode::Create => {
future_write = future_write.if_not_exists(true);
}
PutMode::Update(update_version) => {
let Some(etag) = update_version.e_tag else {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"etag is required for conditional put",
)),
})?
};
future_write = future_write.if_match(etag.as_str());
}
}
future_write.into_send().await.map_err(|err| {
match format_object_store_error(err, location.as_ref()) {
object_store::Error::Precondition { path, source }
if opts_mode == PutMode::Create =>
{
object_store::Error::AlreadyExists { path, source }
}
e => e,
}
})?;

Ok(PutResult {
e_tag: None,
version: None,
})
}

Expand Down Expand Up @@ -193,13 +215,32 @@ impl ObjectStore for OpendalStore {
})
}

async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
let meta = self
.inner
.stat(location.as_ref())
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
let meta = {
let mut s = self.inner.stat_with(location.as_ref());
if let Some(version) = &options.version {
s = s.version(version.as_str())
}
if let Some(if_match) = &options.if_match {
s = s.if_match(if_match.as_str());
}
if let Some(if_none_match) = &options.if_none_match {
s = s.if_none_match(if_none_match.as_str());
}
if let Some(if_modified_since) = options.if_modified_since {
s = s.if_modified_since(if_modified_since);
}
if let Some(if_unmodified_since) = options.if_unmodified_since {
s = s.if_unmodified_since(if_unmodified_since);
}
s.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?
};

let meta = ObjectMeta {
location: location.clone(),
Expand All @@ -209,78 +250,79 @@ impl ObjectStore for OpendalStore {
version: meta.version().map(|x| x.to_string()),
};

let r = self
.inner
.reader(location.as_ref())
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
if options.head {
return Ok(GetResult {
payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
range: 0..0,
meta,
attributes: Default::default(),
});
}

let reader = {
let mut r = self.inner.reader_with(location.as_ref());
if let Some(version) = options.version {
r = r.version(version.as_str());
}
if let Some(if_match) = options.if_match {
r = r.if_match(if_match.as_str());
}
if let Some(if_none_match) = options.if_none_match {
r = r.if_none_match(if_none_match.as_str());
}
if let Some(if_modified_since) = options.if_modified_since {
r = r.if_modified_since(if_modified_since);
}
if let Some(if_unmodified_since) = options.if_unmodified_since {
r = r.if_unmodified_since(if_unmodified_since);
}
r.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?
};

let stream = r
.into_bytes_stream(0..meta.size as u64)
let mut read_range = 0..meta.size;
if let Some(range) = options.range {
match range {
GetRange::Bounded(r) => {
read_range = r;
}
GetRange::Offset(r) => {
if r >= meta.size {
read_range = 0..0;
} else {
read_range = r..meta.size;
}
}
GetRange::Suffix(r) => {
if r >= meta.size {
read_range = 0..meta.size;
} else {
read_range = (meta.size - r)..meta.size;
}
}
}
}

let stream = reader
.into_bytes_stream(read_range.start as u64..read_range.end as u64)
.into_send()
.await
.map_err(|err| object_store::Error::Generic {
store: "IoError",
source: Box::new(err),
})?
.map_err(|err| format_object_store_error(err, location.as_ref()))?
.into_send()
.map_err(|err| object_store::Error::Generic {
.map_err(|err: io::Error| object_store::Error::Generic {
store: "IoError",
source: Box::new(err),
});

Ok(GetResult {
payload: GetResultPayload::Stream(Box::pin(stream)),
range: 0..meta.size,
range: read_range.start..read_range.end,
meta,
attributes: Default::default(),
})
}

async fn get_opts(
&self,
_location: &Path,
_options: GetOptions,
) -> object_store::Result<GetResult> {
Err(object_store::Error::NotSupported {
source: Box::new(opendal::Error::new(
opendal::ErrorKind::Unsupported,
"get_opts is not implemented so far",
)),
})
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
let bs = self
.inner
.read_with(location.as_ref())
.range(range.start as u64..range.end as u64)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Ok(bs.to_bytes())
}

async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
let meta = self
.inner
.stat(location.as_ref())
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Ok(ObjectMeta {
location: location.clone(),
last_modified: meta.last_modified().unwrap_or_default(),
size: meta.content_length() as usize,
e_tag: meta.etag().map(|x| x.to_string()),
version: meta.version().map(|x| x.to_string()),
})
}

async fn delete(&self, location: &Path) -> object_store::Result<()> {
self.inner
.delete(location.as_ref())
Expand Down Expand Up @@ -534,12 +576,12 @@ impl Debug for OpendalMultipartUpload {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectStore, WriteMultipart};
use opendal::services;
use rand::prelude::*;
use std::sync::Arc;

use super::*;

Expand Down
4 changes: 4 additions & 0 deletions integrations/object_store/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ pub fn format_object_store_error(err: opendal::Error, path: &str) -> object_stor
path: path.to_string(),
source: Box::new(err),
},
ErrorKind::ConditionNotMatch => object_store::Error::Precondition {
path: path.to_string(),
source: Box::new(err),
},
kind => object_store::Error::Generic {
store: kind.into_static(),
source: Box::new(err),
Expand Down
Loading

0 comments on commit 3d5a5a5

Please sign in to comment.