Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plans for controlling backlight #51

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dependencies = [
"pandablocks",
"scipy",
"tiled==0.1.0a91",
"ophyd_async>=0.3.4",
"ophyd_async>=0.9.0a2",
]
dynamic = ["version"]
license.file = "LICENSE"
Expand All @@ -44,6 +44,7 @@ dev = [
"pydata-sphinx-theme>=0.12",
"pytest",
"pytest-cov",
"pytest-asyncio",
"ruff",
"sphinx-autobuild",
"sphinx-copybutton",
Expand Down Expand Up @@ -80,9 +81,8 @@ addopts = """
filterwarnings = "error"
# Doctest python code in docs, python code in src docstrings, test functions in tests
testpaths = "docs tests"
env= [
"BEAMLINE=",
]
env = ["BEAMLINE="]
asyncio_mode = "auto"

[tool.coverage.run]
data_file = "/tmp/htss_rig_bluesky.coverage"
Expand Down
81 changes: 81 additions & 0 deletions src/htss_rig_bluesky/plans/backlight.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import bluesky.plan_stubs as bps
from bluesky.utils import MsgGenerator
from ophyd_async.fastcs.panda import HDFPanda, TimeUnits


def set_backlight_intensity(
panda: HDFPanda,
intensity: float,
wait: bool = True,
group: str | None = None,
) -> MsgGenerator[None]:
if 0.0 <= intensity <= 1.0:
group = group or set_backlight_intensity.__name__
duty_cycle = int(round(intensity * 100))

if duty_cycle == 0:
port = "ZERO"
elif duty_cycle == 100:
port = "ONE"
else:
port = yield from create_pwm(panda, intensity, wait=False, group=group)

Check warning on line 21 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L21

Added line #L21 was not covered by tests

yield from set_backlight_control_port(panda, port, wait=False, group=group)

if wait:
yield from bps.wait(group=group)
else:
raise ValueError(f"Given intensity {intensity} should be between 0.0 and 1.0")

Check warning on line 28 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L28

Added line #L28 was not covered by tests


def create_pwm(
panda: HDFPanda,
duty_cycle: float,
period: float = 100.0,
wait: bool = True,
group: str | None = None,
) -> MsgGenerator[str]:
live_time = period * duty_cycle
dead_time = period - live_time

Check warning on line 39 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L38-L39

Added lines #L38 - L39 were not covered by tests

clock = panda.clock[2]
pulse = panda.pulse[2]

Check warning on line 42 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L41-L42

Added lines #L41 - L42 were not covered by tests

group = group or create_pwm.__name__

Check warning on line 44 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L44

Added line #L44 was not covered by tests

for signal, value in {

Check warning on line 46 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L46

Added line #L46 was not covered by tests
# Set units first
clock.period_units: TimeUnits.US,
pulse.delay_units: TimeUnits.US,
pulse.width_units: TimeUnits.US,
# Then set all other values
clock.enable: "ONE",
clock.enable_delay: 0.0,
clock.period: period,
pulse.enable: "ONE",
pulse.trig: "CLOCK2.OUT",
pulse.delay: dead_time,
pulse.width: live_time,
pulse.trig_edge: "Rising",
}.items():
yield from bps.abs_set(signal, value, wait=False, group=group)

Check warning on line 61 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L61

Added line #L61 was not covered by tests

if wait:
yield from bps.wait(group)

Check warning on line 64 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L63-L64

Added lines #L63 - L64 were not covered by tests

return "PULSE2.OUT"

Check warning on line 66 in src/htss_rig_bluesky/plans/backlight.py

View check run for this annotation

Codecov / codecov/patch

src/htss_rig_bluesky/plans/backlight.py#L66

Added line #L66 was not covered by tests


def set_backlight_control_port(
panda: HDFPanda,
port: str,
wait: bool = True,
group: str | None = None,
) -> MsgGenerator[None]:
output_port = panda.ttlout[2]
yield from bps.abs_set(
output_port.val,
port,
wait=wait,
group=group,
)
56 changes: 56 additions & 0 deletions tests/unit_tests/plans/test_backlight.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import pytest
from bluesky import RunEngine
from dodal.beamlines import training_rig as training_rig
from ophyd_async.core import (
Device,
DeviceVector,
)
from ophyd_async.epics.core import epics_signal_rw
from ophyd_async.fastcs.panda import HDFPanda
from ophyd_async.plan_stubs import ensure_connected

from htss_rig_bluesky.plans.backlight import (
set_backlight_control_port,
set_backlight_intensity,
)


@pytest.fixture
def run_engine() -> RunEngine:
return RunEngine()


@pytest.fixture
async def mock_panda(
run_engine: RunEngine,
):
class TtlOutBlock(Device):
def __init__(self, name: str = ""):
self.val = epics_signal_rw(str, "VAL")
super().__init__(name)

mock_panda = training_rig.panda()
mock_panda.phase_1_signal_units = epics_signal_rw(int, "")
mock_panda.ttlout = DeviceVector({i: TtlOutBlock() for i in range(1, 5)})

run_engine(ensure_connected(mock_panda, mock=True))
yield mock_panda


async def test_set_control_port_sets_control_port(
run_engine: RunEngine, mock_panda: HDFPanda
):
assert (await mock_panda.ttlout[2].val.get_value()) == ""
run_engine(set_backlight_control_port(mock_panda, "FOO"), wait=True)
assert (await mock_panda.ttlout[2].val.get_value()) == "FOO"


@pytest.mark.parametrize("intensity,port_value", [(0.0, "ZERO"), (1.0, "ONE")])
async def test_constant_port_value(
run_engine: RunEngine,
mock_panda: HDFPanda,
intensity: float,
port_value: str,
):
run_engine(set_backlight_intensity(mock_panda, intensity), wait=True)
assert (await mock_panda.ttlout[2].val.get_value()) == port_value
Loading