From 30e8a1187b1a8fa25e22cfc6f20f848b62b3a5b0 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Thu, 16 Jan 2025 18:09:13 +0530 Subject: [PATCH] Use let-else pattern Signed-off-by: Sreekanth --- rust/numaflow-core/src/tracker.rs | 131 ++++++++++++++++-------------- 1 file changed, 70 insertions(+), 61 deletions(-) diff --git a/rust/numaflow-core/src/tracker.rs b/rust/numaflow-core/src/tracker.rs index 875bb6a1a..2c23cef99 100644 --- a/rust/numaflow-core/src/tracker.rs +++ b/rust/numaflow-core/src/tracker.rs @@ -214,38 +214,42 @@ impl Tracker { /// Updates an existing entry in the tracker with the number of expected messages and EOF status. fn handle_update(&mut self, offset: Bytes, responses: Option>) { - if let Some(entry) = self.entries.get_mut(&offset) { - entry.count += 1; + let Some(entry) = self.entries.get_mut(&offset) else { + return; + }; + + entry.count += 1; + entry + .callback_info + .as_mut() + .map(|cb| cb.responses.push(responses)); + + // if the count is zero, we can send an ack immediately + // this is case where map stream will send eof true after + // receiving all the messages. + if entry.count == 0 { + let entry = self.entries.remove(&offset).unwrap(); entry - .callback_info - .as_mut() - .map(|cb| cb.responses.push(responses)); - // if the count is zero, we can send an ack immediately - // this is case where map stream will send eof true after - // receiving all the messages. - if entry.count == 0 { - let entry = self.entries.remove(&offset).unwrap(); - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); - } + .ack_send + .send(ReadAck::Ack) + .expect("Failed to send ack"); } } fn handle_update_eof(&mut self, offset: Bytes) { - if let Some(entry) = self.entries.get_mut(&offset) { - entry.eof = true; - // if the count is zero, we can send an ack immediately - // this is case where map stream will send eof true after - // receiving all the messages. - if entry.count == 0 { - let entry = self.entries.remove(&offset).unwrap(); - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); - } + let Some(entry) = self.entries.get_mut(&offset) else { + return; + }; + entry.eof = true; + // if the count is zero, we can send an ack immediately + // this is case where map stream will send eof true after + // receiving all the messages. + if entry.count == 0 { + let entry = self.entries.remove(&offset).unwrap(); + entry + .ack_send + .send(ReadAck::Ack) + .expect("Failed to send ack"); } } @@ -255,52 +259,57 @@ impl Tracker { responses: Vec>>, eof: bool, ) { - if let Some(entry) = self.entries.get_mut(&offset) { - entry.count += responses.len() as u32; - entry.eof = eof; + let Some(entry) = self.entries.get_mut(&offset) else { + return; + }; + + entry.count += responses.len() as u32; + entry.eof = eof; + entry + .callback_info + .as_mut() + .map(|cb| cb.responses.extend(responses)); + + // if the count is zero, we can send an ack immediately + // this is case where map stream will send eof true after + // receiving all the messages. + if entry.count == 0 { + let entry = self.entries.remove(&offset).unwrap(); entry - .callback_info - .as_mut() - .map(|cb| cb.responses.extend(responses)); - // if the count is zero, we can send an ack immediately - // this is case where map stream will send eof true after - // receiving all the messages. - if entry.count == 0 { - let entry = self.entries.remove(&offset).unwrap(); - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); - } + .ack_send + .send(ReadAck::Ack) + .expect("Failed to send ack"); } } /// Removes an entry from the tracker and sends an acknowledgment if the count is zero /// and the entry is marked as EOF. fn handle_delete(&mut self, offset: Bytes) { - if let Some(mut entry) = self.entries.remove(&offset) { - if entry.count > 0 { - entry.count -= 1; - } - if entry.count == 0 && entry.eof { - entry - .ack_send - .send(ReadAck::Ack) - .expect("Failed to send ack"); - } else { - self.entries.insert(offset, entry); - } + let Some(mut entry) = self.entries.remove(&offset) else { + return; + }; + if entry.count > 0 { + entry.count -= 1; + } + if entry.count == 0 && entry.eof { + entry + .ack_send + .send(ReadAck::Ack) + .expect("Failed to send ack"); + } else { + self.entries.insert(offset, entry); } } /// Discards an entry from the tracker and sends a nak. fn handle_discard(&mut self, offset: Bytes) { - if let Some(entry) = self.entries.remove(&offset) { - entry - .ack_send - .send(ReadAck::Nak) - .expect("Failed to send nak"); - } + let Some(entry) = self.entries.remove(&offset) else { + return; + }; + entry + .ack_send + .send(ReadAck::Nak) + .expect("Failed to send nak"); } /// Discards all entries from the tracker and sends a nak for each.