Skip to content

Commit

Permalink
ConsumerDump response, do not allocate memory
Browse files Browse the repository at this point in the history
  • Loading branch information
jmillan committed Oct 26, 2023
1 parent 7c28fbc commit 8745596
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 206 deletions.
14 changes: 7 additions & 7 deletions rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::ortc::RtpMapping;
use crate::pipe_transport::PipeTransportOptions;
use crate::plain_transport::PlainTransportOptions;
use crate::producer::{ProducerId, ProducerTraceEventType, ProducerType};
use crate::router::consumer::ConsumerDump;
use crate::router::producer::ProducerDump;
use crate::router::{RouterDump, RouterId};
use crate::rtp_observer::RtpObserverId;
Expand Down Expand Up @@ -2388,7 +2389,7 @@ pub(crate) struct ConsumerDumpRequest {}
impl Request for ConsumerDumpRequest {
const METHOD: request::Method = request::Method::ConsumerDump;
type HandlerId = ConsumerId;
type Response = response::Body;
type Response = ConsumerDump;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();
Expand All @@ -2409,12 +2410,11 @@ impl Request for ConsumerDumpRequest {
fn convert_response(
response: Option<response::BodyRef<'_>>,
) -> Result<Self::Response, Box<dyn Error>> {
match response {
Some(data) => Ok(data.try_into().unwrap()),
_ => {
panic!("Wrong message from worker: {response:?}");
}
}
let Some(response::BodyRef::ConsumerDumpResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

ConsumerDump::from_fbs_ref(data)
}
}

Expand Down
115 changes: 47 additions & 68 deletions rust/src/router/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,26 +161,6 @@ pub struct RtpStreamParams {
}

impl RtpStreamParams {
pub(crate) fn from_fbs(params: &rtp_stream::Params) -> Self {
Self {
clock_rate: params.clock_rate,
cname: params.cname.clone(),
encoding_idx: params.encoding_idx,
mime_type: params.mime_type.clone().parse().unwrap(),
payload_type: params.payload_type,
spatial_layers: params.spatial_layers,
ssrc: params.ssrc,
temporal_layers: params.temporal_layers,
use_dtx: params.use_dtx,
use_in_band_fec: params.use_in_band_fec,
use_nack: params.use_nack,
use_pli: params.use_pli,
rid: params.rid.clone(),
rtx_ssrc: params.rtx_ssrc,
rtx_payload_type: params.rtx_payload_type,
}
}

pub(crate) fn from_fbs_ref(params: rtp_stream::ParamsRef<'_>) -> Result<Self, Box<dyn Error>> {
Ok(Self {
clock_rate: params.clock_rate()?,
Expand Down Expand Up @@ -236,11 +216,11 @@ pub struct RtpStream {
}

impl RtpStream {
pub(crate) fn from_fbs(dump: rtp_stream::Dump) -> Self {
Self {
params: RtpStreamParams::from_fbs(&dump.params),
score: dump.score,
}
pub(crate) fn from_fbs_ref(dump: rtp_stream::DumpRef<'_>) -> Result<Self, Box<dyn Error>> {
Ok(Self {
params: RtpStreamParams::from_fbs_ref(dump.params()?)?,
score: dump.score()?,
})
}
}

Expand Down Expand Up @@ -283,42 +263,48 @@ pub struct ConsumerDump {
}

impl ConsumerDump {
pub(crate) fn from_fbs(dump: consumer::DumpResponse) -> Result<Self, Box<dyn Error>> {
let dump = dump.data;
pub(crate) fn from_fbs_ref(
dump: consumer::DumpResponseRef<'_>,
) -> Result<Self, Box<dyn Error>> {
let dump = dump.data();

Ok(Self {
id: dump.base.id.parse()?,
kind: MediaKind::from_fbs(dump.base.kind),
paused: dump.base.paused,
priority: dump.base.priority,
producer_id: dump.base.producer_id.parse()?,
producer_paused: dump.base.producer_paused,
rtp_parameters: RtpParameters::from_fbs(*dump.base.rtp_parameters).unwrap(),
supported_codec_payload_types: dump.base.supported_codec_payload_types,
trace_event_types: dump
.base
.trace_event_types
id: dump?.base()?.id()?.parse()?,
kind: MediaKind::from_fbs(dump?.base()?.kind()?),
paused: dump?.base()?.paused()?,
priority: dump?.base()?.priority()?,
producer_id: dump?.base()?.producer_id()?.parse()?,
producer_paused: dump?.base()?.producer_paused()?,
rtp_parameters: RtpParameters::from_fbs_ref(dump?.base()?.rtp_parameters()?)?,
supported_codec_payload_types: Vec::from(
dump?.base()?.supported_codec_payload_types()?,
),
trace_event_types: dump?
.base()?
.trace_event_types()?
.iter()
.map(|trace_event_type| Ok(ConsumerTraceEventType::from_fbs(trace_event_type?)))
.collect::<Result<_, Box<dyn Error>>>()?,
r#type: ConsumerType::from_fbs(dump?.base()?.type_()?),
consumable_rtp_encodings: dump?
.base()?
.consumable_rtp_encodings()?
.iter()
.map(|encoding_parameters| {
Ok(RtpEncodingParameters::from_fbs_ref(encoding_parameters?)?)
})
.collect::<Result<_, Box<dyn Error>>>()?,
rtp_streams: dump?
.rtp_streams()?
.iter()
.map(ConsumerTraceEventType::from_fbs)
.collect(),
r#type: ConsumerType::from_fbs(dump.base.type_),
consumable_rtp_encodings: dump
.base
.consumable_rtp_encodings
.into_iter()
.map(RtpEncodingParameters::from_fbs)
.collect(),
rtp_streams: dump
.rtp_streams
.into_iter()
.map(RtpStream::from_fbs)
.collect(),
preferred_spatial_layer: dump.preferred_spatial_layer,
target_spatial_layer: dump.target_spatial_layer,
current_spatial_layer: dump.current_spatial_layer,
preferred_temporal_layer: dump.preferred_temporal_layer,
target_temporal_layer: dump.target_temporal_layer,
current_temporal_layer: dump.current_temporal_layer,
.map(|stream| Ok(RtpStream::from_fbs_ref(stream?)?))
.collect::<Result<_, Box<dyn Error>>>()?,
preferred_spatial_layer: dump?.preferred_spatial_layer()?,
target_spatial_layer: dump?.target_spatial_layer()?,
current_spatial_layer: dump?.current_spatial_layer()?,
preferred_temporal_layer: dump?.preferred_temporal_layer()?,
target_temporal_layer: dump?.target_temporal_layer()?,
current_temporal_layer: dump?.current_temporal_layer()?,
})
}
}
Expand Down Expand Up @@ -583,7 +569,7 @@ impl ConsumerTraceEventType {
}
}

pub(crate) fn from_fbs(event_type: &consumer::TraceEventType) -> Self {
pub(crate) fn from_fbs(event_type: consumer::TraceEventType) -> Self {
match event_type {
consumer::TraceEventType::Rtp => ConsumerTraceEventType::Rtp,
consumer::TraceEventType::Keyframe => ConsumerTraceEventType::KeyFrame,
Expand Down Expand Up @@ -1006,17 +992,10 @@ impl Consumer {
pub async fn dump(&self) -> Result<ConsumerDump, RequestError> {
debug!("dump()");

let response = self
.inner
self.inner
.channel
.request(self.id(), ConsumerDumpRequest {})
.await?;

if let response::Body::ConsumerDumpResponse(data) = response {
Ok(ConsumerDump::from_fbs(*data).expect("Error parsing dump response"))
} else {
panic!("Wrong message from worker");
}
.await
}

/// Returns current RTC statistics of the consumer.
Expand Down
10 changes: 7 additions & 3 deletions rust/src/router/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ impl RtpStreamRecv {
Ok(Self {
params: RtpStreamParams::from_fbs_ref(dump.params()?)?,
score: dump.score()?,
rtx_stream: dump.rtx_stream()?.map(|stream| RtxStream {
params: RtxStreamParams::from_fbs_ref(stream.params().unwrap()).unwrap(),
}),
rtx_stream: if let Some(rtx_stream) = dump.rtx_stream()? {
Some(RtxStream {
params: RtxStreamParams::from_fbs_ref(rtx_stream.params()?)?,
})
} else {
None
},
})
}
}
Expand Down
147 changes: 19 additions & 128 deletions rust/src/rtp_parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,119 +787,6 @@ pub struct RtpParameters {
}

impl RtpParameters {
pub(crate) fn from_fbs(
rtp_parameters: rtp_parameters::RtpParameters,
) -> Result<Self, Box<dyn Error>> {
Ok(Self {
mid: rtp_parameters.mid,
codecs: rtp_parameters
.codecs
.into_iter()
.map(|codec| {
let parameters = codec
.parameters
.unwrap_or_default()
.into_iter()
.map(|parameters| {
Ok((
Cow::Owned(parameters.name),
match parameters.value {
rtp_parameters::Value::Boolean(_)
| rtp_parameters::Value::Double(_)
| rtp_parameters::Value::Integer32Array(_) => {
// TODO: Above value variant should not exist in the
// first place
panic!("Invalid parameter")
}
rtp_parameters::Value::Integer32(n) => {
RtpCodecParametersParametersValue::Number(
n.value.try_into()?,
)
}
rtp_parameters::Value::String(s) => {
RtpCodecParametersParametersValue::String(s.value.into())
}
},
))
})
.collect::<Result<_, Box<dyn Error>>>()?;
let rtcp_feedback = codec
.rtcp_feedback
.unwrap_or_default()
.into_iter()
.map(|rtcp_feedback| {
RtcpFeedback::from_type_parameter(
&rtcp_feedback.type_,
&rtcp_feedback.parameter.unwrap_or_default(),
)
})
.collect::<Result<_, _>>()?;

Ok(match MimeType::from_str(&codec.mime_type)? {
MimeType::Audio(mime_type) => RtpCodecParameters::Audio {
mime_type,
payload_type: codec.payload_type,
clock_rate: codec.clock_rate.try_into()?,
channels: codec
.channels
.ok_or("Audio must have channels specified")?
.try_into()?,
parameters,
rtcp_feedback: vec![],
},
MimeType::Video(mime_type) => RtpCodecParameters::Video {
mime_type,
payload_type: codec.payload_type,
clock_rate: codec.clock_rate.try_into()?,
parameters,
rtcp_feedback,
},
})
})
.collect::<Result<_, Box<dyn Error>>>()?,
header_extensions: rtp_parameters
.header_extensions
.into_iter()
.map(|header_extension_parameters| {
Ok(RtpHeaderExtensionParameters {
uri: RtpHeaderExtensionUri::from_fbs(header_extension_parameters.uri),
id: u16::from(header_extension_parameters.id),
encrypt: header_extension_parameters.encrypt,
})
})
.collect::<Result<_, Box<dyn Error>>>()?,
encodings: rtp_parameters
.encodings
.into_iter()
.map(|encoding| {
Ok(RtpEncodingParameters {
ssrc: encoding.ssrc,
rid: encoding.rid,
codec_payload_type: encoding.codec_payload_type,
rtx: encoding
.rtx
.map(|rtx| RtpEncodingParametersRtx { ssrc: rtx.ssrc }),
dtx: {
match encoding.dtx {
true => Some(true),
false => None,
}
},
scalability_mode: encoding
.scalability_mode
.unwrap_or(String::from("S1T1"))
.parse()?,
max_bitrate: encoding.max_bitrate,
})
})
.collect::<Result<_, Box<dyn Error>>>()?,
rtcp: RtcpParameters {
cname: rtp_parameters.rtcp.cname,
reduced_size: rtp_parameters.rtcp.reduced_size,
},
})
}

pub(crate) fn from_fbs_ref(
rtp_parameters: rtp_parameters::RtpParametersRef<'_>,
) -> Result<Self, Box<dyn Error>> {
Expand Down Expand Up @@ -1421,27 +1308,31 @@ impl RtpEncodingParameters {
}
}

pub(crate) fn from_fbs(encoding_parameters: rtp_parameters::RtpEncodingParameters) -> Self {
Self {
ssrc: encoding_parameters.ssrc,
rid: encoding_parameters.rid.clone(),
codec_payload_type: encoding_parameters.codec_payload_type,
rtx: encoding_parameters
.rtx
.map(|rtx| RtpEncodingParametersRtx { ssrc: rtx.ssrc }),
pub(crate) fn from_fbs_ref(
encoding_parameters: rtp_parameters::RtpEncodingParametersRef<'_>,
) -> Result<Self, Box<dyn Error>> {
Ok(Self {
ssrc: encoding_parameters.ssrc()?,
rid: encoding_parameters.rid()?.map(|rid| rid.to_string()),
codec_payload_type: encoding_parameters.codec_payload_type()?,
rtx: if let Some(rtx) = encoding_parameters.rtx()? {
Some(RtpEncodingParametersRtx { ssrc: rtx.ssrc()? })
} else {
None
},
dtx: {
match encoding_parameters.dtx {
match encoding_parameters.dtx()? {
true => Some(true),
false => None,
}
},
scalability_mode: encoding_parameters
.scalability_mode
.unwrap_or(String::from("S1T1"))
.parse()
.unwrap(),
max_bitrate: encoding_parameters.max_bitrate,
}
.scalability_mode()?
.map(|maybe_scalability_mode| maybe_scalability_mode.parse())
.transpose()?
.unwrap_or_default(),
max_bitrate: encoding_parameters.max_bitrate()?,
})
}
}

Expand Down

0 comments on commit 8745596

Please sign in to comment.