diff --git a/artiq/test/test_worker.py b/artiq/test/test_worker.py index 88e3ead829..583f5ea43b 100644 --- a/artiq/test/test_worker.py +++ b/artiq/test/test_worker.py @@ -31,7 +31,7 @@ def build(self): def run(self): for i in range(10): - with watchdog(0.5*s): + with watchdog(0.5 * s): sleep(0.1) @@ -40,19 +40,38 @@ def build(self): pass def run(self): - with watchdog(0.1*s): + with watchdog(0.1 * s): sleep(100.0) class WatchdogTimeoutInBuild(EnvExperiment): def build(self): - with watchdog(0.1*s): + with watchdog(0.1 * s): sleep(100.0) def run(self): pass +class DatasetAccess(EnvExperiment): + def build(self): + self.setattr_argument( + "array_length", + NumberValue(default=10000, scale=1, ndecimals=0, step=1, type="int"), + ) + + def run(self): + from random import random + from timeit import timeit + + data = [random() for _ in range(self.array_length)] + + duration = timeit( + lambda: self.set_dataset("test", data, broadcast=True), number=1 + ) + self.set_dataset("duration", duration, broadcast=True) + + async def _call_worker(worker, expid): try: await worker.build(0, "main", None, expid, 0) @@ -63,15 +82,15 @@ async def _call_worker(worker, expid): await worker.close() -def _run_experiment(class_name): +def _run_experiment(class_name, handlers={}, args={}): expid = { "log_level": logging.WARNING, "file": sys.modules[__name__].__file__, "class_name": class_name, - "arguments": dict() + "arguments": args, } loop = asyncio.get_event_loop() - worker = Worker({}) + worker = Worker(handlers) loop.run_until_complete(_call_worker(worker, expid)) @@ -91,8 +110,7 @@ def test_exception(self): with self.assertRaises(WorkerInternalException): _run_experiment("ExceptionTermination") self.assertGreater(len(logs.records), 0) - self.assertIn("Terminating with exception (TypeError)", - logs.output[-1]) + self.assertIn("Terminating with exception (TypeError)", logs.output[-1]) def test_watchdog_no_timeout(self): _run_experiment("WatchdogNoTimeout") @@ -105,5 +123,25 @@ def test_watchdog_timeout_in_build(self): with self.assertRaises(WorkerWatchdogTimeout): _run_experiment("WatchdogTimeoutInBuild") + def test_data_transfer_speed(self): + sizes = [1e3, 1e4, 1e5, 2e5, 5e5, 8e5, 1e6] + durations = [] + + def mock_update_dataset(*args, **kwargs): + dataset_name = args[0]["key"] + new_value = args[0]["value"][1] + if dataset_name == "duration": + durations.append(new_value) + + for size in sizes: + _run_experiment( + "DatasetAccess", + {"update_dataset": mock_update_dataset}, + {"array_length": size}, + ) + + for size, duration in zip(sizes, durations): + print("Writing %s elements took %.2f ms" % (size, 1e3 * duration)) + def tearDown(self): self.loop.close()