From 288eeeae2a0b549b56a72bf4a6bb8692e7950a37 Mon Sep 17 00:00:00 2001 From: anand76 Date: Fri, 19 Apr 2024 10:43:57 -0700 Subject: [PATCH] Fix formatting --- CMakeLists.txt | 1 + Makefile | 3 ++ db/db_follower_test.cc | 63 ++++++++++++++++++++++++++++++++++ db/db_impl/db_impl_follower.cc | 43 +++++++++++++++-------- db/db_impl/db_impl_follower.h | 6 +++- src.mk | 1 + 6 files changed, 101 insertions(+), 16 deletions(-) create mode 100644 db/db_follower_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 58a0fec24a9..50cc40b0d23 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1365,6 +1365,7 @@ if(WITH_TESTS) db/file_indexer_test.cc db/filename_test.cc db/flush_job_test.cc + db/db_follower_test.cc db/import_column_family_test.cc db/listener_test.cc db/log_test.cc diff --git a/Makefile b/Makefile index 850c18af4f9..f22727f9254 100644 --- a/Makefile +++ b/Makefile @@ -1922,6 +1922,9 @@ sst_file_reader_test: $(OBJ_DIR)/table/sst_file_reader_test.o $(TEST_LIBRARY) $( db_secondary_test: $(OBJ_DIR)/db/db_secondary_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +db_follower_test: $(OBJ_DIR)/db/db_follower_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + block_cache_tracer_test: $(OBJ_DIR)/trace_replay/block_cache_tracer_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/db/db_follower_test.cc b/db/db_follower_test.cc new file mode 100644 index 00000000000..a7ebeae2f51 --- /dev/null +++ b/db/db_follower_test.cc @@ -0,0 +1,63 @@ +// Copyright (c) 2024-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef OS_LINUX + +class DBFollowerTest : public DBTestBase { + public: + // Create directories for leader and follower + // Create the leader DB object + DBFollowerTest() + : DBTestBase("/db_follower_test", /*env_do_fsync*/false) { + follower_name_ = dbname_ + "/follower"; + Close(); + Destroy(CurrentOptions()); + EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK()); + dbname_ = dbname_ + "/leader"; + Reopen(CurrentOptions()); + } + + ~DBFollowerTest() { + follower_.reset(); + EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK()); + } + + protected: + Status OpenAsFollower() { + return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_, &follower_); + } + DB* follower() { return follower_.get(); } + + private: + std::string follower_name_; + std::unique_ptr follower_; +}; + +TEST_F(DBFollowerTest, Basic) { + ASSERT_OK(Put("k1", "v1")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("k2", "v2")); + ASSERT_OK(Flush()); + + ASSERT_OK(OpenAsFollower()); + std::string val; + ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val)); + ASSERT_EQ(val, "v1"); +} + +#endif +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_impl/db_impl_follower.cc b/db/db_impl/db_impl_follower.cc index 262901b0cfe..6abd40731be 100644 --- a/db/db_impl/db_impl_follower.cc +++ b/db/db_impl/db_impl_follower.cc @@ -26,14 +26,15 @@ DBImplFollower::DBImplFollower(const DBOptions& db_options, const std::string& dbname, std::string src_path) : DBImplSecondary(db_options, dbname, ""), env_guard_(std::move(env)), - src_path_(std::move(src_path)) { + src_path_(std::move(src_path)), + cv_(&mu_) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Opening the db in follower mode"); LogFlush(immutable_db_options_.info_log); } DBImplFollower::~DBImplFollower() { - Status s = CloseImpl(); + Status s = Close(); if (!s.ok()) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Error closing DB : %s", s.ToString().c_str()); @@ -52,9 +53,8 @@ DBImplFollower::~DBImplFollower() { Status DBImplFollower::Recover( const std::vector& column_families, bool /*readonly*/, bool /*error_if_wal_file_exists*/, - bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, - uint64_t*, RecoveryContext* /*recovery_ctx*/, - bool* /*can_retry*/) { + bool /*error_if_data_exists_in_wals*/, bool /*is_retry*/, uint64_t*, + RecoveryContext* /*recovery_ctx*/, bool* /*can_retry*/) { mutex_.AssertHeld(); JobContext job_context(0); @@ -154,24 +154,32 @@ Status DBImplFollower::TryCatchUpWithLeader() { void DBImplFollower::PeriodicRefresh() { while (!stop_requested_.load()) { - env_->SleepForMicroseconds( - static_cast( - immutable_db_options_.follower_refresh_catchup_period_ms) * - 1000); + MutexLock l(&mu_); + int64_t wait_until = + immutable_db_options_.clock->NowMicros() + + immutable_db_options_.follower_refresh_catchup_period_ms * 1000; + immutable_db_options_.clock->TimedWait( + &cv_, std::chrono::microseconds(wait_until)); + if (stop_requested_.load()) { + break; + } Status s; - for (uint64_t i = 0; i < immutable_db_options_.follower_catchup_retry_count; + for (uint64_t i = 0; + i < immutable_db_options_.follower_catchup_retry_count && + !stop_requested_.load(); ++i) { s = TryCatchUpWithLeader(); + if (s.ok()) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successful catch up on attempt %llu", static_cast(i)); break; } - env_->SleepForMicroseconds( - static_cast( - immutable_db_options_.follower_catchup_retry_wait_ms) * - 1000); + wait_until = immutable_db_options_.clock->NowMicros() + + immutable_db_options_.follower_catchup_retry_wait_ms * 1000; + immutable_db_options_.clock->TimedWait( + &cv_, std::chrono::microseconds(wait_until)); } if (!s.ok()) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Catch up unsuccessful"); @@ -179,10 +187,15 @@ void DBImplFollower::PeriodicRefresh() { } } -Status DBImplFollower::CloseImpl() { +Status DBImplFollower::Close() { if (catch_up_thread_) { stop_requested_.store(true); + { + MutexLock l(&mu_); + cv_.SignalAll(); + } catch_up_thread_->join(); + catch_up_thread_.reset(); } return DBImpl::Close(); diff --git a/db/db_impl/db_impl_follower.h b/db/db_impl/db_impl_follower.h index 9d7b607a3b2..1ffe5c21f95 100644 --- a/db/db_impl/db_impl_follower.h +++ b/db/db_impl/db_impl_follower.h @@ -11,6 +11,7 @@ #include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl_secondary.h" #include "logging/logging.h" +#include "port/port.h" namespace ROCKSDB_NAMESPACE { @@ -20,6 +21,8 @@ class DBImplFollower : public DBImplSecondary { const std::string& dbname, std::string src_path); ~DBImplFollower(); + Status Close() override; + protected: bool OwnTablesAndLogs() const override { // TODO: Change this to true once we've properly implemented file @@ -27,7 +30,6 @@ class DBImplFollower : public DBImplSecondary { return false; } - Status CloseImpl() override; Status Recover(const std::vector& column_families, bool /*readonly*/, bool /*error_if_wal_file_exists*/, @@ -46,5 +48,7 @@ class DBImplFollower : public DBImplSecondary { std::unique_ptr catch_up_thread_; std::atomic stop_requested_; std::string src_path_; + port::Mutex mu_; + port::CondVar cv_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index 9d3b3cc4f40..23cf348e1eb 100644 --- a/src.mk +++ b/src.mk @@ -476,6 +476,7 @@ TEST_MAIN_SOURCES = \ db/db_dynamic_level_test.cc \ db/db_encryption_test.cc \ db/db_flush_test.cc \ + db/db_follower_test.cc \ db/db_readonly_with_timestamp_test.cc \ db/db_with_timestamp_basic_test.cc \ db/import_column_family_test.cc \