Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add scheduler scheduling #31

Merged
merged 14 commits into from
Dec 5, 2024
22 changes: 21 additions & 1 deletion src/spider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(SPIDER_CORE_HEADERS
core/Data.hpp
core/Task.hpp
core/TaskGraph.hpp
core/JobMetadata.hpp
core/Serializer.hpp
core/BoostAsio.hpp
core/MsgPack.hpp
Expand All @@ -28,7 +29,7 @@ target_sources(spider_core PUBLIC ${SPIDER_CORE_HEADERS})
target_link_libraries(
spider_core
PUBLIC
Boost::boost
Boost::headers
absl::flat_hash_map
MariaDBClientCpp::MariaDBClientCpp
msgpack-cxx
Expand Down Expand Up @@ -84,6 +85,25 @@ target_link_libraries(
spdlog::spdlog
)

set(SPIDER_SCHEDULER_SOURCES
scheduler/SchedulerPolicy.hpp
scheduler/FifoPolicy.cpp
scheduler/FifoPolicy.hpp
CACHE INTERNAL
"spider scheduler source files"
)
add_executable(spider_scheduler)
target_sources(spider_scheduler PRIVATE ${SPIDER_SCHEDULER_SOURCES})
target_sources(spider_scheduler PRIVATE scheduler/scheduler.cpp)
target_link_libraries(spider_scheduler PRIVATE spider_core)
target_link_libraries(
spider_scheduler
PRIVATE
Boost::headers
absl::flat_hash_map
spdlog::spdlog
)

set(SPIDER_CLIENT_SHARED_SOURCES CACHE INTERNAL "spider client shared source files")

set(SPIDER_CLIENT_SHARED_HEADERS
Expand Down
39 changes: 39 additions & 0 deletions src/spider/core/JobMetadata.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef SPIDER_CORE_JOBMETADATA_HPP
#define SPIDER_CORE_JOBMETADATA_HPP

#include <chrono>

#include <boost/uuid/uuid.hpp>

namespace spider::core {

class JobMetadata {
public:
JobMetadata() = default;

JobMetadata(
boost::uuids::uuid id,
boost::uuids::uuid client_id,
std::chrono::system_clock::time_point creation_time
)
: m_id{id},
m_client_id{client_id},
m_creation_time{creation_time} {}

[[nodiscard]] auto get_id() -> boost::uuids::uuid { return m_id; }

[[nodiscard]] auto get_client_id() -> boost::uuids::uuid { return m_client_id; }

[[nodiscard]] auto get_creation_time() -> std::chrono::system_clock::time_point {
return m_creation_time;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Mark getter methods as const

Marking the getter methods as const ensures they do not modify the object's state and conveys the intended usage.

Apply this diff to make the methods const:

 [[nodiscard]] auto get_id() -> boost::uuids::uuid {
+    return m_id;
+}

 [[nodiscard]] auto get_client_id() -> boost::uuids::uuid {
+    return m_client_id;
 }

 [[nodiscard]] auto get_creation_time() -> std::chrono::system_clock::time_point {
     return m_creation_time;
 }

After modification:

[[nodiscard]] auto get_id() const -> boost::uuids::uuid { return m_id; }

[[nodiscard]] auto get_client_id() const -> boost::uuids::uuid { return m_client_id; }

[[nodiscard]] auto get_creation_time() const -> std::chrono::system_clock::time_point {
    return m_creation_time;
}


private:
boost::uuids::uuid m_id;
boost::uuids::uuid m_client_id;
std::chrono::system_clock::time_point m_creation_time;
};

} // namespace spider::core

#endif // SPIDER_CORE_JOBMETADATA_HPP
125 changes: 125 additions & 0 deletions src/spider/scheduler/FifoPolicy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#include "FifoPolicy.hpp"

#include <algorithm>
#include <chrono>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

#include <absl/container/flat_hash_map.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fmt/format.h>

#include "../core/Data.hpp"
#include "../core/JobMetadata.hpp"
#include "../core/Task.hpp"
#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"

namespace {

auto task_locality_satisfied(
std::shared_ptr<spider::core::DataStorage> const& data_store,
spider::core::Task const& task,
std::string const& addr
) -> bool {
for (auto const& input : task.get_inputs()) {
if (input.get_value().has_value()) {
continue;
}
std::optional<boost::uuids::uuid> optional_data_id = input.get_data_id();
if (!optional_data_id.has_value()) {
continue;
}
boost::uuids::uuid const data_id = optional_data_id.value();
spider::core::Data data;
if (false == data_store->get_data(data_id, &data).success()) {
throw std::runtime_error(
fmt::format("Data with id {} not exists.", boost::uuids::to_string((data_id)))
);
}
if (false == data.is_hard_locality()) {
continue;
}
std::vector<std::string> const& locality = data.get_locality();
if (locality.empty()) {
continue;
}
if (std::ranges::find(locality, addr) == locality.end()) {
return false;
}
}
return true;
}

} // namespace

namespace spider::scheduler {

auto FifoPolicy::schedule_next(
std::shared_ptr<core::MetadataStorage> metadata_store,
std::shared_ptr<core::DataStorage> data_store,
boost::uuids::uuid const /*worker_id*/,
std::string const& worker_addr
) -> std::optional<boost::uuids::uuid> {
std::vector<core::Task> ready_tasks;
metadata_store->get_ready_tasks(&ready_tasks);

std::erase_if(ready_tasks, [data_store, worker_addr](core::Task const& task) -> bool {
return !task_locality_satisfied(data_store, task, worker_addr);
});

if (ready_tasks.empty()) {
return std::nullopt;
}

auto const earliest_task = std::ranges::min_element(
ready_tasks,
{},
[this,
metadata_store](core::Task const& task) -> std::chrono::system_clock::time_point {
boost::uuids::uuid const task_id = task.get_id();
boost::uuids::uuid job_id;
if (m_task_job_map.contains(task_id)) {
job_id = m_task_job_map[task_id];
} else {
if (false == metadata_store->get_task_job_id(task_id, &job_id).success()) {
throw std::runtime_error(fmt::format(
"Task with id {} not exists.",
boost::uuids::to_string(task_id)
));
}
m_task_job_map.emplace(task_id, job_id);
}

if (m_job_time_map.contains(job_id)) {
return m_job_time_map[job_id];
}

core::JobMetadata job_metadata;
if (false == metadata_store->get_job_metadata(job_id, &job_metadata).success()) {
throw std::runtime_error(fmt::format(
"Job with id {} not exists.",
boost::uuids::to_string(job_id)
));
}
m_job_time_map.emplace(job_id, job_metadata.get_creation_time());
return job_metadata.get_creation_time();
}
);

return earliest_task->get_id();
}

auto FifoPolicy::cleanup_job(boost::uuids::uuid const job_id) -> void {
absl::erase_if(m_task_job_map, [&job_id](auto const& item) -> bool {
auto const& [item_task_id, item_job_id] = item;
return item_job_id == job_id;
});
m_job_time_map.erase(job_id);
}

} // namespace spider::scheduler
35 changes: 35 additions & 0 deletions src/spider/scheduler/FifoPolicy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef SPIDER_SCHEDULER_FIFOPOLICY_HPP
#define SPIDER_SCHEDULER_FIFOPOLICY_HPP

#include <chrono>
#include <memory>
#include <optional>
#include <string>

#include <absl/container/flat_hash_map.h>
#include <boost/uuid/uuid.hpp>

#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"
#include "SchedulerPolicy.hpp"

namespace spider::scheduler {

class FifoPolicy final : public SchedulerPolicy {
public:
auto schedule_next(
std::shared_ptr<core::MetadataStorage> metadata_store,
std::shared_ptr<core::DataStorage> data_store,
boost::uuids::uuid worker_id,
std::string const& worker_addr
) -> std::optional<boost::uuids::uuid> override;
auto cleanup_job(boost::uuids::uuid job_id) -> void override;

private:
absl::flat_hash_map<boost::uuids::uuid, boost::uuids::uuid> m_task_job_map;
absl::flat_hash_map<boost::uuids::uuid, std::chrono::system_clock::time_point> m_job_time_map;
};

} // namespace spider::scheduler

#endif // SPIDER_SCHEDULER_FIFOPOLICY_HPP
35 changes: 35 additions & 0 deletions src/spider/scheduler/SchedulerPolicy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef SPIDER_SCHEDULER_SCHEDULERPOLICY_HPP
#define SPIDER_SCHEDULER_SCHEDULERPOLICY_HPP

#include <memory>
#include <optional>
#include <string>

#include <boost/uuid/uuid.hpp>

#include "../storage/DataStorage.hpp"
#include "../storage/MetadataStorage.hpp"

namespace spider::scheduler {
class SchedulerPolicy {
public:
SchedulerPolicy() = default;
SchedulerPolicy(SchedulerPolicy const&) = default;
auto operator=(SchedulerPolicy const&) -> SchedulerPolicy& = default;
SchedulerPolicy(SchedulerPolicy&&) = default;
auto operator=(SchedulerPolicy&&) -> SchedulerPolicy& = default;
virtual ~SchedulerPolicy() = default;

virtual auto schedule_next(
std::shared_ptr<core::MetadataStorage> metadata_store,
std::shared_ptr<core::DataStorage> data_store,
boost::uuids::uuid worker_id,
std::string const& worker_addr
) -> std::optional<boost::uuids::uuid> = 0;

virtual auto cleanup_job(boost::uuids::uuid job_id) -> void = 0;
};

} // namespace spider::scheduler

#endif // SPIDER_SCHEDULER_SCHEDULERPOLICY_HPP
2 changes: 2 additions & 0 deletions src/spider/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

auto main(int argc, char** argv) -> int {}
4 changes: 4 additions & 0 deletions src/spider/storage/MetadataStorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <boost/uuid/uuid.hpp>

#include "../core/Error.hpp"
#include "../core/JobMetadata.hpp"
#include "../core/Task.hpp"
#include "../core/TaskGraph.hpp"

Expand All @@ -32,6 +33,7 @@ class MetadataStorage {
virtual auto
add_job(boost::uuids::uuid job_id, boost::uuids::uuid client_id, TaskGraph const& task_graph
) -> StorageErr = 0;
virtual auto get_job_metadata(boost::uuids::uuid id, JobMetadata* job) -> StorageErr = 0;
virtual auto get_task_graph(boost::uuids::uuid id, TaskGraph* task_graph) -> StorageErr = 0;
virtual auto get_jobs_by_client_id(
boost::uuids::uuid client_id,
Expand All @@ -40,6 +42,8 @@ class MetadataStorage {
virtual auto remove_job(boost::uuids::uuid id) -> StorageErr = 0;
virtual auto add_child(boost::uuids::uuid parent_id, Task const& child) -> StorageErr = 0;
virtual auto get_task(boost::uuids::uuid id, Task* task) -> StorageErr = 0;
virtual auto get_task_job_id(boost::uuids::uuid id, boost::uuids::uuid* job_id) -> StorageErr
= 0;
virtual auto get_ready_tasks(std::vector<Task>* tasks) -> StorageErr = 0;
virtual auto set_task_state(boost::uuids::uuid id, TaskState state) -> StorageErr = 0;
virtual auto add_task_instance(TaskInstance const& instance) -> StorageErr = 0;
Expand Down
Loading
Loading