From f3b6ac225af0fb4c0fce71566f3cba34d57361eb Mon Sep 17 00:00:00 2001 From: Martin Ling Date: Mon, 13 May 2024 18:59:42 +0100 Subject: [PATCH 1/6] Pass error handler to capture thread, instead of errors through channel. Currently, if errors occur during capture, the capture thread sends them to the decoder thread, via the same channel used for packets. This allows errors to be caught in the decoder thread and shown immediately to the user, even when the the stop button has not yet been pressed, and therefore the capture thread not yet joined. This is unnecessarily complicated though, and can currently lead to some errors being shown twice (where both sent to the decoder thread and returned in the capture thread), or not being shown (where returned in the capture thread, but not sent to the decoder thread). Instead, let's just pass a handler function to LunaHandle::start(), which will be called by the capture thread as it ends. If an error occured, it is then signalled to the user immediately, without the decoder thread needing to be involved. When joining the capture thread, it is still necessary to check if the thread panicked, but any normal error will already have been handled. We can use the existing display_error() function, which already takes care of accessing the UI from whichever thread it is called from. --- src/backend/luna.rs | 24 +++++++++++------------- src/ui.rs | 5 +++-- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/backend/luna.rs b/src/backend/luna.rs index 60c1db92..b02ff439 100644 --- a/src/backend/luna.rs +++ b/src/backend/luna.rs @@ -84,12 +84,12 @@ pub struct LunaHandle { } pub struct LunaStream { - receiver: Receiver, rusb::Error>>, + receiver: Receiver>, } pub struct LunaStop { stop_request: Sender<()>, - worker: JoinHandle::>, + worker: JoinHandle::<()>, } impl LunaDevice { @@ -160,13 +160,14 @@ impl LunaHandle { Ok(speeds) } - pub fn start(mut self, speed: Speed) + pub fn start(mut self, speed: Speed, result_handler: F) -> Result<(LunaStream, LunaStop), Error> + where F: FnOnce(Result<(), Error>) + Send + 'static { self.usb_handle.claim_interface(0)?; let (tx, rx) = channel(); let (stop_tx, stop_rx) = channel(); - let worker = spawn(move || { + let mut run_capture = move || { let mut buffer = [0u8; READ_LEN]; let mut packet_queue = PacketQueue::new(); let mut state = State::new(true, speed); @@ -179,23 +180,20 @@ impl LunaHandle { Ok(count) => { packet_queue.extend(&buffer[..count]); while let Some(packet) = packet_queue.next() { - tx.send(Ok(packet)) + tx.send(packet) .context("Failed sending packet to channel")?; }; }, Err(rusb::Error::Timeout) => continue, - Err(usb_error) => { - tx.send(Err(usb_error)) - .context("Failed sending error to channel")?; - return Err(Error::from(usb_error)); - } + Err(usb_error) => return Err(Error::from(usb_error)) } } state.set_enable(false); self.write_state(state)?; println!("Capture disabled"); Ok(()) - }); + }; + let worker = spawn(move || result_handler(run_capture())); Ok(( LunaStream { receiver: rx, @@ -222,7 +220,7 @@ impl LunaHandle { } impl LunaStream { - pub fn next(&mut self) -> Option, rusb::Error>> { + pub fn next(&mut self) -> Option> { self.receiver.recv().ok() } } @@ -232,7 +230,7 @@ impl LunaStop { println!("Requesting capture stop"); self.stop_request.send(()).context("Failed sending stop request")?; match self.worker.join() { - Ok(result) => result, + Ok(()) => Ok(()), Err(panic) => { let msg = match ( panic.downcast_ref::<&str>(), diff --git a/src/ui.rs b/src/ui.rs index d0546573..17f87676 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -791,7 +791,8 @@ pub fn start_luna() -> Result<(), Error> { let writer = reset_capture()?; with_ui(|ui| { let (luna, speed) = ui.selector.open()?; - let (mut stream_handle, stop_handle) = luna.start(speed)?; + let (mut stream_handle, stop_handle) = + luna.start(speed, display_error)?; ui.stop_handle.replace(stop_handle); ui.open_button.set_sensitive(false); ui.scan_button.set_sensitive(false); @@ -803,7 +804,7 @@ pub fn start_luna() -> Result<(), Error> { let read_luna = move || { let mut decoder = Decoder::new(writer)?; while let Some(packet) = stream_handle.next() { - decoder.handle_raw_packet(&packet?)?; + decoder.handle_raw_packet(&packet)?; } decoder.finish()?; Ok(()) From 47a0b3f4511b704f55b7a16dd98653a0c9af6d61 Mon Sep 17 00:00:00 2001 From: Martin Ling Date: Mon, 13 May 2024 14:06:14 +0100 Subject: [PATCH 2/6] Move work splitting capture data into packets to decoder thread. Rather than the capture thread sending individual packets to the decoder thread, send the contents of entire USB transfers, and do the work of picking out individual packets in the decoder thread. --- src/backend/luna.rs | 78 +++++++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/src/backend/luna.rs b/src/backend/luna.rs index b02ff439..4f654d98 100644 --- a/src/backend/luna.rs +++ b/src/backend/luna.rs @@ -85,6 +85,7 @@ pub struct LunaHandle { pub struct LunaStream { receiver: Receiver>, + buffer: VecDeque, } pub struct LunaStop { @@ -169,7 +170,6 @@ impl LunaHandle { let (stop_tx, stop_rx) = channel(); let mut run_capture = move || { let mut buffer = [0u8; READ_LEN]; - let mut packet_queue = PacketQueue::new(); let mut state = State::new(true, speed); self.write_state(state)?; println!("Capture enabled, speed: {}", speed.description()); @@ -178,11 +178,8 @@ impl LunaHandle { ENDPOINT, &mut buffer, Duration::from_millis(100)); match result { Ok(count) => { - packet_queue.extend(&buffer[..count]); - while let Some(packet) = packet_queue.next() { - tx.send(packet) - .context("Failed sending packet to channel")?; - }; + tx.send(buffer[..count].to_vec()) + .context("Failed sending capture data to channel")?; }, Err(rusb::Error::Timeout) => continue, Err(usb_error) => return Err(Error::from(usb_error)) @@ -197,6 +194,7 @@ impl LunaHandle { Ok(( LunaStream { receiver: rx, + buffer: VecDeque::new(), }, LunaStop { stop_request: stop_tx, @@ -220,8 +218,43 @@ impl LunaHandle { } impl LunaStream { + pub fn next(&mut self) -> Option> { - self.receiver.recv().ok() + loop { + // Do we have another packet already in the buffer? + match self.next_buffered_packet() { + // Yes; return the packet. + Some(packet) => return Some(packet), + // No; wait for more data from the capture thread. + None => match self.receiver.recv().ok() { + // Received more data; add it to the buffer and retry. + Some(bytes) => self.buffer.extend(bytes.iter()), + // Capture has ended, there are no more packets. + None => return None + } + } + } + } + + fn next_buffered_packet(&mut self) -> Option> { + // Do we have the length header for the next packet? + let buffer_len = self.buffer.len(); + if buffer_len <= 2 { + return None; + } + + // Do we have all the data for the next packet? + let packet_len = u16::from_be_bytes( + [self.buffer[0], self.buffer[1]]) as usize; + if buffer_len <= 2 + packet_len { + return None; + } + + // Remove the length header from the buffer. + self.buffer.drain(0..2); + + // Remove the packet from the buffer and return it. + Some(self.buffer.drain(0..packet_len).collect()) } } @@ -245,34 +278,3 @@ impl LunaStop { } } } - -struct PacketQueue { - buffer: VecDeque, -} - -impl PacketQueue { - pub fn new() -> Self { - PacketQueue { - buffer: VecDeque::new(), - } - } - - pub fn extend(&mut self, slice: &[u8]) { - self.buffer.extend(slice.iter()); - } - - pub fn next(&mut self) -> Option> { - let buffer_len = self.buffer.len(); - if buffer_len <= 2 { - return None; - } - let packet_len = u16::from_be_bytes([self.buffer[0], self.buffer[1]]) as usize; - if buffer_len <= 2 + packet_len { - return None; - } - - self.buffer.drain(0..2); - - Some(self.buffer.drain(0..packet_len).collect()) - } -} From 817ed656e2636e57ceb15f57ea46668b73033144 Mon Sep 17 00:00:00 2001 From: Martin Ling Date: Mon, 13 May 2024 14:38:28 +0100 Subject: [PATCH 3/6] Allocate a new buffer for each bulk transfer. We need to end up with a new Vec for each transfer anyway, in order to send it to the decoder thread. But we can save a copy by capturing directly into that new Vec, rather than copying from an existing buffer. --- src/backend/luna.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backend/luna.rs b/src/backend/luna.rs index 4f654d98..12016fe5 100644 --- a/src/backend/luna.rs +++ b/src/backend/luna.rs @@ -169,16 +169,17 @@ impl LunaHandle { let (tx, rx) = channel(); let (stop_tx, stop_rx) = channel(); let mut run_capture = move || { - let mut buffer = [0u8; READ_LEN]; let mut state = State::new(true, speed); self.write_state(state)?; println!("Capture enabled, speed: {}", speed.description()); while stop_rx.try_recv().is_err() { + let mut buffer = vec![0u8; READ_LEN]; let result = self.usb_handle.read_bulk( ENDPOINT, &mut buffer, Duration::from_millis(100)); match result { Ok(count) => { - tx.send(buffer[..count].to_vec()) + buffer.truncate(count); + tx.send(buffer) .context("Failed sending capture data to channel")?; }, Err(rusb::Error::Timeout) => continue, From ab031995cace651faace5906a9562eb23080a2e7 Mon Sep 17 00:00:00 2001 From: Martin Ling Date: Mon, 13 May 2024 19:32:30 +0100 Subject: [PATCH 4/6] Implement Iterator for LunaStream. --- src/backend/luna.rs | 7 +++++-- src/ui.rs | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/backend/luna.rs b/src/backend/luna.rs index 12016fe5..b3a86a0e 100644 --- a/src/backend/luna.rs +++ b/src/backend/luna.rs @@ -218,9 +218,10 @@ impl LunaHandle { } } -impl LunaStream { +impl Iterator for LunaStream { + type Item = Vec; - pub fn next(&mut self) -> Option> { + fn next(&mut self) -> Option> { loop { // Do we have another packet already in the buffer? match self.next_buffered_packet() { @@ -236,7 +237,9 @@ impl LunaStream { } } } +} +impl LunaStream { fn next_buffered_packet(&mut self) -> Option> { // Do we have the length header for the next packet? let buffer_len = self.buffer.len(); diff --git a/src/ui.rs b/src/ui.rs index 17f87676..875c3a95 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -791,7 +791,7 @@ pub fn start_luna() -> Result<(), Error> { let writer = reset_capture()?; with_ui(|ui| { let (luna, speed) = ui.selector.open()?; - let (mut stream_handle, stop_handle) = + let (stream_handle, stop_handle) = luna.start(speed, display_error)?; ui.stop_handle.replace(stop_handle); ui.open_button.set_sensitive(false); @@ -803,7 +803,7 @@ pub fn start_luna() -> Result<(), Error> { display_error(stop_luna())); let read_luna = move || { let mut decoder = Decoder::new(writer)?; - while let Some(packet) = stream_handle.next() { + for packet in stream_handle { decoder.handle_raw_packet(&packet)?; } decoder.finish()?; From c75c775b9d430be7e4fcfff786a022c1307da75c Mon Sep 17 00:00:00 2001 From: Martin Ling Date: Mon, 13 May 2024 14:57:28 +0100 Subject: [PATCH 5/6] Initial port from rusb to nusb, using synchronous operations only. --- Cargo.lock | 213 +++++++++++++++++++++++++++++++++++--------- Cargo.toml | 5 +- src/backend/luna.rs | 166 ++++++++++++++++++---------------- src/ui.rs | 10 +-- 4 files changed, 265 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4903ff9..3f08cb9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.1.0" @@ -172,6 +178,22 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "derive-into-owned" version = "0.2.0" @@ -215,7 +237,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -266,6 +288,19 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +[[package]] +name = "futures-lite" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445ba825b27408685aaecefd65178908c36c6e96aaf6d8599419d46e624192ba" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.21" @@ -399,7 +434,7 @@ dependencies = [ "gobject-sys", "libc", "system-deps", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -597,6 +632,16 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "io-kit-sys" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4769cb30e5dcf1710fc6730d3e94f78c47723a014a567de385e113c737394640" +dependencies = [ + "core-foundation-sys", + "mach2", +] + [[package]] name = "itertools" version = "0.12.1" @@ -624,30 +669,33 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libusb1-sys" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d0e2afce4245f2c9a418511e5af8718bcaf2fa408aefb259504d1a9cb25f27" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "linux-raw-sys" version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + [[package]] name = "lrumap" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1251ce8e8f9909600e127dcbe74ac50d8464e6685cf5953d37df7a741dbf9e9d" +[[package]] +name = "mach2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" +dependencies = [ + "libc", +] + [[package]] name = "memchr" version = "2.7.1" @@ -712,6 +760,23 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "nusb" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66354494bb95f979c053fd988dfeb7c9f1cc8c91544139bce2b3d1765be9571a" +dependencies = [ + "atomic-waker", + "core-foundation", + "core-foundation-sys", + "io-kit-sys", + "log", + "once_cell", + "rustix", + "slab", + "windows-sys 0.48.0", +] + [[package]] name = "object" version = "0.30.4" @@ -739,6 +804,9 @@ dependencies = [ "bytemuck", "bytemuck_derive", "derive_more", + "futures-channel", + "futures-lite", + "futures-util", "gtk4", "humansize", "itertools", @@ -746,12 +814,12 @@ dependencies = [ "memmap2", "num-format", "num_enum", + "nusb", "once_cell", "page_size", "pcap-file", "rand", "rand_xorshift", - "rusb", "serde", "serde_json", "tempfile", @@ -791,6 +859,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "pcap-file" version = "2.0.0" @@ -934,16 +1008,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "rusb" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45fff149b6033f25e825cbb7b2c625a11ee8e6dac09264d49beb125e39aa97bf" -dependencies = [ - "libc", - "libusb1-sys", -] - [[package]] name = "rustc-demangle" version = "0.1.23" @@ -978,7 +1042,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1044,9 +1108,12 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.5" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] [[package]] name = "smallvec" @@ -1099,7 +1166,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1166,12 +1233,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version-compare" version = "0.1.0" @@ -1212,13 +1273,37 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.0", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", ] [[package]] @@ -1227,51 +1312,93 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 140903fe..e3138ee1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,10 @@ num-format = "0.4.4" humansize = "2.1.3" bisection = "0.1.0" derive_more = "0.99.17" -rusb = "0.9.3" +nusb = "0.1.9" +futures-lite = "2.0.1" +futures-channel = "0.3.21" +futures-util = "0.3.21" serde = { version = "1.0.196", optional = true, features = ["derive"] } serde_json = { version = "1.0.113", optional = true } itertools = "0.12.1" diff --git a/src/backend/luna.rs b/src/backend/luna.rs index b3a86a0e..f6ebe703 100644 --- a/src/backend/luna.rs +++ b/src/backend/luna.rs @@ -1,23 +1,29 @@ use std::collections::VecDeque; use std::thread::{spawn, JoinHandle}; -use std::sync::mpsc::{channel, Sender, Receiver}; use std::time::Duration; +use std::sync::mpsc; use anyhow::{Context as ErrorContext, Error, bail}; +use futures_channel::oneshot; +use futures_lite::future::block_on; use num_enum::{FromPrimitive, IntoPrimitive}; -use rusb::{ - Context, - Device, - DeviceHandle, - UsbContext, - Version +use nusb::{ + self, + transfer::{ + Control, + ControlType, + Recipient, + RequestBuffer, + }, + DeviceInfo, + Interface }; const VID: u16 = 0x1d50; const PID: u16 = 0x615b; -const MIN_SUPPORTED: Version = Version(0, 0, 2); -const NOT_SUPPORTED: Version = Version(0, 0, 3); +const MIN_SUPPORTED: u16 = 0x0002; +const NOT_SUPPORTED: u16 = 0x0003; const ENDPOINT: u8 = 0x81; @@ -73,38 +79,48 @@ impl State { /// A Luna device attached to the system. pub struct LunaDevice { - usb_device: Device, + device_info: DeviceInfo, pub description: String, pub speeds: Vec, } /// A handle to an open Luna device. pub struct LunaHandle { - usb_handle: DeviceHandle, + interface: Interface, } pub struct LunaStream { - receiver: Receiver>, + receiver: mpsc::Receiver>, buffer: VecDeque, } pub struct LunaStop { - stop_request: Sender<()>, + stop_request: oneshot::Sender<()>, worker: JoinHandle::<()>, } impl LunaDevice { - pub fn scan(context: &mut Context) -> Result, Error> { - let devices = context.devices()?; - let mut result = Vec::with_capacity(devices.len()); - for usb_device in devices.iter() { - let desc = usb_device.device_descriptor()?; - if desc.vendor_id() == VID && desc.product_id() == PID { - let handle = LunaHandle::new(usb_device.open()?)?; - let description = handle.description()?; + pub fn scan() -> Result, Error> { + let mut result = Vec::new(); + for device_info in nusb::list_devices()? { + if device_info.vendor_id() == VID && + device_info.product_id() == PID + { + let version = device_info.device_version(); + if !(MIN_SUPPORTED..=NOT_SUPPORTED).contains(&version) { + continue; + } + let manufacturer = device_info + .manufacturer_string() + .unwrap_or("Unknown"); + let product = device_info + .product_string() + .unwrap_or("Device"); + let description = format!("{} {}", manufacturer, product); + let handle = LunaHandle::new(&device_info)?; let speeds = handle.speeds()?; result.push(LunaDevice{ - usb_device, + device_info, description, speeds, }) @@ -114,45 +130,35 @@ impl LunaDevice { } pub fn open(&self) -> Result { - LunaHandle::new(self.usb_device.open()?) + LunaHandle::new(&self.device_info) } } impl LunaHandle { - fn new(usb_handle: DeviceHandle) -> Result { - let version = usb_handle - .device() - .device_descriptor()? - .device_version(); - if version >= MIN_SUPPORTED && version < NOT_SUPPORTED { - Ok(Self { usb_handle }) - } else { - bail!("Unsupported analyzer version: Gateware version is {version}. \ - Supported range is {MIN_SUPPORTED} or higher, \ - but not {NOT_SUPPORTED} or higher") - } - } - - pub fn description(&self) -> Result { - let desc = self.usb_handle.device().device_descriptor()?; - let manufacturer = self.usb_handle.read_manufacturer_string_ascii(&desc)?; - let product = self.usb_handle.read_product_string_ascii(&desc)?; - Ok(format!("{} {}", manufacturer, product)) + fn new(device_info: &DeviceInfo) -> Result { + let device = device_info.open()?; + let interface = device.claim_interface(0)?; + Ok(LunaHandle { interface }) } pub fn speeds(&self) -> Result, Error> { - use rusb::{Direction, RequestType, Recipient, request_type}; - let mut buf = [0u8]; - self.usb_handle.read_control( - request_type(Direction::In, RequestType::Vendor, Recipient::Device), - 2, - 0, - 0, - &mut buf, - Duration::from_secs(5), - )?; - let mut speeds = vec![]; use Speed::*; + let control = Control { + control_type: ControlType::Vendor, + recipient: Recipient::Device, + request: 2, + value: 0, + index: 0, + }; + let mut buf = [0; 64]; + let timeout = Duration::from_secs(1); + let size = self.interface + .control_in_blocking(control, &mut buf, timeout) + .context("Failed retrieving supported speeds from device")?; + if size != 1 { + bail!("Expected 1-byte response to speed request, got {size}"); + } + let mut speeds = Vec::new(); for speed in [Auto, High, Full, Low] { if buf[0] & speed.mask() != 0 { speeds.push(speed); @@ -165,27 +171,31 @@ impl LunaHandle { -> Result<(LunaStream, LunaStop), Error> where F: FnOnce(Result<(), Error>) + Send + 'static { - self.usb_handle.claim_interface(0)?; - let (tx, rx) = channel(); - let (stop_tx, stop_rx) = channel(); + // Channel to pass captured data to the decoder thread. + let (tx, rx) = mpsc::channel(); + // Channel to stop the capture thread on request. + let (stop_tx, mut stop_rx) = oneshot::channel(); + // Capture thread. let mut run_capture = move || { let mut state = State::new(true, speed); self.write_state(state)?; println!("Capture enabled, speed: {}", speed.description()); - while stop_rx.try_recv().is_err() { - let mut buffer = vec![0u8; READ_LEN]; - let result = self.usb_handle.read_bulk( - ENDPOINT, &mut buffer, Duration::from_millis(100)); - match result { - Ok(count) => { - buffer.truncate(count); - tx.send(buffer) + while stop_rx.try_recv() == Ok(None) { + let buffer = RequestBuffer::new(READ_LEN); + let completion = block_on(self.interface.bulk_in(ENDPOINT, buffer)); + match completion.status { + Ok(()) => { + // Transfer successful. Send data to decoder thread. + tx.send(completion.data) .context("Failed sending capture data to channel")?; }, - Err(rusb::Error::Timeout) => continue, - Err(usb_error) => return Err(Error::from(usb_error)) + Err(usb_error) => { + // Transfer failed. + return Err(Error::from(usb_error)); + } } } + // Stop capture. state.set_enable(false); self.write_state(state)?; println!("Capture disabled"); @@ -205,15 +215,18 @@ impl LunaHandle { } fn write_state(&mut self, state: State) -> Result<(), Error> { - use rusb::{Direction, RequestType, Recipient, request_type}; - self.usb_handle.write_control( - request_type(Direction::Out, RequestType::Vendor, Recipient::Device), - 1, - u16::from(state.0), - 0, - &[], - Duration::from_secs(5), - )?; + let control = Control { + control_type: ControlType::Vendor, + recipient: Recipient::Device, + request: 1, + value: u16::from(state.0), + index: 0, + }; + let data = &[]; + let timeout = Duration::from_secs(1); + self.interface + .control_out_blocking(control, data, timeout) + .context("Failed writing state to device")?; Ok(()) } } @@ -265,7 +278,8 @@ impl LunaStream { impl LunaStop { pub fn stop(self) -> Result<(), Error> { println!("Requesting capture stop"); - self.stop_request.send(()).context("Failed sending stop request")?; + self.stop_request.send(()) + .or_else(|_| bail!("Failed sending stop request"))?; match self.worker.join() { Ok(()) => Ok(()), Err(panic) => { diff --git a/src/ui.rs b/src/ui.rs index 875c3a95..e1c22bcd 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -47,8 +47,6 @@ use pcap_file::{ pcap::{PcapReader, PcapWriter, PcapHeader, RawPcapPacket}, }; -use rusb::Context; - use crate::backend::luna::{LunaDevice, LunaHandle, LunaStop, Speed}; use crate::capture::{ create_capture, @@ -95,7 +93,6 @@ enum FileAction { } struct DeviceSelector { - usb_context: Option, devices: Vec, dev_strings: Vec, dev_speeds: Vec>, @@ -107,7 +104,6 @@ struct DeviceSelector { impl DeviceSelector { fn new() -> Result { let selector = DeviceSelector { - usb_context: Context::new().ok(), devices: vec![], dev_strings: vec![], dev_speeds: vec![], @@ -144,11 +140,7 @@ impl DeviceSelector { } fn scan(&mut self) -> Result { - self.devices = if let Some(context) = self.usb_context.as_mut() { - LunaDevice::scan(context)? - } else { - vec![] - }; + self.devices = LunaDevice::scan()?; self.dev_strings = Vec::with_capacity(self.devices.len()); self.dev_speeds = Vec::with_capacity(self.devices.len()); for device in self.devices.iter() { From 5b6a6d6de1647a39ffb7e5f0d23a94ddbd3581b5 Mon Sep 17 00:00:00 2001 From: Martin Ling Date: Mon, 13 May 2024 15:36:16 +0100 Subject: [PATCH 6/6] Use Queue API to pipeline data transfers. --- src/backend/luna.rs | 68 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/src/backend/luna.rs b/src/backend/luna.rs index f6ebe703..9a97b109 100644 --- a/src/backend/luna.rs +++ b/src/backend/luna.rs @@ -6,6 +6,7 @@ use std::sync::mpsc; use anyhow::{Context as ErrorContext, Error, bail}; use futures_channel::oneshot; use futures_lite::future::block_on; +use futures_util::{select_biased, FutureExt}; use num_enum::{FromPrimitive, IntoPrimitive}; use nusb::{ self, @@ -14,6 +15,7 @@ use nusb::{ ControlType, Recipient, RequestBuffer, + TransferError, }, DeviceInfo, Interface @@ -28,6 +30,7 @@ const NOT_SUPPORTED: u16 = 0x0003; const ENDPOINT: u8 = 0x81; const READ_LEN: usize = 0x4000; +const NUM_TRANSFERS: usize = 4; #[derive(Copy, Clone, FromPrimitive, IntoPrimitive)] #[repr(u8)] @@ -176,25 +179,60 @@ impl LunaHandle { // Channel to stop the capture thread on request. let (stop_tx, mut stop_rx) = oneshot::channel(); // Capture thread. - let mut run_capture = move || { + let run_capture = move || { let mut state = State::new(true, speed); self.write_state(state)?; println!("Capture enabled, speed: {}", speed.description()); - while stop_rx.try_recv() == Ok(None) { - let buffer = RequestBuffer::new(READ_LEN); - let completion = block_on(self.interface.bulk_in(ENDPOINT, buffer)); - match completion.status { - Ok(()) => { - // Transfer successful. Send data to decoder thread. - tx.send(completion.data) - .context("Failed sending capture data to channel")?; - }, - Err(usb_error) => { - // Transfer failed. - return Err(Error::from(usb_error)); - } - } + let mut stopped = false; + + // Set up transfer queue. + let mut data_transfer_queue = self.interface.bulk_in_queue(ENDPOINT); + while data_transfer_queue.pending() < NUM_TRANSFERS { + data_transfer_queue.submit(RequestBuffer::new(READ_LEN)); } + + // Set up capture task. + let capture_task = async move { + loop { + select_biased!( + _ = stop_rx => { + // Capture stop requested. Cancel all transfers. + data_transfer_queue.cancel_all(); + stopped = true; + } + completion = data_transfer_queue.next_complete().fuse() => { + match completion.status { + Ok(()) => { + // Transfer successful. + if !stopped { + // Send data to decoder thread. + tx.send(completion.data) + .context("Failed sending capture data to channel")?; + // Submit next transfer. + data_transfer_queue.submit(RequestBuffer::new(READ_LEN)); + } + }, + Err(TransferError::Cancelled) if stopped => { + // Transfer cancelled during shutdown. Drop it. + drop(completion); + if data_transfer_queue.pending() == 0 { + // All cancellations now handled. + return Ok(()); + } + }, + Err(usb_error) => { + // Transfer failed. + return Err(Error::from(usb_error)); + } + } + } + ); + } + }; + + // Run capture task to completion. + block_on(capture_task)?; + // Stop capture. state.set_enable(false); self.write_state(state)?;