Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust: Data[Producer|Consumer] subchannels #1191

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions node/src/tests/test-DirectTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,13 @@ test('dataProducer.send() with subchannels succeeds', async () =>
for (const message of receivedMessages1)
{
expect([ 'both', 'dc1' ].includes(message)).toBe(true);
expect([ 'dc2' ].includes(message)).toBe(false);
}

for (const message of receivedMessages2)
{
expect([ 'both', 'dc2' ].includes(message)).toBe(true);
expect([ 'dc1' ].includes(message)).toBe(false);
}
}, 5000);

Expand Down
120 changes: 26 additions & 94 deletions rust/src/fbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26331,16 +26331,7 @@ mod root {
)]
pub struct SetSubchannelsRequest {
/// The field `subchannels` in the table `SetSubchannelsRequest`
pub subchannels: ::core::option::Option<::planus::alloc::vec::Vec<u16>>,
}

#[allow(clippy::derivable_impls)]
impl ::core::default::Default for SetSubchannelsRequest {
fn default() -> Self {
Self {
subchannels: ::core::default::Default::default(),
}
}
pub subchannels: ::planus::alloc::vec::Vec<u16>,
}

impl SetSubchannelsRequest {
Expand All @@ -26353,23 +26344,17 @@ mod root {
#[allow(clippy::too_many_arguments)]
pub fn create(
builder: &mut ::planus::Builder,
field_subchannels: impl ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
field_subchannels: impl ::planus::WriteAs<::planus::Offset<[u16]>>,
) -> ::planus::Offset<Self> {
let prepared_subchannels = field_subchannels.prepare(builder);

let mut table_writer: ::planus::table_writer::TableWriter<6> =
::core::default::Default::default();
if prepared_subchannels.is_some() {
table_writer.write_entry::<::planus::Offset<[u16]>>(0);
}
table_writer.write_entry::<::planus::Offset<[u16]>>(0);

unsafe {
table_writer.finish(builder, |object_writer| {
if let ::core::option::Option::Some(prepared_subchannels) =
prepared_subchannels
{
object_writer.write::<_, _, 4>(&prepared_subchannels);
}
object_writer.write::<_, _, 4>(&prepared_subchannels);
});
}
builder.current_offset()
Expand Down Expand Up @@ -26424,17 +26409,10 @@ mod root {
#[allow(clippy::type_complexity)]
pub fn subchannels<T0>(self, value: T0) -> SetSubchannelsRequestBuilder<(T0,)>
where
T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
T0: ::planus::WriteAs<::planus::Offset<[u16]>>,
{
SetSubchannelsRequestBuilder((value,))
}

/// Sets the [`subchannels` field](SetSubchannelsRequest#structfield.subchannels) to null.
#[inline]
#[allow(clippy::type_complexity)]
pub fn subchannels_as_null(self) -> SetSubchannelsRequestBuilder<((),)> {
self.subchannels(())
}
}

impl<T0> SetSubchannelsRequestBuilder<(T0,)> {
Expand All @@ -26451,7 +26429,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAs<::planus::Offset<SetSubchannelsRequest>>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26466,7 +26444,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOptional<::planus::Offset<SetSubchannelsRequest>>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26482,7 +26460,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOffset<SetSubchannelsRequest>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26503,22 +26481,16 @@ mod root {
impl<'a> SetSubchannelsRequestRef<'a> {
/// Getter for the [`subchannels` field](SetSubchannelsRequest#structfield.subchannels).
#[inline]
pub fn subchannels(
&self,
) -> ::planus::Result<::core::option::Option<::planus::Vector<'a, u16>>>
{
self.0.access(0, "SetSubchannelsRequest", "subchannels")
pub fn subchannels(&self) -> ::planus::Result<::planus::Vector<'a, u16>> {
self.0
.access_required(0, "SetSubchannelsRequest", "subchannels")
}
}

impl<'a> ::core::fmt::Debug for SetSubchannelsRequestRef<'a> {
fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result {
let mut f = f.debug_struct("SetSubchannelsRequestRef");
if let ::core::option::Option::Some(field_subchannels) =
self.subchannels().transpose()
{
f.field("subchannels", &field_subchannels);
}
f.field("subchannels", &self.subchannels());
f.finish()
}
}
Expand All @@ -26529,13 +26501,7 @@ mod root {
#[allow(unreachable_code)]
fn try_from(value: SetSubchannelsRequestRef<'a>) -> ::planus::Result<Self> {
::core::result::Result::Ok(Self {
subchannels: if let ::core::option::Option::Some(subchannels) =
value.subchannels()?
{
::core::option::Option::Some(subchannels.to_vec()?)
} else {
::core::option::Option::None
},
subchannels: value.subchannels()?.to_vec()?,
})
}
}
Expand Down Expand Up @@ -26631,16 +26597,7 @@ mod root {
)]
pub struct SetSubchannelsResponse {
/// The field `subchannels` in the table `SetSubchannelsResponse`
pub subchannels: ::core::option::Option<::planus::alloc::vec::Vec<u16>>,
}

#[allow(clippy::derivable_impls)]
impl ::core::default::Default for SetSubchannelsResponse {
fn default() -> Self {
Self {
subchannels: ::core::default::Default::default(),
}
}
pub subchannels: ::planus::alloc::vec::Vec<u16>,
}

impl SetSubchannelsResponse {
Expand All @@ -26653,23 +26610,17 @@ mod root {
#[allow(clippy::too_many_arguments)]
pub fn create(
builder: &mut ::planus::Builder,
field_subchannels: impl ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
field_subchannels: impl ::planus::WriteAs<::planus::Offset<[u16]>>,
) -> ::planus::Offset<Self> {
let prepared_subchannels = field_subchannels.prepare(builder);

let mut table_writer: ::planus::table_writer::TableWriter<6> =
::core::default::Default::default();
if prepared_subchannels.is_some() {
table_writer.write_entry::<::planus::Offset<[u16]>>(0);
}
table_writer.write_entry::<::planus::Offset<[u16]>>(0);

unsafe {
table_writer.finish(builder, |object_writer| {
if let ::core::option::Option::Some(prepared_subchannels) =
prepared_subchannels
{
object_writer.write::<_, _, 4>(&prepared_subchannels);
}
object_writer.write::<_, _, 4>(&prepared_subchannels);
});
}
builder.current_offset()
Expand Down Expand Up @@ -26726,17 +26677,10 @@ mod root {
#[allow(clippy::type_complexity)]
pub fn subchannels<T0>(self, value: T0) -> SetSubchannelsResponseBuilder<(T0,)>
where
T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
T0: ::planus::WriteAs<::planus::Offset<[u16]>>,
{
SetSubchannelsResponseBuilder((value,))
}

/// Sets the [`subchannels` field](SetSubchannelsResponse#structfield.subchannels) to null.
#[inline]
#[allow(clippy::type_complexity)]
pub fn subchannels_as_null(self) -> SetSubchannelsResponseBuilder<((),)> {
self.subchannels(())
}
}

impl<T0> SetSubchannelsResponseBuilder<(T0,)> {
Expand All @@ -26753,7 +26697,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAs<::planus::Offset<SetSubchannelsResponse>>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26768,7 +26712,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOptional<::planus::Offset<SetSubchannelsResponse>>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26784,7 +26728,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOffset<SetSubchannelsResponse>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26805,22 +26749,16 @@ mod root {
impl<'a> SetSubchannelsResponseRef<'a> {
/// Getter for the [`subchannels` field](SetSubchannelsResponse#structfield.subchannels).
#[inline]
pub fn subchannels(
&self,
) -> ::planus::Result<::core::option::Option<::planus::Vector<'a, u16>>>
{
self.0.access(0, "SetSubchannelsResponse", "subchannels")
pub fn subchannels(&self) -> ::planus::Result<::planus::Vector<'a, u16>> {
self.0
.access_required(0, "SetSubchannelsResponse", "subchannels")
}
}

impl<'a> ::core::fmt::Debug for SetSubchannelsResponseRef<'a> {
fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result {
let mut f = f.debug_struct("SetSubchannelsResponseRef");
if let ::core::option::Option::Some(field_subchannels) =
self.subchannels().transpose()
{
f.field("subchannels", &field_subchannels);
}
f.field("subchannels", &self.subchannels());
f.finish()
}
}
Expand All @@ -26831,13 +26769,7 @@ mod root {
#[allow(unreachable_code)]
fn try_from(value: SetSubchannelsResponseRef<'a>) -> ::planus::Result<Self> {
::core::result::Result::Ok(Self {
subchannels: if let ::core::option::Option::Some(subchannels) =
value.subchannels()?
{
::core::option::Option::Some(subchannels.to_vec()?)
} else {
::core::option::Option::None
},
subchannels: value.subchannels()?.to_vec()?,
})
}
}
Expand Down
63 changes: 56 additions & 7 deletions rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1909,6 +1909,7 @@ pub(crate) struct TransportConsumeDataRequest {
pub(crate) label: String,
pub(crate) protocol: String,
pub(crate) paused: bool,
pub(crate) subchannels: Option<Vec<u16>>,
}

#[derive(Debug)]
Expand All @@ -1919,6 +1920,7 @@ pub(crate) struct TransportConsumeDataResponse {
pub(crate) protocol: String,
pub(crate) paused: bool,
pub(crate) data_producer_paused: bool,
pub(crate) subchannels: Vec<u16>,
}

impl Request for TransportConsumeDataRequest {
Expand Down Expand Up @@ -1949,8 +1951,7 @@ impl Request for TransportConsumeDataRequest {
Some(self.protocol)
},
self.paused,
// TODO.
None::<Vec<u16>>,
self.subchannels,
);
let request_body = request::Body::create_transport_consume_data_request(&mut builder, data);
let request = request::Request::create(
Expand Down Expand Up @@ -1985,6 +1986,7 @@ impl Request for TransportConsumeDataRequest {
protocol: data.protocol.to_string(),
paused: data.paused,
data_producer_paused: data.data_producer_paused,
subchannels: data.subchannels,
})
}
}
Expand Down Expand Up @@ -2830,6 +2832,8 @@ impl Request for DataProducerResumeRequest {
pub(crate) struct DataProducerSendNotification {
pub(crate) ppid: u32,
pub(crate) payload: Vec<u8>,
pub(crate) subchannels: Option<Vec<u16>>,
pub(crate) required_subchannel: Option<u16>,
}

impl Notification for DataProducerSendNotification {
Expand All @@ -2839,17 +2843,14 @@ impl Notification for DataProducerSendNotification {
fn into_bytes(self, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

// TODO: Implement subchannels.
let subchannels: Vec<u16> = vec![];
let required_subchannel: Option<u16> = Default::default();
let binary_data = data_producer::Binary::create(&mut builder, self.payload);
let binary = data_producer::Data::create_binary(&mut builder, binary_data);
let data = data_producer::SendNotification::create(
&mut builder,
self.ppid,
binary,
subchannels,
required_subchannel,
self.subchannels,
self.required_subchannel,
);
let notification_body =
notification::Body::create_data_producer_send_notification(&mut builder, data);
Expand Down Expand Up @@ -3172,6 +3173,54 @@ impl From<DataConsumerSendRequest> for u32 {
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerSetSubchannelsRequest {
pub(crate) subchannels: Vec<u16>,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerSetSubchannelsResponse {
pub(crate) subchannels: Vec<u16>,
}

impl Request for DataConsumerSetSubchannelsRequest {
const METHOD: request::Method = request::Method::DataconsumerSetSubchannels;
type HandlerId = DataConsumerId;
type Response = DataConsumerSetSubchannelsResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let data = data_consumer::SetSubchannelsRequest::create(&mut builder, self.subchannels);
let request_body =
request::Body::create_data_consumer_set_subchannels_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message::Type::Request, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::Body>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::Body::DataConsumerSetSubchannelsResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

Ok(DataConsumerSetSubchannelsResponse {
subchannels: data.subchannels,
})
}
}

#[derive(Debug)]
pub(crate) struct RtpObserverCloseRequest {
pub(crate) rtp_observer_id: RtpObserverId,
Expand Down
Loading