Skip to content

Commit

Permalink
Use let-else pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 16, 2025
1 parent a1b52dc commit 30e8a11
Showing 1 changed file with 70 additions and 61 deletions.
131 changes: 70 additions & 61 deletions rust/numaflow-core/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,38 +214,42 @@ impl Tracker {

/// Updates an existing entry in the tracker with the number of expected messages and EOF status.
fn handle_update(&mut self, offset: Bytes, responses: Option<Vec<String>>) {
if let Some(entry) = self.entries.get_mut(&offset) {
entry.count += 1;
let Some(entry) = self.entries.get_mut(&offset) else {
return;
};

entry.count += 1;
entry
.callback_info
.as_mut()
.map(|cb| cb.responses.push(responses));

// if the count is zero, we can send an ack immediately
// this is case where map stream will send eof true after
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.callback_info
.as_mut()
.map(|cb| cb.responses.push(responses));
// if the count is zero, we can send an ack immediately
// this is case where map stream will send eof true after
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
}
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
}
}

fn handle_update_eof(&mut self, offset: Bytes) {
if let Some(entry) = self.entries.get_mut(&offset) {
entry.eof = true;
// if the count is zero, we can send an ack immediately
// this is case where map stream will send eof true after
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
}
let Some(entry) = self.entries.get_mut(&offset) else {
return;
};
entry.eof = true;
// if the count is zero, we can send an ack immediately
// this is case where map stream will send eof true after
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
}
}

Expand All @@ -255,52 +259,57 @@ impl Tracker {
responses: Vec<Option<Vec<String>>>,
eof: bool,
) {
if let Some(entry) = self.entries.get_mut(&offset) {
entry.count += responses.len() as u32;
entry.eof = eof;
let Some(entry) = self.entries.get_mut(&offset) else {
return;
};

entry.count += responses.len() as u32;
entry.eof = eof;
entry
.callback_info
.as_mut()
.map(|cb| cb.responses.extend(responses));

// if the count is zero, we can send an ack immediately
// this is case where map stream will send eof true after
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.callback_info
.as_mut()
.map(|cb| cb.responses.extend(responses));
// if the count is zero, we can send an ack immediately
// this is case where map stream will send eof true after
// receiving all the messages.
if entry.count == 0 {
let entry = self.entries.remove(&offset).unwrap();
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
}
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
}
}

/// Removes an entry from the tracker and sends an acknowledgment if the count is zero
/// and the entry is marked as EOF.
fn handle_delete(&mut self, offset: Bytes) {
if let Some(mut entry) = self.entries.remove(&offset) {
if entry.count > 0 {
entry.count -= 1;
}
if entry.count == 0 && entry.eof {
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
} else {
self.entries.insert(offset, entry);
}
let Some(mut entry) = self.entries.remove(&offset) else {
return;
};
if entry.count > 0 {
entry.count -= 1;
}
if entry.count == 0 && entry.eof {
entry
.ack_send
.send(ReadAck::Ack)
.expect("Failed to send ack");
} else {
self.entries.insert(offset, entry);
}
}

/// Discards an entry from the tracker and sends a nak.
fn handle_discard(&mut self, offset: Bytes) {
if let Some(entry) = self.entries.remove(&offset) {
entry
.ack_send
.send(ReadAck::Nak)
.expect("Failed to send nak");
}
let Some(entry) = self.entries.remove(&offset) else {
return;
};
entry
.ack_send
.send(ReadAck::Nak)
.expect("Failed to send nak");
}

/// Discards all entries from the tracker and sends a nak for each.
Expand Down

0 comments on commit 30e8a11

Please sign in to comment.