Skip to content

Commit

Permalink
week3 day16 Snapshot Read - Memtables and Timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
MaYangrui6 committed Dec 30, 2024
1 parent 7d2c83e commit 0f2cc2d
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 119 deletions.
35 changes: 21 additions & 14 deletions mini-lsm-starter/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,36 +119,43 @@ impl LsmStorageInner {
fn compact_generate_sst_from_iter(
&self,
mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
compact_to_bottom_level: bool,
_compact_to_bottom_level: bool,
) -> Result<Vec<Arc<SsTable>>> {
// let _state_lock = self.state_lock.lock();
let mut builder = None;
let mut new_sst = Vec::new();

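// `last_key` tracks the key (the user key without the timestamp) of the previous record processed during
// compaction. It is used to tell whether the key currently being processed is the same as the previous one.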
let mut last_key = Vec::<u8>::new();
while iter.is_valid() {
if builder.is_none() {
builder = Some(SsTableBuilder::new(self.options.block_size));
}

let same_as_last_key = iter.key().key_ref() == last_key;

let builder_inner = builder.as_mut().unwrap();
if compact_to_bottom_level {
if !iter.value().is_empty() {
builder_inner.add(iter.key(), iter.value());
}
} else {
builder_inner.add(iter.key(), iter.value());
}
iter.next()?;

if builder_inner.estimated_size() >= self.options.target_sst_size {
// MVCC compaction requires all versions of the same user key to be stored in the same SST file,
// so in practice the generated SST files may vary in size.
if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key {
let sst_id = self.next_sst_id();
let builder = builder.take().unwrap();
let sst = Arc::new(builder.build(
let old_builder = builder.take().unwrap();
let sst = Arc::new(old_builder.build(
sst_id,
Some(self.block_cache.clone()),
self.path_of_sst(sst_id),
)?);
new_sst.push(sst);
builder = Some(SsTableBuilder::new(self.options.block_size));
}

let builder_inner = builder.as_mut().unwrap();
builder_inner.add(iter.key(), iter.value());

if !same_as_last_key {
last_key.clear();
last_key.extend(iter.key().key_ref());
}

iter.next()?;
}
if let Some(builder) = builder {
let sst_id = self.next_sst_id(); // lock dropped here
Expand Down
30 changes: 25 additions & 5 deletions mini-lsm-starter/src/lsm_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct LsmIterator {
inner: LsmIteratorInner,
end_bound: Bound<Bytes>,
is_valid: bool,
prev_key: Vec<u8>,
}

impl LsmIterator {
Expand All @@ -32,8 +33,10 @@ impl LsmIterator {
is_valid: iter.is_valid(),
inner: iter,
end_bound,
prev_key: Vec::new(),
};
iter.move_to_non_delete()?;
// iter.move_to_non_delete()?;
iter.move_to_key()?;
Ok(iter)
}

Expand All @@ -52,9 +55,26 @@ impl LsmIterator {
Ok(())
}

fn move_to_non_delete(&mut self) -> Result<()> {
while self.is_valid() && self.inner.value().is_empty() {
self.next_inner()?;
// fn move_to_non_delete(&mut self) -> Result<()> {
// while self.is_valid() && self.inner.value().is_empty() {
// self.next_inner()?;
// }
// Ok(())
// }

/// Advances the inner iterator until it rests on the first entry of a key that
/// differs from `prev_key` and whose value is non-empty, or until the inner
/// iterator is exhausted.
///
/// Every time a key different from `prev_key` is reached, it is copied into
/// `prev_key`, so all following entries that share that key are skipped on
/// later passes. An entry with an empty value is not surfaced (presumably a
/// delete marker — confirm against the write path).
///
/// # Errors
///
/// Propagates any error returned by `next_inner`.
fn move_to_key(&mut self) -> Result<()> {
    while self.inner.is_valid() {
        // Same key as the one already handled: step past this entry.
        if self.inner.key().key_ref() == self.prev_key {
            self.next_inner()?;
            continue;
        }

        // A new key: remember it so its remaining entries get skipped.
        self.prev_key.clear();
        self.prev_key.extend(self.inner.key().key_ref());

        // Stop here only if this first entry of the key carries a value;
        // otherwise keep looping, which skips the rest of this key.
        if !self.inner.value().is_empty() {
            break;
        }
    }
    Ok(())
}
Expand All @@ -77,7 +97,7 @@ impl StorageIterator for LsmIterator {

fn next(&mut self) -> Result<()> {
self.next_inner()?;
self.move_to_non_delete()?;
self.move_to_key()?;
Ok(())
}

Expand Down
96 changes: 66 additions & 30 deletions mini-lsm-starter/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use crate::iterators::concat_iterator::SstConcatIterator;
use crate::iterators::merge_iterator::MergeIterator;
use crate::iterators::two_merge_iterator::TwoMergeIterator;
use crate::iterators::StorageIterator;
use crate::key::{Key, KeySlice, TS_RANGE_BEGIN};
use crate::key::{Key, KeySlice, TS_RANGE_BEGIN, TS_RANGE_END};
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::{Manifest, ManifestRecord};
use crate::mem_table::{map_bound, MemTable};
use crate::mem_table::{map_bound, map_key_bound_plus_ts, MemTable};
use crate::mvcc::LsmMvccInner;
use crate::table::{FileObject, SsTable, SsTableBuilder, SsTableIterator};

Expand Down Expand Up @@ -311,6 +311,13 @@ impl LsmStorageInner {
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}

/// Borrows the `LsmMvccInner` stored in `self.mvcc`.
///
/// # Panics
///
/// Panics if `self.mvcc` is `None`.
pub(crate) fn mvcc(&self) -> &LsmMvccInner {
    self.mvcc
        .as_ref()
        .expect("LsmStorageInner::mvcc must be set before it is accessed")
}
/// Borrows the `Manifest` stored in `self.manifest`.
///
/// # Panics
///
/// Panics if `self.manifest` is `None`.
pub(crate) fn manifest(&self) -> &Manifest {
    self.manifest
        .as_ref()
        .expect("LsmStorageInner::manifest must be set before it is accessed")
}

/// Start the storage engine by either loading an existing directory or creating a new one if the directory does
/// not exist.
pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
Expand Down Expand Up @@ -431,7 +438,7 @@ impl LsmStorageInner {
compaction_controller,
manifest: Some(manifest),
options: options.into(),
mvcc: None,
mvcc: Some(LsmMvccInner::new(0)),
compaction_filters: Arc::new(Mutex::new(Vec::new())),
};
storage.sync_dir()?;
Expand All @@ -455,25 +462,38 @@ impl LsmStorageInner {
Arc::clone(&guard)
}; // drop global lock here

// 1、Search on the current memtable.
if let Some(value) = snapshot.memtable.get(key) {
if value.is_empty() {
// found a tombstone: report the key as not existing
return Ok(None);
}
return Ok(Some(value));
}

// 2、Search on immutable memtables.
// // 1、Search on the current memtable.
// if let Some(value) = snapshot.memtable.get(key) {
// if value.is_empty() {
// // found a tombstone: report the key as not existing
// return Ok(None);
// }
// return Ok(Some(value));
// }
//
// // 2、Search on immutable memtables.
// for memtable in snapshot.imm_memtables.iter() {
// if let Some(value) = memtable.get(key) {
// if value.is_empty() {
// // found tomestone, return key not exists
// return Ok(None);
// }
// return Ok(Some(value));
// }
// }
// 1-2. Search the current memtable and the immutable memtables via range scans over all timestamps of `key`.
let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
memtable_iters.push(Box::new(snapshot.memtable.scan(
Bound::Included(KeySlice::from_slice(key, TS_RANGE_BEGIN)),
Bound::Included(KeySlice::from_slice(key, TS_RANGE_END)),
)));
for memtable in snapshot.imm_memtables.iter() {
if let Some(value) = memtable.get(key) {
if value.is_empty() {
// found a tombstone: report the key as not existing
return Ok(None);
}
return Ok(Some(value));
}
memtable_iters.push(Box::new(memtable.scan(
Bound::Included(KeySlice::from_slice(key, TS_RANGE_BEGIN)),
Bound::Included(KeySlice::from_slice(key, TS_RANGE_END)),
)));
}
let memtable_iter = MergeIterator::create(memtable_iters);

// 3、search on l0
let mut l0_iters = Vec::with_capacity(snapshot.l0_sstables.len());
Expand Down Expand Up @@ -519,16 +539,25 @@ impl LsmStorageInner {
level_iters.push(Box::new(level_iter));
}

let iter = TwoMergeIterator::create(l0_iter, MergeIterator::create(level_iters))?;
// let iter = TwoMergeIterator::create(l0_iter, MergeIterator::create(level_iters))?;
let iter = LsmIterator::new(
TwoMergeIterator::create(
TwoMergeIterator::create(memtable_iter, l0_iter)?,
MergeIterator::create(level_iters),
)?,
Bound::Unbounded,
)?;

if iter.is_valid() && iter.key().key_ref() == key && !iter.value().is_empty() {
if iter.is_valid() && iter.key() == key && !iter.value().is_empty() {
return Ok(Some(Bytes::copy_from_slice(iter.value())));
}
Ok(None)
}

/// Write a batch of data into the storage. Implement in week 2 day 7.
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
let _lck = self.mvcc().write_lock.lock();
let ts = self.mvcc().latest_commit_ts() + 1;
for record in batch {
match record {
WriteBatchRecord::Del(key) => {
Expand All @@ -537,7 +566,7 @@ impl LsmStorageInner {
let size;
{
let guard = self.state.read();
guard.memtable.put(key, b"")?;
guard.memtable.put(KeySlice::from_slice(key, ts), b"")?;
size = guard.memtable.approximate_size();
}
self.try_freeze(size)?;
Expand All @@ -550,13 +579,14 @@ impl LsmStorageInner {
let size;
{
let guard = self.state.read();
guard.memtable.put(key, value)?;
guard.memtable.put(KeySlice::from_slice(key, ts), value)?;
size = guard.memtable.approximate_size();
}
self.try_freeze(size)?;
}
}
}
self.mvcc().update_commit_ts(ts);
Ok(())
}

Expand Down Expand Up @@ -634,7 +664,7 @@ impl LsmStorageInner {

self.freeze_memtable_with_memtable(memtable)?;

self.manifest.as_ref().unwrap().add_record(
self.manifest().add_record(
state_lock_observer,
ManifestRecord::NewMemtable(memtable_id),
)?;
Expand Down Expand Up @@ -692,9 +722,7 @@ impl LsmStorageInner {
if self.options.enable_wal {
std::fs::remove_file(self.path_of_wal(sst_id))?;
}
self.manifest
.as_ref()
.unwrap()
self.manifest()
.add_record(&_state_lock, ManifestRecord::Flush(sst_id))?;
self.sync_dir()?;
Ok(())
Expand All @@ -717,9 +745,17 @@ impl LsmStorageInner {
}; // drop global lock here

let mut memtable_iters = Vec::with_capacity(snapshot.imm_memtables.len() + 1);
memtable_iters.push(Box::new(snapshot.memtable.scan(lower, upper)));
// memtable_iters.push(Box::new(snapshot.memtable.scan(lower, upper)));
memtable_iters.push(Box::new(snapshot.memtable.scan(
map_key_bound_plus_ts(lower, TS_RANGE_BEGIN),
map_key_bound_plus_ts(upper, TS_RANGE_END),
)));
for memtable in snapshot.imm_memtables.iter() {
memtable_iters.push(Box::new(memtable.scan(lower, upper)));
// memtable_iters.push(Box::new(memtable.scan(lower, upper)));
memtable_iters.push(Box::new(memtable.scan(
map_key_bound_plus_ts(lower, TS_RANGE_BEGIN),
map_key_bound_plus_ts(upper, TS_RANGE_END),
)));
}
let memtable_iter = MergeIterator::create(memtable_iters);

Expand Down
Loading

0 comments on commit 0f2cc2d

Please sign in to comment.