From 874559624591a177d5b5fd009e9e24d014ac3652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Thu, 26 Oct 2023 11:47:53 +0200 Subject: [PATCH] ConsumerDump response, do not allocate memory --- rust/src/messages.rs | 14 ++-- rust/src/router/consumer.rs | 115 ++++++++++++---------------- rust/src/router/producer.rs | 10 ++- rust/src/rtp_parameters.rs | 147 +++++------------------------------- 4 files changed, 80 insertions(+), 206 deletions(-) diff --git a/rust/src/messages.rs b/rust/src/messages.rs index 0d3bff5dbd..5e6f3b808f 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -19,6 +19,7 @@ use crate::ortc::RtpMapping; use crate::pipe_transport::PipeTransportOptions; use crate::plain_transport::PlainTransportOptions; use crate::producer::{ProducerId, ProducerTraceEventType, ProducerType}; +use crate::router::consumer::ConsumerDump; use crate::router::producer::ProducerDump; use crate::router::{RouterDump, RouterId}; use crate::rtp_observer::RtpObserverId; @@ -2388,7 +2389,7 @@ pub(crate) struct ConsumerDumpRequest {} impl Request for ConsumerDumpRequest { const METHOD: request::Method = request::Method::ConsumerDump; type HandlerId = ConsumerId; - type Response = response::Body; + type Response = ConsumerDump; fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec { let mut builder = Builder::new(); @@ -2409,12 +2410,11 @@ impl Request for ConsumerDumpRequest { fn convert_response( response: Option>, ) -> Result> { - match response { - Some(data) => Ok(data.try_into().unwrap()), - _ => { - panic!("Wrong message from worker: {response:?}"); - } - } + let Some(response::BodyRef::ConsumerDumpResponse(data)) = response else { + panic!("Wrong message from worker: {response:?}"); + }; + + ConsumerDump::from_fbs_ref(data) } } diff --git a/rust/src/router/consumer.rs b/rust/src/router/consumer.rs index 827a03073b..852c3ae078 100644 --- a/rust/src/router/consumer.rs +++ b/rust/src/router/consumer.rs @@ -161,26 +161,6 @@ pub struct RtpStreamParams { } impl RtpStreamParams { - pub(crate) fn from_fbs(params: &rtp_stream::Params) -> Self { - Self { - clock_rate: params.clock_rate, - cname: params.cname.clone(), - encoding_idx: params.encoding_idx, - mime_type: params.mime_type.clone().parse().unwrap(), - payload_type: params.payload_type, - spatial_layers: params.spatial_layers, - ssrc: params.ssrc, - temporal_layers: params.temporal_layers, - use_dtx: params.use_dtx, - use_in_band_fec: params.use_in_band_fec, - use_nack: params.use_nack, - use_pli: params.use_pli, - rid: params.rid.clone(), - rtx_ssrc: params.rtx_ssrc, - rtx_payload_type: params.rtx_payload_type, - } - } - pub(crate) fn from_fbs_ref(params: rtp_stream::ParamsRef<'_>) -> Result> { Ok(Self { clock_rate: params.clock_rate()?, @@ -236,11 +216,11 @@ pub struct RtpStream { } impl RtpStream { - pub(crate) fn from_fbs(dump: rtp_stream::Dump) -> Self { - Self { - params: RtpStreamParams::from_fbs(&dump.params), - score: dump.score, - } + pub(crate) fn from_fbs_ref(dump: rtp_stream::DumpRef<'_>) -> Result> { + Ok(Self { + params: RtpStreamParams::from_fbs_ref(dump.params()?)?, + score: dump.score()?, + }) } } @@ -283,42 +263,48 @@ pub struct ConsumerDump { } impl ConsumerDump { - pub(crate) fn from_fbs(dump: consumer::DumpResponse) -> Result> { - let dump = dump.data; + pub(crate) fn from_fbs_ref( + dump: consumer::DumpResponseRef<'_>, + ) -> Result> { + let dump = dump.data(); Ok(Self { - id: dump.base.id.parse()?, - kind: MediaKind::from_fbs(dump.base.kind), - paused: dump.base.paused, - priority: dump.base.priority, - producer_id: dump.base.producer_id.parse()?, - producer_paused: dump.base.producer_paused, - rtp_parameters: RtpParameters::from_fbs(*dump.base.rtp_parameters).unwrap(), - supported_codec_payload_types: dump.base.supported_codec_payload_types, - trace_event_types: dump - .base - .trace_event_types + id: dump?.base()?.id()?.parse()?, + kind: MediaKind::from_fbs(dump?.base()?.kind()?), + paused: dump?.base()?.paused()?, + priority: dump?.base()?.priority()?, + producer_id: dump?.base()?.producer_id()?.parse()?, + producer_paused: dump?.base()?.producer_paused()?, + rtp_parameters: RtpParameters::from_fbs_ref(dump?.base()?.rtp_parameters()?)?, + supported_codec_payload_types: Vec::from( + dump?.base()?.supported_codec_payload_types()?, + ), + trace_event_types: dump? + .base()? + .trace_event_types()? + .iter() + .map(|trace_event_type| Ok(ConsumerTraceEventType::from_fbs(trace_event_type?))) + .collect::>>()?, + r#type: ConsumerType::from_fbs(dump?.base()?.type_()?), + consumable_rtp_encodings: dump? + .base()? + .consumable_rtp_encodings()? + .iter() + .map(|encoding_parameters| { + Ok(RtpEncodingParameters::from_fbs_ref(encoding_parameters?)?) + }) + .collect::>>()?, + rtp_streams: dump? + .rtp_streams()? .iter() - .map(ConsumerTraceEventType::from_fbs) - .collect(), - r#type: ConsumerType::from_fbs(dump.base.type_), - consumable_rtp_encodings: dump - .base - .consumable_rtp_encodings - .into_iter() - .map(RtpEncodingParameters::from_fbs) - .collect(), - rtp_streams: dump - .rtp_streams - .into_iter() - .map(RtpStream::from_fbs) - .collect(), - preferred_spatial_layer: dump.preferred_spatial_layer, - target_spatial_layer: dump.target_spatial_layer, - current_spatial_layer: dump.current_spatial_layer, - preferred_temporal_layer: dump.preferred_temporal_layer, - target_temporal_layer: dump.target_temporal_layer, - current_temporal_layer: dump.current_temporal_layer, + .map(|stream| Ok(RtpStream::from_fbs_ref(stream?)?)) + .collect::>>()?, + preferred_spatial_layer: dump?.preferred_spatial_layer()?, + target_spatial_layer: dump?.target_spatial_layer()?, + current_spatial_layer: dump?.current_spatial_layer()?, + preferred_temporal_layer: dump?.preferred_temporal_layer()?, + target_temporal_layer: dump?.target_temporal_layer()?, + current_temporal_layer: dump?.current_temporal_layer()?, }) } } @@ -583,7 +569,7 @@ impl ConsumerTraceEventType { } } - pub(crate) fn from_fbs(event_type: &consumer::TraceEventType) -> Self { + pub(crate) fn from_fbs(event_type: consumer::TraceEventType) -> Self { match event_type { consumer::TraceEventType::Rtp => ConsumerTraceEventType::Rtp, consumer::TraceEventType::Keyframe => ConsumerTraceEventType::KeyFrame, @@ -1006,17 +992,10 @@ impl Consumer { pub async fn dump(&self) -> Result { debug!("dump()"); - let response = self - .inner + self.inner .channel .request(self.id(), ConsumerDumpRequest {}) - .await?; - - if let response::Body::ConsumerDumpResponse(data) = response { - Ok(ConsumerDump::from_fbs(*data).expect("Error parsing dump response")) - } else { - panic!("Wrong message from worker"); - } + .await } /// Returns current RTC statistics of the consumer. diff --git a/rust/src/router/producer.rs b/rust/src/router/producer.rs index 47601d18ce..5b0e1b68fd 100644 --- a/rust/src/router/producer.rs +++ b/rust/src/router/producer.rs @@ -110,9 +110,13 @@ impl RtpStreamRecv { Ok(Self { params: RtpStreamParams::from_fbs_ref(dump.params()?)?, score: dump.score()?, - rtx_stream: dump.rtx_stream()?.map(|stream| RtxStream { - params: RtxStreamParams::from_fbs_ref(stream.params().unwrap()).unwrap(), - }), + rtx_stream: if let Some(rtx_stream) = dump.rtx_stream()? { + Some(RtxStream { + params: RtxStreamParams::from_fbs_ref(rtx_stream.params()?)?, + }) + } else { + None + }, }) } } diff --git a/rust/src/rtp_parameters.rs b/rust/src/rtp_parameters.rs index 0aed81df37..e063baa242 100644 --- a/rust/src/rtp_parameters.rs +++ b/rust/src/rtp_parameters.rs @@ -787,119 +787,6 @@ pub struct RtpParameters { } impl RtpParameters { - pub(crate) fn from_fbs( - rtp_parameters: rtp_parameters::RtpParameters, - ) -> Result> { - Ok(Self { - mid: rtp_parameters.mid, - codecs: rtp_parameters - .codecs - .into_iter() - .map(|codec| { - let parameters = codec - .parameters - .unwrap_or_default() - .into_iter() - .map(|parameters| { - Ok(( - Cow::Owned(parameters.name), - match parameters.value { - rtp_parameters::Value::Boolean(_) - | rtp_parameters::Value::Double(_) - | rtp_parameters::Value::Integer32Array(_) => { - // TODO: Above value variant should not exist in the - // first place - panic!("Invalid parameter") - } - rtp_parameters::Value::Integer32(n) => { - RtpCodecParametersParametersValue::Number( - n.value.try_into()?, - ) - } - rtp_parameters::Value::String(s) => { - RtpCodecParametersParametersValue::String(s.value.into()) - } - }, - )) - }) - .collect::>>()?; - let rtcp_feedback = codec - .rtcp_feedback - .unwrap_or_default() - .into_iter() - .map(|rtcp_feedback| { - RtcpFeedback::from_type_parameter( - &rtcp_feedback.type_, - &rtcp_feedback.parameter.unwrap_or_default(), - ) - }) - .collect::>()?; - - Ok(match MimeType::from_str(&codec.mime_type)? { - MimeType::Audio(mime_type) => RtpCodecParameters::Audio { - mime_type, - payload_type: codec.payload_type, - clock_rate: codec.clock_rate.try_into()?, - channels: codec - .channels - .ok_or("Audio must have channels specified")? - .try_into()?, - parameters, - rtcp_feedback: vec![], - }, - MimeType::Video(mime_type) => RtpCodecParameters::Video { - mime_type, - payload_type: codec.payload_type, - clock_rate: codec.clock_rate.try_into()?, - parameters, - rtcp_feedback, - }, - }) - }) - .collect::>>()?, - header_extensions: rtp_parameters - .header_extensions - .into_iter() - .map(|header_extension_parameters| { - Ok(RtpHeaderExtensionParameters { - uri: RtpHeaderExtensionUri::from_fbs(header_extension_parameters.uri), - id: u16::from(header_extension_parameters.id), - encrypt: header_extension_parameters.encrypt, - }) - }) - .collect::>>()?, - encodings: rtp_parameters - .encodings - .into_iter() - .map(|encoding| { - Ok(RtpEncodingParameters { - ssrc: encoding.ssrc, - rid: encoding.rid, - codec_payload_type: encoding.codec_payload_type, - rtx: encoding - .rtx - .map(|rtx| RtpEncodingParametersRtx { ssrc: rtx.ssrc }), - dtx: { - match encoding.dtx { - true => Some(true), - false => None, - } - }, - scalability_mode: encoding - .scalability_mode - .unwrap_or(String::from("S1T1")) - .parse()?, - max_bitrate: encoding.max_bitrate, - }) - }) - .collect::>>()?, - rtcp: RtcpParameters { - cname: rtp_parameters.rtcp.cname, - reduced_size: rtp_parameters.rtcp.reduced_size, - }, - }) - } - pub(crate) fn from_fbs_ref( rtp_parameters: rtp_parameters::RtpParametersRef<'_>, ) -> Result> { @@ -1421,27 +1308,31 @@ impl RtpEncodingParameters { } } - pub(crate) fn from_fbs(encoding_parameters: rtp_parameters::RtpEncodingParameters) -> Self { - Self { - ssrc: encoding_parameters.ssrc, - rid: encoding_parameters.rid.clone(), - codec_payload_type: encoding_parameters.codec_payload_type, - rtx: encoding_parameters - .rtx - .map(|rtx| RtpEncodingParametersRtx { ssrc: rtx.ssrc }), + pub(crate) fn from_fbs_ref( + encoding_parameters: rtp_parameters::RtpEncodingParametersRef<'_>, + ) -> Result> { + Ok(Self { + ssrc: encoding_parameters.ssrc()?, + rid: encoding_parameters.rid()?.map(|rid| rid.to_string()), + codec_payload_type: encoding_parameters.codec_payload_type()?, + rtx: if let Some(rtx) = encoding_parameters.rtx()? { + Some(RtpEncodingParametersRtx { ssrc: rtx.ssrc()? }) + } else { + None + }, dtx: { - match encoding_parameters.dtx { + match encoding_parameters.dtx()? { true => Some(true), false => None, } }, scalability_mode: encoding_parameters - .scalability_mode - .unwrap_or(String::from("S1T1")) - .parse() - .unwrap(), - max_bitrate: encoding_parameters.max_bitrate, - } + .scalability_mode()? + .map(|maybe_scalability_mode| maybe_scalability_mode.parse()) + .transpose()? + .unwrap_or_default(), + max_bitrate: encoding_parameters.max_bitrate()?, + }) } }