diff --git a/src/spider/CMakeLists.txt b/src/spider/CMakeLists.txt index a94b9dd..5271c6a 100644 --- a/src/spider/CMakeLists.txt +++ b/src/spider/CMakeLists.txt @@ -99,6 +99,7 @@ set(SPIDER_SCHEDULER_SOURCES scheduler/SchedulerMessage.hpp scheduler/SchedulerServer.cpp scheduler/SchedulerServer.hpp + utils/StopToken.hpp CACHE INTERNAL "spider scheduler source files" ) @@ -110,6 +111,7 @@ target_link_libraries( spider_scheduler PRIVATE Boost::headers + Boost::program_options absl::flat_hash_map spdlog::spdlog ) diff --git a/src/spider/io/BoostAsio.hpp b/src/spider/io/BoostAsio.hpp index 5931b39..23731bc 100644 --- a/src/spider/io/BoostAsio.hpp +++ b/src/spider/io/BoostAsio.hpp @@ -1,6 +1,8 @@ #ifndef SPIDER_CORE_BOOSTASIO_HPP #define SPIDER_CORE_BOOSTASIO_HPP +#include + // clang-format off // IWYU pragma: begin_exports @@ -20,6 +22,7 @@ #include #include +#include #include #include @@ -27,6 +30,7 @@ #include #include #include +#include #include #include @@ -35,4 +39,36 @@ // IWYU pragma: end_exports // clang-format on + +#include + +#include + +namespace spider::core { +inline auto get_address() -> std::optional { + try { + boost::asio::io_context io_context; + boost::asio::ip::tcp::resolver resolver(io_context); + auto const endpoints = resolver.resolve(boost::asio::ip::host_name(), ""); + for (auto const& endpoint : endpoints) { + if (endpoint.endpoint().address().is_v4() + && !endpoint.endpoint().address().is_loopback()) + { + return endpoint.endpoint().address().to_string(); + } + } + // If no non-loopback address found, return loopback address + spdlog::warn("No non-loopback address found, using loopback address"); + for (auto const& endpoint : endpoints) { + if (endpoint.endpoint().address().is_v4()) { + return endpoint.endpoint().address().to_string(); + } + } + return std::nullopt; + } catch (boost::system::system_error const& e) { + return std::nullopt; + } +} +} // namespace spider::core + #endif // SPIDER_CORE_BOOSTASIO_HPP diff --git a/src/spider/io/Serializer.hpp b/src/spider/io/Serializer.hpp index 2e9dfcf..517bc75 100644 --- a/src/spider/io/Serializer.hpp +++ b/src/spider/io/Serializer.hpp @@ -41,13 +41,16 @@ struct msgpack::adaptor::pack { } }; -template -concept SerializableImpl = requires(T t) { +template +concept Packable = requires(Buffer buffer, T t) { { - msgpack::pack(msgpack::sbuffer{}, t) + msgpack::pack(buffer, t) }; }; +template +concept SerializableImpl = Packable; + template concept DeSerializableImpl = requires(T t) { { diff --git a/src/spider/scheduler/SchedulerServer.cpp b/src/spider/scheduler/SchedulerServer.cpp index 110cfc4..7339497 100644 --- a/src/spider/scheduler/SchedulerServer.cpp +++ b/src/spider/scheduler/SchedulerServer.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -16,6 +17,7 @@ #include "../io/Serializer.hpp" // IWYU pragma: keep #include "../storage/DataStorage.hpp" #include "../storage/MetadataStorage.hpp" +#include "../utils/StopToken.hpp" #include "SchedulerMessage.hpp" #include "SchedulerPolicy.hpp" @@ -25,47 +27,75 @@ SchedulerServer::SchedulerServer( unsigned short const port, std::shared_ptr policy, std::shared_ptr metadata_store, - std::shared_ptr data_store + std::shared_ptr data_store, + core::StopToken& stop_token ) - : m_acceptor{m_context, {boost::asio::ip::tcp::v4(), port}}, + : m_port{port}, m_policy{std::move(policy)}, m_metadata_store{std::move(metadata_store)}, - m_data_store{std::move(data_store)} { - // Ignore the returned future as we do not need its value - boost::asio::co_spawn(m_context, receive_message(), boost::asio::use_future); + m_data_store{std::move(data_store)}, + m_stop_token{stop_token} { + boost::asio::co_spawn(m_context, receive_message(), boost::asio::detached); + std::lock_guard const lock{m_mutex}; + m_thread = std::make_unique([&] { m_context.run(); }); +} + +auto SchedulerServer::pause() -> void { + std::lock_guard const lock{m_mutex}; + if (m_thread == nullptr) { + return; + } + m_context.stop(); + m_thread->join(); + m_thread = nullptr; } -auto SchedulerServer::run() -> void { - m_context.run(); +auto SchedulerServer::resume() -> void { + std::lock_guard const lock{m_mutex}; + if (m_thread != nullptr) { + return; + } + m_thread = std::make_unique([&] { + m_context.restart(); + m_context.run(); + }); } auto SchedulerServer::stop() -> void { - m_context.stop(); std::lock_guard const lock{m_mutex}; - m_stop = true; + if (m_thread == nullptr) { + return; + } + m_context.stop(); + m_thread->join(); + m_thread = nullptr; } auto SchedulerServer::receive_message() -> boost::asio::awaitable { - while (!should_stop()) { - // std::unique_ptr socket - // = std::make_unique(m_context); - boost::asio::ip::tcp::socket socket{m_context}; - auto const& [ec] = co_await m_acceptor.async_accept( - socket, - boost::asio::as_tuple(boost::asio::use_awaitable) - ); - if (ec) { - spdlog::error("Cannot accept connection {}: {}", ec.value(), ec.what()); - continue; + try { + boost::asio::ip::tcp::acceptor acceptor{m_context, {boost::asio::ip::tcp::v4(), m_port}}; + while (true) { + boost::asio::ip::tcp::socket socket{m_context}; + auto const& [ec] = co_await acceptor.async_accept( + socket, + boost::asio::as_tuple(boost::asio::use_awaitable) + ); + if (ec) { + spdlog::error("Cannot accept connection {}: {}", ec.value(), ec.what()); + continue; + } + boost::asio::co_spawn( + m_context, + process_message(std::move(socket)), + boost::asio::detached + ); } - // Ignore the returned future as we do not need its value - boost::asio::co_spawn( - m_context, - process_message(std::move(socket)), - boost::asio::use_future - ); + co_return; + } catch (boost::system::system_error& e) { + spdlog::error("Fail to accept connection: {}", e.what()); + m_stop_token.request_stop(); + co_return; } - co_return; } namespace { @@ -125,9 +155,4 @@ auto SchedulerServer::process_message(boost::asio::ip::tcp::socket socket co_return; } -auto SchedulerServer::should_stop() -> bool { - std::lock_guard const lock{m_mutex}; - return m_stop; -} - } // namespace spider::scheduler diff --git a/src/spider/scheduler/SchedulerServer.hpp b/src/spider/scheduler/SchedulerServer.hpp index 1457b6d..564cb44 100644 --- a/src/spider/scheduler/SchedulerServer.hpp +++ b/src/spider/scheduler/SchedulerServer.hpp @@ -3,10 +3,12 @@ #include #include +#include #include "../io/BoostAsio.hpp" // IWYU pragma: keep #include "../storage/DataStorage.hpp" #include "../storage/MetadataStorage.hpp" +#include "../utils/StopToken.hpp" #include "SchedulerPolicy.hpp" namespace spider::scheduler { @@ -24,13 +26,12 @@ class SchedulerServer { unsigned short port, std::shared_ptr policy, std::shared_ptr metadata_store, - std::shared_ptr data_store + std::shared_ptr data_store, + core::StopToken& stop_token ); - /** - * Run the server loop. This function blocks until stop is called. - */ - auto run() -> void; + auto pause() -> void; + auto resume() -> void; auto stop() -> void; @@ -39,17 +40,17 @@ class SchedulerServer { auto process_message(boost::asio::ip::tcp::socket socket) -> boost::asio::awaitable; - auto should_stop() -> bool; - + unsigned short m_port; std::shared_ptr m_policy; std::shared_ptr m_metadata_store; std::shared_ptr m_data_store; boost::asio::io_context m_context; - boost::asio::ip::tcp::acceptor m_acceptor; std::mutex m_mutex; - bool m_stop = false; + std::unique_ptr m_thread; + + core::StopToken& m_stop_token; }; } // namespace spider::scheduler diff --git a/src/spider/scheduler/scheduler.cpp b/src/spider/scheduler/scheduler.cpp index 3e4ee1d..e72b791 100644 --- a/src/spider/scheduler/scheduler.cpp +++ b/src/spider/scheduler/scheduler.cpp @@ -1,2 +1,237 @@ -auto main(int argc, char** argv) -> int {} +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include // IWYU pragma: keep +#include + +#include "../core/Driver.hpp" +#include "../core/Error.hpp" +#include "../io/BoostAsio.hpp" // IWYU pragma: keep +#include "../storage/DataStorage.hpp" +#include "../storage/MetadataStorage.hpp" +#include "../storage/MysqlStorage.hpp" +#include "../utils/StopToken.hpp" +#include "FifoPolicy.hpp" +#include "SchedulerPolicy.hpp" +#include "SchedulerServer.hpp" + +constexpr int cCmdArgParseErr = 1; +constexpr int cStorageConnectionErr = 2; +constexpr int cSchedulerAddrErr = 3; +constexpr int cStorageErr = 4; + +constexpr int cCleanupInterval = 5; +constexpr int cRetryCount = 5; + +namespace { +auto parse_args(int const argc, char** argv) -> boost::program_options::variables_map { + boost::program_options::options_description desc; + desc.add_options()("help", "spider scheduler"); + desc.add_options()( + "port", + boost::program_options::value(), + "port to listen on" + ); + desc.add_options()( + "storage_url", + boost::program_options::value(), + "storage server url" + ); + + boost::program_options::variables_map variables; + boost::program_options::store( + // NOLINTNEXTLINE(misc-include-cleaner) + boost::program_options::parse_command_line(argc, argv, desc), + variables + ); + boost::program_options::notify(variables); + return variables; +} + +auto heartbeat_loop( + std::shared_ptr const& metadata_store, + spider::core::Scheduler const& scheduler, + spider::core::StopToken& stop_token +) -> void { + int fail_count = 0; + while (!stop_token.stop_requested()) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + spdlog::debug("Updating heartbeat"); + spider::core::StorageErr const err = metadata_store->update_heartbeat(scheduler.get_id()); + if (!err.success()) { + spdlog::error("Failed to update scheduler heartbeat: {}", err.description); + fail_count++; + } else { + fail_count = 0; + } + if (fail_count >= cRetryCount - 1) { + stop_token.request_stop(); + break; + } + } +} + +auto cleanup_loop( + std::shared_ptr const& metadata_store, + std::shared_ptr const& data_store, + spider::scheduler::SchedulerServer& server, + std::shared_ptr const& policy, + spider::core::Scheduler const& scheduler, + spider::core::StopToken& stop_token +) -> void { + while (!stop_token.stop_requested()) { + std::this_thread::sleep_for(std::chrono::seconds(cCleanupInterval)); + spdlog::debug("Starting cleanup"); + spider::core::StorageErr err + = metadata_store->set_scheduler_state(scheduler.get_id(), "gc"); + if (!err.success()) { + spdlog::error("Failed to set scheduler state to gc: {}", err.description); + continue; + } + server.pause(); + policy->cleanup(); + data_store->remove_dangling_data(); + server.resume(); + for (size_t i = 0; i < cRetryCount; ++i) { + err = metadata_store->set_scheduler_state(scheduler.get_id(), "normal"); + if (!err.success()) { + spdlog::error("Failed to set scheduler state to normal: {}", err.description); + if (i >= cRetryCount - 1) { + stop_token.request_stop(); + return; + } + } else { + break; + } + } + spdlog::debug("Finished cleanup"); + } +} +} // namespace + +// NOLINTNEXTLINE(bugprone-exception-escape) +auto main(int argc, char** argv) -> int { + // Set up spdlog to write to stderr + // NOLINTNEXTLINE(misc-include-cleaner) + spdlog::set_pattern("[%Y-%m-%d %H:%M:%S.%e] [%^%l%$] [spider][scheduler] %v"); +#ifndef NDEBUG + spdlog::set_level(spdlog::level::trace); +#endif + + boost::program_options::variables_map const args = parse_args(argc, argv); + + unsigned short port = 0; + std::string storage_url; + try { + if (!args.contains("port")) { + spdlog::error("port is required"); + return cCmdArgParseErr; + } + port = args["port"].as(); + if (!args.contains("storage_url")) { + spdlog::error("storage_url is required"); + return cCmdArgParseErr; + } + storage_url = args["storage_url"].as(); + } catch (boost::bad_any_cast& e) { + return cCmdArgParseErr; + } catch (boost::program_options::error& e) { + return cCmdArgParseErr; + } + + // Create storages + std::shared_ptr const metadata_store + = std::make_shared(); + spider::core::StorageErr err = metadata_store->connect(storage_url); + if (!err.success()) { + spdlog::error("Failed to connect to storage server: {}", err.description); + return cStorageConnectionErr; + } + std::shared_ptr const data_store + = std::make_shared(); + err = data_store->connect(storage_url); + if (!err.success()) { + spdlog::error("Failed to connect to storage server: {}", err.description); + return cStorageConnectionErr; + } + + // Initialize storages + err = metadata_store->initialize(); + if (!err.success()) { + spdlog::error("Failed to initialize metadata storage: {}", err.description); + return cStorageErr; + } + err = data_store->initialize(); + if (!err.success()) { + spdlog::error("Failed to initialize data storage: {}", err.description); + return cStorageErr; + } + + // Get scheduler id and addr + boost::uuids::random_generator gen; + boost::uuids::uuid const scheduler_id = gen(); + std::optional const optional_scheduler_addr = spider::core::get_address(); + if (!optional_scheduler_addr.has_value()) { + spdlog::error("Failed to get scheduler address"); + return cSchedulerAddrErr; + } + std::string const& scheduler_addr = optional_scheduler_addr.value(); + + // Start scheduler server + spider::core::StopToken stop_token; + std::shared_ptr const policy + = std::make_shared(); + spider::scheduler::SchedulerServer server{port, policy, metadata_store, data_store, stop_token}; + + // Register scheduler with storage + spider::core::Scheduler const scheduler{scheduler_id, scheduler_addr, port}; + err = metadata_store->add_scheduler(scheduler); + if (!err.success()) { + spdlog::error("Failed to register scheduler with storage server: {}", err.description); + return cStorageErr; + } + + try { + // Start a thread that periodically updates the scheduler's heartbeat + std::thread heartbeat_thread{ + heartbeat_loop, + std::cref(metadata_store), + std::ref(scheduler), + std::ref(stop_token), + }; + + // Start a thread that periodically starts cleanup + std::thread cleanup_thread{ + cleanup_loop, + std::cref(metadata_store), + std::cref(data_store), + std::ref(server), + std::cref(policy), + std::cref(scheduler), + std::ref(stop_token) + }; + + heartbeat_thread.join(); + cleanup_thread.join(); + server.stop(); + } catch (std::system_error& e) { + spdlog::error("Failed to join thread: {}", e.what()); + } + + return 0; +} diff --git a/src/spider/utils/StopToken.hpp b/src/spider/utils/StopToken.hpp new file mode 100644 index 0000000..28abb81 --- /dev/null +++ b/src/spider/utils/StopToken.hpp @@ -0,0 +1,22 @@ +#ifndef SPIDER_UTILS_STOPTOKEN_HPP +#define SPIDER_UTILS_STOPTOKEN_HPP + +#include + +namespace spider::core { +class StopToken { +public: + StopToken() : m_stop{false} {} + + auto request_stop() -> void { m_stop = true; } + + [[nodiscard]] auto stop_requested() const -> bool { return m_stop; } + + auto reset() -> void { m_stop = false; } + +private: + std::atomic m_stop; +}; +} // namespace spider::core + +#endif diff --git a/tests/scheduler/test-SchedulerServer.cpp b/tests/scheduler/test-SchedulerServer.cpp index dc88a0d..775fbae 100644 --- a/tests/scheduler/test-SchedulerServer.cpp +++ b/tests/scheduler/test-SchedulerServer.cpp @@ -1,7 +1,6 @@ // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) #include #include -#include #include #include #include @@ -22,6 +21,7 @@ #include "../../src/spider/scheduler/SchedulerServer.hpp" #include "../../src/spider/storage/DataStorage.hpp" #include "../../src/spider/storage/MetadataStorage.hpp" +#include "../../src/spider/utils/StopToken.hpp" #include "../storage/StorageTestHelper.hpp" namespace { @@ -44,10 +44,13 @@ TEMPLATE_LIST_TEST_CASE( = std::make_shared(); constexpr unsigned short cPort = 6021; - spider::scheduler::SchedulerServer server{cPort, policy, metadata_store, data_store}; + spider::core::StopToken stop_token; + spider::scheduler::SchedulerServer + server{cPort, policy, metadata_store, data_store, stop_token}; - // Start server in another thread - std::thread thread{[&]() { server.run(); }}; + // Pause and resume server + server.pause(); + server.resume(); // Create client socket boost::asio::io_context context; @@ -87,7 +90,6 @@ TEMPLATE_LIST_TEST_CASE( } socket.close(); server.stop(); - thread.join(); } } // namespace