Skip to content

Commit

Permalink
Merge branch 'master' into fix/en-doc-list-not-null
Browse files Browse the repository at this point in the history
  • Loading branch information
sunny19930321 authored Jan 3, 2024
2 parents 464c039 + 358995e commit a295902
Show file tree
Hide file tree
Showing 73 changed files with 2,649 additions and 528 deletions.
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Apache Doris
Copyright 2018-2023 The Apache Software Foundation
Copyright 2018-2024 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
DEFINE_Bool(wait_internal_group_commit_finish, "false");
// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
DEFINE_Int32(group_commit_queue_mem_limit, "67108864");
DEFINE_mInt32(group_commit_queue_mem_limit, "67108864");
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DEFINE_String(group_commit_wal_max_disk_limit, "10%");
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ DECLARE_mInt32(group_commit_insert_threads);
DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
DECLARE_Bool(wait_internal_group_commit_finish);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_queue_mem_limit);
DECLARE_mInt32(group_commit_queue_mem_limit);
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DECLARE_mString(group_commit_wal_max_disk_limit);
Expand Down
137 changes: 80 additions & 57 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include "olap/wrapper_field.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/trace.h"
#include "vec/aggregate_functions/aggregate_function.h"
Expand Down Expand Up @@ -711,6 +712,13 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
<< " res=" << res;
return res;
}
new_tablet->set_alter_failed(false);
Defer defer([&new_tablet] {
// if tablet state is not TABLET_RUNNING when return, indicates that alter has failed.
if (new_tablet->tablet_state() != TABLET_RUNNING) {
new_tablet->set_alter_failed(true);
}
});

LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet "
"to new tablet"
Expand Down Expand Up @@ -918,7 +926,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.insert(new_tablet->tablet_id());
}
res = _convert_historical_rowsets(sc_params);
int64_t real_alter_version = 0;
res = _convert_historical_rowsets(sc_params, &real_alter_version);
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
Expand All @@ -927,65 +936,12 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
break;
}

// For unique with merge-on-write table, should process delete bitmap here.
// 1. During double write, the newly imported rowsets does not calculate
// delete bitmap and publish successfully.
// 2. After conversion, calculate delete bitmap for the rowsets imported
// during double write. During this period, new data can still be imported
// without calculating delete bitmap and publish successfully.
// 3. Block the new publish, calculate the delete bitmap of the
// incremental rowsets.
// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
// data will calculate delete bitmap.
if (new_tablet->keys_type() == UNIQUE_KEYS &&
new_tablet->enable_unique_key_merge_on_write()) {
// step 2
int64_t max_version = new_tablet->max_version().second;
std::vector<RowsetSharedPtr> rowsets;
if (end_version < max_version) {
LOG(INFO)
<< "alter table for unique with merge-on-write, calculate delete bitmap of "
<< "double write rowsets for version: " << end_version + 1 << "-"
<< max_version;
RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
{end_version + 1, max_version}, &rowsets));
}
for (auto rowset_ptr : rowsets) {
if (rowset_ptr->version().second <= end_version) {
continue;
}
std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
std::shared_lock<std::shared_mutex> wrlock(new_tablet->get_header_lock());
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
}

// step 3
std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
int64_t new_max_version = new_tablet->max_version_unlocked().second;
rowsets.clear();
if (max_version < new_max_version) {
LOG(INFO)
<< "alter table for unique with merge-on-write, calculate delete bitmap of "
<< "incremental rowsets for version: " << max_version + 1 << "-"
<< new_max_version;
RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
{max_version + 1, new_max_version}, &rowsets));
}
for (auto rowset_ptr : rowsets) {
if (rowset_ptr->version().second <= max_version) {
continue;
}
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
}

// step 4
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
res = _calc_delete_bitmap_for_mow_table(new_tablet, real_alter_version);
if (!res) {
break;
}
new_tablet->save_meta();
} else {
// set state to ready
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
Expand Down Expand Up @@ -1035,7 +991,10 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
return Status::OK();
}

Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params) {
// The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is
// converted from a base tablet, only used for the mow table now.
Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params,
int64_t* real_alter_version) {
LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
<< " base_tablet=" << sc_params.base_tablet->tablet_id()
<< ", new_tablet=" << sc_params.new_tablet->tablet_id();
Expand Down Expand Up @@ -1146,7 +1105,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< "tablet=" << sc_params.new_tablet->tablet_id() << ", version='"
<< rs_reader->version().first << "-" << rs_reader->version().second;
StorageEngine::instance()->add_unused_rowset(new_rowset);
res = Status::OK();
return process_alter_exit();
} else if (!res) {
LOG(WARNING) << "failed to register new version. "
<< " tablet=" << sc_params.new_tablet->tablet_id()
Expand All @@ -1159,6 +1118,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< ", version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
}
*real_alter_version = rs_reader->version().second;

VLOG_TRACE << "succeed to convert a history version."
<< " version=" << rs_reader->version().first << "-"
Expand Down Expand Up @@ -1377,4 +1337,67 @@ Status SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
return Status::OK();
}

// Calculates the delete bitmap for a unique-key merge-on-write (mow) tablet at the end of a
// schema change and then switches the tablet to TABLET_RUNNING.
//
// For unique with merge-on-write table, should process delete bitmap here.
// 1. During double write, the newly imported rowsets do not calculate
//    delete bitmap and publish successfully.
// 2. After conversion, calculate delete bitmap for the rowsets imported
//    during double write. During this period, new data can still be imported
//    without calculating delete bitmap and publish successfully.
// 3. Block the new publish, calculate the delete bitmap of the
//    incremental rowsets.
// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
//    data will calculate delete bitmap.
//
// Params:
//   new_tablet    - the tablet produced by the schema change.
//   alter_version - only rowsets with a version greater than this value are processed here;
//                   the caller passes the last version converted from the base tablet.
// Returns:
//   The first non-OK Status produced while capturing rowsets, updating the delete bitmap or
//   setting the tablet state (or the injected debug failure); Status::OK() otherwise.
Status SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
                                                              int64_t alter_version) {
    // Debug point: fail randomly with the configured probability.
    DBUG_EXECUTE_IF("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
            LOG_WARNING("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed");
            return Status::InternalError("debug schema change calc delete bitmap random failed");
        }
    });

    // can't do compaction when calc delete bitmap, if the rowset being calculated does
    // a compaction, it may cause the delete bitmap to be missed.
    // Both compaction locks are held until this function returns.
    std::lock_guard base_compaction_lock(new_tablet->get_base_compaction_lock());
    std::lock_guard cumu_compaction_lock(new_tablet->get_cumulative_compaction_lock());

    // step 2: rowsets in (alter_version, max_version] were imported during double write.
    int64_t max_version = new_tablet->max_version().second;
    std::vector<RowsetSharedPtr> rowsets;
    if (alter_version < max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "double write rowsets for version: " << alter_version + 1 << "-" << max_version
                  << " new_tablet=" << new_tablet->tablet_id();
        std::shared_lock<std::shared_mutex> rlock(new_tablet->get_header_lock());
        RETURN_IF_ERROR(
                new_tablet->capture_consistent_rowsets({alter_version + 1, max_version}, &rowsets));
    }
    // Bind by const reference: copying a RowsetSharedPtr per iteration would only add an
    // atomic refcount increment/decrement, `rowsets` keeps every rowset alive for the loop.
    for (const auto& rowset_ptr : rowsets) {
        // Locks are taken per rowset (header lock in shared mode) and released after each one.
        std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
        std::shared_lock<std::shared_mutex> rlock(new_tablet->get_header_lock());
        RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
    }

    // step 3: from here on the rowset update lock and the header lock (exclusive) are held
    // until the function returns, covering the state switch and the meta save below.
    std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
    std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
    int64_t new_max_version = new_tablet->max_version_unlocked().second;
    rowsets.clear();
    if (max_version < new_max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "incremental rowsets for version: " << max_version + 1 << "-"
                  << new_max_version << " new_tablet=" << new_tablet->tablet_id();
        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version + 1, new_max_version},
                                                               &rowsets));
    }
    for (const auto& rowset_ptr : rowsets) {
        RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
    }
    // step 4
    RETURN_IF_ERROR(new_tablet->set_tablet_state(TabletState::TABLET_RUNNING));
    new_tablet->save_meta();
    return Status::OK();
}

} // namespace doris
6 changes: 5 additions & 1 deletion be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ class SchemaChangeHandler {
static Status _validate_alter_result(TabletSharedPtr new_tablet,
const TAlterTabletReqV2& request);

static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
int64_t* real_alter_version);

static Status _parse_request(const SchemaChangeParams& sc_params, BlockChanger* changer,
bool* sc_sorting, bool* sc_directly);
Expand All @@ -286,6 +287,9 @@ class SchemaChangeHandler {
static Status _init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema, const std::string& value);

static Status _calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
int64_t alter_version);

static std::shared_mutex _mutex;
static std::unordered_set<int64_t> _tablet_ids_in_converting;
static std::set<std::string> _supported_functions;
Expand Down
22 changes: 14 additions & 8 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1026,14 +1026,6 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
return false;
}

// unique key table with merge-on-write also can't do cumulative compaction under alter
// process. It may cause the delete bitmap calculation error, such as two
// rowsets have same key.
if (tablet_state() != TABLET_RUNNING && keys_type() == UNIQUE_KEYS &&
enable_unique_key_merge_on_write()) {
return false;
}

if (data_dir()->path_hash() != path_hash || !is_used() || !init_succeeded()) {
return false;
}
Expand Down Expand Up @@ -1762,6 +1754,13 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
}
}

// There are two cases when tablet state is TABLET_NOTREADY
// case 1: tablet is doing schema change. Fe knows its state, doing nothing.
// case 2: tablet has finished schema change, but failed. Fe will perform recovery.
if (tablet_state() == TABLET_NOTREADY && is_alter_failed()) {
tablet_info->__set_used(false);
}

if (tablet_state() == TABLET_SHUTDOWN) {
tablet_info->__set_used(false);
}
Expand Down Expand Up @@ -3287,6 +3286,13 @@ void Tablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur,

// The caller should hold _rowset_update_lock and _meta_lock lock.
Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) {
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
return Status::InternalError(
"debug tablet update delete bitmap without lock random failed");
}
});
int64_t cur_version = rowset->end_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ class Tablet final : public BaseTablet {
SegmentCacheHandle* segment_cache_handle,
std::unique_ptr<segment_v2::ColumnIterator>* column_iterator,
OlapReaderStatistics* stats);
// Stores the given value into the `_alter_failed` flag.
void set_alter_failed(bool alter_failed) { _alter_failed.store(alter_failed); }
// Returns the current value of the `_alter_failed` flag.
bool is_alter_failed() { return _alter_failed.load(); }

private:
Status _init_once_action();
Expand Down Expand Up @@ -685,6 +687,8 @@ class Tablet final : public BaseTablet {
// may delete compaction input rowsets.
std::mutex _cold_compaction_lock;
int64_t _last_failed_follow_cooldown_time = 0;
// `_alter_failed` is used to indicate whether the tablet failed to perform a schema change
std::atomic<bool> _alter_failed = false;

DISALLOW_COPY_AND_ASSIGN(Tablet);

Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,19 @@ Status EngineCloneTask::_do_clone() {
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);

// The status of a tablet is not ready, indicating that it is a residual tablet after a schema
// change failure. It should not provide normal read and write, so drop it here.
// change failure. Clone a new tablet from remote be to overwrite it. This situation basically only
// occurs when the be_rebalancer_fuzzy_test configuration is enabled.
if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
LOG(WARNING) << "tablet state is not ready when clone, need to drop old tablet, tablet_id="
<< tablet->tablet_id();
// can not drop tablet when under clone. so unregister clone tablet firstly.
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
tablet->tablet_id(), tablet->replica_id(), false));
if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(
_clone_req.tablet_id)) {
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
}
tablet.reset();
}
bool is_new_tablet = tablet == nullptr;
Expand Down
Loading

0 comments on commit a295902

Please sign in to comment.