feat: Add scheduler server that listens and responds to schedule task requests #33
Walkthrough
The pull request introduces several modifications to the Spider project, primarily focusing on the scheduler component. New source and header files, including
Changes
Possibly related PRs
Suggested reviewers
📜 Recent review details
Configuration used: CodeRabbit UI
📒 Files selected for processing (3)
🧰 Additional context used
🪛 cppcheck (2.10-2)
tests/scheduler/test-SchedulerServer.cpp
[performance] 88-88: Variable 'id' is assigned in constructor body. Consider performing initialization in initialization list.
(useInitializationList)
src/spider/scheduler/SchedulerServer.cpp
[performance] 88-88: Variable 'id' is assigned in constructor body. Consider performing initialization in initialization list.
(useInitializationList)
[performance] 107-107: Variable 'm_id' is assigned in constructor body. Consider performing initialization in initialization list.
(useInitializationList)
🔇 Additional comments (7)
src/spider/scheduler/SchedulerServer.hpp (2)
The deletion of the copy and move constructors and assignment operators prevents unintended copying or moving of
All member variables are properly initialized using the initializer list, ensuring efficient construction of the
tests/scheduler/test-SchedulerServer.cpp (3)
Socket operations might hang indefinitely if the server does not respond promptly. Adding appropriate timeouts to the client socket can prevent tests from blocking indefinitely.
Apply this diff to set a timeout on the socket:
boost::asio::ip::tcp::socket socket{context};
+boost::asio::socket_base::timeout timeout_option{boost::posix_time::seconds(5)};
+socket.set_option(timeout_option);
boost::asio::connect(socket, std::vector{endpoint});
To prevent potential undefined behaviour or exceptions if the thread has already been joined or not started, check if the thread is joinable before calling
Apply this diff to safely join the thread:
socket.close();
server.stop();
-thread.join();
+if (thread.joinable()) {
+ thread.join();
+}
Use dynamic port allocation to avoid conflicts
Hardcoding the port number to
Apply this diff to adjust the port allocation:
-constexpr unsigned short cPort = 6021;
-spider::scheduler::SchedulerServer server{cPort, policy, metadata_store, data_store};
+spider::scheduler::SchedulerServer server{0, policy, metadata_store, data_store};
+unsigned short const port = server.get_port();
Ensure that you implement the
src/spider/scheduler/SchedulerServer.cpp (2)
After logging the error when
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (3)
src/spider/scheduler/SchedulerServer.cpp (2)
86-87
: Simplify code by passing std::optional by value
Holding std::optional<msgpack::sbuffer> by const reference is unnecessary here; declaring the variable by value is simpler and avoids potential issues with references.
Apply this diff to adjust the variable declaration:
- std::optional<msgpack::sbuffer> const& optional_message_buffer
+ std::optional<msgpack::sbuffer> optional_message_buffer
95-96
: Simplify code by passing std::optional by value
Similarly, holding std::optional<ScheduleTaskRequest> by const reference is unnecessary; declaring the variable by value is more appropriate.
Apply this diff to adjust the variable declaration:
- std::optional<ScheduleTaskRequest> const& optional_request
+ std::optional<ScheduleTaskRequest> optional_request
tests/scheduler/test-SchedulerServer.cpp (1)
28-90
: Add negative test cases
The test only covers the happy path. Consider adding test cases for:
- Invalid worker addresses
- Malformed requests
- Server unavailable scenarios
- Network errors
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (11)
- src/spider/CMakeLists.txt (1 hunks)
- src/spider/scheduler/SchedulerMessage.hpp (1 hunks)
- src/spider/scheduler/SchedulerServer.cpp (1 hunks)
- src/spider/scheduler/SchedulerServer.hpp (1 hunks)
- src/spider/storage/MysqlStorage.cpp (1 hunks)
- test-tasks.yaml (1 hunks)
- tests/CMakeLists.txt (1 hunks)
- tests/scheduler/test-SchedulerPolicy.cpp (0 hunks)
- tests/scheduler/test-SchedulerServer.cpp (1 hunks)
- tests/storage/test-DataStorage.cpp (2 hunks)
- tests/utils/CoreTaskUtils.cpp (1 hunks)
💤 Files with no reviewable changes (1)
- tests/scheduler/test-SchedulerPolicy.cpp
🧰 Additional context used
🪛 cppcheck (2.10-2)
tests/scheduler/test-SchedulerServer.cpp
[performance] 88-88: Variable 'id' is assigned in constructor body. Consider performing initialization in initialization list.
(useInitializationList)
src/spider/scheduler/SchedulerServer.cpp
[performance] 88-88: Variable 'id' is assigned in constructor body. Consider performing initialization in initialization list.
(useInitializationList)
[performance] 107-107: Variable 'm_id' is assigned in constructor body. Consider performing initialization in initialization list.
(useInitializationList)
🔇 Additional comments (7)
test-tasks.yaml (1)
11-12
: Updated test commands include scheduler tests
The new test commands correctly include the scheduler tests and exclude the storage tests as intended.
tests/CMakeLists.txt (1)
13-13
: Addition of test-SchedulerServer.cpp to test sources
Adding scheduler/test-SchedulerServer.cpp to SPIDER_TEST_SOURCES ensures that the new scheduler server tests are included in the test build.
src/spider/scheduler/SchedulerMessage.hpp (1)
22-24
: Consider validating worker_addr format
The constructor accepts any string as a worker address without validation. Consider adding format validation to ensure the address is properly formatted (e.g., IP:port or hostname:port).
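A minimal sketch of what such validation could look like, using only the standard library. The helper name `is_valid_worker_addr` and the `host:port` rule it enforces are assumptions for illustration; the PR does not define the expected address format, and this is not part of Spider's API.

```cpp
#include <charconv>
#include <cstdint>
#include <string_view>
#include <system_error>

// Hypothetical helper (not part of Spider): accepts only "host:port" where the host part is
// non-empty and the port is a decimal integer in [1, 65535].
inline auto is_valid_worker_addr(std::string_view addr) -> bool {
    // Split at the last ':' so that hosts containing ':' are not rejected outright.
    auto const colon = addr.rfind(':');
    if (std::string_view::npos == colon || 0 == colon) {
        return false;
    }
    std::string_view const port_str = addr.substr(colon + 1);
    if (port_str.empty()) {
        return false;
    }
    std::uint32_t port = 0;
    char const* const end = port_str.data() + port_str.size();
    auto const [ptr, ec] = std::from_chars(port_str.data(), end, port);
    // Reject parse failures, overflow, and trailing non-digit characters.
    if (ec != std::errc{} || ptr != end) {
        return false;
    }
    return port >= 1 && port <= 65535;
}
```

The constructor could call a check like this and reject the request early, although whether to throw or return an error is a design decision for the maintainers.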
src/spider/CMakeLists.txt (1)
94-96
: LGTM!
The new scheduler files are properly added to the build system.
tests/storage/test-DataStorage.cpp (1)
94-95
: LGTM! Good improvement to test cleanup
The changes improve test reliability by:
- Storing job_id in a variable for reuse
- Adding proper cleanup of the job after test completion
Also applies to: 109-111
src/spider/storage/MysqlStorage.cpp (1)
476-486
: LGTM! Verify task state transitions
The implementation correctly marks head tasks as ready within the same transaction, maintaining data consistency. The error handling with rollback is properly implemented.
Let's verify the task state transitions:
✅ Verification successful
LGTM! Task state transitions are properly implemented
The implementation correctly handles task state transitions with proper error handling and transaction management. The evidence shows:
- Task state changes are managed through the set_task_state method in MysqlStorage
- State transitions are properly validated through the task_state_to_string function
- The TaskExecutor tests verify the task execution states (success, failure, errors)
- The storage layer implements proper state management through the MetadataStorage interface
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that head tasks are properly marked as ready
# and that the state transitions are handled correctly
# Check for any tasks that might be in an inconsistent state
rg -A 5 "UPDATE.*tasks.*SET.*state.*=.*ready"
# Look for related test cases
rg -l "task.*state.*ready" "tests/"
Length of output: 580
Script:
#!/bin/bash
# Let's check for task state transitions and related test coverage
# Find all task state-related code
rg -A 3 "state.*=.*('ready'|'running'|'completed'|'failed')" --type cpp
# Look for task state enum or constants
ast-grep --pattern 'enum $_ {
$$$
ready
$$$
}'
# Check for task state validation
rg -A 3 "validate.*state|check.*state" --type cpp
# Look for test files specifically related to task scheduling
fd test.*task
Length of output: 971
Script:
#!/bin/bash
# Let's examine the test files and task state management more closely
# Check the task executor test content
cat tests/worker/test-TaskExecutor.cpp
# Look at the test tasks configuration
cat test-tasks.yaml
# Search for task state transition methods
rg -A 5 "task.*transition|set.*task.*state" --type cpp
# Find task state validation in MySQL storage
rg -A 5 "BEGIN|COMMIT|ROLLBACK" src/spider/storage/MysqlStorage.cpp
Length of output: 10254
tests/utils/CoreTaskUtils.cpp (1)
144-147
: Please clarify the rationale for excluding task state comparison
The comment "Task state might not be the same" doesn't fully explain why we're ignoring state differences in task equality comparisons. This change makes the equality check more permissive and could potentially mask state-related issues in tests.
Consider:
- Adding a more detailed comment explaining why task state comparison was removed
- Documenting any specific scheduler server test cases that require this behaviour
Let's verify how this affects scheduler server tests:
✅ Verification successful
Task state comparison removal is appropriate for scheduler testing
The removal of task state comparison is justified based on the following findings:
- From Task.hpp, we can see that tasks transition through multiple states (Pending, Ready, Running, Succeed, Failed, Canceled)
- The scheduler test (test-SchedulerServer.cpp) shows that tasks are being scheduled and their states naturally change during the scheduling process
- The task equality function is used in storage tests to verify task relationships and core properties, where the state is not a critical comparison factor
The current comment could be more descriptive, but the change itself is correct as comparing states would make tests brittle when verifying task relationships in a scheduling context.
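One way to keep the permissive comparison without hiding state bugs is to offer a strict variant next to it. The sketch below is illustrative only: `SketchTask`, `SketchTaskState`, and both function names are hypothetical stand-ins, and the real task type in Task.hpp has more fields than shown here.

```cpp
#include <string>
#include <tuple>

// Hypothetical stand-ins; the state names mirror those listed in the review above.
enum class SketchTaskState { Pending, Ready, Running, Succeed, Failed, Canceled };

struct SketchTask {
    std::string id;
    std::string function_name;
    SketchTaskState state = SketchTaskState::Pending;
};

// Compares identity and core properties only. State is deliberately ignored because the
// scheduler may advance it between the time a task is stored and the time it is read back.
inline auto sketch_task_equal(SketchTask const& lhs, SketchTask const& rhs) -> bool {
    return std::tie(lhs.id, lhs.function_name) == std::tie(rhs.id, rhs.function_name);
}

// Strict variant for tests that do want to assert on the state.
inline auto sketch_task_equal_with_state(SketchTask const& lhs, SketchTask const& rhs) -> bool {
    return sketch_task_equal(lhs, rhs) && lhs.state == rhs.state;
}
```

Tests that verify relationships would call the first function, while tests that verify a specific transition would call the second.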
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for test cases that compare tasks with different states
rg -A 5 "task_equal" tests/scheduler/
Length of output: 37
Script:
#!/bin/bash
# Let's search for task_equal usage across all test files
rg -A 5 "task_equal" tests/
Length of output: 2924
Script:
#!/bin/bash
# Let's also check where this function is used in the codebase
ast-grep --pattern 'task_equal($$$)'
Length of output: 38
Script:
#!/bin/bash
# Let's examine the full CoreTaskUtils file to understand the context better
cat tests/utils/CoreTaskUtils.cpp
Length of output: 5175
Script:
#!/bin/bash
# Let's look for scheduler test files to understand test patterns
fd -e cpp -e h scheduler tests/
Length of output: 115
Script:
#!/bin/bash
# Let's examine the scheduler test files to understand the context
cat tests/scheduler/test-SchedulerServer.cpp
Length of output: 4222
Script:
#!/bin/bash
# Let's also check the Task.hpp to understand task states
cat src/spider/core/Task.hpp
Length of output: 4712
SchedulerServer(SchedulerServer&&) = default;
// NOLINTNEXTLINE(clang-diagnostic-defaulted-function-deleted)
auto operator=(SchedulerServer&&) noexcept -> SchedulerServer& = default;
Potential issue with defaulting move constructor and move assignment operator
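The member variable boost::asio::io_context is not movable, so the defaulted move constructor and move assignment operator are implicitly deleted (the NOLINT for clang-diagnostic-defaulted-function-deleted suppresses exactly that warning). boost::asio::ip::tcp::acceptor is movable, but the io_context member alone blocks the move. Any code that tries to move a SchedulerServer will fail to compile.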
Consider deleting the move constructor and move assignment operator or properly implementing them to handle the non-movable member variables. Would you like assistance in adjusting the special member functions?
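The two options can be sketched with the standard library alone. Here `std::mutex` stands in for a non-movable member, and the class names `PinnedServer` and `MovableServer` are hypothetical; this illustrates the language rules and is not a proposed change to SchedulerServer.

```cpp
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>

// std::mutex stands in for any non-movable member.
using NonMovable = std::mutex;
static_assert(!std::is_move_constructible_v<NonMovable>);

// Option 1: state the intent explicitly by deleting copy and move operations.
class PinnedServer {
public:
    PinnedServer() = default;
    PinnedServer(PinnedServer const&) = delete;
    auto operator=(PinnedServer const&) -> PinnedServer& = delete;
    PinnedServer(PinnedServer&&) = delete;
    auto operator=(PinnedServer&&) -> PinnedServer& = delete;
    ~PinnedServer() = default;

    auto with_lock() -> void { std::lock_guard<NonMovable> const lock{m_resource}; }

private:
    NonMovable m_resource;
};

// Option 2: own the non-movable member through a pointer so defaulted moves work.
class MovableServer {
public:
    MovableServer() : m_resource{std::make_unique<NonMovable>()} {}
    MovableServer(MovableServer&&) noexcept = default;
    auto operator=(MovableServer&&) noexcept -> MovableServer& = default;
    ~MovableServer() = default;

    [[nodiscard]] auto has_resource() const -> bool { return nullptr != m_resource; }

private:
    std::unique_ptr<NonMovable> m_resource;
};

static_assert(!std::is_move_constructible_v<PinnedServer>);
static_assert(std::is_nothrow_move_constructible_v<MovableServer>);
```

Option 1 is the smaller change. Option 2 costs a heap allocation and leaves moved-from objects without a resource, so it only pays off if the server genuinely needs to be moved.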
    if (ec) {
        spdlog::error("Cannot accept connection {}: {}", ec.value(), ec.what());
    }
    // Ignore the returned future as we do not need its value
    boost::asio::co_spawn(
            m_context,
            process_message(std::move(socket)),
            boost::asio::use_future
    );
}
Handle errors during accept appropriately
When the accept operation fails (ec is set), the code logs the error but continues to process the socket. This may lead to undefined behaviour since the socket may not be in a valid state. Consider adding a continue; statement after logging the error to skip processing the invalid socket.
Apply this diff to fix the error handling:
if (ec) {
spdlog::error("Cannot accept connection {}: {}", ec.value(), ec.what());
+ continue;
}
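The control flow of the suggested fix can be shown without Boost. In this sketch `AcceptResult` and `process_accepted` are hypothetical stand-ins that simulate a sequence of accept outcomes; only the skip-on-error pattern carries over to the real loop.

```cpp
#include <system_error>
#include <vector>

// Hypothetical stand-in for the outcome of one accept call.
struct AcceptResult {
    std::error_code ec;
    int connection_id;
};

// Returns the ids of the connections that were handed off for processing.
inline auto process_accepted(std::vector<AcceptResult> const& results) -> std::vector<int> {
    std::vector<int> processed;
    for (auto const& result : results) {
        if (result.ec) {
            // Log and skip: the socket from a failed accept must not be used.
            continue;
        }
        processed.push_back(result.connection_id);
    }
    return processed;
}
```

Without the `continue`, the failed entry would reach the hand-off step as well, which is the behaviour the review comment warns about.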
// NOLINTNEXTLINE(bugprone-unchecked-optional-access)
[[nodiscard]] auto get_task_id() const -> boost::uuids::uuid { return m_task_id.value(); }
🛠️ Refactor suggestion
Document or handle potential std::optional::value() exception
The get_task_id() method could throw if m_task_id is empty. Consider either:
- Documenting when this might occur and the exception behaviour
- Adding error handling
- // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
- [[nodiscard]] auto get_task_id() const -> boost::uuids::uuid { return m_task_id.value(); }
+ /**
+ * @return The task ID
+ * @throws std::runtime_error if no task ID is present
+ */
+ [[nodiscard]] auto get_task_id() const -> boost::uuids::uuid {
+ if (!has_task_id()) {
+ throw std::runtime_error("No task ID present in response");
+ }
+ return m_task_id.value();
+ }
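A self-contained sketch of the checked accessor follows. `SketchResponse` is a hypothetical stand-in for the response class, and `std::string` replaces `boost::uuids::uuid` so the example needs only the standard library. The documented exception matches the one actually thrown.

```cpp
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for the response type; a string replaces the UUID.
class SketchResponse {
public:
    SketchResponse() = default;
    explicit SketchResponse(std::string task_id) : m_task_id{std::move(task_id)} {}

    [[nodiscard]] auto has_task_id() const -> bool { return m_task_id.has_value(); }

    /**
     * @return The task ID
     * @throws std::runtime_error if no task ID is present
     */
    [[nodiscard]] auto get_task_id() const -> std::string {
        if (!has_task_id()) {
            throw std::runtime_error("No task ID present in response");
        }
        // Safe: presence was checked above.
        return *m_task_id;
    }

private:
    std::optional<std::string> m_task_id;
};
```

Callers that expect an empty response can test `has_task_id()` first and avoid the exception path entirely.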
constexpr unsigned short cPort = 6021;
spider::scheduler::SchedulerServer server{cPort, policy, metadata_store, data_store};
🛠️ Refactor suggestion
Use dynamic port allocation for tests
Hard-coded port number could cause conflicts in parallel test execution. Consider using port 0 to let the system assign an available port.
- constexpr unsigned short cPort = 6021;
- spider::scheduler::SchedulerServer server{cPort, policy, metadata_store, data_store};
+ spider::scheduler::SchedulerServer server{0, policy, metadata_store, data_store};
+ unsigned short const port = server.get_port();
Committable suggestion skipped: line range outside the PR's diff.
server.stop();
thread.join();
Consider race condition in server shutdown
The current server stop and thread join sequence could lead to a race condition. Consider implementing a proper shutdown sequence.
- server.stop();
- thread.join();
+ // Ensure clean shutdown
+ socket.close();
+ server.stop();
+ if (thread.joinable()) {
+ thread.join();
+ }
Committable suggestion skipped: line range outside the PR's diff.
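For context, calling join() on a std::thread that is not joinable throws std::system_error, which is what the joinable() guard protects against. The sketch below shows the stop-then-join order with a hypothetical `SketchServer` whose run() blocks until stop() is called; the class and the `stop_and_join` helper are illustrative and not taken from the PR.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical stand-in for the server: run() blocks until stop() is called.
class SketchServer {
public:
    auto run() -> void {
        while (!m_stopped.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds{1});
        }
    }

    auto stop() -> void { m_stopped.store(true); }

    [[nodiscard]] auto stopped() const -> bool { return m_stopped.load(); }

private:
    std::atomic<bool> m_stopped{false};
};

// Signals the server to stop first, then joins the worker thread only if it is joinable.
// Calling this twice is harmless because the second join is skipped.
inline auto stop_and_join(SketchServer& server, std::thread& thread) -> void {
    server.stop();
    if (thread.joinable()) {
        thread.join();
    }
}
```

Stopping before joining matters: joining first would block forever, because run() only returns once the stop flag is set.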
boost::asio::io_context context;
boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::tcp::v4(), cPort};
boost::asio::ip::tcp::socket socket{context};
boost::asio::connect(socket, std::vector{endpoint});
🛠️ Refactor suggestion
Add timeout to socket operations
Socket operations could hang indefinitely. Consider adding appropriate timeouts.
boost::asio::io_context context;
- boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::tcp::v4(), cPort};
+ boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::tcp::v4(), port};
boost::asio::ip::tcp::socket socket{context};
+ socket.set_option(boost::asio::socket_base::timeout(boost::posix_time::seconds(5)));
boost::asio::connect(socket, std::vector{endpoint});
Committable suggestion skipped: line range outside the PR's diff.
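Boost.Asio's documented socket_base options do not appear to include a `timeout` member, so the diff above may not compile as written. A library-independent way to bound a blocking call in a test is to run it on a helper thread and wait with a deadline. The sketch below uses only the standard library; `run_with_timeout` is a hypothetical helper, not an API from Boost or Spider.

```cpp
#include <chrono>
#include <future>
#include <optional>
#include <thread>
#include <type_traits>
#include <utility>

// Hypothetical helper: runs `fn` on a helper thread and waits at most `timeout` for its result.
// Returns std::nullopt on timeout. The helper thread is detached, so a call that never returns
// leaks that thread; this is a trade-off that is usually acceptable in a test that is about to
// fail anyway, but not in production code.
template <typename Fn>
auto run_with_timeout(Fn fn, std::chrono::milliseconds timeout)
        -> std::optional<std::invoke_result_t<Fn>> {
    using Result = std::invoke_result_t<Fn>;
    std::packaged_task<Result()> task{std::move(fn)};
    std::future<Result> future = task.get_future();
    // The task (and the shared state it owns) moves into the thread, so it outlives this call.
    std::thread{std::move(task)}.detach();
    if (future.wait_for(timeout) != std::future_status::ready) {
        return std::nullopt;
    }
    return future.get();
}
```

In the test, the blocking connect and read calls could be wrapped in such a helper, with the test failing when `std::nullopt` comes back. Asynchronous Asio operations combined with a timer would avoid the leaked thread, at the cost of a larger change.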
Description
As title.
Validation performed
Summary by CodeRabbit
New Features
- SchedulerServer class for managing scheduling tasks and connections.
- ScheduleTaskRequest and ScheduleTaskResponse.
- SchedulerServer class.
Bug Fixes
- DataStorage tests.
Tests
- SchedulerServer.