diff --git a/.github/workflows/operator.yml b/.github/workflows/operator.yml index 0a2e8eb3d1..fc8045872b 100644 --- a/.github/workflows/operator.yml +++ b/.github/workflows/operator.yml @@ -29,7 +29,7 @@ jobs: - name: Build run: | - cd tools/pika_operator && make + cd tools/pika_operator && make - name: Unit Test run: | diff --git a/CMakeLists.txt b/CMakeLists.txt index 373bfe4304..4a2fb71bfc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -735,7 +735,7 @@ set(PIKA_BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/pika_build_version.cc) message("PIKA_BUILD_VERSION_CC : " ${PIKA_BUILD_VERSION_CC}) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/build_version.cc.in ${PIKA_BUILD_VERSION_CC} @ONLY) -set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/pika_inner_message.proto) +set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/pika_inner_message.proto ${CMAKE_CURRENT_SOURCE_DIR}/src/rsync_service.proto) custom_protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${PROTO_FILES}) message("pika PROTO_SRCS = ${PROTO_SRCS}") message("pika PROTO_HDRS = ${PROTO_HDRS}") diff --git a/include/pika_define.h b/include/pika_define.h index a159a4e8cc..fb3a1fed7a 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -34,7 +34,8 @@ class PikaServer; /* Port shift */ const int kPortShiftRSync = 1000; const int kPortShiftReplServer = 2000; - +//TODO: Temporarily used for rsync server port shift. will be deleted. +const int kPortShiftRsync2 = 10001; const std::string kPikaPidFile = "pika.pid"; const std::string kPikaSecretFile = "rsync.secret"; const std::string kDefaultRsyncAuth = "default"; diff --git a/include/pika_rm.h b/include/pika_rm.h index 7c50bbe05d..50d5f4af96 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -21,6 +21,7 @@ #include "include/pika_repl_server.h" #include "include/pika_slave_node.h" #include "include/pika_stable_log.h" +#include "include/rsync_client.h" #define kBinlogSendPacketNum 40 #define kBinlogSendBatchNum 100 @@ -157,7 +158,12 @@ class SyncSlaveSlot : public SyncSlot { std::string LocalIp(); + void ActivateRsync(); + + bool IsRsyncRunning() {return rsync_cli_->IsRunning();} + private: + std::unique_ptr rsync_cli_; pstd::Mutex slot_mu_; RmNode m_info_; ReplState repl_state_{kNoConnect}; diff --git a/include/pika_server.h b/include/pika_server.h index 3f79d3e3cd..e05513f2d2 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -36,6 +36,7 @@ #include "include/pika_repl_client.h" #include "include/pika_repl_server.h" #include "include/pika_rsync_service.h" +#include "include/rsync_server.h" #include "include/pika_statistic.h" #include "include/pika_slot_command.h" #include "include/pika_migrate_thread.h" @@ -261,6 +262,8 @@ class PikaServer : public pstd::noncopyable { /* * DBSync used */ + pstd::Status GetDumpUUID(const std::string& db_name, const uint32_t slot_id, std::string* snapshot_uuid); + pstd::Status GetDumpMeta(const std::string& db_name, const uint32_t slot_id, std::vector* files, std::string* snapshot_uuid); void DBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id); void TryDBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id, int32_t top); void DbSyncSendFile(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id); @@ -587,6 +590,7 @@ class PikaServer : public pstd::noncopyable { * Rsync used */ std::unique_ptr pika_rsync_service_; + std::unique_ptr rsync_server_; /* * Pubsub used diff --git a/include/pika_slot.h b/include/pika_slot.h index 0aa2a49cf0..617805c468 100644 --- a/include/pika_slot.h +++ b/include/pika_slot.h @@ -76,6 +76,8 @@ class Slot : public std::enable_shared_from_this,public pstd::noncopyable bool IsBgSaving(); void BgSaveSlot(); BgSaveInfo bgsave_info(); + void GetBgSaveMetaData(std::vector* fileNames, std::string* snapshot_uuid); + pstd::Status GetBgSaveUUID(std::string* snapshot_uuid); // FlushDB & FlushSubDB use bool FlushDB(); @@ -93,6 +95,7 @@ class Slot : public std::enable_shared_from_this,public pstd::noncopyable private: std::string db_name_; uint32_t slot_id_ = 0; + std::string snapshot_uuid_; std::string db_path_; std::string bgsave_sub_path_; diff --git a/include/rsync_client.h b/include/rsync_client.h new file mode 100644 index 0000000000..45d9894e3e --- /dev/null +++ b/include/rsync_client.h @@ -0,0 +1,152 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef RSYNC_CLIENT_H_ +#define RSYNC_CLIENT_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "net/include/bg_thread.h" +#include "net/include/net_cli.h" +#include "pstd/include/env.h" +#include "pstd/include/pstd_status.h" +#include "pstd/include/pstd_hash.h" +#include "pstd/include/pstd_string.h" +#include "pstd/include/pstd_status.h" +#include "include/rsync_client_thread.h" +#include "include/throttle.h" +#include "rsync_service.pb.h" + +const std::string kDumpMetaFileName = "DUMP_META_DATA"; +const std::string kUuidPrefix = "snapshot-uuid:"; + +namespace rsync { + +class RsyncWriter; +class Session; +class WaitObject; + +class RsyncClient : public net::Thread { +public: + enum State { + IDLE, + RUNNING, + STOP, + }; + RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id); + void* ThreadMain() override; + bool Init(); + Status Start(); + Status Stop(); + bool IsRunning() { + return state_.load() == RUNNING; + } + bool IsIdle() { return state_.load() == IDLE;} + void OnReceive(RsyncService::RsyncResponse* resp); + +private: + bool Recover(); + Status Wait(RsyncService::RsyncResponse*& resp); + Status CopyRemoteFile(const std::string& filename); + Status CopyRemoteMeta(std::string* snapshot_uuid, std::set* file_set); + Status LoadLocalMeta(std::string* snapshot_uuid, std::map* file_map); + std::string GetLocalMetaFilePath(); + Status FlushMetaTable(); + Status CleanUpExpiredFiles(bool need_reset_path, const std::set& files); + Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set& expired_files, std::map* localFileMap); + void HandleRsyncMetaResponse(RsyncService::RsyncResponse* response); + +private: + std::map meta_table_; + int flush_period_ = 10; + std::set file_set_; + std::string snapshot_uuid_; + std::string dir_; + std::string db_name_; + uint32_t slot_id_ = 0; + std::unique_ptr client_thread_; + std::atomic state_; + int max_retries_ = 10; + std::unique_ptr wo_; + std::condition_variable cond_; + std::mutex mu_; + std::unique_ptr throttle_; + std::string master_ip_; + int master_port_; +}; + +class RsyncWriter { +public: + RsyncWriter(const std::string& filepath) { + filepath_ = filepath; + fd_ = open(filepath.c_str(), O_RDWR | O_APPEND | O_CREAT, 0644); + } + ~RsyncWriter() {} + Status Write(uint64_t offset, size_t n, const char* data) { + const char* ptr = data; + size_t left = n; + Status s; + while (left != 0) { + ssize_t done = write(fd_, ptr, left); + if (done < 0) { + if (errno == EINTR) { + continue; + } + LOG(WARNING) << "pwrite failed, filename: " << filepath_ << "errno: " << strerror(errno) << "n: " << n; + return Status::IOError(filepath_, "pwrite failed"); + } + left -= done; + ptr += done; + offset += done; + } + return Status::OK(); + } + Status Close() { + close(fd_); + return Status::OK(); + } + Status Fsync() { + fsync(fd_); + return Status::OK(); + } + +private: + std::string filepath_; + int fd_ = -1; +}; + +class WaitObject { +public: + WaitObject() : filename_(""), type_(RsyncService::kRsyncMeta), offset_(0), resp_(nullptr) {} + ~WaitObject() {} + void Reset(const std::string& filename, RsyncService::Type t, size_t offset) { + resp_ = nullptr; + filename_ = filename; + type_ = t; + offset_ = offset; + } + void Reset(RsyncService::Type t) { + resp_ = nullptr; + filename_ = ""; + type_ = t; + offset_ = 0xFFFFFFFF; + } + std::string filename_; + RsyncService::Type type_; + size_t offset_ = 0xFFFFFFFF; + RsyncService::RsyncResponse* resp_ = nullptr; +}; + +} // end namespace rsync +#endif + diff --git a/include/rsync_client_thread.h b/include/rsync_client_thread.h new file mode 100644 index 0000000000..91d5cf383f --- /dev/null +++ b/include/rsync_client_thread.h @@ -0,0 +1,55 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef RSYNC_CLIENT_THREAD_H_ +#define RSYNC_CLIENT_THREAD_H_ + +#include "net/include/client_thread.h" +#include "net/include/net_conn.h" +#include "net/include/pb_conn.h" +#include "rsync_service.pb.h" + +using namespace pstd; +using namespace net; + +namespace rsync { + +class RsyncClientConn : public PbConn { +public: + RsyncClientConn(int fd, const std::string& ip_port, + net::Thread* thread, void* cb_handler, + NetMultiplexer* mpx); + ~RsyncClientConn() override; + int DealMessage() override; + +private: + void* cb_handler_ = nullptr; +}; + +class RsyncClientConnFactory : public ConnFactory { +public: + RsyncClientConnFactory(void* scheduler) : cb_handler_(scheduler) {} + std::shared_ptr NewNetConn(int connfd, const std::string& ip_port, + net::Thread* thread, void* cb_handler, + net::NetMultiplexer* net) const override { + return std::static_pointer_cast( + std::make_shared(connfd, ip_port, thread, cb_handler_, net)); + } +private: + void* cb_handler_ = nullptr; +}; + +class RsyncClientThread : public ClientThread { +public: + RsyncClientThread(int cron_interval, int keepalive_timeout, void* scheduler); + ~RsyncClientThread() override; +private: + RsyncClientConnFactory conn_factory_; + ClientHandle handle_; +}; + +} //end namespace rsync +#endif + diff --git a/include/rsync_server.h b/include/rsync_server.h new file mode 100644 index 0000000000..eef55fc661 --- /dev/null +++ b/include/rsync_server.h @@ -0,0 +1,91 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef RSYNC_SERVER_H_ +#define RSYNC_SERVER_H_ + +#include +#include + +#include "net/include/net_conn.h" +#include "net/include/net_thread.h" +#include "net/include/pb_conn.h" +#include "net/include/server_thread.h" +#include "net/include/thread_pool.h" +#include "net/src/holy_thread.h" +#include "net/src/net_multiplexer.h" +#include "pstd/include/env.h" +#include "pstd_hash.h" +#include "rsync_service.pb.h" + +namespace rsync { +struct RsyncServerTaskArg { + std::shared_ptr req; + std::shared_ptr conn; + RsyncServerTaskArg(std::shared_ptr _req, std::shared_ptr _conn) + : req(std::move(_req)), conn(std::move(_conn)) {} +}; +class RsyncReader; +class RsyncServerThread; + +class RsyncServer { +public: + RsyncServer(const std::set& ips, const int port); + ~RsyncServer(); + void Schedule(net::TaskFunc func, void* arg); + int Start(); + int Stop(); +private: + std::unique_ptr work_thread_; + std::unique_ptr rsync_server_thread_; +}; + +class RsyncServerConn : public net::PbConn { +public: + RsyncServerConn(int connfd, const std::string& ip_port, + net::Thread* thread, void* worker_specific_data, + net::NetMultiplexer* mpx); + virtual ~RsyncServerConn() override; + int DealMessage() override; + static void HandleMetaRsyncRequest(void* arg); + static void HandleFileRsyncRequest(void* arg); +private: + void* data_ = nullptr; +}; + +class RsyncServerThread : public net::HolyThread { +public: + RsyncServerThread(const std::set& ips, int port, int cron_internal, RsyncServer* arg); + ~RsyncServerThread(); + +private: + class RsyncServerConnFactory : public net::ConnFactory { + public: + explicit RsyncServerConnFactory(RsyncServer* sched) : scheduler_(sched) {} + + std::shared_ptr NewNetConn(int connfd, const std::string& ip_port, + net::Thread* thread, void* worker_specific_data, + net::NetMultiplexer* net) const override { + return std::static_pointer_cast( + std::make_shared(connfd, ip_port, thread, scheduler_, net)); + } + private: + RsyncServer* scheduler_ = nullptr; + }; + class RsyncServerHandle : public net::ServerHandle { + public: + void FdClosedHandle(int fd, const std::string& ip_port) const override; + void FdTimeoutHandle(int fd, const std::string& ip_port) const override; + bool AccessHandle(int fd, std::string& ip) const override; + void CronHandle() const override; + }; +private: + RsyncServerConnFactory conn_factory_; + RsyncServerHandle handle_; +}; + +} //end namespace rsync +#endif + diff --git a/include/throttle.h b/include/throttle.h new file mode 100644 index 0000000000..d80f43aaca --- /dev/null +++ b/include/throttle.h @@ -0,0 +1,36 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef THROTTLE_H_ +#define THROTTLE_H_ + +#include +#include "pstd/include/pstd_mutex.h" + +namespace rsync { +class Throttle { + public: + Throttle() {} + Throttle(size_t throttle_throughput_bytes, size_t check_cycle); + ~Throttle(); + size_t ThrottledByThroughput(size_t bytes); + void ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t elaspe_time_us); + + private: + std::atomic throttle_throughput_bytes_ = 100 * 1024 * 1024; + // the num of tasks doing install_snapshot + std::atomic last_throughput_check_time_us_; + std::atomic cur_throughput_bytes_; + // user defined check cycles of throughput per second + size_t check_cycle_ = 10; + pstd::Mutex keys_mutex_; + size_t caculate_check_time_us_(int64_t current_time_us, int64_t check_cycle) { + size_t base_aligning_time_us = 1000 * 1000 / check_cycle; + return current_time_us / base_aligning_time_us * base_aligning_time_us; + } +}; +} // end namespace rsync +#endif + diff --git a/src/pika.cc b/src/pika.cc index 71ce55feea..911ee2c005 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -15,8 +15,8 @@ #include "include/pika_define.h" #include "include/pika_rm.h" #include "include/pika_server.h" -#include "include/pika_version.h" #include "include/pika_slot_command.h" +#include "include/pika_version.h" #include "pstd/include/env.h" #include "pstd/include/pstd_defer.h" diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 09cf95ce00..4eafa500b5 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -344,6 +344,7 @@ void PikaReplServerConn::HandleDBSyncRequest(void* arg) { g_pika_server->TryDBSync(node.ip(), node.port() + kPortShiftRSync, db_name, slot_id, static_cast(slave_boffset.filenum())); + master_slot->ActivateSlaveDbSync(node.ip(), node.port()); std::string reply_str; if (!response.SerializeToString(&reply_str) || (conn->WriteResp(reply_str) != 0)) { diff --git a/src/pika_rm.cc b/src/pika_rm.cc index b46de79f2c..340b547624 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -537,7 +537,9 @@ Status SyncMasterSlot::ConsensusReset(const LogOffset& applied_offset) { return /* SyncSlaveSlot */ SyncSlaveSlot::SyncSlaveSlot(const std::string& db_name, uint32_t slot_id) - : SyncSlot(db_name, slot_id) { + : SyncSlot(db_name, slot_id) { + std::string dbsync_path = g_pika_conf->db_sync_path() + "/" + db_name; + rsync_cli_.reset(new rsync::RsyncClient(dbsync_path, db_name, slot_id)); m_info_.SetLastRecvTime(pstd::NowMicros()); } @@ -599,6 +601,7 @@ void SyncSlaveSlot::Deactivate() { std::lock_guard l(slot_mu_); m_info_ = RmNode(); repl_state_ = ReplState::kNoConnect; + rsync_cli_->Stop(); } std::string SyncSlaveSlot::ToStringStatus() { @@ -637,6 +640,16 @@ std::string SyncSlaveSlot::LocalIp() { return local_ip_; } +void SyncSlaveSlot::ActivateRsync() { + if (!rsync_cli_->IsIdle()) { + return; + } + LOG(WARNING) << "ActivateRsync ..."; + if (rsync_cli_->Init()) { + rsync_cli_->Start(); + } +} + /* PikaReplicaManger */ PikaReplicaManager::PikaReplicaManager() { @@ -1139,10 +1152,13 @@ Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() { } else if (s_slot->State() == ReplState::kWaitReply) { continue; } else if (s_slot->State() == ReplState::kWaitDBSync) { + s_slot->ActivateRsync(); std::shared_ptr slot = g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_); if (slot) { - slot->TryUpdateMasterOffset(); + if (!s_slot->IsRsyncRunning()) { + slot->TryUpdateMasterOffset(); + } } else { LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_ << " Slot Id: " << p_info.slot_id_; diff --git a/src/pika_server.cc b/src/pika_server.cc index efa1c38fa2..df28dda014 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -3,8 +3,6 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "include/pika_server.h" - #include #include #include @@ -25,9 +23,10 @@ #include "include/pika_cmd_table_manager.h" #include "include/pika_dispatch_thread.h" -#include "include/pika_rm.h" #include "include/pika_monotonic_time.h" #include "include/pika_instant.h" +#include "include/pika_server.h" +#include "include/pika_rm.h" using pstd::Status; extern PikaServer* g_pika_server; @@ -74,9 +73,12 @@ PikaServer::PikaServer() // We estimate the queue size int worker_queue_limit = g_pika_conf->maxclients() / worker_num_ + 100; LOG(INFO) << "Worker queue limit is " << worker_queue_limit; + for_each(ips.begin(), ips.end(), [](auto& ip) {LOG(WARNING) << ip;}); pika_dispatch_thread_ = std::make_unique(ips, port_, worker_num_, 3000, worker_queue_limit, g_pika_conf->max_conn_rbuf_size()); pika_rsync_service_ = std::make_unique(g_pika_conf->db_sync_path(), g_pika_conf->port() + kPortShiftRSync); + //TODO: remove pika_rsync_service_,reuse pika_rsync_service_ port + rsync_server_ = std::make_unique(ips, port_ + kPortShiftRsync2); pika_pubsub_thread_ = std::make_unique(); pika_auxiliary_thread_ = std::make_unique(); pika_migrate_ = std::make_unique(); @@ -88,6 +90,7 @@ PikaServer::PikaServer() } PikaServer::~PikaServer() { + rsync_server_->Stop(); // DispatchThread will use queue of worker thread, // so we need to delete dispatch before worker. pika_client_processor_->Stop(); @@ -134,12 +137,15 @@ bool PikaServer::ServerInit() { void PikaServer::Start() { int ret = 0; // start rsync first, rocksdb opened fd will not appear in this fork + // TODO: temporarily disable rsync server + /* ret = pika_rsync_service_->StartRsync(); if (0 != ret) { dbs_.clear(); LOG(FATAL) << "Start Rsync Error: bind port " + std::to_string(pika_rsync_service_->ListenPort()) + " failed" << ", Listen on this port to receive Master FullSync Data"; } + */ // We Init DB Struct Before Start The following thread InitDBStruct(); @@ -172,6 +178,8 @@ void PikaServer::Start() { time(&start_time_s_); + std::string master_run_id = g_pika_conf->master_run_id(); + set_master_run_id(master_run_id); std::string slaveof = g_pika_conf->slaveof(); if (!slaveof.empty()) { auto sep = static_cast(slaveof.find(':')); @@ -190,8 +198,7 @@ void PikaServer::Start() { cmdstat_map->emplace(iter.first, statistics); } LOG(INFO) << "Pika Server going to start"; - - + rsync_server_->Start(); while (!exit_) { DoTimingTask(); // wake up every 5 seconds @@ -917,6 +924,26 @@ void PikaServer::DBSync(const std::string& ip, int port, const std::string& db_n bgsave_thread_.Schedule(&DoDBSync, reinterpret_cast(arg)); } +pstd::Status PikaServer::GetDumpUUID(const std::string& db_name, const uint32_t slot_id, std::string* snapshot_uuid) { + std::shared_ptr slot = GetDBSlotById(db_name, slot_id); + if (!slot) { + LOG(WARNING) << "cannot find slot for db_name " << db_name << " slot_id: " << slot_id; + return pstd::Status::NotFound("slot no found"); + } + slot->GetBgSaveUUID(snapshot_uuid); + return pstd::Status::OK(); +} + +pstd::Status PikaServer::GetDumpMeta(const std::string& db_name, const uint32_t slot_id, std::vector* fileNames, std::string* snapshot_uuid) { + std::shared_ptr slot = GetDBSlotById(db_name, slot_id); + if (!slot) { + LOG(WARNING) << "cannot find slot for db_name " << db_name << " slot_id: " << slot_id; + return pstd::Status::NotFound("slot no found"); + } + slot->GetBgSaveMetaData(fileNames, snapshot_uuid); + return pstd::Status::OK(); +} + void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id, int32_t top) { std::shared_ptr slot = GetDBSlotById(db_name, slot_id); @@ -940,7 +967,8 @@ void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& d // Need Bgsave first slot->BgSaveSlot(); } - DBSync(ip, port, db_name, slot_id); + //TODO: temporarily disable rsync server + //DBSync(ip, port, db_name, slot_id); } void PikaServer::DbSyncSendFile(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id) { @@ -1301,7 +1329,8 @@ void PikaServer::DoTimingTask() { // Delete expired dump AutoDeleteExpiredDump(); // Cheek Rsync Status - AutoKeepAliveRSync(); + //TODO: temporarily disable rsync + //AutoKeepAliveRSync(); // Reset server qps ResetLastSecQuerynum(); // Auto update network instantaneous metric diff --git a/src/pika_slot.cc b/src/pika_slot.cc index 3d9671bf8d..72206f1216 100644 --- a/src/pika_slot.cc +++ b/src/pika_slot.cc @@ -3,16 +3,16 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "include/pika_slot.h" - #include #include #include "include/pika_conf.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "include/pika_slot.h" #include "pstd/include/mutex_impl.h" +#include "pstd/include/pstd_hash.h" using pstd::Status; @@ -296,6 +296,52 @@ BgSaveInfo Slot::bgsave_info() { return bgsave_info_; } +void Slot::GetBgSaveMetaData(std::vector* fileNames, std::string* snapshot_uuid) { + const std::string slotPath = bgsave_info().path; + + std::string types[] = {storage::STRINGS_DB, storage::HASHES_DB, storage::LISTS_DB, storage::ZSETS_DB, storage::SETS_DB}; + for (const auto& type : types) { + std::string typePath = slotPath + ((slotPath.back() != '/') ? "/" : "") + type; + if (!pstd::FileExists(typePath)) { + continue ; + } + + std::vector tmpFileNames; + int ret = pstd::GetChildren(typePath, tmpFileNames); + if (ret) { + LOG(WARNING) << slotPath << " read dump meta files failed, path " << typePath; + return; + } + + for (const std::string fileName : tmpFileNames) { + fileNames -> push_back(type + "/" + fileName); + } + } + fileNames->push_back(kBgsaveInfoFile); + pstd::Status s = GetBgSaveUUID(snapshot_uuid); + if (!s.ok()) { + LOG(WARNING) << "read dump meta info failed! error:" << s.ToString(); + return; + } +} + +Status Slot::GetBgSaveUUID(std::string* snapshot_uuid) { + if (snapshot_uuid_.empty()) { + std::string info_data; + const std::string infoPath = bgsave_info().path + "/info"; + //TODO: using file read function to replace rocksdb::ReadFileToString + rocksdb::Status s = rocksdb::ReadFileToString(rocksdb::Env::Default(), infoPath, &info_data); + if (!s.ok()) { + LOG(WARNING) << "read dump meta info failed! error:" << s.ToString(); + return Status::IOError("read dump meta info failed", infoPath); + } + pstd::MD5 md5 = pstd::MD5(info_data); + snapshot_uuid_ = md5.hexdigest(); + } + *snapshot_uuid = snapshot_uuid_; + return Status::OK(); +} + void Slot::DoBgSave(void* arg) { std::unique_ptr bg_task_arg(static_cast(arg)); @@ -304,17 +350,20 @@ void Slot::DoBgSave(void* arg) { // Some output BgSaveInfo info = bg_task_arg->slot->bgsave_info(); + std::stringstream info_content; std::ofstream out; out.open(info.path + "/" + kBgsaveInfoFile, std::ios::in | std::ios::trunc); if (out.is_open()) { - out << (time(nullptr) - info.start_time) << "s\n" - << g_pika_server->host() << "\n" - << g_pika_server->port() << "\n" - << info.offset.b_offset.filenum << "\n" - << info.offset.b_offset.offset << "\n"; + info_content << (time(nullptr) - info.start_time) << "s\n" + << g_pika_server->host() << "\n" + << g_pika_server->port() << "\n" + << info.offset.b_offset.filenum << "\n" + << info.offset.b_offset.offset << "\n"; if (g_pika_conf->consensus_level() != 0) { - out << info.offset.l_offset.term << "\n" << info.offset.l_offset.index << "\n"; + info_content << info.offset.l_offset.term << "\n" << info.offset.l_offset.index << "\n"; } + bg_task_arg->slot->snapshot_uuid_ = md5(info_content.str()); + out << info_content.rdbuf(); out.close(); } if (!success) { @@ -322,7 +371,6 @@ void Slot::DoBgSave(void* arg) { pstd::RenameFile(info.path, fail_path); } bg_task_arg->slot->FinishBgsave(); - } bool Slot::RunBgsaveEngine() { diff --git a/src/pstd/include/pstd_hash.h b/src/pstd/include/pstd_hash.h index 7a23a4de19..54c12bc332 100644 --- a/src/pstd/include/pstd_hash.h +++ b/src/pstd/include/pstd_hash.h @@ -83,6 +83,57 @@ namespace pstd { std::string md5(const std::string& str, bool raw = false); std::string sha256(const std::string& input, bool raw = false); +// a small class for calculating MD5 hashes of strings or byte arrays +// it is not meant to be fast or secure +// +// usage: 1) feed it blocks of uchars with update() +// 2) finalize() +// 3) get hexdigest() string +// or +// MD5(std::string).hexdigest() +// +// assumes that char is 8 bit and int is 32 bit +class MD5 { + public: + using size_type = unsigned int; // must be 32bit + + MD5(); + MD5(const std::string& text); + void update(const unsigned char* input, size_type length); + void update(const char* input, size_type length); + MD5& finalize(); + std::string hexdigest() const; + std::string rawdigest() const; + friend std::ostream& operator<<(std::ostream& /*out*/, MD5 md5); + + private: + void init(); + using uint1 = unsigned char; // 8bit + using uint4 = unsigned int; // 32bit + enum { blocksize = 64 }; // VC6 won't eat a const static int here + + void transform(const uint1 block[blocksize]); + static void decode(uint4 output[], const uint1 input[], size_type len); + static void encode(uint1 output[], const uint4 input[], size_type len); + + bool finalized; + uint1 buffer[blocksize]; // bytes that didn't fit in last 64 byte chunk + uint4 count[2]; // 64bit counter for number of bits (lo, hi) + uint4 state[4]; // digest so far + uint1 digest[16]; // the result + + // low level logic operations + static inline uint4 F(uint4 x, uint4 y, uint4 z); + static inline uint4 G(uint4 x, uint4 y, uint4 z); + static inline uint4 H(uint4 x, uint4 y, uint4 z); + static inline uint4 I(uint4 x, uint4 y, uint4 z); + static inline uint4 rotate_left(uint4 x, int n); + static inline void FF(uint4& a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); + static inline void GG(uint4& a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); + static inline void HH(uint4& a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); + static inline void II(uint4& a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); +}; + } // namespace pstd #endif // __PSTD_HASH_H__ diff --git a/src/pstd/src/pstd_hash.cc b/src/pstd/src/pstd_hash.cc index eaed3e90fe..af9deed397 100644 --- a/src/pstd/src/pstd_hash.cc +++ b/src/pstd/src/pstd_hash.cc @@ -124,57 +124,6 @@ class SHA256 { ((uint32) * ((str) + 0) << 24); \ } -// a small class for calculating MD5 hashes of strings or byte arrays -// it is not meant to be fast or secure -// -// usage: 1) feed it blocks of uchars with update() -// 2) finalize() -// 3) get hexdigest() string -// or -// MD5(std::string).hexdigest() -// -// assumes that char is 8 bit and int is 32 bit -class MD5 { - public: - using size_type = unsigned int; // must be 32bit - - MD5(); - MD5(const std::string& text); - void update(const unsigned char* input, size_type length); - void update(const char* input, size_type length); - MD5& finalize(); - std::string hexdigest() const; - std::string rawdigest() const; - friend std::ostream& operator<<(std::ostream& /*out*/, MD5 md5); - - private: - void init(); - using uint1 = unsigned char; // 8bit - using uint4 = unsigned int; // 32bit - enum { blocksize = 64 }; // VC6 won't eat a const static int here - - void transform(const uint1 block[blocksize]); - static void decode(uint4 output[], const uint1 input[], size_type len); - static void encode(uint1 output[], const uint4 input[], size_type len); - - bool finalized; - uint1 buffer[blocksize]; // bytes that didn't fit in last 64 byte chunk - uint4 count[2]; // 64bit counter for number of bits (lo, hi) - uint4 state[4]; // digest so far - uint1 digest[16]; // the result - - // low level logic operations - static inline uint4 F(uint4 x, uint4 y, uint4 z); - static inline uint4 G(uint4 x, uint4 y, uint4 z); - static inline uint4 H(uint4 x, uint4 y, uint4 z); - static inline uint4 I(uint4 x, uint4 y, uint4 z); - static inline uint4 rotate_left(uint4 x, int n); - static inline void FF(uint4& a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); - static inline void GG(uint4& a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); - static inline void HH(uint4& a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); - static inline void II(uint4& a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); -}; - const unsigned int SHA256::sha256_k[64] = { // UL = uint32 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5, 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, diff --git a/src/rsync_client.cc b/src/rsync_client.cc new file mode 100644 index 0000000000..0dd8863201 --- /dev/null +++ b/src/rsync_client.cc @@ -0,0 +1,473 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include + +#include "rocksdb/env.h" +#include "pstd/include/pstd_defer.h" +#include "include/pika_server.h" +#include "include/rsync_client.h" + +using namespace net; +using namespace pstd; +using namespace RsyncService; + +extern PikaServer* g_pika_server; + +namespace rsync { +RsyncClient::RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id) + : flush_period_(10), snapshot_uuid_(""), dir_(dir), db_name_(db_name), slot_id_(slot_id), + state_(IDLE), max_retries_(10), master_ip_(""), master_port_(0) { + client_thread_ = std::make_unique(10 * 1000, 60 * 1000, this); + wo_.reset(new WaitObject()); + throttle_.reset(new Throttle()); +} + +bool RsyncClient::Init() { + if (state_ != IDLE) { + LOG(WARNING) << "State should be IDLE when Init"; + return false; + } + master_ip_ = g_pika_server->master_ip(); + master_port_ = g_pika_server->master_port() + kPortShiftRsync2; + file_set_.clear(); + client_thread_->StartThread(); + bool ret = Recover(); + if (!ret) { + LOG(WARNING) << "RsyncClient recover failed..."; + client_thread_->StopThread(); + return false; + } + LOG(INFO) << "RsyncClient recover success..."; + return true; +} + +void* RsyncClient::ThreadMain() { + int cnt = 0; + int period = 0; + Status s = Status::OK(); + LOG(INFO) << "RsyncClient ThreadMain..."; + if (file_set_.empty()) { + state_.store(STOP); + return nullptr; + } + + std::string meta_file_path = GetLocalMetaFilePath(); + std::ofstream outfile; + outfile.open(meta_file_path, std::ios_base::app); // append instead of overwrite + std::string meta_rep; + + for (const auto& file : file_set_) { + LOG(INFO) << "CopyRemoteFile: " << file; + while (state_.load() == RUNNING) { + s = CopyRemoteFile(file); + if (!s.ok()) { + LOG(WARNING) << "rsync CopyRemoteFile failed, filename: " << file; + continue; + } + break; + } + if (state_.load(std::memory_order_relaxed) != RUNNING) { + break; + } + meta_rep.append(file + ":" + meta_table_[file]); + meta_rep.append("\n"); + if (++period == flush_period_) { + period = 0; + outfile << meta_rep; + outfile.flush(); + meta_rep.clear(); + } + } + if (!meta_rep.empty()) { + outfile << meta_rep; + outfile.flush(); + } + state_.store(STOP); + LOG(INFO) << "RsyncClient fetch copy remote files done..."; + return nullptr; +} + +void RsyncClient::OnReceive(RsyncResponse* resp) { + std::unique_lock lock(mu_); + if (resp->type() != wo_->type_) { + delete resp; + resp = nullptr; + return; + } + if (resp->type() == kRsyncFile && + (resp->file_resp().filename() != wo_->filename_ || resp->file_resp().offset() != wo_->offset_)) { + delete resp; + resp = nullptr; + return; + } + wo_->resp_ = resp; + cond_.notify_all(); +} + +Status RsyncClient::Wait(RsyncResponse*& resp) { + Status s = Status::Timeout("rsync timeout", "timeout"); + { + std::unique_lock lock(mu_); + auto cv_s = cond_.wait_for(lock, std::chrono::seconds(3), [this] { + return this->wo_->resp_ != nullptr; + }); + if (!cv_s) { + return s; + } + resp = wo_->resp_; + s = Status::OK(); + } + return s; +} + +Status RsyncClient::CopyRemoteFile(const std::string& filename) { + Status s; + int retries = 0; + size_t offset = 0; + size_t copy_file_begin_time = pstd::NowMicros(); + size_t count = throttle_->ThrottledByThroughput(4 * 1024 * 1024); + MD5 md5; + std::unique_ptr writer(new RsyncWriter(dir_ + "/" + filename)); + DEFER { + if (writer) { + writer->Close(); + writer.reset(); + } + if (!s.ok()) { + DeleteFile(filename); + } + }; + + while (retries < max_retries_) { + RsyncRequest request; + request.set_type(kRsyncFile); + request.set_db_name(db_name_); + request.set_slot_id(slot_id_); + FileRequest* file_req = request.mutable_file_req(); + file_req->set_filename(filename); + file_req->set_offset(offset); + file_req->set_count(count); + std::string to_send; + request.SerializeToString(&to_send); + + s = client_thread_->Write(master_ip_, master_port_, to_send); + if (!s.ok()) { + LOG(WARNING) << "send rsync request failed"; + continue; + } + + { + std::lock_guard lock(mu_); + wo_->Reset(filename, kRsyncFile, offset); + } + + RsyncResponse* resp = nullptr; + s = Wait(resp); + if (s.IsTimeout() || resp == nullptr) { + LOG(WARNING) << "rsync request timeout"; + retries++; + continue; + } + + size_t copy_file_end_time = pstd::NowMicros(); + size_t elaspe_time_us = copy_file_end_time - copy_file_begin_time; + throttle_->ReturnUnusedThroughput(count, resp->file_resp().count(), elaspe_time_us); + + if (resp->code() != RsyncService::kOk) { + //TODO: handle different error + delete resp; + continue; + } + + if (resp->snapshot_uuid() != snapshot_uuid_) { + LOG(WARNING) << "receive newer dump, reset state to STOP, local_snapshot_uuid:" + << snapshot_uuid_ << "remote snapshot uuid: " << resp->snapshot_uuid(); + state_.store(STOP); + delete resp; + return s; + } + + size_t ret_count = resp->file_resp().count(); + resp->file_resp().data(); + s = writer->Write((uint64_t)offset, ret_count, resp->file_resp().data().c_str()); + if (!s.ok()) { + LOG(WARNING) << "rsync client write file error"; + break; + } + + md5.update(resp->file_resp().data().c_str(), ret_count); + offset += resp->file_resp().count(); + if (resp->file_resp().eof()) { + if (md5.finalize().hexdigest() != resp->file_resp().checksum()) { + LOG(WARNING) << "mismatch file checksum for file: " << filename; + s = Status::IOError("mismatch checksum", "mismatch checksum"); + return s; + } + s = writer->Fsync(); + if (!s.ok()) { + return s; + } + s = writer->Close(); + if (!s.ok()) { + return s; + } + writer.reset(); + meta_table_[filename] = resp->file_resp().checksum(); + break; + } + retries = 0; + } + + return s; +} + +Status RsyncClient::Start() { + StartThread(); + return Status::OK(); +} + +Status RsyncClient::Stop() { + if (state_ == IDLE) { + return Status::OK(); + } + LOG(WARNING) << "RsyncClient stop ..."; + state_ = STOP; + StopThread(); + client_thread_->StopThread(); + JoinThread(); + client_thread_->JoinThread(); + state_ = IDLE; + return Status::OK(); +} + +bool RsyncClient::Recover() { + std::string remote_snapshot_uuid; + std::set remote_file_set; + std::string local_snapshot_uuid; + std::map local_file_map; + std::set local_file_set; + + Status s = CopyRemoteMeta(&remote_snapshot_uuid, &remote_file_set); + if (!s.ok()) { + LOG(WARNING) << "copy remote meta failed"; + return false; + } + + s = LoadLocalMeta(&local_snapshot_uuid, &local_file_map); + if (!s.ok()) { + LOG(WARNING) << "load local meta failed"; + return false; + } + for (auto const& file : local_file_map) { + local_file_set.insert(file.first); + } + + std::set expired_files; + if (remote_snapshot_uuid != local_snapshot_uuid) { + snapshot_uuid_ = remote_snapshot_uuid; + file_set_ = remote_file_set; + expired_files = local_file_set; + } else { + for_each(remote_file_set.begin(), remote_file_set.end(), [](auto& file) {LOG(WARNING) << "remote_file_set: " << file;}); + for_each(local_file_set.begin(), local_file_set.end(), [](auto& file) {LOG(WARNING) << "local_file_set: " << file;}); + + std::set newly_files; + set_difference(remote_file_set.begin(), remote_file_set.end(), local_file_set.begin(), local_file_set.end(), + inserter(newly_files, newly_files.begin())); + set_difference(local_file_set.begin(), local_file_set.end(), remote_file_set.begin(), remote_file_set.end(), + inserter(expired_files, expired_files.begin())); + file_set_.insert(newly_files.begin(), newly_files.end()); + } + + s = CleanUpExpiredFiles(local_snapshot_uuid != remote_snapshot_uuid, expired_files); + if (!s.ok()) { + LOG(WARNING) << "clean up expired files failed"; + return false; + } + s = UpdateLocalMeta(snapshot_uuid_, expired_files, &local_file_map); + if (!s.ok()) { + LOG(WARNING) << "update local meta failed"; + return false; + } + + state_ = RUNNING; + LOG(INFO) << "copy meta data done, slot_id: " << slot_id_ << "snapshot_uuid: " << snapshot_uuid_ + << "file count: " << file_set_.size() << "expired file count: " << expired_files.size() + << ", local file count: " << local_file_set.size() << "remote file count: " << remote_file_set.size() + << "remote snapshot_uuid: " << remote_snapshot_uuid << "local snapshot_uuid: " << local_snapshot_uuid + << "file_set_: " << file_set_.size(); + for_each(file_set_.begin(), file_set_.end(), [](auto& file) {LOG(WARNING) << "file_set: " << file;}); + return true; +} + +Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::set* file_set) { + Status s; + int retries = 0; + RsyncRequest request; + request.set_db_name(db_name_); + request.set_slot_id(slot_id_); + request.set_type(kRsyncMeta); + std::string to_send; + request.SerializeToString(&to_send); + while (retries < max_retries_) { + s = client_thread_->Write(master_ip_, master_port_, to_send); + if (!s.ok()) { + retries++; + } + { + std::lock_guard lock(mu_); + wo_->Reset(kRsyncMeta); + } + RsyncResponse* resp = nullptr; + s = Wait(resp); + if (s.IsTimeout() || resp == nullptr) { + LOG(WARNING) << "rsync CopyRemoteMeta request timeout, " + << "retry times: " << retries; + retries++; + continue; + } + + if (resp->code() != RsyncService::kOk) { + //TODO: handle different error + delete resp; + continue; + } + LOG(INFO) << "receive rsync meta infos, snapshot_uuid: " << resp->snapshot_uuid() + << "files count: " << resp->meta_resp().filenames_size(); + for (std::string item : resp->meta_resp().filenames()) { + file_set->insert(item); + } + + *snapshot_uuid = resp->snapshot_uuid(); + for (int i = 0; i < resp->meta_resp().filenames_size(); i++) { + file_set->insert(resp->meta_resp().filenames(i)); + } + delete resp; + resp = nullptr; + break; + } + return s; +} + +Status RsyncClient::LoadLocalMeta(std::string* snapshot_uuid, std::map* file_map) { + std::string meta_file_path = GetLocalMetaFilePath(); + if (!FileExists(meta_file_path)) { + return Status::OK(); + } + + FILE* fp; + char* line = nullptr; + size_t len = 0; + size_t read = 0; + int32_t line_num = 0; + + std::atomic_int8_t retry_times = 5; + + while (retry_times-- > 0) { + fp = fopen(meta_file_path.c_str(), "r"); + if (fp == nullptr) { + LOG(WARNING) << "open meta file failed, meta_path: " << dir_; + } else { + break; + } + } + + // if the file cannot be read from disk, use the remote file directly + if (fp == nullptr) { + LOG(WARNING) << "open meta file failed, meta_path: " << meta_file_path << ", retry times: " << retry_times; + return Status::IOError("open meta file failed, dir: ", meta_file_path); + } + + while ((read = getline(&line, &len, fp)) != -1) { + std::string str(line); + std::string::size_type pos; + while ((pos = str.find("\r")) != std::string::npos) { + str.erase(pos, 1); + } + while ((pos = str.find("\n")) != std::string::npos) { + str.erase(pos, 1); + } + + if (str.empty()) { + continue; + } + + if (line_num == 0) { + *snapshot_uuid = str.erase(0, kUuidPrefix.size()); + } else { + if ((pos = str.find(":")) != std::string::npos) { + std::string filename = str.substr(0, pos); + std::string shecksum = str.substr(pos + 1, str.size()); + (*file_map)[filename] = shecksum; + } + } + + line_num++; + } + fclose(fp); + return Status::OK(); +} + +Status RsyncClient::CleanUpExpiredFiles(bool need_reset_path, const std::set& files) { + if (need_reset_path) { + std::string db_path = dir_ + (dir_.back() == '/' ? "" : "/"); + pstd::DeleteDirIfExist(db_path); + pstd::CreatePath(db_path + "strings"); + pstd::CreatePath(db_path + "hashes"); + pstd::CreatePath(db_path + "lists"); + pstd::CreatePath(db_path + "sets"); + pstd::CreatePath(db_path + "zsets"); + return Status::OK(); + } + + std::string db_path = dir_ + (dir_.back() == '/' ? "" : "/"); + for (const auto& file : files) { + bool b = pstd::DeleteDirIfExist(db_path + file); + if (!b) { + LOG(WARNING) << "delete file failed, file: " << file; + return Status::IOError("delete file failed"); + } + } + return Status::OK(); +} + +Status RsyncClient::UpdateLocalMeta(const std::string& snapshot_uuid, const std::set& expired_files, + std::map* localFileMap) { + for (const auto& item : expired_files) { + localFileMap->erase(item); + } + + std::string meta_file_path = GetLocalMetaFilePath(); + pstd::DeleteFile(meta_file_path); + + std::unique_ptr file; + pstd::Status s = pstd::NewWritableFile(meta_file_path, file); + if (!s.ok()) { + LOG(WARNING) << "create meta file failed, meta_file_path: " << meta_file_path; + return s; + } + file->Append(kUuidPrefix + snapshot_uuid + "\n"); + + for (const auto& item : *localFileMap) { + std::string line = item.first + ":" + item.second + "\n"; + file->Append(line); + } + s = file->Flush(); + if (!s.ok()) { + LOG(WARNING) << "flush meta file failed, meta_file_path: " << meta_file_path; + return s; + } + return Status::OK(); +} + +std::string RsyncClient::GetLocalMetaFilePath() { + std::string db_path = dir_ + (dir_.back() == '/' ? "" : "/"); + return db_path + kDumpMetaFileName; +} + +} // end namespace rsync + diff --git a/src/rsync_client_thread.cc b/src/rsync_client_thread.cc new file mode 100644 index 0000000000..1672f906fe --- /dev/null +++ b/src/rsync_client_thread.cc @@ -0,0 +1,45 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/rsync_client_thread.h" +#include "include/rsync_client.h" +#include "include/pika_define.h" + +using namespace pstd; +using namespace net; +using namespace RsyncService; + +namespace rsync { +class RsyncClient; +RsyncClientConn::RsyncClientConn(int fd, const std::string& ip_port, + net::Thread* thread, void* worker_specific_data, NetMultiplexer* mpx) + : PbConn(fd, ip_port, thread, mpx), cb_handler_(worker_specific_data) {} + +RsyncClientConn::~RsyncClientConn() {} + +int RsyncClientConn::DealMessage() { + RsyncResponse* response = new RsyncResponse(); + ::google::protobuf::io::ArrayInputStream input(rbuf_ + cur_pos_ - header_len_, header_len_); + ::google::protobuf::io::CodedInputStream decoder(&input); + decoder.SetTotalBytesLimit(PIKA_MAX_CONN_RBUF); + bool success = response->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage(); + if (!success) { + delete response; + LOG(WARNING) << "ParseFromArray FAILED! " + << " msg_len: " << header_len_; + return -1; + } + RsyncClient* handler = (RsyncClient*)cb_handler_; + handler->OnReceive(response); + return 0; +} + +RsyncClientThread::RsyncClientThread(int cron_interval, int keepalive_timeout, void* scheduler) + : ClientThread(&conn_factory_, cron_interval, keepalive_timeout, &handle_, nullptr), + conn_factory_(scheduler) {} + +RsyncClientThread::~RsyncClientThread() {} +} //end namespace rsync + diff --git a/src/rsync_server.cc b/src/rsync_server.cc new file mode 100644 index 0000000000..55af5f2558 --- /dev/null +++ b/src/rsync_server.cc @@ -0,0 +1,275 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include + +#include +#include + +#include "pstd_hash.h" +#include "include/pika_server.h" +#include "include/rsync_server.h" +#include "pstd/include/pstd_defer.h" + +extern PikaServer* g_pika_server; +namespace rsync { + +using namespace net; +using namespace RsyncService; +using namespace pstd; + +//TODO: optimzie file read and calculate checksum, maybe use RsyncReader prefeching file content +Status ReadDumpFile(const std::string filepath, const size_t offset, const size_t count, + char* data, size_t* bytes_read, std::string* checksum) { + int fd = open(filepath.c_str(), O_RDONLY); + if (fd < 0) { + return Status::IOError("fd open failed"); + } + DEFER { close(fd); }; + + const int kMaxCopyBlockSize = 1 << 20; + size_t read_offset = offset; + size_t read_count = count; + if (read_count > kMaxCopyBlockSize) { + read_count = kMaxCopyBlockSize; + } + ssize_t bytesin = 0; + size_t left_read_count = count; + + while ((bytesin = pread(fd, data, read_count, read_offset)) > 0) { + left_read_count -= bytesin; + if (left_read_count < 0) { + break ; + } + if (read_count > left_read_count) { + read_count = left_read_count; + } + + data += bytesin; + *bytes_read += bytesin; + read_offset += bytesin; + } + + if (bytesin == -1) { + LOG(ERROR) << "unable to read from " << filepath; + return pstd::Status::IOError("unable to read from " + filepath); + } + + if (bytesin == 0) { + char* buffer = new char[kMaxCopyBlockSize]; + pstd::MD5 md5; + + while ((bytesin = read(fd, buffer, kMaxCopyBlockSize)) > 0) { + md5.update(buffer, bytesin); + } + if (bytesin == -1) { + LOG(ERROR) << "unable to read from " << filepath; + delete []buffer; + return pstd::Status::IOError("unable to read from " + filepath); + } + delete []buffer; + *checksum = md5.finalize().hexdigest(); + } + return pstd::Status::OK(); +} + +void RsyncWriteResp(RsyncService::RsyncResponse& response, std::shared_ptr conn) { + std::string reply_str; + if (!response.SerializeToString(&reply_str) || (conn->WriteResp(reply_str) != 0)) { + LOG(WARNING) << "Process FileRsync request serialization failed"; + conn->NotifyClose(); + return; + } + conn->NotifyWrite(); +} + +RsyncServer::RsyncServer(const std::set& ips, const int port) { + work_thread_ = std::make_unique(2, 100000); + rsync_server_thread_ = std::make_unique(ips, port, 60 * 1000, this); +} + +RsyncServer::~RsyncServer() { + //TODO: handle destory + LOG(INFO) << "Rsync server destroyed"; +} + +void RsyncServer::Schedule(net::TaskFunc func, void* arg) { + work_thread_->Schedule(func, arg); +} + +int RsyncServer::Start() { + LOG(INFO) << "start RsyncServer ..."; + int res = rsync_server_thread_->StartThread(); + if (res != net::kSuccess) { + LOG(FATAL) << "Start rsync Server Thread Error: " << res; + } + res = work_thread_->start_thread_pool(); + if (res != net::kSuccess) { + LOG(FATAL) << "Start ThreadPool Error: " << res + << (res == net::kCreateThreadError ? ": create thread error " : ": other error"); + } + return res; +} + +int RsyncServer::Stop() { + LOG(INFO) << "stop RsyncServer ..."; + work_thread_->stop_thread_pool(); + rsync_server_thread_->StopThread(); + return 0; +} + +RsyncServerConn::RsyncServerConn(int connfd, const std::string& ip_port, Thread* thread, + void* worker_specific_data, NetMultiplexer* mpx) + : PbConn(connfd, ip_port, thread, mpx), data_(worker_specific_data) {} + +RsyncServerConn::~RsyncServerConn() {} + +int RsyncServerConn::DealMessage() { + std::shared_ptr req = std::make_shared(); + bool parse_res = req->ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); + if (!parse_res) { + LOG(WARNING) << "Pika rsync server connection pb parse error."; + return -1; + } + switch (req->type()) { + case RsyncService::kRsyncMeta: { + auto task_arg = + new RsyncServerTaskArg(req, std::dynamic_pointer_cast(shared_from_this())); + ((RsyncServer*)(data_))->Schedule(&RsyncServerConn::HandleMetaRsyncRequest, task_arg); + break; + } + case RsyncService::kRsyncFile: { + auto task_arg = + new RsyncServerTaskArg(req, std::dynamic_pointer_cast(shared_from_this())); + ((RsyncServer*)(data_))->Schedule(&RsyncServerConn::HandleFileRsyncRequest, task_arg); + break; + } + default: { + LOG(WARNING) << "Invalid RsyncRequest type"; + } + } + return 0; +} + +void RsyncServerConn::HandleMetaRsyncRequest(void* arg) { + std::unique_ptr task_arg(static_cast(arg)); + const std::shared_ptr req = task_arg->req; + std::shared_ptr conn = task_arg->conn; + std::string db_name = req->db_name(); + uint32_t slot_id = req->slot_id(); + std::shared_ptr slot = g_pika_server->GetDBSlotById(db_name, slot_id); + if (!slot || slot->IsBgSaving()) { + LOG(WARNING) << "waiting bgsave done..."; + return; + } + + RsyncService::RsyncResponse response; + response.set_code(RsyncService::kOk); + response.set_type(RsyncService::kRsyncMeta); + response.set_db_name(db_name); + response.set_slot_id(slot_id); + + std::vector filenames; + std::string snapshot_uuid; + g_pika_server->GetDumpMeta(db_name, slot_id, &filenames, &snapshot_uuid); + response.set_snapshot_uuid(snapshot_uuid); + + LOG(INFO) << "Rsync Meta request, snapshot_uuid: " << snapshot_uuid + << " files count: " << filenames.size() << " file list: "; + std::for_each(filenames.begin(), filenames.end(), [](auto& file) { + LOG(INFO) << "rsync snapshot file: " << file; + }); + + RsyncService::MetaResponse* meta_resp = response.mutable_meta_resp(); + for (const auto& filename : filenames) { + meta_resp->add_filenames(filename); + } + RsyncWriteResp(response, conn); +} + +void RsyncServerConn::HandleFileRsyncRequest(void* arg) { + std::unique_ptr task_arg(static_cast(arg)); + const std::shared_ptr req = task_arg->req; + std::shared_ptr conn = task_arg->conn; + + uint32_t slot_id = req->slot_id(); + std::string db_name = req->db_name(); + std::string filename = req->file_req().filename(); + size_t offset = req->file_req().offset(); + size_t count = req->file_req().count(); + + RsyncService::RsyncResponse response; + response.set_code(RsyncService::kOk); + response.set_type(RsyncService::kRsyncFile); + response.set_db_name(db_name); + response.set_slot_id(slot_id); + + std::string snapshot_uuid; + Status s = g_pika_server->GetDumpUUID(db_name, slot_id, &snapshot_uuid); + response.set_snapshot_uuid(snapshot_uuid); + if (!s.ok()) { + LOG(WARNING) << "rsyncserver get snapshotUUID failed"; + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + return; + } + + std::shared_ptr slot = g_pika_server->GetDBSlotById(db_name, slot_id); + if (!slot) { + LOG(WARNING) << "cannot find slot for db_name: " << db_name + << " slot_id: " << slot_id; + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + } + + const std::string filepath = slot->bgsave_info().path + "/" + filename; + char* buffer = new char[req->file_req().count() + 1]; + std::string checksum = ""; + size_t bytes_read{0}; + s = ReadDumpFile(filepath, offset, count, buffer, &bytes_read, &checksum); + if (!s.ok()) { + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + delete []buffer; + return; + } + + RsyncService::FileResponse* file_resp = response.mutable_file_resp(); + file_resp->set_data(buffer, bytes_read); + file_resp->set_eof(bytes_read != count); + file_resp->set_checksum(checksum); + file_resp->set_filename(filename); + file_resp->set_count(bytes_read); + file_resp->set_offset(offset); + + RsyncWriteResp(response, conn); + delete []buffer; +} + +RsyncServerThread::RsyncServerThread(const std::set& ips, int port, int cron_interval, RsyncServer* arg) + : HolyThread(ips, port, &conn_factory_, cron_interval, &handle_, true), conn_factory_(arg) {} + +RsyncServerThread::~RsyncServerThread() { + LOG(WARNING) << "RsyncServerThread destroyed"; +} + +void RsyncServerThread::RsyncServerHandle::FdClosedHandle(int fd, const std::string& ip_port) const { + LOG(WARNING) << "ip_port: " << ip_port << " connection closed"; +} + +void RsyncServerThread::RsyncServerHandle::FdTimeoutHandle(int fd, const std::string& ip_port) const { + LOG(WARNING) << "ip_port: " << ip_port << " connection timeout"; +} + +bool RsyncServerThread::RsyncServerHandle::AccessHandle(int fd, std::string& ip_port) const { + LOG(WARNING) << "fd: "<< fd << " ip_port: " << ip_port << " connection accepted"; + return true; +} + +void RsyncServerThread::RsyncServerHandle::CronHandle() const { +} + +} // end namespace rsync + diff --git a/src/rsync_service.proto b/src/rsync_service.proto new file mode 100644 index 0000000000..73f6005bd6 --- /dev/null +++ b/src/rsync_service.proto @@ -0,0 +1,49 @@ +syntax = "proto2"; +package RsyncService; + +enum Type { + kRsyncMeta = 1; + kRsyncFile = 2; +} + +enum StatusCode { + kOk = 1; + kErr = 2; +} + +message MetaResponse { + repeated string filenames = 1; +} + +message FileRequest { + required string filename = 1; + required uint64 count = 2; + required uint64 offset = 3; +} + +message FileResponse { + required int32 eof = 1; + required uint64 count = 2; + required uint64 offset = 3; + required bytes data = 4; + required string checksum = 5; + required string filename = 6; +} + +message RsyncRequest { + required Type type = 1; + required string db_name = 2; + required uint32 slot_id = 3; + optional FileRequest file_req = 4; +} + +message RsyncResponse { + required Type type = 1; + required string snapshot_uuid = 2; + required string db_name = 3; + required uint32 slot_id = 4; + required StatusCode code = 5; + optional MetaResponse meta_resp = 6; + optional FileResponse file_resp = 7; +} + diff --git a/src/throttle.cc b/src/throttle.cc new file mode 100644 index 0000000000..39f93d0254 --- /dev/null +++ b/src/throttle.cc @@ -0,0 +1,61 @@ +// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/throttle.h" +#include +#include +#include "pstd/include/env.h" + +DEFINE_uint64(raft_minimal_throttle_threshold_mb, 0, "minimal throttle throughput threshold per second"); +namespace rsync{ + +Throttle::Throttle(size_t throttle_throughput_bytes, size_t check_cycle) + : throttle_throughput_bytes_(throttle_throughput_bytes), + last_throughput_check_time_us_(caculate_check_time_us_(pstd::NowMicros(), check_cycle)), + cur_throughput_bytes_(0) {} + +Throttle::~Throttle() {} + +size_t Throttle::ThrottledByThroughput(size_t bytes) { + size_t available_size = bytes; + size_t now = pstd::NowMicros(); + size_t limit_throughput_bytes_s = std::max(static_cast(throttle_throughput_bytes_), + FLAGS_raft_minimal_throttle_threshold_mb * 1024 * 1024); + size_t limit_per_cycle = limit_throughput_bytes_s / check_cycle_; + std::unique_lock lock(keys_mutex_); + if (cur_throughput_bytes_ + bytes > limit_per_cycle) { + // reading another |bytes| excceds the limit + if (now - last_throughput_check_time_us_ <= 1 * 1000 * 1000 / check_cycle_) { + // if a time interval is less than or equal to a cycle, read more data + // to make full use of the throughput of the current cycle. + available_size = limit_per_cycle > cur_throughput_bytes_ ? limit_per_cycle - cur_throughput_bytes_ : 0; + cur_throughput_bytes_ = limit_per_cycle; + } else { + // otherwise, read the data in the next cycle. + available_size = bytes > limit_per_cycle ? limit_per_cycle : bytes; + cur_throughput_bytes_ = available_size; + last_throughput_check_time_us_ = caculate_check_time_us_(now, check_cycle_); + } + } else { + // reading another |bytes| doesn't excced limit (less than or equal to), + // put it in the current cycle + available_size = bytes; + cur_throughput_bytes_ += available_size; + } + keys_mutex_.unlock(); + return available_size; +} + +void Throttle::ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t elaspe_time_us) { + size_t now = pstd::NowMicros(); + std::unique_lock lock(keys_mutex_); + if (now - elaspe_time_us < last_throughput_check_time_us_) { + // Tokens are aqured in last cycle, ignore + return; + } + cur_throughput_bytes_ = std::max(cur_throughput_bytes_ - (acquired - consumed), size_t(0)); +} +} +