Skip to content

Commit

Permalink
refactor cmp_datum_iter
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Feb 1, 2025
1 parent 6627008 commit 787a5b1
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 12 deletions.
8 changes: 0 additions & 8 deletions src/common/src/util/sort_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,14 +535,6 @@ pub fn cmp_datum_iter(
})
}

pub fn cmp_datum_iter_le(
lhs: impl IntoIterator<Item = impl ToDatumRef>,
rhs: impl IntoIterator<Item = impl ToDatumRef>,
order_type: impl IntoIterator<Item = OrderType>,
) -> bool {
cmp_datum_iter(lhs, rhs, order_type) != Ordering::Greater
}

/// Partial compare two `Row`s with specified order types.
///
/// NOTE: This function returns `None` if two rows have different schema.
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::{cmp_datum_iter_le, OrderType};
use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType};
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::error::ConnectorError;
Expand Down Expand Up @@ -354,7 +354,7 @@ pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
BackfillProgressPerVnode::NotStarted => false,
// If in progress, we need to check row <= current_pos.
BackfillProgressPerVnode::InProgress { current_pos, .. } => {
cmp_datum_iter_le(pk.iter(), current_pos.iter(), pk_order.iter().copied())
cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
}
};
new_visibility.append(visible);
Expand Down Expand Up @@ -394,7 +394,7 @@ fn mark_chunk_inner(
for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
let lhs = row.project(pk_in_output_indices);
let rhs = current_pos;
let visible = cmp_datum_iter_le(lhs.iter(), rhs.iter(), pk_order.iter().copied());
let visible = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le();
new_visibility.append(visible);

normalize_unmatched_updates(
Expand Down Expand Up @@ -504,7 +504,7 @@ fn mark_cdc_chunk_inner(
if in_binlog_range {
let lhs = row.project(pk_in_output_indices);
let rhs = current_pos;
cmp_datum_iter_le(lhs.iter(), rhs.iter(), pk_order.iter().copied())
cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le()
} else {
false
}
Expand Down

0 comments on commit 787a5b1

Please sign in to comment.