Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Nov 27, 2023
1 parent 1f311fe commit b528a05
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 35 deletions.
18 changes: 12 additions & 6 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,16 +414,22 @@ impl<T: AsRef<[u8]>> AsRef<[u8]> for TableKey<T> {
}

impl<T: AsRef<[u8]>> TableKey<T> {
pub fn split_vnode(&self) -> (VirtualNode, &[u8]) {
debug_assert!(
self.0.as_ref().len() >= VirtualNode::SIZE,
"too short table key: {:?}",
self.0.as_ref()
);
let (vnode, inner_key) = self.0.as_ref().split_array_ref::<{ VirtualNode::SIZE }>();
(VirtualNode::from_be_bytes(*vnode), inner_key)
}

pub fn vnode_part(&self) -> VirtualNode {
VirtualNode::from_be_bytes(
self.0.as_ref()[..VirtualNode::SIZE]
.try_into()
.expect("slice with incorrect length"),
)
self.split_vnode().0
}

pub fn key_part(&self) -> &[u8] {
&self.0.as_ref()[VirtualNode::SIZE..]
self.split_vnode().1
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#![feature(is_sorted)]
#![feature(let_chains)]
#![feature(btree_cursors)]
#![feature(split_array)]

mod key_cmp;
use std::cmp::Ordering;
Expand Down
67 changes: 38 additions & 29 deletions src/storage/src/hummock/iterator/skip_watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ impl<I: HummockIterator<Direction = Forward>> SkipWatermarkIterator<I> {
if self.inner.is_valid() {
let key = self.inner.key();
let key_table_id = key.user_key.table_id;
let key_vnode = key.user_key.table_key.vnode_part();
let inner_key = key.user_key.table_key.key_part();
let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
while let Some((table_id, vnode, direction, watermark)) = self.remain_watermarks.front()
{
match (table_id, vnode).cmp(&(&key_table_id, &key_vnode)) {
Expand All @@ -79,19 +78,31 @@ impl<I: HummockIterator<Direction = Forward>> SkipWatermarkIterator<I> {
continue;
}
Ordering::Equal => {
if direction.filter_by_watermark(inner_key, watermark) {
return true;
}
// when watermark is ascending and the inner key has passed the watermark,
// the watermark is not likely to take effect to current or future key,
// and we can skip the watermark.
if *direction == WatermarkDirection::Ascending
&& inner_key >= watermark.as_ref()
{
self.remain_watermarks.pop_front();
continue;
} else {
return false;
match direction {
WatermarkDirection::Ascending => {
match inner_key.cmp(watermark.as_ref()) {
Ordering::Less => {
// The current key will be filtered by the watermark.
// Return true to further advance the key.
return true;
}
Ordering::Equal | Ordering::Greater => {
// The current key has passed the watermark.
// Advance the next watermark.
self.remain_watermarks.pop_front();
continue;
}
}
}
WatermarkDirection::Descending => {
return match inner_key.cmp(watermark.as_ref()) {
// Current key as not reached the watermark. Just return.
Ordering::Less | Ordering::Equal => false,
// Current key will be filtered by the watermark.
// Return true to further advance the key.
Ordering::Greater => true,
};
}
}
}
Ordering::Greater => {
Expand All @@ -113,8 +124,7 @@ impl<I: HummockIterator<Direction = Forward>> SkipWatermarkIterator<I> {
{
let key = self.inner.key();
let key_table_id = key.user_key.table_id;
let key_vnode = key.user_key.table_key.vnode_part();
let inner_key = key.user_key.table_key.key_part();
let (key_vnode, inner_key) = key.user_key.table_key.split_vnode();
match (&key_table_id, &key_vnode).cmp(&(table_id, vnode)) {
Ordering::Less => {
return Ok(false);
Expand All @@ -139,18 +149,17 @@ impl<I: HummockIterator<Direction = Forward>> SkipWatermarkIterator<I> {
}

async fn advance_key_and_watermark(&mut self) -> HummockResult<()> {
// here we assume that before calling this method, it was an operation on key,
// so we call advance_watermark anyway at first.
if self.advance_watermark() {
loop {
// advance key and watermark in an interleave manner until nothing
// changed after the method is called.
if !self.advance_key().await? {
break;
}
if !self.advance_watermark() {
break;
}
// advance key and watermark in an interleave manner until nothing
// changed after the method is called.
loop {
// here we assume that before calling this method, it was an operation on key,
// so we call advance_watermark anyway at first.
if !self.advance_watermark() {
break;
}

if !self.advance_key().await? {
break;
}
}
Ok(())
Expand Down

0 comments on commit b528a05

Please sign in to comment.