feat: support empty dir
RWDai committed Aug 21, 2024
1 parent e445cc6 commit db3f0f1
Showing 4 changed files with 121 additions and 64 deletions.
6 changes: 5 additions & 1 deletion src-tauri/config/conf-default.toml
@@ -1,4 +1,8 @@
[fw.app]
id = "file-processor"
name = "file-processor"
version = "0.1.0"
version = "0.1.0"

[cs]
[csm.processor]
concurrent = 4
1 change: 1 addition & 0 deletions src-tauri/src/main.rs
@@ -10,6 +10,7 @@ use tardis::config::config_dto::TardisConfig;
#[cfg(debug_assertions)]
use tardis::TardisFuns;
use tardis::{basic::result::TardisResult, tokio};
mod processor_config;
mod tauri;
mod uploader;

16 changes: 16 additions & 0 deletions src-tauri/src/processor_config.rs
@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

pub const DOMAIN_CODE: &str = "processor";

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ProcessorConfig {
pub concurrent: usize,
}

impl Default for ProcessorConfig {
fn default() -> Self {
ProcessorConfig { concurrent: 5 }
}
}
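
The new ProcessorConfig is consumed through tardis's per-module custom config, which the uploader.rs change below reads via TardisFuns::cs_config::<ProcessorConfig>(DOMAIN_CODE). A minimal sketch of that lookup, assuming TardisFuns has already been initialized from conf-default.toml (main.rs handles this); the helper name processor_concurrency is made up for illustration:

use tardis::TardisFuns;

use crate::processor_config::{ProcessorConfig, DOMAIN_CODE};

// Sketch only: read the concurrency limit added under [csm.processor].
fn processor_concurrency() -> usize {
    // With the conf-default.toml above this yields 4; #[serde(default)] lets a
    // missing field fall back to ProcessorConfig::default() (concurrent = 5).
    TardisFuns::cs_config::<ProcessorConfig>(DOMAIN_CODE).concurrent
}
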
162 changes: 99 additions & 63 deletions src-tauri/src/uploader.rs
@@ -27,7 +27,7 @@ use tardis::{
};
use tauri::{async_runtime::TokioJoinHandle, Emitter as _, Window};

use crate::FileUploadProcessParams;
use crate::{processor_config::{ProcessorConfig, DOMAIN_CODE}, FileUploadProcessParams};

#[derive(Serialize, Deserialize, Clone)]
pub struct UploadProgressResp {
@@ -156,35 +156,48 @@ pub async fn upload_files(
let base_path = origin_path.parent().unwrap_or(Path::new(""));
let paths = async_get_files(&file_uri).await?;
for path in paths {
let mime_type = mime_infer::from_path(path.clone()).first_or_text_plain();
let file = File::open(path.clone())
.await
.map_err(|e| TardisError::io_error(&format!("io error:{e}"), "error"))?;
let relative_path = path
.strip_prefix(&base_path)
.map_err(|e| TardisError::io_error(&format!("io error:{e}"), "error"))?;
let size;
#[cfg(any(target_os = "macos", target_os = "linux"))]
{
size = file.metadata().await?.size();
}
#[cfg(target_os = "windows")]
{
size = file.metadata().await?.file_size();
}
let info = UploadFileInfo {
name: path
.file_name()
.and_then(|s| s.to_str())
.unwrap_or_default()
.to_string(),
relative_path: relative_path.to_path_buf(),
size,
mime_type: mime_type.to_string(),
id: random::<u64>().to_string(),
};
if path.is_file() {
let mime_type = mime_infer::from_path(path.clone()).first_or_text_plain();
let file = File::open(path.clone())
.await
.map_err(|e| TardisError::io_error(&format!("io error:{e}"), "error"))?;
let size;
#[cfg(any(target_os = "macos", target_os = "linux"))]
{
size = file.metadata().await?.size();
}
#[cfg(target_os = "windows")]
{
size = file.metadata().await?.file_size();
}
let info = UploadFileInfo {
name: path
.file_name()
.and_then(|s| s.to_str())
.unwrap_or_default()
.to_string(),
relative_path: relative_path.to_path_buf(),
size,
mime_type: mime_type.to_string(),
id: random::<u64>().to_string(),
};

files.push((file, info));
files.push((Some(file), info));
} else {
files.push((
None,
UploadFileInfo {
name: "".to_string(),
relative_path: relative_path.to_path_buf(),
size: 0,
mime_type: "dir".to_string(),
id: random::<u64>().to_string(),
},
));
}
}
}
total_file_numbers = files.len();
@@ -194,7 +207,6 @@ pub async fn upload_files(
.await
.into_iter()
.sum();

let back_task;
if param.title.eq("请按使用文档调用(以下为示例)") {
//mock
@@ -219,7 +231,7 @@ pub async fn upload_files(
}

async fn mock_backend_task(
files: Vec<(File, UploadFileInfo)>,
files: Vec<(Option<File>, UploadFileInfo)>,
total_file_numbers: usize,
total_file_size: u64,
window: Window,
@@ -287,17 +299,17 @@ async fn mock_backend_task(
current_files: vec![],
fail_files: vec![],
success_files: if last_file.is_some() {
vec![]
} else {
vec![last_file.unwrap()]
} else {
vec![]
},
},
)
.unwrap();
}

async fn backend_task(
files: Vec<(File, UploadFileInfo)>,
files: Vec<(Option<File>, UploadFileInfo)>,
total_file_numbers: usize,
total_file_size: u64,
window: Window,
Expand All @@ -309,10 +321,10 @@ async fn backend_task(
// first boolean means end(true)/start
// seconde boolean is success(true)/fail
let (tx, mut rx) = mpsc::channel(50);
let max_concurrent_tasks = 2;
let max_concurrent_tasks = TardisFuns::cs_config::<ProcessorConfig>(DOMAIN_CODE).concurrent;
let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));

for (mut file, info) in files {
for (file, info) in files {
tardis::tokio::task::yield_now().await;
let n_tx = tx.clone();
let config = config.clone();
@@ -322,36 +334,48 @@ async fn backend_task(
let permit = semaphore.acquire_owned().await.unwrap();
let _ = n_tx.send(((false, false), info.clone())).await;
let body = info.clone().to_body(&config).unwrap();
info!("file====body:{}", body);
if let Ok(upload_metadata_result) = TardisFuns::web_client()
.post_obj_to_str(
config.upload_metadata_url,
&body,
config.upload_fixed_headers.unwrap_or_default(),
)
.await
{
info!("upload_metadata_result=====:{:?}", upload_metadata_result);
if upload_metadata_result.code == 200 {
if let Some(upload_url) = upload_metadata_result.body {
info!("upload_url=====:{:?}", upload_url);
if reqwest::Url::parse(&upload_url).is_err() {
let _ = n_tx.send(((true, false), info.clone())).await;
return;
}
info!("file.body:{}", body);
if let Some(mut file) = file {
if let Ok(upload_metadata_result) = TardisFuns::web_client()
.post_obj_to_str(
config.upload_metadata_url,
&body,
config.upload_fixed_headers.unwrap_or_default(),
)
.await
{
info!("upload_metadata_result:{:?}", upload_metadata_result);
if upload_metadata_result.code == 200 {
if let Some(upload_url) = upload_metadata_result.body {
info!("upload_url:{:?}", upload_url);
if reqwest::Url::parse(&upload_url).is_err() {
let _ = n_tx.send(((true, false), info.clone())).await;
return;
}

let mut content = vec![];
let _ = file.read_to_end(&mut content).await;
let client = reqwest::Client::new();
if let Ok(_) = client.put(upload_url).body(content).send().await {
let _ = n_tx.send(((true, true), info.clone())).await;
return;
let mut content = vec![];
let _ = file.read_to_end(&mut content).await;
let client = reqwest::Client::new();
if let Ok(_) = client.put(upload_url).body(content).send().await {
let _ = n_tx.send(((true, true), info.clone())).await;
return;
}
}
}
}
};
let _ = n_tx.send(((true, false), info.clone())).await;
drop(permit);
};
let _ = n_tx.send(((true, false), info.clone())).await;
drop(permit);
} else {
//empty dir
let _ = TardisFuns::web_client()
.post_obj_to_str(
config.upload_metadata_url,
&body,
config.upload_fixed_headers.unwrap_or_default(),
)
.await;
let _ = n_tx.send(((true, true), info.clone())).await;
}
});
}

@@ -406,8 +430,12 @@ async fn backend_task(
.unwrap();
}

async fn get_metadata_size(file: &File) -> u64 {
file.metadata().await.map(|md| md.len()).unwrap_or_default()
async fn get_metadata_size(file: &Option<File>) -> u64 {
if let Some(file) = file {
file.metadata().await.map(|md| md.len()).unwrap_or_default()
} else {
0
}
}

async fn async_get_files(file_uri: &str) -> TardisResult<Vec<PathBuf>> {
@@ -417,13 +445,21 @@ async fn async_get_files(file_uri: &str) -> TardisResult<Vec<PathBuf>> {
result.push(path);
} else {
let mut dir = read_dir(file_uri).await.expect("can't open dir");
result.push(path);
let mut push_dir = true;
while let Some(d) = dir
.next_entry()
.await
.map_err(|e| TardisError::io_error(&format!("io error:{e}"), "error"))?
{
match d.path().to_str() {
Some(path) => result.append(&mut Box::pin(async_get_files(path)).await?),
Some(path) => {
if push_dir {
result.remove(result.len() - 1);
push_dir = false
}
result.append(&mut Box::pin(async_get_files(path)).await?);
}
None => continue,
};
}
