Skip to content

Commit

Permalink
Refactor more
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN committed Jan 24, 2025
1 parent d0c013f commit 8075033
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 65 deletions.
4 changes: 4 additions & 0 deletions src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,10 @@ Status Status::InvalidQueryOption(const String &detail) {

Status Status::FailToStartTxn(const String &detail) { return Status(ErrorCode::kFailToStartTxn, MakeUnique<String>(detail)); }

Status Status::AlreadyLocked(const String &detail) { return Status(ErrorCode::kAlreadyLocked, MakeUnique<String>(detail)); }

Status Status::NotLocked(const String &detail) { return Status(ErrorCode::kNotLocked, MakeUnique<String>(detail)); }

// 4. TXN fail
Status Status::TxnRollback(u64 txn_id, const String &rollback_reason) {
return Status(ErrorCode::kTxnRollback, MakeUnique<String>(fmt::format("Transaction: {} is rollback. {}", txn_id, rollback_reason)));
Expand Down
4 changes: 4 additions & 0 deletions src/common/status.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ export enum class ErrorCode : long {
kUnknown = 3091,
kInvalidQueryOption = 3092,
kFailToStartTxn = 3093,
kAlreadyLocked = 3094,
kNotLocked = 3095,

// 4. Txn fail
kTxnRollback = 4001,
Expand Down Expand Up @@ -317,6 +319,8 @@ public:
static Status Unknown(const String &name);
static Status InvalidQueryOption(const String &detail);
static Status FailToStartTxn(const String &detail);
static Status AlreadyLocked(const String &detail);
static Status NotLocked(const String &detail);

// 4. TXN fail
static Status TxnRollback(u64 txn_id, const String &rollback_reason = "no reanson gived");
Expand Down
25 changes: 8 additions & 17 deletions src/executor/operator/physical_alter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,13 @@ bool PhysicalAddColumns::Execute(QueryContext *query_context, OperatorState *ope
return true;
}

LOG_INFO(fmt::format("Locking table {} for add columns", *table_entry_->GetTableName()));
table_entry_->SetLocked();
LOG_INFO(fmt::format("Table {} is locked", *table_entry_->GetTableName()));

Txn *txn = query_context->GetTxn();
txn->LockTable(*table_info_->db_name_, *table_info_->table_name_);
DeferFn defer_fn([&]() {
LOG_INFO(fmt::format("Table {} unlocked.", *table_entry_->GetTableName()));
table_entry_->SetUnlock();
txn->UnLockTable(*table_info_->db_name_, *table_info_->table_name_);
});

Txn *txn = query_context->GetTxn();

auto status = txn->AddColumns(table_entry_, column_defs_);
auto status = txn->AddColumns(*table_info_->db_name_, *table_info_->table_name_, column_defs_);
if (!status.ok()) {
RecoverableError(status);
}
Expand All @@ -86,17 +81,13 @@ bool PhysicalDropColumns::Execute(QueryContext *query_context, OperatorState *op
return true;
}

LOG_INFO(fmt::format("Locking table {} for add columns", *table_entry_->GetTableName()));
table_entry_->SetLocked();
LOG_INFO(fmt::format("Table {} is locked", *table_entry_->GetTableName()));

Txn *txn = query_context->GetTxn();
txn->LockTable(*table_info_->db_name_, *table_info_->table_name_);
DeferFn defer_fn([&]() {
LOG_INFO(fmt::format("Table {} unlocked.", *table_entry_->GetTableName()));
table_entry_->SetUnlock();
txn->UnLockTable(*table_info_->db_name_, *table_info_->table_name_);
});

Txn *txn = query_context->GetTxn();
auto status = txn->DropColumns(table_entry_, column_names_);
auto status = txn->DropColumns(*table_info_->db_name_, *table_info_->table_name_, column_names_);
if (!status.ok()) {
RecoverableError(status);
}
Expand Down
20 changes: 10 additions & 10 deletions src/executor/operator/physical_alter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import infinity_exception;
import internal_types;
import data_type;
import logger;
import table_entry;
import meta_info;
import alter_statement;
import column_def;
import constant_expr;
Expand All @@ -36,13 +36,13 @@ namespace infinity {

export class PhysicalAlter : public PhysicalOperator {
public:
explicit PhysicalAlter(TableEntry *table_entry,
explicit PhysicalAlter(const SharedPtr<TableInfo>& table_info,
AlterStatementType type,
SharedPtr<Vector<String>> output_names,
SharedPtr<Vector<SharedPtr<DataType>>> output_types,
u64 id,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kAlter, nullptr, nullptr, id, load_metas), table_entry_(table_entry),
: PhysicalOperator(PhysicalOperatorType::kAlter, nullptr, nullptr, id, load_metas), table_info_(table_info),
output_names_(std::move(output_names)), output_types_(std::move(output_types)) {}

~PhysicalAlter() override = default;
Expand All @@ -59,20 +59,20 @@ public:

protected:
AlterStatementType type_;
TableEntry *table_entry_{};
SharedPtr<TableInfo> table_info_{};
SharedPtr<Vector<String>> output_names_{};
SharedPtr<Vector<SharedPtr<DataType>>> output_types_{};
};

export class PhysicalRenameTable final : public PhysicalAlter {
public:
PhysicalRenameTable(TableEntry *table_entry,
PhysicalRenameTable(const SharedPtr<TableInfo>& table_info,
String &&new_table_name,
SharedPtr<Vector<String>> output_names,
SharedPtr<Vector<SharedPtr<DataType>>> output_types,
u64 id,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalAlter(table_entry, AlterStatementType::kRenameTable, std::move(output_names), std::move(output_types), id, load_metas),
: PhysicalAlter(table_info, AlterStatementType::kRenameTable, std::move(output_names), std::move(output_types), id, load_metas),
new_table_name_(std::move(new_table_name)) {}

void Init(QueryContext* query_context) override;
Expand All @@ -85,13 +85,13 @@ private:

export class PhysicalAddColumns final : public PhysicalAlter {
public:
PhysicalAddColumns(TableEntry *table_entry,
PhysicalAddColumns(const SharedPtr<TableInfo>& table_info,
const Vector<SharedPtr<ColumnDef>> &column_defs,
SharedPtr<Vector<String>> output_names,
SharedPtr<Vector<SharedPtr<DataType>>> output_types,
u64 id,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalAlter(table_entry, AlterStatementType::kAddColumns, std::move(output_names), std::move(output_types), id, load_metas),
: PhysicalAlter(table_info, AlterStatementType::kAddColumns, std::move(output_names), std::move(output_types), id, load_metas),
column_defs_(column_defs) {}

void Init(QueryContext* query_context) override;
Expand All @@ -104,13 +104,13 @@ private:

export class PhysicalDropColumns final : public PhysicalAlter {
public:
PhysicalDropColumns(TableEntry *table_entry,
PhysicalDropColumns(const SharedPtr<TableInfo>& table_info,
const Vector<String> &column_names,
SharedPtr<Vector<String>> output_names,
SharedPtr<Vector<SharedPtr<DataType>>> output_types,
u64 id,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalAlter(table_entry, AlterStatementType::kDropColumns, std::move(output_names), std::move(output_types), id, load_metas),
: PhysicalAlter(table_info, AlterStatementType::kDropColumns, std::move(output_names), std::move(output_types), id, load_metas),
column_names_(column_names) {}

void Init(QueryContext* query_context) override;
Expand Down
6 changes: 3 additions & 3 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildAlter(const SharedPtr<LogicalN
switch (logical_alter->type_) {
case AlterStatementType::kRenameTable: {
auto *logical_rename_table = static_cast<LogicalRenameTable *>(logical_alter);
return MakeUnique<PhysicalRenameTable>(logical_rename_table->table_entry_,
return MakeUnique<PhysicalRenameTable>(logical_rename_table->table_info_,
std::move(logical_rename_table->new_table_name_),
logical_operator->GetOutputNames(),
logical_operator->GetOutputTypes(),
Expand All @@ -607,7 +607,7 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildAlter(const SharedPtr<LogicalN
}
case AlterStatementType::kAddColumns: {
auto *logical_add_columns = static_cast<LogicalAddColumns *>(logical_alter);
return MakeUnique<PhysicalAddColumns>(logical_add_columns->table_entry_,
return MakeUnique<PhysicalAddColumns>(logical_add_columns->table_info_,
logical_add_columns->column_defs_,
logical_operator->GetOutputNames(),
logical_operator->GetOutputTypes(),
Expand All @@ -616,7 +616,7 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildAlter(const SharedPtr<LogicalN
}
case AlterStatementType::kDropColumns: {
auto *logical_drop_columns = static_cast<LogicalDropColumns *>(logical_alter);
return MakeUnique<PhysicalDropColumns>(logical_drop_columns->table_entry_,
return MakeUnique<PhysicalDropColumns>(logical_drop_columns->table_info_,
logical_drop_columns->column_names_,
logical_operator->GetOutputNames(),
logical_operator->GetOutputTypes(),
Expand Down
17 changes: 11 additions & 6 deletions src/planner/logical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1196,16 +1196,21 @@ Status LogicalPlanner::BuildAlter(AlterStatement *statement, SharedPtr<BindConte
statement->schema_name_ = query_context_ptr_->schema_name();
}
Txn *txn = query_context_ptr_->GetTxn();
auto [table_entry, status] = txn->GetTableByName(statement->schema_name_, statement->table_name_);
if (!status.ok()) {
RecoverableError(status);
auto [table_entry, entry_status] = txn->GetTableByName(statement->schema_name_, statement->table_name_);
if (!entry_status.ok()) {
RecoverableError(entry_status);
}

auto [table_info, info_status] = txn->GetTableInfo(statement->schema_name_, statement->table_name_);
if (!info_status.ok()) {
RecoverableError(info_status);
}

switch (statement->type_) {
case AlterStatementType::kRenameTable: {
auto *rename_table_statement = static_cast<RenameTableStatement *>(statement);
this->logical_plan_ = MakeShared<LogicalRenameTable>(bind_context_ptr->GetNewLogicalNodeId(),
table_entry,
table_info,
std::move(rename_table_statement->new_table_name_));
break;
}
Expand All @@ -1229,7 +1234,7 @@ Status LogicalPlanner::BuildAlter(AlterStatement *statement, SharedPtr<BindConte
RecoverableError(Status::DuplicateColumnName(column_def->name()));
}
}
this->logical_plan_ = MakeShared<LogicalAddColumns>(bind_context_ptr->GetNewLogicalNodeId(), table_entry, std::move(column_defs));
this->logical_plan_ = MakeShared<LogicalAddColumns>(bind_context_ptr->GetNewLogicalNodeId(), table_info, std::move(column_defs));
break;
}
case AlterStatementType::kDropColumns: {
Expand All @@ -1241,7 +1246,7 @@ Status LogicalPlanner::BuildAlter(AlterStatement *statement, SharedPtr<BindConte
RecoverableError(Status::NotSupport(fmt::format("Drop column {} which is indexed.", column_name)));
}
}
this->logical_plan_ = MakeShared<LogicalDropColumns>(bind_context_ptr->GetNewLogicalNodeId(), table_entry, std::move(column_names));
this->logical_plan_ = MakeShared<LogicalDropColumns>(bind_context_ptr->GetNewLogicalNodeId(), table_info, std::move(column_names));
break;
}
default: {
Expand Down
20 changes: 10 additions & 10 deletions src/planner/node/logical_alter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import stl;
import logical_node_type;
import column_binding;
import logical_node;
import table_entry;
import meta_info;
import internal_types;
import data_type;
import alter_statement;
Expand All @@ -30,8 +30,8 @@ namespace infinity {

export class LogicalAlter : public LogicalNode {
public:
LogicalAlter(u64 node_id, TableEntry *table_entry, AlterStatementType type)
: LogicalNode(node_id, LogicalNodeType::kAlter), type_(type), table_entry_(table_entry) {}
LogicalAlter(u64 node_id, const SharedPtr<TableInfo> &table_info, AlterStatementType type)
: LogicalNode(node_id, LogicalNodeType::kAlter), type_(type), table_info_(table_info) {}

Vector<ColumnBinding> GetColumnBindings() const final;

Expand All @@ -41,13 +41,13 @@ public:

public:
AlterStatementType type_;
TableEntry *table_entry_;
SharedPtr<TableInfo> table_info_;
};

export class LogicalRenameTable : public LogicalAlter {
public:
LogicalRenameTable(u64 node_id, TableEntry *table_entry, String new_table_name)
: LogicalAlter(node_id, table_entry, AlterStatementType::kRenameTable), new_table_name_(std::move(new_table_name)) {}
LogicalRenameTable(u64 node_id, const SharedPtr<TableInfo> &table_info, String new_table_name)
: LogicalAlter(node_id, table_info, AlterStatementType::kRenameTable), new_table_name_(std::move(new_table_name)) {}

String ToString(i64 &space) const final;

Expand All @@ -59,8 +59,8 @@ public:

export class LogicalAddColumns : public LogicalAlter {
public:
LogicalAddColumns(u64 node_id, TableEntry *table_entry, Vector<SharedPtr<ColumnDef>> column_defs)
: LogicalAlter(node_id, table_entry, AlterStatementType::kAddColumns), column_defs_(std::move(column_defs)) {}
LogicalAddColumns(u64 node_id, const SharedPtr<TableInfo> &table_info, Vector<SharedPtr<ColumnDef>> column_defs)
: LogicalAlter(node_id, table_info, AlterStatementType::kAddColumns), column_defs_(std::move(column_defs)) {}

String ToString(i64 &space) const final;

Expand All @@ -72,8 +72,8 @@ public:

export class LogicalDropColumns : public LogicalAlter {
public:
LogicalDropColumns(u64 node_id, TableEntry *table_entry, Vector<String> column_name)
: LogicalAlter(node_id, table_entry, AlterStatementType::kDropColumns), column_names_(std::move(column_name)) {}
LogicalDropColumns(u64 node_id, const SharedPtr<TableInfo> &table_info, Vector<String> column_name)
: LogicalAlter(node_id, table_info, AlterStatementType::kDropColumns), column_names_(std::move(column_name)) {}

String ToString(i64 &space) const final;

Expand Down
18 changes: 11 additions & 7 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ SharedPtr<TableEntry> TableEntry::ApplyTableSnapshot(TableMeta *table_meta,
return table_entry;
}

SharedPtr<TableInfo> TableEntry::GetTableInfo(Txn* txn) {
SharedPtr<TableInfo> TableEntry::GetTableInfo(Txn *txn) {
SharedPtr<TableInfo> table_info = MakeShared<TableInfo>();
table_info->db_name_ = this->GetDBName();
table_info->table_name_ = this->table_name_;
Expand Down Expand Up @@ -1614,27 +1614,31 @@ void TableEntry::DecWriteTxnNum() {
}
}

void TableEntry::SetLocked() {
Status TableEntry::SetLocked() {
std::unique_lock lock(mtx_);
if (locked_) {
LOG_WARN(fmt::format("Table {} has already been locked", *GetTableName()));
return;
String error_message = fmt::format("Table {} has already been locked", *GetTableName());
LOG_WARN(error_message);
return Status::AlreadyLocked(error_message);
}
if (write_txn_num_ > 0) {
wait_lock_ = true;
cv_.wait(lock, [&] { return write_txn_num_ == 0; });
wait_lock_ = false;
}
locked_ = true;
return Status::OK();
}

void TableEntry::SetUnlock() {
Status TableEntry::SetUnlock() {
std::lock_guard lock(mtx_);
if (!locked_) {
LOG_WARN(fmt::format("Table {} is not locked", *GetTableName()));
return;
String error_message = fmt::format("Table {} is not locked", *GetTableName());
LOG_WARN(error_message);
return Status::NotLocked(error_message);
}
locked_ = false;
return Status::OK();
}

bool TableEntry::SetCompact(TableStatus &status, Txn *txn) {
Expand Down
6 changes: 3 additions & 3 deletions src/storage/meta/entry/table_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public:
ApplyTableSnapshot(TableMeta *table_meta, const SharedPtr<TableSnapshotInfo> &table_snapshot_info, TransactionID txn_id, TxnTimeStamp begin_ts);

public:
SharedPtr<TableInfo> GetTableInfo(Txn* txn);
SharedPtr<TableInfo> GetTableInfo(Txn *txn);

Tuple<TableIndexEntry *, Status>
CreateIndex(const SharedPtr<IndexBase> &index_base, ConflictType conflict_type, TransactionID txn_id, TxnTimeStamp begin_ts, TxnManager *txn_mgr);
Expand Down Expand Up @@ -374,9 +374,9 @@ public:

void DecWriteTxnNum();

void SetLocked();
Status SetLocked();

void SetUnlock();
Status SetUnlock();

enum struct TableStatus : u8 {
kNone = 0,
Expand Down
Loading

0 comments on commit 8075033

Please sign in to comment.