diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc
index 5d7e64dc0..254bd68be 100644
--- a/db/compact_files_test.cc
+++ b/db/compact_files_test.cc
@@ -132,6 +132,53 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
   delete db;
 }
 
+// TODO: add unit tests for:
+//  - base case: stop adding L0 files once the size limit has been reached.
+//  - small files: keep adding files until the size limit is reached.
+//  - the oldest files are picked first.
+//  - bulk load (file ingestion) still works.
+
+// Scenario: four L0 files covering the key ranges 1-3, 2-7, 5-6 and 8-9.
+// 1-3 is picked in SetupInitialFiles(); is 5-6 then picked in
+// SetupOtherL0FilesIfNeeded()?
+TEST_F(CompactFilesTest, L0PickExpansion) {
+  Options options;
+  options.create_if_missing = true;
+  options.compaction_style = kCompactionStyleLevel;
+  options.compaction_pri = CompactionPri::kOldestSmallestSeqFirst;
+  // The four flushes below reach the L0 compaction trigger.
+  options.level0_file_num_compaction_trigger = 4;
+
+  DB* db = nullptr;
+  ASSERT_OK(DestroyDB(db_name_, options));
+  ASSERT_OK(DB::Open(options, db_name_, &db));
+  ASSERT_NE(db, nullptr);
+  WriteOptions write_options;
+  // create L0 files
+  ASSERT_OK(db->Put(write_options, "foo1", "v1"));
+  ASSERT_OK(db->Put(write_options, "foo3", "v1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  ASSERT_OK(db->Put(write_options, "foo2", "v1"));
+  ASSERT_OK(db->Put(write_options, "foo5", "v1"));
+  ASSERT_OK(db->Put(write_options, "foo7", "v1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  ASSERT_OK(db->Put(write_options, "foo5", "v2"));
+  ASSERT_OK(db->Put(write_options, "foo6", "v1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  ASSERT_OK(db->Put(write_options, "foo8", "v1"));
+  ASSERT_OK(db->Put(write_options, "foo9", "v1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  // TODO: assert on which files were compacted; for now this only checks that
+  // background work finishes without error.
+  ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
+
+  delete db;
+}
+
 TEST_F(CompactFilesTest, MultipleLevel) {
   Options options;
   options.create_if_missing = true;
diff --git a/db/version_set.cc b/db/version_set.cc
index c9eb84921..909c14d73 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2138,7 +2138,7 @@ VersionStorageInfo::VersionStorageInfo(
     CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
     bool _force_consistency_checks,
     EpochNumberRequirement epoch_number_requirement, SystemClock* clock,
-    uint32_t bottommost_file_compaction_delay)
+    uint32_t bottommost_file_compaction_delay, uint64_t base_level_size)
     : internal_comparator_(internal_comparator),
       user_comparator_(user_comparator),
       // cfd is nullptr if Version is dummy
@@ -2171,6 +2171,16 @@ VersionStorageInfo::VersionStorageInfo(
       finalized_(false),
       force_consistency_checks_(_force_consistency_checks),
       epoch_number_requirement_(epoch_number_requirement) {
+  // Cap on the total size of L0 files GetOverlappingInputs() may return:
+  // three times the base level size. A base_level_size of 0 (the default)
+  // disables the cap, and the product saturates instead of wrapping.
+  const uint64_t kNoSizeCap = ~uint64_t{0};
+  if (base_level_size == 0 || base_level_size > kNoSizeCap / 3) {
+    max_size_to_compact_ = kNoSizeCap;
+  } else {
+    max_size_to_compact_ = 3 * base_level_size;
+  }
+
   if (ref_vstorage != nullptr) {
     accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
     accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
@@ -2214,9 +2224,9 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
           cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks,
           epoch_number_requirement,
           cfd_ == nullptr ? nullptr : cfd_->ioptions()->clock,
-          cfd_ == nullptr
-              ? 0
-              : mutable_cf_options.bottommost_file_compaction_delay),
+          cfd_ == nullptr ? 0
+                          : mutable_cf_options.bottommost_file_compaction_delay,
+          cfd_ == nullptr ? 0 : mutable_cf_options.max_bytes_for_level_base),
       vset_(vset),
       next_(this),
       prev_(this),
@@ -4321,11 +4331,22 @@ void VersionStorageInfo::GetOverlappingInputs(
     user_end = end->user_key();
   }
 
-  // index stores the file index need to check.
+  // Sortedness is required for https://github.com/speedb-io/speedb/issues/154.
+  // Files are expected to be ordered from newest to oldest, e.g. with
+  // sequence number ranges 21-30, 11-20, 1-10.
+  // NOTE(review): ingested (bulk-loaded) files may not satisfy this ordering;
+  // confirm before relying on the assertion.
+  for (size_t i = 1; i < level_files_brief_[level].num_files; i++) {
+    assert(files_[level][i - 1]->fd.smallest_seqno >=
+           files_[level][i]->fd.largest_seqno);
+  }
+  // Build the index in reverse order so that files are picked from oldest to
+  // newest. index stores the file index need to check.
   std::list<size_t> index;
-  for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
+  for (size_t i = level_files_brief_[level].num_files; i-- > 0;) {
     index.emplace_back(i);
   }
+  uint64_t cur_files_size = 0;
 
   while (!index.empty()) {
     bool found_overlapping_file = false;
@@ -4345,6 +4366,7 @@ void VersionStorageInfo::GetOverlappingInputs(
       } else {
         // if overlap
         inputs->emplace_back(files_[level][*iter]);
+        cur_files_size += f->fd.file_size;
         found_overlapping_file = true;
         // record the first file index.
         if (file_index && *file_index == -1) {
@@ -4362,6 +4384,15 @@ void VersionStorageInfo::GetOverlappingInputs(
             user_end = file_limit;
           }
         }
+
+        // Stop accumulating L0 files for compaction once the total size of
+        // the selected files exceeds the cap.
+        // NOTE(review): returning here can leave out an older file that only
+        // overlaps the range after it was expanded above, and truncates the
+        // result for every caller of this function - confirm this is safe.
+        if (cur_files_size > max_size_to_compact_) {
+          return;
+        }
       }
     }
     // if all the files left are not overlap, break
diff --git a/db/version_set.h b/db/version_set.h
index 187772e67..77129a9e3 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -148,7 +148,8 @@ class VersionStorageInfo {
                      bool _force_consistency_checks,
                      EpochNumberRequirement epoch_number_requirement,
                      SystemClock* clock,
-                     uint32_t bottommost_file_compaction_delay);
+                     uint32_t bottommost_file_compaction_delay,
+                     uint64_t base_level_size = 0);
   // No copying allowed
   VersionStorageInfo(const VersionStorageInfo&) = delete;
   void operator=(const VersionStorageInfo&) = delete;
@@ -677,8 +678,11 @@ class VersionStorageInfo {
 
   // A list for the same set of files that are stored in files_,
   // but files in each level are now sorted based on file
-  // size. The file with the largest size is at the front.
+  // size, or on another priority defined by CompactionPri (see
+  // advanced_options.h). Sorting is redone for each version edit in
+  // PrepareForVersionAppend. The file with the largest size is at the front.
   // This vector stores the index of the file from files_.
+  // Only the first number_of_files_to_sort_ files are sorted.
   std::vector<std::vector<int>> files_by_compaction_pri_;
 
   // If true, means that files in L0 have keys with non overlapping ranges
@@ -775,6 +779,11 @@ class VersionStorageInfo {
 
   EpochNumberRequirement epoch_number_requirement_;
 
+  // Upper bound on the total size of the L0 files returned by
+  // GetOverlappingInputs(); derived from
+  // mutable_cf_options.max_bytes_for_level_base.
+  uint64_t max_size_to_compact_;
+
   friend class Version;
   friend class VersionSet;
 };