Skip to content

Commit

Permalink
feat(storage): support non_pk_prefix_watermark state cleaning (#19889)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Feb 6, 2025
1 parent c9ca12e commit c3a7a99
Show file tree
Hide file tree
Showing 34 changed files with 1,864 additions and 343 deletions.
3 changes: 3 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ message TableWatermarks {

// The direction of the table watermark.
bool is_ascending = 2;

// The table watermark is non-pk prefix table watermark.
bool is_non_pk_prefix = 3;
}

message EpochNewChangeLog {
Expand Down
12 changes: 12 additions & 0 deletions src/common/src/util/row_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ impl OrderedRowSerde {
}
}

#[must_use]
pub fn index(&self, idx: usize) -> Cow<'_, Self> {
if 1 == self.order_types.len() {
Cow::Borrowed(self)
} else {
Cow::Owned(Self {
schema: vec![self.schema[idx].clone()],
order_types: vec![self.order_types[idx]],
})
}
}

/// Note: prefer [`Row::memcmp_serialize`] if possible.
pub fn serialize(&self, row: impl Row, append_to: impl BufMut) {
self.serialize_datums(row.iter(), append_to)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ fn should_delete_key_by_watermark(
let Some(w) = watermark.vnode_watermarks.get(&vnode) else {
return false;
};
watermark.direction.filter_by_watermark(key, w)
watermark.direction.key_filter_by_watermark(key, w)
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
safe_epoch_read_table_watermarks_impl, safe_epoch_table_watermarks_impl,
};
use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, TableWatermarks};
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::table_watermark::{
ReadTableWatermark, TableWatermarks, WatermarkSerdeType,
};
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::compact_task::TaskType;

Expand All @@ -42,17 +43,18 @@ impl CompactionSelector for VnodeWatermarkCompactionSelector {
level_handlers,
developer_config,
table_watermarks,
state_table_info,
state_table_info: _,
member_table_ids,
..
} = context;
let dynamic_level_core =
DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
let ctx = dynamic_level_core.calculate_level_base_size(levels);
let mut picker = VnodeWatermarkCompactionPicker::new();
let table_watermarks =
safe_epoch_read_table_watermarks(table_watermarks, state_table_info, member_table_ids);
let compaction_input = picker.pick_compaction(levels, level_handlers, &table_watermarks)?;
let pk_table_watermarks =
safe_epoch_read_table_watermarks(table_watermarks, member_table_ids);
let compaction_input =
picker.pick_compaction(levels, level_handlers, &pk_table_watermarks)?;
compaction_input.add_pending_task(task_id, level_handlers);
Some(create_compaction_task(
dynamic_level_core.get_config(),
Expand All @@ -73,15 +75,25 @@ impl CompactionSelector for VnodeWatermarkCompactionSelector {

fn safe_epoch_read_table_watermarks(
table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
state_table_info: &HummockVersionStateTableInfo,
member_table_ids: &BTreeSet<TableId>,
) -> BTreeMap<TableId, ReadTableWatermark> {
safe_epoch_read_table_watermarks_impl(&safe_epoch_table_watermarks_impl(
table_watermarks,
state_table_info,
&member_table_ids
.iter()
.map(TableId::table_id)
.collect::<Vec<_>>(),
))
safe_epoch_read_table_watermarks_impl(
safe_epoch_table_watermarks_impl(
table_watermarks,
&member_table_ids
.iter()
.map(TableId::table_id)
.collect::<Vec<_>>(),
)
.into_iter()
.filter(|(_table_id, table_watermarks)| {
{
matches!(
table_watermarks.watermark_type,
WatermarkSerdeType::PkPrefix
)
}
})
.collect(),
)
}
16 changes: 14 additions & 2 deletions src/meta/src/hummock/manager/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
compact_task_to_string, statistics_compact_task, CompactionGroupId, HummockCompactionTaskId,
Expand Down Expand Up @@ -851,9 +852,20 @@ impl HummockManager {
&mut compact_task,
group_config.compaction_config.as_ref(),
);
compact_task.table_watermarks = version
let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
.latest_version()
.safe_epoch_table_watermarks(&compact_task.existing_table_ids);
.safe_epoch_table_watermarks(&compact_task.existing_table_ids)
.into_iter()
.partition(|(_table_id, table_watermarke)| {
matches!(
table_watermarke.watermark_type,
WatermarkSerdeType::PkPrefix
)
});

compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

compact_task.table_schemas = compact_task
.existing_table_ids
.iter()
Expand Down
5 changes: 5 additions & 0 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,13 @@ async fn build_table(
},
);
let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
table_id_to_watermark_serde,
);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch, TableId::new(0));
Expand Down Expand Up @@ -186,11 +188,14 @@ async fn build_table_2(
);

let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]);
let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);

let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
table_id_to_watermark_serde,
);
let mut full_key = test_key_of(0, epoch, TableId::new(table_id));
let table_key_len = full_key.user_key.table_key.len();
Expand Down
26 changes: 21 additions & 5 deletions src/storage/benches/bench_merge_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::Arc;

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::iterator::{
Forward, HummockIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator,
Forward, HummockIterator, HummockIteratorUnion, MergeIterator,
NonPkPrefixSkipWatermarkIterator, NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator,
PkPrefixSkipWatermarkState,
};
use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchIterator, SharedBufferValue,
Expand Down Expand Up @@ -108,10 +112,22 @@ fn criterion_benchmark(c: &mut Criterion) {
},
);

let merge_iter = RefCell::new(SkipWatermarkIterator::new(
MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)),
BTreeMap::new(),
));
let combine_iter = {
let iter = PkPrefixSkipWatermarkIterator::new(
MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)),
PkPrefixSkipWatermarkState::new(BTreeMap::new()),
);

NonPkPrefixSkipWatermarkIterator::new(
iter,
NonPkPrefixSkipWatermarkState::new(
BTreeMap::new(),
Arc::new(CompactionCatalogAgent::dummy()),
),
)
};

let merge_iter = RefCell::new(combine_iter);
c.bench_with_input(
BenchmarkId::new("bench-merge-iter-skip-empty-watermark", "unordered"),
&merge_iter,
Expand Down
11 changes: 10 additions & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,16 @@ impl<F: SstableWriterFactory> TableBuilderFactory for LocalTableBuilderFactory<F
TableId::default().into(),
VirtualNode::COUNT_FOR_TEST,
)]);
let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode);

let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);

let builder = SstableBuilder::for_test(
id,
writer,
self.options.clone(),
table_id_to_vnode,
table_id_to_watermark_serde,
);

Ok(builder)
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
use risingwave_hummock_sdk::HummockEpoch;
Expand Down Expand Up @@ -101,6 +101,7 @@ fn gen_committed_table_watermarks(
})
.collect(),
direction: WatermarkDirection::Ascending,
watermark_type: WatermarkSerdeType::PkPrefix,
}
}

Expand Down
Loading

0 comments on commit c3a7a99

Please sign in to comment.