diff --git a/src/htss_rig_bluesky/plans/exercise.py b/src/htss_rig_bluesky/plans/exercise.py index cc5b33b..9ddc5d7 100644 --- a/src/htss_rig_bluesky/plans/exercise.py +++ b/src/htss_rig_bluesky/plans/exercise.py @@ -7,46 +7,12 @@ import bluesky.plan_stubs as bps import bluesky.plans as bp -from dodal.beamlines.training_rig import TrainingRigSampleStage as SampleStage +import bluesky.preprocessors as bpp +from bluesky.protocols import Status +from bluesky.utils import MsgGenerator from ophyd_async.epics.adaravis import AravisDetector from ophyd_async.epics.motor import Motor -from .detector import ensure_detector_ready - - -def exercise_beamline(det: AravisDetector, sample: SampleStage) -> Generator: - """ - Perform all beamline exercise plans sequentially. - - Args: - det: Detector - sample: Sample stage - - Yields: - Plan - """ - - yield from exercise_motors(sample) - yield from exercise_detector(det) - yield from exercise_scan(det, sample) - - -def exercise_motors(sample: SampleStage) -> Generator: - """ - exercise the motors on the sample stage. - - Args: - sample: Sample stage - - Yields: - Plan - """ - - yield from exercise_motor(sample.x, -24.9, 14.0, tolerance=0.1) - yield from exercise_motor( - sample.theta, -1000.0, 1000.0, tolerance=0.1, check_limits=False - ) - def exercise_detector(det: AravisDetector) -> Generator: """ @@ -59,105 +25,49 @@ def exercise_detector(det: AravisDetector) -> Generator: Plan """ - print(f"Excercising {det}") - yield from ensure_detector_ready(det) yield from bp.count([det]) -def exercise_scan(det: AravisDetector, sample: SampleStage) -> Generator: +def exercise_motor(motor: Motor) -> Generator: """ - Perform a short scan to exercise the test rig. - - Args: - det (AravisDetector): Detector - sample (SampleStage): Sample stage - - Yields: - Plan - """ - - print("Excercising scan") - yield from ensure_detector_ready(det) - yield from bp.scan([det], sample.theta, -180.0, 180.0, 10) - - -def exercise_motor( - motor: Motor, - low_limit: float, - high_limit: float, - tolerance: float = 0.0, - check_limits: bool = True, -) -> Generator: - """ - exercise a motor by making sure it can traverse between a low point + Exercise a motor by making sure it can traverse between a low point and a high point. Args: motor: The motor - low_limit: Place to start - high_limit: Place to end - tolerance: Tolerance for checking motor position. Defaults to 0.0. - check_limits: Check whether the motor's limits fall within the bounds, - disable for limitless motors. Defaults to True. Yields: Plan """ - name = motor.name - print(f"Excercising {name}") - - if check_limits: - yield from assert_limits_within(motor, low_limit, high_limit) + low_limit, high_limit = yield from get_limits(motor, padding=0.2) + + @bpp.run_decorator() + @bpp.stage_decorator([motor]) + def move_and_monitor(): + status: Status = yield from bps.abs_set( + motor, + high_limit, + wait=False, + group=move_and_monitor.__name__, + ) + while not status.done: + yield from bps.trigger_and_read([motor]) + yield from bps.sleep(0.1) + yield from bps.wait(group=move_and_monitor.__name__) + + # Perform plan as max velocity + max_velocity = yield from bps.rd(motor.velocity) + yield from bps.abs_set(motor.velocity, max_velocity) + + # Move to start point yield from bps.abs_set(motor, low_limit, wait=True) - yield from assert_motor_at(motor, low_limit, tolerance) - yield from bps.abs_set(motor, high_limit, wait=True) - yield from assert_motor_at(motor, high_limit, tolerance) + # Traverse to high limit, taking a snapshot of motor's position at 10Hz + yield from move_and_monitor() -def assert_limits_within( - motor: Motor, low_limit: float, high_limit: float -) -> Generator: - """ - Check a motors limits fall within the bounds supplied. - Note this is not an exact check, just whether the real limits exceed - the "limit limits" supplied. - - Args: - motor: The motor with limits - low_limit: The lower bound - high_limit: The upper bound - """ - - name = motor.name - motor_high_limit: float = yield from bps.rd(motor.high_limit_travel) - motor_low_limit: float = yield from bps.rd(motor.low_limit_travel) - assert motor_high_limit >= high_limit, ( - f"{name}'s upper limit is {motor.high_limit_travel}, should be >= {high_limit}" - ) - assert motor_low_limit <= low_limit, ( - f"{name}'s lower limit is {motor_low_limit}, should be <= {low_limit}" - ) - - -def assert_motor_at(motor: Motor, pos: float, tolerance: float = 0.0) -> Generator: - """ - Check a motor has reached a required position - - Args: - motor: The motor to check - pos: The required position - tolerance: Plus or minus tolerance, useful for - less precise motors. Defaults to 0.0. - - Yields: - Plan - """ - actual_pos = yield from bps.rd(motor) - upper_bound = pos + (tolerance / 2.0) - lower_bound = pos - (tolerance / 2.0) - assert upper_bound >= actual_pos >= lower_bound, ( - f"{motor.name} is at {actual_pos}, " - ) - f"should be between {lower_bound} and {upper_bound}" +def get_limits(motor: Motor, padding: float = 0.0) -> MsgGenerator[tuple[float, float]]: + low_limit = yield from bps.rd(motor.low_limit_travel) + high_limit = yield from bps.rd(motor.high_limit_travel) + return low_limit + padding, high_limit - padding diff --git a/tests/system_tests/test_plans.py b/tests/system_tests/test_plans.py index f9b29ec..28a3879 100644 --- a/tests/system_tests/test_plans.py +++ b/tests/system_tests/test_plans.py @@ -55,6 +55,12 @@ def test_device_present(client: BlueapiClient, device: str): assert client.get_device(device), f"{device} is not available" +@pytest.mark.parametrize("motor", ["sample_stage.x", "sample_stage.theta"]) +def test_motor_behavoir(client: BlueapiClient, motor: str): + task = Task(name="exercise_motor", params={"motor": motor}) + run_plan_test(client, task) + + @pytest.mark.parametrize( "task", [