From 7fbbb293bd0c0978fa438f966101aa23d40b8f8f Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 15:37:56 -0300 Subject: [PATCH] fix: sec updates, update select loop --- Cargo.lock | 48 +++++++++---------- .../src/task_manager/mod.rs | 34 ++++++------- 2 files changed, 39 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14fbf4735..2e56c9525 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2835,7 +2835,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35bd3cf68c183738046838e300353e4716c674dc5e56890de4826801a6622a28" dependencies = [ "futures-io", - "rustls 0.21.10", + "rustls 0.21.11", ] [[package]] @@ -3019,9 +3019,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.24" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -3038,9 +3038,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.2" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +checksum = "816ec7294445779408f36fe57bc5b7fc1cf59664059096c65f905c1c61f58069" dependencies = [ "bytes", "fnv", @@ -3374,14 +3374,14 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.5", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -3397,7 +3397,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.2", + "h2 0.4.4", "http 1.0.0", "http-body 1.0.0", "httparse", @@ -3416,7 +3416,7 @@ dependencies = [ "futures-util", "http 0.2.11", "hyper 0.14.28", - "rustls 0.21.10", + "rustls 0.21.11", "tokio", "tokio-rustls 0.24.1", ] @@ -4233,7 +4233,7 @@ dependencies = [ "quinn", "rand", "ring 0.16.20", - "rustls 0.21.10", + "rustls 0.21.11", "socket2 0.5.5", "thiserror", "tokio", @@ -4345,7 +4345,7 @@ dependencies = [ "libp2p-identity", "rcgen", "ring 0.16.20", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-webpki 0.101.7", "thiserror", "x509-parser", @@ -6004,7 +6004,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.21.10", + "rustls 0.21.11", "thiserror", "tokio", "tracing", @@ -6020,7 +6020,7 @@ dependencies = [ "rand", "ring 0.16.20", "rustc-hash", - "rustls 0.21.10", + "rustls 0.21.11", "slab", "thiserror", "tinyvec", @@ -6226,7 +6226,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -6240,7 +6240,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -6456,9 +6456,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.21.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" dependencies = [ "log", "ring 0.17.7", @@ -6468,9 +6468,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.2" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", "ring 0.17.7", @@ -7521,7 +7521,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.10", + "rustls 0.21.11", "tokio", ] @@ -7531,7 +7531,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.22.2", + "rustls 0.22.4", "rustls-pki-types", "tokio", ] @@ -7556,7 +7556,7 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log", - "rustls 0.21.10", + "rustls 0.21.11", "tokio", "tokio-rustls 0.24.1", "tungstenite", @@ -7679,7 +7679,7 @@ dependencies = [ "axum 0.6.20", "base64 0.21.7", "bytes", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -8608,7 +8608,7 @@ dependencies = [ "httparse", "log", "rand", - "rustls 0.21.10", + "rustls 0.21.11", "sha1", "thiserror", "url", diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 83931e7ca..10ba5c53d 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -108,7 +108,6 @@ impl TaskManager { pub async fn run(mut self, shutdown_receiver: CancellationToken) { let mut pending_certificate_interval = tokio::time::interval(Duration::from_micros(500)); - let mut message_interval = tokio::time::interval(Duration::from_millis(100)); loop { tokio::select! { @@ -118,29 +117,26 @@ impl TaskManager { self.next_pending_certificate(); } - _ = message_interval.tick() => { - if let Some(msg) = self.message_receiver.recv().await { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(msg); - }; - } - DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { - trace!("Received broadcast message for certificate {} ", cert.id); + let Some(msg) = self.message_receiver.recv() { + match msg { + DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(msg); + }; + } + DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + trace!("Received broadcast message for certificate {} ", cert.id); - self.create_task(cert, need_gossip, pending_id) - } + self.create_task(cert, need_gossip, pending_id) } } } - Some((certificate_id, status)) = self.running_tasks.next() => { if let TaskStatus::Success = status { trace!("Task for certificate {} finished successfully", certificate_id);