Skip to content

Commit

Permalink
feat: Add scheduler server that listens and responds to schedule task…
Browse files Browse the repository at this point in the history
… requests (#33)
  • Loading branch information
sitaowang1998 authored Dec 8, 2024
1 parent 411924e commit 083b26f
Show file tree
Hide file tree
Showing 10 changed files with 364 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ set(SPIDER_SCHEDULER_SOURCES
scheduler/SchedulerPolicy.hpp
scheduler/FifoPolicy.cpp
scheduler/FifoPolicy.hpp
scheduler/SchedulerMessage.hpp
scheduler/SchedulerServer.cpp
scheduler/SchedulerServer.hpp
CACHE INTERNAL
"spider scheduler source files"
)
Expand Down
56 changes: 56 additions & 0 deletions src/spider/scheduler/SchedulerMessage.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#ifndef SPIDER_SCHEDULER_SCHEDULERMESSAGE_HPP
#define SPIDER_SCHEDULER_SCHEDULERMESSAGE_HPP

#include <optional>
#include <string>
#include <utility>

#include <boost/uuid/uuid.hpp>

#include "../io/MsgPack.hpp" // IWYU pragma: keep
#include "../io/Serializer.hpp" // IWYU pragma: keep

namespace spider::scheduler {

class ScheduleTaskRequest {
public:
/**
* Default constructor for msgpack. Do __not__ use it directly.
*/
ScheduleTaskRequest() = default;

ScheduleTaskRequest(boost::uuids::uuid const worker_id, std::string addr)
: m_worker_id{worker_id},
m_worker_addr{std::move(addr)} {}

[[nodiscard]] auto get_worker_id() const -> boost::uuids::uuid { return m_worker_id; }

[[nodiscard]] auto get_worker_addr() const -> std::string const& { return m_worker_addr; }

MSGPACK_DEFINE_ARRAY(m_worker_id, m_worker_addr);

private:
boost::uuids::uuid m_worker_id;
std::string m_worker_addr;
};

class ScheduleTaskResponse {
public:
ScheduleTaskResponse() = default;

explicit ScheduleTaskResponse(boost::uuids::uuid const task_id) : m_task_id{task_id} {}

[[nodiscard]] auto has_task_id() const -> bool { return m_task_id.has_value(); }

// NOLINTNEXTLINE(bugprone-unchecked-optional-access)
[[nodiscard]] auto get_task_id() const -> boost::uuids::uuid { return m_task_id.value(); }

MSGPACK_DEFINE_ARRAY(m_task_id);

private:
std::optional<boost::uuids::uuid> m_task_id = std::nullopt;
};

} // namespace spider::scheduler

#endif // SPIDER_SCHEDULER_SCHEDULERMESSAGE_HPP
133 changes: 133 additions & 0 deletions src/spider/scheduler/SchedulerServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#include "SchedulerServer.hpp"

#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>

#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <spdlog/spdlog.h>

#include "../io/BoostAsio.hpp" // IWYU pragma: keep
#include "../io/MsgPack.hpp" // IWYU pragma: keep
#include "../io/msgpack_message.hpp"
#include "../io/Serializer.hpp" // IWYU pragma: keep
#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"
#include "SchedulerMessage.hpp"
#include "SchedulerPolicy.hpp"

namespace spider::scheduler {

SchedulerServer::SchedulerServer(
unsigned short const port,
std::shared_ptr<SchedulerPolicy> policy,
std::shared_ptr<core::MetadataStorage> metadata_store,
std::shared_ptr<core::DataStorage> data_store
)
: m_acceptor{m_context, {boost::asio::ip::tcp::v4(), port}},
m_policy{std::move(policy)},
m_metadata_store{std::move(metadata_store)},
m_data_store{std::move(data_store)} {
// Ignore the returned future as we do not need its value
boost::asio::co_spawn(m_context, receive_message(), boost::asio::use_future);
}

auto SchedulerServer::run() -> void {
m_context.run();
}

auto SchedulerServer::stop() -> void {
m_context.stop();
std::lock_guard const lock{m_mutex};
m_stop = true;
}

auto SchedulerServer::receive_message() -> boost::asio::awaitable<void> {
while (!should_stop()) {
// std::unique_ptr<boost::asio::ip::tcp::socket> socket
// = std::make_unique<boost::asio::ip::tcp::socket>(m_context);
boost::asio::ip::tcp::socket socket{m_context};
auto const& [ec] = co_await m_acceptor.async_accept(
socket,
boost::asio::as_tuple(boost::asio::use_awaitable)
);
if (ec) {
spdlog::error("Cannot accept connection {}: {}", ec.value(), ec.what());
continue;
}
// Ignore the returned future as we do not need its value
boost::asio::co_spawn(
m_context,
process_message(std::move(socket)),
boost::asio::use_future
);
}
co_return;
}

namespace {
auto deserialize_message(msgpack::sbuffer const& buffer) -> std::optional<ScheduleTaskRequest> {
try {
msgpack::object_handle const handle = msgpack::unpack(buffer.data(), buffer.size());
msgpack::object const object = handle.get();
return object.as<ScheduleTaskRequest>();
} catch (std::runtime_error& e) {
spdlog::error("Cannot unpack message to ScheduleTaskRequest: {}", e.what());
return std::nullopt;
}
}
} // namespace

auto SchedulerServer::process_message(boost::asio::ip::tcp::socket socket
) -> boost::asio::awaitable<void> {
// NOLINTBEGIN(clang-analyzer-core.CallAndMessage)
std::optional<msgpack::sbuffer> const& optional_message_buffer
= co_await core::receive_message_async(socket);
// NOLINTEND(clang-analyzer-core.CallAndMessage)

if (false == optional_message_buffer.has_value()) {
spdlog::error("Cannot receive message from worker");
co_return;
}
msgpack::sbuffer const& message_buffer = optional_message_buffer.value();
std::optional<ScheduleTaskRequest> const& optional_request
= deserialize_message(message_buffer);
if (false == optional_request.has_value()) {
spdlog::error("Cannot parse message into schedule task request");
co_return;
}
ScheduleTaskRequest const& request = optional_request.value();

std::optional<boost::uuids::uuid> const task_id = m_policy->schedule_next(
m_metadata_store,
m_data_store,
request.get_worker_id(),
request.get_worker_addr()
);
ScheduleTaskResponse response{};
if (task_id.has_value()) {
response = ScheduleTaskResponse{task_id.value()};
}
msgpack::sbuffer response_buffer;
msgpack::pack(response_buffer, response);

bool const success = co_await core::send_message_async(socket, response_buffer);
if (!success) {
spdlog::error(
"Cannot send message to worker {} at {}",
boost::uuids::to_string(request.get_worker_id()),
request.get_worker_addr()
);
}
co_return;
}

auto SchedulerServer::should_stop() -> bool {
std::lock_guard const lock{m_mutex};
return m_stop;
}

} // namespace spider::scheduler
57 changes: 57 additions & 0 deletions src/spider/scheduler/SchedulerServer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#ifndef SPIDER_SCHEDULER_SCHEDULERSERVER_HPP
#define SPIDER_SCHEDULER_SCHEDULERSERVER_HPP

#include <memory>
#include <mutex>

#include "../io/BoostAsio.hpp" // IWYU pragma: keep
#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"
#include "SchedulerPolicy.hpp"

namespace spider::scheduler {

class SchedulerServer {
public:
// Delete copy & move constructor and assignment operator
SchedulerServer(SchedulerServer const&) = delete;
auto operator=(SchedulerServer const&) -> SchedulerServer& = delete;
SchedulerServer(SchedulerServer&&) = delete;
auto operator=(SchedulerServer&&) noexcept -> SchedulerServer& = delete;
~SchedulerServer() = default;

SchedulerServer(
unsigned short port,
std::shared_ptr<SchedulerPolicy> policy,
std::shared_ptr<core::MetadataStorage> metadata_store,
std::shared_ptr<core::DataStorage> data_store
);

/**
* Run the server loop. This function blocks until stop is called.
*/
auto run() -> void;

auto stop() -> void;

private:
auto receive_message() -> boost::asio::awaitable<void>;

auto process_message(boost::asio::ip::tcp::socket socket) -> boost::asio::awaitable<void>;

auto should_stop() -> bool;

std::shared_ptr<SchedulerPolicy> m_policy;
std::shared_ptr<core::MetadataStorage> m_metadata_store;
std::shared_ptr<core::DataStorage> m_data_store;

boost::asio::io_context m_context;
boost::asio::ip::tcp::acceptor m_acceptor;

std::mutex m_mutex;
bool m_stop = false;
};

} // namespace spider::scheduler

#endif // SPIDER_SCHEDULER_SCHEDULERSERVER_HPP
11 changes: 11 additions & 0 deletions src/spider/storage/MysqlStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,17 @@ auto MySqlMetadataStorage::add_job(
dep_statement->setBytes(2, &child_id_bytes);
dep_statement->executeUpdate();
}

// Mark head tasks as ready
for (boost::uuids::uuid const& task_id : task_graph.get_head_tasks()) {
std::unique_ptr<sql::PreparedStatement> statement(
m_conn->prepareStatement("UPDATE `tasks` SET `state` = 'ready' WHERE `id` = ?")
);
sql::bytes task_id_bytes = uuid_get_bytes(task_id);
statement->setBytes(1, &task_id_bytes);
statement->executeUpdate();
}

} catch (sql::SQLException& e) {
m_conn->rollback();
if (e.getErrorCode() == ErDupKey || e.getErrorCode() == ErDupEntry) {
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set(SPIDER_TEST_SOURCES
worker/test-TaskExecutor.cpp
io/test-MsgpackMessage.cpp
scheduler/test-SchedulerPolicy.cpp
scheduler/test-SchedulerServer.cpp
CACHE INTERNAL
"spider test source files"
)
Expand Down
8 changes: 0 additions & 8 deletions tests/scheduler/test-SchedulerPolicy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,12 @@ TEMPLATE_LIST_TEST_CASE(
graph_1.add_task(task_1);
boost::uuids::uuid const job_id_1 = gen();
REQUIRE(metadata_store->add_job(job_id_1, client_id, graph_1).success());
REQUIRE(metadata_store->set_task_state(task_1.get_id(), spider::core::TaskState::Ready)
.success());
std::this_thread::sleep_for(std::chrono::seconds(1));
spider::core::Task const task_2{"task_2"};
spider::core::TaskGraph graph_2;
graph_2.add_task(task_2);
boost::uuids::uuid const job_id_2 = gen();
REQUIRE(metadata_store->add_job(job_id_2, client_id, graph_2).success());
REQUIRE(metadata_store->set_task_state(task_2.get_id(), spider::core::TaskState::Ready)
.success());

spider::scheduler::FifoPolicy policy;

Expand Down Expand Up @@ -97,8 +93,6 @@ TEMPLATE_LIST_TEST_CASE(
spider::core::TaskGraph graph;
graph.add_task(task);
REQUIRE(metadata_store->add_job(job_id, gen(), graph).success());
REQUIRE(metadata_store->set_task_state(task.get_id(), spider::core::TaskState::Ready).success()
);

spider::scheduler::FifoPolicy policy;
// Schedule with wrong address
Expand Down Expand Up @@ -142,8 +136,6 @@ TEMPLATE_LIST_TEST_CASE(
spider::core::TaskGraph graph;
graph.add_task(task);
REQUIRE(metadata_store->add_job(job_id, gen(), graph).success());
REQUIRE(metadata_store->set_task_state(task.get_id(), spider::core::TaskState::Ready).success()
);

spider::scheduler::FifoPolicy policy;
// Schedule with wrong address
Expand Down
Loading

0 comments on commit 083b26f

Please sign in to comment.