Skip to content

Commit

Permalink
1st l0 file ut passes
Browse files Browse the repository at this point in the history
  • Loading branch information
udi-speedb committed Feb 4, 2024
1 parent 90d67cc commit 6ea81bc
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 9 deletions.
56 changes: 51 additions & 5 deletions db/db_impl/spdb_db_gs_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ bool ProcessCurrValuesIterVsDelList(GlobalContext& gc, LevelContext& lc) {
}

Status ProcessLogLevel(GlobalContext& gc, LevelContext& lc) {
// TODO - Create this once for all levels rather than recreate per level
lc.del_list_iter.reset(gc.del_list->NewIterator().release());

lc.del_list_iter->SeekToFirst();
Expand All @@ -440,7 +441,7 @@ Status ProcessLogLevel(GlobalContext& gc, LevelContext& lc) {
if (lc.range_del_iter->Valid() == false) {
auto was_new_csk_found = ProcessCurrValuesIterVsDelList(gc, lc);
if (was_new_csk_found) {
// printf("Processing Level Ended, new csk was found\n");
printf("Processing Level Ended, new csk was found\n");
return Status::OK();
} else {
continue;
Expand Down Expand Up @@ -495,8 +496,7 @@ Status ProcessLogLevel(GlobalContext& gc, LevelContext& lc) {
}
}

// printf("Processing Level Ended, was new csk was found:%d\n",
// lc.new_csk_found_in_level);
printf("Processing Level Ended, was new csk was found:%d\n", lc.new_csk_found_in_level);

return Status::OK();
}
Expand All @@ -519,7 +519,7 @@ Status ProcessMutableMemtable(SuperVersion* super_version, GlobalContext& gc,
lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper(
std::move(wrapped_range_del_iter), gc.comparator, *gc.csk));

// printf("Processing Mutable Table - Iter\n");
printf("Processing Mutable Table\n");
return ProcessLogLevel(gc, lc);
}

Expand All @@ -530,6 +530,9 @@ Status ProcessImmutableMemtables(SuperVersion* super_version, GlobalContext& gc,
auto iters =
super_version->imm->GetIterators(gc.mutable_read_options, &arena);

printf("Processing Immutable Memtables. Num Memtables:%zu\n", iters.size());

auto i = 1;
for (auto& memtbl_iters : iters) {
lc.values_iter.reset(new InternalIteratorWrapper(
std::move(memtbl_iters.memtbl_iter), gc.comparator, *gc.csk));
Expand All @@ -541,11 +544,50 @@ Status ProcessImmutableMemtables(SuperVersion* super_version, GlobalContext& gc,
lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper(
std::move(wrapped_range_del_iter), gc.comparator, *gc.csk));

// printf("Processing Immutable Memtable\n");
printf("Processing Immutable Memtable #%d\n", i);
auto status = ProcessLogLevel(gc, lc);
if (status.ok() == false) {
return status;
}
++i;
}
return Status::OK();
}

Status ProcessLevel0Files(SuperVersion* super_version, GlobalContext& gc,
const FileOptions& file_options, Arena& arena) {
constexpr int level0 = 0;

if (super_version->current->storage_info()->IsLevelEmpty(level0)) {
return Status::OK();
}

LevelContext lc;

// TOOD - Handle allow_unprepared_value!!!!
auto iters =
super_version->current->GetLevel0Iterators(gc.mutable_read_options, file_options, false /* allow_unprepared_value */, &arena);

printf("Processing Level-0 Files. Num Files:%zu\n", iters.size());

auto i = 1;
for (auto& file_iters : iters) {
lc.values_iter.reset(new InternalIteratorWrapper(
std::move(file_iters.table_iter), gc.comparator, *gc.csk));

std::unique_ptr<FragmentedRangeTombstoneIterator> wrapped_range_del_iter;
if (file_iters.range_ts_iter.get() != nullptr) {
wrapped_range_del_iter.reset(file_iters.range_ts_iter.release());
}
lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper(
std::move(wrapped_range_del_iter), gc.comparator, *gc.csk));

printf("Processing Level-0 File #%d\n", i);
auto status = ProcessLogLevel(gc, lc);
if (status.ok() == false) {
return status;
}
++i;
}
return Status::OK();
}
Expand Down Expand Up @@ -588,6 +630,10 @@ Status DBImpl::GetSmallest(const ReadOptions& read_options,
status = ProcessImmutableMemtables(super_version, gc, arena);
}

if (status.ok()) {
status = ProcessLevel0Files(super_version, gc, file_options_, arena);
}

CleanupSuperVersion(super_version);

if (key->empty()) {
Expand Down
46 changes: 42 additions & 4 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <unordered_set>
#include <utility>
#include <vector>
#include <iostream>

#ifndef OS_WIN
#include <unistd.h>
Expand Down Expand Up @@ -7886,7 +7887,11 @@ class DBGsTest : public DBTest {
}
}

void GetSmallestAndValidate(const Slice expected_smallest_key) {
void GetSmallestAndValidate(const Slice expected_smallest_key, const std::string& title = "") {
if (title.empty() == false) {
std::cout << "\n" << title << "\n";
}

std::string smallest_key;
Status s =
dbfull()->GetSmallest(ReadOptions(), dbfull()->DefaultColumnFamily(),
Expand Down Expand Up @@ -8166,9 +8171,6 @@ TEST_F(DBGsTest, GS_RangeTsAndDelKeyInImmCoveringInMutable) {
CALL_WRAPPER(GetSmallestAndValidate("c"));
}

// XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
#endif

TEST_F(DBGsTest, GS_RangeTsAndDelKeyInMutableCoveringImm) {
ReopenNewDb();
auto dflt_cfh = dbfull()->DefaultColumnFamily();
Expand All @@ -8182,6 +8184,42 @@ TEST_F(DBGsTest, GS_RangeTsAndDelKeyInMutableCoveringImm) {
CALL_WRAPPER(GetSmallestAndValidate(""));
}

// XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
#endif

// TEST_F(DBGsTest, GS_RangeTsAndDelKeyInImmCoveringInMutable) {
// ReopenNewDb();
// auto dflt_cfh = dbfull()->DefaultColumnFamily();

// ASSERT_OK(dbfull()->Delete(WriteOptions(), dflt_cfh, "c"));
// ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), dflt_cfh, "k", "z"));
// CALL_WRAPPER(GetSmallestAndValidate("", "mutable with dk / dr"));

// ASSERT_OK(dbfull()->TEST_SwitchMemtable());
// ASSERT_OK(dbfull()->Put(WriteOptions(), "x", "b1"));
// ASSERT_OK(dbfull()->Put(WriteOptions(), "c", "a1"));
// CALL_WRAPPER(GetSmallestAndValidate("c", "switched and added c, x"));

// ASSERT_OK(dbfull()->TEST_SwitchMemtable());
// ASSERT_OK(dbfull()->Put(WriteOptions(), "d", "a1"));
// CALL_WRAPPER(GetSmallestAndValidate("c", "switched and added d"));

// ASSERT_OK(dbfull()->TEST_SwitchMemtable());
// ASSERT_OK(dbfull()->Delete(WriteOptions(), "c"));
// CALL_WRAPPER(GetSmallestAndValidate("d", "switched and deleted d"));
// }

TEST_F(DBGsTest, GS_EmptyMutableValuesIn1L0File) {
ReopenNewDb();

ASSERT_OK(dbfull()->Put(WriteOptions(), "x", "b1"));
ASSERT_OK(dbfull()->Put(WriteOptions(), "c", "a1"));
ASSERT_OK(dbfull()->TEST_FlushMemTable());
ASSERT_EQ(1, NumTableFilesAtLevel(0));

CALL_WRAPPER(GetSmallestAndValidate("c"));
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
5 changes: 5 additions & 0 deletions db/range_del_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ class TruncatedRangeDelIterator {

SequenceNumber lower_bound() const { return iter_->lower_bound(); }

// TODO - CONSIDER WHAT TO DO ABOUT THIS!!!!
std::unique_ptr<FragmentedRangeTombstoneIterator> StealInternalIterAndInvalidate() {
return std::move(iter_);
}

private:
std::unique_ptr<FragmentedRangeTombstoneIterator> iter_;
const InternalKeyComparator* icmp_;
Expand Down
44 changes: 44 additions & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,50 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
}
}

std::vector<Version::IteratorPair> Version::GetLevel0Iterators( const ReadOptions& read_options,
const FileOptions& soptions,
bool allow_unprepared_value,
Arena* arena) {
assert(storage_info_.finalized_);

if (storage_info_.IsLevelEmpty(0)) {
return {};
}

// TODO - Understand if this should be handled for Get Smallest as well
// bool should_sample = should_sample_file_read();

std::vector<IteratorPair> iters;

for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
TruncatedRangeDelIterator* tombstone_iter = nullptr;
auto table_iter = cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(),
*file.file_metadata, /*range_del_agg=*/nullptr,
mutable_cf_options_.prefix_extractor, nullptr,
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, arena,
/*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value,
mutable_cf_options_.block_protection_bytes_per_key, &tombstone_iter);

// TODO - Handle read_options.ignore_range_deletions!!!!
FragmentedRangeTombstoneIterator* range_ts_iter = nullptr;
if ((read_options.ignore_range_deletions == false) && (tombstone_iter != nullptr)) {
range_ts_iter = tombstone_iter->StealInternalIterAndInvalidate().release();
}
delete tombstone_iter;
tombstone_iter = nullptr;

iters.push_back( {std::move(std::unique_ptr<InternalIterator>(table_iter)),
std::move(std::unique_ptr<FragmentedRangeTombstoneIterator>(range_ts_iter))});
}

return iters;
}

Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
const FileOptions& file_options,
const Slice& smallest_user_key,
Expand Down
15 changes: 15 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ class VersionStorageInfo {
bool HasOverlappingUserKey(const std::vector<FileMetaData*>* inputs,
int level);

bool IsLevelEmpty(int level) const {
return ((level >= num_non_empty_levels()) || (LevelFilesBrief(level).num_files == 0));
}

int num_levels() const { return num_levels_; }

// REQUIRES: PrepareForVersionAppend has been called
Expand Down Expand Up @@ -862,6 +866,17 @@ class Version {
MergeIteratorBuilder* merger_iter_builder,
int level, bool allow_unprepared_value);

struct IteratorPair {
std::unique_ptr<InternalIterator> table_iter;
std::unique_ptr<FragmentedRangeTombstoneIterator> range_ts_iter;
};

// TODO - Consider using auto-vector or, adding to some entity like the merger_iter_builder
std::vector<IteratorPair> GetLevel0Iterators( const ReadOptions& read_options,
const FileOptions& soptions,
bool allow_unprepared_value,
Arena* arena);

Status OverlapWithLevelIterator(const ReadOptions&, const FileOptions&,
const Slice& smallest_user_key,
const Slice& largest_user_key, int level,
Expand Down

0 comments on commit 6ea81bc

Please sign in to comment.