From 3ab88af5f91dec6a0c3ad5b9d540712ce2a9c054 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 3 Feb 2025 18:03:24 +0800 Subject: [PATCH] add unit test + fix typo --- src/stream/src/executor/backfill/utils.rs | 113 +++++++++++++++++++++- 1 file changed, 112 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 7128c55986a47..1f001cfd859c7 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -404,7 +404,7 @@ fn mark_chunk_inner( ); } let (columns, _) = data.into_parts(); - StreamChunk::with_visibility(ops, columns, new_visibility.finish()) + StreamChunk::with_visibility(new_ops, columns, new_visibility.finish()) } /// We will rewrite unmatched U-/U+ into +/- ops. @@ -856,3 +856,114 @@ pub fn create_builder( }; DataChunkBuilder::new(data_types, batch_size) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + + #[test] + fn test_normalizing_unmatched_updates() { + let ops = vec![ + Op::UpdateDelete, + Op::UpdateInsert, + Op::UpdateDelete, + Op::UpdateInsert, + ]; + let ops: Arc<[Op]> = ops.into(); + + { + let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref()); + let mut unmatched_update_delete = true; + let mut visible_update_delete = true; + let current_visibility = true; + normalize_unmatched_updates( + &mut new_ops, + &mut unmatched_update_delete, + &mut visible_update_delete, + current_visibility, + 1, + &Op::UpdateInsert, + ); + assert_eq!( + &new_ops[..], + vec![ + Op::UpdateDelete, + Op::UpdateInsert, + Op::UpdateDelete, + Op::UpdateInsert + ] + ); + } + { + let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref()); + let mut unmatched_update_delete = true; + let mut visible_update_delete = false; + let current_visibility = false; + normalize_unmatched_updates( + &mut new_ops, + &mut unmatched_update_delete, + &mut visible_update_delete, + current_visibility, + 1, + &Op::UpdateInsert, + ); + assert_eq!( + &new_ops[..], + vec![ + Op::UpdateDelete, + Op::UpdateInsert, + Op::UpdateDelete, + Op::UpdateInsert + ] + ); + } + { + let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref()); + let mut unmatched_update_delete = true; + let mut visible_update_delete = true; + let current_visibility = false; + normalize_unmatched_updates( + &mut new_ops, + &mut unmatched_update_delete, + &mut visible_update_delete, + current_visibility, + 1, + &Op::UpdateInsert, + ); + assert_eq!( + &new_ops[..], + vec![ + Op::Delete, + Op::UpdateInsert, + Op::UpdateDelete, + Op::UpdateInsert + ] + ); + } + { + let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref()); + let mut unmatched_update_delete = true; + let mut visible_update_delete = false; + let current_visibility = true; + normalize_unmatched_updates( + &mut new_ops, + &mut unmatched_update_delete, + &mut visible_update_delete, + current_visibility, + 1, + &Op::UpdateInsert, + ); + assert_eq!( + &new_ops[..], + vec![ + Op::UpdateDelete, + Op::Insert, + Op::UpdateDelete, + Op::UpdateInsert + ] + ); + } + } +}