Skip to content

Commit

Permalink
Avoid sending notifications to git before Mercurial's stderr is flushed
Browse files Browse the repository at this point in the history
  • Loading branch information
glandium committed Jan 14, 2025
1 parent e76a57c commit c55ffbc
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 2 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ features = ["std"]
version = "0.12"
default-features = false

[dependencies.mio]
version = "1"
features = ["os-ext", "os-poll"]

[dependencies.regex]
version = "1"
default-features = false
Expand Down
10 changes: 10 additions & 0 deletions src/hg_connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ pub trait HgConnectionBase {
fn sample_size(&self) -> usize {
100
}

fn sync(&mut self) {}
}

pub trait HgWireConnection: HgConnectionBase {
Expand Down Expand Up @@ -368,6 +370,10 @@ impl<C: HgWireConnection> HgConnectionBase for LogWireConnection<C> {
fn sample_size(&self) -> usize {
self.conn.sample_size()
}

fn sync(&mut self) {
self.conn.sync();
}
}

impl<C: HgWireConnection> HgWireConnection for LogWireConnection<C> {
Expand Down Expand Up @@ -500,6 +506,10 @@ impl<C: HgWireConnection> HgConnectionBase for HgWired<C> {
fn sample_size(&self) -> usize {
self.conn.sample_size()
}

fn sync(&mut self) {
self.conn.sync();
}
}

impl<C: HgWireConnection> HgConnection for HgWired<C> {
Expand Down
120 changes: 118 additions & 2 deletions src/hg_connect_stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,26 @@
use std::borrow::Cow;
use std::ffi::{c_char, c_void, CStr, CString, OsString};
use std::fs::File;
use std::io::{copy, BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::io::{copy, BufRead, BufReader, ErrorKind, Read, Seek, SeekFrom, Write};
#[cfg(unix)]
use std::os::fd::AsRawFd;
use std::os::raw::c_int;
#[cfg(windows)]
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
use std::process::{self, ChildStdin, ChildStdout, Command, Stdio};
use std::str::FromStr;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::{mem, ptr};

use bstr::{BStr, BString};
use itertools::Itertools;
#[cfg(unix)]
use mio::unix::SourceFd;
#[cfg(windows)]
use mio::windows::NamedPipe;
use mio::{Events, Poll, Token, Waker};
use percent_encoding::percent_decode_str;
use url::Url;

Expand All @@ -35,6 +46,23 @@ pub struct HgStdioConnection {
proc: process::Child,
thread: Option<JoinHandle<()>>,
url: Url,
synchronizer: Arc<Synchronizer>,
}

struct Synchronizer {
read_finished: Mutex<bool>,
condvar: Condvar,
waker: Waker,
}

impl Synchronizer {
fn new(waker: Waker) -> Self {
Synchronizer {
read_finished: Mutex::new(false),
condvar: Condvar::new(),
waker,
}
}
}

unsafe impl Send for HgStdioConnection {}
Expand Down Expand Up @@ -185,6 +213,14 @@ impl HgConnectionBase for HgStdioConnection {
fn sample_size(&self) -> usize {
10000
}

fn sync(&mut self) {
let guard = self.synchronizer.read_finished.lock().unwrap();
if !*guard {
self.synchronizer.waker.wake().unwrap();
let _unused = self.synchronizer.condvar.wait(guard).unwrap();
}
}
}

impl Drop for HgStdioConnection {
Expand Down Expand Up @@ -262,15 +298,36 @@ pub fn get_stdio_connection(url: &Url, flags: c_int) -> Option<Box<dyn HgRepo>>
return None;
};

#[cfg(unix)]
let mut proc_err = proc.stderr.take().unwrap();
#[cfg(windows)]
let mut proc_err =
unsafe { NamedPipe::from_raw_handle(proc.stderr.take().unwrap().into_raw_handle()) };
const STDERR: Token = Token(0);
const WAKER: Token = Token(1);
let mut events = Events::with_capacity(1);
let mut poll = Poll::new().unwrap();
poll.registry()
.register(
#[cfg(unix)]
&mut SourceFd(&proc_err.as_raw_fd()),
#[cfg(windows)]
&mut proc_err,
STDERR,
mio::Interest::READABLE,
)
.unwrap();
let waker = Waker::new(poll.registry(), WAKER).unwrap();

let synchronizer = Arc::new(Synchronizer::new(waker));
let mut conn = HgStdioConnection {
capabilities: HgCapabilities::default(),
proc_in: proc.stdin.take(),
proc_out: proc.stdout.take().map(BufReader::new),
proc,
thread: None,
url: url.clone(),
synchronizer: synchronizer.clone(),
};

conn.thread = Some(
Expand All @@ -283,7 +340,66 @@ pub fn get_stdio_connection(url: &Url, flags: c_int) -> Option<Box<dyn HgRepo>>
* Japanese locale) */
let stderr = unsafe { FdFile::stderr() };
let mut writer = PrefixWriter::new("remote: ", stderr);
copy(&mut proc_err, &mut writer).unwrap();
let mut buf = [0; 1024];
let mut block = true;
let mut read_finished = false;
let mut notify = 0;
loop {
match poll.poll(&mut events, if block { None } else { Some(Duration::ZERO) }) {
Ok(()) => {}
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
e => e.map(|_| ()).unwrap(),
}
if !block && events.is_empty() {
let mut read_finished_ = synchronizer.read_finished.lock().unwrap();
while notify > 0 {
synchronizer.condvar.notify_one();
notify -= 1;
}
if read_finished {
*read_finished_ = read_finished;
break;
}
block = true;
}
for event in &events {
block = false;
match event.token() {
STDERR => {
// Work around https://github.com/tokio-rs/mio/issues/1855
if !event.is_readable() && !event.is_read_closed() {
continue;
}
let buf = match (!event.is_read_closed())
.then(|| proc_err.read(&mut buf))
{
Some(Ok(len)) => &buf[..len],
Some(Err(e)) if e.kind() == ErrorKind::Interrupted => continue,
Some(e) => e.map(|_| &[]).unwrap(),
None => &[],
};
if buf.is_empty() {
read_finished = true;
poll.registry()
.deregister(
#[cfg(unix)]
&mut SourceFd(&proc_err.as_raw_fd()),
#[cfg(windows)]
&mut proc_err,
)
.unwrap();
continue;
}
writer.write_all(buf).unwrap();
}
WAKER => {
notify += 1;
}
_ => {}
}
}
}
assert_eq!(notify, 0);
})
.unwrap(),
);
Expand Down
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4492,6 +4492,7 @@ fn remote_helper_import(
}
transaction.commit().unwrap();

conn.sync();
writeln!(stdout, "done").unwrap();
stdout.flush().unwrap();

Expand Down Expand Up @@ -4921,6 +4922,8 @@ fn remote_helper_push(
})
.collect::<HashMap<_, _>>();

conn.sync();

for (_, dest, _) in push_refs {
let mut buf = Vec::new();
match status
Expand Down

0 comments on commit c55ffbc

Please sign in to comment.