From 6a333871988c729515157c86e68e129f116a3e4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Thu, 2 Jan 2025 23:13:39 +0100 Subject: [PATCH 1/4] apps/io_tester: add some test cases for the IO scheduler Add some test cases useful for presenting improvement ideas for the IO scheduler. 3 of 4 tests added in this patch illustrate some goals which weren't met before but are met after this series. The `tau_nemesis` test illustrates a problem which is present both before and after this series. --- ...cpu_starved_shard_can_still_saturate_io.sh | 39 ++++++++++ ...u_starved_shard_has_reasonable_fairness.sh | 39 ++++++++++ .../scylla_tablet_migration.sh | 77 +++++++++++++++++++ .../manual/iosched_reproducers/tau_nemesis.sh | 58 ++++++++++++++ 4 files changed, 213 insertions(+) create mode 100644 tests/manual/iosched_reproducers/one_cpu_starved_shard_can_still_saturate_io.sh create mode 100644 tests/manual/iosched_reproducers/one_cpu_starved_shard_has_reasonable_fairness.sh create mode 100644 tests/manual/iosched_reproducers/scylla_tablet_migration.sh create mode 100644 tests/manual/iosched_reproducers/tau_nemesis.sh diff --git a/tests/manual/iosched_reproducers/one_cpu_starved_shard_can_still_saturate_io.sh b/tests/manual/iosched_reproducers/one_cpu_starved_shard_can_still_saturate_io.sh new file mode 100644 index 0000000000..1854f4dd0b --- /dev/null +++ b/tests/manual/iosched_reproducers/one_cpu_starved_shard_can_still_saturate_io.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +# Test scenario: +# A single CPU-starved shard has a batch IO job. +# Goal: it should be able to utilize the entire bandwidth of the disk, +# despite the rare polls. + +if [ $# -ne 1 ]; then + echo "Usage: $0 IO_TESTER_EXECUTABLE" >&2 + exit 1 +fi + +"$1" --smp=7 --storage=/dev/null --conf=<(cat <<'EOF' +- name: tablet-streaming + data_size: 1GB + shards: [0] + type: seqread + shard_info: + parallelism: 50 + reqsize: 128kB + shares: 200 +- name: cpuhog + type: cpu + shards: [0] + shard_info: + parallelism: 1 + execution_time: 550us + +EOF +) --io-properties-file=<(cat <<'EOF' +# i4i.2xlarge +disks: +- mountpoint: /dev + read_bandwidth: 1542559872 + read_iops: 218786 + write_bandwidth: 1130867072 + write_iops: 121499 +EOF +) diff --git a/tests/manual/iosched_reproducers/one_cpu_starved_shard_has_reasonable_fairness.sh b/tests/manual/iosched_reproducers/one_cpu_starved_shard_has_reasonable_fairness.sh new file mode 100644 index 0000000000..27dd05378b --- /dev/null +++ b/tests/manual/iosched_reproducers/one_cpu_starved_shard_has_reasonable_fairness.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +# Test scenario: +# all shards contend for IO, but one shard is additionally CPU-starved +# and polls rarely. +# Goal: it should still be getting a reasonably fair share of disk bandwidth. + +if [ $# -ne 1 ]; then + echo "Usage: $0 IO_TESTER_EXECUTABLE" >&2 + exit 1 +fi + +"$1" --smp=7 --storage=/dev/null --conf=<(cat <<'EOF' +- name: tablet-streaming + data_size: 1GB + shards: all + type: seqread + shard_info: + parallelism: 50 + reqsize: 128kB + shares: 200 +- name: cpuhog + type: cpu + shards: [0] + shard_info: + parallelism: 1 + execution_time: 550us + +EOF +) --io-properties-file=<(cat <<'EOF' +# i4i.2xlarge +disks: +- mountpoint: /dev + read_bandwidth: 1542559872 + read_iops: 218786 + write_bandwidth: 1130867072 + write_iops: 121499 +EOF +) --duration=2 diff --git a/tests/manual/iosched_reproducers/scylla_tablet_migration.sh b/tests/manual/iosched_reproducers/scylla_tablet_migration.sh new file mode 100644 index 0000000000..1caffe558d --- /dev/null +++ b/tests/manual/iosched_reproducers/scylla_tablet_migration.sh @@ -0,0 +1,77 @@ +#!/usr/bin/env bash + +# Test scenario: +# Simulation of a ScyllaDB workload which prompted some changes to the IO scheduler: +# database queries concurrent with tablet streaming. +# +# All 7 shards are running a low-priority (200 shares) batch IO workload +# and a high-priority (1000 shares), moderate-bandwidth, interactive workload. +# +# The interactive workload requires about 30% of the node's +# total bandwidth (as measured in tokens), in small random reads. +# The batch workload does large sequential reads and wants to utilize all +# spare bandwidth. +# +# This workload is almost symmetric across shards, but is slightly skewed +# and shard 0 is slightly more loaded. But even on this shard, the workload +# doesn't need more than 35% of the fair bandwidth of this shard. +# +# Due to the distribution of shares across IO classes, the user expects that +# the interactive workload should be guaranteed (1000 / (1000 + 200)) == ~84% of +# the disk bandwidth on each shard. So if it's only asking for less than 35%, +# the lower-priority job shouldn't disturb it. +# +# But before the relevant IO scheduler changes, this goal wasn't met, +# and the interactive workload on shard 0 was instead starved for IO +# by the low-priority workloads on other shards. + +if [ $# -ne 1 ]; then + echo "Usage: $0 IO_TESTER_EXECUTABLE" >&2 + exit 1 +fi + +"$1" --smp=7 --storage=/dev/null --conf=<(cat <<'EOF' +- name: tablet-streaming + data_size: 1GB + shards: all + type: seqread + shard_info: + parallelism: 50 + reqsize: 128kB + shares: 200 +- name: cassandra-stress + shards: all + type: randread + data_size: 1GB + shard_info: + parallelism: 100 + reqsize: 1536 + shares: 1000 + rps: 75 + options: + pause_distribution: poisson + sleep_type: steady +- name: cassandra-stress-slight-imbalance + shards: [0] + type: randread + data_size: 1GB + shard_info: + parallelism: 100 + reqsize: 1536 + class: cassandra-stress + rps: 10 + options: + pause_distribution: poisson + sleep_type: steady + +EOF +) --io-properties-file=<(cat <<'EOF' +# i4i.2xlarge +disks: +- mountpoint: /dev + read_bandwidth: 1542559872 + read_iops: 218786 + write_bandwidth: 1130867072 + write_iops: 121499 +EOF +) diff --git a/tests/manual/iosched_reproducers/tau_nemesis.sh b/tests/manual/iosched_reproducers/tau_nemesis.sh new file mode 100644 index 0000000000..92031e38d4 --- /dev/null +++ b/tests/manual/iosched_reproducers/tau_nemesis.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash + +# There is a `tau` mechanism in `fair_queue` which lets newly-activated +# IO classes to monopolize the shard's IO queue for a while. +# +# This isn't very useful and can result in major performance problems, +# as this test illustrates. The `highprio` workload could have tail latency +# of about 2 milliseconds, but the `bursty_lowprio` is allowed by `tau` to butt in +# periodically and preempt `highprio` for ~30ms, bringing its tail latency +# to that threshold. + +if [ $# -ne 1 ]; then + echo "Usage: $0 IO_TESTER_EXECUTABLE" >&2 + exit 1 +fi + +"$1" --smp=7 --storage=/dev/null --conf=<(cat <<'EOF' +- name: filler + data_size: 1GB + shards: all + type: seqread + shard_info: + parallelism: 10 + reqsize: 128kB + shares: 10 +- name: bursty_lowprio + data_size: 1GB + shards: all + type: seqread + shard_info: + parallelism: 1 + reqsize: 128kB + shares: 100 + batch: 50 + rps: 8 +- name: highprio + shards: all + type: randread + data_size: 1GB + shard_info: + parallelism: 100 + reqsize: 1536 + shares: 1000 + rps: 50 + options: + pause_distribution: poisson + sleep_type: steady +EOF +) --io-properties-file=<(cat <<'EOF' +# i4i.2xlarge +disks: +- mountpoint: /dev + read_bandwidth: 1542559872 + read_iops: 218786 + write_bandwidth: 1130867072 + write_iops: 121499 +EOF +) From 1e597cd7154220fecfb30f99e43e573839b0e264 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Mon, 20 Jan 2025 01:33:03 +0100 Subject: [PATCH 2/4] test: in fair_queue_test, ensure that tokens are only replenished by test_env fair_queue_test implicitly assumes that `fair_group::clock_type::now()` is always smaller than `_fg.replenished_ts() + std::chrono::microseconds(1)`. This ensures that the amount of replenished tokens does not exceed what the `tick()` calls are supposed to replenish. If this assumption is violated by the point of some `replenish_capacity()` call in `tick()`, then `tick()` will not replenish tokens, and instead tokens will be replenished by the `maybe_replenish_capacity()` call done by `dispatch_requests()`, which will replenish more than 1us worth of tokens, breaking the assumptions of the test. To prevent this, and ensure that the test doesn't rely on timing, we can initialize `_fg.replenished_ts()` to some point in the future, which will ensure that `now()` won't catch up to it. --- tests/unit/fair_queue_test.cc | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/unit/fair_queue_test.cc b/tests/unit/fair_queue_test.cc index fe620c7790..3574f46dd7 100644 --- a/tests/unit/fair_queue_test.cc +++ b/tests/unit/fair_queue_test.cc @@ -82,7 +82,14 @@ class test_env { test_env(unsigned capacity) : _fg(fg_config(capacity), 1) , _fq(_fg, fq_config()) - {} + { + // Move _fg._replenished_ts() to far future. + // This will prevent any `maybe_replenish_capacity` calls (indirectly done by `fair_queue::dispatch_requests()`) + // from replenishing tokens on its own, and ensure that the only source of replenishment will be tick(). + // + // Otherwise the rate of replenishment might be greater than expected by the test, breaking the results. + _fg.replenish_capacity(fair_group::clock_type::now() + std::chrono::days(1)); + } // As long as there is a request sitting in the queue, tick() will process // at least one request. The only situation in which tick() will return nothing From 7781c4c8e21bb0f6cb8b3b03d8cf4e76355f9cb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Fri, 17 Jan 2025 17:20:20 +0100 Subject: [PATCH 3/4] fair_queue: track the total capacity of queued requests Adds a member variable which tracks the summed capacity of all requests waiting in the queue. This is a piece of data which might be valuable to the IO scheduler. We make use of it in later patches in the series. --- include/seastar/core/fair_queue.hh | 2 ++ src/core/fair_queue.cc | 3 +++ 2 files changed, 5 insertions(+) diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh index a8712f97d2..639df0c0af 100644 --- a/include/seastar/core/fair_queue.hh +++ b/include/seastar/core/fair_queue.hh @@ -350,6 +350,8 @@ private: }; std::optional _pending; + // Total capacity of all requests waiting in the queue. + capacity_t _queued_capacity = 0; void push_priority_class(priority_class_data& pc) noexcept; void push_priority_class_from_idle(priority_class_data& pc) noexcept; diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc index f23edf3ded..1e8af93ca3 100644 --- a/src/core/fair_queue.cc +++ b/src/core/fair_queue.cc @@ -297,12 +297,14 @@ void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept { push_priority_class_from_idle(pc); } pc._queue.push_back(ent); + _queued_capacity += ent.capacity(); } void fair_queue::notify_request_finished(fair_queue_entry::capacity_t cap) noexcept { } void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept { + _queued_capacity -= ent._capacity; ent._capacity = 0; } @@ -375,6 +377,7 @@ void fair_queue::dispatch_requests(std::function cb) { h._accumulated += req_cost; h._pure_accumulated += req_cap; dispatched += req_cap; + _queued_capacity -= req_cap; cb(req); From 23309298b480d365e1b69bb96214201a83ea2273 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Sun, 19 Jan 2025 22:33:52 +0100 Subject: [PATCH 4/4] fair_queue: make the fair_group token grabbing discipline more fair The current design of `fair_group` isn't fair enough to shards. During contention, the group will be -- aproximately -- taking requests from shards one-by-one, in round robin. This guarantees that each contender will dispatch an equal *number* of requests. This is some kind of fairness, but it's not the kind we want, probably ever. A better kind of fairness is that under contention, each shard should be guaranteed `1/nr_shards` of the disk's IOPS and/or `1/nr_shards` of byte-bandwidth, whichever dimension it pressures more. This is needed so that each shard can be relied on to sustain a certain rate of requests -- the lower bound of the slowest shard's throughput usually dictates the throughput of the entire cluster. But those two kinds of fairness are only the same if all IO requests have the same size and direction. Otherwise they can be drastically different. With the current design it's easy to create a situation where a shard receives an arbitrarily small fraction of both IOPS and bandwidth, despite being IO-bound. (Example: a node with X shards, where one shard spams only very small requests and other shards spams only big requests). This is a problem in practice. In ScyllaDB, we observed IO starvation of some shards during realistic workloads. While they require some workload asymmetry to occur, even small asymmetries can cause serious unfairness to occur. (For example, a shard which receives 6% more of database queries than other shards can be starved to less than 50% of its fair share of IOPS and/or bandwidth -- because each of those 1 kiB queries is "fairly" matched with 16x costlier 128 kiB low-priority batch IO requests on other shards). To improve this, `fair_group` needs a different queueing discipline. There are many possible ways, but this patch chooses the one which is relatively the most similar to the current one. The main idea is that we still rely on the "approximate round robin" of token queue as the basis for fairness, but we reserve a fixed-size batch of tokens at a time, rather than a fixed-size (i.e. 1) batch of _requests_ at a time. This turns the discipline from approximately-request-fair to approximately-token-fair, which is what we want. The implementation details are non-trivial, though, and should be carefully reviewed. --- include/seastar/core/fair_queue.hh | 50 +++++--- include/seastar/util/shared_token_bucket.hh | 4 + src/core/fair_queue.cc | 132 ++++++++++++++------ 3 files changed, 126 insertions(+), 60 deletions(-) diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh index 639df0c0af..bbfde6c37d 100644 --- a/include/seastar/core/fair_queue.hh +++ b/include/seastar/core/fair_queue.hh @@ -254,6 +254,7 @@ public: capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; } capacity_t grab_capacity(capacity_t cap) noexcept; clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); } + void refund_tokens(capacity_t) noexcept; void replenish_capacity(clock_type::time_point now) noexcept; void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept; @@ -331,25 +332,25 @@ private: size_t _nr_classes = 0; capacity_t _last_accumulated = 0; - /* - * When the shared capacity os over the local queue delays - * further dispatching untill better times - * - * \head -- the value group head rover is expected to cross - * \cap -- the capacity that's accounted on the group - * - * The last field is needed to "rearm" the wait in case - * queue decides that it wants to dispatch another capacity - * in the middle of the waiting - */ + // _pending represents a reservation of tokens from the bucket. + // + // In the "dispatch timeline" defined by the growing bucket head of the group, + // tokens in the range [_pending.head - cap, _pending.head) belong + // to this queue. + // + // For example, if: + // _group._token_bucket.head == 300 + // _pending.head == 700 + // _pending.cap == 500 + // then the reservation is [200, 700), 100 tokens are ready to be dispatched by this queue, + // and another 400 tokens are going to be appear soon. (And after that, this queue + // will be able to make its next reservation). struct pending { - capacity_t head; - capacity_t cap; - - pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {} + capacity_t head = 0; + capacity_t cap = 0; }; + pending _pending; - std::optional _pending; // Total capacity of all requests waiting in the queue. capacity_t _queued_capacity = 0; @@ -359,9 +360,20 @@ private: void plug_priority_class(priority_class_data& pc) noexcept; void unplug_priority_class(priority_class_data& pc) noexcept; - enum class grab_result { grabbed, cant_preempt, pending }; - grab_result grab_capacity(const fair_queue_entry& ent) noexcept; - grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept; + // Replaces _pending with a new reservation starting at the current + // group bucket tail. + void grab_capacity(capacity_t cap) noexcept; + // Shaves off the fulfilled frontal part from `_pending` (if any), + // and returns the fulfilled tokens in `ready_tokens`. + // Sets `our_turn_has_come` to the truth value of "`_pending` is empty or + // there are no unfulfilled reservations (from other shards) earlier than `_pending`". + // + // Assumes that `_group.maybe_replenish_capacity()` was called recently. + struct reap_result { + capacity_t ready_tokens; + bool our_turn_has_come; + }; + reap_result reap_pending_capacity() noexcept; public: /// Constructs a fair queue with configuration parameters \c cfg. /// diff --git a/include/seastar/util/shared_token_bucket.hh b/include/seastar/util/shared_token_bucket.hh index 91c718763a..ad9fd11af6 100644 --- a/include/seastar/util/shared_token_bucket.hh +++ b/include/seastar/util/shared_token_bucket.hh @@ -159,6 +159,10 @@ public: _rovers.release(tokens); } + void refund(T tokens) noexcept { + fetch_add(_rovers.head, tokens); + } + void replenish(typename Clock::time_point now) noexcept { auto ts = _replenished.load(std::memory_order_relaxed); diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc index 1e8af93ca3..81069d156a 100644 --- a/src/core/fair_queue.cc +++ b/src/core/fair_queue.cc @@ -117,6 +117,10 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept { _token_bucket.replenish(now); } +void fair_group::refund_tokens(capacity_t cap) noexcept { + _token_bucket.refund(cap); +} + void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept { auto now = clock_type::now(); auto extra = _token_bucket.accumulated_in(now - local_ts); @@ -223,35 +227,22 @@ void fair_queue::unplug_class(class_id cid) noexcept { unplug_priority_class(*_priority_classes[cid]); } -auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result { - _group.maybe_replenish_capacity(_group_replenish); - - if (_group.capacity_deficiency(_pending->head)) { - return grab_result::pending; - } - - capacity_t cap = ent._capacity; - if (cap > _pending->cap) { - return grab_result::cant_preempt; +auto fair_queue::reap_pending_capacity() noexcept -> reap_result { + auto result = reap_result{.ready_tokens = 0, .our_turn_has_come = true}; + if (_pending.cap) { + capacity_t deficiency = _group.capacity_deficiency(_pending.head); + result.our_turn_has_come = deficiency <= _pending.cap; + if (result.our_turn_has_come) { + result.ready_tokens = _pending.cap - deficiency; + _pending.cap = deficiency; + } } - - _pending.reset(); - return grab_result::grabbed; + return result; } -auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result { - if (_pending) { - return grab_pending_capacity(ent); - } - - capacity_t cap = ent._capacity; +auto fair_queue::grab_capacity(capacity_t cap) noexcept -> void { capacity_t want_head = _group.grab_capacity(cap); - if (_group.capacity_deficiency(want_head)) { - _pending.emplace(want_head, cap); - return grab_result::pending; - } - - return grab_result::grabbed; + _pending = pending{want_head, cap}; } void fair_queue::register_priority_class(class_id id, uint32_t shares) { @@ -309,7 +300,7 @@ void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept { } fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept { - if (_pending) { + if (_pending.cap) { /* * We expect the disk to release the ticket within some time, * but it's ... OK if it doesn't -- the pending wait still @@ -320,7 +311,7 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept * which's sub-optimal. The expectation is that we think disk * works faster, than it really does. */ - auto over = _group.capacity_deficiency(_pending->head); + auto over = _group.capacity_deficiency(_pending.head); auto ticks = _group.capacity_duration(over); return std::chrono::steady_clock::now() + std::chrono::duration_cast(ticks); } @@ -328,11 +319,31 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept return std::chrono::steady_clock::time_point::max(); } +// This function is called by the shard on every poll. +// It picks up tokens granted by the group, spends available tokens on IO dispatches, +// and makes a reservation for more tokens, if needed. +// +// Reservations are done in batches of size `_group.per_tick_grab_threshold()`. +// During contention, in an average moment in time each contending shard can be expected to +// be holding a reservation of such size after the current head of the token bucket. +// +// A shard which is currently calling `dispatch_requests()` can expect a latency +// of at most `nr_contenders * (_group.per_tick_grab_threshold() + max_request_cap)` before its next reservation is fulfilled. +// If a shard calls `dispatch_requests()` at least once per X total tokens, it should receive bandwidth +// of at least `_group.per_tick_grab_threshold() / (X + nr_contenders * (_group.per_tick_grab_threshold() + max_request_cap))`. +// +// A shard which is polling continuously should be able to grab its fair share of the disk for itself. +// +// Given a task quota of 500us and IO latency goal of 750 us, +// a CPU-starved shard should still be able to grab at least ~30% of its fair share in the worst case. +// This is far from ideal, but it's something. void fair_queue::dispatch_requests(std::function cb) { - capacity_t dispatched = 0; - boost::container::small_vector preempt; + _group.maybe_replenish_capacity(_group_replenish); + + const uint64_t max_unamortized_reservation = _group.per_tick_grab_threshold(); + auto available = reap_pending_capacity(); - while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) { + while (!_handles.empty()) { priority_class_data& h = *_handles.top(); if (h._queue.empty() || !h._plugged) { pop_priority_class(h); @@ -340,17 +351,51 @@ void fair_queue::dispatch_requests(std::function cb) { } auto& req = h._queue.front(); - auto gr = grab_capacity(req); - if (gr == grab_result::pending) { + if (req._capacity <= available.ready_tokens) { + // We can dispatch the request immediately. + // We do that after the if-else. + } else if (req._capacity <= available.ready_tokens + _pending.cap || _pending.cap >= max_unamortized_reservation) { + // We can't dispatch the request yet, but we already have a pending reservation + // which will provide us with enough tokens for it eventually, + // or our reservation is already max-size and we can't reserve more tokens until we reap some. + // So we should just wait. + // We return any immediately-available tokens back to `_pending` + // and we bail. The next `dispatch_request` will again take those tokens + // (possibly joined by some newly-granted tokens) and retry. + _pending.cap += available.ready_tokens; + available.ready_tokens = 0; break; - } - - if (gr == grab_result::cant_preempt) { - pop_priority_class(h); - preempt.emplace_back(&h); + } else if (available.our_turn_has_come) { + // The current reservation isn't enough to fulfill the next request, + // and we can cancel it (because `our_turn_has_come == true`) and make a bigger one + // (because `_pending.cap < can_grab_this_tick`). + // So we cancel it and do a bigger one. + + // We do token recycling here: we return the tokens which we have available, and the tokens we have reserved + // immediately after the group head, and we return them to the bucket, immediately grabbing the same amount from the tail. + // This is neutral to fairness. The bandwidth we consume is still influenced only by the + // `max_unarmortized_reservation` portions. + auto recycled = available.ready_tokens + _pending.cap; + capacity_t grab_amount = std::min(recycled + max_unamortized_reservation, _queued_capacity); + // There's technically nothing wrong with grabbing more than `_group.maximum_capacity()`, + // but the token bucket has an assert for that, and its a reasonable expectation, so let's respect that limit. + // It shouldn't matter in practice. + grab_amount = std::min(grab_amount, _group.maximum_capacity()); + _group.refund_tokens(recycled); + grab_capacity(grab_amount); + available = reap_pending_capacity(); continue; + } else { + // We can already see that our current reservation is going to be insufficient + // for the highest-priority request as of now. But since group head didn't touch + // it yet, there's no good way to cancel it, so we have no choice but to wait + // until the touch time. + assert(available.ready_tokens == 0); + break; } + available.ready_tokens -= req._capacity; + _last_accumulated = std::max(h._accumulated, _last_accumulated); pop_priority_class(h); h._queue.pop_front(); @@ -376,7 +421,6 @@ void fair_queue::dispatch_requests(std::function cb) { } h._accumulated += req_cost; h._pure_accumulated += req_cap; - dispatched += req_cap; _queued_capacity -= req_cap; cb(req); @@ -386,9 +430,15 @@ void fair_queue::dispatch_requests(std::function cb) { } } - for (auto&& h : preempt) { - push_priority_class(*h); - } + assert(_handles.empty() || available.ready_tokens == 0); + + // Note: if IO cancellation happens, it's possible that we are still holding some tokens in `ready` here. + // + // We could refund them to the bucket, but permanently refunding tokens (as opposed to only + // "rotating" the bucket like the earlier refund() calls in this function do) is theoretically + // unpleasant (it can bloat the bucket beyond its size limit, and its hard to write a correct + // countermeasure for that), so we just discard the tokens. There's no harm in it, IO cancellation + // can't have resource-saving guarantees anyway. } std::vector fair_queue::metrics(class_id c) {