Skip to content

Commit

Permalink
Refactor show: block column
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN committed Jan 30, 2025
1 parent cac462e commit 628b8fd
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 81 deletions.
72 changes: 8 additions & 64 deletions src/executor/operator/physical_show.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2481,50 +2481,13 @@ void PhysicalShow::ExecuteShowBlockDetail(QueryContext *query_context, ShowOpera

void PhysicalShow::ExecuteShowBlockColumn(QueryContext *query_context, ShowOperatorState *show_operator_state) {
auto txn = query_context->GetTxn();
TxnTimeStamp begin_ts = txn->BeginTS();

auto [table_entry, status] = txn->GetTableByName(db_name_, *object_name_);
auto [block_column_info, status] = txn->GetBlockColumnInfo(db_name_, *object_name_, *segment_id_, *block_id_, *column_id_);
if (!status.ok()) {
show_operator_state->status_ = status.clone();
RecoverableError(status);
return;
}

SizeT column_count = table_entry->ColumnCount();
if (!column_id_.has_value()) {
String error_message = "No column id is given.";
UnrecoverableError(error_message);
return;
}
SizeT table_column_id = *column_id_;

if (table_column_id >= column_count) {
Status status = Status::ColumnNotExist(fmt::format("index {}", table_column_id));
RecoverableError(status);
return;
}

auto segment_entry = table_entry->GetSegmentByID(*segment_id_, begin_ts);
if (!segment_entry) {
Status status = Status::SegmentNotExist(*segment_id_);
RecoverableError(status);
return;
}

auto block_entry = segment_entry->GetBlockEntryByID(*block_id_);
if (!block_entry) {
Status status = Status::BlockNotExist(*block_id_);
RecoverableError(status);
return;
}

auto column_block_entry = block_entry->GetColumnBlockEntry(table_column_id);
if (!column_block_entry) {
String error_message = fmt::format("Attempt to get column {} from block {}", table_column_id, *block_id_);
UnrecoverableError(error_message);
return;
}

auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);
UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
Vector<SharedPtr<DataType>> column_types{
Expand All @@ -2533,23 +2496,6 @@ void PhysicalShow::ExecuteShowBlockColumn(QueryContext *query_context, ShowOpera
};
output_block_ptr->Init(column_types);

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("column_name");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
const ColumnDef *column_def = table_entry->GetColumnDefByID(table_column_id);
Value value = Value::MakeVarchar(column_def->name_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Expand All @@ -2560,7 +2506,7 @@ void PhysicalShow::ExecuteShowBlockColumn(QueryContext *query_context, ShowOpera

++column_id;
{
Value value = Value::MakeVarchar(std::to_string(table_column_id));
Value value = Value::MakeVarchar(std::to_string(block_column_info->column_id_));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
Expand All @@ -2576,7 +2522,7 @@ void PhysicalShow::ExecuteShowBlockColumn(QueryContext *query_context, ShowOpera

++column_id;
{
Value value = Value::MakeVarchar(column_block_entry->column_type()->ToString());
Value value = Value::MakeVarchar(block_column_info->data_type_->ToString());
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
Expand All @@ -2592,13 +2538,12 @@ void PhysicalShow::ExecuteShowBlockColumn(QueryContext *query_context, ShowOpera

++column_id;
{
Value value = Value::MakeVarchar(column_block_entry->FilePath());
Value value = Value::MakeVarchar(*block_column_info->filename_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

SizeT outline_count = column_block_entry->OutlineBufferCount();
{
SizeT column_id = 0;
{
Expand All @@ -2609,7 +2554,7 @@ void PhysicalShow::ExecuteShowBlockColumn(QueryContext *query_context, ShowOpera

++column_id;
{
Value value = Value::MakeVarchar(std::to_string(outline_count));
Value value = Value::MakeVarchar(std::to_string(block_column_info->extra_file_count_));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
Expand All @@ -2625,8 +2570,7 @@ void PhysicalShow::ExecuteShowBlockColumn(QueryContext *query_context, ShowOpera

++column_id;
{
SizeT storage_size = column_block_entry->GetStorageSize();
String storage_size_str = Utility::FormatByteSize(storage_size);
String storage_size_str = Utility::FormatByteSize(block_column_info->storage_size_);
Value value = Value::MakeVarchar(storage_size_str);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
Expand All @@ -2644,8 +2588,8 @@ void PhysicalShow::ExecuteShowBlockColumn(QueryContext *query_context, ShowOpera
++column_id;
{
String outline_storage;
for (SizeT idx = 0; idx < outline_count; ++idx) {
outline_storage += *(column_block_entry->OutlineFilename(idx));
for (SizeT idx = 0; idx < block_column_info->extra_file_count_; ++idx) {
outline_storage += *block_column_info->extra_file_names_[idx];
outline_storage += ";";
}

Expand Down
10 changes: 10 additions & 0 deletions src/storage/common/meta_info.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import stl;
import table_entry_type;
import column_def;
import default_values;
import data_type;

export module meta_info;

Expand Down Expand Up @@ -89,6 +90,15 @@ export struct BlockInfo {
Vector<String> files_{};
};

// Plain value snapshot describing one column of one block.
// Every member carries a default initializer so a freshly constructed
// instance never exposes an indeterminate value.
export struct BlockColumnInfo {
    // Identifier of the column this snapshot describes.
    ColumnID column_id_{};
    // Data type of the column.
    SharedPtr<DataType> data_type_{};
    // Name of the column's primary data file.
    SharedPtr<String> filename_{};
    // Accumulated storage size; presumably in bytes (it is rendered through a byte-size formatter) - confirm with the producer.
    i64 storage_size_{};
    // Number of additional ("outline") files that belong to the column.
    SizeT extra_file_count_{};
    // Names of the additional files; expected to hold extra_file_count_ entries.
    Vector<SharedPtr<String>> extra_file_names_{};
};

export struct TableDetail {
SharedPtr<String> db_name_{};
SharedPtr<String> table_name_{};
Expand Down
34 changes: 30 additions & 4 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,12 @@ Tuple<SharedPtr<SegmentInfo>, Status> Catalog::GetSegmentInfo(const String &db_n
return {nullptr, table_status};
}

return {table_entry->GetSegmentInfo(segment_id, txn_ptr), Status::OK()};
SharedPtr<SegmentInfo> segment_info = table_entry->GetSegmentInfo(segment_id, txn_ptr);
if (segment_info == nullptr) {
return {nullptr, Status::SegmentNotExist(segment_id)};
}

return {segment_info, Status::OK()};
}

Tuple<Vector<SharedPtr<SegmentInfo>>, Status> Catalog::GetSegmentsInfo(const String &db_name, const String &table_name, Txn *txn_ptr) {
Expand Down Expand Up @@ -383,12 +388,12 @@ Catalog::GetBlockInfo(const String &db_name, const String &table_name, SegmentID
return {nullptr, Status::SegmentNotExist(segment_id)};
}

auto block_entry = segment_entry->GetBlockEntryByID(block_id);
if (!block_entry) {
auto block_info = segment_entry->GetBlockInfo(block_id, txn_ptr);
if (!block_info) {
return {nullptr, Status::BlockNotExist(block_id)};
}

return {block_entry->GetBlockInfo(txn_ptr), Status::OK()};
return {block_info, Status::OK()};
}

Tuple<Vector<SharedPtr<BlockInfo>>, Status>
Expand All @@ -410,6 +415,27 @@ Catalog::GetBlocksInfo(const String &db_name, const String &table_name, SegmentI
return {segment_entry->GetBlocksInfo(txn_ptr), Status::OK()};
}

// Resolves db_name/table_name/segment_id within the given transaction and
// returns the column description produced by the segment for (block_id, column_id).
//
// Returns {nullptr, status} when:
//   - the table lookup fails (the lookup's status is forwarded unchanged), or
//   - the segment is not found (Status::SegmentNotExist).
// Otherwise the result of SegmentEntry::GetBlockColumnInfo is forwarded as is.
Tuple<SharedPtr<BlockColumnInfo>, Status> Catalog::GetBlockColumnInfo(const String &db_name,
                                                                      const String &table_name,
                                                                      SegmentID segment_id,
                                                                      BlockID block_id,
                                                                      ColumnID column_id,
                                                                      Txn *txn_ptr) {
    TransactionID txn_id = txn_ptr->TxnID();
    TxnTimeStamp begin_ts = txn_ptr->BeginTS();
    auto [table_entry, table_status] = this->GetTableByName(db_name, table_name, txn_id, begin_ts);
    if (!table_status.ok()) {
        return {nullptr, table_status};
    }

    // Reuse the timestamp fetched above so the table and segment lookups
    // are guaranteed to use the same value.
    auto segment_entry = table_entry->GetSegmentByID(segment_id, begin_ts);
    if (!segment_entry) {
        return {nullptr, Status::SegmentNotExist(segment_id)};
    }

    return segment_entry->GetBlockColumnInfo(block_id, column_id, txn_ptr);
}

Status Catalog::RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id) {
TableMeta *table_meta = table_entry->GetTableMeta();
LOG_TRACE(fmt::format("Remove a table/collection entry: {}", *table_entry->GetTableName()));
Expand Down
3 changes: 3 additions & 0 deletions src/storage/meta/catalog.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ public:

Tuple<Vector<SharedPtr<BlockInfo>>, Status> GetBlocksInfo(const String &db_name, const String &table_name, SegmentID segment_id, Txn *txn);

Tuple<SharedPtr<BlockColumnInfo>, Status>
GetBlockColumnInfo(const String &db_name, const String &table_name, SegmentID segment_id, BlockID block_id, ColumnID column_id, Txn *txn);

static Status RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id);

// Index Related methods
Expand Down
25 changes: 24 additions & 1 deletion src/storage/meta/entry/block_column_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import internal_types;
import data_type;
import logical_type;
import infinity_context;
import meta_info;

namespace infinity {

Expand Down Expand Up @@ -175,6 +176,28 @@ UniquePtr<BlockColumnEntry> BlockColumnEntry::ApplyBlockColumnSnapshot(BlockEntr
return block_column_entry;
}

// Builds a BlockColumnInfo snapshot of this column entry: id, type, file name,
// combined size of the main buffer plus all outline buffers, and the names of
// the outline files.
SharedPtr<BlockColumnInfo> BlockColumnEntry::GetColumnInfo() const {
    auto info = MakeShared<BlockColumnInfo>();
    info->column_id_ = column_id_;
    info->data_type_ = column_type_;
    info->filename_ = filename_;

    // Main buffer first, then every outline buffer.
    SizeT total_size = buffer_->GetBufferSize();
    for (BufferObj *extra_buffer : outline_buffers_) {
        total_size += extra_buffer->GetBufferSize();
    }
    info->storage_size_ = total_size;

    // One file name per outline buffer.
    const SizeT extra_count = outline_buffers_.size();
    info->extra_file_count_ = extra_count;
    auto &extra_names = info->extra_file_names_;
    extra_names.reserve(extra_count);
    for (SizeT idx = 0; idx != extra_count; ++idx) {
        extra_names.emplace_back(OutlineFilename(idx));
    }
    return info;
}

SharedPtr<BlockColumnSnapshotInfo> BlockColumnEntry::GetSnapshotInfo() const {
SharedPtr<BlockColumnSnapshotInfo> block_column_snapshot_info = MakeShared<BlockColumnSnapshotInfo>();
block_column_snapshot_info->column_id_ = column_id_;
Expand Down Expand Up @@ -353,7 +376,7 @@ void BlockColumnEntry::Cleanup(CleanupInfoTracer *info_tracer, [[maybe_unused]]
}
}

Vector<String> BlockColumnEntry::GetFilePath(Txn* txn) const { return FilePaths(); }
Vector<String> BlockColumnEntry::GetFilePath(Txn *txn) const { return FilePaths(); }

nlohmann::json BlockColumnEntry::Serialize() {
nlohmann::json json_res;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/meta/entry/block_column_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import column_def;
import value;
import cleanup_scanner;
import snapshot_info;
import meta_info;

namespace infinity {

Expand Down Expand Up @@ -70,6 +71,8 @@ public:
TransactionID txn_id,
TxnTimeStamp begin_ts);

SharedPtr<BlockColumnInfo> GetColumnInfo() const;

SharedPtr<BlockColumnSnapshotInfo> GetSnapshotInfo() const;

nlohmann::json Serialize();
Expand Down
11 changes: 9 additions & 2 deletions src/storage/meta/entry/block_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ SharedPtr<BlockSnapshotInfo> BlockEntry::GetSnapshotInfo() const {
return block_snapshot_info;
}

SharedPtr<BlockInfo> BlockEntry::GetBlockInfo(Txn* txn) const {
SharedPtr<BlockInfo> BlockEntry::GetBlockInfo(Txn *txn) const {
SharedPtr<BlockInfo> block_info = MakeShared<BlockInfo>();
block_info->block_id_ = this->block_id();
block_info->block_dir_ = this->block_dir();
Expand All @@ -214,6 +214,13 @@ SharedPtr<BlockInfo> BlockEntry::GetBlockInfo(Txn* txn) const {
return block_info;
}

// Returns the description of the column stored at position column_id in this block,
// or {nullptr, Status::ColumnNotExist} when column_id is not a valid index into columns_.
// NOTE(review): BlockEntry::GetFilePath takes a shared lock on rw_locker_ before
// reading columns_, while this accessor reads it unlocked - confirm that is intended.
Tuple<SharedPtr<BlockColumnInfo>, Status> BlockEntry::GetBlockColumnInfo(ColumnID column_id) const {
    if (column_id < columns_.size()) {
        return {columns_[column_id]->GetColumnInfo(), Status::OK()};
    }
    return {nullptr, Status::ColumnNotExist(fmt::format("ColumnID: {}", column_id))};
}

// Used to replay DeltaOp to update a BlockEntry
void BlockEntry::UpdateBlockReplay(SharedPtr<BlockEntry> block_entry, String block_filter_binary_data) {
block_row_count_ = block_entry->block_row_count_;
Expand Down Expand Up @@ -636,7 +643,7 @@ void BlockEntry::Cleanup(CleanupInfoTracer *info_tracer, bool dropped) {
}
}

Vector<String> BlockEntry::GetFilePath(Txn* txn) const {
Vector<String> BlockEntry::GetFilePath(Txn *txn) const {
std::shared_lock<std::shared_mutex> lock(this->rw_locker_);
Vector<String> res;
res.reserve(columns_.size());
Expand Down
7 changes: 5 additions & 2 deletions src/storage/meta/entry/block_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import cleanup_scanner;
import snapshot_info;
import meta_info;
import txn;
import status;

namespace infinity {

Expand Down Expand Up @@ -106,7 +107,7 @@ public:

void Cleanup(CleanupInfoTracer *info_tracer = nullptr, bool dropped = true) override;

Vector<String> GetFilePath(Txn* txn) const final;
Vector<String> GetFilePath(Txn *txn) const final;

void Flush(TxnTimeStamp checkpoint_ts);

Expand Down Expand Up @@ -201,7 +202,9 @@ public:

SharedPtr<BlockSnapshotInfo> GetSnapshotInfo() const;

SharedPtr<BlockInfo> GetBlockInfo(Txn* txn) const;
SharedPtr<BlockInfo> GetBlockInfo(Txn *txn) const;

Tuple<SharedPtr<BlockColumnInfo>, Status> GetBlockColumnInfo(ColumnID column_id) const;

public:
// Setter, Used in import, segment append block, and block append block in compact
Expand Down
19 changes: 18 additions & 1 deletion src/storage/meta/entry/segment_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,23 @@ SharedPtr<BlockEntry> SegmentEntry::GetBlockEntryByID(BlockID block_id) const {
return block_entries_[block_id];
}

// Looks up block block_id in this segment under a shared lock and forwards the
// column query to it. Yields {nullptr, Status::BlockNotExist} when block_id is
// past the end of block_entries_. The txn argument is not used by this body.
Tuple<SharedPtr<BlockColumnInfo>, Status> SegmentEntry::GetBlockColumnInfo(BlockID block_id, ColumnID column_id, Txn *txn) const {
    std::shared_lock lock(rw_locker_);
    if (block_id < block_entries_.size()) {
        BlockEntry *target_block = block_entries_[block_id].get();
        return target_block->GetBlockColumnInfo(column_id);
    }
    return {nullptr, Status::BlockNotExist(block_id)};
}

// Returns the BlockInfo of block block_id, read under a shared lock on the
// segment, or nullptr when block_id is past the end of block_entries_.
SharedPtr<BlockInfo> SegmentEntry::GetBlockInfo(BlockID block_id, Txn *txn_ptr) const {
    std::shared_lock lock(rw_locker_);
    const bool in_range = block_id < block_entries_.size();
    if (in_range) {
        return block_entries_[block_id]->GetBlockInfo(txn_ptr);
    }
    return nullptr;
}

Vector<SharedPtr<BlockInfo>> SegmentEntry::GetBlocksInfo(Txn *txn) const {
Vector<SharedPtr<BlockInfo>> blocks_info;
std::shared_lock lock(rw_locker_);
Expand Down Expand Up @@ -762,7 +779,7 @@ void SegmentEntry::DropColumns(const Vector<ColumnID> &column_ids, TxnTableStore
}
}

SharedPtr<SegmentInfo> SegmentEntry::GetSegmentInfo(Txn* txn_ptr) const {
SharedPtr<SegmentInfo> SegmentEntry::GetSegmentInfo(Txn *txn_ptr) const {
SharedPtr<SegmentInfo> segment_info = MakeShared<SegmentInfo>();
std::shared_lock lock(rw_locker_);
segment_info->segment_id_ = segment_id_;
Expand Down
Loading

0 comments on commit 628b8fd

Please sign in to comment.