Skip to content

Commit

Permalink
Improve OnIdleTasks management (#23)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Oct 28, 2022
1 parent 07631a2 commit 7b12302
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 51 deletions.
72 changes: 41 additions & 31 deletions util/proactor_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,53 +82,63 @@ void ProactorBase::Stop() {
VLOG(1) << "Proactor::StopFinish";
}

uint32_t ProactorBase::AddIdleTask(IdleTask f) {
uint32_t ProactorBase::AddOnIdleTask(OnIdleTask f) {
DCHECK(InMyThread());

auto id = next_task_id_++;
auto res = on_idle_map_.emplace(id, std::move(f));
CHECK(res.second);
idle_it_ = on_idle_map_.begin(); // reset position to the first item.
uint32_t res = on_idle_arr_.size();
on_idle_arr_.push_back(OnIdleWrapper{.task = std::move(f), .next_ts = 0});

return id;
return res;
}

void ProactorBase::RunOnIdleTasks() {
if (on_idle_map_.empty())
return;

if (idle_it_ == on_idle_map_.end()) {
idle_it_ = on_idle_map_.begin();
}
bool ProactorBase::RunOnIdleTasks() {
if (on_idle_arr_.empty())
return false;

uint64_t start = GetClockNanos();
tl_info_.monotonic_time = start;
uint64_t curr_ts = start;

// Perform round robin with idle_it_ saving the position between runs.
bool should_spin = false;

DCHECK_LT(on_idle_next_, on_idle_arr_.size());

// Perform round robin with on_idle_next_ saving the position between runs.
do {
bool res = idle_it_->second();
OnIdleWrapper& on_idle = on_idle_arr_[on_idle_next_];

if (!res) {
on_idle_map_.erase(idle_it_);
idle_it_ = on_idle_map_.begin();
break;
if (on_idle.task && on_idle.next_ts <= curr_ts) {
tl_info_.monotonic_time = curr_ts;

uint32_t level = on_idle.task(); // run the task

curr_ts = GetClockNanos();

if (level >= kOnIdleMaxLevel) {
level = kOnIdleMaxLevel;
should_spin = true;
} else {
uint64_t delta_ns = uint64_t(kIdleCycleMaxMicros) * 1000 / (1 << level);
on_idle.next_ts = curr_ts + delta_ns;
}
}

++idle_it_;
if (idle_it_ == on_idle_map_.end()) {
idle_it_ = on_idle_map_.begin();
++on_idle_next_;
if (on_idle_next_ == on_idle_arr_.size()) {
on_idle_next_ = 0;
break;
}
} while (curr_ts < start + 10000); // 10usec for the run.

tl_info_.monotonic_time = GetClockNanos();
} while (tl_info_.monotonic_time < start + 100000); // 100usec for the run.
return should_spin;
}

void ProactorBase::CancelIdleTask(uint32_t id) {
auto it = on_idle_map_.find(id);
if (it != on_idle_map_.end()) {
on_idle_map_.erase(it);
idle_it_ = on_idle_map_.begin();
}
bool ProactorBase::RemoveOnIdleTask(uint32_t id) {
if (id >= on_idle_arr_.size() || !on_idle_arr_[id].task)
return false;

on_idle_arr_[id].task = OnIdleTask{};

return true;
}

uint32_t ProactorBase::AddPeriodic(uint32_t ms, PeriodicTask f) {
Expand Down
42 changes: 29 additions & 13 deletions util/proactor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class ProactorBase {
public:
enum ProactorKind { EPOLL = 1, IOURING = 2 };

// Corresponds to level 0.
// Idle tasks will rest at least kIdleCycleMaxMicros / (2^level) time between runs.
static const uint32_t kIdleCycleMaxMicros = 1000000u;

// The "hottest" task can have this maxlevel.
// In that case, proactor will always spin calling that task until it will cool down.
static const uint32_t kOnIdleMaxLevel = 21;

ProactorBase();
virtual ~ProactorBase();

Expand Down Expand Up @@ -135,7 +143,7 @@ class ProactorBase {
return fb;
}

using IdleTask = std::function<bool()>;
using OnIdleTask = std::function<uint32_t()>;
using PeriodicTask = std::function<void()>;

/**
Expand All @@ -147,11 +155,7 @@ class ProactorBase {
* @param f
* @return uint32_t an unique ids denoting this task. Can be used for cancellation.
*/
uint32_t AddIdleTask(IdleTask f);

bool IsIdleTaskActive(uint32_t id) const {
return on_idle_map_.contains(id);
}
uint32_t AddOnIdleTask(OnIdleTask f);

//! Must be called from the proactor thread.
//! PeriodicTask should not block since it runs from I/O loop.
Expand All @@ -161,7 +165,8 @@ class ProactorBase {
//! Blocking until the task has been cancelled. Should not run directly from I/O loop
//! i.e. only from Await or another fiber.
void CancelPeriodic(uint32_t id);
void CancelIdleTask(uint32_t id);

bool RemoveOnIdleTask(uint32_t id);

// Migrates the calling fibers to the destination proactor.
// Calling fiber must belong to this proactor.
Expand Down Expand Up @@ -199,16 +204,21 @@ class ProactorBase {

virtual void SchedulePeriodic(uint32_t id, PeriodicItem* item) = 0;
virtual void CancelPeriodicInternal(uint32_t val1, uint32_t val2) = 0;
void RunOnIdleTasks();

// Returns true if we should continue spinning or false otherwise.
bool RunOnIdleTasks();

static void Pause(unsigned strength);
static void ModuleInit();

static uint64_t GetClockNanos() {
timespec ts;
// absl::GetCurrentTimeNanos() is not monotonic and it syncs with the system clock.
// absl::GetCurrentTimeNanos() might be non-monotonic and sync with the system clock.
// but it's much more efficient than clock_gettime.
return absl::GetCurrentTimeNanos();
/*timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_nsec + ts.tv_sec * 1000000000UL;
return ts.tv_nsec + ts.tv_sec * 1000000000UL;*/
}

pthread_t thread_id_ = 0U;
Expand Down Expand Up @@ -240,8 +250,14 @@ class ProactorBase {
FiberSchedAlgo* scheduler_ = nullptr;

// Runs tasks when there is available cpu time and no I/O events demand it.
absl::flat_hash_map<uint32_t, IdleTask> on_idle_map_;
absl::flat_hash_map<uint32_t, IdleTask>::const_iterator idle_it_;
struct OnIdleWrapper {
OnIdleTask task;
uint64_t next_ts; // when to run the next time in nano seconds.
};

std::vector<OnIdleWrapper> on_idle_arr_;
uint32_t on_idle_next_ = 0;

absl::flat_hash_map<uint32_t, PeriodicItem*> periodic_map_;

struct TLInfo {
Expand Down
10 changes: 3 additions & 7 deletions util/uring/proactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ Proactor::Proactor() : ProactorBase() {
}

Proactor::~Proactor() {
on_idle_map_.clear();

CHECK(is_stopped_);
if (thread_id_ != -1U) {
io_uring_queue_exit(&ring_);
Expand Down Expand Up @@ -251,6 +249,8 @@ void Proactor::Run() {
continue;
}

bool should_spin = RunOnIdleTasks();

// Dispatcher runs the scheduling loop. Every time a fiber preempts it awakens dispatcher
// so that when that we eventually get to the dispatcher fiber again. dispatcher calls
// suspend_until() only when pick_next returns null, i.e. there are no active fibers to run.
Expand All @@ -263,15 +263,11 @@ void Proactor::Run() {
goto spin_start;
}

if (!on_idle_map_.empty()) {
if (should_spin) {
// if on_idle_map_ is not empty we should not block on WAIT_SECTION_STATE.
// Instead we use the cpu time on doing on_idle work.
wait_for_cqe(&ring_, 0); // a dip into kernel to fetch more cqes.

if (!CQReadyCount(ring_)) {
RunOnIdleTasks();
}

continue; // continue spinning until on_idle_map_ is empty.
}

Expand Down

0 comments on commit 7b12302

Please sign in to comment.