Skip to content

Commit

Permalink
Create plan for exercising motor
Browse files Browse the repository at this point in the history
  • Loading branch information
callumforrester committed Feb 6, 2025
1 parent cc05d7b commit 4ee8a72
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 122 deletions.
154 changes: 32 additions & 122 deletions src/htss_rig_bluesky/plans/exercise.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,12 @@

import bluesky.plan_stubs as bps
import bluesky.plans as bp
from dodal.beamlines.training_rig import TrainingRigSampleStage as SampleStage
import bluesky.preprocessors as bpp
from bluesky.protocols import Status
from bluesky.utils import MsgGenerator
from ophyd_async.epics.adaravis import AravisDetector
from ophyd_async.epics.motor import Motor

from .detector import ensure_detector_ready


def exercise_beamline(det: AravisDetector, sample: SampleStage) -> Generator:
"""
Perform all beamline exercise plans sequentially.
Args:
det: Detector
sample: Sample stage
Yields:
Plan
"""

yield from exercise_motors(sample)
yield from exercise_detector(det)
yield from exercise_scan(det, sample)


def exercise_motors(sample: SampleStage) -> Generator:
"""
exercise the motors on the sample stage.
Args:
sample: Sample stage
Yields:
Plan
"""

yield from exercise_motor(sample.x, -24.9, 14.0, tolerance=0.1)
yield from exercise_motor(
sample.theta, -1000.0, 1000.0, tolerance=0.1, check_limits=False
)


def exercise_detector(det: AravisDetector) -> Generator:
"""
Expand All @@ -59,105 +25,49 @@ def exercise_detector(det: AravisDetector) -> Generator:
Plan
"""

print(f"Excercising {det}")
yield from ensure_detector_ready(det)
yield from bp.count([det])


def exercise_scan(det: AravisDetector, sample: SampleStage) -> Generator:
def exercise_motor(motor: Motor) -> Generator:
"""
Perform a short scan to exercise the test rig.
Args:
det (AravisDetector): Detector
sample (SampleStage): Sample stage
Yields:
Plan
"""

print("Excercising scan")
yield from ensure_detector_ready(det)
yield from bp.scan([det], sample.theta, -180.0, 180.0, 10)


def exercise_motor(
motor: Motor,
low_limit: float,
high_limit: float,
tolerance: float = 0.0,
check_limits: bool = True,
) -> Generator:
"""
exercise a motor by making sure it can traverse between a low point
Exercise a motor by making sure it can traverse between a low point
and a high point.
Args:
motor: The motor
low_limit: Place to start
high_limit: Place to end
tolerance: Tolerance for checking motor position. Defaults to 0.0.
check_limits: Check whether the motor's limits fall within the bounds,
disable for limitless motors. Defaults to True.
Yields:
Plan
"""

name = motor.name
print(f"Excercising {name}")

if check_limits:
yield from assert_limits_within(motor, low_limit, high_limit)
low_limit, high_limit = yield from get_limits(motor, padding=0.2)

@bpp.run_decorator()
@bpp.stage_decorator([motor])
def move_and_monitor():
status: Status = yield from bps.abs_set(
motor,
high_limit,
wait=False,
group=move_and_monitor.__name__,
)
while not status.done:
yield from bps.trigger_and_read([motor])
yield from bps.sleep(0.1)
yield from bps.wait(group=move_and_monitor.__name__)

# Perform plan as max velocity
max_velocity = yield from bps.rd(motor.velocity)
yield from bps.abs_set(motor.velocity, max_velocity)

# Move to start point
yield from bps.abs_set(motor, low_limit, wait=True)
yield from assert_motor_at(motor, low_limit, tolerance)
yield from bps.abs_set(motor, high_limit, wait=True)
yield from assert_motor_at(motor, high_limit, tolerance)

# Traverse to high limit, taking a snapshot of motor's position at 10Hz
yield from move_and_monitor()

def assert_limits_within(
motor: Motor, low_limit: float, high_limit: float
) -> Generator:
"""
Check a motors limits fall within the bounds supplied.
Note this is not an exact check, just whether the real limits exceed
the "limit limits" supplied.
Args:
motor: The motor with limits
low_limit: The lower bound
high_limit: The upper bound
"""

name = motor.name
motor_high_limit: float = yield from bps.rd(motor.high_limit_travel)
motor_low_limit: float = yield from bps.rd(motor.low_limit_travel)
assert motor_high_limit >= high_limit, (
f"{name}'s upper limit is {motor.high_limit_travel}, should be >= {high_limit}"
)
assert motor_low_limit <= low_limit, (
f"{name}'s lower limit is {motor_low_limit}, should be <= {low_limit}"
)


def assert_motor_at(motor: Motor, pos: float, tolerance: float = 0.0) -> Generator:
"""
Check a motor has reached a required position
Args:
motor: The motor to check
pos: The required position
tolerance: Plus or minus tolerance, useful for
less precise motors. Defaults to 0.0.
Yields:
Plan
"""

actual_pos = yield from bps.rd(motor)
upper_bound = pos + (tolerance / 2.0)
lower_bound = pos - (tolerance / 2.0)
assert upper_bound >= actual_pos >= lower_bound, (
f"{motor.name} is at {actual_pos}, "
)
f"should be between {lower_bound} and {upper_bound}"
def get_limits(motor: Motor, padding: float = 0.0) -> MsgGenerator[tuple[float, float]]:
low_limit = yield from bps.rd(motor.low_limit_travel)
high_limit = yield from bps.rd(motor.high_limit_travel)
return low_limit + padding, high_limit - padding
6 changes: 6 additions & 0 deletions tests/system_tests/test_plans.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ def test_device_present(client: BlueapiClient, device: str):
assert client.get_device(device), f"{device} is not available"


@pytest.mark.parametrize("motor", ["sample_stage.x", "sample_stage.theta"])
def test_motor_behavoir(client: BlueapiClient, motor: str):
task = Task(name="exercise_motor", params={"motor": motor})
run_plan_test(client, task)


@pytest.mark.parametrize(
"task",
[
Expand Down

0 comments on commit 4ee8a72

Please sign in to comment.