Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better debug #293

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum_dispatch = "0.3.12"
uuid = { version = "0.8", features = ["v4", "serde"] }
ts-rs = "7.0"
cached = { version = "0.45", features = ["serde", "serde_json", "async_tokio_rt_multi_thread"] }
derivative = "2.2.0"

## Mavlink
mavlink = { version = "0.10.1", features = ["default", "emit-extensions"] }
Expand Down
2 changes: 1 addition & 1 deletion src/stream/gst/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn set_plugin_rank(plugin_name: &str, rank: gst::Rank) -> Result<()> {
feature.set_rank(rank);
} else {
return Err(anyhow!(
"Cannot found Gstreamer feature {plugin_name:#?} in the registry.",
"Cannot found GStreamer feature {plugin_name:#?} in the registry.",
));
}

Expand Down
2 changes: 1 addition & 1 deletion src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Stream {
)
})
.context(format!(
"Failed when spawing PipelineRunner thread for Pipeline {pipeline_id:#?}"
"Failed when spawning PipelineRunner thread for Pipeline {pipeline_id:#?}"
))?;

Ok(Self {
Expand Down
13 changes: 8 additions & 5 deletions src/stream/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,25 @@ impl Pipeline {
})
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
pub fn add_sink(&mut self, sink: Sink) -> Result<()> {
self.inner_state_mut().add_sink(sink)
}

#[allow(dead_code)] // This functions is reserved here for when we start dynamically add/remove Sinks
#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
pub fn remove_sink(&mut self, sink_id: &uuid::Uuid) -> Result<()> {
self.inner_state_mut().remove_sink(sink_id)
}
}

#[derive(Debug)]
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct PipelineState {
pub pipeline_id: uuid::Uuid,
#[derivative(Debug = "ignore")]
pub pipeline: gst::Pipeline,
#[derivative(Debug = "ignore")]
pub sink_tee: gst::Element,
pub sinks: HashMap<uuid::Uuid, Sink>,
pub pipeline_runner: PipelineRunner,
Expand Down Expand Up @@ -140,7 +143,7 @@ impl PipelineState {
}

/// Links the sink pad from the given Sink to this Pipeline's Tee element
#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
pub fn add_sink(&mut self, mut sink: Sink) -> Result<()> {
let pipeline_id = &self.pipeline_id;

Expand Down Expand Up @@ -213,7 +216,7 @@ impl PipelineState {
/// Unlinks the src pad from this Sink from the given sink pad of a Tee element
///
/// Important notes about pad unlinking: [here](https://gstreamer.freedesktop.org/documentation/application-development/advanced/pipeline-manipulation.html?gi-language=c#dynamically-changing-the-pipeline)
#[instrument(level = "info", skip(self))]
#[instrument(level = "info")]
pub fn remove_sink(&mut self, sink_id: &uuid::Uuid) -> Result<()> {
let pipeline_id = &self.pipeline_id;
let sink = self.sinks.remove(sink_id).context(format!(
Expand Down
2 changes: 1 addition & 1 deletion src/stream/pipeline/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl PipelineRunner {
}
})
.context(format!(
"Failed when spawing PipelineRunner thread for Pipeline {pipeline_id:#?}"
"Failed when spawning PipelineRunner thread for Pipeline {pipeline_id:#?}"
))?,
})
}
Expand Down
2 changes: 1 addition & 1 deletion src/stream/rtsp/rtsp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl RTSPServer {
thread::Builder::new()
.name("RTSPServer".to_string())
.spawn(move || RTSPServer::run_main_loop(sender))
.expect("Failed when spawing RTSPServer thread"),
.expect("Failed when spawning RTSPServer thread"),
),
main_loop_thread_rx_channel: receiver,
}
Expand Down
28 changes: 19 additions & 9 deletions src/stream/sink/image_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,33 @@ impl CachedThumbnails {
}
}

#[derive(Debug)]
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct ImageSink {
sink_id: uuid::Uuid,
#[derivative(Debug = "ignore")]
pipeline: gst::Pipeline,
#[derivative(Debug = "ignore")]
queue: gst::Element,
#[derivative(Debug = "ignore")]
proxysink: gst::Element,
#[derivative(Debug = "ignore")]
_proxysrc: gst::Element,
#[derivative(Debug = "ignore")]
_transcoding_elements: Vec<gst::Element>,
#[derivative(Debug = "ignore")]
appsink: gst_app::AppSink,
#[derivative(Debug = "ignore")]
tee_src_pad: Option<gst::Pad>,
#[derivative(Debug = "ignore")]
flat_samples_sender: tokio::sync::broadcast::Sender<ClonableResult<FlatSamples<Vec<u8>>>>,
#[derivative(Debug = "ignore")]
Comment on lines 63 to +80
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ugly, maybe we could change it to an internal struct, like:

struct Inner {
    pipeline: gst::Pipeline,
    queue: gst::Element,
    proxysink: gst::Element,
    _proxysrc: gst::Element,
    _transcoding_elements: Vec<gst::Element>,
    appsink: gst_app::AppSink,
    tee_src_pad: Option<gst::Pad>,
}

#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct ImageSink {
    sink_id: uuid::Uuid,
    #[derivative(Debug = "ignore")]
    inner: Inner,
    flat_samples_sender: tokio::sync::broadcast::Sender<ClonableResult<FlatSamples<Vec<u8>>>>,
    pad_blocker: Arc<Mutex<Option<gst::PadProbeId>>>,
    pipeline_runner: PipelineRunner,
    thumbnails: Arc<Mutex<CachedThumbnails>>,
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the consequences of such approach ? If there are none, we could just change the code.

pad_blocker: Arc<Mutex<Option<gst::PadProbeId>>>,
pipeline_runner: PipelineRunner,
thumbnails: Arc<Mutex<CachedThumbnails>>,
}
impl SinkInterface for ImageSink {
#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn link(
&mut self,
pipeline: &gst::Pipeline,
Expand Down Expand Up @@ -207,7 +217,7 @@ impl SinkInterface for ImageSink {
Ok(())
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> {
let Some(tee_src_pad) = &self.tee_src_pad else {
warn!("Tried to unlink Sink from a pipeline without a Tee src pad.");
Expand Down Expand Up @@ -265,24 +275,24 @@ impl SinkInterface for ImageSink {
Ok(())
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn get_id(&self) -> uuid::Uuid {
self.sink_id
}

#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace")]
fn get_sdp(&self) -> Result<gst_sdp::SDPMessage> {
Err(anyhow!(
"Not available. Reason: Image Sink doesn't provide endpoints"
))
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn start(&self) -> Result<()> {
self.pipeline_runner.start()
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn eos(&self) {
if let Err(error) = self.pipeline.post_message(gst::message::Eos::new()) {
error!("Failed posting Eos message into Sink bus. Reason: {error:?}");
Expand Down Expand Up @@ -532,7 +542,7 @@ impl ImageSink {
})
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
async fn try_get_flat_sample(&self) -> Result<FlatSamples<Vec<u8>>> {
// Play the pipeline if it's not playing yet.
// Here we can ignore the result because we have a timeout when waiting for the snapshot
Expand Down Expand Up @@ -614,7 +624,7 @@ impl ImageSink {
Ok(thumbnail)
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
pub async fn make_jpeg_thumbnail_from_last_frame(
&self,
quality: u8,
Expand Down
23 changes: 14 additions & 9 deletions src/stream/sink/rtsp_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ use gst::prelude::*;

use super::SinkInterface;

#[derive(Debug)]
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct RtspSink {
sink_id: uuid::Uuid,
#[derivative(Debug = "ignore")]
queue: gst::Element,
#[derivative(Debug = "ignore")]
sink: gst::Element,
#[derivative(Debug = "ignore")]
sink_sink_pad: gst::Pad,
#[derivative(Debug = "ignore")]
tee_src_pad: Option<gst::Pad>,
path: String,
socket_path: String,
}
impl SinkInterface for RtspSink {
#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn link(
&mut self,
pipeline: &gst::Pipeline,
Expand Down Expand Up @@ -144,7 +149,7 @@ impl SinkInterface for RtspSink {
Ok(())
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> {
if let Err(error) = std::fs::remove_file(&self.socket_path) {
warn!("Failed removing the RTSP Sink socket file. Reason: {error:?}");
Expand Down Expand Up @@ -201,24 +206,24 @@ impl SinkInterface for RtspSink {
Ok(())
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn get_id(&self) -> uuid::Uuid {
self.sink_id
}

#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace")]
fn get_sdp(&self) -> Result<gst_sdp::SDPMessage> {
Err(anyhow!(
"Not available. Reason: RTSP Sink should only be connected from its RTSP endpoint."
))
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn start(&self) -> Result<()> {
Ok(())
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn eos(&self) {}
}

Expand Down Expand Up @@ -259,12 +264,12 @@ impl RtspSink {
})
}

#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace")]
pub fn path(&self) -> String {
self.path.clone()
}

#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace")]
pub fn socket_path(&self) -> String {
self.socket_path.clone()
}
Expand Down
21 changes: 14 additions & 7 deletions src/stream/sink/udp_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,28 @@ use gst::prelude::*;
use super::SinkInterface;
use crate::stream::pipeline::runner::PipelineRunner;

#[derive(Debug)]
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub struct UdpSink {
sink_id: uuid::Uuid,
pipeline: gst::Pipeline,
#[derivative(Debug = "ignore")]
queue: gst::Element,
#[derivative(Debug = "ignore")]
proxysink: gst::Element,
#[derivative(Debug = "ignore")]
_proxysrc: gst::Element,
#[derivative(Debug = "ignore")]
_udpsink: gst::Element,
#[derivative(Debug = "ignore")]
udpsink_sink_pad: gst::Pad,
#[derivative(Debug = "ignore")]
tee_src_pad: Option<gst::Pad>,
addresses: Vec<url::Url>,
pipeline_runner: PipelineRunner,
}
impl SinkInterface for UdpSink {
#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn link(
&mut self,
pipeline: &gst::Pipeline,
Expand Down Expand Up @@ -155,7 +162,7 @@ impl SinkInterface for UdpSink {
Ok(())
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> {
let Some(tee_src_pad) = &self.tee_src_pad else {
warn!("Tried to unlink Sink from a pipeline without a Tee src pad.");
Expand Down Expand Up @@ -213,12 +220,12 @@ impl SinkInterface for UdpSink {
Ok(())
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn get_id(&self) -> uuid::Uuid {
self.sink_id
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn get_sdp(&self) -> Result<gst_sdp::SDPMessage> {
let caps = self
.udpsink_sink_pad
Expand Down Expand Up @@ -257,12 +264,12 @@ impl SinkInterface for UdpSink {
Ok(sdp)
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn start(&self) -> Result<()> {
self.pipeline_runner.start()
}

#[instrument(level = "debug", skip(self))]
#[instrument(level = "debug")]
fn eos(&self) {
if let Err(error) = self.pipeline.post_message(gst::message::Eos::new()) {
error!("Failed posting Eos message into Sink bus. Reason: {error:?}");
Expand Down
Loading