Skip to content

Commit

Permalink
fix: delete old sst files when re-downloading sst files in full synch…
Browse files Browse the repository at this point in the history
…ronization (#2186)

* bugfix fix rsync when request timeout
  • Loading branch information
wangshao1 authored and brother-jin committed Dec 12, 2023
1 parent fda44ff commit 9f19c75
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 28 deletions.
21 changes: 11 additions & 10 deletions include/rsync_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern std::unique_ptr<PikaConf> g_pika_conf;

const std::string kDumpMetaFileName = "DUMP_META_DATA";
const std::string kUuidPrefix = "snapshot-uuid:";
const size_t kInvalidOffset = 0xFFFFFFFF;

namespace rsync {

Expand All @@ -42,8 +43,7 @@ class WaitObject;
class WaitObjectManager;

using pstd::Status;


using ResponseSPtr = std::shared_ptr<RsyncService::RsyncResponse>;
class RsyncClient : public net::Thread {
public:
enum State {
Expand Down Expand Up @@ -151,18 +151,18 @@ class WaitObject {

void Reset(const std::string& filename, RsyncService::Type t, size_t offset) {
std::lock_guard<std::mutex> guard(mu_);
resp_ = nullptr;
resp_.reset();
filename_ = filename;
type_ = t;
offset_ = offset;
}

pstd::Status Wait(RsyncService::RsyncResponse*& resp) {
pstd::Status Wait(ResponseSPtr& resp) {
pstd::Status s = Status::Timeout("rsync timeout", "timeout");
{
std::unique_lock<std::mutex> lock(mu_);
auto cv_s = cond_.wait_for(lock, std::chrono::seconds(1), [this] {
return resp_ != nullptr;
return resp_.get() != nullptr;
});
if (!cv_s) {
return s;
Expand All @@ -175,19 +175,19 @@ class WaitObject {

void WakeUp(RsyncService::RsyncResponse* resp) {
std::unique_lock<std::mutex> lock(mu_);
resp_ = resp;
resp_.reset(resp);
offset_ = kInvalidOffset;
cond_.notify_all();
}

RsyncService::RsyncResponse* Response() {return resp_;}
std::string Filename() {return filename_;}
RsyncService::Type Type() {return type_;}
size_t Offset() {return offset_;}
private:
std::string filename_;
RsyncService::Type type_;
size_t offset_ = 0xFFFFFFFF;
RsyncService::RsyncResponse* resp_ = nullptr;
size_t offset_ = kInvalidOffset;
ResponseSPtr resp_ = nullptr;
std::condition_variable cond_;
std::mutex mu_;
};
Expand Down Expand Up @@ -222,7 +222,8 @@ class WaitObjectManager {
return;
}
if (resp->type() == RsyncService::kRsyncFile &&
(resp->file_resp().filename() != wo_vec_[index]->Filename())) {
((resp->file_resp().filename() != wo_vec_[index]->Filename()) ||
(resp->file_resp().offset() != wo_vec_[index]->Offset()))) {
delete resp;
return;
}
Expand Down
23 changes: 7 additions & 16 deletions src/rsync_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ void* RsyncClient::ThreadMain() {
}

Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
std::unique_ptr<RsyncWriter> writer(new RsyncWriter(dir_ + "/" + filename));
const std::string filepath = dir_ + "/" + filename;
std::unique_ptr<RsyncWriter> writer(new RsyncWriter(filepath));
Status s = Status::OK();
size_t offset = 0;
int retries = 0;
Expand All @@ -151,7 +152,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
writer.reset();
}
if (!s.ok()) {
DeleteFile(filename);
DeleteFile(filepath);
}
};

Expand Down Expand Up @@ -184,12 +185,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
continue;
}

RsyncResponse* resp = nullptr;
DEFER {
if (resp) {
delete resp;
}
};
std::shared_ptr<RsyncResponse> resp = nullptr;
s = wo->Wait(resp);
if (s.IsTimeout() || resp == nullptr) {
LOG(WARNING) << "rsync request timeout";
Expand Down Expand Up @@ -336,19 +332,14 @@ Status RsyncClient::CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::str
std::string to_send;
request.SerializeToString(&to_send);
while (retries < max_retries_) {
WaitObject* wo = wo_mgr_->UpdateWaitObject(0, "", kRsyncMeta, 0xFFFFFFFF);
WaitObject* wo = wo_mgr_->UpdateWaitObject(0, "", kRsyncMeta, kInvalidOffset);
s = client_thread_->Write(master_ip_, master_port_, to_send);
if (!s.ok()) {
retries++;
}
RsyncResponse* resp = nullptr;
DEFER {
if (resp) {
delete resp;
}
};
std::shared_ptr<RsyncResponse> resp;
s = wo->Wait(resp);
if (s.IsTimeout() || resp == nullptr) {
if (s.IsTimeout() || resp.get() == nullptr) {
LOG(WARNING) << "rsync CopyRemoteMeta request timeout, "
<< "retry times: " << retries;
retries++;
Expand Down
6 changes: 4 additions & 2 deletions src/rsync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ int RsyncServer::Start() {
LOG(INFO) << "start RsyncServer ...";
int res = rsync_server_thread_->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start rsync Server Thread Error: " << res;
LOG(FATAL) << "Start rsync Server Thread Error. ret_code: " << res << " message: "
<< (res == net::kBindError ? ": bind port conflict" : ": other error");
}
res = work_thread_->start_thread_pool();
if (res != net::kSuccess) {
LOG(FATAL) << "Start ThreadPool Error: " << res
LOG(FATAL) << "Start rsync Server ThreadPool Error, ret_code: " << res << " message: "
<< (res == net::kCreateThreadError ? ": create thread error " : ": other error");
}
LOG(INFO) << "RsyncServer started ...";
return res;
}

Expand Down

0 comments on commit 9f19c75

Please sign in to comment.