Skip to content

Commit

Permalink
Merge "Memory reclamation infrastructure follow-up" from Tomasz
Browse files Browse the repository at this point in the history
"Various improvements."
  • Loading branch information
avikivity committed Aug 31, 2015
2 parents 5176352 + d88e34d commit 68fee6c
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 30 deletions.
34 changes: 19 additions & 15 deletions core/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ struct cpu_pages {

bool is_initialized() const;
bool initialize();
void run_reclaimers(reclaimer_scope);
reclaiming_result run_reclaimers(reclaimer_scope);
void schedule_reclaim();
void set_reclaim_hook(std::function<void (std::function<void ()>)> hook);
void resize(size_t new_size, allocate_system_memory_fn alloc_sys_mem);
Expand Down Expand Up @@ -435,10 +435,7 @@ cpu_pages::find_and_unlink_span_reclaiming(unsigned n_pages) {
if (span) {
return span;
}
auto free_pages_before = nr_free_pages;
run_reclaimers(reclaimer_scope::synchronous_with_alloc);
if (nr_free_pages <= free_pages_before) {
// Reclaimers made no forward progress
if (run_reclaimers(reclaimer_scope::sync) == reclaiming_result::reclaimed_nothing) {
return nullptr;
}
}
Expand Down Expand Up @@ -472,7 +469,7 @@ cpu_pages::allocate_large_and_trim(unsigned n_pages, Trimmer trimmer) {
span->pool = nullptr;
if (nr_free_pages < current_min_free_pages) {
drain_cross_cpu_freelist();
run_reclaimers(reclaimer_scope::synchronous_with_alloc);
run_reclaimers(reclaimer_scope::sync);
if (nr_free_pages < current_min_free_pages) {
schedule_reclaim();
}
Expand Down Expand Up @@ -745,28 +742,35 @@ void cpu_pages::resize(size_t new_size, allocate_system_memory_fn alloc_memory)
}
}

void cpu_pages::run_reclaimers(reclaimer_scope scope) {
reclaiming_result cpu_pages::run_reclaimers(reclaimer_scope scope) {
auto target = std::max(nr_free_pages + 1, min_free_pages);
reclaiming_result result = reclaiming_result::reclaimed_nothing;
while (nr_free_pages < target) {
auto before = nr_free_pages;
bool made_progress = false;
++g_reclaims;
for (auto&& r : reclaimers) {
if (r->scope() >= scope) {
r->do_reclaim();
made_progress |= r->do_reclaim() == reclaiming_result::reclaimed_something;
}
}
if (nr_free_pages <= before) {
// reclaimers made no forward progress
break;
if (!made_progress) {
return result;
}
result = reclaiming_result::reclaimed_something;
}
return result;
}

void cpu_pages::schedule_reclaim() {
current_min_free_pages = 0;
reclaim_hook([this] {
if (nr_free_pages < min_free_pages) {
run_reclaimers(reclaimer_scope::separate_fiber);
try {
run_reclaimers(reclaimer_scope::async);
} catch (...) {
current_min_free_pages = min_free_pages;
throw;
}
}
current_min_free_pages = min_free_pages;
});
Expand Down Expand Up @@ -973,7 +977,7 @@ void set_reclaim_hook(std::function<void (std::function<void ()>)> hook) {
cpu_mem.set_reclaim_hook(hook);
}

reclaimer::reclaimer(std::function<void ()> reclaim, reclaimer_scope scope)
reclaimer::reclaimer(reclaim_fn reclaim, reclaimer_scope scope)
: _reclaim(std::move(reclaim))
, _scope(scope) {
cpu_mem.reclaimers.push_back(this);
Expand Down Expand Up @@ -1321,7 +1325,7 @@ void operator delete[](void* ptr, with_alignment wa) {

namespace memory {

reclaimer::reclaimer(std::function<void ()> reclaim, reclaimer_scope) {
reclaimer::reclaimer(reclaim_fn reclaim, reclaimer_scope) {
}

reclaimer::~reclaimer() {
Expand Down
20 changes: 14 additions & 6 deletions core/memory.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,34 +62,42 @@ void configure(std::vector<resource::memory> m,

void* allocate_reclaimable(size_t size);

enum class reclaiming_result {
reclaimed_nothing,
reclaimed_something
};

// Determines when reclaimer can be invoked
enum class reclaimer_scope {
//
// Reclaimer is only invoked in its own fiber. That fiber will be
// given higher priority than regular application fibers.
//
separate_fiber,
async,

//
// Reclaimer may be invoked synchronously with allocation.
// It may also be invoked in separate_fiber scope.
// It may also be invoked in async scope.
//
// Reclaimer may invoke allocation, though it is discouraged because
// the system may be low on memory and such allocations may fail.
// Reclaimers which allocate should be prepared for re-entry.
//
synchronous_with_alloc
sync
};

class reclaimer {
std::function<void ()> _reclaim;
public:
using reclaim_fn = std::function<reclaiming_result ()>;
private:
reclaim_fn _reclaim;
reclaimer_scope _scope;
public:
// Installs new reclaimer which will be invoked when system is falling
// low on memory. 'scope' determines when reclaimer can be executed.
reclaimer(std::function<void ()> reclaim, reclaimer_scope scope = reclaimer_scope::separate_fiber);
reclaimer(reclaim_fn reclaim, reclaimer_scope scope = reclaimer_scope::async);
~reclaimer();
void do_reclaim() { _reclaim(); }
reclaiming_result do_reclaim() { return _reclaim(); }
reclaimer_scope scope() const { return _scope; }
};

Expand Down
13 changes: 9 additions & 4 deletions core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,9 @@ reactor::reactor()
assert(r == 0);
#endif
memory::set_reclaim_hook([this] (std::function<void ()> reclaim_fn) {
// push it in the front of the queue so we reclaim memory quickly
_pending_tasks.push_front(make_task([fn = std::move(reclaim_fn)] {
add_high_priority_task(make_task([fn = std::move(reclaim_fn)] {
fn();
}));
// stop any repeat() loops
task_quota = 0;
});
}

Expand Down Expand Up @@ -2150,3 +2147,11 @@ server_socket listen(socket_address sa, listen_options opts) {
future<connected_socket> connect(socket_address sa) {
return engine().connect(sa);
}

void reactor::add_high_priority_task(std::unique_ptr<task>&& t) {
_pending_tasks.push_front(std::move(t));
// stop any repeat() loops
task_quota = 0;
// break .then() chains
future_avail_count = max_inlined_continuations - 1;
}
2 changes: 2 additions & 0 deletions core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,8 @@ public:

void add_task(std::unique_ptr<task>&& t) { _pending_tasks.push_back(std::move(t)); }

void add_high_priority_task(std::unique_ptr<task>&&);

network_stack& net() { return *_network_stack; }
unsigned cpu_id() const { return _id; }

Expand Down
11 changes: 6 additions & 5 deletions core/slab.hh
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,12 @@ private:
memory::reclaimer *_reclaimer = nullptr;
bool _reclaimed = false;
private:
void evict_lru_slab_page() {
memory::reclaiming_result evict_lru_slab_page() {
if (_slab_page_desc_lru.empty()) {
// NOTE: Nothing to evict. If this happens, it implies that all
// slab pages in the slab are being used at the same time.
// That being said, this event is very unlikely to happen.
return;
return memory::reclaiming_result::reclaimed_nothing;
}
// get descriptor of the least-recently-used slab page and related info.
auto& desc = _slab_page_desc_lru.back();
Expand Down Expand Up @@ -345,17 +345,18 @@ private:
#endif
::free(slab_page); // free slab page object
delete &desc; // free its descriptor
return memory::reclaiming_result::reclaimed_something;
}

/*
* Reclaim the least recently used slab page that is unused.
*/
void reclaim() {
memory::reclaiming_result reclaim() {
// once reclaimer was called, slab pages should no longer be allocated, as the
// memory used by slab is supposed to be calibrated.
_reclaimed = true;
// FIXME: Should reclaim() only evict a single slab page at a time?
evict_lru_slab_page();
return evict_lru_slab_page();
}

void initialize_slab_allocator(double growth_factor, uint64_t limit) {
Expand All @@ -377,7 +378,7 @@ private:

// If slab limit is zero, enable reclaimer.
if (!limit) {
_reclaimer = new memory::reclaimer([this] { reclaim(); });
_reclaimer = new memory::reclaimer([this] { return reclaim(); });
} else {
_slab_pages_vector.reserve(_available_slab_pages);
}
Expand Down
28 changes: 28 additions & 0 deletions tests/futures_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,31 @@ SEASTAR_TEST_CASE(test_parallel_for_each_waits_for_all_fibers_even_if_one_of_the
BOOST_REQUIRE(*can_exit);
});
}

SEASTAR_TEST_CASE(test_high_priority_task_runs_before_ready_continuations) {
return now().then([] {
auto flag = make_lw_shared<bool>(false);
engine().add_high_priority_task(make_task([flag] {
*flag = true;
}));
make_ready_future().then([flag] {
BOOST_REQUIRE(*flag);
});
});
}

SEASTAR_TEST_CASE(test_high_priority_task_runs_in_the_middle_of_loops) {
auto counter = make_lw_shared<int>(0);
auto flag = make_lw_shared<bool>(false);
return repeat([counter, flag] {
if (*counter == 1) {
BOOST_REQUIRE(*flag);
return stop_iteration::yes;
}
engine().add_high_priority_task(make_task([flag] {
*flag = true;
}));
++(*counter);
return stop_iteration::no;
});
}

0 comments on commit 68fee6c

Please sign in to comment.