Skip to content

Commit

Permalink
use Cow::to_mut
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 1, 2025
1 parent 787a5b1 commit 62ba750
Showing 1 changed file with 10 additions and 34 deletions.
44 changes: 10 additions & 34 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Bound;

Expand Down Expand Up @@ -341,7 +342,7 @@ pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
let (data, ops) = chunk.into_parts();
let mut new_visibility = BitmapBuilder::with_capacity(ops.len());

let mut new_ops: Option<Vec<Op>> = None;
let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
let mut unmatched_update_delete = false;
let mut visible_update_delete = false;
for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
Expand All @@ -360,7 +361,6 @@ pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
new_visibility.append(visible);

normalize_unmatched_updates(
&ops,
&mut new_ops,
&mut unmatched_update_delete,
&mut visible_update_delete,
Expand All @@ -370,10 +370,7 @@ pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
);
}
let (columns, _) = data.into_parts();
let chunk = match new_ops {
Some(new_ops) => StreamChunk::with_visibility(new_ops, columns, new_visibility.finish()),
None => StreamChunk::with_visibility(ops, columns, new_visibility.finish()),
};
let chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility.finish());
Ok(chunk)
}

Expand All @@ -388,7 +385,7 @@ fn mark_chunk_inner(
) -> StreamChunk {
let (data, ops) = chunk.into_parts();
let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
let mut new_ops: Option<Vec<Op>> = None;
let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
let mut unmatched_update_delete = false;
let mut visible_update_delete = false;
for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
Expand All @@ -398,7 +395,6 @@ fn mark_chunk_inner(
new_visibility.append(visible);

normalize_unmatched_updates(
&ops,
&mut new_ops,
&mut unmatched_update_delete,
&mut visible_update_delete,
Expand All @@ -408,10 +404,7 @@ fn mark_chunk_inner(
);
}
let (columns, _) = data.into_parts();
match new_ops {
Some(new_ops) => StreamChunk::with_visibility(new_ops, columns, new_visibility.finish()),
None => StreamChunk::with_visibility(ops, columns, new_visibility.finish()),
}
StreamChunk::with_visibility(ops, columns, new_visibility.finish())
}

/// We will rewrite unmatched U-/U+ into +/- ops.
Expand All @@ -422,8 +415,7 @@ fn mark_chunk_inner(
/// This hanging U-/U+ can lead to issues downstream, since we work with an assumption in the
/// system that there's never hanging U-/U+.
fn normalize_unmatched_updates(
original_ops: &[Op],
normalized_ops: &mut Option<Vec<Op>>,
normalized_ops: &mut Cow<'_, [Op]>,
unmatched_update_delete: &mut bool,
visible_update_delete: &mut bool,
current_visibility: bool,
Expand All @@ -436,29 +428,13 @@ fn normalize_unmatched_updates(
match (visible_update_delete, visible_update_insert) {
(true, false) => {
// Lazily clone the ops here.
match normalized_ops {
Some(ref mut new_ops_inner) => {
new_ops_inner[current_op_index - 1] = Op::Delete;
}
None => {
let mut new_ops_inner = original_ops.to_vec();
new_ops_inner[current_op_index - 1] = Op::Delete;
*normalized_ops = Some(new_ops_inner);
}
}
let ops = normalized_ops.to_mut();
ops[current_op_index - 1] = Op::Delete;
}
(false, true) => {
// Lazily clone the ops here.
match normalized_ops {
Some(ref mut new_ops_inner) => {
new_ops_inner[current_op_index] = Op::Insert;
}
None => {
let mut new_ops_inner = original_ops.to_vec();
new_ops_inner[current_op_index] = Op::Insert;
*normalized_ops = Some(new_ops_inner);
}
}
let ops = normalized_ops.to_mut();
ops[current_op_index] = Op::Insert;
}
(true, true) | (false, false) => {}
}
Expand Down

0 comments on commit 62ba750

Please sign in to comment.