Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hardware system tests #53

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 32 additions & 122 deletions src/htss_rig_bluesky/plans/exercise.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,12 @@

import bluesky.plan_stubs as bps
import bluesky.plans as bp
from dodal.beamlines.training_rig import TrainingRigSampleStage as SampleStage
import bluesky.preprocessors as bpp
from bluesky.protocols import Status
from bluesky.utils import MsgGenerator
from ophyd_async.epics.adaravis import AravisDetector
from ophyd_async.epics.motor import Motor

from .detector import ensure_detector_ready


def exercise_beamline(det: AravisDetector, sample: SampleStage) -> Generator:
"""
Perform all beamline exercise plans sequentially.

Args:
det: Detector
sample: Sample stage

Yields:
Plan
"""

yield from exercise_motors(sample)
yield from exercise_detector(det)
yield from exercise_scan(det, sample)


def exercise_motors(sample: SampleStage) -> Generator:
"""
exercise the motors on the sample stage.

Args:
sample: Sample stage

Yields:
Plan
"""

yield from exercise_motor(sample.x, -24.9, 14.0, tolerance=0.1)
yield from exercise_motor(
sample.theta, -1000.0, 1000.0, tolerance=0.1, check_limits=False
)


def exercise_detector(det: AravisDetector) -> Generator:
"""
Expand All @@ -59,105 +25,49 @@ def exercise_detector(det: AravisDetector) -> Generator:
Plan
"""

print(f"Excercising {det}")
yield from ensure_detector_ready(det)
yield from bp.count([det])


def exercise_scan(det: AravisDetector, sample: SampleStage) -> Generator:
def exercise_motor(motor: Motor) -> Generator:
"""
Perform a short scan to exercise the test rig.

Args:
det (AravisDetector): Detector
sample (SampleStage): Sample stage

Yields:
Plan
"""

print("Excercising scan")
yield from ensure_detector_ready(det)
yield from bp.scan([det], sample.theta, -180.0, 180.0, 10)


def exercise_motor(
motor: Motor,
low_limit: float,
high_limit: float,
tolerance: float = 0.0,
check_limits: bool = True,
) -> Generator:
"""
exercise a motor by making sure it can traverse between a low point
Exercise a motor by making sure it can traverse between a low point
and a high point.

Args:
motor: The motor
low_limit: Place to start
high_limit: Place to end
tolerance: Tolerance for checking motor position. Defaults to 0.0.
check_limits: Check whether the motor's limits fall within the bounds,
disable for limitless motors. Defaults to True.

Yields:
Plan
"""

name = motor.name
print(f"Excercising {name}")

if check_limits:
yield from assert_limits_within(motor, low_limit, high_limit)
low_limit, high_limit = yield from get_limits(motor, padding=0.2)

@bpp.run_decorator()
@bpp.stage_decorator([motor])
def move_and_monitor():
status: Status = yield from bps.abs_set(
motor,
high_limit,
wait=False,
group=move_and_monitor.__name__,
)
while not status.done:
yield from bps.trigger_and_read([motor])
yield from bps.sleep(0.1)
yield from bps.wait(group=move_and_monitor.__name__)

# Perform plan as max velocity
max_velocity = yield from bps.rd(motor.velocity)
yield from bps.abs_set(motor.velocity, max_velocity)

# Move to start point
yield from bps.abs_set(motor, low_limit, wait=True)
yield from assert_motor_at(motor, low_limit, tolerance)
yield from bps.abs_set(motor, high_limit, wait=True)
yield from assert_motor_at(motor, high_limit, tolerance)

# Traverse to high limit, taking a snapshot of motor's position at 10Hz
yield from move_and_monitor()

def assert_limits_within(
motor: Motor, low_limit: float, high_limit: float
) -> Generator:
"""
Check a motors limits fall within the bounds supplied.
Note this is not an exact check, just whether the real limits exceed
the "limit limits" supplied.

Args:
motor: The motor with limits
low_limit: The lower bound
high_limit: The upper bound
"""

name = motor.name
motor_high_limit: float = yield from bps.rd(motor.high_limit_travel)
motor_low_limit: float = yield from bps.rd(motor.low_limit_travel)
assert motor_high_limit >= high_limit, (
f"{name}'s upper limit is {motor.high_limit_travel}, should be >= {high_limit}"
)
assert motor_low_limit <= low_limit, (
f"{name}'s lower limit is {motor_low_limit}, should be <= {low_limit}"
)


def assert_motor_at(motor: Motor, pos: float, tolerance: float = 0.0) -> Generator:
"""
Check a motor has reached a required position

Args:
motor: The motor to check
pos: The required position
tolerance: Plus or minus tolerance, useful for
less precise motors. Defaults to 0.0.

Yields:
Plan
"""

actual_pos = yield from bps.rd(motor)
upper_bound = pos + (tolerance / 2.0)
lower_bound = pos - (tolerance / 2.0)
assert upper_bound >= actual_pos >= lower_bound, (
f"{motor.name} is at {actual_pos}, "
)
f"should be between {lower_bound} and {upper_bound}"
def get_limits(motor: Motor, padding: float = 0.0) -> MsgGenerator[tuple[float, float]]:
low_limit = yield from bps.rd(motor.low_limit_travel)
high_limit = yield from bps.rd(motor.high_limit_travel)
return low_limit + padding, high_limit - padding
19 changes: 0 additions & 19 deletions tests/system_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,11 @@
import pytest
from blueapi.client.client import BlueapiClient
from blueapi.config import ApplicationConfig, RestConfig, StompConfig
from blueapi.worker.task import Task
from bluesky_stomp.models import BasicAuthentication

from htss_rig_bluesky.names import BEAMLINE


@pytest.fixture
def task_definition() -> dict[str, Task]:
return {
"step_scan_plan": Task(
name="step_scan_plan",
params={"detectors": "det", "motor": "sample_stage.theta"},
),
"fly_and_collect_plan": Task(
name="fly_and_collect_plan",
params={"panda": "panda", "diff": "det"},
),
"log_scan_plan": Task(
name="log_scan_plan",
params={"detectors": "det", "motor": "sample_stage.x"},
),
}


@pytest.fixture
def config() -> ApplicationConfig:
if BEAMLINE == "p46":
Expand Down
Loading
Loading