Skip to content

Commit

Permalink
Improve topology pausing logic
Browse files Browse the repository at this point in the history
Previously, the execution engine would attempt to pause the topology regardless of the current run status, including RunStatus::Finished, which could hypothetically lead to an invalid RunStatus::Finished -> RunStatus::Paused transition. This change also cleans up the narrative in the log messages.
  • Loading branch information
nathanwbrei committed Nov 27, 2024
1 parent 0e5d1d0 commit f9f0cbe
Showing 1 changed file with 27 additions and 22 deletions.
49 changes: 27 additions & 22 deletions src/libraries/JANA/Engine/JExecutionEngine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,31 +569,36 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker)
// so we check whether more are potentially coming. If not, we can pause the topology.
// Note that our worker threads will still wait at ExchangeTask() until they get
// shut down separately during Scale().

bool any_active_source_found = false;
bool any_active_task_found = false;

LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;

for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
auto& state = m_arrow_states[arrow_id];
any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
any_active_task_found |= (state.active_tasks != 0);
// A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
}

if (!any_active_source_found && !any_active_task_found) {
// Pause the topology
m_time_at_finish = clock_t::now();
m_event_count_at_finish = 0;
for (auto& arrow_state : m_arrow_states) {
if (arrow_state.is_sink) {
m_event_count_at_finish += arrow_state.events_processed;
if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
// We want to avoid scenarios such as where the topology already Finished but then gets reset to Paused
// This also leaves a cleaner narrative in the logs.

bool any_active_source_found = false;
bool any_active_task_found = false;

LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;

for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
auto& state = m_arrow_states[arrow_id];
any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
any_active_task_found |= (state.active_tasks != 0);
// A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
}

if (!any_active_source_found && !any_active_task_found) {
// Pause the topology
m_time_at_finish = clock_t::now();
m_event_count_at_finish = 0;
for (auto& arrow_state : m_arrow_states) {
if (arrow_state.is_sink) {
m_event_count_at_finish += arrow_state.events_processed;
}
}
LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
m_runstatus = RunStatus::Paused;
// I think this is the ONLY site where the topology gets paused. Verify this?
}
LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
m_runstatus = RunStatus::Paused;
// I think this is the ONLY site where the topology gets paused. Verify this?
}

worker.last_arrow_id = -1;
Expand Down

0 comments on commit f9f0cbe

Please sign in to comment.