diff --git a/db/db_impl/spdb_db_gs_impl.cc b/db/db_impl/spdb_db_gs_impl.cc index 85d67df62..65be63442 100644 --- a/db/db_impl/spdb_db_gs_impl.cc +++ b/db/db_impl/spdb_db_gs_impl.cc @@ -417,6 +417,7 @@ bool ProcessCurrValuesIterVsDelList(GlobalContext& gc, LevelContext& lc) { } Status ProcessLogLevel(GlobalContext& gc, LevelContext& lc) { + // TODO - Create this once for all levels rather than recreate per level lc.del_list_iter.reset(gc.del_list->NewIterator().release()); lc.del_list_iter->SeekToFirst(); @@ -440,7 +441,7 @@ Status ProcessLogLevel(GlobalContext& gc, LevelContext& lc) { if (lc.range_del_iter->Valid() == false) { auto was_new_csk_found = ProcessCurrValuesIterVsDelList(gc, lc); if (was_new_csk_found) { - // printf("Processing Level Ended, new csk was found\n"); + printf("Processing Level Ended, new csk was found\n"); return Status::OK(); } else { continue; @@ -495,8 +496,7 @@ Status ProcessLogLevel(GlobalContext& gc, LevelContext& lc) { } } - // printf("Processing Level Ended, was new csk was found:%d\n", - // lc.new_csk_found_in_level); + printf("Processing Level Ended, was new csk was found:%d\n", lc.new_csk_found_in_level); return Status::OK(); } @@ -519,7 +519,7 @@ Status ProcessMutableMemtable(SuperVersion* super_version, GlobalContext& gc, lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper( std::move(wrapped_range_del_iter), gc.comparator, *gc.csk)); - // printf("Processing Mutable Table - Iter\n"); + printf("Processing Mutable Table\n"); return ProcessLogLevel(gc, lc); } @@ -530,6 +530,9 @@ Status ProcessImmutableMemtables(SuperVersion* super_version, GlobalContext& gc, auto iters = super_version->imm->GetIterators(gc.mutable_read_options, &arena); + printf("Processing Immutable Memtables. Num Memtables:%zu\n", iters.size()); + + auto i = 1; for (auto& memtbl_iters : iters) { lc.values_iter.reset(new InternalIteratorWrapper( std::move(memtbl_iters.memtbl_iter), gc.comparator, *gc.csk)); @@ -541,11 +544,50 @@ Status ProcessImmutableMemtables(SuperVersion* super_version, GlobalContext& gc, lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper( std::move(wrapped_range_del_iter), gc.comparator, *gc.csk)); - // printf("Processing Immutable Memtable\n"); + printf("Processing Immutable Memtable #%d\n", i); auto status = ProcessLogLevel(gc, lc); if (status.ok() == false) { return status; } + ++i; + } + return Status::OK(); +} + +Status ProcessLevel0Files(SuperVersion* super_version, GlobalContext& gc, + const FileOptions& file_options, Arena& arena) { + constexpr int level0 = 0; + + if (super_version->current->storage_info()->IsLevelEmpty(level0)) { + return Status::OK(); + } + + LevelContext lc; + + // TOOD - Handle allow_unprepared_value!!!! + auto iters = + super_version->current->GetLevel0Iterators(gc.mutable_read_options, file_options, false /* allow_unprepared_value */, &arena); + + printf("Processing Level-0 Files. Num Files:%zu\n", iters.size()); + + auto i = 1; + for (auto& file_iters : iters) { + lc.values_iter.reset(new InternalIteratorWrapper( + std::move(file_iters.table_iter), gc.comparator, *gc.csk)); + + std::unique_ptr wrapped_range_del_iter; + if (file_iters.range_ts_iter.get() != nullptr) { + wrapped_range_del_iter.reset(file_iters.range_ts_iter.release()); + } + lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper( + std::move(wrapped_range_del_iter), gc.comparator, *gc.csk)); + + printf("Processing Level-0 File #%d\n", i); + auto status = ProcessLogLevel(gc, lc); + if (status.ok() == false) { + return status; + } + ++i; } return Status::OK(); } @@ -588,6 +630,10 @@ Status DBImpl::GetSmallest(const ReadOptions& read_options, status = ProcessImmutableMemtables(super_version, gc, arena); } + if (status.ok()) { + status = ProcessLevel0Files(super_version, gc, file_options_, arena); + } + CleanupSuperVersion(super_version); if (key->empty()) { diff --git a/db/db_test.cc b/db/db_test.cc index 9283d9d57..19da3b1f7 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -33,6 +33,7 @@ #include #include #include +#include #ifndef OS_WIN #include @@ -7886,7 +7887,11 @@ class DBGsTest : public DBTest { } } - void GetSmallestAndValidate(const Slice expected_smallest_key) { + void GetSmallestAndValidate(const Slice expected_smallest_key, const std::string& title = "") { + if (title.empty() == false) { + std::cout << "\n" << title << "\n"; + } + std::string smallest_key; Status s = dbfull()->GetSmallest(ReadOptions(), dbfull()->DefaultColumnFamily(), @@ -8166,9 +8171,6 @@ TEST_F(DBGsTest, GS_RangeTsAndDelKeyInImmCoveringInMutable) { CALL_WRAPPER(GetSmallestAndValidate("c")); } -// XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX -#endif - TEST_F(DBGsTest, GS_RangeTsAndDelKeyInMutableCoveringImm) { ReopenNewDb(); auto dflt_cfh = dbfull()->DefaultColumnFamily(); @@ -8182,6 +8184,42 @@ TEST_F(DBGsTest, GS_RangeTsAndDelKeyInMutableCoveringImm) { CALL_WRAPPER(GetSmallestAndValidate("")); } +// XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +#endif + +// TEST_F(DBGsTest, GS_RangeTsAndDelKeyInImmCoveringInMutable) { +// ReopenNewDb(); +// auto dflt_cfh = dbfull()->DefaultColumnFamily(); + +// ASSERT_OK(dbfull()->Delete(WriteOptions(), dflt_cfh, "c")); +// ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), dflt_cfh, "k", "z")); +// CALL_WRAPPER(GetSmallestAndValidate("", "mutable with dk / dr")); + +// ASSERT_OK(dbfull()->TEST_SwitchMemtable()); +// ASSERT_OK(dbfull()->Put(WriteOptions(), "x", "b1")); +// ASSERT_OK(dbfull()->Put(WriteOptions(), "c", "a1")); +// CALL_WRAPPER(GetSmallestAndValidate("c", "switched and added c, x")); + +// ASSERT_OK(dbfull()->TEST_SwitchMemtable()); +// ASSERT_OK(dbfull()->Put(WriteOptions(), "d", "a1")); +// CALL_WRAPPER(GetSmallestAndValidate("c", "switched and added d")); + +// ASSERT_OK(dbfull()->TEST_SwitchMemtable()); +// ASSERT_OK(dbfull()->Delete(WriteOptions(), "c")); +// CALL_WRAPPER(GetSmallestAndValidate("d", "switched and deleted d")); +// } + +TEST_F(DBGsTest, GS_EmptyMutableValuesIn1L0File) { + ReopenNewDb(); + + ASSERT_OK(dbfull()->Put(WriteOptions(), "x", "b1")); + ASSERT_OK(dbfull()->Put(WriteOptions(), "c", "a1")); + ASSERT_OK(dbfull()->TEST_FlushMemTable()); + ASSERT_EQ(1, NumTableFilesAtLevel(0)); + + CALL_WRAPPER(GetSmallestAndValidate("c")); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index dc1e73038..dffc8fd80 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -90,6 +90,11 @@ class TruncatedRangeDelIterator { SequenceNumber lower_bound() const { return iter_->lower_bound(); } + // TODO - CONSIDER WHAT TO DO ABOUT THIS!!!! + std::unique_ptr StealInternalIterAndInvalidate() { + return std::move(iter_); + } + private: std::unique_ptr iter_; const InternalKeyComparator* icmp_; diff --git a/db/version_set.cc b/db/version_set.cc index c9eb84921..90ac76c6b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2068,6 +2068,50 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, } } +std::vector Version::GetLevel0Iterators( const ReadOptions& read_options, + const FileOptions& soptions, + bool allow_unprepared_value, + Arena* arena) { + assert(storage_info_.finalized_); + + if (storage_info_.IsLevelEmpty(0)) { + return {}; + } + + // TODO - Understand if this should be handled for Get Smallest as well + // bool should_sample = should_sample_file_read(); + + std::vector iters; + + for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) { + const auto& file = storage_info_.LevelFilesBrief(0).files[i]; + TruncatedRangeDelIterator* tombstone_iter = nullptr; + auto table_iter = cfd_->table_cache()->NewIterator( + read_options, soptions, cfd_->internal_comparator(), + *file.file_metadata, /*range_del_agg=*/nullptr, + mutable_cf_options_.prefix_extractor, nullptr, + cfd_->internal_stats()->GetFileReadHist(0), + TableReaderCaller::kUserIterator, arena, + /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_, + /*smallest_compaction_key=*/nullptr, + /*largest_compaction_key=*/nullptr, allow_unprepared_value, + mutable_cf_options_.block_protection_bytes_per_key, &tombstone_iter); + + // TODO - Handle read_options.ignore_range_deletions!!!! + FragmentedRangeTombstoneIterator* range_ts_iter = nullptr; + if ((read_options.ignore_range_deletions == false) && (tombstone_iter != nullptr)) { + range_ts_iter = tombstone_iter->StealInternalIterAndInvalidate().release(); + } + delete tombstone_iter; + tombstone_iter = nullptr; + + iters.push_back( {std::move(std::unique_ptr(table_iter)), + std::move(std::unique_ptr(range_ts_iter))}); + } + + return iters; +} + Status Version::OverlapWithLevelIterator(const ReadOptions& read_options, const FileOptions& file_options, const Slice& smallest_user_key, diff --git a/db/version_set.h b/db/version_set.h index 187772e67..f9aa24767 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -312,6 +312,10 @@ class VersionStorageInfo { bool HasOverlappingUserKey(const std::vector* inputs, int level); + bool IsLevelEmpty(int level) const { + return ((level >= num_non_empty_levels()) || (LevelFilesBrief(level).num_files == 0)); + } + int num_levels() const { return num_levels_; } // REQUIRES: PrepareForVersionAppend has been called @@ -862,6 +866,17 @@ class Version { MergeIteratorBuilder* merger_iter_builder, int level, bool allow_unprepared_value); + struct IteratorPair { + std::unique_ptr table_iter; + std::unique_ptr range_ts_iter; + }; + + // TODO - Consider using auto-vector or, adding to some entity like the merger_iter_builder + std::vector GetLevel0Iterators( const ReadOptions& read_options, + const FileOptions& soptions, + bool allow_unprepared_value, + Arena* arena); + Status OverlapWithLevelIterator(const ReadOptions&, const FileOptions&, const Slice& smallest_user_key, const Slice& largest_user_key, int level,