diff --git a/CMakeLists.txt b/CMakeLists.txt index 02ffc8d408..bef122aa03 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -745,19 +745,4 @@ target_link_libraries(${PROJECT_NAME} ${LIBUNWIND_LIBRARY} ${JEMALLOC_LIBRARY}) -option(ENABLE_IPO "enable interprocedural optimization" ON) -if (ENABLE_IPO) - include(CheckIPOSupported) - check_ipo_supported(RESULT ipo_result OUTPUT ipo_output LANGUAGES CXX) - - if (ipo_result) - set_property(TARGET ${PROJECT_NAME} PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE) - if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - target_link_libraries(${PROJECT_NAME} PUBLIC "-fuse-ld=lld") - endif () - else () - message(WARNING "IPO is not supported: ${ipo_output}") - endif () -endif () - option(USE_SSL "Enable SSL support" OFF) diff --git a/include/pika_command.h b/include/pika_command.h index 677149a2e3..cd1cc2d5bc 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -412,6 +412,7 @@ class Cmd : public std::enable_shared_from_this { bool is_admin_require() const; bool is_single_partition() const; bool is_multi_partition() const; + bool is_classic_mode() const; bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const; uint64_t GetDoDuration() const { return do_duration_; }; diff --git a/include/pika_conf.h b/include/pika_conf.h index 361b2a5094..800de4d016 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -141,6 +141,7 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return user_blacklist_; } + bool classic_mode() { return classic_mode_.load(); } int databases() { std::shared_lock l(rwlock_); return databases_; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 2eed0a5e6f..ec260f6ec7 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -161,6 +161,10 @@ void DbSlaveofCmd::DoInitial() { res_.SetRes(CmdRes::kWrongNum, kCmdNameDbSlaveof); return; } + if (!g_pika_conf->classic_mode()) { + res_.SetRes(CmdRes::kErrOther, "DbSlaveof only support on classic mode"); + return; + } if (g_pika_server->role() ^ PIKA_ROLE_SLAVE || !g_pika_server->MetaSyncDone()) { res_.SetRes(CmdRes::kErrOther, "Not currently a slave"); return; @@ -414,9 +418,11 @@ void SelectCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect); return; } - if (index < 0 || index >= g_pika_conf->databases()) { - res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect + " DB index is out of range"); - return; + if (g_pika_conf->classic_mode()) { + if (index < 0 || index >= g_pika_conf->databases()) { + res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect + " DB index is out of range"); + return; + } } table_name_ = "db" + argv_[1]; if (!g_pika_server->IsTableExist(table_name_)) { @@ -922,6 +928,11 @@ void InfoCmd::InfoShardingReplication(std::string& info) { } void InfoCmd::InfoReplication(std::string& info) { + if (!g_pika_conf->classic_mode()) { + // In Sharding mode, show different replication info + InfoShardingReplication(info); + return; + } int host_role = g_pika_server->role(); std::stringstream tmp_stream; @@ -1347,16 +1358,16 @@ void ConfigCmd::ConfigGet(std::string& ret) { if (pstd::stringmatch(pattern.data(), "instance-mode", 1)) { elements += 2; EncodeString(&config_body, "instance-mode"); - EncodeString(&config_body, "classic"); + EncodeString(&config_body, (g_pika_conf->classic_mode() ? "classic" : "sharding")); } - if (pstd::stringmatch(pattern.data(), "databases", 1)) { + if (g_pika_conf->classic_mode() && pstd::stringmatch(pattern.data(), "databases", 1)) { elements += 2; EncodeString(&config_body, "databases"); EncodeInt32(&config_body, g_pika_conf->databases()); } - if (pstd::stringmatch(pattern.data(), "default-slot-num", 1)) { + if (!g_pika_conf->classic_mode() && pstd::stringmatch(pattern.data(), "default-slot-num", 1)) { elements += 2; EncodeString(&config_body, "default-slot-num"); EncodeInt32(&config_body, g_pika_conf->default_slot_num()); @@ -2329,7 +2340,9 @@ void HelloCmd::Do(std::shared_ptr partition) { }; // just for redis resp2 protocol fvs.push_back({"proto", "2"}); - fvs.push_back({"mode", "classic"}); + if (g_pika_conf->classic_mode()) { + fvs.push_back({"mode", "classic"}); + } int host_role = g_pika_server->role(); switch (host_role) { case PIKA_ROLE_SINGLE: diff --git a/src/pika_auxiliary_thread.cc b/src/pika_auxiliary_thread.cc index f8fc7effb7..d65de95903 100644 --- a/src/pika_auxiliary_thread.cc +++ b/src/pika_auxiliary_thread.cc @@ -21,9 +21,13 @@ PikaAuxiliaryThread::~PikaAuxiliaryThread() { void* PikaAuxiliaryThread::ThreadMain() { while (!should_stop()) { - if (g_pika_server->ShouldMetaSync()) { - g_pika_rm->SendMetaSyncRequest(); - } else if (g_pika_server->MetaSyncDone()) { + if (g_pika_conf->classic_mode()) { + if (g_pika_server->ShouldMetaSync()) { + g_pika_rm->SendMetaSyncRequest(); + } else if (g_pika_server->MetaSyncDone()) { + g_pika_rm->RunSyncSlavePartitionStateMachine(); + } + } else { g_pika_rm->RunSyncSlavePartitionStateMachine(); } diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 89760e8bd8..20b970e282 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -161,7 +161,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_ void PikaClientConn::ProcessMonitor(const PikaCmdArgsType& argv) { std::string monitor_message; - std::string table_name = current_table_.substr(2); + std::string table_name = g_pika_conf->classic_mode() ? current_table_.substr(2) : current_table_; monitor_message = std::to_string(1.0 * pstd::NowMicros() / 1000000) + " [" + table_name + " " + this->ip_port() + "]"; for (PikaCmdArgsType::const_iterator iter = argv.begin(); iter != argv.end(); iter++) { monitor_message += " " + pstd::ToRead(*iter); diff --git a/src/pika_cmd_table_manager.cc b/src/pika_cmd_table_manager.cc index 2f31efed27..5a8da711e4 100644 --- a/src/pika_cmd_table_manager.cc +++ b/src/pika_cmd_table_manager.cc @@ -51,7 +51,11 @@ bool PikaCmdTableManager::CheckCurrentThreadDistributionMapExist(const std::thre void PikaCmdTableManager::InsertCurrentThreadDistributionMap() { auto tid = std::this_thread::get_id(); PikaDataDistribution* distribution = nullptr; - distribution = new HashModulo(); + if (g_pika_conf->classic_mode()) { + distribution = new HashModulo(); + } else { + distribution = new Crc32(); + } distribution->Init(); std::lock_guard l(map_protector_); thread_distribution_map_.emplace(tid, distribution); diff --git a/src/pika_command.cc b/src/pika_command.cc index f92d21ea8a..319fbceaf5 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -518,7 +518,7 @@ void Cmd::Execute() { ProcessFlushAllCmd(); } else if (name_ == kCmdNameInfo || name_ == kCmdNameConfig) { ProcessDoNotSpecifyPartitionCmd(); - } else if (is_single_partition()) { + } else if (is_single_partition() || g_pika_conf->classic_mode()) { ProcessSinglePartitionCmd(); } else if (is_multi_partition()) { ProcessMultiPartitionCmd(); @@ -578,8 +578,18 @@ void Cmd::ProcessFlushAllCmd() { void Cmd::ProcessSinglePartitionCmd() { std::shared_ptr partition; - //a table has only one partition - partition = g_pika_server->GetPartitionByDbName(table_name_); + if (g_pika_conf->classic_mode()) { + // in classic mode a table has only one partition + partition = g_pika_server->GetPartitionByDbName(table_name_); + } else { + std::vector cur_key = current_key(); + if (cur_key.empty()) { + res_.SetRes(CmdRes::kErrOther, "Internal Error"); + return; + } + // in sharding mode we select partition by key + partition = g_pika_server->GetTablePartitionByKey(table_name_, cur_key.front()); + } if (!partition) { res_.SetRes(CmdRes::kErrOther, "Partition not found"); @@ -612,7 +622,11 @@ void Cmd::InternalProcessCommand(std::shared_ptr partition, std::shared_ptr sync_partition, const HintKeys& hint_keys) { pstd::lock::MultiRecordLock record_lock(partition->LockMgr()); if (is_write()) { - record_lock.Lock(current_key()); + if (!hint_keys.empty() && is_multi_partition() && !g_pika_conf->classic_mode()) { + record_lock.Lock(hint_keys.keys); + } else { + record_lock.Lock(current_key()); + } } uint64_t start_us = 0; @@ -626,7 +640,11 @@ void Cmd::InternalProcessCommand(std::shared_ptr partition, DoBinlog(sync_partition); if (is_write()) { - record_lock.Unlock(current_key()); + if (!hint_keys.empty() && is_multi_partition() && !g_pika_conf->classic_mode()) { + record_lock.Unlock(hint_keys.keys); + } else { + record_lock.Unlock(current_key()); + } } } @@ -634,7 +652,12 @@ void Cmd::DoCommand(std::shared_ptr partition, const HintKeys& hint_k if (!is_suspend()) { partition->DbRWLockReader(); } - Do(partition); + + if (!hint_keys.empty() && is_multi_partition() && !g_pika_conf->classic_mode()) { + Split(partition, hint_keys); + } else { + Do(partition); + } if (!is_suspend()) { partition->DbRWUnLock(); @@ -733,9 +756,13 @@ bool Cmd::is_admin_require() const { return ((flag_ & kCmdFlagsMaskAdminRequire) bool Cmd::is_single_partition() const { return ((flag_ & kCmdFlagsMaskPartition) == kCmdFlagsSinglePartition); } bool Cmd::is_multi_partition() const { return ((flag_ & kCmdFlagsMaskPartition) == kCmdFlagsMultiPartition); } +bool Cmd::is_classic_mode() const { return g_pika_conf->classic_mode(); } + bool Cmd::HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const { - if (GetHashkey(lhs) != GetHashkey(rhs)) { - return false; + if (is_classic_mode() == false) { + if (GetHashkey(lhs) != GetHashkey(rhs)) { + return false; + } } return true; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 7475c18ada..cc1d25a9e4 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -240,14 +240,33 @@ int PikaConf::Load() { sync_thread_num_ = 24; } - GetConfInt("databases", &databases_); - if (databases_ < 1 || databases_ > 8) { - LOG(FATAL) << "config databases error, limit [1 ~ 8], the actual is: " << databases_; - } - for (int idx = 0; idx < databases_; ++idx) { - table_structs_.push_back({"db" + std::to_string(idx), 1, {0}}); + std::string instance_mode; + GetConfStr("instance-mode", &instance_mode); + classic_mode_.store(instance_mode.empty() || !strcasecmp(instance_mode.data(), "classic")); + + if (classic_mode_.load()) { + GetConfInt("databases", &databases_); + if (databases_ < 1 || databases_ > 8) { + LOG(FATAL) << "config databases error, limit [1 ~ 8], the actual is: " << databases_; + } + for (int idx = 0; idx < databases_; ++idx) { + table_structs_.push_back({"db" + std::to_string(idx), 1, {0}}); + } + } else { + GetConfInt("default-slot-num", &default_slot_num_); + if (default_slot_num_ <= 0) { + LOG(FATAL) << "config default-slot-num error," + << " it should greater than zero, the actual is: " << default_slot_num_; + } + std::string pika_meta_path = db_path_ + kPikaMeta; + if (!pstd::FileExists(pika_meta_path)) { + local_meta_->StableSave({{"db0", static_cast(default_slot_num_), {}}}); + } + Status s = local_meta_->ParseMeta(&table_structs_); + if (!s.ok()) { + LOG(FATAL) << "parse meta file error"; + } } - default_table_ = table_structs_[0].table_name; int tmp_replication_num = 0; @@ -266,7 +285,7 @@ int PikaConf::Load() { << " [0..." << replication_num_.load() << "]"; } consensus_level_.store(tmp_consensus_level); - if ((consensus_level_.load() != 0 || replication_num_.load() != 0)) { + if (classic_mode_.load() && (consensus_level_.load() != 0 || replication_num_.load() != 0)) { LOG(FATAL) << "consensus-level & replication-num only configurable under sharding mode," << " set it to be 0 if you are using classic mode"; } diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index 447c07ea20..0c09759faa 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -266,7 +266,8 @@ int MemLog::InternalFindLogByBinlogOffset(const LogOffset& offset) { ConsensusCoordinator::ConsensusCoordinator(const std::string& table_name, uint32_t partition_id) : table_name_(table_name), partition_id_(partition_id) { std::string table_log_path = g_pika_conf->log_path() + "log_" + table_name + "/"; - std::string log_path = table_log_path; + std::string log_path = + g_pika_conf->classic_mode() ? table_log_path : table_log_path + std::to_string(partition_id) + "/"; context_ = std::make_shared(log_path + kContext); stable_logger_ = std::make_shared(table_name, partition_id, log_path); mem_logger_ = std::make_shared(); diff --git a/src/pika_partition.cc b/src/pika_partition.cc index a0f5f9f0fd..f3d847fff9 100644 --- a/src/pika_partition.cc +++ b/src/pika_partition.cc @@ -36,19 +36,24 @@ std::string BgsaveSubPath(const std::string& table_name, uint32_t partition_id) return std::string(buf); } -std::string DbSyncPath(const std::string& sync_path, const std::string& table_name, const uint32_t partition_id) { +std::string DbSyncPath(const std::string& sync_path, const std::string& table_name, const uint32_t partition_id, + bool classic_mode) { char buf[256]; std::string partition_id_str = std::to_string(partition_id); - snprintf(buf, sizeof(buf), "%s/", table_name.data()); + if (classic_mode) { + snprintf(buf, sizeof(buf), "%s/", table_name.data()); + } else { + snprintf(buf, sizeof(buf), "%s/%s/", table_name.data(), partition_id_str.data()); + } return sync_path + buf; } Partition::Partition(const std::string& table_name, uint32_t partition_id, const std::string& table_db_path) : table_name_(table_name), partition_id_(partition_id), bgsave_engine_(nullptr) { - db_path_ = table_db_path; - bgsave_sub_path_ = table_name; - dbsync_path_ = DbSyncPath(g_pika_conf->db_sync_path(), table_name_, partition_id_); - partition_name_ = table_name ; + db_path_ = g_pika_conf->classic_mode() ? table_db_path : PartitionPath(table_db_path, partition_id_); + bgsave_sub_path_ = g_pika_conf->classic_mode() ? table_name : BgsaveSubPath(table_name_, partition_id_); + dbsync_path_ = DbSyncPath(g_pika_conf->db_sync_path(), table_name_, partition_id_, g_pika_conf->classic_mode()); + partition_name_ = g_pika_conf->classic_mode() ? table_name : PartitionName(table_name_, partition_id_); db_ = std::shared_ptr(new storage::Storage()); rocksdb::Status s = db_->Open(g_pika_server->storage_options(), db_path_); diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index ac44d8b39a..9101d754bd 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -142,7 +142,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync((*index)[i]); // if pika are not current a slave or partition not in // BinlogSync state, we drop remain write binlog task - if ((!(g_pika_server->role() & PIKA_ROLE_SLAVE)) || + if ((g_pika_conf->classic_mode() && !(g_pika_server->role() & PIKA_ROLE_SLAVE)) || ((slave_partition->State() != ReplState::kConnected) && (slave_partition->State() != ReplState::kWaitDBSync))) { return; } @@ -209,7 +209,7 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red // Monitor related std::string monitor_message; if (g_pika_server->HasMonitorClients()) { - std::string table_name = worker->table_name_.substr(2); + std::string table_name = g_pika_conf->classic_mode() ? worker->table_name_.substr(2) : worker->table_name_; std::string monitor_message = std::to_string(1.0 * pstd::NowMicros() / 1000000) + " [" + table_name + " " + worker->ip_port_ + "]"; for (const auto& item : argv) { diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index a399f7a539..3ebb5d9eb5 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -108,6 +108,14 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { } const InnerMessage::InnerResponse_MetaSync meta_sync = response->meta_sync(); + if (g_pika_conf->classic_mode() != meta_sync.classic_mode()) { + LOG(WARNING) << "Self in " << (g_pika_conf->classic_mode() ? "classic" : "sharding") << " mode, but master in " + << (meta_sync.classic_mode() ? "classic" : "sharding") + << " mode, failed to establish master-slave relationship"; + g_pika_server->SyncError(); + conn->NotifyClose(); + return; + } std::vector master_table_structs; for (int idx = 0; idx < meta_sync.tables_info_size(); ++idx) { diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 0a49ad8c19..f7b57b8418 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -46,6 +46,7 @@ void PikaReplServerConn::HandleMetaSyncRequest(void* arg) { g_pika_server->BecomeMaster(); response.set_code(InnerMessage::kOk); InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync(); + meta_sync->set_classic_mode(g_pika_conf->classic_mode()); for (const auto& table_struct : table_structs) { InnerMessage::InnerResponse_MetaSync_TableInfo* table_info = meta_sync->add_tables_info(); table_info->set_table_name(table_struct.table_name); diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 448e671a27..b3f7871c20 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -28,7 +28,11 @@ SyncPartition::SyncPartition(const std::string& table_name, uint32_t partition_i : partition_info_(table_name, partition_id) {} std::string SyncPartition::PartitionName() { - return partition_info_.table_name_; + if (g_pika_conf->classic_mode()) { + return partition_info_.table_name_; + } else { + return partition_info_.ToString(); + } } /* SyncMasterPartition*/ diff --git a/src/pika_server.cc b/src/pika_server.cc index 58a01486b1..31055dcee2 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -248,6 +248,23 @@ bool PikaServer::readonly(const std::string& table_name, const std::string& key) if ((role_ & PIKA_ROLE_SLAVE) && g_pika_conf->slave_read_only()) { return true; } + if (!g_pika_conf->classic_mode()) { + std::shared_ptr table = GetTable(table_name); + if (table == nullptr) { + // swallow this error will process later + return false; + } + uint32_t index = g_pika_cmd_table_manager->DistributeKey(key, table->PartitionNum()); + int role = 0; + Status s = g_pika_rm->CheckPartitionRole(table_name, index, &role); + if (!s.ok()) { + // swallow this error will process later + return false; + } + if (role & PIKA_ROLE_SLAVE) { + return true; + } + } return false; } @@ -441,7 +458,14 @@ bool PikaServer::IsCommandSupport(const std::string& command) { return res; } } - return true; + + if (g_pika_conf->classic_mode()) { + return true; + } else { + std::string cmd = command; + pstd::StringToLower(cmd); + return !ShardingModeNotSupportCommands.count(cmd); + } } bool PikaServer::IsTableBinlogIoError(const std::string& table_name) { @@ -967,7 +991,7 @@ void PikaServer::DbSyncSendFile(const std::string& ip, int port, const std::stri } std::string local_path, target_path; - std::string remote_path = table_name; + std::string remote_path = g_pika_conf->classic_mode() ? table_name : table_name + "/" + std::to_string(partition_id); std::vector::const_iterator iter = descendant.begin(); pstd::RsyncRemote remote(ip, port, kDBSyncModule, g_pika_conf->db_sync_speed() * 1024); std::string secret_file_path = g_pika_conf->db_sync_path();