Skip to content

Commit

Permalink
Fix formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
anand1976 committed Apr 19, 2024
1 parent 367f01d commit 288eeea
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 16 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,7 @@ if(WITH_TESTS)
db/file_indexer_test.cc
db/filename_test.cc
db/flush_job_test.cc
db/db_follower_test.cc
db/import_column_family_test.cc
db/listener_test.cc
db/log_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1922,6 +1922,9 @@ sst_file_reader_test: $(OBJ_DIR)/table/sst_file_reader_test.o $(TEST_LIBRARY) $(
db_secondary_test: $(OBJ_DIR)/db/db_secondary_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_follower_test: $(OBJ_DIR)/db/db_follower_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

block_cache_tracer_test: $(OBJ_DIR)/trace_replay/block_cache_tracer_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
63 changes: 63 additions & 0 deletions db/db_follower_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

#ifdef OS_LINUX

class DBFollowerTest : public DBTestBase {
public:
// Create directories for leader and follower
// Create the leader DB object
DBFollowerTest()
: DBTestBase("/db_follower_test", /*env_do_fsync*/false) {
follower_name_ = dbname_ + "/follower";
Close();
Destroy(CurrentOptions());
EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK());
dbname_ = dbname_ + "/leader";
Reopen(CurrentOptions());
}

~DBFollowerTest() {
follower_.reset();
EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK());
}

protected:
Status OpenAsFollower() {
return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_, &follower_);
}
DB* follower() { return follower_.get(); }

private:
std::string follower_name_;
std::unique_ptr<DB> follower_;
};

TEST_F(DBFollowerTest, Basic) {
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Flush());

ASSERT_OK(OpenAsFollower());
std::string val;
ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
ASSERT_EQ(val, "v1");
}

#endif
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
43 changes: 28 additions & 15 deletions db/db_impl/db_impl_follower.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ DBImplFollower::DBImplFollower(const DBOptions& db_options,
const std::string& dbname, std::string src_path)
: DBImplSecondary(db_options, dbname, ""),
env_guard_(std::move(env)),
src_path_(std::move(src_path)) {
src_path_(std::move(src_path)),
cv_(&mu_) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Opening the db in follower mode");
LogFlush(immutable_db_options_.info_log);
}

DBImplFollower::~DBImplFollower() {
Status s = CloseImpl();
Status s = Close();
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Error closing DB : %s",
s.ToString().c_str());
Expand All @@ -52,9 +53,8 @@ DBImplFollower::~DBImplFollower() {
Status DBImplFollower::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/,
uint64_t*, RecoveryContext* /*recovery_ctx*/,
bool* /*can_retry*/) {
bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*,
RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) {
mutex_.AssertHeld();

JobContext job_context(0);
Expand Down Expand Up @@ -154,35 +154,48 @@ Status DBImplFollower::TryCatchUpWithLeader() {

void DBImplFollower::PeriodicRefresh() {
while (!stop_requested_.load()) {
env_->SleepForMicroseconds(
static_cast<int>(
immutable_db_options_.follower_refresh_catchup_period_ms) *
1000);
MutexLock l(&mu_);
int64_t wait_until =
immutable_db_options_.clock->NowMicros() +
immutable_db_options_.follower_refresh_catchup_period_ms * 1000;
immutable_db_options_.clock->TimedWait(
&cv_, std::chrono::microseconds(wait_until));
if (stop_requested_.load()) {
break;
}
Status s;
for (uint64_t i = 0; i < immutable_db_options_.follower_catchup_retry_count;
for (uint64_t i = 0;
i < immutable_db_options_.follower_catchup_retry_count &&
!stop_requested_.load();
++i) {
s = TryCatchUpWithLeader();

if (s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Successful catch up on attempt %llu",
static_cast<unsigned long long>(i));
break;
}
env_->SleepForMicroseconds(
static_cast<int>(
immutable_db_options_.follower_catchup_retry_wait_ms) *
1000);
wait_until = immutable_db_options_.clock->NowMicros() +
immutable_db_options_.follower_catchup_retry_wait_ms * 1000;
immutable_db_options_.clock->TimedWait(
&cv_, std::chrono::microseconds(wait_until));
}
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Catch up unsuccessful");
}
}
}

Status DBImplFollower::CloseImpl() {
Status DBImplFollower::Close() {
if (catch_up_thread_) {
stop_requested_.store(true);
{
MutexLock l(&mu_);
cv_.SignalAll();
}
catch_up_thread_->join();
catch_up_thread_.reset();
}

return DBImpl::Close();
Expand Down
6 changes: 5 additions & 1 deletion db/db_impl/db_impl_follower.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "db/db_impl/db_impl.h"
#include "db/db_impl/db_impl_secondary.h"
#include "logging/logging.h"
#include "port/port.h"

namespace ROCKSDB_NAMESPACE {

Expand All @@ -20,14 +21,15 @@ class DBImplFollower : public DBImplSecondary {
const std::string& dbname, std::string src_path);
~DBImplFollower();

Status Close() override;

protected:
bool OwnTablesAndLogs() const override {
// TODO: Change this to true once we've properly implemented file
// deletion for the read scaling case
return false;
}

Status CloseImpl() override;

Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
Expand All @@ -46,5 +48,7 @@ class DBImplFollower : public DBImplSecondary {
std::unique_ptr<port::Thread> catch_up_thread_;
std::atomic<bool> stop_requested_;
std::string src_path_;
port::Mutex mu_;
port::CondVar cv_;
};
} // namespace ROCKSDB_NAMESPACE
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ TEST_MAIN_SOURCES = \
db/db_dynamic_level_test.cc \
db/db_encryption_test.cc \
db/db_flush_test.cc \
db/db_follower_test.cc \
db/db_readonly_with_timestamp_test.cc \
db/db_with_timestamp_basic_test.cc \
db/import_column_family_test.cc \
Expand Down

0 comments on commit 288eeea

Please sign in to comment.