Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-1.2](schema_change)Fix the coredump when doubly write during … #32814

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ std::string SchemaColumnsScanner::to_mysql_data_type_string(TColumnDesc& desc) {
}
case TPrimitiveType::JSONB: {
return "json";
}
}
case TPrimitiveType::MAP: {
return "map";
}
Expand Down
40 changes: 24 additions & 16 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,33 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_db_id = pschema.db_id();
_table_id = pschema.table_id();
_version = pschema.version();
std::map<std::string, SlotDescriptor*> slots_map;
std::map<std::pair<std::string, FieldType>, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));

for (auto& p_slot_desc : pschema.slot_descs()) {
auto slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc));
_tuple_desc->add_slot(slot_desc);
slots_map.emplace(slot_desc->col_name(), slot_desc);
std::string data_type;
EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), data_type);
slots_map.emplace(std::make_pair(slot_desc->col_name(),
TabletColumn::get_field_type_by_string(data_type)),
slot_desc);
}

for (auto& p_index : pschema.indexes()) {
auto index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = p_index.id();
index->schema_hash = p_index.schema_hash();
for (auto& col : p_index.columns()) {
auto it = slots_map.find(col);
for (auto& pcolumn_desc : p_index.columns_desc()) {
auto it = slots_map.find(
std::make_pair(pcolumn_desc.name(),
TabletColumn::get_field_type_by_string(pcolumn_desc.type())));
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}", col);
return Status::InternalError("unknown index column, column={}, type={}",
pcolumn_desc.name(), pcolumn_desc.type());
}
index->slots.emplace_back(it->second);
}
for (auto& pcolumn_desc : p_index.columns_desc()) {

TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
Expand All @@ -79,27 +85,29 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_db_id = tschema.db_id;
_table_id = tschema.table_id;
_version = tschema.version;
std::map<std::string, SlotDescriptor*> slots_map;
std::map<std::pair<std::string, PrimitiveType>, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
for (auto& t_slot_desc : tschema.slot_descs) {
auto slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc));
_tuple_desc->add_slot(slot_desc);
slots_map.emplace(slot_desc->col_name(), slot_desc);
slots_map.emplace(std::make_pair(slot_desc->col_name(), slot_desc->col_type()), slot_desc);
}

for (auto& t_index : tschema.indexes) {
auto index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
for (auto& col : t_index.columns) {
auto it = slots_map.find(col);
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}", col);
}
index->slots.emplace_back(it->second);
}
if (t_index.__isset.columns_desc) {
for (auto& tcolumn_desc : t_index.columns_desc) {
auto it = slots_map.find(std::make_pair(
tcolumn_desc.column_name, thrift_to_type(tcolumn_desc.column_type.type)));
if (it == slots_map.end()) {
return Status::InternalError("unknown index column, column={}, type={}",
tcolumn_desc.column_name,
tcolumn_desc.column_type.type);
}

index->slots.emplace_back(it->second);
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "common/object_pool.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/descriptors.pb.h"
#include "runtime/primitive_type.h"
#include "util/string_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/data_types/data_type_factory.hpp"
Expand Down Expand Up @@ -60,6 +61,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
_col_name(tdesc.colName),
_col_name_lower_case(to_lower(tdesc.colName)),
_col_unique_id(tdesc.col_unique_id),
_col_type(thrift_to_type(tdesc.primitive_type)),
_slot_idx(tdesc.slotIdx),
_slot_size(_type.get_slot_size()),
_field_idx(-1),
Expand All @@ -75,6 +77,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_col_name(pdesc.col_name()),
_col_name_lower_case(to_lower(pdesc.col_name())),
_col_unique_id(-1),
_col_type(static_cast<PrimitiveType>(pdesc.col_type())),
_slot_idx(pdesc.slot_idx()),
_slot_size(_type.get_slot_size()),
_field_idx(-1),
Expand All @@ -92,6 +95,7 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_col_name(_col_name);
pslot->set_slot_idx(_slot_idx);
pslot->set_is_materialized(_is_materialized);
pslot->set_col_type(_col_type);
}

vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "gen_cpp/Descriptors_types.h" // for TTupleId
#include "gen_cpp/FrontendService_types.h" // for TTupleId
#include "gen_cpp/Types_types.h"
#include "runtime/define_primitive_type.h"
#include "runtime/types.h"
#include "vec/data_types/data_type.h"

Expand Down Expand Up @@ -115,6 +116,7 @@ class SlotDescriptor {
doris::vectorized::DataTypePtr get_data_type_ptr() const;

int32_t col_unique_id() const { return _col_unique_id; }
PrimitiveType col_type() const { return _col_type; }

private:
friend class DescriptorTbl;
Expand All @@ -132,6 +134,7 @@ class SlotDescriptor {
const std::string _col_name_lower_case;

const int32_t _col_unique_id;
const PrimitiveType _col_type;

// the idx of the slot in the tuple descriptor (0-based).
// this is provided by the FE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,12 @@ public TSlotDescriptor toThrift() {

TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(),
(originType != null ? originType.toThrift() : type.toThrift()), -1, byteOffset, nullIndicatorByte,
nullIndicatorBit, ((column != null) ? column.getName() : ""), slotIdx, isMaterialized);
nullIndicatorBit, ((column != null) ? column.getNonShadowName() : ""), slotIdx, isMaterialized);

if (column != null) {
LOG.debug("column name:{}, column unique id:{}", column.getName(), column.getUniqueId());
LOG.debug("column name:{}, column unique id:{}", column.getNonShadowName(), column.getUniqueId());
tSlotDescriptor.setColUniqueId(column.getUniqueId());
tSlotDescriptor.setPrimitiveType(column.getDataType().toThrift());
}
return tSlotDescriptor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public int getOlapColumnIndexSize() {

public TColumn toThrift() {
TColumn tColumn = new TColumn();
tColumn.setColumnName(this.name);
tColumn.setColumnName(removeNamePrefix(this.name));

TColumnType tColumnType = new TColumnType();
tColumnType.setType(this.getDataType().toThrift());
Expand Down
4 changes: 4 additions & 0 deletions gensrc/proto/descriptors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ message PSlotDescriptor {
required string col_name = 8;
required int32 slot_idx = 9;
required bool is_materialized = 10;
optional int32 col_unique_id = 11;
optional bool is_key = 12;
optional bool is_auto_increment = 13;
optional int32 col_type = 14 [default = 0];
};

message PTupleDescriptor {
Expand Down
9 changes: 9 additions & 0 deletions gensrc/thrift/Descriptors.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ struct TSlotDescriptor {
9: required i32 slotIdx
10: required bool isMaterialized
11: optional i32 col_unique_id = -1
12: optional bool is_key = false
// If set to false, then such slots will be ignored during
// materialize them.Used to optmize to read less data and less memory usage
13: optional bool need_materialize = true
14: optional bool is_auto_increment = false;
// subcolumn path info list for semi structure column(variant)
15: optional list<string> column_paths
16: optional string col_default_value
17: optional Types.TPrimitiveType primitive_type = Types.TPrimitiveType.INVALID_TYPE
}

struct TTupleDescriptor {
Expand Down
24 changes: 24 additions & 0 deletions regression-test/suites/schema_change/ddl/lineorder_create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS `lineorder` (
`lo_orderkey` bigint(20) NOT NULL COMMENT "",
`lo_linenumber` bigint(20) NOT NULL COMMENT "",
`lo_custkey` int(11) NOT NULL COMMENT "",
`lo_partkey` int(11) NOT NULL COMMENT "",
`lo_suppkey` int(11) NOT NULL COMMENT "",
`lo_orderdate` int(11) NOT NULL COMMENT "",
`lo_orderpriority` varchar(16) NOT NULL COMMENT "",
`lo_shippriority` int(11) NOT NULL COMMENT "",
`lo_quantity` bigint(20) NOT NULL COMMENT "",
`lo_extendedprice` bigint(20) NOT NULL COMMENT "",
`lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
`lo_discount` bigint(20) NOT NULL COMMENT "",
`lo_revenue` bigint(20) NOT NULL COMMENT "",
`lo_supplycost` bigint(20) NOT NULL COMMENT "",
`lo_tax` bigint(20) NOT NULL COMMENT "",
`lo_commitdate` bigint(20) NOT NULL COMMENT "",
`lo_shipmode` varchar(11) NOT NULL COMMENT ""
)
DUPLICATE KEY (`lo_orderkey`, `lo_linenumber`)
DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
drop table IF EXISTS lineorder;
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Most of the cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

// Note: To filter out tables from sql files, use the following one-liner comamnd
// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' | sort | uniq
suite("double_write_schema_change") {

// ssb_sf1_p1 is writted to test unique key table merge correctly.
// It creates unique key table and sets bucket num to 1 in order to make sure that
// many rowsets will be created during loading and then the merge process will be triggered.

def tableName = "lineorder"
def columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""

sql new File("""${context.file.parent}/ddl/${tableName}_delete.sql""").text
sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text

streamLoad {
// a default db 'regression_test' is specified in
// ${DORIS_HOME}/conf/regression-conf.groovy
table tableName

// default label is UUID:
// set 'label' UUID.randomUUID().toString()

// default column_separator is specify in doris fe config, usually is '\t'.
// this line change to ','
set 'column_separator', '|'
set 'compress_type', 'GZ'
set 'columns', columns


// relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
// also, you can stream load a http stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/ssb/sf1/${tableName}.tbl.gz"""

time 10000 // limit inflight 10s

// stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows

// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}

def getJobState = { indexName ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}

def insert_sql = """ insert into ${tableName} values(100000000, 1, 1, 1, 1, 1, "1", 1, 1, 1, 1, 1, 1, 1, 1, 1, "1") """

sql """ ALTER TABLE ${tableName} modify COLUMN lo_custkey double"""
int max_try_time = 3000
while (max_try_time--){
String result = getJobState(tableName)
if (result == "FINISHED") {
sleep(3000)
break
} else {
if (result == "RUNNING") {
sql insert_sql
}
sleep(100)
if (max_try_time < 1){
assertEquals(1,2)
}
}
}
}
Loading