Skip to content

Commit

Permalink
Initial version of an external table reader interface
Browse files Browse the repository at this point in the history
  • Loading branch information
anand1976 committed Feb 14, 2025
1 parent e697219 commit 00b2926
Show file tree
Hide file tree
Showing 7 changed files with 544 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ set(SOURCES
table/cuckoo/cuckoo_table_builder.cc
table/cuckoo/cuckoo_table_factory.cc
table/cuckoo/cuckoo_table_reader.cc
table/external_table_reader.cc
table/format.cc
table/get_context.cc
table/iterator.cc
Expand Down
2 changes: 2 additions & 0 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,8 @@ class InternalKey {
// Returns a mutable pointer to the string that backs this key.
// Intended only to be used together with ConvertFromUserKey().
std::string* rep() { return &rep_; }

// Read-only counterpart of rep(): returns a const pointer to the string that
// backs this key, so callers holding a const InternalKey can view it without
// copying.
const std::string* const_rep() const { return &rep_; }

// Assuming that *rep() contains a user key, this method makes internal key
// out of it in-place. This saves a memcpy compared to Set()/SetFrom().
void ConvertFromUserKey(SequenceNumber s, ValueType t) {
Expand Down
120 changes: 120 additions & 0 deletions include/rocksdb/external_table_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include "rocksdb/customizable.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"

namespace ROCKSDB_NAMESPACE {

class ExternalTableFactory;

// EXPERIMENTAL
// The interface defined in this file is subject to change at any time without
// warning!!


// This file defines an interface for plugging in an external table reader
// into RocksDB. The external table reader will be used instead of the
// BlockBasedTable to load and query sst files. As of now, creating the
// external table files using RocksDB is not supported, but will be added in
// the near future. The external table files can be created outside of
// RocksDB and ingested into a RocksDB instance using the IngestExternalFile()
// API.
//
// Initial support is for loading and querying the files using an
// SstFileReader. We will add support for ingestion of an external table
// into a limited RocksDB instance that only supports ingestion and not live
// writes in the near future. It'll be followed by support for replacing the
// column family by ingesting a new set of files. In all cases, the external
// table files will only be allowed in the bottommost level.
//
// The external table reader can support one or both of the following layouts -
// 1. Total order seek - All the keys in the files are in sorted order, and a
// user can seek to the first, last, or any key in between and iterate
// forwards or backwards till the end of the range. To support this mode,
// the implementation needs to use the comparator passed in
// ExternalTableOptions to enforce the key ordering. The prefix_extractor
// in ExternalTableOptions and the ExternalTableReader interfaces can be
// ignored.
// 2. Prefix seek - In this mode, the prefix_extractor is used to extract the
// prefix from a key. All the keys sharing the same prefix are ordered in
// ascending order according to the comparator. However, no specific
// ordering is required across prefixes. Users can scan keys by seeking
// to a specific key inside a prefix, and iterate forwards or backwards
// within the prefix. The prefix_same_as_start flag in ReadOptions will
// be true.
// 3. Both - If supporting both of the above, a user can seek inside a prefix
// and iterate beyond the prefix. The prefix_same_as_start in ReadOptions
// will be false. Additionally, the total_order_seek flag can be set to
// true to seek to the first non-empty prefix (as determined by the key
// order) if the seek prefix is empty.
//
// Many of the options in ReadOptions may not be relevant to the external
// table implementation.
// TODO: Specify which options are relevant

// Read-side interface that an external table format implements. One instance
// serves a single table file and is created through
// ExternalTableFactory::NewTableReader().
class ExternalTableReader {
 public:
  virtual ~ExternalTableReader() = default;

  // Create an iterator for scanning the table file. Keys exchanged with the
  // iterator are user keys. An exclusive upper bound for the scan may be
  // supplied through read_options.iterate_upper_bound.
  // RocksDB's adapter stores the returned pointer in a std::unique_ptr, so
  // ownership of the iterator passes to the caller.
  virtual Iterator* NewIterator(const ReadOptions& read_options,
                                const SliceTransform* prefix_extractor) = 0;

  // Look up a single key and store the associated value in *value.
  virtual Status Get(const ReadOptions& read_options, const Slice& key,
                     const SliceTransform* prefix_extractor,
                     std::string* value) = 0;

  // Batched form of Get(): look up every key in `keys`, placing the results
  // in *values and the outcome of each individual lookup in *statuses.
  virtual void MultiGet(const ReadOptions& read_options,
                        const std::vector<Slice>& keys,
                        const SliceTransform* prefix_extractor,
                        std::vector<std::string>* values,
                        std::vector<Status>* statuses) = 0;

  // Describe the file through a TableProperties object. Implementations
  // must fill in at least:
  //   comparator_name
  //   num_entries
  //   raw_key_size
  //   raw_value_size
  virtual std::shared_ptr<const TableProperties> GetTableProperties() const = 0;

  // Optional integrity check of the file contents. The default
  // implementation reports that the operation is not supported.
  virtual Status VerifyChecksum(const ReadOptions& /*ro*/) {
    return Status::NotSupported("VerifyChecksum() not supported");
  }
};

// Per-file settings handed to ExternalTableFactory::NewTableReader().
struct ExternalTableOptions {
  // Prefix extractor for the table. NOTE: this member is a reference, not a
  // copy, so the referenced shared_ptr must outlive this struct; a reader
  // that needs the extractor later should copy the shared_ptr.
  const std::shared_ptr<const SliceTransform>& prefix_extractor;
  // User comparator that defines the ordering of keys in the table.
  const Comparator* comparator;
};

// Factory that user code implements to open external table files. Wrap it
// with NewExternalTableFactory() to obtain a TableFactory.
class ExternalTableFactory : public Customizable {
 public:
  ~ExternalTableFactory() override = default;

  const char* Name() const override { return "ExternalTableFactory"; }

  // Open the file located at file_path and hand back a reader for it in
  // *table_reader. table_options supplies the comparator and prefix
  // extractor to use for the file. Returns a non-OK status on failure.
  virtual Status NewTableReader(
      const ReadOptions& read_options, const std::string& file_path,
      const ExternalTableOptions& table_options,
      std::unique_ptr<ExternalTableReader>* table_reader) = 0;
};

// Allocate a TableFactory that wraps around an ExternalTableFactory. Use this
// to allocate and set in ColumnFamilyOptions::table_factory. The returned
// factory shares ownership of inner_factory and forwards table reader
// creation to it.
std::shared_ptr<TableFactory> NewExternalTableFactory(
    std::shared_ptr<ExternalTableFactory> inner_factory);

} // namespace ROCKSDB_NAMESPACE
10 changes: 10 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,16 @@ struct ReadOptions {
// EXPERIMENTAL
Env::IOActivity io_activity = Env::IOActivity::kUnknown;

// EXPERIMENTAL
// An optional weight of values to be returned by a scan. Once the
// weight is exceeded, the scan is terminated (i.e. Next() invalidates the
// iterator). In the case of a DB with one of the built-in table formats,
// such as BlockBasedTable, the weight is simply the number of key-value
// pairs. In the case of an ExternalTableReader, the weight is passed
// through to the table reader and the interpretation is up to the reader
// implementation.
uint64_t weight = 0;

// *** END options for RocksDB internal use only ***

ReadOptions() {}
Expand Down
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ LIB_SOURCES = \
table/cuckoo/cuckoo_table_builder.cc \
table/cuckoo/cuckoo_table_factory.cc \
table/cuckoo/cuckoo_table_reader.cc \
table/external_table_reader.cc \
table/format.cc \
table/get_context.cc \
table/iterator.cc \
Expand Down
215 changes: 215 additions & 0 deletions table/external_table_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "rocksdb/external_table_reader.h"

#include <new>

#include "memory/arena.h"
#include "rocksdb/table.h"
#include "table/internal_iterator.h"
#include "table/table_builder.h"
#include "table/table_reader.h"

namespace ROCKSDB_NAMESPACE {

namespace {

// Adapts a user-key Iterator (as produced by
// ExternalTableReader::NewIterator()) to the InternalIterator interface.
//
// Seek targets arrive as internal keys; only their user-key portion is
// forwarded to the wrapped iterator. Keys coming back are re-encoded as
// internal keys with sequence number 0 and type kTypeValue.
//
// Takes ownership of the wrapped iterator. A null iterator is tolerated and
// yields an iterator that is never valid.
class ExternalTableIterator : public InternalIterator {
 public:
  explicit ExternalTableIterator(Iterator* iterator) : iterator_(iterator) {}

  // No copying allowed
  ExternalTableIterator(const ExternalTableIterator&) = delete;
  ExternalTableIterator& operator=(const ExternalTableIterator&) = delete;

  ~ExternalTableIterator() override {}

  // status_ is consulted as well as the wrapped iterator: when a seek target
  // fails to parse, the wrapped iterator is left at its previous position
  // and could still report itself as valid.
  bool Valid() const override {
    return status_.ok() && iterator_ && iterator_->Valid();
  }

  void SeekToFirst() override {
    status_ = Status::OK();
    if (iterator_) {
      iterator_->SeekToFirst();
      UpdateKey();
    }
  }

  void SeekToLast() override {
    status_ = Status::OK();
    if (iterator_) {
      iterator_->SeekToLast();
      UpdateKey();
    }
  }

  // `target` is an internal key. If it cannot be parsed, the error is
  // recorded in status_ and the wrapped iterator is not moved.
  void Seek(const Slice& target) override {
    status_ = Status::OK();
    if (iterator_) {
      ParsedInternalKey pkey;
      status_ = ParseInternalKey(target, &pkey, /*log_err_key=*/false);
      if (status_.ok()) {
        iterator_->Seek(pkey.user_key);
        UpdateKey();
      }
    }
  }

  // Same contract as Seek(), but positions at the last entry at or before
  // the target's user key.
  void SeekForPrev(const Slice& target) override {
    status_ = Status::OK();
    if (iterator_) {
      ParsedInternalKey pkey;
      status_ = ParseInternalKey(target, &pkey, /*log_err_key=*/false);
      if (status_.ok()) {
        iterator_->SeekForPrev(pkey.user_key);
        UpdateKey();
      }
    }
  }

  void Next() override {
    if (iterator_) {
      iterator_->Next();
      UpdateKey();
    }
  }

  void Prev() override {
    if (iterator_) {
      iterator_->Prev();
      UpdateKey();
    }
  }

  // Returns the current entry's key in internal-key encoding. Only
  // meaningful while Valid() is true.
  Slice key() const override {
    if (iterator_) {
      return Slice(*key_.const_rep());
    }
    return Slice();
  }

  Slice value() const override {
    if (iterator_) {
      return iterator_->value();
    }
    return Slice();
  }

  // An error recorded by this adapter (a seek target that failed to parse)
  // takes precedence over the wrapped iterator's own status.
  Status status() const override {
    return !status_.ok() ? status_
                         : (iterator_ ? iterator_->status() : Status::OK());
  }

 private:
  std::unique_ptr<Iterator> iterator_;
  InternalKey key_;
  Status status_;

  // Re-encodes the wrapped iterator's current user key as an internal key.
  // Callers guarantee iterator_ is non-null. The key is refreshed only while
  // the wrapped iterator is positioned on an entry, because Iterator::key()
  // must not be called on an invalid iterator (e.g. after stepping past the
  // last entry).
  void UpdateKey() {
    if (iterator_->Valid()) {
      key_.Set(iterator_->key(), 0, ValueType::kTypeValue);
    }
  }
};

class ExternalTableReaderAdapter : public TableReader {
public:
explicit ExternalTableReaderAdapter(
std::unique_ptr<ExternalTableReader> reader)
: reader_(std::move(reader)) {}

~ExternalTableReaderAdapter() override {}

// No copying allowed
ExternalTableReaderAdapter(const ExternalTableReaderAdapter&) = delete;
ExternalTableReaderAdapter& operator=(const ExternalTableReaderAdapter&) =
delete;

InternalIterator* NewIterator(
const ReadOptions& read_options, const SliceTransform* prefix_extractor,
Arena* /* arena */, bool /* skip_filters */,
TableReaderCaller /* caller */,
size_t /* compaction_readahead_size */ = 0,
bool /* allow_unprepared_value */ = false) override {
auto iterator = reader_->NewIterator(read_options, prefix_extractor);
return new ExternalTableIterator(iterator);
}

uint64_t ApproximateOffsetOf(const ReadOptions&, const Slice&,
TableReaderCaller) override {
return 0;
}

uint64_t ApproximateSize(const ReadOptions&, const Slice&, const Slice&,
TableReaderCaller) override {
return 0;
}

void SetupForCompaction() override {}

std::shared_ptr<const TableProperties> GetTableProperties() const override {
std::shared_ptr<TableProperties> props =
std::make_shared<TableProperties>(*reader_->GetTableProperties());
props->key_largest_seqno = 0;
return props;
}

size_t ApproximateMemoryUsage() const override { return 0; }

Status Get(const ReadOptions&, const Slice&, GetContext*,
const SliceTransform*, bool = false) override {
return Status::NotSupported(
"Get() not supported on external file iterator");
}

virtual Status VerifyChecksum(const ReadOptions& /*ro*/,
TableReaderCaller /*caller*/) override {
return Status::OK();
}

private:
std::unique_ptr<ExternalTableReader> reader_;
};

// TableFactory facade over a user-supplied ExternalTableFactory. Supports
// opening existing files only: NewTableBuilder() and Clone() return nullptr.
class ExternalTableFactoryAdapter : public TableFactory {
 public:
  explicit ExternalTableFactoryAdapter(
      std::shared_ptr<ExternalTableFactory> inner)
      : inner_(std::move(inner)) {}

  const char* Name() const override { return inner_->Name(); }

  // Asks the wrapped factory to open the file by name and wraps the resulting
  // reader in an ExternalTableReaderAdapter. The wrapped factory opens the
  // file by path itself; `file` is used only to obtain that path.
  Status NewTableReader(
      const ReadOptions& ro, const TableReaderOptions& topts,
      std::unique_ptr<RandomAccessFileReader>&& file, uint64_t /* file_size */,
      std::unique_ptr<TableReader>* table_reader,
      bool /* prefetch_index_and_filter_in_cache */) const override {
    std::unique_ptr<ExternalTableReader> reader;
    // Positional aggregate initialization (prefix_extractor, comparator):
    // designated initializers are a C++20 feature and are not portable to
    // all C++17 compilers.
    ExternalTableOptions ext_topts{topts.prefix_extractor,
                                   topts.ioptions.user_comparator};
    auto status =
        inner_->NewTableReader(ro, file->file_name(), ext_topts, &reader);
    if (!status.ok()) {
      return status;
    }
    if (reader == nullptr) {
      // Guard against a factory that reports success without producing a
      // reader; the adapter would otherwise dereference null on first use.
      return Status::InvalidArgument(
          "ExternalTableFactory returned a null table reader");
    }
    table_reader->reset(new ExternalTableReaderAdapter(std::move(reader)));
    return Status::OK();
  }

  // Writing external table files is not supported.
  TableBuilder* NewTableBuilder(const TableBuilderOptions&,
                                WritableFileWriter*) const override {
    return nullptr;
  }

  std::unique_ptr<TableFactory> Clone() const override { return nullptr; }

 private:
  std::shared_ptr<ExternalTableFactory> inner_;
};

} // namespace

std::shared_ptr<TableFactory> NewExternalTableFactory(
std::shared_ptr<ExternalTableFactory> inner_factory) {
std::shared_ptr<TableFactory> res;
res.reset(new ExternalTableFactoryAdapter(std::move(inner_factory)));
return res;
}

} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit 00b2926

Please sign in to comment.