From c55ffbc3816db17aee9b4472461cea5f9662ccb0 Mon Sep 17 00:00:00 2001 From: Mike Hommey Date: Mon, 13 Jan 2025 19:41:02 +0900 Subject: [PATCH] Avoid sending notifications to git before Mercurial's stderr is flushed --- Cargo.lock | 13 +++++ Cargo.toml | 4 ++ src/hg_connect.rs | 10 ++++ src/hg_connect_stdio.rs | 120 +++++++++++++++++++++++++++++++++++++++- src/main.rs | 3 + 5 files changed, 148 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9fe0142b..f21545d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -475,6 +475,7 @@ dependencies = [ "log", "lru", "make-cmd", + "mio", "once_cell", "percent-encoding", "rand", @@ -797,6 +798,18 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "object" version = "0.36.5" diff --git a/Cargo.toml b/Cargo.toml index 98a48f31..46d90ab6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,6 +105,10 @@ features = ["std"] version = "0.12" default-features = false +[dependencies.mio] +version = "1" +features = ["os-ext", "os-poll"] + [dependencies.regex] version = "1" default-features = false diff --git a/src/hg_connect.rs b/src/hg_connect.rs index 40ad6a23..b9ec9afe 100644 --- a/src/hg_connect.rs +++ b/src/hg_connect.rs @@ -165,6 +165,8 @@ pub trait HgConnectionBase { fn sample_size(&self) -> usize { 100 } + + fn sync(&mut self) {} } pub trait HgWireConnection: HgConnectionBase { @@ -368,6 +370,10 @@ impl HgConnectionBase for LogWireConnection { fn sample_size(&self) -> usize { self.conn.sample_size() } + + fn sync(&mut self) { + self.conn.sync(); + } } impl HgWireConnection for LogWireConnection { @@ -500,6 +506,10 @@ impl HgConnectionBase for HgWired { fn sample_size(&self) -> usize { self.conn.sample_size() } + + fn sync(&mut self) { + self.conn.sync(); + } } impl HgConnection for HgWired { diff --git a/src/hg_connect_stdio.rs b/src/hg_connect_stdio.rs index bf6680a9..3a3d9f39 100644 --- a/src/hg_connect_stdio.rs +++ b/src/hg_connect_stdio.rs @@ -5,15 +5,26 @@ use std::borrow::Cow; use std::ffi::{c_char, c_void, CStr, CString, OsString}; use std::fs::File; -use std::io::{copy, BufRead, BufReader, Read, Seek, SeekFrom, Write}; +use std::io::{copy, BufRead, BufReader, ErrorKind, Read, Seek, SeekFrom, Write}; +#[cfg(unix)] +use std::os::fd::AsRawFd; use std::os::raw::c_int; +#[cfg(windows)] +use std::os::windows::io::{FromRawHandle, IntoRawHandle}; use std::process::{self, ChildStdin, ChildStdout, Command, Stdio}; use std::str::FromStr; +use std::sync::{Arc, Condvar, Mutex}; use std::thread::{self, JoinHandle}; +use std::time::Duration; use std::{mem, ptr}; use bstr::{BStr, BString}; use itertools::Itertools; +#[cfg(unix)] +use mio::unix::SourceFd; +#[cfg(windows)] +use mio::windows::NamedPipe; +use mio::{Events, Poll, Token, Waker}; use percent_encoding::percent_decode_str; use url::Url; @@ -35,6 +46,23 @@ pub struct HgStdioConnection { proc: process::Child, thread: Option>, url: Url, + synchronizer: Arc, +} + +struct Synchronizer { + read_finished: Mutex, + condvar: Condvar, + waker: Waker, +} + +impl Synchronizer { + fn new(waker: Waker) -> Self { + Synchronizer { + read_finished: Mutex::new(false), + condvar: Condvar::new(), + waker, + } + } } unsafe impl Send for HgStdioConnection {} @@ -185,6 +213,14 @@ impl HgConnectionBase for HgStdioConnection { fn sample_size(&self) -> usize { 10000 } + + fn sync(&mut self) { + let guard = self.synchronizer.read_finished.lock().unwrap(); + if !*guard { + self.synchronizer.waker.wake().unwrap(); + let _unused = self.synchronizer.condvar.wait(guard).unwrap(); + } + } } impl Drop for HgStdioConnection { @@ -262,8 +298,28 @@ pub fn get_stdio_connection(url: &Url, flags: c_int) -> Option> return None; }; + #[cfg(unix)] let mut proc_err = proc.stderr.take().unwrap(); + #[cfg(windows)] + let mut proc_err = + unsafe { NamedPipe::from_raw_handle(proc.stderr.take().unwrap().into_raw_handle()) }; + const STDERR: Token = Token(0); + const WAKER: Token = Token(1); + let mut events = Events::with_capacity(1); + let mut poll = Poll::new().unwrap(); + poll.registry() + .register( + #[cfg(unix)] + &mut SourceFd(&proc_err.as_raw_fd()), + #[cfg(windows)] + &mut proc_err, + STDERR, + mio::Interest::READABLE, + ) + .unwrap(); + let waker = Waker::new(poll.registry(), WAKER).unwrap(); + let synchronizer = Arc::new(Synchronizer::new(waker)); let mut conn = HgStdioConnection { capabilities: HgCapabilities::default(), proc_in: proc.stdin.take(), @@ -271,6 +327,7 @@ pub fn get_stdio_connection(url: &Url, flags: c_int) -> Option> proc, thread: None, url: url.clone(), + synchronizer: synchronizer.clone(), }; conn.thread = Some( @@ -283,7 +340,66 @@ pub fn get_stdio_connection(url: &Url, flags: c_int) -> Option> * Japanese locale) */ let stderr = unsafe { FdFile::stderr() }; let mut writer = PrefixWriter::new("remote: ", stderr); - copy(&mut proc_err, &mut writer).unwrap(); + let mut buf = [0; 1024]; + let mut block = true; + let mut read_finished = false; + let mut notify = 0; + loop { + match poll.poll(&mut events, if block { None } else { Some(Duration::ZERO) }) { + Ok(()) => {} + Err(e) if e.kind() == ErrorKind::Interrupted => continue, + e => e.map(|_| ()).unwrap(), + } + if !block && events.is_empty() { + let mut read_finished_ = synchronizer.read_finished.lock().unwrap(); + while notify > 0 { + synchronizer.condvar.notify_one(); + notify -= 1; + } + if read_finished { + *read_finished_ = read_finished; + break; + } + block = true; + } + for event in &events { + block = false; + match event.token() { + STDERR => { + // Work around https://github.com/tokio-rs/mio/issues/1855 + if !event.is_readable() && !event.is_read_closed() { + continue; + } + let buf = match (!event.is_read_closed()) + .then(|| proc_err.read(&mut buf)) + { + Some(Ok(len)) => &buf[..len], + Some(Err(e)) if e.kind() == ErrorKind::Interrupted => continue, + Some(e) => e.map(|_| &[]).unwrap(), + None => &[], + }; + if buf.is_empty() { + read_finished = true; + poll.registry() + .deregister( + #[cfg(unix)] + &mut SourceFd(&proc_err.as_raw_fd()), + #[cfg(windows)] + &mut proc_err, + ) + .unwrap(); + continue; + } + writer.write_all(buf).unwrap(); + } + WAKER => { + notify += 1; + } + _ => {} + } + } + } + assert_eq!(notify, 0); }) .unwrap(), ); diff --git a/src/main.rs b/src/main.rs index a6796aa9..8496ee28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4492,6 +4492,7 @@ fn remote_helper_import( } transaction.commit().unwrap(); + conn.sync(); writeln!(stdout, "done").unwrap(); stdout.flush().unwrap(); @@ -4921,6 +4922,8 @@ fn remote_helper_push( }) .collect::>(); + conn.sync(); + for (_, dest, _) in push_refs { let mut buf = Vec::new(); match status