From 4ee445ffe47b8c642a25db675c9e5267d649d182 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Tue, 7 Jan 2025 16:45:41 +0100 Subject: [PATCH 01/15] Forcing a flush due to tokio::fs semantics. --- src/api/tokio.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/api/tokio.rs b/src/api/tokio.rs index 0677f2f..c0b0748 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -672,12 +672,20 @@ impl ApiRepo { file.write_all(&committed.to_le_bytes()).await?; } } - tokio::fs::OpenOptions::new() + let mut f = tokio::fs::OpenOptions::new() .write(true) .open(&filename) - .await? + .await?; + f .set_len(length as u64) .await?; + // XXX Extremely important and not obvious. + // Tokio::fs doesn't guarantee data is written at the end of `.await` + // boundaries. Even though we await the `set_len` it may not have been + // committed to disk, leading to invalid rename. + // Forcing a flush forces the data (here the truncation) to be committed to disk + f.flush().await?; + progressbar.finish().await; Ok(filename) } From 0adbb8078be65a2c721084f8b45c314d86b4056a Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Tue, 7 Jan 2025 17:06:30 +0100 Subject: [PATCH 02/15] What? --- src/api/tokio.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/api/tokio.rs b/src/api/tokio.rs index c0b0748..5a14620 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -670,6 +670,7 @@ impl ApiRepo { .await?; file.seek(SeekFrom::Start(length as u64)).await?; file.write_all(&committed.to_le_bytes()).await?; + file.flush().await; } } let mut f = tokio::fs::OpenOptions::new() @@ -722,6 +723,7 @@ impl ApiRepo { .await?; file.seek(SeekFrom::Start(start as u64)).await?; file.write_all(&buf).await?; + file.flush().await?; Ok((start, stop)) } From 0064504b793d01ba8ca0eaa6bacb14628b79e922 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Tue, 7 Jan 2025 17:09:37 +0100 Subject: [PATCH 03/15] Ok. --- src/api/tokio.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/tokio.rs b/src/api/tokio.rs index 5a14620..a801e18 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -670,7 +670,7 @@ impl ApiRepo { .await?; file.seek(SeekFrom::Start(length as u64)).await?; file.write_all(&committed.to_le_bytes()).await?; - file.flush().await; + file.flush().await?; } } let mut f = tokio::fs::OpenOptions::new() From c9a4311624b0db43cfc798fd6828eb2949ae3fa9 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Tue, 7 Jan 2025 19:17:41 +0100 Subject: [PATCH 04/15] Using fs4 for locking. --- Cargo.toml | 6 +++++- examples/download.rs | 7 +++++-- src/api/sync.rs | 35 +++++++++++++++-------------------- src/api/tokio.rs | 30 ++++++++---------------------- 4 files changed, 33 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9eadaac..fe15c50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,12 +32,13 @@ rustls = { version = "0.23.4", optional = true } serde = { version = "1", features = ["derive"], optional = true } serde_json = { version = "1", optional = true } thiserror = { version = "2", optional = true } -tokio = { version = "1.29.1", optional = true, features = ["fs", "macros"] } +tokio = { version = "1.29.1", optional = true, features = ["fs", "macros", "signal"] } ureq = { version = "2.8.0", optional = true, features = [ "json", "socks-proxy", ] } native-tls = { version = "0.2.12", optional = true } +fs4 = { version = "0.12.0", default-features = false, optional = true } [features] default = ["default-tls", "tokio", "ureq"] @@ -59,6 +60,7 @@ tokio = [ "dep:thiserror", "dep:tokio", "tokio/rt-multi-thread", + "fs4/tokio", ] ureq = [ "dep:http", @@ -68,6 +70,8 @@ ureq = [ "dep:serde_json", "dep:thiserror", "dep:ureq", + "dep:fs4", + "fs4/sync", ] [dev-dependencies] diff --git a/examples/download.rs b/examples/download.rs index c59a663..6faa013 100644 --- a/examples/download.rs +++ b/examples/download.rs @@ -5,7 +5,7 @@ fn main() {} #[cfg(feature = "ureq")] #[cfg(not(feature = "tokio"))] fn main() { - let api = hf_hub::api::sync::Api::new().unwrap(); + let api = hf_hub::api::sync::ApiBuilder::from_env().build().unwrap(); let _filename = api .model("meta-llama/Llama-2-7b-hf".to_string()) @@ -16,7 +16,10 @@ fn main() { #[cfg(feature = "tokio")] #[tokio::main] async fn main() { - let api = hf_hub::api::tokio::Api::new().unwrap(); + let api = hf_hub::api::tokio::ApiBuilder::from_env() + .high() + .build() + .unwrap(); let _filename = api .model("meta-llama/Llama-2-7b-hf".to_string()) diff --git a/src/api/sync.rs b/src/api/sync.rs index cd2c4b4..44d9fb2 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -11,7 +11,6 @@ use std::io::Seek; use std::num::ParseIntError; use std::path::{Component, Path, PathBuf}; use std::str::FromStr; -use std::time::Duration; use thiserror::Error; use ureq::{Agent, AgentBuilder, Request}; @@ -72,35 +71,30 @@ impl HeaderAgent { #[derive(Debug)] struct Handle { - _file: std::fs::File, + file: std::fs::File, path: PathBuf, } impl Drop for Handle { fn drop(&mut self) { - std::fs::remove_file(&self.path).expect("Removing lockfile") + use fs4::fs_std::FileExt; + println!("Unlocking file {:?}", std::thread::current().id()); + self.file.unlock().unwrap(); + self.file.lock_exclusive().unwrap(); + println!("Removing file {:?}", std::thread::current().id()); + std::fs::remove_file(&self.path).ok(); + self.file.unlock().unwrap(); + println!("Final {:?}", std::thread::current().id()); } } fn lock_file(mut path: PathBuf) -> Result { + use fs4::fs_std::FileExt; path.set_extension("lock"); - let mut lock_handle = None; - for i in 0..30 { - match std::fs::File::create_new(path.clone()) { - Ok(handle) => { - lock_handle = Some(handle); - break; - } - _ => { - if i == 0 { - log::warn!("Waiting for lock {path:?}"); - } - std::thread::sleep(Duration::from_secs(1)); - } - } - } - let _file = lock_handle.ok_or_else(|| ApiError::LockAcquisition(path.clone()))?; - Ok(Handle { path, _file }) + let file = std::fs::File::create(path.clone())?; + file.lock_exclusive()?; + println!("Acquired lock {:?}", std::thread::current().id()); + Ok(Handle { path, file }) } #[derive(Debug, Error)] @@ -769,6 +763,7 @@ mod tests { use serde_json::{json, Value}; use sha2::{Digest, Sha256}; use std::io::{Seek, SeekFrom, Write}; + use std::time::Duration; struct TempDir { path: PathBuf, diff --git a/src/api/tokio.rs b/src/api/tokio.rs index a801e18..67e43db 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -18,7 +18,6 @@ use std::collections::BinaryHeap; use std::num::ParseIntError; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; -use std::time::Duration; use thiserror::Error; use tokio::io::AsyncReadExt; use tokio::io::{AsyncSeekExt, AsyncWriteExt, SeekFrom}; @@ -75,24 +74,12 @@ impl Drop for Handle { } async fn lock_file(mut path: PathBuf) -> Result { + use fs4::tokio::AsyncFileExt; path.set_extension("lock"); - let mut lock_handle = None; - for i in 0..30 { - match tokio::fs::File::create_new(path.clone()).await { - Ok(handle) => { - lock_handle = Some(handle); - break; - } - Err(_err) => { - if i == 0 { - log::warn!("Waiting for lock {path:?}"); - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - } - let _file = lock_handle.ok_or_else(|| ApiError::LockAcquisition(path.clone()))?; + let file = tokio::fs::File::create(path.clone()).await?; + file.lock_exclusive()?; + let _file = file; Ok(Handle { path, _file }) } @@ -677,11 +664,9 @@ impl ApiRepo { .write(true) .open(&filename) .await?; - f - .set_len(length as u64) - .await?; + f.set_len(length as u64).await?; // XXX Extremely important and not obvious. - // Tokio::fs doesn't guarantee data is written at the end of `.await` + // Tokio::fs doesn't guarantee data is written at the end of `.await` // boundaries. Even though we await the `set_len` it may not have been // committed to disk, leading to invalid rename. // Forcing a flush forces the data (here the truncation) to be committed to disk @@ -808,7 +793,7 @@ impl ApiRepo { let blob_path = cache.blob_path(&metadata.etag); std::fs::create_dir_all(blob_path.parent().unwrap())?; - let lock = lock_file(blob_path.clone()).await; + let lock = lock_file(blob_path.clone()).await?; progress.init(metadata.size, filename).await; let mut tmp_path = blob_path.clone(); tmp_path.set_extension(EXTENSION); @@ -869,6 +854,7 @@ mod tests { use serde_json::{json, Value}; use sha2::{Digest, Sha256}; use std::io::{Seek, Write}; + use std::time::Duration; struct TempDir { path: PathBuf, From f9f6a058844ccccd2782baf835d465774711d91c Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Tue, 7 Jan 2025 20:05:09 +0100 Subject: [PATCH 05/15] Different locking mecanism. --- Cargo.toml | 7 +++---- src/api/sync.rs | 29 ++++++++++++++++------------- src/api/tokio.rs | 24 +++++++++++++++++------- 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fe15c50..e5363c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ ureq = { version = "2.8.0", optional = true, features = [ "socks-proxy", ] } native-tls = { version = "0.2.12", optional = true } -fs4 = { version = "0.12.0", default-features = false, optional = true } +libc = { version = "0.2", optional = true } [features] default = ["default-tls", "tokio", "ureq"] @@ -60,7 +60,7 @@ tokio = [ "dep:thiserror", "dep:tokio", "tokio/rt-multi-thread", - "fs4/tokio", + "dep:libc", ] ureq = [ "dep:http", @@ -70,8 +70,7 @@ ureq = [ "dep:serde_json", "dep:thiserror", "dep:ureq", - "dep:fs4", - "fs4/sync", + "dep:libc", ] [dev-dependencies] diff --git a/src/api/sync.rs b/src/api/sync.rs index 44d9fb2..6e480e3 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; use std::io::Read; use std::io::Seek; use std::num::ParseIntError; +use std::os::fd::AsRawFd; use std::path::{Component, Path, PathBuf}; use std::str::FromStr; use thiserror::Error; @@ -72,29 +73,31 @@ impl HeaderAgent { #[derive(Debug)] struct Handle { file: std::fs::File, - path: PathBuf, } + impl Drop for Handle { fn drop(&mut self) { - use fs4::fs_std::FileExt; - println!("Unlocking file {:?}", std::thread::current().id()); - self.file.unlock().unwrap(); - self.file.lock_exclusive().unwrap(); - println!("Removing file {:?}", std::thread::current().id()); - std::fs::remove_file(&self.path).ok(); - self.file.unlock().unwrap(); - println!("Final {:?}", std::thread::current().id()); + unsafe { libc::flock(self.file.as_raw_fd(), libc::LOCK_UN) }; } } fn lock_file(mut path: PathBuf) -> Result { - use fs4::fs_std::FileExt; path.set_extension("lock"); let file = std::fs::File::create(path.clone())?; - file.lock_exclusive()?; - println!("Acquired lock {:?}", std::thread::current().id()); - Ok(Handle { path, file }) + let mut res = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + for _ in 0..5 { + if res == 0 { + break; + } + std::thread::sleep(std::time::Duration::from_secs(1)); + res = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + } + if res != 0 { + Err(ApiError::LockAcquisition(path)) + } else { + Ok(Handle { file }) + } } #[derive(Debug, Error)] diff --git a/src/api/tokio.rs b/src/api/tokio.rs index 67e43db..dbf0b3e 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -16,6 +16,7 @@ use reqwest::{ use std::cmp::Reverse; use std::collections::BinaryHeap; use std::num::ParseIntError; +use std::os::fd::{AsFd, AsRawFd}; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use thiserror::Error; @@ -64,23 +65,32 @@ impl Progress for () { } struct Handle { - _file: tokio::fs::File, - path: PathBuf, + file: tokio::fs::File, } + impl Drop for Handle { fn drop(&mut self) { - std::fs::remove_file(&self.path).expect("Removing lockfile") + unsafe { libc::flock(self.file.as_fd().as_raw_fd(), libc::LOCK_UN) }; } } async fn lock_file(mut path: PathBuf) -> Result { - use fs4::tokio::AsyncFileExt; path.set_extension("lock"); let file = tokio::fs::File::create(path.clone()).await?; - file.lock_exclusive()?; - let _file = file; - Ok(Handle { path, _file }) + let mut res = unsafe { libc::flock(file.as_fd().as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + for _ in 0..5 { + if res == 0 { + break; + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + res = unsafe { libc::flock(file.as_fd().as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + } + if res != 0 { + Err(ApiError::LockAcquisition(path)) + } else { + Ok(Handle { file }) + } } #[derive(Debug, Error)] From 08fc326f88be19dbd2eb1f9f0e69886933de14bd Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 00:32:59 +0100 Subject: [PATCH 06/15] Since there doesn't seem to be a sane locking dep, let's reinvent our own. --- Cargo.toml | 13 +++++++-- examples/download.rs | 7 ++--- src/api/sync.rs | 65 ++++++++++++++++++++++++++++++++++++++++---- src/api/tokio.rs | 64 ++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 133 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e5363c5..f21b11a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,13 +32,21 @@ rustls = { version = "0.23.4", optional = true } serde = { version = "1", features = ["derive"], optional = true } serde_json = { version = "1", optional = true } thiserror = { version = "2", optional = true } -tokio = { version = "1.29.1", optional = true, features = ["fs", "macros", "signal"] } +tokio = { version = "1.29.1", optional = true, features = ["fs", "macros"] } ureq = { version = "2.8.0", optional = true, features = [ "json", "socks-proxy", ] } native-tls = { version = "0.2.12", optional = true } -libc = { version = "0.2", optional = true } + +[target.'cfg(windows)'.dependencies.windows-sys] +version = "0.59" +features = ["Win32_Foundation", "Win32_Storage_FileSystem"] +optional = true + +[target.'cfg(unix)'.dependencies.libc] +version = "0.2" +optional = true [features] default = ["default-tls", "tokio", "ureq"] @@ -61,6 +69,7 @@ tokio = [ "dep:tokio", "tokio/rt-multi-thread", "dep:libc", + "dep:windows-sys", ] ureq = [ "dep:http", diff --git a/examples/download.rs b/examples/download.rs index 6faa013..c59a663 100644 --- a/examples/download.rs +++ b/examples/download.rs @@ -5,7 +5,7 @@ fn main() {} #[cfg(feature = "ureq")] #[cfg(not(feature = "tokio"))] fn main() { - let api = hf_hub::api::sync::ApiBuilder::from_env().build().unwrap(); + let api = hf_hub::api::sync::Api::new().unwrap(); let _filename = api .model("meta-llama/Llama-2-7b-hf".to_string()) @@ -16,10 +16,7 @@ fn main() { #[cfg(feature = "tokio")] #[tokio::main] async fn main() { - let api = hf_hub::api::tokio::ApiBuilder::from_env() - .high() - .build() - .unwrap(); + let api = hf_hub::api::tokio::Api::new().unwrap(); let _filename = api .model("meta-llama/Llama-2-7b-hf".to_string()) diff --git a/src/api/sync.rs b/src/api/sync.rs index 6e480e3..b047bf0 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -9,7 +9,6 @@ use std::collections::HashMap; use std::io::Read; use std::io::Seek; use std::num::ParseIntError; -use std::os::fd::AsRawFd; use std::path::{Component, Path, PathBuf}; use std::str::FromStr; use thiserror::Error; @@ -70,14 +69,13 @@ impl HeaderAgent { } } -#[derive(Debug)] struct Handle { file: std::fs::File, } impl Drop for Handle { fn drop(&mut self) { - unsafe { libc::flock(self.file.as_raw_fd(), libc::LOCK_UN) }; + unlock(&self.file); } } @@ -85,13 +83,13 @@ fn lock_file(mut path: PathBuf) -> Result { path.set_extension("lock"); let file = std::fs::File::create(path.clone())?; - let mut res = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + let mut res = lock(&file); for _ in 0..5 { if res == 0 { break; } std::thread::sleep(std::time::Duration::from_secs(1)); - res = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + res = lock(&file); } if res != 0 { Err(ApiError::LockAcquisition(path)) @@ -100,6 +98,63 @@ fn lock_file(mut path: PathBuf) -> Result { } } +#[cfg(target_family = "unix")] +mod unix { + use std::os::fd::AsRawFd; + + pub(crate) fn lock(file: &std::fs::File) -> i32 { + unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) } + } + pub(crate) fn unlock(file: &std::fs::File) -> i32 { + unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) } + } +} +#[cfg(target_family = "unix")] +use unix::{lock, unlock}; + +#[cfg(target_family = "windows")] +mod windows { + use std::os::windows::io::AsRawHandle; + use windows_sys::Win32::Foundation::HANDLE; + use windows_sys::Win32::Storage::FileSystem::{ + LockFileEx, UnlockFile, LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, + }; + + pub(crate) fn lock(file: &std::fs::File) -> i32 { + unsafe { + let mut overlapped = mem::zeroed(); + let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY; + LockFileEx( + file.as_raw_handle() as HANDLE, + flags, + 0, + !0, + !0, + &mut overlapped, + ) + } + } + pub(crate) fn unlock(file: &std::fs::File) -> i32 { + unsafe { + UnlockFile(file.as_raw_handle() as HANDLE, 0, 0, !0, !0); + } + } +} +#[cfg(target_family = "windows")] +use windows::{lock, unlock}; + +#[cfg(not(any(target_family = "unix", target_family = "windows")))] +mod other { + pub(crate) fn lock(file: &std::fs::File) -> i32 { + 0 + } + pub(crate) fn unlock(file: &std::fs::File) -> i32 { + 0 + } +} +#[cfg(not(any(target_family = "unix", target_family = "windows")))] +use other::{lock, unlock}; + #[derive(Debug, Error)] /// All errors the API can throw pub enum ApiError { diff --git a/src/api/tokio.rs b/src/api/tokio.rs index dbf0b3e..4311823 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -16,7 +16,6 @@ use reqwest::{ use std::cmp::Reverse; use std::collections::BinaryHeap; use std::num::ParseIntError; -use std::os::fd::{AsFd, AsRawFd}; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use thiserror::Error; @@ -70,7 +69,7 @@ struct Handle { impl Drop for Handle { fn drop(&mut self) { - unsafe { libc::flock(self.file.as_fd().as_raw_fd(), libc::LOCK_UN) }; + unlock(&self.file); } } @@ -78,13 +77,13 @@ async fn lock_file(mut path: PathBuf) -> Result { path.set_extension("lock"); let file = tokio::fs::File::create(path.clone()).await?; - let mut res = unsafe { libc::flock(file.as_fd().as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + let mut res = lock(&file); for _ in 0..5 { if res == 0 { break; } tokio::time::sleep(std::time::Duration::from_secs(1)).await; - res = unsafe { libc::flock(file.as_fd().as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + res = lock(&file); } if res != 0 { Err(ApiError::LockAcquisition(path)) @@ -93,6 +92,63 @@ async fn lock_file(mut path: PathBuf) -> Result { } } +#[cfg(target_family = "unix")] +mod unix { + use std::os::fd::AsRawFd; + + pub(crate) fn lock(file: &tokio::fs::File) -> i32 { + unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) } + } + pub(crate) fn unlock(file: &tokio::fs::File) -> i32 { + unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) } + } +} +#[cfg(target_family = "unix")] +use unix::{lock, unlock}; + +#[cfg(target_family = "windows")] +mod windows { + use std::os::windows::io::AsRawHandle; + use windows_sys::Win32::Foundation::HANDLE; + use windows_sys::Win32::Storage::FileSystem::{ + LockFileEx, UnlockFile, LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, + }; + + pub(crate) fn lock(file: &tokio::fs::File) -> i32 { + unsafe { + let mut overlapped = mem::zeroed(); + let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY; + LockFileEx( + file.as_raw_handle() as HANDLE, + flags, + 0, + !0, + !0, + &mut overlapped, + ) + } + } + pub(crate) fn unlock(file: &tokio::fs::File) -> i32 { + unsafe { + UnlockFile(file.as_raw_handle() as HANDLE, 0, 0, !0, !0); + } + } +} +#[cfg(target_family = "windows")] +use windows::{lock, unlock}; + +#[cfg(not(any(target_family = "unix", target_family = "windows")))] +mod other { + pub(crate) fn lock(file: &tokio::fs::File) -> i32 { + 0 + } + pub(crate) fn unlock(file: &tokio::fs::File) -> i32 { + 0 + } +} +#[cfg(not(any(target_family = "unix", target_family = "windows")))] +use other::{lock, unlock}; + #[derive(Debug, Error)] /// All errors the API can throw pub enum ApiError { From 34ed39c3e0aeb5641e333f19a4e952ba02b12215 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 00:40:02 +0100 Subject: [PATCH 07/15] Simple fix. --- Cargo.toml | 2 +- src/api/sync.rs | 6 ++---- src/api/tokio.rs | 6 ++---- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f21b11a..e1f94b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ native-tls = { version = "0.2.12", optional = true } [target.'cfg(windows)'.dependencies.windows-sys] version = "0.59" -features = ["Win32_Foundation", "Win32_Storage_FileSystem"] +features = ["Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO"] optional = true [target.'cfg(unix)'.dependencies.libc] diff --git a/src/api/sync.rs b/src/api/sync.rs index b047bf0..3bc8bfc 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -122,7 +122,7 @@ mod windows { pub(crate) fn lock(file: &std::fs::File) -> i32 { unsafe { - let mut overlapped = mem::zeroed(); + let mut overlapped = std::mem::zeroed(); let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY; LockFileEx( file.as_raw_handle() as HANDLE, @@ -135,9 +135,7 @@ mod windows { } } pub(crate) fn unlock(file: &std::fs::File) -> i32 { - unsafe { - UnlockFile(file.as_raw_handle() as HANDLE, 0, 0, !0, !0); - } + unsafe { UnlockFile(file.as_raw_handle() as HANDLE, 0, 0, !0, !0) } } } #[cfg(target_family = "windows")] diff --git a/src/api/tokio.rs b/src/api/tokio.rs index 4311823..698ece7 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -116,7 +116,7 @@ mod windows { pub(crate) fn lock(file: &tokio::fs::File) -> i32 { unsafe { - let mut overlapped = mem::zeroed(); + let mut overlapped = std::mem::zeroed(); let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY; LockFileEx( file.as_raw_handle() as HANDLE, @@ -129,9 +129,7 @@ mod windows { } } pub(crate) fn unlock(file: &tokio::fs::File) -> i32 { - unsafe { - UnlockFile(file.as_raw_handle() as HANDLE, 0, 0, !0, !0); - } + unsafe { UnlockFile(file.as_raw_handle() as HANDLE, 0, 0, !0, !0) } } } #[cfg(target_family = "windows")] From 7fd75ddedc6312c12fcf5d91e34e23387443363c Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 00:46:58 +0100 Subject: [PATCH 08/15] ? --- src/api/sync.rs | 2 +- src/api/tokio.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/sync.rs b/src/api/sync.rs index 3bc8bfc..cdd0b86 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -738,7 +738,7 @@ impl ApiRepo { .blob_path(&metadata.etag); std::fs::create_dir_all(blob_path.parent().unwrap())?; - let lock = lock_file(blob_path.clone())?; + let lock = lock_file(blob_path.clone()).unwrap(); let mut tmp_path = blob_path.clone(); tmp_path.set_extension(EXTENSION); let tmp_filename = diff --git a/src/api/tokio.rs b/src/api/tokio.rs index 698ece7..6126f3c 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -857,7 +857,7 @@ impl ApiRepo { let blob_path = cache.blob_path(&metadata.etag); std::fs::create_dir_all(blob_path.parent().unwrap())?; - let lock = lock_file(blob_path.clone()).await?; + let lock = lock_file(blob_path.clone()).await.unwrap(); progress.init(metadata.size, filename).await; let mut tmp_path = blob_path.clone(); tmp_path.set_extension(EXTENSION); From 4eeeceeda438407c37d0e43326ac8aaf72190b4f Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 00:56:00 +0100 Subject: [PATCH 09/15] Let's figure out windows warts. --- src/api/sync.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/api/sync.rs b/src/api/sync.rs index cdd0b86..06c3b74 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -75,6 +75,7 @@ struct Handle { impl Drop for Handle { fn drop(&mut self) { + println!("Released lock on {:?}", std::thread::current().id()); unlock(&self.file); } } @@ -94,6 +95,7 @@ fn lock_file(mut path: PathBuf) -> Result { if res != 0 { Err(ApiError::LockAcquisition(path)) } else { + println!("Acquired lock on {:?}", std::thread::current().id()); Ok(Handle { file }) } } From e854be7ad45f0398062b2805dd86a3ac2e493c8c Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 01:02:57 +0100 Subject: [PATCH 10/15] Shot in the dark. --- src/api/sync.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/api/sync.rs b/src/api/sync.rs index 06c3b74..960aa9a 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -121,6 +121,7 @@ mod windows { use windows_sys::Win32::Storage::FileSystem::{ LockFileEx, UnlockFile, LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, }; + use windows_sys::Win32::System::SystemServices::MAXDWORD; pub(crate) fn lock(file: &std::fs::File) -> i32 { unsafe { @@ -130,8 +131,8 @@ mod windows { file.as_raw_handle() as HANDLE, flags, 0, - !0, - !0, + MAXDWORD, + MAXDWORD, &mut overlapped, ) } From 4cc79a7a68915081ea75af0068c1f7985c6bcad3 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 01:08:09 +0100 Subject: [PATCH 11/15] Goold old i32 semantics. --- src/api/sync.rs | 5 +++-- src/api/tokio.rs | 10 ++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/api/sync.rs b/src/api/sync.rs index 960aa9a..0ba408a 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -127,14 +127,15 @@ mod windows { unsafe { let mut overlapped = std::mem::zeroed(); let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY; - LockFileEx( + let res = LockFileEx( file.as_raw_handle() as HANDLE, flags, 0, MAXDWORD, MAXDWORD, &mut overlapped, - ) + ); + 1 - res } } pub(crate) fn unlock(file: &std::fs::File) -> i32 { diff --git a/src/api/tokio.rs b/src/api/tokio.rs index 6126f3c..5015969 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -113,19 +113,21 @@ mod windows { use windows_sys::Win32::Storage::FileSystem::{ LockFileEx, UnlockFile, LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, }; + use windows_sys::Win32::System::SystemServices::MAXDWORD; pub(crate) fn lock(file: &tokio::fs::File) -> i32 { unsafe { let mut overlapped = std::mem::zeroed(); let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY; - LockFileEx( + let res = LockFileEx( file.as_raw_handle() as HANDLE, flags, 0, - !0, - !0, + MAXDWORD, + MAXDWORD, &mut overlapped, - ) + ); + 1 - res } } pub(crate) fn unlock(file: &tokio::fs::File) -> i32 { From 18bf18252dbbd8fd0e1e3b7479da175a4e6ff409 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 01:12:09 +0100 Subject: [PATCH 12/15] MAXDWORD is feature gated. --- src/api/sync.rs | 7 ++----- src/api/tokio.rs | 7 +++---- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/api/sync.rs b/src/api/sync.rs index 0ba408a..1819f13 100644 --- a/src/api/sync.rs +++ b/src/api/sync.rs @@ -75,7 +75,6 @@ struct Handle { impl Drop for Handle { fn drop(&mut self) { - println!("Released lock on {:?}", std::thread::current().id()); unlock(&self.file); } } @@ -95,7 +94,6 @@ fn lock_file(mut path: PathBuf) -> Result { if res != 0 { Err(ApiError::LockAcquisition(path)) } else { - println!("Acquired lock on {:?}", std::thread::current().id()); Ok(Handle { file }) } } @@ -121,7 +119,6 @@ mod windows { use windows_sys::Win32::Storage::FileSystem::{ LockFileEx, UnlockFile, LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, }; - use windows_sys::Win32::System::SystemServices::MAXDWORD; pub(crate) fn lock(file: &std::fs::File) -> i32 { unsafe { @@ -131,8 +128,8 @@ mod windows { file.as_raw_handle() as HANDLE, flags, 0, - MAXDWORD, - MAXDWORD, + !0, + !0, &mut overlapped, ); 1 - res diff --git a/src/api/tokio.rs b/src/api/tokio.rs index 5015969..69070a2 100644 --- a/src/api/tokio.rs +++ b/src/api/tokio.rs @@ -113,7 +113,6 @@ mod windows { use windows_sys::Win32::Storage::FileSystem::{ LockFileEx, UnlockFile, LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, }; - use windows_sys::Win32::System::SystemServices::MAXDWORD; pub(crate) fn lock(file: &tokio::fs::File) -> i32 { unsafe { @@ -123,8 +122,8 @@ mod windows { file.as_raw_handle() as HANDLE, flags, 0, - MAXDWORD, - MAXDWORD, + !0, + !0, &mut overlapped, ); 1 - res @@ -859,7 +858,7 @@ impl ApiRepo { let blob_path = cache.blob_path(&metadata.etag); std::fs::create_dir_all(blob_path.parent().unwrap())?; - let lock = lock_file(blob_path.clone()).await.unwrap(); + let lock = lock_file(blob_path.clone()).await?; progress.init(metadata.size, filename).await; let mut tmp_path = blob_path.clone(); tmp_path.set_extension(EXTENSION); From d9bed7c48c7eb0ca363df1e270ad82b4b01c10a2 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 01:16:31 +0100 Subject: [PATCH 13/15] windows-sys on ureq. --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index e1f94b9..ad10ebe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ ureq = [ "dep:thiserror", "dep:ureq", "dep:libc", + "dep:windows-sys", ] [dev-dependencies] From 922e3ed424cf12a1f3791b4270bdb92e6b178655 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 01:18:00 +0100 Subject: [PATCH 14/15] Making everythign fail, when one fails. --- .github/workflows/rust.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 2222486..d44280c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -42,10 +42,10 @@ jobs: - name: Run Tests (ssl cross) run: | - cargo test --no-default-features --features ureq,native-tls - cargo test --no-default-features --features ureq,rustls-tls - cargo test --no-default-features --features tokio,native-tls - cargo test --no-default-features --features tokio,rustls-tls + cargo test --no-default-features --features ureq,native-tls && + cargo test --no-default-features --features ureq,rustls-tls && + cargo test --no-default-features --features tokio,native-tls && + cargo test --no-default-features --features tokio,rustls-tls && - name: Run Audit run: cargo audit -D warnings From 962acc35ae771856fe1de7b9c1249a7ebaf266e9 Mon Sep 17 00:00:00 2001 From: Nicolas Patry Date: Wed, 8 Jan 2025 01:21:32 +0100 Subject: [PATCH 15/15] "folded style" yaml BS. --- .github/workflows/rust.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index d44280c..c9a1cd3 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -41,11 +41,11 @@ jobs: run: cargo test --no-default-features --verbose - name: Run Tests (ssl cross) - run: | + run: > cargo test --no-default-features --features ureq,native-tls && cargo test --no-default-features --features ureq,rustls-tls && cargo test --no-default-features --features tokio,native-tls && - cargo test --no-default-features --features tokio,rustls-tls && + cargo test --no-default-features --features tokio,rustls-tls - name: Run Audit run: cargo audit -D warnings