From 7b87f9cda3d268ad08713d192fbef504f21255fd Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Tue, 3 Dec 2024 21:42:30 +0000 Subject: [PATCH 01/14] Add creation time in job metadata --- src/spider/CMakeLists.txt | 22 +++++++++- src/spider/core/JobMetadata.hpp | 39 +++++++++++++++++ src/spider/scheduler/FifoPolicy.cpp | 12 ++++++ src/spider/scheduler/FifoPolicy.hpp | 26 ++++++++++++ src/spider/scheduler/SchedulerPolicy.hpp | 26 ++++++++++++ src/spider/scheduler/scheduler.cpp | 2 + src/spider/storage/MetadataStorage.hpp | 2 + src/spider/storage/MysqlStorage.cpp | 53 +++++++++++++++++++++++- src/spider/storage/MysqlStorage.hpp | 1 + tests/storage/StorageTestHelper.hpp | 11 ++--- tests/storage/test-DataStorage.cpp | 15 +++---- tests/storage/test-MetadataStorage.cpp | 36 ++++++++++------ tests/utils/CoreTaskUtils.cpp | 11 ++--- tests/worker/test-FunctionManager.cpp | 11 ++--- tests/worker/test-MessagePipe.cpp | 15 +++---- tests/worker/test-TaskExecutor.cpp | 17 ++++---- tests/worker/worker-test.cpp | 4 +- 17 files changed, 249 insertions(+), 54 deletions(-) create mode 100644 src/spider/core/JobMetadata.hpp create mode 100644 src/spider/scheduler/FifoPolicy.cpp create mode 100644 src/spider/scheduler/FifoPolicy.hpp create mode 100644 src/spider/scheduler/SchedulerPolicy.hpp create mode 100644 src/spider/scheduler/scheduler.cpp diff --git a/src/spider/CMakeLists.txt b/src/spider/CMakeLists.txt index 616ef71..7a0a360 100644 --- a/src/spider/CMakeLists.txt +++ b/src/spider/CMakeLists.txt @@ -11,6 +11,7 @@ set(SPIDER_CORE_HEADERS core/Data.hpp core/Task.hpp core/TaskGraph.hpp + core/JobMetadata.hpp core/Serializer.hpp core/BoostAsio.hpp core/MsgPack.hpp @@ -28,7 +29,7 @@ target_sources(spider_core PUBLIC ${SPIDER_CORE_HEADERS}) target_link_libraries( spider_core PUBLIC - Boost::boost + Boost::headers absl::flat_hash_map MariaDBClientCpp::MariaDBClientCpp msgpack-cxx @@ -84,6 +85,25 @@ target_link_libraries( spdlog::spdlog ) +set(SPIDER_SCHEDULER_SOURCES + scheduler/SchedulerPolicy.hpp + scheduler/FifoPolicy.cpp + scheduler/FifoPolicy.hpp + CACHE INTERNAL + "spider scheduler source files" +) +add_executable(spider_scheduler) +target_sources(spider_scheduler PRIVATE ${SPIDER_SCHEDULER_SOURCES}) +target_sources(spider_scheduler PRIVATE scheduler/scheduler.cpp) +target_link_libraries(spider_scheduler PRIVATE spider_core) +target_link_libraries( + spider_scheduler + PRIVATE + Boost::headers + absl::flat_hash_map + spdlog::spdlog +) + set(SPIDER_CLIENT_SHARED_SOURCES CACHE INTERNAL "spider client shared source files") set(SPIDER_CLIENT_SHARED_HEADERS diff --git a/src/spider/core/JobMetadata.hpp b/src/spider/core/JobMetadata.hpp new file mode 100644 index 0000000..f29f8bb --- /dev/null +++ b/src/spider/core/JobMetadata.hpp @@ -0,0 +1,39 @@ +#ifndef SPIDER_CORE_JOBMETADATA_HPP +#define SPIDER_CORE_JOBMETADATA_HPP + +#include + +#include + +namespace spider::core { + +class JobMetadata { +public: + JobMetadata() = default; + + JobMetadata( + boost::uuids::uuid id, + boost::uuids::uuid client_id, + std::chrono::system_clock::time_point creation_time + ) + : m_id{id}, + m_client_id{client_id}, + m_creation_time{creation_time} {} + + [[nodiscard]] auto get_id() -> boost::uuids::uuid { return m_id; } + + [[nodiscard]] auto get_client_id() -> boost::uuids::uuid { return m_client_id; } + + [[nodiscard]] auto get_creation_time() -> std::chrono::system_clock::time_point { + return m_creation_time; + } + +private: + boost::uuids::uuid m_id; + boost::uuids::uuid m_client_id; + std::chrono::system_clock::time_point m_creation_time; +}; + +} // namespace spider::core + +#endif // SPIDER_CORE_JOBMETADATA_HPP diff --git a/src/spider/scheduler/FifoPolicy.cpp b/src/spider/scheduler/FifoPolicy.cpp new file mode 100644 index 0000000..a9aa1b3 --- /dev/null +++ b/src/spider/scheduler/FifoPolicy.cpp @@ -0,0 +1,12 @@ +#include "FifoPolicy.hpp" + +namespace spider::scheduler { + +auto FifoPolicy::schedule_next( + std::shared_ptr metadata_store, + std::shared_ptr data_store +) -> boost::uuids::uuid {} + +auto FifoPolicy::cleanup_job(boost::uuids::uuid job_id) -> void {} + +} // namespace spider::scheduler diff --git a/src/spider/scheduler/FifoPolicy.hpp b/src/spider/scheduler/FifoPolicy.hpp new file mode 100644 index 0000000..d715fcb --- /dev/null +++ b/src/spider/scheduler/FifoPolicy.hpp @@ -0,0 +1,26 @@ +#ifndef SPIDER_SCHEDULER_FIFOPOLICY_HPP +#define SPIDER_SCHEDULER_FIFOPOLICY_HPP + +#include + +#include + +#include "SchedulerPolicy.hpp" + +namespace spider::scheduler { + +class FifoPolicy final : public SchedulerPolicy { +public: + auto schedule_next( + std::shared_ptr metadata_store, + std::shared_ptr data_store + ) -> boost::uuids::uuid override; + auto cleanup_job(boost::uuids::uuid job_id) -> void override; + +private: + absl::flat_hash_map m_job_time_map; +}; + +} // namespace spider::scheduler + +#endif // SPIDER_SCHEDULER_FIFOPOLICY_HPP diff --git a/src/spider/scheduler/SchedulerPolicy.hpp b/src/spider/scheduler/SchedulerPolicy.hpp new file mode 100644 index 0000000..daa34be --- /dev/null +++ b/src/spider/scheduler/SchedulerPolicy.hpp @@ -0,0 +1,26 @@ +#ifndef SPIDER_SCHEDULER_SCHEDULERPOLICY_HPP +#define SPIDER_SCHEDULER_SCHEDULERPOLICY_HPP + +#include + +#include + +#include "../storage/DataStorage.hpp" +#include "../storage/MetadataStorage.hpp" + +namespace spider::scheduler { +class SchedulerPolicy { +public: + virtual ~SchedulerPolicy() = default; + + virtual auto schedule_next( + std::shared_ptr metadata_store, + std::shared_ptr data_store + ) -> boost::uuids::uuid; + + virtual auto cleanup_job(boost::uuids::uuid job_id) -> void; +}; + +} // namespace spider::scheduler + +#endif // SPIDER_SCHEDULER_SCHEDULERPOLICY_HPP diff --git a/src/spider/scheduler/scheduler.cpp b/src/spider/scheduler/scheduler.cpp new file mode 100644 index 0000000..3e4ee1d --- /dev/null +++ b/src/spider/scheduler/scheduler.cpp @@ -0,0 +1,2 @@ + +auto main(int argc, char** argv) -> int {} diff --git a/src/spider/storage/MetadataStorage.hpp b/src/spider/storage/MetadataStorage.hpp index 2f7abc6..d01f06d 100644 --- a/src/spider/storage/MetadataStorage.hpp +++ b/src/spider/storage/MetadataStorage.hpp @@ -7,6 +7,7 @@ #include #include "../core/Error.hpp" +#include "../core/JobMetadata.hpp" #include "../core/Task.hpp" #include "../core/TaskGraph.hpp" @@ -32,6 +33,7 @@ class MetadataStorage { virtual auto add_job(boost::uuids::uuid job_id, boost::uuids::uuid client_id, TaskGraph const& task_graph ) -> StorageErr = 0; + virtual auto get_job_metadata(boost::uuids::uuid id, JobMetadata* job) -> StorageErr = 0; virtual auto get_task_graph(boost::uuids::uuid id, TaskGraph* task_graph) -> StorageErr = 0; virtual auto get_jobs_by_client_id( boost::uuids::uuid client_id, diff --git a/src/spider/storage/MysqlStorage.cpp b/src/spider/storage/MysqlStorage.cpp index 961d46c..35e058b 100644 --- a/src/spider/storage/MysqlStorage.cpp +++ b/src/spider/storage/MysqlStorage.cpp @@ -1,10 +1,14 @@ #include "MysqlStorage.hpp" #include +#include #include +#include #include +#include #include #include +#include #include #include #include @@ -26,6 +30,7 @@ #include "../core/Data.hpp" #include "../core/Error.hpp" +#include "../core/JobMetadata.hpp" #include "../core/Task.hpp" #include "../core/TaskGraph.hpp" @@ -46,7 +51,7 @@ namespace { char const* const cCreateDriverTable = R"(CREATE TABLE IF NOT EXISTS `drivers` ( `id` BINARY(16) NOT NULL, `address` VARCHAR(40) NOT NULL, - `heartbeat` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `heartbeat` NOT NULL TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ))"; @@ -61,6 +66,7 @@ char const* const cCreateSchedulerTable = R"(CREATE TABLE IF NOT EXISTS `schedul char const* const cCreateJobTable = R"(CREATE TABLE IF NOT EXISTS jobs ( `id` BINARY(16) NOT NULL, `client_id` BINARY(16) NOT NULL, + `creation_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, KEY (`client_id`) USING BTREE, PRIMARY KEY (`id`) ))"; @@ -114,7 +120,7 @@ char const* const cCreateTaskDependencyTable = R"(CREATE TABLE IF NOT EXISTS `ta char const* const cCreateTaskInstanceTable = R"(CREATE TABLE IF NOT EXISTS `task_instances` ( `id` BINARY(16) NOT NULL, `task_id` BINARY(16) NOT NULL, - `start_time` TIMESTAMP NOT NULL, + `start_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,, CONSTRAINT `instance_task_id` FOREIGN KEY (`task_id`) REFERENCES `tasks` (`id`) ON UPDATE NO ACTION ON DELETE CASCADE, PRIMARY KEY (`id`) ))"; @@ -696,6 +702,49 @@ auto MySqlMetadataStorage::get_task_graph(boost::uuids::uuid id, TaskGraph* task m_conn->commit(); return StorageErr{}; } +} // namespace spider::core + +namespace { + +auto parse_timestamp(std::string const& timestamp) -> std::chrono::system_clock::time_point { + std::tm time_date{}; + std::stringstream ss{timestamp}; + ss >> std::get_time(&time_date, "%Y-%m-%d %H:%M:%S"); + return std::chrono::system_clock::from_time_t(std::mktime(&time_date)); +} + +} // namespace + +namespace spider::core { + +auto MySqlMetadataStorage::get_job_metadata(boost::uuids::uuid id, JobMetadata* job) -> StorageErr { + try { + std::unique_ptr statement{m_conn->prepareStatement( + "SELECT `client_id`, `creation_time` FROM `jobs` WHERE `id` = ?" + )}; + sql::bytes id_bytes = uuid_get_bytes(id); + statement->setBytes(1, &id_bytes); + std::unique_ptr const res{statement->executeQuery()}; + if (0 == res->rowsCount()) { + m_conn->commit(); + return StorageErr{ + StorageErrType::KeyNotFoundErr, + fmt::format("No job with id {} ", boost::uuids::to_string(id)) + }; + } + res->next(); + boost::uuids::uuid const client_id = read_id(res->getBinaryStream("client_id")); + std::chrono::system_clock::time_point const creation_time + = parse_timestamp(res->getString("creation_time").c_str()); + *job = JobMetadata{id, client_id, creation_time}; + return StorageErr{}; + } catch (sql::SQLException& e) { + m_conn->rollback(); + return StorageErr{StorageErrType::OtherErr, e.what()}; + } + m_conn->commit(); + return StorageErr{}; +} auto MySqlMetadataStorage::get_jobs_by_client_id( boost::uuids::uuid client_id, diff --git a/src/spider/storage/MysqlStorage.hpp b/src/spider/storage/MysqlStorage.hpp index 8909ac9..d0ca13f 100644 --- a/src/spider/storage/MysqlStorage.hpp +++ b/src/spider/storage/MysqlStorage.hpp @@ -36,6 +36,7 @@ class MySqlMetadataStorage : public MetadataStorage { auto add_job(boost::uuids::uuid job_id, boost::uuids::uuid client_id, TaskGraph const& task_graph ) -> StorageErr override; + auto get_job_metadata(boost::uuids::uuid id, JobMetadata* job) -> StorageErr override; auto get_task_graph(boost::uuids::uuid id, TaskGraph* task_graph) -> StorageErr override; auto get_jobs_by_client_id( boost::uuids::uuid client_id, diff --git a/tests/storage/StorageTestHelper.hpp b/tests/storage/StorageTestHelper.hpp index 4f81439..8b7459d 100644 --- a/tests/storage/StorageTestHelper.hpp +++ b/tests/storage/StorageTestHelper.hpp @@ -2,16 +2,17 @@ #define SPIDER_TESTS_STORAGETESTHELPER_HPP // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity) -#include "../../src/spider/storage/DataStorage.hpp" -#include "../../src/spider/storage/MetadataStorage.hpp" -#include "../../src/spider/storage/MysqlStorage.hpp" - -#include #include #include #include #include +#include + +#include "../../src/spider/storage/DataStorage.hpp" +#include "../../src/spider/storage/MetadataStorage.hpp" +#include "../../src/spider/storage/MysqlStorage.hpp" + namespace spider::test { char const* const cStorageUrl = "jdbc:mariadb://localhost:3306/spider_test?user=root&password=password"; diff --git a/tests/storage/test-DataStorage.cpp b/tests/storage/test-DataStorage.cpp index 5d3e9bc..6e0e548 100644 --- a/tests/storage/test-DataStorage.cpp +++ b/tests/storage/test-DataStorage.cpp @@ -1,4 +1,12 @@ // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) +#include +#include + +#include +#include +#include +#include + #include "../../src/spider/core/Data.hpp" #include "../../src/spider/core/Error.hpp" #include "../../src/spider/core/Task.hpp" @@ -7,13 +15,6 @@ #include "../utils/CoreDataUtils.hpp" #include "StorageTestHelper.hpp" -#include -#include -#include -#include -#include -#include - namespace { TEMPLATE_LIST_TEST_CASE( diff --git a/tests/storage/test-MetadataStorage.cpp b/tests/storage/test-MetadataStorage.cpp index 494ea11..56862ba 100644 --- a/tests/storage/test-MetadataStorage.cpp +++ b/tests/storage/test-MetadataStorage.cpp @@ -1,22 +1,24 @@ // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) -#include "../../src/spider/core/Error.hpp" -#include "../../src/spider/core/Task.hpp" -#include "../../src/spider/core/TaskGraph.hpp" -#include "../../src/spider/storage/MetadataStorage.hpp" -#include "../utils/CoreTaskUtils.hpp" -#include "StorageTestHelper.hpp" +#include +#include +#include +#include +#include #include -#include #include #include #include #include -#include -#include -#include -#include + +#include "../../src/spider/core/Error.hpp" +#include "../../src/spider/core/JobMetadata.hpp" +#include "../../src/spider/core/Task.hpp" +#include "../../src/spider/core/TaskGraph.hpp" +#include "../../src/spider/storage/MetadataStorage.hpp" +#include "../utils/CoreTaskUtils.hpp" +#include "StorageTestHelper.hpp" namespace { @@ -142,6 +144,8 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(heads.contains(parent_1.get_id())); REQUIRE(heads.contains(parent_2.get_id())); + std::chrono::system_clock::time_point job_creation_time = std::chrono::system_clock::now(); + // Submit a simple job boost::uuids::uuid const simple_job_id = gen(); spider::core::Task const simple_task{"simple"}; @@ -161,7 +165,7 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(storage->get_jobs_by_client_id(gen(), &job_ids).success()); REQUIRE(job_ids.empty()); - // Get job id for client id should get correct value; + // Get job id for client id should get correct value REQUIRE(storage->get_jobs_by_client_id(client_id, &job_ids).success()); REQUIRE(2 == job_ids.size()); REQUIRE( @@ -169,6 +173,14 @@ TEMPLATE_LIST_TEST_CASE( || (job_ids[0] == simple_job_id && job_ids[1] == job_id)) ); + // Get job metadata should get correct value + spider::core::JobMetadata job_metadata{}; + REQUIRE(storage->get_job_metadata(job_id, &job_metadata).success()); + REQUIRE(job_id == job_metadata.get_id()); + REQUIRE(client_id == job_metadata.get_client_id()); + std::chrono::seconds const time_delta{1}; + REQUIRE(job_creation_time + time_delta >= job_metadata.get_creation_time()); + // Get task graph should succeed spider::core::TaskGraph graph_res{}; REQUIRE(storage->get_task_graph(job_id, &graph_res).success()); diff --git a/tests/utils/CoreTaskUtils.cpp b/tests/utils/CoreTaskUtils.cpp index 225cc77..5e7dc74 100644 --- a/tests/utils/CoreTaskUtils.cpp +++ b/tests/utils/CoreTaskUtils.cpp @@ -1,11 +1,6 @@ #include "CoreTaskUtils.hpp" -#include "../../src/spider/core/Task.hpp" -#include "../../src/spider/core/TaskGraph.hpp" - -#include #include -#include #include #include #include @@ -15,6 +10,12 @@ #include #include +#include +#include + +#include "../../src/spider/core/Task.hpp" +#include "../../src/spider/core/TaskGraph.hpp" + namespace spider::test { namespace { diff --git a/tests/worker/test-FunctionManager.cpp b/tests/worker/test-FunctionManager.cpp index 378938b..91a3d68 100644 --- a/tests/worker/test-FunctionManager.cpp +++ b/tests/worker/test-FunctionManager.cpp @@ -1,13 +1,14 @@ // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) -#include "../../src/spider/core/Data.hpp" -#include "../../src/spider/core/MsgPack.hpp" // IWYU pragma: keep -#include "../../src/spider/worker/FunctionManager.hpp" - -#include #include #include #include +#include + +#include "../../src/spider/core/Data.hpp" +#include "../../src/spider/core/MsgPack.hpp" // IWYU pragma: keep +#include "../../src/spider/worker/FunctionManager.hpp" + namespace { auto int_test(int const x, int const y) -> int { return x + y; diff --git a/tests/worker/test-MessagePipe.cpp b/tests/worker/test-MessagePipe.cpp index f3de62f..130467e 100644 --- a/tests/worker/test-MessagePipe.cpp +++ b/tests/worker/test-MessagePipe.cpp @@ -1,16 +1,17 @@ -#include "../../src/spider/core/BoostAsio.hpp" -#include "../../src/spider/core/MsgPack.hpp" // IWYU pragma: keep -#include "../../src/spider/worker/FunctionManager.hpp" -#include "../../src/spider/worker/message_pipe.hpp" -#include "../../src/spider/worker/TaskExecutorMessage.hpp" - -#include #include #include #include #include #include +#include + +#include "../../src/spider/core/BoostAsio.hpp" +#include "../../src/spider/core/MsgPack.hpp" // IWYU pragma: keep +#include "../../src/spider/worker/FunctionManager.hpp" +#include "../../src/spider/worker/message_pipe.hpp" +#include "../../src/spider/worker/TaskExecutorMessage.hpp" + // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) namespace { TEST_CASE("pipe message response", "[worker]") { diff --git a/tests/worker/test-TaskExecutor.cpp b/tests/worker/test-TaskExecutor.cpp index 2f3d991..aa7075c 100644 --- a/tests/worker/test-TaskExecutor.cpp +++ b/tests/worker/test-TaskExecutor.cpp @@ -1,17 +1,18 @@ -#include "../../src/spider/core/BoostAsio.hpp" // IWYU pragma: keep -#include "../../src/spider/worker/FunctionManager.hpp" -#include "../../src/spider/worker/TaskExecutor.hpp" +#include +#include +#include +#include +#include #include #include #include #include #include -#include -#include -#include -#include -#include + +#include "../../src/spider/core/BoostAsio.hpp" // IWYU pragma: keep +#include "../../src/spider/worker/FunctionManager.hpp" +#include "../../src/spider/worker/TaskExecutor.hpp" // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) diff --git a/tests/worker/worker-test.cpp b/tests/worker/worker-test.cpp index 8e4beba..c02453d 100644 --- a/tests/worker/worker-test.cpp +++ b/tests/worker/worker-test.cpp @@ -1,7 +1,7 @@ -#include "../../src/spider/worker/FunctionManager.hpp" - #include +#include "../../src/spider/worker/FunctionManager.hpp" + namespace { auto sum_test(int const x, int const y) -> int { return x + y; From 49c0ee36b9cfa668b6a70304128d483549dc5802 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Tue, 3 Dec 2024 22:10:06 +0000 Subject: [PATCH 02/14] Add clang format for tests --- tests/.clang-format | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 tests/.clang-format diff --git a/tests/.clang-format b/tests/.clang-format new file mode 100644 index 0000000..06150ee --- /dev/null +++ b/tests/.clang-format @@ -0,0 +1,20 @@ +BasedOnStyle: "InheritParentConfig" + +IncludeCategories: + # NOTE: A header is grouped by first matching regex + # Project headers + - Regex: "^" + Priority: 1 + # C++ standard libraries + - Regex: "^<.+>" + Priority: 2 From 7e5eb95b488085e3f7d2416b301dfa9f86e4fced Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Tue, 3 Dec 2024 22:10:33 +0000 Subject: [PATCH 03/14] Make SchedulerPolicy pure virtual --- src/spider/scheduler/SchedulerPolicy.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/spider/scheduler/SchedulerPolicy.hpp b/src/spider/scheduler/SchedulerPolicy.hpp index daa34be..c05ab93 100644 --- a/src/spider/scheduler/SchedulerPolicy.hpp +++ b/src/spider/scheduler/SchedulerPolicy.hpp @@ -16,9 +16,9 @@ class SchedulerPolicy { virtual auto schedule_next( std::shared_ptr metadata_store, std::shared_ptr data_store - ) -> boost::uuids::uuid; + ) -> boost::uuids::uuid = 0; - virtual auto cleanup_job(boost::uuids::uuid job_id) -> void; + virtual auto cleanup_job(boost::uuids::uuid job_id) -> void = 0; }; } // namespace spider::scheduler From 954416d919926c96653bd4df9bc138929cdcc367 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Tue, 3 Dec 2024 22:43:31 +0000 Subject: [PATCH 04/14] Fix bug in mysql table creation --- src/spider/storage/MysqlStorage.cpp | 4 ++-- tests/storage/test-MetadataStorage.cpp | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/spider/storage/MysqlStorage.cpp b/src/spider/storage/MysqlStorage.cpp index 35e058b..606abbd 100644 --- a/src/spider/storage/MysqlStorage.cpp +++ b/src/spider/storage/MysqlStorage.cpp @@ -51,7 +51,7 @@ namespace { char const* const cCreateDriverTable = R"(CREATE TABLE IF NOT EXISTS `drivers` ( `id` BINARY(16) NOT NULL, `address` VARCHAR(40) NOT NULL, - `heartbeat` NOT NULL TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `heartbeat` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ))"; @@ -120,7 +120,7 @@ char const* const cCreateTaskDependencyTable = R"(CREATE TABLE IF NOT EXISTS `ta char const* const cCreateTaskInstanceTable = R"(CREATE TABLE IF NOT EXISTS `task_instances` ( `id` BINARY(16) NOT NULL, `task_id` BINARY(16) NOT NULL, - `start_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,, + `start_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, CONSTRAINT `instance_task_id` FOREIGN KEY (`task_id`) REFERENCES `tasks` (`id`) ON UPDATE NO ACTION ON DELETE CASCADE, PRIMARY KEY (`id`) ))"; diff --git a/tests/storage/test-MetadataStorage.cpp b/tests/storage/test-MetadataStorage.cpp index 56862ba..166c80f 100644 --- a/tests/storage/test-MetadataStorage.cpp +++ b/tests/storage/test-MetadataStorage.cpp @@ -144,7 +144,7 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(heads.contains(parent_1.get_id())); REQUIRE(heads.contains(parent_2.get_id())); - std::chrono::system_clock::time_point job_creation_time = std::chrono::system_clock::now(); + std::chrono::system_clock::time_point const job_creation_time = std::chrono::system_clock::now(); // Submit a simple job boost::uuids::uuid const simple_job_id = gen(); @@ -180,6 +180,7 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(client_id == job_metadata.get_client_id()); std::chrono::seconds const time_delta{1}; REQUIRE(job_creation_time + time_delta >= job_metadata.get_creation_time()); + REQUIRE(job_creation_time - time_delta <= job_metadata.get_creation_time()); // Get task graph should succeed spider::core::TaskGraph graph_res{}; From 2248eb14a11f68fa83567cb711a480ed6cf6df40 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 04:36:36 +0000 Subject: [PATCH 05/14] Add fifo job scheduler --- src/spider/scheduler/FifoPolicy.cpp | 104 ++++++++++++++++++++++- src/spider/scheduler/FifoPolicy.hpp | 12 ++- src/spider/scheduler/SchedulerPolicy.hpp | 7 +- src/spider/storage/MetadataStorage.hpp | 2 + src/spider/storage/MysqlStorage.cpp | 27 +++++- src/spider/storage/MysqlStorage.hpp | 1 + tests/storage/test-MetadataStorage.cpp | 3 +- 7 files changed, 146 insertions(+), 10 deletions(-) diff --git a/src/spider/scheduler/FifoPolicy.cpp b/src/spider/scheduler/FifoPolicy.cpp index a9aa1b3..ce5c27a 100644 --- a/src/spider/scheduler/FifoPolicy.cpp +++ b/src/spider/scheduler/FifoPolicy.cpp @@ -1,12 +1,110 @@ #include "FifoPolicy.hpp" +#include + +#include +#include +#include + +#include "../core/Task.hpp" +#include "../storage/DataStorage.hpp" +#include "../storage/MetadataStorage.hpp" + +namespace { + +auto task_locality_satisfied( + std::shared_ptr data_store, + spider::core::Task const& task, + std::string const& addr +) -> bool { + for (auto const& input : task.get_inputs()) { + if (input.get_value().has_value()) { + continue; + } + std::optional optional_data_id = input.get_data_id(); + if (!optional_data_id.has_value()) { + continue; + } + boost::uuids::uuid const data_id = optional_data_id.value(); + spider::core::Data data; + if (false == data_store->get_data(data_id, &data).success()) { + throw std::runtime_error{ + fmt::format("Data with id {} not exists.", boost::uuids::to_string((data_id))) + }; + } + if (false == data.is_hard_locality()) { + continue; + } + std::vector const& locality = data.get_locality(); + if (locality.empty()) { + continue; + } + if (std::ranges::find(locality, addr) == locality.end()) { + return false; + } + } +} + +} // namespace + namespace spider::scheduler { auto FifoPolicy::schedule_next( std::shared_ptr metadata_store, - std::shared_ptr data_store -) -> boost::uuids::uuid {} + std::shared_ptr data_store, + boost::uuids::uuid const /*worker_id*/, + std::string const& worker_addr +) -> std::optional { + std::vector ready_tasks; + metadata_store->get_ready_tasks(&ready_tasks); + + std::erase_if(ready_tasks, [data_store, worker_addr](core::Task const& task) -> bool { + return task_locality_satisfied(data_store, task, worker_addr); + }); + + if (ready_tasks.empty()) { + return std::nullopt; + } + + auto const earliest_task = std::ranges::min_element( + ready_tasks, + {}, + [this, + metadata_store](core::Task const& task) -> std::chrono::system_clock::time_point { + boost::uuids::uuid const task_id = task.get_id(); + boost::uuids::uuid job_id; + if (m_task_job_map.contains(task_id)) { + job_id = m_task_job_map[task_id]; + } else { + if (false == metadata_store->get_task_job_id(task_id, &job_id).success()) { + throw std::runtime_error{fmt::format("Task with id {} not exists.", task_id) + }; + } + m_task_job_map.emplace(task_id, job_id); + } + + if (m_job_time_map.contains(job_id)) { + return m_job_time_map[job_id]; + } + + core::JobMetadata job_metadata; + if (false == metadata_store->get_job_metadata(job_id, &job_metadata).success()) { + throw std::runtime_error{fmt::format("Job with id {} not exists.", job_id)}; + } + m_job_time_map.emplace(job_id, job_metadata.get_creation_time()); + return job_metadata.get_creation_time(); + } + ); + + return earliest_task->get_id(); +} -auto FifoPolicy::cleanup_job(boost::uuids::uuid job_id) -> void {} +auto FifoPolicy::cleanup_job(boost::uuids::uuid const job_id) -> void { + absl::erase_if(m_task_job_map, [&job_id](auto const& item) -> bool { + auto const& [item_task_id, item_job_id] = item; + return item_job_id == job_id; + }); + m_job_time_map.erase(job_id); +} } // namespace spider::scheduler diff --git a/src/spider/scheduler/FifoPolicy.hpp b/src/spider/scheduler/FifoPolicy.hpp index d715fcb..433a7ff 100644 --- a/src/spider/scheduler/FifoPolicy.hpp +++ b/src/spider/scheduler/FifoPolicy.hpp @@ -2,9 +2,12 @@ #define SPIDER_SCHEDULER_FIFOPOLICY_HPP #include +#include #include +#include "../storage/DataStorage.hpp" +#include "../storage/MetadataStorage.hpp" #include "SchedulerPolicy.hpp" namespace spider::scheduler { @@ -13,11 +16,14 @@ class FifoPolicy final : public SchedulerPolicy { public: auto schedule_next( std::shared_ptr metadata_store, - std::shared_ptr data_store - ) -> boost::uuids::uuid override; - auto cleanup_job(boost::uuids::uuid job_id) -> void override; + std::shared_ptr data_store, + boost::uuids::uuid const worker_id, + std::string const& worker_addr + ) -> std::optional override; + auto cleanup_job(boost::uuids::uuid const job_id) -> void override; private: + absl::flat_hash_map m_task_job_map; absl::flat_hash_map m_job_time_map; }; diff --git a/src/spider/scheduler/SchedulerPolicy.hpp b/src/spider/scheduler/SchedulerPolicy.hpp index c05ab93..f43ed27 100644 --- a/src/spider/scheduler/SchedulerPolicy.hpp +++ b/src/spider/scheduler/SchedulerPolicy.hpp @@ -2,6 +2,7 @@ #define SPIDER_SCHEDULER_SCHEDULERPOLICY_HPP #include +#include #include @@ -15,8 +16,10 @@ class SchedulerPolicy { virtual auto schedule_next( std::shared_ptr metadata_store, - std::shared_ptr data_store - ) -> boost::uuids::uuid = 0; + std::shared_ptr data_store, + boost::uuids::uuid worker_id, + std::string const& worker_addr + ) -> std::optional = 0; virtual auto cleanup_job(boost::uuids::uuid job_id) -> void = 0; }; diff --git a/src/spider/storage/MetadataStorage.hpp b/src/spider/storage/MetadataStorage.hpp index d01f06d..14fd5aa 100644 --- a/src/spider/storage/MetadataStorage.hpp +++ b/src/spider/storage/MetadataStorage.hpp @@ -42,6 +42,8 @@ class MetadataStorage { virtual auto remove_job(boost::uuids::uuid id) -> StorageErr = 0; virtual auto add_child(boost::uuids::uuid parent_id, Task const& child) -> StorageErr = 0; virtual auto get_task(boost::uuids::uuid id, Task* task) -> StorageErr = 0; + virtual auto get_task_job_id(boost::uuids::uuid id, boost::uuids::uuid* job_id) -> StorageErr + = 0; virtual auto get_ready_tasks(std::vector* tasks) -> StorageErr = 0; virtual auto set_task_state(boost::uuids::uuid id, TaskState state) -> StorageErr = 0; virtual auto add_task_instance(TaskInstance const& instance) -> StorageErr = 0; diff --git a/src/spider/storage/MysqlStorage.cpp b/src/spider/storage/MysqlStorage.cpp index 606abbd..857a014 100644 --- a/src/spider/storage/MysqlStorage.cpp +++ b/src/spider/storage/MysqlStorage.cpp @@ -737,7 +737,6 @@ auto MySqlMetadataStorage::get_job_metadata(boost::uuids::uuid id, JobMetadata* std::chrono::system_clock::time_point const creation_time = parse_timestamp(res->getString("creation_time").c_str()); *job = JobMetadata{id, client_id, creation_time}; - return StorageErr{}; } catch (sql::SQLException& e) { m_conn->rollback(); return StorageErr{StorageErrType::OtherErr, e.what()}; @@ -836,6 +835,32 @@ auto MySqlMetadataStorage::get_task(boost::uuids::uuid id, Task* task) -> Storag return StorageErr{}; } +auto MySqlMetadataStorage::get_task_job_id(boost::uuids::uuid id, boost::uuids::uuid* job_id) + -> StorageErr { + try { + std::unique_ptr statement( + m_conn->prepareStatement("SELECT `job_id` FROM `tasks` WHERE `id` = ?") + ); + sql::bytes id_bytes = uuid_get_bytes(id); + statement->setBytes(1, &id_bytes); + std::unique_ptr const res(statement->executeQuery()); + if (res->rowsCount() == 0) { + m_conn->commit(); + return StorageErr{ + StorageErrType::KeyNotFoundErr, + fmt::format("no task with id {}", boost::uuids::to_string(id)) + }; + } + res->next(); + *job_id = read_id(res->getBinaryStream("job_id")); + } catch (sql::SQLException& e) { + m_conn->rollback(); + return StorageErr{StorageErrType::OtherErr, e.what()}; + } + m_conn->commit(); + return StorageErr{}; +} + auto MySqlMetadataStorage::get_ready_tasks(std::vector* tasks) -> StorageErr { try { std::unique_ptr statement(m_conn->createStatement()); diff --git a/src/spider/storage/MysqlStorage.hpp b/src/spider/storage/MysqlStorage.hpp index d0ca13f..ed4abe3 100644 --- a/src/spider/storage/MysqlStorage.hpp +++ b/src/spider/storage/MysqlStorage.hpp @@ -45,6 +45,7 @@ class MySqlMetadataStorage : public MetadataStorage { auto remove_job(boost::uuids::uuid id) -> StorageErr override; auto add_child(boost::uuids::uuid parent_id, Task const& child) -> StorageErr override; auto get_task(boost::uuids::uuid id, Task* task) -> StorageErr override; + auto get_task_job_id(boost::uuids::uuid id, boost::uuids::uuid* job_id) -> StorageErr override; auto get_ready_tasks(std::vector* tasks) -> StorageErr override; auto set_task_state(boost::uuids::uuid id, TaskState state) -> StorageErr override; auto add_task_instance(TaskInstance const& instance) -> StorageErr override; diff --git a/tests/storage/test-MetadataStorage.cpp b/tests/storage/test-MetadataStorage.cpp index 166c80f..9bf0f50 100644 --- a/tests/storage/test-MetadataStorage.cpp +++ b/tests/storage/test-MetadataStorage.cpp @@ -144,7 +144,8 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(heads.contains(parent_1.get_id())); REQUIRE(heads.contains(parent_2.get_id())); - std::chrono::system_clock::time_point const job_creation_time = std::chrono::system_clock::now(); + std::chrono::system_clock::time_point const job_creation_time + = std::chrono::system_clock::now(); // Submit a simple job boost::uuids::uuid const simple_job_id = gen(); From d826b4fb0b8ee0d67b41ff01534bce1a6503447a Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 05:48:20 +0000 Subject: [PATCH 06/14] Add scheduler test --- src/spider/scheduler/FifoPolicy.cpp | 16 ++- tests/CMakeLists.txt | 9 ++ tests/scheduler/test-SchedulerPolicy.cpp | 141 +++++++++++++++++++++++ 3 files changed, 161 insertions(+), 5 deletions(-) create mode 100644 tests/scheduler/test-SchedulerPolicy.cpp diff --git a/src/spider/scheduler/FifoPolicy.cpp b/src/spider/scheduler/FifoPolicy.cpp index ce5c27a..53470b7 100644 --- a/src/spider/scheduler/FifoPolicy.cpp +++ b/src/spider/scheduler/FifoPolicy.cpp @@ -28,9 +28,9 @@ auto task_locality_satisfied( boost::uuids::uuid const data_id = optional_data_id.value(); spider::core::Data data; if (false == data_store->get_data(data_id, &data).success()) { - throw std::runtime_error{ + throw std::runtime_error( fmt::format("Data with id {} not exists.", boost::uuids::to_string((data_id))) - }; + ); } if (false == data.is_hard_locality()) { continue; @@ -43,6 +43,7 @@ auto task_locality_satisfied( return false; } } + return true; } } // namespace @@ -77,8 +78,10 @@ auto FifoPolicy::schedule_next( job_id = m_task_job_map[task_id]; } else { if (false == metadata_store->get_task_job_id(task_id, &job_id).success()) { - throw std::runtime_error{fmt::format("Task with id {} not exists.", task_id) - }; + throw std::runtime_error(fmt::format( + "Task with id {} not exists.", + boost::uuids::to_string(task_id) + )); } m_task_job_map.emplace(task_id, job_id); } @@ -89,7 +92,10 @@ auto FifoPolicy::schedule_next( core::JobMetadata job_metadata; if (false == metadata_store->get_job_metadata(job_id, &job_metadata).success()) { - throw std::runtime_error{fmt::format("Job with id {} not exists.", job_id)}; + throw std::runtime_error(fmt::format( + "Job with id {} not exists.", + boost::uuids::to_string(job_id) + )); } m_job_time_map.emplace(job_id, job_metadata.get_creation_time()); return job_metadata.get_creation_time(); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 5b1beb5..bf5c0e5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,17 +8,26 @@ set(SPIDER_TEST_SOURCES worker/test-FunctionManager.cpp worker/test-MessagePipe.cpp worker/test-TaskExecutor.cpp + scheduler/test-SchedulerPolicy.cpp CACHE INTERNAL "spider test source files" ) add_executable(unitTest) target_sources(unitTest PRIVATE ${SPIDER_TEST_SOURCES}) + set(SPIDER_TEST_WORKER_SOURCES) foreach(worker_source ${SPIDER_WORKER_SOURCES}) list(APPEND SPIDER_TEST_WORKER_SOURCES "../src/spider/${worker_source}") endforeach() target_sources(unitTest PRIVATE ${SPIDER_TEST_WORKER_SOURCES}) + +set(SPIDER_TEST_SCHEDULER_SOURCES) +foreach(scheduler_source ${SPIDER_SCHEDULER_SOURCES}) + list(APPEND SPIDER_TEST_SCHEDULER_SOURCES "../src/spider/${scheduler_source}") +endforeach() +target_sources(unitTest PRIVATE ${SPIDER_TEST_SCHEDULER_SOURCES}) + target_link_libraries(unitTest PRIVATE Catch2::Catch2WithMain) target_link_libraries(unitTest PRIVATE spider_core) target_link_libraries( diff --git a/tests/scheduler/test-SchedulerPolicy.cpp b/tests/scheduler/test-SchedulerPolicy.cpp new file mode 100644 index 0000000..36ea108 --- /dev/null +++ b/tests/scheduler/test-SchedulerPolicy.cpp @@ -0,0 +1,141 @@ +// NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "../../src/spider/core/Data.hpp" +#include "../../src/spider/core/Task.hpp" +#include "../../src/spider/core/TaskGraph.hpp" +#include "../../src/spider/scheduler/FifoPolicy.hpp" +#include "../../src/spider/storage/DataStorage.hpp" +#include "../../src/spider/storage/MetadataStorage.hpp" +#include "../storage/StorageTestHelper.hpp" + +TEMPLATE_LIST_TEST_CASE( + "FIFO schedule order", + "[scheduler][storage]", + spider::test::StorageTypeList +) { + std::tuple< + std::unique_ptr, + std::unique_ptr> + storages = spider::test::create_storage< + std::tuple_element_t<0, TestType>, + std::tuple_element_t<1, TestType>>(); + std::shared_ptr const metadata_store + = std::move(std::get<0>(storages)); + std::shared_ptr const data_store = std::move(std::get<1>(storages)); + + boost::uuids::random_generator gen; + boost::uuids::uuid const client_id = gen(); + // Submit tasks + spider::core::Task const task_1{"task_1"}; + spider::core::TaskGraph graph_1; + graph_1.add_task(task_1); + boost::uuids::uuid const job_id_1 = gen(); + metadata_store->add_job(job_id_1, client_id, graph_1); + std::this_thread::sleep_for(std::chrono::seconds(1)); + spider::core::Task const task_2{"task_2"}; + spider::core::TaskGraph graph_2; + graph_2.add_task(task_2); + boost::uuids::uuid const job_id_2 = gen(); + metadata_store->add_job(job_id_2, client_id, graph_2); + + spider::scheduler::FifoPolicy policy; + + // Scheduler the earlier task + std::optional const optional_task_id + = policy.schedule_next(metadata_store, data_store, gen(), ""); + REQUIRE(optional_task_id.has_value()); + if (optional_task_id.has_value()) { + boost::uuids::uuid const& task_id = optional_task_id.value(); + REQUIRE(task_id == task_1.get_id()); + } +} + +TEMPLATE_LIST_TEST_CASE( + "Schedule hard locality", + "[scheduler][storage]", + spider::test::StorageTypeList +) { + std::tuple< + std::unique_ptr, + std::unique_ptr> + storages = spider::test::create_storage< + std::tuple_element_t<0, TestType>, + std::tuple_element_t<1, TestType>>(); + std::shared_ptr const metadata_store + = std::move(std::get<0>(storages)); + std::shared_ptr const data_store = std::move(std::get<1>(storages)); + + boost::uuids::random_generator gen; + // Submit task with hard locality + spider::core::Task task{"task"}; + spider::core::Data data; + data.set_hard_locality(true); + data.set_locality({"127.0.0.1"}); + data_store->add_data(data); + task.add_input(spider::core::TaskInput{data.get_id(), "int"}); + spider::core::TaskGraph graph; + graph.add_task(task); + metadata_store->add_job(gen(), gen(), graph); + + spider::scheduler::FifoPolicy policy; + // Schedule with wrong address + REQUIRE(false == policy.schedule_next(metadata_store, data_store, gen(), "").has_value()); + // Schedule with correct address + std::optional const optional_task_id + = policy.schedule_next(metadata_store, data_store, gen(), "127.0.0.1"); + REQUIRE(optional_task_id.has_value()); + if (optional_task_id.has_value()) { + boost::uuids::uuid const& task_id = optional_task_id.value(); + REQUIRE(task_id == task.get_id()); + } +} + +TEMPLATE_LIST_TEST_CASE( + "Schedule soft locality", + "[scheduler][storage]", + spider::test::StorageTypeList +) { + std::tuple< + std::unique_ptr, + std::unique_ptr> + storages = spider::test::create_storage< + std::tuple_element_t<0, TestType>, + std::tuple_element_t<1, TestType>>(); + std::shared_ptr const metadata_store + = std::move(std::get<0>(storages)); + std::shared_ptr const data_store = std::move(std::get<1>(storages)); + + boost::uuids::random_generator gen; + // Submit task with hard locality + spider::core::Task task{"task"}; + spider::core::Data data; + data.set_hard_locality(false); + data.set_locality({"127.0.0.1"}); + data_store->add_data(data); + task.add_input(spider::core::TaskInput{data.get_id(), "int"}); + spider::core::TaskGraph graph; + graph.add_task(task); + metadata_store->add_job(gen(), gen(), graph); + + spider::scheduler::FifoPolicy policy; + // Schedule with wrong address + std::optional const optional_task_id + = policy.schedule_next(metadata_store, data_store, gen(), ""); + REQUIRE(optional_task_id.has_value()); + if (optional_task_id.has_value()) { + boost::uuids::uuid const& task_id = optional_task_id.value(); + REQUIRE(task_id == task.get_id()); + } +} + +// NOLINTEND(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) From 54e23a55a4720ecdb5128d9421e826bb0d2a5e56 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 06:50:55 +0000 Subject: [PATCH 07/14] Fix scheduler --- src/spider/scheduler/FifoPolicy.cpp | 2 +- src/spider/storage/MysqlStorage.cpp | 2 +- tests/scheduler/test-SchedulerPolicy.cpp | 31 ++++++++++++++++++------ tests/storage/test-MetadataStorage.cpp | 1 + 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/spider/scheduler/FifoPolicy.cpp b/src/spider/scheduler/FifoPolicy.cpp index 53470b7..0d121f0 100644 --- a/src/spider/scheduler/FifoPolicy.cpp +++ b/src/spider/scheduler/FifoPolicy.cpp @@ -60,7 +60,7 @@ auto FifoPolicy::schedule_next( metadata_store->get_ready_tasks(&ready_tasks); std::erase_if(ready_tasks, [data_store, worker_addr](core::Task const& task) -> bool { - return task_locality_satisfied(data_store, task, worker_addr); + return !task_locality_satisfied(data_store, task, worker_addr); }); if (ready_tasks.empty()) { diff --git a/src/spider/storage/MysqlStorage.cpp b/src/spider/storage/MysqlStorage.cpp index 857a014..bcbbfc5 100644 --- a/src/spider/storage/MysqlStorage.cpp +++ b/src/spider/storage/MysqlStorage.cpp @@ -138,7 +138,7 @@ char const* const cCreateDataTable = R"(CREATE TABLE IF NOT EXISTS `data` ( char const* const cCreateDataLocalityTable = R"(CREATE TABLE IF NOT EXISTS `data_locality` ( `id` BINARY(16) NOT NULL, - `address` INT UNSIGNED NOT NULL, + `address` VARCHAR(40) NOT NULL, KEY (`id`) USING BTREE, CONSTRAINT `locality_data_id` FOREIGN KEY (`id`) REFERENCES `data` (`id`) ON UPDATE NO ACTION ON DELETE CASCADE ))"; diff --git a/tests/scheduler/test-SchedulerPolicy.cpp b/tests/scheduler/test-SchedulerPolicy.cpp index 36ea108..6cda7e6 100644 --- a/tests/scheduler/test-SchedulerPolicy.cpp +++ b/tests/scheduler/test-SchedulerPolicy.cpp @@ -40,13 +40,17 @@ TEMPLATE_LIST_TEST_CASE( spider::core::TaskGraph graph_1; graph_1.add_task(task_1); boost::uuids::uuid const job_id_1 = gen(); - metadata_store->add_job(job_id_1, client_id, graph_1); + REQUIRE(metadata_store->add_job(job_id_1, client_id, graph_1).success()); + REQUIRE(metadata_store->set_task_state(task_1.get_id(), spider::core::TaskState::Ready) + .success()); std::this_thread::sleep_for(std::chrono::seconds(1)); spider::core::Task const task_2{"task_2"}; spider::core::TaskGraph graph_2; graph_2.add_task(task_2); boost::uuids::uuid const job_id_2 = gen(); - metadata_store->add_job(job_id_2, client_id, graph_2); + REQUIRE(metadata_store->add_job(job_id_2, client_id, graph_2).success()); + REQUIRE(metadata_store->set_task_state(task_2.get_id(), spider::core::TaskState::Ready) + .success()); spider::scheduler::FifoPolicy policy; @@ -58,6 +62,9 @@ TEMPLATE_LIST_TEST_CASE( boost::uuids::uuid const& task_id = optional_task_id.value(); REQUIRE(task_id == task_1.get_id()); } + + REQUIRE(metadata_store->remove_job(job_id_1).success()); + REQUIRE(metadata_store->remove_job(job_id_2).success()); } TEMPLATE_LIST_TEST_CASE( @@ -76,20 +83,23 @@ TEMPLATE_LIST_TEST_CASE( std::shared_ptr const data_store = std::move(std::get<1>(storages)); boost::uuids::random_generator gen; + boost::uuids::uuid const job_id = gen(); // Submit task with hard locality spider::core::Task task{"task"}; - spider::core::Data data; + spider::core::Data data{"value"}; data.set_hard_locality(true); data.set_locality({"127.0.0.1"}); - data_store->add_data(data); + auto err = data_store->add_data(data); task.add_input(spider::core::TaskInput{data.get_id(), "int"}); spider::core::TaskGraph graph; graph.add_task(task); - metadata_store->add_job(gen(), gen(), graph); + REQUIRE(metadata_store->add_job(job_id, gen(), graph).success()); + REQUIRE(metadata_store->set_task_state(task.get_id(), spider::core::TaskState::Ready).success() + ); spider::scheduler::FifoPolicy policy; // Schedule with wrong address - REQUIRE(false == policy.schedule_next(metadata_store, data_store, gen(), "").has_value()); + REQUIRE_FALSE(policy.schedule_next(metadata_store, data_store, gen(), "").has_value()); // Schedule with correct address std::optional const optional_task_id = policy.schedule_next(metadata_store, data_store, gen(), "127.0.0.1"); @@ -98,6 +108,8 @@ TEMPLATE_LIST_TEST_CASE( boost::uuids::uuid const& task_id = optional_task_id.value(); REQUIRE(task_id == task.get_id()); } + + // REQUIRE(metadata_store->remove_job(job_id).success()); } TEMPLATE_LIST_TEST_CASE( @@ -116,6 +128,7 @@ TEMPLATE_LIST_TEST_CASE( std::shared_ptr const data_store = std::move(std::get<1>(storages)); boost::uuids::random_generator gen; + boost::uuids::uuid const job_id = gen(); // Submit task with hard locality spider::core::Task task{"task"}; spider::core::Data data; @@ -125,7 +138,9 @@ TEMPLATE_LIST_TEST_CASE( task.add_input(spider::core::TaskInput{data.get_id(), "int"}); spider::core::TaskGraph graph; graph.add_task(task); - metadata_store->add_job(gen(), gen(), graph); + REQUIRE(metadata_store->add_job(job_id, gen(), graph).success()); + REQUIRE(metadata_store->set_task_state(task.get_id(), spider::core::TaskState::Ready).success() + ); spider::scheduler::FifoPolicy policy; // Schedule with wrong address @@ -136,6 +151,8 @@ TEMPLATE_LIST_TEST_CASE( boost::uuids::uuid const& task_id = optional_task_id.value(); REQUIRE(task_id == task.get_id()); } + + REQUIRE(metadata_store->remove_job(job_id).success()); } // NOLINTEND(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) diff --git a/tests/storage/test-MetadataStorage.cpp b/tests/storage/test-MetadataStorage.cpp index 9bf0f50..cd534f6 100644 --- a/tests/storage/test-MetadataStorage.cpp +++ b/tests/storage/test-MetadataStorage.cpp @@ -220,6 +220,7 @@ TEMPLATE_LIST_TEST_CASE( graph_res = spider::core::TaskGraph{}; REQUIRE(storage->get_task_graph(job_id, &graph_res).success()); REQUIRE(spider::test::task_graph_equal(graph, graph_res)); + REQUIRE(storage->remove_job(job_id).success()); } } // namespace From 7e3dfb6fb18ce1ca5219965bea244e79d7d1470a Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 07:32:06 +0000 Subject: [PATCH 08/14] Fix clang tidy --- src/spider/scheduler/FifoPolicy.cpp | 10 +++++++++- src/spider/scheduler/FifoPolicy.hpp | 7 +++++-- src/spider/scheduler/SchedulerPolicy.hpp | 10 ++++++++-- src/spider/storage/MysqlStorage.hpp | 1 + tests/scheduler/test-SchedulerPolicy.cpp | 8 ++++++-- 5 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/spider/scheduler/FifoPolicy.cpp b/src/spider/scheduler/FifoPolicy.cpp index 0d121f0..aea50f5 100644 --- a/src/spider/scheduler/FifoPolicy.cpp +++ b/src/spider/scheduler/FifoPolicy.cpp @@ -1,19 +1,27 @@ #include "FifoPolicy.hpp" +#include +#include #include +#include +#include +#include +#include +#include #include #include #include #include "../core/Task.hpp" +#include "../core/JobMetadata.hpp" #include "../storage/DataStorage.hpp" #include "../storage/MetadataStorage.hpp" namespace { auto task_locality_satisfied( - std::shared_ptr data_store, + std::shared_ptr const& data_store, spider::core::Task const& task, std::string const& addr ) -> bool { diff --git a/src/spider/scheduler/FifoPolicy.hpp b/src/spider/scheduler/FifoPolicy.hpp index 433a7ff..0ee1f13 100644 --- a/src/spider/scheduler/FifoPolicy.hpp +++ b/src/spider/scheduler/FifoPolicy.hpp @@ -3,8 +3,11 @@ #include #include +#include +#include #include +#include #include "../storage/DataStorage.hpp" #include "../storage/MetadataStorage.hpp" @@ -17,10 +20,10 @@ class FifoPolicy final : public SchedulerPolicy { auto schedule_next( std::shared_ptr metadata_store, std::shared_ptr data_store, - boost::uuids::uuid const worker_id, + boost::uuids::uuid worker_id, std::string const& worker_addr ) -> std::optional override; - auto cleanup_job(boost::uuids::uuid const job_id) -> void override; + auto cleanup_job(boost::uuids::uuid job_id) -> void override; private: absl::flat_hash_map m_task_job_map; diff --git a/src/spider/scheduler/SchedulerPolicy.hpp b/src/spider/scheduler/SchedulerPolicy.hpp index f43ed27..7c41474 100644 --- a/src/spider/scheduler/SchedulerPolicy.hpp +++ b/src/spider/scheduler/SchedulerPolicy.hpp @@ -3,6 +3,7 @@ #include #include +#include #include @@ -12,16 +13,21 @@ namespace spider::scheduler { class SchedulerPolicy { public: + SchedulerPolicy() = default; + SchedulerPolicy(SchedulerPolicy const&) = default; + auto operator=(SchedulerPolicy const&) -> SchedulerPolicy& = default; + SchedulerPolicy(SchedulerPolicy&&) = default; + auto operator=(SchedulerPolicy&&) -> SchedulerPolicy& = default; virtual ~SchedulerPolicy() = default; virtual auto schedule_next( std::shared_ptr metadata_store, std::shared_ptr data_store, - boost::uuids::uuid worker_id, + boost::uuids::uuid const worker_id, std::string const& worker_addr ) -> std::optional = 0; - virtual auto cleanup_job(boost::uuids::uuid job_id) -> void = 0; + virtual auto cleanup_job(boost::uuids::uuid const job_id) -> void = 0; }; } // namespace spider::scheduler diff --git a/src/spider/storage/MysqlStorage.hpp b/src/spider/storage/MysqlStorage.hpp index ed4abe3..6ea38db 100644 --- a/src/spider/storage/MysqlStorage.hpp +++ b/src/spider/storage/MysqlStorage.hpp @@ -14,6 +14,7 @@ #include "../core/Error.hpp" #include "../core/Task.hpp" #include "../core/TaskGraph.hpp" +#include "../core/JobMetadata.hpp" #include "DataStorage.hpp" #include "MetadataStorage.hpp" diff --git a/tests/scheduler/test-SchedulerPolicy.cpp b/tests/scheduler/test-SchedulerPolicy.cpp index 6cda7e6..1abd3ab 100644 --- a/tests/scheduler/test-SchedulerPolicy.cpp +++ b/tests/scheduler/test-SchedulerPolicy.cpp @@ -1,9 +1,11 @@ -// NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) +// NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) #include #include #include #include +#include +#include #include #include @@ -18,6 +20,7 @@ #include "../../src/spider/storage/MetadataStorage.hpp" #include "../storage/StorageTestHelper.hpp" +namespace { TEMPLATE_LIST_TEST_CASE( "FIFO schedule order", "[scheduler][storage]", @@ -154,5 +157,6 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(metadata_store->remove_job(job_id).success()); } +} // namespace -// NOLINTEND(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) +// NOLINTEND(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) From cd01beb2f1a0e186409237eba9ce1e706ce10464 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 07:33:13 +0000 Subject: [PATCH 09/14] Fix clang tidy --- src/spider/scheduler/FifoPolicy.cpp | 6 +++--- src/spider/storage/MysqlStorage.hpp | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/spider/scheduler/FifoPolicy.cpp b/src/spider/scheduler/FifoPolicy.cpp index aea50f5..fac95fd 100644 --- a/src/spider/scheduler/FifoPolicy.cpp +++ b/src/spider/scheduler/FifoPolicy.cpp @@ -1,20 +1,20 @@ #include "FifoPolicy.hpp" #include -#include #include #include -#include #include +#include #include +#include #include #include #include #include -#include "../core/Task.hpp" #include "../core/JobMetadata.hpp" +#include "../core/Task.hpp" #include "../storage/DataStorage.hpp" #include "../storage/MetadataStorage.hpp" diff --git a/src/spider/storage/MysqlStorage.hpp b/src/spider/storage/MysqlStorage.hpp index 6ea38db..6a3938c 100644 --- a/src/spider/storage/MysqlStorage.hpp +++ b/src/spider/storage/MysqlStorage.hpp @@ -12,9 +12,9 @@ #include "../core/Data.hpp" #include "../core/Error.hpp" +#include "../core/JobMetadata.hpp" #include "../core/Task.hpp" #include "../core/TaskGraph.hpp" -#include "../core/JobMetadata.hpp" #include "DataStorage.hpp" #include "MetadataStorage.hpp" From 289a151cc23fd6d96302ff6e65d56059a145e66f Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 08:02:21 +0000 Subject: [PATCH 10/14] Fix clang tidy --- src/spider/scheduler/FifoPolicy.cpp | 3 ++- src/spider/scheduler/SchedulerPolicy.hpp | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/spider/scheduler/FifoPolicy.cpp b/src/spider/scheduler/FifoPolicy.cpp index fac95fd..2f1bf8e 100644 --- a/src/spider/scheduler/FifoPolicy.cpp +++ b/src/spider/scheduler/FifoPolicy.cpp @@ -1,9 +1,9 @@ #include "FifoPolicy.hpp" +#include #include #include #include -#include #include #include #include @@ -13,6 +13,7 @@ #include #include +#include "../core/Data.hpp" #include "../core/JobMetadata.hpp" #include "../core/Task.hpp" #include "../storage/DataStorage.hpp" diff --git a/src/spider/scheduler/SchedulerPolicy.hpp b/src/spider/scheduler/SchedulerPolicy.hpp index 7c41474..8295975 100644 --- a/src/spider/scheduler/SchedulerPolicy.hpp +++ b/src/spider/scheduler/SchedulerPolicy.hpp @@ -23,11 +23,11 @@ class SchedulerPolicy { virtual auto schedule_next( std::shared_ptr metadata_store, std::shared_ptr data_store, - boost::uuids::uuid const worker_id, + boost::uuids::uuid worker_id, std::string const& worker_addr ) -> std::optional = 0; - virtual auto cleanup_job(boost::uuids::uuid const job_id) -> void = 0; + virtual auto cleanup_job(boost::uuids::uuid job_id) -> void = 0; }; } // namespace spider::scheduler From d614d9b0a7942263656eb954d50e975856b5a0d3 Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 21:25:00 +0000 Subject: [PATCH 11/14] Add error handle for mysql timestamp parsing --- src/spider/storage/MysqlStorage.cpp | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/spider/storage/MysqlStorage.cpp b/src/spider/storage/MysqlStorage.cpp index bcbbfc5..0f3e756 100644 --- a/src/spider/storage/MysqlStorage.cpp +++ b/src/spider/storage/MysqlStorage.cpp @@ -706,10 +706,14 @@ auto MySqlMetadataStorage::get_task_graph(boost::uuids::uuid id, TaskGraph* task namespace { -auto parse_timestamp(std::string const& timestamp) -> std::chrono::system_clock::time_point { +auto parse_timestamp(std::string const& timestamp +) -> std::optional { std::tm time_date{}; std::stringstream ss{timestamp}; ss >> std::get_time(&time_date, "%Y-%m-%d %H:%M:%S"); + if (ss.fail()) { + return std::nullopt; + } return std::chrono::system_clock::from_time_t(std::mktime(&time_date)); } @@ -734,9 +738,19 @@ auto MySqlMetadataStorage::get_job_metadata(boost::uuids::uuid id, JobMetadata* } res->next(); boost::uuids::uuid const client_id = read_id(res->getBinaryStream("client_id")); - std::chrono::system_clock::time_point const creation_time + std::optional const optional_creation_time = parse_timestamp(res->getString("creation_time").c_str()); - *job = JobMetadata{id, client_id, creation_time}; + if (false == optional_creation_time.has_value()) { + m_conn->rollback(); + return StorageErr{ + StorageErrType::OtherErr, + fmt::format( + "Cannot parse timestamp {}", + res->getString("creation_time").c_str() + ) + }; + } + *job = JobMetadata{id, client_id, optional_creation_time.value()}; } catch (sql::SQLException& e) { m_conn->rollback(); return StorageErr{StorageErrType::OtherErr, e.what()}; From c10a764c1bb8eb51d3966d1b9e58763725547b3c Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 21:26:01 +0000 Subject: [PATCH 12/14] Mark JobMetadata getter as const --- src/spider/core/JobMetadata.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/spider/core/JobMetadata.hpp b/src/spider/core/JobMetadata.hpp index f29f8bb..9f4715f 100644 --- a/src/spider/core/JobMetadata.hpp +++ b/src/spider/core/JobMetadata.hpp @@ -20,11 +20,11 @@ class JobMetadata { m_client_id{client_id}, m_creation_time{creation_time} {} - [[nodiscard]] auto get_id() -> boost::uuids::uuid { return m_id; } + [[nodiscard]] auto get_id() const -> boost::uuids::uuid { return m_id; } - [[nodiscard]] auto get_client_id() -> boost::uuids::uuid { return m_client_id; } + [[nodiscard]] auto get_client_id() const -> boost::uuids::uuid { return m_client_id; } - [[nodiscard]] auto get_creation_time() -> std::chrono::system_clock::time_point { + [[nodiscard]] auto get_creation_time() const -> std::chrono::system_clock::time_point { return m_creation_time; } From 93481f85231a064e67c25652166a9e1a8c6f411f Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 21:28:34 +0000 Subject: [PATCH 13/14] Add more check for scheduler test --- tests/scheduler/test-SchedulerPolicy.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/scheduler/test-SchedulerPolicy.cpp b/tests/scheduler/test-SchedulerPolicy.cpp index 1abd3ab..19dc99c 100644 --- a/tests/scheduler/test-SchedulerPolicy.cpp +++ b/tests/scheduler/test-SchedulerPolicy.cpp @@ -92,7 +92,7 @@ TEMPLATE_LIST_TEST_CASE( spider::core::Data data{"value"}; data.set_hard_locality(true); data.set_locality({"127.0.0.1"}); - auto err = data_store->add_data(data); + REQUIRE(data_store->add_data(data).success()); task.add_input(spider::core::TaskInput{data.get_id(), "int"}); spider::core::TaskGraph graph; graph.add_task(task); @@ -112,7 +112,7 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(task_id == task.get_id()); } - // REQUIRE(metadata_store->remove_job(job_id).success()); + REQUIRE(metadata_store->remove_job(job_id).success()); } TEMPLATE_LIST_TEST_CASE( @@ -137,7 +137,7 @@ TEMPLATE_LIST_TEST_CASE( spider::core::Data data; data.set_hard_locality(false); data.set_locality({"127.0.0.1"}); - data_store->add_data(data); + REQUIRE(data_store->add_data(data).success()); task.add_input(spider::core::TaskInput{data.get_id(), "int"}); spider::core::TaskGraph graph; graph.add_task(task); From 7725428c84b532e42a34296ec73b95d436baa91f Mon Sep 17 00:00:00 2001 From: Sitao Wang Date: Wed, 4 Dec 2024 21:33:46 +0000 Subject: [PATCH 14/14] Fix typo in comment --- tests/scheduler/test-SchedulerPolicy.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scheduler/test-SchedulerPolicy.cpp b/tests/scheduler/test-SchedulerPolicy.cpp index 19dc99c..6e2b823 100644 --- a/tests/scheduler/test-SchedulerPolicy.cpp +++ b/tests/scheduler/test-SchedulerPolicy.cpp @@ -132,7 +132,7 @@ TEMPLATE_LIST_TEST_CASE( boost::uuids::random_generator gen; boost::uuids::uuid const job_id = gen(); - // Submit task with hard locality + // Submit task with soft locality spider::core::Task task{"task"}; spider::core::Data data; data.set_hard_locality(false);