diff --git a/esrally/driver/scheduler.py b/esrally/driver/scheduler.py index b8838f338..1613885b3 100644 --- a/esrally/driver/scheduler.py +++ b/esrally/driver/scheduler.py @@ -205,6 +205,9 @@ class Unthrottled(Scheduler): def next(self, current): return 0 + def __str__(self): + return "unthrottled" + class DeterministicScheduler(SimpleScheduler): """ @@ -264,9 +267,19 @@ def after_request(self, now, weight, unit, request_meta_data): expected_unit = self.task.target_throughput.unit actual_unit = f"{unit}/s" if actual_unit != expected_unit: - raise exceptions.RallyAssertionError(f"Target throughput for [{self.task}] is specified in " - f"[{expected_unit}] but the task throughput is measured " - f"in [{actual_unit}].") + # *temporary* workaround to convert mismatching units to ops/s to stay backwards-compatible. + # + # This ensures that we throttle based on ops/s but report based on the original unit (as before). + if expected_unit == "ops/s": + weight = 1 + if self.first_request: + logging.getLogger(__name__).warning("Task [%s] throttles based on [%s] but reports [%s]. " + "Please specify the target throughput in [%s] instead.", + self.task, expected_unit, actual_unit, actual_unit) + else: + raise exceptions.RallyAssertionError(f"Target throughput for [{self.task}] is specified in " + f"[{expected_unit}] but the task throughput is measured " + f"in [{actual_unit}].") self.first_request = False self.current_weight = weight diff --git a/esrally/track/loader.py b/esrally/track/loader.py index 8189da0e8..7ac92f06a 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -20,6 +20,7 @@ import logging import os import re +import sys import tempfile import urllib.error @@ -794,8 +795,12 @@ def post_process_for_test_mode(t): if logger.isEnabledFor(logging.DEBUG): logger.debug("Resetting measurement time period for [%s] to [%d] seconds.", str(leaf_task), leaf_task.time_period) - leaf_task.params.pop("target-throughput", None) - leaf_task.params.pop("target-interval", None) + # Keep throttled to expose any errors but increase the target throughput for short execution times. + if leaf_task.throttled: + original_throughput = leaf_task.target_throughput + leaf_task.params.pop("target-throughput", None) + leaf_task.params.pop("target-interval", None) + leaf_task.params["target-throughput"] = f"{sys.maxsize} {original_throughput.unit}" return t diff --git a/tests/driver/scheduler_test.py b/tests/driver/scheduler_test.py index 14a96320c..d79914fbb 100644 --- a/tests/driver/scheduler_test.py +++ b/tests/driver/scheduler_test.py @@ -100,6 +100,25 @@ def test_scheduler_adapts_to_changed_weights(self): s.after_request(now=None, weight=10000, unit="docs", request_meta_data=None) self.assertEqual(2 * task.clients, s.next(0)) + def test_scheduler_accepts_differing_units_pages_and_ops(self): + task = track.Task(name="scroll-query", + operation=track.Operation( + name="scroll-query", + operation_type=track.OperationType.Search.name), + clients=1, + params={ + # implicitly: ops/s + "target-throughput": 10 + }) + + s = scheduler.UnitAwareScheduler(task=task, scheduler_class=scheduler.DeterministicScheduler) + # first request is unthrottled + self.assertEqual(0, s.next(0)) + # no exception despite differing units ... + s.after_request(now=None, weight=20, unit="pages", request_meta_data=None) + # ... and it is still throttled in ops/s + self.assertEqual(0.1 * task.clients, s.next(0)) + class SchedulerCategorizationTests(TestCase): class LegacyScheduler: