diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index a8712f97d2..bbfde6c37d 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -254,6 +254,7 @@ public:
     capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
     capacity_t grab_capacity(capacity_t cap) noexcept;
     clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
+    void refund_tokens(capacity_t) noexcept;
     void replenish_capacity(clock_type::time_point now) noexcept;
     void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;
 
@@ -331,25 +332,27 @@ private:
     size_t _nr_classes = 0;
     capacity_t _last_accumulated = 0;
 
-    /*
-     * When the shared capacity os over the local queue delays
-     * further dispatching untill better times
-     *
-     * \head -- the value group head rover is expected to cross
-     * \cap -- the capacity that's accounted on the group
-     *
-     * The last field is needed to "rearm" the wait in case
-     * queue decides that it wants to dispatch another capacity
-     * in the middle of the waiting
-     */
+    // _pending represents a reservation of tokens from the bucket.
+    //
+    // In the "dispatch timeline" defined by the growing bucket head of the group,
+    // tokens in the range [_pending.head - cap, _pending.head) belong
+    // to this queue.
+    //
+    // For example, if:
+    //     _group._token_bucket.head == 300
+    //     _pending.head == 700
+    //     _pending.cap == 500
+    // then the reservation is [200, 700), 100 tokens are ready to be dispatched by this queue,
+    // and another 400 tokens are going to appear soon. (And after that, this queue
+    // will be able to make its next reservation).
     struct pending {
-        capacity_t head;
-        capacity_t cap;
-
-        pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
+        capacity_t head = 0;
+        capacity_t cap = 0;
     };
+    pending _pending;
 
-    std::optional<pending> _pending;
+    // Total capacity of all requests waiting in the queue.
+    capacity_t _queued_capacity = 0;
 
     void push_priority_class(priority_class_data& pc) noexcept;
     void push_priority_class_from_idle(priority_class_data& pc) noexcept;
@@ -357,9 +360,20 @@ private:
     void plug_priority_class(priority_class_data& pc) noexcept;
     void unplug_priority_class(priority_class_data& pc) noexcept;
 
-    enum class grab_result { grabbed, cant_preempt, pending };
-    grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
-    grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
+    // Replaces _pending with a new reservation starting at the current
+    // group bucket tail.
+    void grab_capacity(capacity_t cap) noexcept;
+    // Shaves off the fulfilled frontal part from `_pending` (if any),
+    // and returns the fulfilled tokens in `ready_tokens`.
+    // Sets `our_turn_has_come` to the truth value of "`_pending` is empty or
+    // there are no unfulfilled reservations (from other shards) earlier than `_pending`".
+    //
+    // Assumes that `_group.maybe_replenish_capacity()` was called recently.
+    struct reap_result {
+        capacity_t ready_tokens;
+        bool our_turn_has_come;
+    };
+    reap_result reap_pending_capacity() noexcept;
 
 public:
     /// Constructs a fair queue with configuration parameters \c cfg.
    ///
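
The reservation model documented in the new `_pending` comment can be condensed into a small standalone sketch. The types and names below are illustrative stand-ins, not the real fair_group/fair_queue classes; the numbers are the ones from the comment.

    #include <cassert>
    #include <cstdint>

    using capacity_t = uint64_t;

    // Toy stand-ins for fair_group and fair_queue::pending; names are illustrative only.
    struct toy_group {
        capacity_t head = 0;   // grows as the group replenishes tokens
        // How far the head still has to travel to reach `point` on the dispatch timeline.
        capacity_t capacity_deficiency(capacity_t point) const {
            return point > head ? point - head : 0;
        }
    };

    struct toy_pending {
        capacity_t head = 0;   // end of this queue's reservation
        capacity_t cap = 0;    // size of this queue's reservation
    };

    int main() {
        // The numbers from the comment: the reservation is [200, 700) and the group head is at 300,
        // so 100 tokens are already usable and 400 more become usable as the head advances.
        toy_group group;
        group.head = 300;
        toy_pending pending;
        pending.head = 700;
        pending.cap = 500;

        capacity_t deficiency = group.capacity_deficiency(pending.head);             // 400
        capacity_t ready = pending.cap > deficiency ? pending.cap - deficiency : 0;  // 100
        assert(deficiency == 400 && ready == 100);
    }
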
diff --git a/include/seastar/util/shared_token_bucket.hh b/include/seastar/util/shared_token_bucket.hh
index 91c718763a..ad9fd11af6 100644
--- a/include/seastar/util/shared_token_bucket.hh
+++ b/include/seastar/util/shared_token_bucket.hh
@@ -159,6 +159,10 @@ public:
         _rovers.release(tokens);
     }
 
+    void refund(T tokens) noexcept {
+        fetch_add(_rovers.head, tokens);
+    }
+
     void replenish(typename Clock::time_point now) noexcept {
         auto ts = _replenished.load(std::memory_order_relaxed);
 
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index f23edf3ded..81069d156a 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -117,6 +117,10 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
     _token_bucket.replenish(now);
 }
 
+void fair_group::refund_tokens(capacity_t cap) noexcept {
+    _token_bucket.refund(cap);
+}
+
 void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
     auto now = clock_type::now();
     auto extra = _token_bucket.accumulated_in(now - local_ts);
@@ -223,35 +227,22 @@ void fair_queue::unplug_class(class_id cid) noexcept {
     unplug_priority_class(*_priority_classes[cid]);
 }
 
-auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
-    _group.maybe_replenish_capacity(_group_replenish);
-
-    if (_group.capacity_deficiency(_pending->head)) {
-        return grab_result::pending;
-    }
-
-    capacity_t cap = ent._capacity;
-    if (cap > _pending->cap) {
-        return grab_result::cant_preempt;
+auto fair_queue::reap_pending_capacity() noexcept -> reap_result {
+    auto result = reap_result{.ready_tokens = 0, .our_turn_has_come = true};
+    if (_pending.cap) {
+        capacity_t deficiency = _group.capacity_deficiency(_pending.head);
+        result.our_turn_has_come = deficiency <= _pending.cap;
+        if (result.our_turn_has_come) {
+            result.ready_tokens = _pending.cap - deficiency;
+            _pending.cap = deficiency;
+        }
     }
-
-    _pending.reset();
-    return grab_result::grabbed;
+    return result;
 }
 
-auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
-    if (_pending) {
-        return grab_pending_capacity(ent);
-    }
-
-    capacity_t cap = ent._capacity;
+auto fair_queue::grab_capacity(capacity_t cap) noexcept -> void {
     capacity_t want_head = _group.grab_capacity(cap);
-    if (_group.capacity_deficiency(want_head)) {
-        _pending.emplace(want_head, cap);
-        return grab_result::pending;
-    }
-
-    return grab_result::grabbed;
+    _pending = pending{want_head, cap};
 }
 
 void fair_queue::register_priority_class(class_id id, uint32_t shares) {
@@ -297,17 +288,19 @@ void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept {
         push_priority_class_from_idle(pc);
     }
     pc._queue.push_back(ent);
+    _queued_capacity += ent.capacity();
 }
 
 void fair_queue::notify_request_finished(fair_queue_entry::capacity_t cap) noexcept {
 }
 
 void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
+    _queued_capacity -= ent._capacity;
     ent._capacity = 0;
 }
 
 fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept {
-    if (_pending) {
+    if (_pending.cap) {
        /*
         * We expect the disk to release the ticket within some time,
         * but it's ... OK if it doesn't -- the pending wait still
@@ -318,7 +311,7 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
         * which's sub-optimal. The expectation is that we think disk
         * works faster, than it really does.
         */
-        auto over = _group.capacity_deficiency(_pending->head);
+        auto over = _group.capacity_deficiency(_pending.head);
        auto ticks = _group.capacity_duration(over);
        return std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::steady_clock::duration>(ticks);
    }
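
reap_pending_capacity() and the new refund() both operate on the group's pair of rovers (head counts replenished tokens, tail counts reserved tokens). A rough single-threaded model of that arithmetic, with illustrative names (the real shared_token_bucket uses atomics shared by all shards):

    #include <cassert>
    #include <cstdint>

    using capacity_t = uint64_t;

    // A single-threaded toy of the two-rover bucket; only a model of the arithmetic.
    struct toy_bucket {
        capacity_t head = 0;
        capacity_t tail = 0;

        capacity_t grab(capacity_t cap) { tail += cap; return tail; }  // returns the reservation's end ("want_head")
        void replenish(capacity_t tokens) { head += tokens; }
        void refund(capacity_t tokens) { head += tokens; }             // moves head forward, like replenish
        capacity_t deficiency(capacity_t point) const { return point > head ? point - head : 0; }
    };

    struct toy_queue {
        capacity_t pending_head = 0;
        capacity_t pending_cap = 0;

        // Mirrors reap_pending_capacity(): peel off the already-replenished front part
        // of the reservation and keep the rest pending.
        capacity_t reap(const toy_bucket& b) {
            capacity_t ready = 0;
            if (pending_cap) {
                capacity_t def = b.deficiency(pending_head);
                if (def <= pending_cap) {      // "our turn has come"
                    ready = pending_cap - def;
                    pending_cap = def;
                }
            }
            return ready;
        }
    };

    int main() {
        toy_bucket bucket;
        toy_queue q;
        q.pending_head = bucket.grab(500);     // reserve 500 tokens
        q.pending_cap = 500;

        bucket.replenish(300);                 // the group grants 300 tokens
        assert(q.reap(bucket) == 300);         // 300 ready now, 200 still pending
        bucket.replenish(200);
        assert(q.reap(bucket) == 200);         // the rest arrives; the reservation is fully reaped
    }
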
@@ -326,11 +319,31 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
     return std::chrono::steady_clock::time_point::max();
 }
 
+// This function is called by the shard on every poll.
+// It picks up tokens granted by the group, spends available tokens on IO dispatches,
+// and makes a reservation for more tokens, if needed.
+//
+// Reservations are done in batches of size `_group.per_tick_grab_threshold()`.
+// During contention, at an average moment in time each contending shard can be expected to
+// be holding a reservation of such size after the current head of the token bucket.
+//
+// A shard which is currently calling `dispatch_requests()` can expect a latency
+// of at most `nr_contenders * (_group.per_tick_grab_threshold() + max_request_cap)` before its next reservation is fulfilled.
+// If a shard calls `dispatch_requests()` at least once per X total tokens, it should receive bandwidth
+// of at least `_group.per_tick_grab_threshold() / (X + nr_contenders * (_group.per_tick_grab_threshold() + max_request_cap))`.
+//
+// A shard which is polling continuously should be able to grab its fair share of the disk for itself.
+//
+// Given a task quota of 500us and an IO latency goal of 750us,
+// a CPU-starved shard should still be able to grab at least ~30% of its fair share in the worst case.
+// This is far from ideal, but it's something.
 void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
-    capacity_t dispatched = 0;
-    boost::container::small_vector<priority_class_ptr, 2> preempt;
+    _group.maybe_replenish_capacity(_group_replenish);
+
+    const uint64_t max_unamortized_reservation = _group.per_tick_grab_threshold();
+    auto available = reap_pending_capacity();
 
-    while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
+    while (!_handles.empty()) {
         priority_class_data& h = *_handles.top();
         if (h._queue.empty() || !h._plugged) {
             pop_priority_class(h);
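
To make the latency and bandwidth bounds in the comment above concrete, here is the same arithmetic with made-up token values (purely illustrative; not derived from any real disk or from per_tick_grab_threshold()'s actual formula):

    #include <cstdint>
    #include <cstdio>

    int main() {
        // Illustrative token values only; not taken from a real disk configuration.
        const uint64_t grab_threshold = 100000;   // stands for _group.per_tick_grab_threshold()
        const uint64_t max_request_cap = 20000;   // largest single request, in tokens
        const uint64_t nr_contenders = 7;         // shards actively competing for the disk

        // Worst-case wait (in tokens of disk progress) before this shard's next reservation is fulfilled:
        const uint64_t latency_tokens = nr_contenders * (grab_threshold + max_request_cap);

        // If the shard polls at least once per X tokens of disk progress,
        // it is promised at least this fraction of the disk:
        const uint64_t x = 500000;
        const double bandwidth_share = double(grab_threshold) / double(x + latency_tokens);

        std::printf("latency bound: %llu tokens, bandwidth share: %.3f\n",
                    static_cast<unsigned long long>(latency_tokens), bandwidth_share);
    }
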
@@ -338,17 +351,51 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
         }
         auto& req = h._queue.front();
-        auto gr = grab_capacity(req);
-        if (gr == grab_result::pending) {
+        if (req._capacity <= available.ready_tokens) {
+            // We can dispatch the request immediately.
+            // We do that after the if-else.
+        } else if (req._capacity <= available.ready_tokens + _pending.cap || _pending.cap >= max_unamortized_reservation) {
+            // We can't dispatch the request yet, but we already have a pending reservation
+            // which will provide us with enough tokens for it eventually,
+            // or our reservation is already max-size and we can't reserve more tokens until we reap some.
+            // So we should just wait.
+            // We return any immediately-available tokens back to `_pending`
+            // and we bail. The next `dispatch_requests()` will again take those tokens
+            // (possibly joined by some newly-granted tokens) and retry.
+            _pending.cap += available.ready_tokens;
+            available.ready_tokens = 0;
             break;
-        }
-
-        if (gr == grab_result::cant_preempt) {
-            pop_priority_class(h);
-            preempt.emplace_back(&h);
+        } else if (available.our_turn_has_come) {
+            // The current reservation isn't enough to fulfill the next request,
+            // and we can cancel it (because `our_turn_has_come == true`) and make a bigger one
+            // (because `_pending.cap < max_unamortized_reservation`).
+            // So we cancel it and do a bigger one.
+
+            // We do token recycling here: we take the tokens we have available and the tokens we have reserved
+            // immediately after the group head, return them to the bucket, and immediately grab the same amount from the tail.
+            // This is neutral to fairness. The bandwidth we consume is still influenced only by the
+            // `max_unamortized_reservation` portions.
+            auto recycled = available.ready_tokens + _pending.cap;
+            capacity_t grab_amount = std::min(recycled + max_unamortized_reservation, _queued_capacity);
+            // There's technically nothing wrong with grabbing more than `_group.maximum_capacity()`,
+            // but the token bucket has an assert for that, and it's a reasonable expectation, so let's respect that limit.
+            // It shouldn't matter in practice.
+            grab_amount = std::min(grab_amount, _group.maximum_capacity());
+            _group.refund_tokens(recycled);
+            grab_capacity(grab_amount);
+            available = reap_pending_capacity();
             continue;
+        } else {
+            // We can already see that our current reservation is going to be insufficient
+            // for the highest-priority request as of now. But since the group head hasn't touched
+            // it yet, there's no good way to cancel it, so we have no choice but to wait
+            // until the touch time.
+            assert(available.ready_tokens == 0);
+            break;
         }
+
+        available.ready_tokens -= req._capacity;
+
         _last_accumulated = std::max(h._accumulated, _last_accumulated);
         pop_priority_class(h);
         h._queue.pop_front();
@@ -374,7 +421,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
         }
         h._accumulated += req_cost;
         h._pure_accumulated += req_cap;
-        dispatched += req_cap;
+        _queued_capacity -= req_cap;
 
         cb(req);
 
@@ -383,9 +430,15 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
         }
     }
 
-    for (auto&& h : preempt) {
-        push_priority_class(*h);
-    }
+    assert(_handles.empty() || available.ready_tokens == 0);
+
+    // Note: if IO cancellation happens, it's possible that we are still holding some tokens in `ready` here.
+    //
+    // We could refund them to the bucket, but permanently refunding tokens (as opposed to only
+    // "rotating" the bucket like the earlier refund() calls in this function do) is theoretically
+    // unpleasant (it can bloat the bucket beyond its size limit, and it's hard to write a correct
+    // countermeasure for that), so we just discard the tokens. There's no harm in it; IO cancellation
+    // can't have resource-saving guarantees anyway.
 }
 
 std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {
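
The "token recycling" step in dispatch_requests() refunds tokens at the bucket head and immediately re-grabs the same amount at the tail, so the gap between the rovers (the outstanding demand other shards observe) stays the same; only this queue's reservation moves to the back of the line. A toy illustration of that invariant, with illustrative types rather than the real bucket:

    #include <cassert>
    #include <cstdint>

    using capacity_t = uint64_t;

    // Toy rovers; illustrative only.
    struct toy_rovers {
        capacity_t head = 0;   // replenished tokens
        capacity_t tail = 0;   // reserved tokens
    };

    int main() {
        toy_rovers b;
        b.head = 1000;
        b.tail = 1400;
        capacity_t outstanding_before = b.tail - b.head;   // 400 tokens of demand ahead of the head

        // "Recycle" 300 tokens: give them back at the head and grab them again at the tail.
        capacity_t recycled = 300;
        b.head += recycled;   // what refund_tokens() does
        b.tail += recycled;   // what the follow-up grab_capacity() does

        // The unreplenished demand seen by the group is unchanged, so recycling is neutral
        // to fairness; only this queue's reservation moved to the back of the line.
        assert(b.tail - b.head == outstanding_before);
    }
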
diff --git a/tests/manual/iosched_reproducers/one_cpu_starved_shard_can_still_saturate_io.sh b/tests/manual/iosched_reproducers/one_cpu_starved_shard_can_still_saturate_io.sh
new file mode 100644
index 0000000000..1854f4dd0b
--- /dev/null
+++ b/tests/manual/iosched_reproducers/one_cpu_starved_shard_can_still_saturate_io.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+# Test scenario:
+# A single CPU-starved shard has a batch IO job.
+# Goal: it should be able to utilize the entire bandwidth of the disk,
+# despite the rare polls.
+
+if [ $# -ne 1 ]; then
+    echo "Usage: $0 IO_TESTER_EXECUTABLE" >&2
+    exit 1
+fi
+
+"$1" --smp=7 --storage=/dev/null --conf=<(cat <<'EOF'
+- name: tablet-streaming
+  data_size: 1GB
+  shards: [0]
+  type: seqread
+  shard_info:
+    parallelism: 50
+    reqsize: 128kB
+    shares: 200
+- name: cpuhog
+  type: cpu
+  shards: [0]
+  shard_info:
+    parallelism: 1
+    execution_time: 550us
+
+EOF
+) --io-properties-file=<(cat <<'EOF'
+# i4i.2xlarge
+disks:
+- mountpoint: /dev
+  read_bandwidth: 1542559872
+  read_iops: 218786
+  write_bandwidth: 1130867072
+  write_iops: 121499
+EOF
+)
diff --git a/tests/manual/iosched_reproducers/one_cpu_starved_shard_has_reasonable_fairness.sh b/tests/manual/iosched_reproducers/one_cpu_starved_shard_has_reasonable_fairness.sh
new file mode 100644
index 0000000000..27dd05378b
--- /dev/null
+++ b/tests/manual/iosched_reproducers/one_cpu_starved_shard_has_reasonable_fairness.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+# Test scenario:
+# all shards contend for IO, but one shard is additionally CPU-starved
+# and polls rarely.
+# Goal: it should still be getting a reasonably fair share of disk bandwidth.
+
+if [ $# -ne 1 ]; then
+    echo "Usage: $0 IO_TESTER_EXECUTABLE" >&2
+    exit 1
+fi
+
+"$1" --smp=7 --storage=/dev/null --conf=<(cat <<'EOF'
+- name: tablet-streaming
+  data_size: 1GB
+  shards: all
+  type: seqread
+  shard_info:
+    parallelism: 50
+    reqsize: 128kB
+    shares: 200
+- name: cpuhog
+  type: cpu
+  shards: [0]
+  shard_info:
+    parallelism: 1
+    execution_time: 550us
+
+EOF
+) --io-properties-file=<(cat <<'EOF'
+# i4i.2xlarge
+disks:
+- mountpoint: /dev
+  read_bandwidth: 1542559872
+  read_iops: 218786
+  write_bandwidth: 1130867072
+  write_iops: 121499
+EOF
+) --duration=2
diff --git a/tests/manual/iosched_reproducers/scylla_tablet_migration.sh b/tests/manual/iosched_reproducers/scylla_tablet_migration.sh
new file mode 100644
index 0000000000..1caffe558d
--- /dev/null
+++ b/tests/manual/iosched_reproducers/scylla_tablet_migration.sh
@@ -0,0 +1,77 @@
+#!/usr/bin/env bash
+
+# Test scenario:
+# Simulation of a ScyllaDB workload which prompted some changes to the IO scheduler:
+# database queries concurrent with tablet streaming.
+#
+# All 7 shards are running a low-priority (200 shares) batch IO workload
+# and a high-priority (1000 shares), moderate-bandwidth, interactive workload.
+#
+# The interactive workload requires about 30% of the node's
+# total bandwidth (as measured in tokens), in small random reads.
+# The batch workload does large sequential reads and wants to utilize all
+# spare bandwidth.
+#
+# This workload is almost symmetric across shards, but is slightly skewed
+# and shard 0 is slightly more loaded. But even on this shard, the workload
+# doesn't need more than 35% of the fair bandwidth of this shard.
+#
+# Due to the distribution of shares across IO classes, the user expects that
+# the interactive workload should be guaranteed (1000 / (1000 + 200)) == ~83% of
+# the disk bandwidth on each shard. So if it's only asking for less than 35%,
+# the lower-priority job shouldn't disturb it.
+#
+# But before the relevant IO scheduler changes, this goal wasn't met,
+# and the interactive workload on shard 0 was instead starved for IO
+# by the low-priority workloads on other shards.
+
+if [ $# -ne 1 ]; then
+    echo "Usage: $0 IO_TESTER_EXECUTABLE" >&2
+    exit 1
+fi
+
+"$1" --smp=7 --storage=/dev/null --conf=<(cat <<'EOF'
+- name: tablet-streaming
+  data_size: 1GB
+  shards: all
+  type: seqread
+  shard_info:
+    parallelism: 50
+    reqsize: 128kB
+    shares: 200
+- name: cassandra-stress
+  shards: all
+  type: randread
+  data_size: 1GB
+  shard_info:
+    parallelism: 100
+    reqsize: 1536
+    shares: 1000
+    rps: 75
+    options:
+      pause_distribution: poisson
+      sleep_type: steady
+- name: cassandra-stress-slight-imbalance
+  shards: [0]
+  type: randread
+  data_size: 1GB
+  shard_info:
+    parallelism: 100
+    reqsize: 1536
+    class: cassandra-stress
+    rps: 10
+    options:
+      pause_distribution: poisson
+      sleep_type: steady
+
+EOF
+) --io-properties-file=<(cat <<'EOF'
+# i4i.2xlarge
+disks:
+- mountpoint: /dev
+  read_bandwidth: 1542559872
+  read_iops: 218786
+  write_bandwidth: 1130867072
+  write_iops: 121499
+EOF
+)
diff --git a/tests/manual/iosched_reproducers/tau_nemesis.sh b/tests/manual/iosched_reproducers/tau_nemesis.sh
new file mode 100644
index 0000000000..92031e38d4
--- /dev/null
+++ b/tests/manual/iosched_reproducers/tau_nemesis.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+
+# There is a `tau` mechanism in `fair_queue` which lets newly-activated
+# IO classes monopolize the shard's IO queue for a while.
+#
+# This isn't very useful and can result in major performance problems,
+# as this test illustrates. The `highprio` workload could have tail latency
+# of about 2 milliseconds, but the `bursty_lowprio` is allowed by `tau` to butt in
+# periodically and preempt `highprio` for ~30ms, bringing its tail latency
+# up to that level.
+
+if [ $# -ne 1 ]; then
+    echo "Usage: $0 IO_TESTER_EXECUTABLE" >&2
+    exit 1
+fi
+
+"$1" --smp=7 --storage=/dev/null --conf=<(cat <<'EOF'
+- name: filler
+  data_size: 1GB
+  shards: all
+  type: seqread
+  shard_info:
+    parallelism: 10
+    reqsize: 128kB
+    shares: 10
+- name: bursty_lowprio
+  data_size: 1GB
+  shards: all
+  type: seqread
+  shard_info:
+    parallelism: 1
+    reqsize: 128kB
+    shares: 100
+    batch: 50
+    rps: 8
+- name: highprio
+  shards: all
+  type: randread
+  data_size: 1GB
+  shard_info:
+    parallelism: 100
+    reqsize: 1536
+    shares: 1000
+    rps: 50
+    options:
+      pause_distribution: poisson
+      sleep_type: steady
+EOF
+) --io-properties-file=<(cat <<'EOF'
+# i4i.2xlarge
+disks:
+- mountpoint: /dev
+  read_bandwidth: 1542559872
+  read_iops: 218786
+  write_bandwidth: 1130867072
+  write_iops: 121499
+EOF
+)
diff --git a/tests/unit/fair_queue_test.cc b/tests/unit/fair_queue_test.cc
index fe620c7790..3574f46dd7 100644
--- a/tests/unit/fair_queue_test.cc
+++ b/tests/unit/fair_queue_test.cc
@@ -82,7 +82,14 @@ class test_env {
     test_env(unsigned capacity)
         : _fg(fg_config(capacity), 1)
         , _fq(_fg, fq_config())
-    {}
+    {
+        // Move _fg's replenished_ts() to the far future.
+        // This will prevent any `maybe_replenish_capacity` calls (indirectly done by `fair_queue::dispatch_requests()`)
+        // from replenishing tokens on their own, and ensure that the only source of replenishment will be tick().
+        //
+        // Otherwise the rate of replenishment might be greater than expected by the test, breaking the results.
+        _fg.replenish_capacity(fair_group::clock_type::now() + std::chrono::days(1));
+    }
 
     // As long as there is a request sitting in the queue, tick() will process
     // at least one request. The only situation in which tick() will return nothing
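
The test_env change above pushes the group's replenish timestamp into the far future so that only the test's explicit tick() adds tokens. A minimal sketch of why that works, assuming a toy bucket that replenishes proportionally to the time elapsed since its last replenish timestamp (illustrative types, not the real fair_group):

    #include <cassert>
    #include <chrono>
    #include <cstdint>

    using clock_type = std::chrono::steady_clock;

    // Tokens accrue only for time elapsed since `replenished`, so moving `replenished`
    // into the far future makes time-based replenishment a no-op and leaves explicit
    // test ticks as the only source of tokens.
    struct toy_bucket {
        clock_type::time_point replenished = clock_type::now();
        uint64_t tokens = 0;
        uint64_t rate_per_ms = 10;

        void maybe_replenish(clock_type::time_point now) {
            if (now <= replenished) {
                return;   // nothing accrued yet
            }
            auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - replenished).count();
            tokens += rate_per_ms * static_cast<uint64_t>(elapsed);
            replenished = now;
        }

        void test_tick(uint64_t t) { tokens += t; }   // stands for the unit test's tick()
    };

    int main() {
        toy_bucket b;
        b.replenished = clock_type::now() + std::chrono::hours(24);   // "far future", like the fixture
        b.maybe_replenish(clock_type::now());                         // no-op: the clock hasn't reached `replenished`
        assert(b.tokens == 0);
        b.test_tick(100);                                             // only explicit ticks add tokens now
        assert(b.tokens == 100);
    }
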