From 989dedbfd11bebd110a9d9ba5069eae42bd3b0a3 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 6 Dec 2023 20:08:13 +0800 Subject: [PATCH] fix(services/dropbox): Workaround for dropbox limitations for create_folder (#3719) try to fix create dir Signed-off-by: Xuanwo --- core/src/services/dropbox/backend.rs | 174 ++++------------- core/src/services/dropbox/core.rs | 279 +++++++++++++++++---------- 2 files changed, 216 insertions(+), 237 deletions(-) diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs index b93d2065db32..652b89b4869b 100644 --- a/core/src/services/dropbox/backend.rs +++ b/core/src/services/dropbox/backend.rs @@ -17,29 +17,17 @@ use std::fmt::Debug; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; -use backon::ExponentialBuilder; use backon::Retryable; use http::StatusCode; -use once_cell::sync::Lazy; -use serde::Deserialize; -use super::core::DropboxCore; -use super::error::parse_error; +use super::core::*; +use super::error::*; use super::writer::DropboxWriter; use crate::raw::*; -use crate::services::dropbox::error::DropboxErrorResponse; use crate::*; -static BACKOFF: Lazy = Lazy::new(|| { - ExponentialBuilder::default() - .with_max_delay(Duration::from_secs(10)) - .with_max_times(10) - .with_jitter() -}); - #[derive(Clone, Debug)] pub struct DropboxBackend { pub core: Arc, @@ -96,18 +84,17 @@ impl Accessor for DropboxBackend { } } - let resp = self.core.dropbox_create_folder(path).await?; - let status = resp.status(); - match status { - StatusCode::OK => Ok(RpCreateDir::default()), - _ => { - let err = parse_error(resp).await?; - match err.kind() { - ErrorKind::AlreadyExists => Ok(RpCreateDir::default()), - _ => Err(err), - } - } - } + // Dropbox has very, very, very strong limitation on the create_folder requests. + // + // Let's try our best to make sure it won't failed for rate limited issues. + let res = { || self.core.dropbox_create_folder(path) } + .retry(&*BACKOFF) + .when(|e| e.is_temporary()) + .await + // Set this error to permanent to avoid retrying. + .map_err(|e| e.set_permanent())?; + + Ok(res) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { @@ -199,115 +186,38 @@ impl Accessor for DropboxBackend { let paths = ops.into_iter().map(|(p, _)| p).collect::>(); let resp = self.core.dropbox_delete_batch(paths).await?; + if resp.status() != StatusCode::OK { + return Err(parse_error(resp).await?); + } - let status = resp.status(); - - match status { - StatusCode::OK => { - let (_parts, body) = resp.into_parts(); - let bs = body.bytes().await?; - let decoded_response = serde_json::from_slice::(&bs) - .map_err(new_json_deserialize_error)?; - - match decoded_response.tag.as_str() { - "complete" => { - let entries = decoded_response.entries.unwrap_or_default(); - let results = self.core.handle_batch_delete_complete_result(entries); - Ok(RpBatch::new(results)) - } - "async_job_id" => { - let job_id = decoded_response - .async_job_id - .expect("async_job_id should be present"); - let res = { || self.core.dropbox_delete_batch_check(job_id.clone()) } - .retry(&*BACKOFF) - .when(|e| e.is_temporary()) - .await?; + let bs = resp.into_body().bytes().await?; + let decoded_response = serde_json::from_slice::(&bs) + .map_err(new_json_deserialize_error)?; - Ok(res) - } - _ => Err(Error::new( - ErrorKind::Unexpected, - &format!( - "delete batch failed with unexpected tag {}", - decoded_response.tag - ), - )), - } + match decoded_response.tag.as_str() { + "complete" => { + let entries = decoded_response.entries.unwrap_or_default(); + let results = self.core.handle_batch_delete_complete_result(entries); + Ok(RpBatch::new(results)) } - _ => Err(parse_error(resp).await?), + "async_job_id" => { + let job_id = decoded_response + .async_job_id + .expect("async_job_id should be present"); + let res = { || self.core.dropbox_delete_batch_check(job_id.clone()) } + .retry(&*BACKOFF) + .when(|e| e.is_temporary()) + .await?; + + Ok(res) + } + _ => Err(Error::new( + ErrorKind::Unexpected, + &format!( + "delete batch failed with unexpected tag {}", + decoded_response.tag + ), + )), } } } - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataResponse { - #[serde(rename(deserialize = ".tag"))] - pub tag: String, - pub client_modified: String, - pub content_hash: Option, - pub file_lock_info: Option, - pub has_explicit_shared_members: Option, - pub id: String, - pub is_downloadable: Option, - pub name: String, - pub path_display: String, - pub path_lower: String, - pub property_groups: Option>, - pub rev: Option, - pub server_modified: Option, - pub sharing_info: Option, - pub size: Option, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataFileLockInfo { - pub created: Option, - pub is_lockholder: bool, - pub lockholder_name: Option, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataPropertyGroup { - pub fields: Vec, - pub template_id: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataPropertyGroupField { - pub name: String, - pub value: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataSharingInfo { - pub modified_by: Option, - pub parent_shared_folder_id: Option, - pub read_only: Option, - pub shared_folder_id: Option, - pub traverse_only: Option, - pub no_access: Option, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxDeleteBatchResponse { - #[serde(rename(deserialize = ".tag"))] - pub tag: String, - pub async_job_id: Option, - pub entries: Option>, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxDeleteBatchResponseEntry { - #[serde(rename(deserialize = ".tag"))] - pub tag: String, - pub metadata: Option, - pub error: Option, -} diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs index 8857475735b1..dbc53f69de81 100644 --- a/core/src/services/dropbox/core.rs +++ b/core/src/services/dropbox/core.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +use backon::ExponentialBuilder; use std::default::Default; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use std::time::Duration; use bytes::Bytes; use chrono::DateTime; @@ -29,17 +31,22 @@ use http::header::CONTENT_TYPE; use http::Request; use http::Response; use http::StatusCode; +use once_cell::sync::Lazy; use serde::Deserialize; use serde::Serialize; use tokio::sync::Mutex; +use super::error::{parse_error, DropboxErrorResponse}; use crate::raw::*; -use crate::services::dropbox::backend::DropboxDeleteBatchResponse; -use crate::services::dropbox::backend::DropboxDeleteBatchResponseEntry; -use crate::services::dropbox::error::parse_error; -use crate::types::Error; -use crate::types::ErrorKind; -use crate::types::Result; +use crate::*; + +/// BACKOFF is the backoff used inside dropbox to make sure dropbox async task succeed. +pub static BACKOFF: Lazy = Lazy::new(|| { + ExponentialBuilder::default() + .with_max_delay(Duration::from_secs(10)) + .with_max_times(10) + .with_jitter() +}); pub struct DropboxCore { pub root: String, @@ -65,6 +72,54 @@ impl DropboxCore { path.trim_end_matches('/').to_string() } + pub async fn sign(&self, req: &mut Request) -> Result<()> { + let mut signer = self.signer.lock().await; + + // Access token is valid, use it directly. + if !signer.access_token.is_empty() && signer.expires_in > Utc::now() { + let value = format!("Bearer {}", signer.access_token) + .parse() + .expect("token must be valid header value"); + req.headers_mut().insert(header::AUTHORIZATION, value); + return Ok(()); + } + + // Refresh invalid token. + let url = "https://api.dropboxapi.com/oauth2/token".to_string(); + + let content = format!( + "grant_type=refresh_token&refresh_token={}&client_id={}&client_secret={}", + signer.refresh_token, signer.client_id, signer.client_secret + ); + let bs = Bytes::from(content); + + let request = Request::post(&url) + .header(CONTENT_TYPE, "application/x-www-form-urlencoded") + .header(CONTENT_LENGTH, bs.len()) + .body(AsyncBody::Bytes(bs)) + .map_err(new_request_build_error)?; + + let resp = self.client.send(request).await?; + let body = resp.into_body().bytes().await?; + + let token: DropboxTokenResponse = + serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; + + // Update signer after token refreshed. + signer.access_token = token.access_token.clone(); + + // Refresh it 2 minutes earlier. + signer.expires_in = Utc::now() + chrono::Duration::seconds(token.expires_in as i64) + - chrono::Duration::seconds(120); + + let value = format!("Bearer {}", token.access_token) + .parse() + .expect("token must be valid header value"); + req.headers_mut().insert(header::AUTHORIZATION, value); + + Ok(()) + } + pub async fn dropbox_get( &self, path: &str, @@ -171,14 +226,11 @@ impl DropboxCore { self.client.send(request).await } - pub async fn dropbox_delete_batch_check_request( - &self, - async_job_id: String, - ) -> Result> { + pub async fn dropbox_delete_batch_check(&self, async_job_id: String) -> Result { let url = "https://api.dropboxapi.com/2/files/delete_batch/check".to_string(); let args = DropboxDeleteBatchCheckArgs { async_job_id }; - let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); + let bs = Bytes::from(serde_json::to_vec(&args).map_err(new_json_serialize_error)?); let mut request = Request::post(&url) .header(CONTENT_TYPE, "application/json") @@ -187,45 +239,38 @@ impl DropboxCore { .map_err(new_request_build_error)?; self.sign(&mut request).await?; - self.client.send(request).await - } - pub async fn dropbox_delete_batch_check(&self, job_id: String) -> Result { - let resp = self - .dropbox_delete_batch_check_request(job_id.clone()) - .await?; - let status = resp.status(); - match status { - StatusCode::OK => { - let bs = resp.into_body().bytes().await?; + let resp = self.client.send(request).await?; + if resp.status() != StatusCode::OK { + return Err(parse_error(resp).await?); + } - let decoded_response = serde_json::from_slice::(&bs) - .map_err(new_json_deserialize_error)?; - match decoded_response.tag.as_str() { - "in_progress" => Err(Error::new( - ErrorKind::Unexpected, - "delete batch job still in progress", - ) - .set_temporary()), - "complete" => { - let entries = decoded_response.entries.unwrap_or_default(); - let results = self.handle_batch_delete_complete_result(entries); - Ok(RpBatch::new(results)) - } - _ => Err(Error::new( - ErrorKind::Unexpected, - &format!( - "delete batch check failed with unexpected tag {}", - decoded_response.tag - ), - )), - } + let bs = resp.into_body().bytes().await?; + + let decoded_response = serde_json::from_slice::(&bs) + .map_err(new_json_deserialize_error)?; + match decoded_response.tag.as_str() { + "in_progress" => Err(Error::new( + ErrorKind::Unexpected, + "delete batch job still in progress", + ) + .set_temporary()), + "complete" => { + let entries = decoded_response.entries.unwrap_or_default(); + let results = self.handle_batch_delete_complete_result(entries); + Ok(RpBatch::new(results)) } - _ => Err(parse_error(resp).await?), + _ => Err(Error::new( + ErrorKind::Unexpected, + &format!( + "delete batch check failed with unexpected tag {}", + decoded_response.tag + ), + )), } } - pub async fn dropbox_create_folder(&self, path: &str) -> Result> { + pub async fn dropbox_create_folder(&self, path: &str) -> Result { let url = "https://api.dropboxapi.com/2/files/create_folder_v2".to_string(); let args = DropboxCreateFolderArgs { path: self.build_path(path), @@ -240,7 +285,18 @@ impl DropboxCore { .map_err(new_request_build_error)?; self.sign(&mut request).await?; - self.client.send(request).await + let resp = self.client.send(request).await?; + let status = resp.status(); + match status { + StatusCode::OK => Ok(RpCreateDir::default()), + _ => { + let err = parse_error(resp).await?; + match err.kind() { + ErrorKind::AlreadyExists => Ok(RpCreateDir::default()), + _ => Err(err), + } + } + } } pub async fn dropbox_get_metadata(&self, path: &str) -> Result> { @@ -263,54 +319,6 @@ impl DropboxCore { self.client.send(request).await } - pub async fn sign(&self, req: &mut Request) -> Result<()> { - let mut signer = self.signer.lock().await; - - // Access token is valid, use it directly. - if !signer.access_token.is_empty() && signer.expires_in > Utc::now() { - let value = format!("Bearer {}", signer.access_token) - .parse() - .expect("token must be valid header value"); - req.headers_mut().insert(header::AUTHORIZATION, value); - return Ok(()); - } - - // Refresh invalid token. - let url = "https://api.dropboxapi.com/oauth2/token".to_string(); - - let content = format!( - "grant_type=refresh_token&refresh_token={}&client_id={}&client_secret={}", - signer.refresh_token, signer.client_id, signer.client_secret - ); - let bs = Bytes::from(content); - - let request = Request::post(&url) - .header(CONTENT_TYPE, "application/x-www-form-urlencoded") - .header(CONTENT_LENGTH, bs.len()) - .body(AsyncBody::Bytes(bs)) - .map_err(new_request_build_error)?; - - let resp = self.client.send(request).await?; - let body = resp.into_body().bytes().await?; - - let token: DropboxTokenResponse = - serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; - - // Update signer after token refreshed. - signer.access_token = token.access_token.clone(); - - // Refresh it 2 minutes earlier. - signer.expires_in = Utc::now() + chrono::Duration::seconds(token.expires_in as i64) - - chrono::Duration::seconds(120); - - let value = format!("Bearer {}", token.access_token) - .parse() - .expect("token must be valid header value"); - req.headers_mut().insert(header::AUTHORIZATION, value); - - Ok(()) - } - pub fn handle_batch_delete_complete_result( &self, entries: Vec, @@ -423,7 +431,7 @@ struct DropboxCreateFolderArgs { path: String, } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Default, Clone, Debug, Deserialize, Serialize)] struct DropboxMetadataArgs { include_deleted: bool, include_has_explicit_shared_members: bool, @@ -431,19 +439,80 @@ struct DropboxMetadataArgs { path: String, } -impl Default for DropboxMetadataArgs { - fn default() -> Self { - DropboxMetadataArgs { - include_deleted: false, - include_has_explicit_shared_members: false, - include_media_info: false, - path: "".to_string(), - } - } -} - #[derive(Clone, Deserialize)] struct DropboxTokenResponse { access_token: String, expires_in: usize, } + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataResponse { + #[serde(rename(deserialize = ".tag"))] + pub tag: String, + pub client_modified: String, + pub content_hash: Option, + pub file_lock_info: Option, + pub has_explicit_shared_members: Option, + pub id: String, + pub is_downloadable: Option, + pub name: String, + pub path_display: String, + pub path_lower: String, + pub property_groups: Option>, + pub rev: Option, + pub server_modified: Option, + pub sharing_info: Option, + pub size: Option, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataFileLockInfo { + pub created: Option, + pub is_lockholder: bool, + pub lockholder_name: Option, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataPropertyGroup { + pub fields: Vec, + pub template_id: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataPropertyGroupField { + pub name: String, + pub value: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataSharingInfo { + pub modified_by: Option, + pub parent_shared_folder_id: Option, + pub read_only: Option, + pub shared_folder_id: Option, + pub traverse_only: Option, + pub no_access: Option, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxDeleteBatchResponse { + #[serde(rename(deserialize = ".tag"))] + pub tag: String, + pub async_job_id: Option, + pub entries: Option>, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxDeleteBatchResponseEntry { + #[serde(rename(deserialize = ".tag"))] + pub tag: String, + pub metadata: Option, + pub error: Option, +}