Skip to content

Commit

Permalink
chore: reduce duplicated code for hdfs
Browse files Browse the repository at this point in the history
  • Loading branch information
yjhmelody committed Aug 24, 2024
1 parent c152360 commit 3591a82
Showing 1 changed file with 10 additions and 110 deletions.
120 changes: 10 additions & 110 deletions core/src/services/hdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,31 +237,12 @@ impl Access for HdfsBackend {
am.into()
}

async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let p = build_rooted_abs_path(&self.root, path);

self.client.create_dir(&p).map_err(new_std_io_error)?;

Ok(RpCreateDir::default())
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.blocking_create_dir(path, args)
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);

let meta = self.client.metadata(&p).map_err(new_std_io_error)?;

let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let mut m = Metadata::new(mode);
m.set_content_length(meta.len());
m.set_last_modified(meta.modified().into());

Ok(RpStat::new(m))
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.blocking_stat(path, args)
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
Expand Down Expand Up @@ -345,97 +326,16 @@ impl Access for HdfsBackend {
))
}

async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
let p = build_rooted_abs_path(&self.root, path);

let meta = self.client.metadata(&p);

if let Err(err) = meta {
return if err.kind() == io::ErrorKind::NotFound {
Ok(RpDelete::default())
} else {
Err(new_std_io_error(err))
};
}

// Safety: Err branch has been checked, it's OK to unwrap.
let meta = meta.ok().unwrap();

let result = if meta.is_dir() {
self.client.remove_dir(&p)
} else {
self.client.remove_file(&p)
};

result.map_err(new_std_io_error)?;

Ok(RpDelete::default())
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.blocking_delete(path, args)
}

async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
let p = build_rooted_abs_path(&self.root, path);

let f = match self.client.read_dir(&p) {
Ok(f) => f,
Err(e) => {
return if e.kind() == io::ErrorKind::NotFound {
Ok((RpList::default(), None))
} else {
Err(new_std_io_error(e))
}
}
};

let rd = HdfsLister::new(&self.root, f);

Ok((RpList::default(), Some(rd)))
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.blocking_list(path, args)
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from_path = build_rooted_abs_path(&self.root, from);
self.client.metadata(&from_path).map_err(new_std_io_error)?;

let to_path = build_rooted_abs_path(&self.root, to);
let result = self.client.metadata(&to_path);
match result {
Err(err) => {
// Early return if other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}

let parent = PathBuf::from(&to_path)
.parent()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"path should have parent but not, it must be malformed",
)
.with_context("input", &to_path)
})?
.to_path_buf();

self.client
.create_dir(&parent.to_string_lossy())
.map_err(new_std_io_error)?;
}
Ok(metadata) => {
if metadata.is_file() {
self.client
.remove_file(&to_path)
.map_err(new_std_io_error)?;
} else {
return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
.with_context("input", &to_path));
}
}
}

self.client
.rename_file(&from_path, &to_path)
.map_err(new_std_io_error)?;

Ok(RpRename::new())
async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
self.blocking_rename(from, to, args)
}

fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
Expand Down

0 comments on commit 3591a82

Please sign in to comment.