Skip to content

Commit

Permalink
backend: Define a common API for backends, and use it.
Browse files Browse the repository at this point in the history
  • Loading branch information
martinling committed Sep 23, 2024
1 parent 71f9da8 commit bcc5945
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 347 deletions.
220 changes: 90 additions & 130 deletions src/backend/cynthion.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::thread::{spawn, sleep};
use std::time::Duration;
use std::sync::mpsc;

use anyhow::{Context as ErrorContext, Error, bail};
use futures_channel::oneshot;
use futures_lite::future::block_on;
use nusb::{
self,
transfer::{
Expand All @@ -18,23 +15,21 @@ use nusb::{
Interface
};

use crate::util::handle_thread_panic;

use super::{
transfer_queue::TransferQueue,
BackendStop, DeviceUsability, InterfaceSelection, Speed, TimestampedPacket,
BackendDevice,
BackendHandle,
Speed,
PacketIterator,
PacketResult,
TimestampedPacket,
TransferQueue,
};
use super::DeviceUsability::*;

const VID: u16 = 0x1d50;
const PID: u16 = 0x615b;

pub const VID_PID: (u16, u16) = (0x1d50, 0x615b);
const CLASS: u8 = 0xff;
const SUBCLASS: u8 = 0x10;
const PROTOCOL: u8 = 0x01;

const ENDPOINT: u8 = 0x81;

const READ_LEN: usize = 0x4000;
const NUM_TRANSFERS: usize = 4;

Expand Down Expand Up @@ -79,8 +74,10 @@ impl TestConfig {

/// A Cynthion device attached to the system.
pub struct CynthionDevice {
pub device_info: DeviceInfo,
pub usability: DeviceUsability,
device_info: DeviceInfo,
interface_number: u8,
alt_setting_number: u8,
speeds: Vec<Speed>,
}

/// A handle to an open Cynthion device.
Expand All @@ -89,6 +86,7 @@ pub struct CynthionHandle {
interface: Interface,
}

/// Converts from received data bytes to timestamped packets.
pub struct CynthionStream {
receiver: mpsc::Receiver<Vec<u8>>,
buffer: VecDeque<u8>,
Expand All @@ -104,11 +102,15 @@ fn clk_to_ns(clk_cycles: u64) -> u64 {
quotient * 50 + TABLE[remainder as usize]
}

/// Probe a Cynthion device.
pub fn probe(device_info: DeviceInfo) -> Result<Box<dyn BackendDevice>, Error> {
Ok(Box::new(CynthionDevice::new(device_info)?))
}

impl CynthionDevice {
/// Check whether a Cynthion device has an accessible analyzer interface.
fn check_device(device_info: &DeviceInfo)
-> Result<(InterfaceSelection, Vec<Speed>), Error>
{
pub fn new(device_info: DeviceInfo) -> Result<CynthionDevice, Error> {

// Check we can open the device.
let device = device_info
.open()
Expand Down Expand Up @@ -164,56 +166,83 @@ impl CynthionDevice {
.context("Failed to fetch available speeds")?;

// Now we have a usable device.
return Ok((
InterfaceSelection {
return Ok(
CynthionDevice {
device_info,
interface_number,
alt_setting_number,
},
speeds
))
speeds,
}
)
}
}

bail!("No supported analyzer interface found");
}

pub fn scan() -> Result<Vec<CynthionDevice>, Error> {
Ok(nusb::list_devices()?
.filter(|info| info.vendor_id() == VID)
.filter(|info| info.product_id() == PID)
.map(|device_info|
match CynthionDevice::check_device(&device_info) {
Ok((iface, speeds)) => CynthionDevice {
device_info,
usability: Usable(iface, speeds)
},
Err(err) => CynthionDevice {
device_info,
usability: Unusable(format!("{}", err))
}
}
)
.collect())
}

/// Open this device.
pub fn open(&self) -> Result<CynthionHandle, Error> {
match &self.usability {
Usable(iface, _) => {
let device = self.device_info.open()?;
let interface = device.claim_interface(iface.interface_number)?;
if iface.alt_setting_number != 0 {
interface.set_alt_setting(iface.alt_setting_number)?;
}
Ok(CynthionHandle { interface })
},
Unusable(reason) => bail!("Device not usable: {}", reason),
let device = self.device_info.open()?;
let interface = device.claim_interface(self.interface_number)?;
if self.alt_setting_number != 0 {
interface.set_alt_setting(self.alt_setting_number)?;
}
Ok(CynthionHandle { interface })
}
}

impl BackendDevice for CynthionDevice {
fn open_as_generic(&self) -> Result<Box<dyn BackendHandle>, Error> {
Ok(Box::new(self.open()?))
}

fn supported_speeds(&self) -> &[Speed] {
&self.speeds
}
}

impl BackendHandle for CynthionHandle {
fn begin_capture(
&mut self,
speed: Speed,
data_tx: mpsc::Sender<Vec<u8>>
) -> Result<TransferQueue, Error>
{
self.start_capture(speed)?;

Ok(TransferQueue::new(&self.interface, data_tx,
ENDPOINT, NUM_TRANSFERS, READ_LEN))
}

fn end_capture(&mut self) -> Result<(), Error> {
self.stop_capture()
}

fn post_capture(&mut self) -> Result<(), Error> {
Ok(())
}

fn timestamped_packets(&self, data_rx: mpsc::Receiver<Vec<u8>>)
-> Box<dyn PacketIterator>
{
Box::new(
CynthionStream {
receiver: data_rx,
buffer: VecDeque::new(),
padding_due: false,
total_clk_cycles: 0,
}
)
}

fn duplicate(&self) -> Box<dyn BackendHandle> {
Box::new(self.clone())
}
}

impl CynthionHandle {

pub fn speeds(&self) -> Result<Vec<Speed>, Error> {
fn speeds(&self) -> Result<Vec<Speed>, Error> {
use Speed::*;
let control = Control {
control_type: ControlType::Vendor,
Expand All @@ -239,82 +268,12 @@ impl CynthionHandle {
Ok(speeds)
}

pub fn start<F>(&self, speed: Speed, result_handler: F)
-> Result<(CynthionStream, BackendStop), Error>
where F: FnOnce(Result<(), Error>) + Send + 'static
{
// Channel to pass captured data to the decoder thread.
let (tx, rx) = mpsc::channel();
// Channel to stop the capture thread on request.
let (stop_tx, stop_rx) = oneshot::channel();
// Clone handle to give to the worker thread.
let handle = self.clone();
// Start worker thread.
let worker = spawn(move ||
result_handler(
handle.run_capture(speed, tx, stop_rx)));
Ok((
CynthionStream {
receiver: rx,
buffer: VecDeque::new(),
padding_due: false,
total_clk_cycles: 0,
},
BackendStop {
stop_request: stop_tx,
worker,
}
))
}

fn run_capture(mut self,
speed: Speed,
tx: mpsc::Sender<Vec<u8>>,
stop: oneshot::Receiver<()>)
-> Result<(), Error>
{
// Set up a separate channel pair to stop queue processing.
let (queue_stop_tx, queue_stop_rx) = oneshot::channel();

// Start capture.
self.start_capture(speed)?;

// Set up transfer queue.
let mut queue = TransferQueue::new(&self.interface, tx,
ENDPOINT, NUM_TRANSFERS, READ_LEN);

// Spawn a worker thread to process queue until stopped.
let worker = spawn(move || block_on(queue.process(queue_stop_rx)));

// Wait until this thread is signalled to stop.
block_on(stop)
.context("Sender was dropped")?;

// Stop capture.
self.stop_capture()?;

// Leave queue worker running briefly to receive flushed data.
sleep(Duration::from_millis(100));

// Signal queue processing to stop, then join the worker thread.
queue_stop_tx.send(())
.or_else(|_| bail!("Failed sending stop signal to queue worker"))?;
handle_thread_panic(worker.join())?
.context("Error in queue worker thread")?;

Ok(())
}

fn start_capture(&mut self, speed: Speed) -> Result<(), Error> {
self.write_request(1, State::new(true, speed).0)?;
println!("Capture enabled, speed: {}", speed.description());
Ok(())
fn start_capture (&mut self, speed: Speed) -> Result<(), Error> {
self.write_request(1, State::new(true, speed).0)
}

fn stop_capture(&mut self) -> Result<(), Error> {
self.write_request(1, State::new(false, Speed::High).0)?;
println!("Capture disabled");
Ok(())
self.write_request(1, State::new(false, Speed::High).0)
}

pub fn configure_test_device(&mut self, speed: Option<Speed>)
Expand Down Expand Up @@ -342,15 +301,16 @@ impl CynthionHandle {
}
}

impl Iterator for CynthionStream {
type Item = TimestampedPacket;
impl PacketIterator for CynthionStream {}

fn next(&mut self) -> Option<TimestampedPacket> {
impl Iterator for CynthionStream {
type Item = PacketResult;
fn next(&mut self) -> Option<PacketResult> {
loop {
// Do we have another packet already in the buffer?
match self.next_buffered_packet() {
// Yes; return the packet.
Some(packet) => return Some(packet),
Some(packet) => return Some(Ok(packet)),
// No; wait for more data from the capture thread.
None => match self.receiver.recv().ok() {
// Received more data; add it to the buffer and retry.
Expand Down
Loading

0 comments on commit bcc5945

Please sign in to comment.