From 9f19c759af13e5e5480c00e5eb2d7b63eeeebecf Mon Sep 17 00:00:00 2001 From: wangshao1 <30471730+wangshao1@users.noreply.github.com> Date: Fri, 8 Dec 2023 21:10:34 +0800 Subject: [PATCH] fix: delete old sst files when re-downloading sst files in full synchronization (#2186) * bugfix fix rsync when request timeout --- include/rsync_client.h | 21 +++++++++++---------- src/rsync_client.cc | 23 +++++++---------------- src/rsync_server.cc | 6 ++++-- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/include/rsync_client.h b/include/rsync_client.h index d1077a8a61..388bd8ff0e 100644 --- a/include/rsync_client.h +++ b/include/rsync_client.h @@ -33,6 +33,7 @@ extern std::unique_ptr g_pika_conf; const std::string kDumpMetaFileName = "DUMP_META_DATA"; const std::string kUuidPrefix = "snapshot-uuid:"; +const size_t kInvalidOffset = 0xFFFFFFFF; namespace rsync { @@ -42,8 +43,7 @@ class WaitObject; class WaitObjectManager; using pstd::Status; - - +using ResponseSPtr = std::shared_ptr; class RsyncClient : public net::Thread { public: enum State { @@ -151,18 +151,18 @@ class WaitObject { void Reset(const std::string& filename, RsyncService::Type t, size_t offset) { std::lock_guard guard(mu_); - resp_ = nullptr; + resp_.reset(); filename_ = filename; type_ = t; offset_ = offset; } - pstd::Status Wait(RsyncService::RsyncResponse*& resp) { + pstd::Status Wait(ResponseSPtr& resp) { pstd::Status s = Status::Timeout("rsync timeout", "timeout"); { std::unique_lock lock(mu_); auto cv_s = cond_.wait_for(lock, std::chrono::seconds(1), [this] { - return resp_ != nullptr; + return resp_.get() != nullptr; }); if (!cv_s) { return s; @@ -175,19 +175,19 @@ class WaitObject { void WakeUp(RsyncService::RsyncResponse* resp) { std::unique_lock lock(mu_); - resp_ = resp; + resp_.reset(resp); + offset_ = kInvalidOffset; cond_.notify_all(); } - RsyncService::RsyncResponse* Response() {return resp_;} std::string Filename() {return filename_;} RsyncService::Type Type() {return type_;} size_t Offset() {return offset_;} private: std::string filename_; RsyncService::Type type_; - size_t offset_ = 0xFFFFFFFF; - RsyncService::RsyncResponse* resp_ = nullptr; + size_t offset_ = kInvalidOffset; + ResponseSPtr resp_ = nullptr; std::condition_variable cond_; std::mutex mu_; }; @@ -222,7 +222,8 @@ class WaitObjectManager { return; } if (resp->type() == RsyncService::kRsyncFile && - (resp->file_resp().filename() != wo_vec_[index]->Filename())) { + ((resp->file_resp().filename() != wo_vec_[index]->Filename()) || + (resp->file_resp().offset() != wo_vec_[index]->Offset()))) { delete resp; return; } diff --git a/src/rsync_client.cc b/src/rsync_client.cc index fbdcf2726b..bfaa6dc5af 100644 --- a/src/rsync_client.cc +++ b/src/rsync_client.cc @@ -140,7 +140,8 @@ void* RsyncClient::ThreadMain() { } Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { - std::unique_ptr writer(new RsyncWriter(dir_ + "/" + filename)); + const std::string filepath = dir_ + "/" + filename; + std::unique_ptr writer(new RsyncWriter(filepath)); Status s = Status::OK(); size_t offset = 0; int retries = 0; @@ -151,7 +152,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { writer.reset(); } if (!s.ok()) { - DeleteFile(filename); + DeleteFile(filepath); } }; @@ -184,12 +185,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { continue; } - RsyncResponse* resp = nullptr; - DEFER { - if (resp) { - delete resp; - } - }; + std::shared_ptr resp = nullptr; s = wo->Wait(resp); if (s.IsTimeout() || resp == nullptr) { LOG(WARNING) << "rsync request timeout"; @@ -336,19 +332,14 @@ Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::setUpdateWaitObject(0, "", kRsyncMeta, 0xFFFFFFFF); + WaitObject* wo = wo_mgr_->UpdateWaitObject(0, "", kRsyncMeta, kInvalidOffset); s = client_thread_->Write(master_ip_, master_port_, to_send); if (!s.ok()) { retries++; } - RsyncResponse* resp = nullptr; - DEFER { - if (resp) { - delete resp; - } - }; + std::shared_ptr resp; s = wo->Wait(resp); - if (s.IsTimeout() || resp == nullptr) { + if (s.IsTimeout() || resp.get() == nullptr) { LOG(WARNING) << "rsync CopyRemoteMeta request timeout, " << "retry times: " << retries; retries++; diff --git a/src/rsync_server.cc b/src/rsync_server.cc index c8d28f51fb..d4064364ff 100644 --- a/src/rsync_server.cc +++ b/src/rsync_server.cc @@ -48,13 +48,15 @@ int RsyncServer::Start() { LOG(INFO) << "start RsyncServer ..."; int res = rsync_server_thread_->StartThread(); if (res != net::kSuccess) { - LOG(FATAL) << "Start rsync Server Thread Error: " << res; + LOG(FATAL) << "Start rsync Server Thread Error. ret_code: " << res << " message: " + << (res == net::kBindError ? ": bind port conflict" : ": other error"); } res = work_thread_->start_thread_pool(); if (res != net::kSuccess) { - LOG(FATAL) << "Start ThreadPool Error: " << res + LOG(FATAL) << "Start rsync Server ThreadPool Error, ret_code: " << res << " message: " << (res == net::kCreateThreadError ? ": create thread error " : ": other error"); } + LOG(INFO) << "RsyncServer started ..."; return res; }