Merge pull request #108 from martinling/timestamps
Add timestamping
miek authored Jul 3, 2024
2 parents f08089f + 0457bba commit 828d667
Showing 10 changed files with 331 additions and 60 deletions.
src/backend/cynthion.rs (89 changes: 76 additions & 13 deletions)
@@ -29,7 +29,7 @@ const PID: u16 = 0x615b;

const CLASS: u8 = 0xff;
const SUBCLASS: u8 = 0x10;
const PROTOCOL: u8 = 0x00;
const PROTOCOL: u8 = 0x01;

const ENDPOINT: u8 = 0x81;

@@ -142,13 +142,28 @@ pub struct CynthionQueue {
pub struct CynthionStream {
receiver: mpsc::Receiver<Vec<u8>>,
buffer: VecDeque<u8>,
padding_due: bool,
total_clk_cycles: u64,
}

pub struct CynthionStop {
stop_request: oneshot::Sender<()>,
worker: JoinHandle::<()>,
}

pub struct CynthionPacket {
pub timestamp_ns: u64,
pub bytes: Vec<u8>,
}

/// Convert 60MHz clock cycles to nanoseconds, rounding down.
fn clk_to_ns(clk_cycles: u64) -> u64 {
const TABLE: [u64; 3] = [0, 16, 33];
let quotient = clk_cycles / 3;
let remainder = clk_cycles % 3;
quotient * 50 + TABLE[remainder as usize]
}
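
A quick sanity check of the rounding above (a hypothetical test, not part of this commit): one 60 MHz cycle is 16.67 ns, so cycles convert exactly in groups of three, each group spanning 50 ns.

#[test]
fn clk_to_ns_rounds_down() {
    assert_eq!(clk_to_ns(0), 0);
    assert_eq!(clk_to_ns(1), 16);  // 16.67 ns, rounded down
    assert_eq!(clk_to_ns(2), 33);  // 33.33 ns, rounded down
    assert_eq!(clk_to_ns(3), 50);  // exactly 50 ns
    assert_eq!(clk_to_ns(60_000_000), 1_000_000_000);  // one second of cycles
}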

/// Check whether a Cynthion device has an accessible analyzer interface.
fn check_device(device_info: &DeviceInfo)
-> Result<(InterfaceSelection, Vec<Speed>), Error>
@@ -302,6 +317,8 @@ impl CynthionHandle {
CynthionStream {
receiver: rx,
buffer: VecDeque::new(),
padding_due: false,
total_clk_cycles: 0,
},
CynthionStop {
stop_request: stop_tx,
@@ -437,9 +454,9 @@ impl CynthionQueue {
}

impl Iterator for CynthionStream {
type Item = Vec<u8>;
type Item = CynthionPacket;

fn next(&mut self) -> Option<Vec<u8>> {
fn next(&mut self) -> Option<CynthionPacket> {
loop {
// Do we have another packet already in the buffer?
match self.next_buffered_packet() {
@@ -458,25 +475,71 @@ impl Iterator for CynthionStream {
}

impl CynthionStream {
fn next_buffered_packet(&mut self) -> Option<Vec<u8>> {
// Do we have the length header for the next packet?
let buffer_len = self.buffer.len();
if buffer_len <= 2 {
return None;
fn next_buffered_packet(&mut self) -> Option<CynthionPacket> {
// Are we waiting for a padding byte?
if self.padding_due {
if self.buffer.is_empty() {
return None;
} else {
self.buffer.pop_front();
self.padding_due = false;
}
}

// Loop over any non-packet events, until we get to a packet.
loop {
// Do we have the length and timestamp for the next packet/event?
if self.buffer.len() < 4 {
return None;
}

if self.buffer[0] == 0xFF {
// This is an event.
let _event_code = self.buffer[1];

// Update our cycle count.
self.update_cycle_count();

// Remove event from buffer.
self.buffer.drain(0..4);
} else {
// This is a packet, handle it below.
break;
}
}

// Do we have all the data for the next packet?
let packet_len = u16::from_be_bytes(
[self.buffer[0], self.buffer[1]]) as usize;
if buffer_len <= 2 + packet_len {
if self.buffer.len() <= 4 + packet_len {
return None;
}

// Remove the length header from the buffer.
self.buffer.drain(0..2);
// Update our cycle count.
self.update_cycle_count();

// Remove the length and timestamp from the buffer.
self.buffer.drain(0..4);

// If packet length is odd, we will need to skip a padding byte after.
if packet_len % 2 == 1 {
self.padding_due = true;
}

// Remove the rest of the packet from the buffer and return it.
Some(CynthionPacket {
timestamp_ns: clk_to_ns(self.total_clk_cycles),
bytes: self.buffer.drain(0..packet_len).collect()
})
}

fn update_cycle_count(&mut self) {
// Decode the cycle count.
let clk_cycles = u16::from_be_bytes(
[self.buffer[2], self.buffer[3]]);

// Remove the packet from the buffer and return it.
Some(self.buffer.drain(0..packet_len).collect())
// Update our running total.
self.total_clk_cycles += clk_cycles as u64;
}
}
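
For orientation, the stream framing parsed by next_buffered_packet() can be read off the code above: every record starts with a four-byte header, either 0xFF, an event code, and a big-endian u16 cycle delta, or a big-endian u16 packet length followed by a cycle delta, with the packet bytes after the header and one padding byte when the length is odd. A hand-built illustration (hypothetical byte values, a sketch of the layout inferred here rather than an authoritative format description):

// Event record: 0xFF marker, event code, big-endian u16 delta of 60 MHz cycles.
let event: [u8; 4] = [0xFF, 0x01, 0x00, 0x0A];
// Packet record: big-endian length 3, cycle delta 6 (100 ns), three data bytes,
// then a single padding byte because the length is odd.
let packet: [u8; 8] = [0x00, 0x03, 0x00, 0x06, 0xA5, 0x12, 0x34, 0x00];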

src/capture.rs (57 changes: 49 additions & 8 deletions)
@@ -35,6 +35,7 @@ pub struct CaptureWriter {
pub shared: Arc<CaptureShared>,
pub packet_data: DataWriter<u8, PACKET_DATA_BLOCK_SIZE>,
pub packet_index: CompactWriter<PacketId, PacketByteId, 2>,
pub packet_times: CompactWriter<PacketId, Timestamp, 2>,
pub transaction_index: CompactWriter<TransactionId, PacketId>,
pub transfer_index: DataWriter<TransferIndexEntry>,
pub item_index: CompactWriter<TrafficItemId, TransferId>,
Expand All @@ -53,6 +54,7 @@ pub struct CaptureReader {
endpoint_readers: VecMap<EndpointId, EndpointReader>,
pub packet_data: DataReader<u8, PACKET_DATA_BLOCK_SIZE>,
pub packet_index: CompactReader<PacketId, PacketByteId>,
pub packet_times: CompactReader<PacketId, Timestamp>,
pub transaction_index: CompactReader<TransactionId, PacketId>,
pub transfer_index: DataReader<TransferIndexEntry>,
pub item_index: CompactReader<TrafficItemId, TransferId>,
Expand All @@ -72,6 +74,7 @@ pub fn create_capture()
let (data_writer, data_reader) =
data_stream_with_block_size::<_, PACKET_DATA_BLOCK_SIZE>()?;
let (packets_writer, packets_reader) = compact_index()?;
let (timestamp_writer, timestamp_reader) = compact_index()?;
let (transactions_writer, transactions_reader) = compact_index()?;
let (transfers_writer, transfers_reader) = data_stream()?;
let (items_writer, items_reader) = compact_index()?;
@@ -93,6 +96,7 @@ pub fn create_capture()
shared: shared.clone(),
packet_data: data_writer,
packet_index: packets_writer,
packet_times: timestamp_writer,
transaction_index: transactions_writer,
transfer_index: transfers_writer,
item_index: items_writer,
@@ -109,6 +113,7 @@ pub fn create_capture()
endpoint_readers: VecMap::new(),
packet_data: data_reader,
packet_index: packets_reader,
packet_times: timestamp_reader,
transaction_index: transactions_reader,
transfer_index: transfers_reader,
item_index: items_reader,
@@ -194,6 +199,7 @@ pub fn create_endpoint()

pub type PacketByteId = Id<u8>;
pub type PacketId = Id<PacketByteId>;
pub type Timestamp = u64;
pub type TransactionId = Id<PacketId>;
pub type TransferId = Id<TransferIndexEntry>;
pub type EndpointTransactionId = Id<TransactionId>;
@@ -515,7 +521,7 @@ pub struct Transaction {
start_pid: PID,
end_pid: PID,
split: Option<(SplitFields, PID)>,
packet_id_range: Range<PacketId>,
pub packet_id_range: Range<PacketId>,
data_packet_id: Option<PacketId>,
payload_byte_range: Option<Range<Id<u8>>>,
}
@@ -862,14 +868,20 @@ impl CaptureReader {
self.packet_data.get_range(&range)
}

pub fn packet_time(&mut self, id: PacketId)
-> Result<Timestamp, Error>
{
self.packet_times.get(id)
}

fn packet_pid(&mut self, id: PacketId)
-> Result<PID, Error>
{
let offset: Id<u8> = self.packet_index.get(id)?;
Ok(PID::from(self.packet_data.get(offset)?))
}

fn transaction(&mut self, id: TransactionId)
pub fn transaction(&mut self, id: TransactionId)
-> Result<Transaction, Error>
{
let packet_id_range = self.transaction_index.target_range(
@@ -1056,6 +1068,7 @@ pub trait ItemSource<Item> {
-> Result<(CompletionStatus, u64), Error>;
fn summary(&mut self, item: &Item) -> Result<String, Error>;
fn connectors(&mut self, item: &Item) -> Result<String, Error>;
fn timestamp(&mut self, item: &Item) -> Result<Timestamp, Error>;
}

impl ItemSource<TrafficItem> for CaptureReader {
@@ -1361,6 +1374,27 @@ impl ItemSource<TrafficItem> for CaptureReader {
);
Ok(connectors)
}

fn timestamp(&mut self, item: &TrafficItem)
-> Result<Timestamp, Error>
{
use TrafficItem::*;
let packet_id = match item {
Transfer(transfer_id) => {
let entry = self.transfer_index.get(*transfer_id)?;
let ep_traf = self.endpoint_traffic(entry.endpoint_id())?;
let ep_transaction_id =
ep_traf.transfer_index.get(entry.transfer_id())?;
let transaction_id =
ep_traf.transaction_ids.get(ep_transaction_id)?;
self.transaction_index.get(transaction_id)?
},
Transaction(.., transaction_id) =>
self.transaction_index.get(*transaction_id)?,
Packet(.., packet_id) => *packet_id,
};
self.packet_time(packet_id)
}
}

impl ItemSource<DeviceItem> for CaptureReader {
@@ -1579,6 +1613,12 @@ impl ItemSource<DeviceItem> for CaptureReader {
};
Ok(" ".repeat(depth))
}

fn timestamp(&mut self, _item: &DeviceItem)
-> Result<Timestamp, Error>
{
unreachable!()
}
}

#[cfg(test)]
@@ -1588,8 +1628,8 @@ mod tests {
use std::io::{BufReader, BufWriter, BufRead, Write};
use std::path::PathBuf;
use crate::decoder::Decoder;
use crate::loader::Loader;
use itertools::Itertools;
use pcap_file::pcap::PcapReader;

fn summarize_item(cap: &mut CaptureReader, item: &TrafficItem, depth: usize)
-> String
@@ -1643,13 +1683,14 @@ mod tests {
ref_path.push("reference.txt");
out_path.push("output.txt");
{
let pcap_file = File::open(cap_path).unwrap();
let mut pcap_reader = PcapReader::new(pcap_file).unwrap();
let mut loader = Loader::open(cap_path).unwrap();
let (writer, mut reader) = create_capture().unwrap();
let mut decoder = Decoder::new(writer).unwrap();
while let Some(result) = pcap_reader.next_raw_packet() {
let packet = result.unwrap().data;
decoder.handle_raw_packet(&packet).unwrap();
while let Some(result) = loader.next() {
let (packet, timestamp_ns) = result.unwrap();
decoder
.handle_raw_packet(&packet.data, timestamp_ns)
.unwrap();
}
decoder.finish().unwrap();
let out_file = File::create(out_path.clone()).unwrap();
src/decoder.rs (3 changes: 2 additions & 1 deletion)
@@ -578,11 +578,12 @@ impl Decoder {
Ok(decoder)
}

pub fn handle_raw_packet(&mut self, packet: &[u8])
pub fn handle_raw_packet(&mut self, packet: &[u8], timestamp_ns: u64)
-> Result<(), Error>
{
let data_range = self.capture.packet_data.append(packet)?;
let packet_id = self.capture.packet_index.push(data_range.start)?;
self.capture.packet_times.push(timestamp_ns)?;
self.transaction_update(packet_id, packet)?;
Ok(())
}
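
The UI-side capture wiring is not among the files shown here, but given the signatures in this diff the call site presumably looks something like the following (a hypothetical sketch, not code from this commit):

// stream is the CynthionStream iterator from the backend; each item is now a
// CynthionPacket carrying the raw bytes plus a timestamp in nanoseconds.
for packet in stream {
    decoder.handle_raw_packet(&packet.bytes, packet.timestamp_ns)?;
}
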
src/loader.rs (57 changes: 57 additions & 0 deletions)
@@ -0,0 +1,57 @@
use std::fs::File;
use std::io::BufReader;
use std::mem::size_of;
use std::path::PathBuf;

use pcap_file::{
pcap::{PcapReader, PcapHeader, RawPcapPacket},
TsResolution,
};

use anyhow::Error;

pub struct Loader {
pcap: PcapReader<BufReader<File>>,
pub file_size: u64,
pub bytes_read: u64,
frac_ns: u64,
start_time: Option<u64>,
}

impl Loader {
pub fn open(path: PathBuf) -> Result<Loader, Error> {
let file = File::open(path)?;
let file_size = file.metadata()?.len();
let reader = BufReader::new(file);
let pcap = PcapReader::new(reader)?;
let header = pcap.header();
let bytes_read = size_of::<PcapHeader>() as u64;
let frac_ns = match header.ts_resolution {
TsResolution::MicroSecond => 1_000,
TsResolution::NanoSecond => 1,
};
let start_time = None;
Ok(Loader{pcap, file_size, bytes_read, frac_ns, start_time})
}

pub fn next(&mut self) -> Option<Result<(RawPcapPacket, u64), Error>> {
match self.pcap.next_raw_packet() {
None => None,
Some(Err(e)) => Some(Err(Error::from(e))),
Some(Ok(packet)) => {
let raw_timestamp =
packet.ts_sec as u64 * 1_000_000_000 +
packet.ts_frac as u64 * self.frac_ns;
let timestamp = if let Some(start) = self.start_time {
raw_timestamp - start
} else {
self.start_time = Some(raw_timestamp);
0
};
let size = 16 + packet.data.len();
self.bytes_read += size as u64;
Some(Ok((packet, timestamp)))
}
}
}
}
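
A minimal worked example of the timestamp arithmetic in Loader::next(), assuming a capture with microsecond resolution so that frac_ns is 1_000 (the values below are hypothetical):

// raw = ts_sec * 1_000_000_000 + ts_frac * frac_ns; the first packet sets
// start_time and is reported as 0, later packets are reported relative to it.
let frac_ns: u64 = 1_000;
let first: u64 = 2 * 1_000_000_000 + 500 * frac_ns;   // ts_sec = 2, ts_frac = 500 µs
let second: u64 = 2 * 1_000_000_000 + 750 * frac_ns;  // ts_sec = 2, ts_frac = 750 µs
assert_eq!(second - first, 250_000);                   // reported delta: 250 µs in ns
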
src/main.rs (1 change: 1 addition & 0 deletions)
@@ -20,6 +20,7 @@ mod decoder;
mod expander;
mod id;
mod index_stream;
mod loader;
mod model;
mod rcu;
mod row_data;