Skip to content

Commit

Permalink
perf(refresh): switch notification backend
Browse files Browse the repository at this point in the history
Use the new `notify-debouncer-full`, which doesn't emit duplicate events.
  • Loading branch information
dzfrias committed Jun 27, 2023
1 parent bff6de8 commit 25511f0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 42 deletions.
30 changes: 26 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ ignore = "0.4.20"
itertools = "0.10.5"
log = { version = "0.4.17", features = ["serde"] }
nom = "7.1.3"
notify = "5.1.0"
notify-debouncer-full = "0.2.0"
rust_search = "2.1.0"
scopeguard = "1.1.0"
serde = { version = "1.0.158", features = ["derive"] }
Expand Down
85 changes: 48 additions & 37 deletions src/external_event/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use super::{ExternalEvent, RefreshData};
use anyhow::Result;
use crossbeam_channel::{unbounded, Sender};
use notify::{recommended_watcher, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use notify_debouncer_full::{
new_debouncer,
notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher},
DebounceEventResult, Debouncer, FileIdMap,
};
use std::{
path::{Path, PathBuf},
sync::{
Expand Down Expand Up @@ -65,49 +69,56 @@ impl ChangeBuffer {
pub fn fs_watch(
path: &Path,
event_sender: Sender<ExternalEvent>,
refresh_time: u64,
_refresh_time: u64,
is_suspended: Arc<AtomicBool>,
) -> Result<(RecommendedWatcher, ChangeBuffer)> {
) -> Result<(Debouncer<RecommendedWatcher, FileIdMap>, ChangeBuffer)> {
let (tx, rx) = unbounded();
let mut watcher = recommended_watcher(tx)?;
watcher.configure(Config::default().with_poll_interval(Duration::from_millis(refresh_time)))?;
watcher.watch(path, RecursiveMode::Recursive)?;
let mut watcher = {
let event_sender = event_sender.clone();
new_debouncer(
Duration::from_secs(2),
None,
move |result: DebounceEventResult| match result {
Ok(events) => {
for event in events {
tx.send(event.event).unwrap();
}
}
Err(errs) => {
for err in errs {
event_sender.send(ExternalEvent::Error(err.into())).unwrap();
}
}
},
)?
};
watcher.watcher().watch(path, RecursiveMode::Recursive)?;
let buffer = ChangeBuffer::new();
let mut thread_buffer = buffer.clone();
thread::spawn(move || {
for res in rx {
let send_result: Result<()> = match res {
Ok(event) => match event.kind {
EventKind::Create(_) => {
if is_suspended.load(Ordering::Acquire) {
thread_buffer.add_created(event.paths);
Ok(())
} else {
let data = ExternalEvent::PartialRefresh(
event.paths.into_iter().map(RefreshData::Add).collect(),
);
event_sender.send(data).map_err(Into::into)
}
for event in rx {
match event.kind {
EventKind::Create(_) => {
if is_suspended.load(Ordering::Acquire) {
thread_buffer.add_created(event.paths);
} else {
let data = ExternalEvent::PartialRefresh(
event.paths.into_iter().map(RefreshData::Add).collect(),
);
event_sender.send(data).unwrap();
}
EventKind::Remove(_) => {
if is_suspended.load(Ordering::Acquire) {
thread_buffer.add_removed(event.paths);
Ok(())
} else {
let data = ExternalEvent::PartialRefresh(
event.paths.into_iter().map(RefreshData::Delete).collect(),
);
event_sender.send(data).map_err(Into::into)
}
}
EventKind::Remove(_) => {
if is_suspended.load(Ordering::Acquire) {
thread_buffer.add_removed(event.paths);
} else {
let data = ExternalEvent::PartialRefresh(
event.paths.into_iter().map(RefreshData::Delete).collect(),
);
event_sender.send(data).unwrap();
}
_ => Ok(()),
},
Err(e) => Err(e.into()),
};
if let Err(err) = send_result {
event_sender
.send(ExternalEvent::Error(err))
.expect("sender should not have deallocated");
}
_ => {}
}
}
});
Expand Down

0 comments on commit 25511f0

Please sign in to comment.