Skip to content

Commit

Permalink
Avoid sending notifications to git before Mercurial's stderr is flushed
Browse files Browse the repository at this point in the history
  • Loading branch information
glandium committed Jan 13, 2025
1 parent e76a57c commit f458b35
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 2 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ features = ["std"]
version = "0.12"
default-features = false

[dependencies.mio]
version = "1"
features = ["os-ext", "os-poll"]

[dependencies.regex]
version = "1"
default-features = false
Expand Down
10 changes: 10 additions & 0 deletions src/hg_connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ pub trait HgConnectionBase {
fn sample_size(&self) -> usize {
100
}

fn sync(&mut self) {}
}

pub trait HgWireConnection: HgConnectionBase {
Expand Down Expand Up @@ -368,6 +370,10 @@ impl<C: HgWireConnection> HgConnectionBase for LogWireConnection<C> {
fn sample_size(&self) -> usize {
self.conn.sample_size()
}

fn sync(&mut self) {
self.conn.sync();
}
}

impl<C: HgWireConnection> HgWireConnection for LogWireConnection<C> {
Expand Down Expand Up @@ -500,6 +506,10 @@ impl<C: HgWireConnection> HgConnectionBase for HgWired<C> {
fn sample_size(&self) -> usize {
self.conn.sample_size()
}

fn sync(&mut self) {
self.conn.sync();
}
}

impl<C: HgWireConnection> HgConnection for HgWired<C> {
Expand Down
64 changes: 62 additions & 2 deletions src/hg_connect_stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,25 @@
use std::borrow::Cow;
use std::ffi::{c_char, c_void, CStr, CString, OsString};
use std::fs::File;
use std::io::{copy, BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::io::{copy, BufRead, BufReader, ErrorKind, Read, Seek, SeekFrom, Write};
#[cfg(unix)]
use std::os::fd::AsRawFd;
use std::os::raw::c_int;
#[cfg(windows)]
use std::os::windows::io::{AsRawHandle, FromRawHandle};
use std::process::{self, ChildStdin, ChildStdout, Command, Stdio};
use std::str::FromStr;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::{mem, ptr};

use bstr::{BStr, BString};
use itertools::Itertools;
#[cfg(unix)]
use mio::unix::SourceFd;
#[cfg(windows)]
use mio::windows::NamedPipe;
use mio::{Events, Poll, Token, Waker};
use percent_encoding::percent_decode_str;
use url::Url;

Expand All @@ -35,6 +45,8 @@ pub struct HgStdioConnection {
proc: process::Child,
thread: Option<JoinHandle<()>>,
url: Url,
waker: Waker,
condvar: Arc<(Mutex<()>, Condvar)>,
}

unsafe impl Send for HgStdioConnection {}
Expand Down Expand Up @@ -185,6 +197,12 @@ impl HgConnectionBase for HgStdioConnection {
fn sample_size(&self) -> usize {
10000
}

fn sync(&mut self) {
self.waker.wake().unwrap();
let guard = self.condvar.0.lock().unwrap();
let _unused = self.condvar.1.wait(guard).unwrap();
}
}

impl Drop for HgStdioConnection {
Expand Down Expand Up @@ -263,14 +281,34 @@ pub fn get_stdio_connection(url: &Url, flags: c_int) -> Option<Box<dyn HgRepo>>
};

let mut proc_err = proc.stderr.take().unwrap();
#[cfg(windows)]
let mut proc_err = unsafe { NamedPipe::from_raw_handle(proc_err.into_raw_handle()) };
const STDERR: Token = Token(0);
const WAKER: Token = Token(1);
let mut events = Events::with_capacity(2);
let mut poll = Poll::new().unwrap();
poll.registry()
.register(
#[cfg(unix)]
&mut SourceFd(&proc_err.as_raw_fd()),
#[cfg(windows)]
&mut proc_err,
STDERR,
mio::Interest::READABLE,
)
.unwrap();
let waker = Waker::new(poll.registry(), WAKER).unwrap();

let condvar = Arc::new((Mutex::new(()), Condvar::new()));
let mut conn = HgStdioConnection {
capabilities: HgCapabilities::default(),
proc_in: proc.stdin.take(),
proc_out: proc.stdout.take().map(BufReader::new),
proc,
thread: None,
url: url.clone(),
waker,
condvar: condvar.clone(),
};

conn.thread = Some(
Expand All @@ -283,7 +321,29 @@ pub fn get_stdio_connection(url: &Url, flags: c_int) -> Option<Box<dyn HgRepo>>
* Japanese locale) */
let stderr = unsafe { FdFile::stderr() };
let mut writer = PrefixWriter::new("remote: ", stderr);
copy(&mut proc_err, &mut writer).unwrap();
let mut buf = [0; 1024];
'outer: loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
match event.token() {
STDERR => {
let buf = match proc_err.read(&mut buf) {
Ok(len) => &buf[..len],
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
e => e.map(|_| &[]).unwrap(),
};
if buf.is_empty() {
break 'outer;
}
writer.write_all(buf).unwrap();
}
WAKER => {
condvar.1.notify_one();
}
_ => {}
}
}
}
})
.unwrap(),
);
Expand Down
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4492,6 +4492,7 @@ fn remote_helper_import(
}
transaction.commit().unwrap();

conn.sync();
writeln!(stdout, "done").unwrap();
stdout.flush().unwrap();

Expand Down Expand Up @@ -4921,6 +4922,8 @@ fn remote_helper_push(
})
.collect::<HashMap<_, _>>();

conn.sync();

for (_, dest, _) in push_refs {
let mut buf = Vec::new();
match status
Expand Down

0 comments on commit f458b35

Please sign in to comment.