Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix clippy errors #259

Merged
merged 3 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 16 additions & 27 deletions src/socket/filter/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,7 @@ use std::{
/// cache's `time_window`.
pub const ENFORCED_SIZE_TIME: u64 = 1;

pub struct ReceivedPacket<T> {
/// The source that sent us the packet.
pub content: T,
/// The time the packet was received.
pub received: Instant,
}

pub struct ReceivedPacketCache<T> {
pub struct ReceivedPacketCache {
/// The target number of entries per ENFORCED_SIZE_TIME before inserting new elements reports
/// failure. The maximum size of the cache is target*time_window
target: usize,
Expand All @@ -39,11 +32,11 @@ pub struct ReceivedPacketCache<T> {
time_window: u64,
/// This stores the current number of messages that are within the `ENFORCED_SIZE_TIME`.
within_enforced_time: usize,
/// The underlying data structure.
inner: VecDeque<ReceivedPacket<T>>,
/// The underlying data structure. It stores the time when a packet was received.
inner: VecDeque<Instant>,
}

impl<T> ReceivedPacketCache<T> {
impl ReceivedPacketCache {
/// Creates a new `ReceivedPacketCache` with a specified size from which no more can enter.
pub fn new(target: usize, time_window: u64) -> Self {
Self {
Expand All @@ -56,21 +49,21 @@ impl<T> ReceivedPacketCache<T> {

/// Remove expired packets. We only keep, `CACHE_TIME` of data in the cache.
pub fn reset(&mut self) {
while let Some(packet) = self.inner.pop_front() {
if packet.received
while let Some(received_at) = self.inner.pop_front() {
if received_at
> Instant::now()
.checked_sub(Duration::from_secs(self.time_window))
.unwrap()
{
// add the packet back and end
self.inner.push_front(packet);
self.inner.push_front(received_at);
break;
}
}
// update the within_enforced_time
let mut count = 0;
for packet in self.inner.iter().rev() {
if packet.received
for received_at in self.inner.iter().rev() {
if *received_at
> Instant::now()
.checked_sub(Duration::from_secs(ENFORCED_SIZE_TIME))
.unwrap()
Expand All @@ -84,37 +77,33 @@ impl<T> ReceivedPacketCache<T> {
}

/// Inserts an element into the cache, removing any expired elements.
pub fn cache_insert(&mut self, content: T) -> bool {
pub fn cache_insert(&mut self) -> bool {
self.reset();
self.internal_insert(content)
self.internal_insert()
}

/// Inserts an element into the cache without removing expired elements.
fn internal_insert(&mut self, content: T) -> bool {
fn internal_insert(&mut self) -> bool {
if self.within_enforced_time >= self.target {
// Reached the target
false
} else {
let received_packet = ReceivedPacket {
content,
received: Instant::now(),
};
self.inner.push_back(received_packet);
self.inner.push_back(Instant::now());
self.within_enforced_time += 1;
true
}
}
}

impl<T> std::ops::Deref for ReceivedPacketCache<T> {
type Target = VecDeque<ReceivedPacket<T>>;
impl std::ops::Deref for ReceivedPacketCache {
type Target = VecDeque<Instant>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T> std::ops::DerefMut for ReceivedPacketCache<T> {
impl std::ops::DerefMut for ReceivedPacketCache {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
Expand Down
14 changes: 6 additions & 8 deletions src/socket/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) struct Filter {
/// An ordered (by time) collection of recently seen packets by SocketAddr. The packet data is not
/// stored here. This stores 5 seconds of history to calculate a 5 second moving average for
/// the metrics.
raw_packets_received: ReceivedPacketCache<SocketAddr>,
raw_packets_received: ReceivedPacketCache,
/// The duration that bans by this filter last.
ban_duration: Option<Duration>,
/// Keep track of node ids per socket. If someone is using too many node-ids per IP, they can
Expand Down Expand Up @@ -86,19 +86,19 @@ impl Filter {
/// The first check. This determines if a new UDP packet should be decoded or dropped.
/// Only unsolicited packets arrive here.
pub fn initial_pass(&mut self, src: &SocketAddr) -> bool {
if PERMIT_BAN_LIST.read().permit_ips.get(&src.ip()).is_some() {
if PERMIT_BAN_LIST.read().permit_ips.contains(&src.ip()) {
return true;
}

if PERMIT_BAN_LIST.read().ban_ips.get(&src.ip()).is_some() {
if PERMIT_BAN_LIST.read().ban_ips.contains_key(&src.ip()) {
debug!("Dropped unsolicited packet from banned src: {:?}", src);
return false;
}

// Add the un-solicited request to the cache
// If this is over the maximum requests per ENFORCED_SIZE_TIME, it will not be added, we
// leave the rate limiter to enforce the rate limits..
self.raw_packets_received.cache_insert(*src);
self.raw_packets_received.cache_insert();

// build the metrics
METRICS
Expand Down Expand Up @@ -135,17 +135,15 @@ impl Filter {
if PERMIT_BAN_LIST
.read()
.permit_nodes
.get(&node_address.node_id)
.is_some()
.contains(&node_address.node_id)
{
return true;
}

if PERMIT_BAN_LIST
.read()
.ban_nodes
.get(&node_address.node_id)
.is_some()
.contains_key(&node_address.node_id)
{
debug!(
"Dropped unsolicited packet from banned node_id: {}",
Expand Down
Loading