Reduce TX/RX buffers allocation (#1749)
* Lazy allocation of batches in transmission pipeline

* Add config support for lazy allocation of tx batches

* Fix typos

* Properly consider config parameters

* Limit allocation when really needed

* Remove unnecessary println

* Default to 1 RX buffer
Mallets authored Feb 3, 2025
1 parent 732196f commit 6360eb6
Showing 7 changed files with 76 additions and 14 deletions.
DEFAULT_CONFIG.json5 (6 additions, 0 deletions)
@@ -475,6 +475,12 @@
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
},
allocation: {
/// Mode for memory allocation of batches in the priority queues.
/// - "init": batches are allocated at queue initialization time.
/// - "lazy": batches are allocated when needed up to the maximum number of batches configured in the size configuration parameter.
mode: "init",
},
},
},
/// Configure the zenoh RX parameters of a link
commons/zenoh-config/src/lib.rs (15 additions, 0 deletions)
@@ -485,6 +485,13 @@ validated_struct::validator! {
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: u64,
},
/// Mode for memory allocation of batches in the priority queues.
/// - "init": all batches are allocated at queue initialization time.
/// - "lazy": batches are allocated when needed, up to the maximum number of batches
///   configured in the size configuration parameter.
pub allocation: #[derive(Default, Copy, PartialEq, Eq)]
QueueAllocConf {
pub mode: QueueAllocMode,
},
},
// Number of threads used for TX
threads: usize,
@@ -619,6 +626,14 @@
}
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueueAllocMode {
#[default]
Init,
Lazy,
}

impl Default for PermissionsConf {
fn default() -> Self {
PermissionsConf {
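
The rename_all = "snake_case" attribute is what maps the two variants to the
lowercase strings "init" and "lazy" accepted by the mode field in
DEFAULT_CONFIG.json5. A minimal round-trip sketch of that mapping (a standalone
illustration assuming serde and serde_json as dependencies; not part of this
commit):

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueueAllocMode {
    #[default]
    Init,
    Lazy,
}

fn main() {
    // "init" and "lazy" are the exact strings the config parser accepts.
    let mode: QueueAllocMode = serde_json::from_str("\"lazy\"").unwrap();
    assert_eq!(mode, QueueAllocMode::Lazy);
    // The default preserves the pre-change behavior: eager allocation.
    assert_eq!(QueueAllocMode::default(), QueueAllocMode::Init);
    assert_eq!(
        serde_json::to_string(&QueueAllocMode::Init).unwrap(),
        "\"init\""
    );
}
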
io/zenoh-transport/src/common/pipeline.rs (33 additions, 7 deletions)
@@ -29,7 +29,7 @@ use zenoh_buffers::{
ZBuf,
};
use zenoh_codec::{transport::batch::BatchError, WCodec, Zenoh080};
use zenoh_config::QueueSizeConf;
use zenoh_config::{QueueAllocConf, QueueAllocMode, QueueSizeConf};
use zenoh_core::zlock;
use zenoh_protocol::{
core::Priority,
@@ -55,6 +55,8 @@ const RBLEN: usize = QueueSizeConf::MAX;
struct StageInRefill {
n_ref_r: Waiter,
s_ref_r: RingBufferReader<WBatch, RBLEN>,
batch_config: (usize, BatchConfig),
batch_allocs: usize,
}

#[derive(Debug)]
@@ -68,7 +70,14 @@ impl std::error::Error for TransportClosed {}

impl StageInRefill {
fn pull(&mut self) -> Option<WBatch> {
self.s_ref_r.pull()
match self.s_ref_r.pull() {
Some(b) => Some(b),
None if self.batch_allocs < self.batch_config.0 => {
self.batch_allocs += 1;
Some(WBatch::new(self.batch_config.1))
}
None => None,
}
}

fn wait(&self) -> bool {
@@ -637,6 +646,7 @@ pub(crate) struct TransmissionPipelineConf {
pub(crate) wait_before_close: Duration,
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
pub(crate) queue_alloc: QueueAllocConf,
}

// A 2-stage transmission pipeline
@@ -667,10 +677,14 @@ impl TransmissionPipeline {
// Create the refill ring buffer
// This is a SPSC ring buffer
let (mut s_ref_w, s_ref_r) = RingBuffer::<WBatch, RBLEN>::init();
// Fill the refill ring buffer with batches
for _ in 0..*num {
let batch = WBatch::new(config.batch);
assert!(s_ref_w.push(batch).is_none());
let mut batch_allocs = 0;
if *config.queue_alloc.mode() == QueueAllocMode::Init {
// Fill the refill ring buffer with batches
for _ in 0..*num {
let batch = WBatch::new(config.batch);
batch_allocs += 1;
assert!(s_ref_w.push(batch).is_none());
}
}
// Create the channel for notifying that new batches are in the refill ring buffer
// This is a SPSC channel
@@ -689,7 +703,12 @@
});

stage_in.push(Mutex::new(StageIn {
s_ref: StageInRefill { n_ref_r, s_ref_r },
s_ref: StageInRefill {
n_ref_r,
s_ref_r,
batch_config: (*num, config.batch),
batch_allocs,
},
s_out: StageInOut {
n_out_w: n_out_w.clone(),
s_out_w,
@@ -963,6 +982,7 @@ mod tests {
ZBuf,
};
use zenoh_codec::{RCodec, Zenoh080};
use zenoh_config::{QueueAllocConf, QueueAllocMode};
use zenoh_protocol::{
core::{Bits, CongestionControl, Encoding, Priority},
network::{ext, Push},
@@ -988,6 +1008,9 @@
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
queue_alloc: QueueAllocConf {
mode: QueueAllocMode::Init,
},
};

const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf {
Expand All @@ -1002,6 +1025,9 @@ mod tests {
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
queue_alloc: QueueAllocConf {
mode: QueueAllocMode::Init,
},
};

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
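
The heart of the change is the fallback in StageInRefill::pull above: a batch
is reused from the refill ring when one is available, and a fresh one is
allocated only while the running count stays below the configured queue size.
A self-contained sketch of that policy, using a VecDeque as a stand-in for
zenoh's SPSC ring buffer (names here are illustrative, not zenoh API):

use std::collections::VecDeque;

struct LazyRefill {
    recycled: VecDeque<Vec<u8>>, // stands in for the refill ring buffer
    allocated: usize,            // batches allocated so far
    capacity: usize,             // the configured maximum number of batches
    batch_size: usize,           // stands in for BatchConfig
}

impl LazyRefill {
    fn pull(&mut self) -> Option<Vec<u8>> {
        match self.recycled.pop_front() {
            // Prefer a recycled batch when one is available.
            Some(b) => Some(b),
            // Otherwise allocate on demand, never exceeding the cap.
            None if self.allocated < self.capacity => {
                self.allocated += 1;
                Some(vec![0u8; self.batch_size])
            }
            // Cap reached: the caller must wait for a batch to be refilled.
            None => None,
        }
    }
}

fn main() {
    let mut refill = LazyRefill {
        recycled: VecDeque::new(),
        allocated: 0,
        capacity: 2,
        batch_size: 8,
    };
    assert!(refill.pull().is_some()); // first pull allocates lazily
    assert!(refill.pull().is_some()); // second pull allocates the last batch
    assert!(refill.pull().is_none()); // cap reached, nothing recycled yet
}

In "init" mode the ring is pre-filled and batch_allocs already equals the
configured size, so the lazy branch is never taken and behavior is unchanged.
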
io/zenoh-transport/src/manager.rs (11 additions, 1 deletion)
@@ -15,7 +15,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};

use rand::{RngCore, SeedableRng};
use tokio::sync::Mutex as AsyncMutex;
use zenoh_config::{Config, LinkRxConf, QueueConf, QueueSizeConf};
use zenoh_config::{Config, LinkRxConf, QueueAllocConf, QueueConf, QueueSizeConf};
use zenoh_crypto::{BlockCipher, PseudoRng};
use zenoh_link::NewLinkChannelSender;
use zenoh_protocol::{
@@ -112,6 +112,7 @@ pub struct TransportManagerConfig {
pub wait_before_close: Duration,
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub queue_alloc: QueueAllocConf,
pub defrag_buff_size: usize,
pub link_rx_buffer_size: usize,
pub unicast: TransportManagerConfigUnicast,
@@ -143,6 +144,7 @@ pub struct TransportManagerBuilder {
wait_before_drop: (Duration, Duration),
wait_before_close: Duration,
queue_size: QueueSizeConf,
queue_alloc: QueueAllocConf,
defrag_buff_size: usize,
link_rx_buffer_size: usize,
unicast: TransportManagerBuilderUnicast,
@@ -191,6 +193,11 @@ impl TransportManagerBuilder {
self
}

pub fn queue_alloc(mut self, queue_alloc: QueueAllocConf) -> Self {
self.queue_alloc = queue_alloc;
self
}

pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self {
self.wait_before_drop = wait_before_drop;
self
@@ -266,6 +273,7 @@ impl TransportManagerBuilder {
));
self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close()));
self = self.queue_size(link.tx().queue().size().clone());
self = self.queue_alloc(*link.tx().queue().allocation());
self = self.tx_threads(*link.tx().threads());
self = self.protocols(link.protocols().clone());

@@ -326,6 +334,7 @@ impl TransportManagerBuilder {
wait_before_close: self.wait_before_close,
queue_size,
queue_backoff: self.batching_time_limit,
queue_alloc: self.queue_alloc,
defrag_buff_size: self.defrag_buff_size,
link_rx_buffer_size: self.link_rx_buffer_size,
unicast: unicast.config,
@@ -377,6 +386,7 @@ impl Default for TransportManagerBuilder {
),
wait_before_close: duration_from_i64us(*cc_block.wait_before_close()),
queue_size: queue.size,
queue_alloc: queue.allocation,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
link_rx_buffer_size: *link_rx.buffer_size(),
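
The new queue_alloc setter above follows the same consuming-builder pattern as
the other queue options: it takes self by value, mutates one field, and returns
self so calls can be chained. A standalone sketch of that pattern
(ExampleBuilder and its field are hypothetical stand-ins, not zenoh API):

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum QueueAllocMode {
    #[default]
    Init,
    Lazy,
}

#[derive(Default)]
struct ExampleBuilder {
    queue_alloc: QueueAllocMode,
}

impl ExampleBuilder {
    // Consuming setter: move self in, return it for chaining.
    fn queue_alloc(mut self, mode: QueueAllocMode) -> Self {
        self.queue_alloc = mode;
        self
    }
}

fn main() {
    let builder = ExampleBuilder::default().queue_alloc(QueueAllocMode::Lazy);
    assert_eq!(builder.queue_alloc, QueueAllocMode::Lazy);
}
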
io/zenoh-transport/src/multicast/link.rs (4 additions, 2 deletions)
@@ -314,6 +314,7 @@ impl TransportLinkMulticastUniversal {
wait_before_close: self.transport.manager.config.wait_before_close,
batching_enabled: self.transport.manager.config.batching,
batching_time_limit: self.transport.manager.config.queue_backoff,
queue_alloc: self.transport.manager.config.queue_alloc,
};
// The pipeline
let (producer, consumer) = TransmissionPipeline::make(tpc, &priority_tx);
@@ -543,8 +544,9 @@ async fn rx_task(
// The pool of buffers
let mtu = link.inner.config.batch.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
if n == 0 {
tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer.");
n = 1;
}

let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());
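
The rx_task change above (repeated in the two unicast links below) alters how
the number of RX buffers is derived from rx_buffer_size: the quotient is no
longer rounded up to cover a remainder, it is only clamped to a minimum of one
buffer. A standalone sketch contrasting the two computations (illustrative
only, not the zenoh source):

// Old behavior: round rx_buffer_size / mtu up to the next whole buffer.
fn rx_buffers_round_up(rx_buffer_size: usize, mtu: usize) -> usize {
    let mut n = rx_buffer_size / mtu;
    if rx_buffer_size % mtu != 0 {
        n += 1;
    }
    n
}

// New behavior: floor the quotient, but never allocate fewer than one buffer.
fn rx_buffers_clamped(rx_buffer_size: usize, mtu: usize) -> usize {
    let n = rx_buffer_size / mtu;
    if n == 0 {
        1 // budget smaller than one MTU: fall back to a single buffer
    } else {
        n
    }
}

fn main() {
    // A 100_000-byte budget with a 65_535-byte MTU: the old code rounded up
    // to two buffers, the new code floors to one, reducing RX allocation.
    assert_eq!(rx_buffers_round_up(100_000, 65_535), 2);
    assert_eq!(rx_buffers_clamped(100_000, 65_535), 1);
    // A budget below one MTU yields a single buffer either way.
    assert_eq!(rx_buffers_round_up(30_000, 65_535), 1);
    assert_eq!(rx_buffers_clamped(30_000, 65_535), 1);
}
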
io/zenoh-transport/src/unicast/lowlatency/link.rs (3 additions, 2 deletions)
@@ -154,8 +154,9 @@ impl TransportUnicastLowlatency {
let pool = {
let mtu = link_rx.config.batch.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
if n == 0 {
tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link_rx} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer.");
n = 1;
}
zenoh_sync::RecyclingObjectPool::new(n, move || vec![0_u8; mtu].into_boxed_slice())
};
io/zenoh-transport/src/unicast/universal/link.rs (4 additions, 2 deletions)
@@ -66,6 +66,7 @@ impl TransportLinkUnicastUniversal {
wait_before_close: transport.manager.config.wait_before_close,
batching_enabled: transport.manager.config.batching,
batching_time_limit: transport.manager.config.queue_backoff,
queue_alloc: transport.manager.config.queue_alloc,
};

// The pipeline
@@ -261,8 +262,9 @@ async fn rx_task(
// The pool of buffers
let mtu = link.config.batch.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
if n == 0 {
tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer.");
n = 1;
}

let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());
