Skip to content

Commit

Permalink
[CP] fix direct load column idxs in client task
Browse files Browse the repository at this point in the history
  • Loading branch information
suz-yang authored and ant-ob-hengtang committed May 31, 2024
1 parent ed28f13 commit 64f5736
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 47 deletions.
57 changes: 54 additions & 3 deletions src/observer/table_load/ob_table_load_client_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace observer
using namespace common;
using namespace sql;
using namespace storage;
using namespace share::schema;
using namespace table;

/**
Expand Down Expand Up @@ -334,6 +335,57 @@ int ObTableLoadClientTask::create_session_info(uint64_t tenant_id, uint64_t user
return ret;
}

// Builds the column index projection for this client task.
//
// For every non-hidden column in `column_descs` (walked in the order returned by
// `table_schema->get_column_ids`), appends to `column_idxs` the position of that
// column inside `column_ids` (the list produced by
// `ObTableLoadSchema::get_user_column_ids`).
//
// @param column_idxs  output array; reset on entry, then filled as described above.
// @return OB_SUCCESS on success; the error of the failing schema call otherwise;
//         OB_ERR_UNEXPECTED when `column_ids` is empty, a column schema is null,
//         or a non-hidden column cannot be located in `column_ids`.
int ObTableLoadClientTask::get_column_idxs(ObIArray<int64_t> &column_idxs) const
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = param_.get_tenant_id();
const uint64_t table_id = param_.get_table_id();
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
ObArray<int64_t> column_ids; // in user define order
ObArray<ObColDesc> column_descs; // in storage order
// Doubles as the outer-loop guard: starts true so the first iteration runs, and the
// loop stops as soon as one column could not be matched.
bool found_column = true;
column_idxs.reset();
if (OB_FAIL(
ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, column_ids))) {
LOG_WARN("failed to get all column idx", K(ret));
} else if (OB_UNLIKELY(column_ids.empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected empty column idxs", KR(ret));
} else if (OB_FAIL(table_schema->get_column_ids(column_descs))) {
LOG_WARN("fail to get column descs", KR(ret));
}
// On any failure above, `ret` is non-success (and `column_descs` is likely empty),
// so the loop below does not execute.
for (int64_t i = 0; OB_SUCC(ret) && OB_LIKELY(found_column) && i < column_descs.count(); ++i) {
const ObColDesc &col_desc = column_descs.at(i);
const ObColumnSchemaV2 *col_schema = table_schema->get_column_schema(col_desc.col_id_);
if (OB_ISNULL(col_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null column schema", KR(ret), K(col_desc));
} else {
// Hidden columns are treated as already found, so the search below is skipped
// and no index is emitted for them.
found_column = col_schema->is_hidden();
}
// Find the corresponding column in the user-defined column array.
// The `!found_column` condition ends the scan right after the first match.
for (int64_t j = 0; OB_SUCC(ret) && OB_LIKELY(!found_column) && j < column_ids.count(); ++j) {
const int64_t column_id = column_ids.at(j);
if (col_desc.col_id_ == column_id) {
found_column = true;
if (OB_FAIL(column_idxs.push_back(j))) {
LOG_WARN("fail to push back column desc", KR(ret), K(column_idxs), K(i), K(col_desc),
K(j), K(column_ids));
}
}
}
}
// `found_column` is still false only when the last examined column was non-hidden
// and had no match in `column_ids`.
if (OB_SUCC(ret) && OB_UNLIKELY(!found_column)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected column not found", KR(ret), K(column_ids), K(column_descs),
K(column_idxs));
}
return ret;
}

int ObTableLoadClientTask::init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us)
{
int ret = OB_SUCCESS;
Expand Down Expand Up @@ -570,9 +622,8 @@ int ObTableLoadClientTask::init_instance()
ObArray<int64_t> column_idxs;
if (OB_FAIL(GCTX.omt_->get_tenant(param_.get_tenant_id(), tenant))) {
LOG_WARN("fail to get tenant handle", KR(ret), K(param_.get_tenant_id()));
} else if (OB_FAIL(ObTableLoadSchema::get_column_idxs(param_.get_tenant_id(),
param_.get_table_id(), column_idxs))) {
LOG_WARN("failed to get column idx", K(ret));
} else if (OB_FAIL(get_column_idxs(column_idxs))) {
LOG_WARN("failed to get column idxs", K(ret));
} else if (OB_UNLIKELY(column_idxs.empty())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected empty column idxs", KR(ret));
Expand Down
1 change: 1 addition & 0 deletions src/observer/table_load/ob_table_load_client_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class ObTableLoadClientTask
sql::ObFreeSessionCtx &free_session_ctx);
int init_exec_ctx(int64_t timeout_us, int64_t heartbeat_timeout_us);

int get_column_idxs(ObIArray<int64_t> &column_idxs) const;
int init_instance();
int commit_instance();
void destroy_instance();
Expand Down
95 changes: 60 additions & 35 deletions src/observer/table_load/ob_table_load_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ int ObTableLoadSchema::get_table_schema(uint64_t tenant_id, uint64_t table_id,
return ret;
}

int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAllocator &allocator,
ObIArray<ObString> &column_names)
int ObTableLoadSchema::get_user_column_names(const ObTableSchema *table_schema,
ObIAllocator &allocator,
ObIArray<ObString> &column_names)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(table_schema)) {
Expand All @@ -131,7 +132,9 @@ int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAl
} else if (OB_ISNULL(column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("The column is null", KR(ret));
} else if (!column_schema->is_hidden() && !column_schema->is_invisible_column()) {
} else if (column_schema->is_hidden()) {
// 不显示隐藏pk
} else {
ObString column_name;
if (OB_FAIL(
ob_write_string(allocator, column_schema->get_column_name_str(), column_name))) {
Expand All @@ -145,46 +148,32 @@ int ObTableLoadSchema::get_column_names(const ObTableSchema *table_schema, ObIAl
return ret;
}

int ObTableLoadSchema::get_column_idxs(uint64_t tenant_id, uint64_t table_id,
ObIArray<int64_t> &column_idxs)
int ObTableLoadSchema::get_user_column_ids(const ObTableSchema *table_schema,
ObIArray<int64_t> &column_ids)
{
int ret = OB_SUCCESS;
column_idxs.reset();
ObSchemaGetterGuard schema_guard;
const ObTableSchema *table_schema = nullptr;
if (OB_FAIL(get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
} else {
ret = get_column_idxs(table_schema, column_idxs);
}
return ret;
}

int ObTableLoadSchema::get_column_idxs(const ObTableSchema *table_schema,
ObIArray<int64_t> &column_idxs)
{
int ret = OB_SUCCESS;
column_idxs.reset();
column_ids.reset();
if (OB_ISNULL(table_schema)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(table_schema));
} else {
ObArray<ObColDesc> column_descs;
column_descs.set_tenant_id(MTL_ID());
if (OB_FAIL(table_schema->get_column_ids(column_descs, false))) {
LOG_WARN("fail to get column ids", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && (i < column_descs.count()); ++i) {
ObColDesc &col_desc = column_descs.at(i);
const ObColumnSchemaV2 *column_schema = table_schema->get_column_schema(col_desc.col_id_);
if (OB_ISNULL(column_schema)) {
ObColumnIterByPrevNextID iter(*table_schema);
const ObColumnSchemaV2 *column_schema = NULL;
while (OB_SUCC(ret)) {
if (OB_FAIL(iter.next(column_schema))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("fail to iterate all table columns", KR(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else if (OB_ISNULL(column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("The column is null", KR(ret));
} else if (!column_schema->is_hidden() && !column_schema->is_invisible_column()) {
const int64_t idx = col_desc.col_id_ - OB_APP_MIN_COLUMN_ID;
if (OB_FAIL(column_idxs.push_back(idx))) {
LOG_WARN("fail to push back idx", KR(ret), K(idx));
}
} else if (column_schema->is_hidden()) {
// 不显示隐藏pk
} else if (OB_FAIL(column_ids.push_back(column_schema->get_column_id()))) {
LOG_WARN("fail to push back column id", KR(ret));
}
}
}
Expand Down Expand Up @@ -276,6 +265,42 @@ int ObTableLoadSchema::get_lob_meta_tid(
return ret;
}

int ObTableLoadSchema::check_has_invisible_column(const ObTableSchema *table_schema, bool &bret)
{
int ret = OB_SUCCESS;
bret = false;
for (ObTableSchema::const_column_iterator iter = table_schema->column_begin();
OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) {
ObColumnSchemaV2 *column_schema = *iter;
if (OB_ISNULL(column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid column schema", K(column_schema));
} else if (column_schema->is_invisible_column()) {
bret = true;
break;
}
}
return ret;
}

int ObTableLoadSchema::check_has_unused_column(const ObTableSchema *table_schema, bool &bret)
{
int ret = OB_SUCCESS;
bret = false;
for (ObTableSchema::const_column_iterator iter = table_schema->column_begin();
OB_SUCC(ret) && iter != table_schema->column_end(); ++iter) {
ObColumnSchemaV2 *column_schema = *iter;
if (OB_ISNULL(column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid column schema", K(column_schema));
} else if (column_schema->is_unused()) {
bret = true;
break;
}
}
return ret;
}

ObTableLoadSchema::ObTableLoadSchema()
: allocator_("TLD_Schema"),
is_partitioned_table_(false),
Expand Down
14 changes: 7 additions & 7 deletions src/observer/table_load/ob_table_load_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ class ObTableLoadSchema
static int get_table_schema(uint64_t tenant_id, uint64_t table_id,
share::schema::ObSchemaGetterGuard &schema_guard,
const share::schema::ObTableSchema *&table_schema);
static int get_column_names(const share::schema::ObTableSchema *table_schema,
common::ObIAllocator &allocator,
common::ObIArray<common::ObString> &column_names);
static int get_column_idxs(uint64_t tenant_id, uint64_t table_id,
common::ObIArray<int64_t> &column_idxs);
static int get_column_idxs(const share::schema::ObTableSchema *table_schema,
common::ObIArray<int64_t> &column_idxs);
static int get_user_column_names(const share::schema::ObTableSchema *table_schema,
common::ObIAllocator &allocator,
common::ObIArray<common::ObString> &column_names);
static int get_user_column_ids(const share::schema::ObTableSchema *table_schema,
common::ObIArray<int64_t> &column_ids);
static int check_has_udt_column(const share::schema::ObTableSchema *table_schema, bool &bret);
static int get_tenant_optimizer_gather_stats_on_load(const uint64_t tenant_id, bool &value);
static int get_lob_meta_tid(const uint64_t tenant_id,
const uint64_t data_table_id,
uint64_t &lob_meta_table_id);
static int check_has_invisible_column(const share::schema::ObTableSchema *table_schema, bool &bret);
static int check_has_unused_column(const share::schema::ObTableSchema *table_schema, bool &bret);
public:
ObTableLoadSchema();
~ObTableLoadSchema();
Expand Down
18 changes: 18 additions & 0 deletions src/observer/table_load/ob_table_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ int ObTableLoadService::check_support_direct_load(
bool has_udt_column = false;
bool has_fts_index = false;
bool has_multivalue_index = false;
bool has_invisible_column = false;
bool has_unused_column = false;
if (OB_FAIL(
ObTableLoadSchema::get_table_schema(tenant_id, table_id, schema_guard, table_schema))) {
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(table_id));
Expand Down Expand Up @@ -507,6 +509,22 @@ int ObTableLoadService::check_support_direct_load(
LOG_WARN("direct-load does not support table has udt column", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support table has udt column");
}
// check has invisible column
else if (OB_FAIL(ObTableLoadSchema::check_has_invisible_column(table_schema, has_invisible_column))) {
LOG_WARN("fail to check has invisible column", KR(ret));
} else if (has_invisible_column) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support table has invisible column", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support table has invisible column");
}
// check has unused column
else if (OB_FAIL(ObTableLoadSchema::check_has_unused_column(table_schema, has_unused_column))) {
LOG_WARN("fail to check has unused column", KR(ret));
} else if (has_unused_column) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("direct-load does not support table has unused column", KR(ret));
FORWARD_USER_ERROR_MSG(ret, "direct-load does not support table has unused column");
}
// check if table has mlog
else if (table_schema->has_mlog_table()) {
ret = OB_NOT_SUPPORTED;
Expand Down
13 changes: 11 additions & 2 deletions src/share/table/ob_table_load_row.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,20 @@ template<class T>
int ObTableLoadRow<T>::project(const ObIArray<int64_t> &idx_projector, ObTableLoadRow<T> &projected_row) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(projected_row.init(count_, allocator_handle_))) {
if (OB_UNLIKELY(idx_projector.count() != count_)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected count", KR(ret), K(idx_projector), K(count_));
} else if (OB_FAIL(projected_row.init(count_, allocator_handle_))) {
OB_LOG(WARN, "failed to alloate cells", KR(ret), K(projected_row.count_));
} else {
for (int64_t j = 0; j < count_; ++j) {
projected_row.cells_[j] = cells_[idx_projector.at(j)];
const int64_t idx = idx_projector.at(j);
if (OB_UNLIKELY(idx < 0 || idx >= count_)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "unexpected idx", KR(ret), K(j), K(idx), K(idx_projector));
} else {
projected_row.cells_[j] = cells_[idx];
}
}
projected_row.seq_no_ = seq_no_;
}
Expand Down

0 comments on commit 64f5736

Please sign in to comment.