Skip to content

Commit

Permalink
Switch from FragmentedRangeTombstoneIterator to TruncatedRangeDelIter…
Browse files Browse the repository at this point in the history
…ator
  • Loading branch information
udi-speedb committed Feb 21, 2024
1 parent 3831506 commit 80f928d
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 65 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,10 @@ cmake-build-*
third-party/folly/
.cache
*.sublime-*
db_bench_db/*
examples/*LOG
examples/*txt
*csv
*cc*orig
out_file*
perf*data*
179 changes: 149 additions & 30 deletions db/db_impl/spdb_db_gs_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,15 @@ class FragmentedRangeTombstoneIteratorWrapper : public Iterator {

void SeekToFirst() override {
if (wrapped_iter_ptr_) {
wrapped_iter_ptr_->SeekToFirst();
wrapped_iter_ptr_->SeekToTopFirst();
UpdateValidity();
ReportProgress("SeekToFirst");
}
}

void SeekToLast() override {
if (wrapped_iter_ptr_) {
wrapped_iter_ptr_->SeekToLast();
wrapped_iter_ptr_->SeekToTopLast();
UpdateValidity();
ReportProgress("SeekToLast");
}
Expand Down Expand Up @@ -326,12 +326,139 @@ class FragmentedRangeTombstoneIteratorWrapper : public Iterator {
bool valid_ = false;
};

class TruncatedRangeDelIteratorWrapper {
public:
TruncatedRangeDelIteratorWrapper(
TruncatedRangeDelIterator* wrapped_iter_ptr,
const Comparator* comparator, const Slice& upper_bound)
: wrapped_iter_ptr_(wrapped_iter_ptr),
comparator_(comparator),
upper_bound_(upper_bound) {}

~TruncatedRangeDelIteratorWrapper() {
delete wrapped_iter_ptr_;
}

bool Valid() const { return valid_; }

bool HasUpperBound() const { return (upper_bound_.empty() == false); }

void SetUpperBound(const Slice& upper_bound) {
upper_bound_ = upper_bound;
UpdateValidity();
}

const Slice& GetUpperBound() const {
assert(HasUpperBound());
return upper_bound_;
}

void SeekToFirst() {
if (wrapped_iter_ptr_) {
wrapped_iter_ptr_->SeekToFirst();
UpdateValidity();
ReportProgress("SeekToFirst");
}
}

void Seek(const Slice& target) {
if (wrapped_iter_ptr_) {
wrapped_iter_ptr_->Seek(target);
UpdateValidity();
ReportProgress("Seek");
}
}

void Next() {
if (wrapped_iter_ptr_) {
assert(Valid());
wrapped_iter_ptr_->Next();
UpdateValidity();
ReportProgress("Next");
} else {
assert(0);
}
}

RangeTombstone Tombstone() const {
if (wrapped_iter_ptr_) {
assert(Valid());
auto curr_range_ts = wrapped_iter_ptr_->Tombstone();
if (HasUpperBound() == false) {
return curr_range_ts;
}

assert(comparator_->Compare(curr_range_ts.start_key_, upper_bound_) < 0);
auto curr_range_end_vs_upper_bound =
comparator_->Compare(curr_range_ts.end_key_, upper_bound_);
if (curr_range_end_vs_upper_bound <= 0) {
return curr_range_ts;
}

// curr range extends beyond the upper bound, return a range that ends at
// the upper bound (exclusive)
return RangeTombstone(curr_range_ts.start_key_, upper_bound_,
curr_range_ts.seq_);
} else {
assert(0);
return RangeTombstone();
}
}

Slice key() {
assert(Valid());
return wrapped_iter_ptr_->start_key().user_key;
}

void Invalidate() {
valid_ = false;
}

std::string ToString() {
if (Valid() == false) {
return "Invalid";
}
return RangeTsToString(Tombstone());
}

void ReportProgress(const std::string& progress_str) {
if (gs_report_iters_progress) {
std::cout << "Range-TS-Iter: " << progress_str << ":" << ToString() << '\n';
}
}

private:
void UpdateValidity() {
if (wrapped_iter_ptr_ == nullptr) {
valid_ = false;
} else {
valid_ = wrapped_iter_ptr_->Valid();
if (valid_ && HasUpperBound()) {
auto curr_range_start_vs_upper_bound = comparator_->Compare(
key(), GetUpperBound());
// The upper bound is exclusive for ranges;
// A range that starts at the upper bound is invalid
if (curr_range_start_vs_upper_bound >= 0) {
valid_ = false;
}
}
}
}

private:
TruncatedRangeDelIterator* wrapped_iter_ptr_ = nullptr;
const Comparator* comparator_ = nullptr;
Slice upper_bound_;
bool valid_ = false;
};

struct GlobalContext {
ReadOptions mutable_read_options;
SequenceNumber seq_num = kMaxSequenceNumber;
GlobalDelList* del_list = nullptr;
Slice target;
std::string csk;
const InternalKeyComparator* icomparator = nullptr;
const Comparator* comparator = nullptr;
std::shared_ptr<Logger> logger;

Expand All @@ -340,7 +467,7 @@ struct GlobalContext {

struct LevelContext {
std::unique_ptr<InternalIteratorWrapper> values_iter;
std::unique_ptr<FragmentedRangeTombstoneIteratorWrapper> range_del_iter;
TruncatedRangeDelIteratorWrapper* range_del_iter = nullptr;

// std::string prev_del_key;

Expand All @@ -349,6 +476,10 @@ struct LevelContext {
ValueCategory value_category = ValueCategory::NONE;

bool new_csk_found_in_level = false;

~LevelContext() {
delete range_del_iter;
}
};

void UpdateCSK(GlobalContext& gc, LevelContext& lc) {
Expand Down Expand Up @@ -699,14 +830,18 @@ Status ProcessMutableMemtable(SuperVersion* super_version, GlobalContext& gc,
lc.values_iter.reset(new InternalIteratorWrapper(
std::move(wrapped_values_iter), gc.comparator, gc.csk));

auto range_del_iter = super_version->mem->NewRangeTombstoneIterator(
gc.mutable_read_options, gc.seq_num, false /* immutable_memtable */);
std::unique_ptr<FragmentedRangeTombstoneIterator> wrapped_range_del_iter;
if (range_del_iter != nullptr) {
wrapped_range_del_iter.reset(std::move(range_del_iter));
TruncatedRangeDelIterator* range_del_iter = nullptr;
auto fragmented_range_del_iter = super_version->mem->NewRangeTombstoneIterator(
gc.mutable_read_options, gc.seq_num, false /* immutable_memtable */);
if (fragmented_range_del_iter == nullptr || fragmented_range_del_iter->empty()) {
delete fragmented_range_del_iter;
fragmented_range_del_iter = nullptr;
} else {
range_del_iter = new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(fragmented_range_del_iter),
gc.icomparator, nullptr /* smallest */, nullptr /* largest */);
}
lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper(
std::move(wrapped_range_del_iter), gc.comparator, gc.csk));
lc.range_del_iter = new TruncatedRangeDelIteratorWrapper(range_del_iter, gc.comparator, gc.csk);

if (gs_debug_prints) printf("Processing Mutable Table\n");
return ProcessLogLevel(gc, lc);
Expand All @@ -726,12 +861,7 @@ Status ProcessImmutableMemtables(SuperVersion* super_version, GlobalContext& gc,
lc.values_iter.reset(new InternalIteratorWrapper(
std::move(memtbl_iters.memtbl_iter), gc.comparator, gc.csk));

std::unique_ptr<FragmentedRangeTombstoneIterator> wrapped_range_del_iter;
if (memtbl_iters.range_ts_iter.get() != nullptr) {
wrapped_range_del_iter.reset(memtbl_iters.range_ts_iter.release());
}
lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper(
std::move(wrapped_range_del_iter), gc.comparator, gc.csk));
lc.range_del_iter = new TruncatedRangeDelIteratorWrapper(memtbl_iters.range_del_iter, gc.comparator, gc.csk);

if (gs_debug_prints) printf("Processing Immutable Memtable #%d\n", i);
auto status = ProcessLogLevel(gc, lc);
Expand Down Expand Up @@ -765,13 +895,7 @@ Status ProcessLevel0Files(SuperVersion* super_version, GlobalContext& gc,
LevelContext lc;
lc.values_iter.reset(new InternalIteratorWrapper(
std::move(file_iters.table_iter), gc.comparator, gc.csk));

std::unique_ptr<FragmentedRangeTombstoneIterator> wrapped_range_del_iter;
if (file_iters.range_ts_iter.get() != nullptr) {
wrapped_range_del_iter.reset(file_iters.range_ts_iter.release());
}
lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper(
std::move(wrapped_range_del_iter), gc.comparator, gc.csk));
lc.range_del_iter = new TruncatedRangeDelIteratorWrapper(file_iters.range_ts_iter, gc.comparator, gc.csk);

if (gs_debug_prints) printf("Processing Level-0 File #%d\n", i);
auto status = ProcessLogLevel(gc, lc);
Expand Down Expand Up @@ -808,13 +932,7 @@ Status ProcessLevelsGt0(SuperVersion* super_version, GlobalContext& gc,
lc.values_iter.reset(new InternalIteratorWrapper(
std::move(iters.table_iter), gc.comparator, gc.csk));

std::unique_ptr<FragmentedRangeTombstoneIterator> wrapped_range_del_iter;
assert(iters.range_ts_iter.get() == nullptr);
if (iters.range_ts_iter.get() != nullptr) {
wrapped_range_del_iter.reset(iters.range_ts_iter.release());
}
lc.range_del_iter.reset(new FragmentedRangeTombstoneIteratorWrapper(
std::move(wrapped_range_del_iter), gc.comparator, gc.csk));
lc.range_del_iter = new TruncatedRangeDelIteratorWrapper(nullptr, gc.comparator, gc.csk);

if (gs_debug_prints) printf("Processing Level-%d\n", level);
auto status = ProcessLogLevel(gc, lc);
Expand Down Expand Up @@ -857,6 +975,7 @@ Status DBImpl::GetSmallestAtOrAfter(const ReadOptions& read_options,
gc.target = target;
}
gc.csk.clear();
gc.icomparator = &cfd->ioptions()->internal_comparator;
gc.comparator = cfd->user_comparator();
gc.logger = immutable_db_options_.info_log;

Expand Down
19 changes: 12 additions & 7 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,20 +282,25 @@ MemTableListVersion::GetIterators(const ReadOptions& read_options,
auto i = 0U;
for (auto& m : memlist_) {
auto mem_iter = m->NewIterator(read_options, arena);
FragmentedRangeTombstoneIterator* range_ts_iter = nullptr;

TruncatedRangeDelIterator* range_del_iter = nullptr;
if (read_options.ignore_range_deletions == false) {
// Except for snapshot read, using kMaxSequenceNumber is OK because these
// are immutable memtables.
range_ts_iter = m->NewRangeTombstoneIterator(
FragmentedRangeTombstoneIterator* fragmented_range_del_iter = m->NewRangeTombstoneIterator(
read_options, read_seq, true /* immutale_memtable */);
if (range_ts_iter == nullptr || range_ts_iter->empty()) {
// printf("range_ts_iter null or empty!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n");
if (fragmented_range_del_iter == nullptr || fragmented_range_del_iter->empty()) {
delete fragmented_range_del_iter;
fragmented_range_del_iter = nullptr;
} else {
range_del_iter = new TruncatedRangeDelIterator(
std::unique_ptr<FragmentedRangeTombstoneIterator>(fragmented_range_del_iter),
&m->GetInternalKeyComparator(), nullptr /* smallest */,
nullptr /* largest */);

}
}
iters[i] = {std::move(std::unique_ptr<InternalIterator>(mem_iter)),
std::move(std::unique_ptr<FragmentedRangeTombstoneIterator>(
range_ts_iter))};
iters[i] = {std::move(std::unique_ptr<InternalIterator>(mem_iter)), range_del_iter};
++i;
}

Expand Down
2 changes: 1 addition & 1 deletion db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class MemTableListVersion {

struct IteratorPair {
std::unique_ptr<InternalIterator> memtbl_iter;
std::unique_ptr<FragmentedRangeTombstoneIterator> range_ts_iter;
TruncatedRangeDelIterator* range_del_iter;
};
std::vector<IteratorPair> GetIterators(const ReadOptions& options,
Arena* arena);
Expand Down
11 changes: 5 additions & 6 deletions db/range_del_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class TruncatedRangeDelIterator {
: *largest_;
}

// TODO - Reconsider / maybe need to account for smallest_ / largest_
RangeTombstone Tombstone() const {
return iter_->Tombstone();
}

SequenceNumber seq() const { return iter_->seq(); }
Slice timestamp() const {
assert(icmp_->user_comparator()->timestamp_size());
Expand All @@ -90,12 +95,6 @@ class TruncatedRangeDelIterator {

SequenceNumber lower_bound() const { return iter_->lower_bound(); }

// TODO - CONSIDER WHAT TO DO ABOUT THIS!!!!
std::unique_ptr<FragmentedRangeTombstoneIterator>
StealInternalIterAndInvalidate() {
return std::move(iter_);
}

private:
std::unique_ptr<FragmentedRangeTombstoneIterator> iter_;
const InternalKeyComparator* icmp_;
Expand Down
23 changes: 3 additions & 20 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2096,20 +2096,8 @@ std::vector<Version::IteratorPair> Version::GetIteratorsForLevel0(
/*largest_compaction_key=*/nullptr, allow_unprepared_value,
mutable_cf_options_.block_protection_bytes_per_key, &tombstone_iter);

// TODO - Handle read_options.ignore_range_deletions!!!!
FragmentedRangeTombstoneIterator* range_ts_iter = nullptr;
if ((read_options.ignore_range_deletions == false) &&
(tombstone_iter != nullptr)) {
range_ts_iter =
tombstone_iter->StealInternalIterAndInvalidate().release();
}
delete tombstone_iter;
tombstone_iter = nullptr;

iters.push_back(
{std::move(std::unique_ptr<InternalIterator>(table_iter)),
std::move(std::unique_ptr<FragmentedRangeTombstoneIterator>(
range_ts_iter))});
{std::move(std::unique_ptr<InternalIterator>(table_iter)), tombstone_iter});
}

return iters;
Expand Down Expand Up @@ -2142,13 +2130,8 @@ Version::IteratorPair Version::GetIteratorsForLevelGt0(int level,
/*compaction_boundaries=*/nullptr, allow_unprepared_value,
&tombstone_iter_ptr);


FragmentedRangeTombstoneIterator* range_ts_iter = nullptr;

// TODO - HANDLE RANGE DEL ITER

return {std::move(std::unique_ptr<InternalIterator>(level_iter)),
std::move(std::unique_ptr<FragmentedRangeTombstoneIterator>(range_ts_iter))};
// TODO - HANDLE RANGE DEL ITER
return {std::move(std::unique_ptr<InternalIterator>(level_iter)), nullptr};
}

Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
Expand Down
2 changes: 1 addition & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ class Version {

struct IteratorPair {
std::unique_ptr<InternalIterator> table_iter;
std::unique_ptr<FragmentedRangeTombstoneIterator> range_ts_iter;
TruncatedRangeDelIterator* range_ts_iter;
};

// TODO - Consider using auto-vector or, adding to some entity like the
Expand Down

0 comments on commit 80f928d

Please sign in to comment.