Skip to content

Commit

Permalink
feat: Add scheduler that schedules tasks and garbage collect periodic…
Browse files Browse the repository at this point in the history
…ally (#37)
  • Loading branch information
sitaowang1998 authored Dec 18, 2024
1 parent 38297fc commit 88f6232
Show file tree
Hide file tree
Showing 8 changed files with 376 additions and 50 deletions.
2 changes: 2 additions & 0 deletions src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ set(SPIDER_SCHEDULER_SOURCES
scheduler/SchedulerMessage.hpp
scheduler/SchedulerServer.cpp
scheduler/SchedulerServer.hpp
utils/StopToken.hpp
CACHE INTERNAL
"spider scheduler source files"
)
Expand All @@ -110,6 +111,7 @@ target_link_libraries(
spider_scheduler
PRIVATE
Boost::headers
Boost::program_options
absl::flat_hash_map
spdlog::spdlog
)
Expand Down
36 changes: 36 additions & 0 deletions src/spider/io/BoostAsio.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef SPIDER_CORE_BOOSTASIO_HPP
#define SPIDER_CORE_BOOSTASIO_HPP

#include <optional>

// clang-format off
// IWYU pragma: begin_exports

Expand All @@ -20,13 +22,15 @@

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/impl/connect.hpp>

#include <boost/asio/detached.hpp>
#include <boost/asio/impl/co_spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/asio/executor_work_guard.hpp>

#include <boost/asio/impl/write.hpp>
#include <boost/asio/impl/read.hpp>
Expand All @@ -35,4 +39,36 @@

// IWYU pragma: end_exports
// clang-format on

#include <string>

#include <spdlog/spdlog.h>

namespace spider::core {
inline auto get_address() -> std::optional<std::string> {
try {
boost::asio::io_context io_context;
boost::asio::ip::tcp::resolver resolver(io_context);
auto const endpoints = resolver.resolve(boost::asio::ip::host_name(), "");
for (auto const& endpoint : endpoints) {
if (endpoint.endpoint().address().is_v4()
&& !endpoint.endpoint().address().is_loopback())
{
return endpoint.endpoint().address().to_string();
}
}
// If no non-loopback address found, return loopback address
spdlog::warn("No non-loopback address found, using loopback address");
for (auto const& endpoint : endpoints) {
if (endpoint.endpoint().address().is_v4()) {
return endpoint.endpoint().address().to_string();
}
}
return std::nullopt;
} catch (boost::system::system_error const& e) {
return std::nullopt;
}
}
} // namespace spider::core

#endif // SPIDER_CORE_BOOSTASIO_HPP
9 changes: 6 additions & 3 deletions src/spider/io/Serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ struct msgpack::adaptor::pack<boost::uuids::uuid> {
}
};

template <class T>
concept SerializableImpl = requires(T t) {
template <class Buffer, class T>
concept Packable = requires(Buffer buffer, T t) {
{
msgpack::pack(msgpack::sbuffer{}, t)
msgpack::pack(buffer, t)
};
};

template <class T>
concept SerializableImpl = Packable<msgpack::sbuffer, T>;

template <class T>
concept DeSerializableImpl = requires(T t) {
{
Expand Down
89 changes: 57 additions & 32 deletions src/spider/scheduler/SchedulerServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <mutex>
#include <optional>
#include <stdexcept>
#include <thread>
#include <utility>

#include <boost/uuid/uuid.hpp>
Expand All @@ -16,6 +17,7 @@
#include "../io/Serializer.hpp" // IWYU pragma: keep
#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"
#include "../utils/StopToken.hpp"
#include "SchedulerMessage.hpp"
#include "SchedulerPolicy.hpp"

Expand All @@ -25,47 +27,75 @@ SchedulerServer::SchedulerServer(
unsigned short const port,
std::shared_ptr<SchedulerPolicy> policy,
std::shared_ptr<core::MetadataStorage> metadata_store,
std::shared_ptr<core::DataStorage> data_store
std::shared_ptr<core::DataStorage> data_store,
core::StopToken& stop_token
)
: m_acceptor{m_context, {boost::asio::ip::tcp::v4(), port}},
: m_port{port},
m_policy{std::move(policy)},
m_metadata_store{std::move(metadata_store)},
m_data_store{std::move(data_store)} {
// Ignore the returned future as we do not need its value
boost::asio::co_spawn(m_context, receive_message(), boost::asio::use_future);
m_data_store{std::move(data_store)},
m_stop_token{stop_token} {
boost::asio::co_spawn(m_context, receive_message(), boost::asio::detached);
std::lock_guard const lock{m_mutex};
m_thread = std::make_unique<std::thread>([&] { m_context.run(); });
}

auto SchedulerServer::pause() -> void {
std::lock_guard const lock{m_mutex};
if (m_thread == nullptr) {
return;
}
m_context.stop();
m_thread->join();
m_thread = nullptr;
}

auto SchedulerServer::run() -> void {
m_context.run();
auto SchedulerServer::resume() -> void {
std::lock_guard const lock{m_mutex};
if (m_thread != nullptr) {
return;
}
m_thread = std::make_unique<std::thread>([&] {
m_context.restart();
m_context.run();
});
}

auto SchedulerServer::stop() -> void {
m_context.stop();
std::lock_guard const lock{m_mutex};
m_stop = true;
if (m_thread == nullptr) {
return;
}
m_context.stop();
m_thread->join();
m_thread = nullptr;
}

auto SchedulerServer::receive_message() -> boost::asio::awaitable<void> {
while (!should_stop()) {
// std::unique_ptr<boost::asio::ip::tcp::socket> socket
// = std::make_unique<boost::asio::ip::tcp::socket>(m_context);
boost::asio::ip::tcp::socket socket{m_context};
auto const& [ec] = co_await m_acceptor.async_accept(
socket,
boost::asio::as_tuple(boost::asio::use_awaitable)
);
if (ec) {
spdlog::error("Cannot accept connection {}: {}", ec.value(), ec.what());
continue;
try {
boost::asio::ip::tcp::acceptor acceptor{m_context, {boost::asio::ip::tcp::v4(), m_port}};
while (true) {
boost::asio::ip::tcp::socket socket{m_context};
auto const& [ec] = co_await acceptor.async_accept(
socket,
boost::asio::as_tuple(boost::asio::use_awaitable)
);
if (ec) {
spdlog::error("Cannot accept connection {}: {}", ec.value(), ec.what());
continue;
}
boost::asio::co_spawn(
m_context,
process_message(std::move(socket)),
boost::asio::detached
);
}
// Ignore the returned future as we do not need its value
boost::asio::co_spawn(
m_context,
process_message(std::move(socket)),
boost::asio::use_future
);
co_return;
} catch (boost::system::system_error& e) {
spdlog::error("Fail to accept connection: {}", e.what());
m_stop_token.request_stop();
co_return;
}
co_return;
}

namespace {
Expand Down Expand Up @@ -125,9 +155,4 @@ auto SchedulerServer::process_message(boost::asio::ip::tcp::socket socket
co_return;
}

auto SchedulerServer::should_stop() -> bool {
std::lock_guard const lock{m_mutex};
return m_stop;
}

} // namespace spider::scheduler
19 changes: 10 additions & 9 deletions src/spider/scheduler/SchedulerServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

#include <memory>
#include <mutex>
#include <thread>

#include "../io/BoostAsio.hpp" // IWYU pragma: keep
#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"
#include "../utils/StopToken.hpp"
#include "SchedulerPolicy.hpp"

namespace spider::scheduler {
Expand All @@ -24,13 +26,12 @@ class SchedulerServer {
unsigned short port,
std::shared_ptr<SchedulerPolicy> policy,
std::shared_ptr<core::MetadataStorage> metadata_store,
std::shared_ptr<core::DataStorage> data_store
std::shared_ptr<core::DataStorage> data_store,
core::StopToken& stop_token
);

/**
* Run the server loop. This function blocks until stop is called.
*/
auto run() -> void;
auto pause() -> void;
auto resume() -> void;

auto stop() -> void;

Expand All @@ -39,17 +40,17 @@ class SchedulerServer {

auto process_message(boost::asio::ip::tcp::socket socket) -> boost::asio::awaitable<void>;

auto should_stop() -> bool;

unsigned short m_port;
std::shared_ptr<SchedulerPolicy> m_policy;
std::shared_ptr<core::MetadataStorage> m_metadata_store;
std::shared_ptr<core::DataStorage> m_data_store;

boost::asio::io_context m_context;
boost::asio::ip::tcp::acceptor m_acceptor;

std::mutex m_mutex;
bool m_stop = false;
std::unique_ptr<std::thread> m_thread;

core::StopToken& m_stop_token;
};

} // namespace spider::scheduler
Expand Down
Loading

0 comments on commit 88f6232

Please sign in to comment.