From 7b123020ae7f77abd447f3fba4a271e021dfa09b Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 28 Oct 2022 20:15:44 +0300 Subject: [PATCH] Improve OnIdleTasks management (#23) Signed-off-by: Roman Gershman --- util/proactor_base.cc | 72 ++++++++++++++++++++++++------------------ util/proactor_base.h | 42 ++++++++++++++++-------- util/uring/proactor.cc | 10 ++---- 3 files changed, 73 insertions(+), 51 deletions(-) diff --git a/util/proactor_base.cc b/util/proactor_base.cc index 87fe827f..0281e463 100644 --- a/util/proactor_base.cc +++ b/util/proactor_base.cc @@ -82,53 +82,63 @@ void ProactorBase::Stop() { VLOG(1) << "Proactor::StopFinish"; } -uint32_t ProactorBase::AddIdleTask(IdleTask f) { +uint32_t ProactorBase::AddOnIdleTask(OnIdleTask f) { DCHECK(InMyThread()); - auto id = next_task_id_++; - auto res = on_idle_map_.emplace(id, std::move(f)); - CHECK(res.second); - idle_it_ = on_idle_map_.begin(); // reset position to the first item. + uint32_t res = on_idle_arr_.size(); + on_idle_arr_.push_back(OnIdleWrapper{.task = std::move(f), .next_ts = 0}); - return id; + return res; } -void ProactorBase::RunOnIdleTasks() { - if (on_idle_map_.empty()) - return; - - if (idle_it_ == on_idle_map_.end()) { - idle_it_ = on_idle_map_.begin(); - } +bool ProactorBase::RunOnIdleTasks() { + if (on_idle_arr_.empty()) + return false; uint64_t start = GetClockNanos(); - tl_info_.monotonic_time = start; + uint64_t curr_ts = start; - // Perform round robin with idle_it_ saving the position between runs. + bool should_spin = false; + + DCHECK_LT(on_idle_next_, on_idle_arr_.size()); + + // Perform round robin with on_idle_next_ saving the position between runs. do { - bool res = idle_it_->second(); + OnIdleWrapper& on_idle = on_idle_arr_[on_idle_next_]; - if (!res) { - on_idle_map_.erase(idle_it_); - idle_it_ = on_idle_map_.begin(); - break; + if (on_idle.task && on_idle.next_ts <= curr_ts) { + tl_info_.monotonic_time = curr_ts; + + uint32_t level = on_idle.task(); // run the task + + curr_ts = GetClockNanos(); + + if (level >= kOnIdleMaxLevel) { + level = kOnIdleMaxLevel; + should_spin = true; + } else { + uint64_t delta_ns = uint64_t(kIdleCycleMaxMicros) * 1000 / (1 << level); + on_idle.next_ts = curr_ts + delta_ns; + } } - ++idle_it_; - if (idle_it_ == on_idle_map_.end()) { - idle_it_ = on_idle_map_.begin(); + ++on_idle_next_; + if (on_idle_next_ == on_idle_arr_.size()) { + on_idle_next_ = 0; + break; } + } while (curr_ts < start + 10000); // 10usec for the run. - tl_info_.monotonic_time = GetClockNanos(); - } while (tl_info_.monotonic_time < start + 100000); // 100usec for the run. + return should_spin; } -void ProactorBase::CancelIdleTask(uint32_t id) { - auto it = on_idle_map_.find(id); - if (it != on_idle_map_.end()) { - on_idle_map_.erase(it); - idle_it_ = on_idle_map_.begin(); - } +bool ProactorBase::RemoveOnIdleTask(uint32_t id) { + if (id >= on_idle_arr_.size() || !on_idle_arr_[id].task) + return false; + + on_idle_arr_[id].task = OnIdleTask{}; + + return true; } uint32_t ProactorBase::AddPeriodic(uint32_t ms, PeriodicTask f) { diff --git a/util/proactor_base.h b/util/proactor_base.h index e80833fa..6b0ee2b4 100644 --- a/util/proactor_base.h +++ b/util/proactor_base.h @@ -35,6 +35,14 @@ class ProactorBase { public: enum ProactorKind { EPOLL = 1, IOURING = 2 }; + // Corresponds to level 0. + // Idle tasks will rest at least kIdleCycleMaxMicros / (2^level) time between runs. + static const uint32_t kIdleCycleMaxMicros = 1000000u; + + // The "hottest" task can have this maxlevel. + // In that case, proactor will always spin calling that task until it will cool down. + static const uint32_t kOnIdleMaxLevel = 21; + ProactorBase(); virtual ~ProactorBase(); @@ -135,7 +143,7 @@ class ProactorBase { return fb; } - using IdleTask = std::function; + using OnIdleTask = std::function; using PeriodicTask = std::function; /** @@ -147,11 +155,7 @@ class ProactorBase { * @param f * @return uint32_t an unique ids denoting this task. Can be used for cancellation. */ - uint32_t AddIdleTask(IdleTask f); - - bool IsIdleTaskActive(uint32_t id) const { - return on_idle_map_.contains(id); - } + uint32_t AddOnIdleTask(OnIdleTask f); //! Must be called from the proactor thread. //! PeriodicTask should not block since it runs from I/O loop. @@ -161,7 +165,8 @@ class ProactorBase { //! Blocking until the task has been cancelled. Should not run directly from I/O loop //! i.e. only from Await or another fiber. void CancelPeriodic(uint32_t id); - void CancelIdleTask(uint32_t id); + + bool RemoveOnIdleTask(uint32_t id); // Migrates the calling fibers to the destination proactor. // Calling fiber must belong to this proactor. @@ -199,16 +204,21 @@ class ProactorBase { virtual void SchedulePeriodic(uint32_t id, PeriodicItem* item) = 0; virtual void CancelPeriodicInternal(uint32_t val1, uint32_t val2) = 0; - void RunOnIdleTasks(); + + // Returns true if we should continue spinning or false otherwise. + bool RunOnIdleTasks(); static void Pause(unsigned strength); static void ModuleInit(); static uint64_t GetClockNanos() { - timespec ts; - // absl::GetCurrentTimeNanos() is not monotonic and it syncs with the system clock. + // absl::GetCurrentTimeNanos() might be non-monotonic and sync with the system clock. + // but it's much more efficient than clock_gettime. + return absl::GetCurrentTimeNanos(); + /*timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); - return ts.tv_nsec + ts.tv_sec * 1000000000UL; + return ts.tv_nsec + ts.tv_sec * 1000000000UL;*/ } pthread_t thread_id_ = 0U; @@ -240,8 +250,14 @@ class ProactorBase { FiberSchedAlgo* scheduler_ = nullptr; // Runs tasks when there is available cpu time and no I/O events demand it. - absl::flat_hash_map on_idle_map_; - absl::flat_hash_map::const_iterator idle_it_; + struct OnIdleWrapper { + OnIdleTask task; + uint64_t next_ts; // when to run the next time in nano seconds. + }; + + std::vector on_idle_arr_; + uint32_t on_idle_next_ = 0; + absl::flat_hash_map periodic_map_; struct TLInfo { diff --git a/util/uring/proactor.cc b/util/uring/proactor.cc index f8bd3007..8bd75ae9 100644 --- a/util/uring/proactor.cc +++ b/util/uring/proactor.cc @@ -86,8 +86,6 @@ Proactor::Proactor() : ProactorBase() { } Proactor::~Proactor() { - on_idle_map_.clear(); - CHECK(is_stopped_); if (thread_id_ != -1U) { io_uring_queue_exit(&ring_); @@ -251,6 +249,8 @@ void Proactor::Run() { continue; } + bool should_spin = RunOnIdleTasks(); + // Dispatcher runs the scheduling loop. Every time a fiber preempts it awakens dispatcher // so that when that we eventually get to the dispatcher fiber again. dispatcher calls // suspend_until() only when pick_next returns null, i.e. there are no active fibers to run. @@ -263,15 +263,11 @@ void Proactor::Run() { goto spin_start; } - if (!on_idle_map_.empty()) { + if (should_spin) { // if on_idle_map_ is not empty we should not block on WAIT_SECTION_STATE. // Instead we use the cpu time on doing on_idle work. wait_for_cqe(&ring_, 0); // a dip into kernel to fetch more cqes. - if (!CQReadyCount(ring_)) { - RunOnIdleTasks(); - } - continue; // continue spinning until on_idle_map_ is empty. }