diff --git a/doc/configuration.rst b/doc/configuration.rst index 4a3ffea66..a446dccb5 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -547,25 +547,77 @@ NetworkHIDRelay +++++++++++++++ A :any:`NetworkHIDRelay` describes an `HIDRelay`_ exported over the network. +LibGPIO ++++++++ + +A :any:`LibGPIO` resouce describes a gpiochip and it's line via `the libgpiod character device kernel interface `. + +.. code-block:: yaml + + LibGPIO: + gpiochip: '/dev/gpiochip0' + line: 0 + active_low: False + +Arguments: + - gpiochip (str): device name of the gpiochip + - line (int): line number on the gpiochip + - active_low (bool, default=False): optional, invert the logical line value when True + +Used by: + - `LibGPIODigitalOutputDriver`_ + +MatchedLibGPIO +++++++++++++++ +A :any:`MatchedSysfsGPIO` describes a gpiochip and it's line via `the libgpiod character device kernel interface `. + +The gpiochip is identified by matching udev properties. This allows +identification through hot-plugging or rebooting for controllers like +USB based gpiochips. + +.. code-block:: yaml + + MatchedSysfsGPIO: + match: + '@SUBSYSTEM': 'usb' + '@ID_SERIAL_SHORT': 'D38EJ8LF' + line: 0 + active_low: False + +The example would search for a USB gpiochip with the key `ID_SERIAL_SHORT` +and the value `D38EJ8LF` and use line 0 of this device. +The `ID_SERIAL_SHORT` property is set by the usb_id builtin helper program. + +Arguments: + - match (dict): key and value pairs for a udev match, see `udev Matching`_ + - line (int): line number on the matched gpiochip. + - active_low (bool, default=False): optional, invert the logical line value when True + +Used by: + - `LibGPIODigitalOutputDriver`_ + SysfsGPIO +++++++++ -A :any:`SysfsGPIO` resource describes a GPIO line. +A :any:`SysfsGPIO` resource describes a GPIO line via `the sysfs kernel interface ` which has been deprecated. Please use `LibGPIO`_ instead. .. code-block:: yaml SysfsGPIO: index: 12 + active_low: False Arguments: - index (int): index of the GPIO line + - active_low (bool, default=False): optional, invert the logical line value when True Used by: - `GpioDigitalOutputDriver`_ MatchedSysfsGPIO ++++++++++++++++ -A :any:`MatchedSysfsGPIO` describes a GPIO line, like a `SysfsGPIO`_. +A :any:`MatchedSysfsGPIO` describes a GPIO line, like a `SysfsGPIO`_ via `the sysfs kernel interface ` which has been deprecated. Please use `MatchedLibGPIO`_ instead. + The gpiochip is identified by matching udev properties. This allows identification through hot-plugging or rebooting for controllers like USB based gpiochips. @@ -577,6 +629,7 @@ USB based gpiochips. '@SUBSYSTEM': 'usb' '@ID_SERIAL_SHORT': 'D38EJ8LF' pin: 0 + active_low: False The example would search for a USB gpiochip with the key `ID_SERIAL_SHORT` and the value `D38EJ8LF` and use the pin 0 of this device. @@ -585,6 +638,7 @@ The `ID_SERIAL_SHORT` property is set by the usb_id builtin helper program. Arguments: - match (dict): key and value pairs for a udev match, see `udev Matching`_ - pin (int): gpio pin number within the matched gpiochip. + - active_low (bool, default=False): optional, invert the logical line value when True Used by: - `GpioDigitalOutputDriver`_ @@ -2213,11 +2267,103 @@ Implements: Arguments: - delay (float, default=2.0): delay in seconds between off and on +ManualButtonDriver +~~~~~~~~~~~~~~~~~~ +A :any:`ManualButtonDriver` requires the user to control the taget button. +Theis is required if a strategy is used with the target, but no automatic +button control is available. + +The driver's name will be displayed during interaction. + +Binds to: + - None + +Implements: + - :any:`ButtonProtocol` + +.. code-block:: yaml + + MantualButtonDriver: + name: 'example-board' + +Arguments: + - None + +ExternalButtonDriver +~~~~~~~~~~~~~~~~~~~~ +An :any:`ExternalButtonDriver` is used to control a target button via an +external command. + +Binds to: + - None + +Implements: + - :any:`ButtonProtocol` + +.. code-block:: yaml + + ExternalButtonDriver: + cmd_press: 'example_command press and hold' + cmd_release: 'example_command release' + cmd_press_for: 'example_command press_for' + delay: 2.0 + +Arguments: + - cmd_press (str): command to press and hold the button on the board + - cmd_release (str): command to release the button on the board + - cmd_press_for (str): command to press, pause, and release the button on the board + - delay (float, default=1.0): delay in seconds when calling press_for + +DigitalOutputButtonDriver +~~~~~~~~~~~~~~~~~~~~~~~~~ +An :any:`DigitalOutputButtonDriver` is used to control a target button via a +DigitalOutputDriver + +Binds to: + - :any:`DigitalOutputProtocol` + +Implements: + - :any:`ButtonProtocol` + +.. code-block:: yaml + + DigitalOutputButtonDriver: + delay: 2.0 + +Arguments: + - delay (float, default=1.0): delay in seconds when calling press_for + +LibGPIODigitalOutputDriver +~~~~~~~~~~~~~~~~~~~~~~~~~~ +The :any:`LibGPIODigitalOutputDriver` writes a digital signal to a GPIO line. + +This driver configures GPIO lines via `the libgpiod character device kernel interface `. +While the driver automatically exports the GPIO, it does not configure it in any other way than as an output. + +Binds to: + gpio: + - `LibGPIO`_ + - NetworkLibGPIO + +Implements: + - :any:`DigitalOutputProtocol` + - :any:`ResetProtocol` + - :any:`PowerProtocol` + - :any:`ButtonProtocol` + +.. code-block:: yaml + + LibGPIODigitalOutputDriver: + delay: 2.0 + +Arguments: + - delay (float, default=1.0): delay in seconds between off and on for a power cycle or between states for button press_for + GpioDigitalOutputDriver ~~~~~~~~~~~~~~~~~~~~~~~ The :any:`GpioDigitalOutputDriver` writes a digital signal to a GPIO line. -This driver configures GPIO lines via `the sysfs kernel interface `. +This driver configures GPIO lines via `the sysfs kernel interface ` which has been deprecated. Please use `LibGPIODigitalOutputDriver`_ instead. While the driver automatically exports the GPIO, it does not configure it in any other way than as an output. Binds to: @@ -2228,13 +2374,17 @@ Binds to: Implements: - :any:`DigitalOutputProtocol` + - :any:`ResetProtocol` + - :any:`PowerProtocol` + - :any:`ButtonProtocol` .. code-block:: yaml - GpioDigitalOutputDriver: {} + GpioDigitalOutputDriver: + delay: 2.0 Arguments: - - None + - delay (float, default=1.0): delay in seconds between off and on for a power cycle or between states for button press_for SerialPortDigitalOutputDriver ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/examples/libgpio/export-gpio.yaml b/examples/libgpio/export-gpio.yaml new file mode 100644 index 000000000..07d1d48ae --- /dev/null +++ b/examples/libgpio/export-gpio.yaml @@ -0,0 +1,5 @@ +desk: + LibGPIO: + gpiochip: '/dev/gpiochip0' + line: 10 + active_low: False diff --git a/examples/libgpio/import-gpio.yaml b/examples/libgpio/import-gpio.yaml new file mode 100644 index 000000000..22783734f --- /dev/null +++ b/examples/libgpio/import-gpio.yaml @@ -0,0 +1,7 @@ +targets: + main: + resources: + RemotePlace: + name: gpio + drivers: + LibGPIODigitalOutputDriver: {} diff --git a/examples/libgpio/libgpio.py b/examples/libgpio/libgpio.py new file mode 100644 index 000000000..76de9ca2d --- /dev/null +++ b/examples/libgpio/libgpio.py @@ -0,0 +1,61 @@ +import logging +import time + +from labgrid import Target +from labgrid.logging import basicConfig, StepLogger +from labgrid.driver import LibGPIODigitalOutputDriver +from labgrid.resource import LibGPIO + +# enable info logging +basicConfig(level=logging.INFO) + +# show labgrid steps on the console +StepLogger.start() + +t = Target("main") +r = LibGPIO(t, name=None, gpiochip="/dev/gpiochip0", line=10, active_low=True) +d = LibGPIODigitalOutputDriver(t, name=None) + +p = t.get_driver("DigitalOutputProtocol") +print(t.resources) +print("Testing IO") +p.set(True) +print(p.get()) +time.sleep(2) +p.set(False) +print(p.get()) +time.sleep(2) +p.set(True) +print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) + +print("Testing Power") +p.off() +print(p.get()) +time.sleep(2) +p.on() +print(p.get()) +time.sleep(2) +p.cycle() +print(p.get()) +time.sleep(2) + +print("Testing Button") +p.release() +print(p.get()) +time.sleep(2) +p.press() +print(p.get()) +time.sleep(2) +p.release() +print(p.get()) +time.sleep(2) +p.press_for() +print(p.get()) + diff --git a/examples/libgpio/libgpio_remote.py b/examples/libgpio/libgpio_remote.py new file mode 100644 index 000000000..55357e316 --- /dev/null +++ b/examples/libgpio/libgpio_remote.py @@ -0,0 +1,58 @@ +import logging +import time + +from labgrid import Environment +from labgrid.logging import basicConfig, StepLogger + +# enable info logging +basicConfig(level=logging.INFO) + +# show labgrid steps on the console +StepLogger.start() + +e = Environment("import-gpio.yaml") +t = e.get_target() + +p = t.get_driver("DigitalOutputProtocol") +print(t.resources) +print("Testing IO") +p.set(True) +print(p.get()) +time.sleep(2) +p.set(False) +print(p.get()) +time.sleep(2) +p.set(True) +print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) + +print("Testing Power") +p.off() +print(p.get()) +time.sleep(2) +p.on() +print(p.get()) +time.sleep(2) +p.cycle() +print(p.get()) +time.sleep(2) + +print("Testing Button") +p.release() +print(p.get()) +time.sleep(2) +p.press() +print(p.get()) +time.sleep(2) +p.release() +print(p.get()) +time.sleep(2) +p.press_for() +print(p.get()) + diff --git a/examples/sysfsgpio/export-gpio.yaml b/examples/sysfsgpio/export-gpio.yaml index 489befd16..1fa77f9c6 100644 --- a/examples/sysfsgpio/export-gpio.yaml +++ b/examples/sysfsgpio/export-gpio.yaml @@ -1,3 +1,4 @@ desk: - GpioDigitalOutputDriver: + SysfsGPIO: index: 60 + active_low: False diff --git a/examples/sysfsgpio/import-gpio.yaml b/examples/sysfsgpio/import-gpio.yaml index 4ba7b223f..40954a448 100644 --- a/examples/sysfsgpio/import-gpio.yaml +++ b/examples/sysfsgpio/import-gpio.yaml @@ -5,5 +5,3 @@ targets: name: gpio drivers: GpioDigitalOutputDriver: {} -options: - coordinator_address: 'labgrid:20408' diff --git a/examples/sysfsgpio/sysfsgpio.py b/examples/sysfsgpio/sysfsgpio.py index 98acf0ac1..a44a7b9bb 100644 --- a/examples/sysfsgpio/sysfsgpio.py +++ b/examples/sysfsgpio/sysfsgpio.py @@ -13,11 +13,12 @@ StepLogger.start() t = Target("main") -r = SysfsGPIO(t, name=None, index=60) +r = SysfsGPIO(t, name=None, index=60, active_low=True) d = GpioDigitalOutputDriver(t, name=None) p = t.get_driver("DigitalOutputProtocol") print(t.resources) +print("Testing IO") p.set(True) print(p.get()) time.sleep(2) @@ -26,3 +27,35 @@ time.sleep(2) p.set(True) print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) + +print("Testing Power") +p.off() +print(p.get()) +time.sleep(2) +p.on() +print(p.get()) +time.sleep(2) +p.cycle() +print(p.get()) +time.sleep(2) + +print("Testing Button") +p.release() +print(p.get()) +time.sleep(2) +p.press() +print(p.get()) +time.sleep(2) +p.release() +print(p.get()) +time.sleep(2) +p.press_for() +print(p.get()) + diff --git a/examples/sysfsgpio/sysfsgpio_remote.py b/examples/sysfsgpio/sysfsgpio_remote.py index 4b16b8466..55357e316 100644 --- a/examples/sysfsgpio/sysfsgpio_remote.py +++ b/examples/sysfsgpio/sysfsgpio_remote.py @@ -15,6 +15,7 @@ p = t.get_driver("DigitalOutputProtocol") print(t.resources) +print("Testing IO") p.set(True) print(p.get()) time.sleep(2) @@ -23,3 +24,35 @@ time.sleep(2) p.set(True) print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) +p.invert() +print(p.get()) +time.sleep(2) + +print("Testing Power") +p.off() +print(p.get()) +time.sleep(2) +p.on() +print(p.get()) +time.sleep(2) +p.cycle() +print(p.get()) +time.sleep(2) + +print("Testing Button") +p.release() +print(p.get()) +time.sleep(2) +p.press() +print(p.get()) +time.sleep(2) +p.release() +print(p.get()) +time.sleep(2) +p.press_for() +print(p.get()) + diff --git a/labgrid/driver/__init__.py b/labgrid/driver/__init__.py index 721256bbf..94136d5ef 100644 --- a/labgrid/driver/__init__.py +++ b/labgrid/driver/__init__.py @@ -16,6 +16,8 @@ DigitalOutputPowerDriver, YKUSHPowerDriver, \ USBPowerDriver, SiSPMPowerDriver, NetworkPowerDriver, \ PDUDaemonDriver +from .buttondriver import ManualButtonDriver, ExternalButtonDriver, \ + DigitalOutputButtonDriver from .usbloader import MXSUSBDriver, IMXUSBDriver, BDIMXUSBDriver, RKUSBDriver, UUUDriver from .usbsdmuxdriver import USBSDMuxDriver from .usbsdwiredriver import USBSDWireDriver @@ -26,7 +28,7 @@ from .sigrokdriver import SigrokDriver, SigrokPowerDriver, SigrokDmmDriver from .usbstoragedriver import USBStorageDriver, NetworkUSBStorageDriver, Mode from .resetdriver import DigitalOutputResetDriver -from .gpiodriver import GpioDigitalOutputDriver +from .gpiodriver import LibGPIODigitalOutputDriver, GpioDigitalOutputDriver from .filedigitaloutput import FileDigitalOutputDriver from .serialdigitaloutput import SerialPortDigitalOutputDriver from .xenadriver import XenaDriver diff --git a/labgrid/driver/buttondriver.py b/labgrid/driver/buttondriver.py new file mode 100644 index 000000000..066c688f6 --- /dev/null +++ b/labgrid/driver/buttondriver.py @@ -0,0 +1,107 @@ +import shlex +import time +import math +from importlib import import_module + +import attr + +from ..factory import target_factory +from ..protocol import ButtonProtocol, DigitalOutputProtocol +from ..step import step +from ..util.proxy import proxymanager +from ..util.helper import processwrapper +from .common import Driver +from .exception import ExecutionError + + +@target_factory.reg_driver +@attr.s(eq=False) +class ManualButtonDriver(Driver, ButtonProtocol): + """ManualButtonDriver - Driver to tell the user to control a target's button""" + + @Driver.check_active + @step() + def press(self): + self.target.interact( + f"Press and hold the button on target {self.target.name} and press enter" + ) + + @Driver.check_active + @step() + def release(self): + self.target.interact( + f"Release the button on the target {self.target.name} press enter" + ) + + @Driver.check_active + @step() + def press_for(self): + self.target.interact( + f"Press and then Release the button on target {self.target.name} for {self.delay} seconds and press enter" + ) + +@target_factory.reg_driver +@attr.s(eq=False) +class ExternalButtonDriver(Driver, ButtonProtocol): + """ExternalButtonDriver - Driver using an external command to control a target's button""" + cmd_press = attr.ib(validator=attr.validators.instance_of(str)) + cmd_release = attr.ib(validator=attr.validators.instance_of(str)) + cmd_press_for = attr.ib(validator=attr.validators.instance_of(str)) + delay = attr.ib(default=1.0, validator=attr.validators.instance_of(float)) + + @Driver.check_active + @step() + def press(self): + cmd = shlex.split(self.cmd_press) + processwrapper.check_output(cmd) + + @Driver.check_active + @step() + def release(self): + cmd = shlex.split(self.cmd_release) + processwrapper.check_output(cmd) + + @Driver.check_active + @step() + def press_for(self): + if self.cmd_press_for is not None: + cmd = shlex.split(self.cmd_press_for) + processwrapper.check_output(cmd) + else: + self.press() + time.sleep(self.delay) + self.release() + +@target_factory.reg_driver +@attr.s(eq=False) +class DigitalOutputButtonDriver(Driver, ButtonProtocol): + """ + DigitalOutputButtonDriver uses a DigitalOutput to control a button + """ + bindings = {"output": DigitalOutputProtocol, } + delay = attr.ib(default=1.0, validator=attr.validators.instance_of(float)) + + def __attrs_post_init__(self): + super().__attrs_post_init__() + + @Driver.check_active + @step() + def press(self): + self.output.set(True) + + @Driver.check_active + @step() + def release(self): + self.output.set(False) + + @Driver.check_active + @step() + def press_for(self): + self.press() + time.sleep(self.delay) + self.release() + + @Driver.check_active + @step() + def get(self): + return self.output.get() diff --git a/labgrid/driver/gpiodriver.py b/labgrid/driver/gpiodriver.py index 2de90618e..aa965d2ed 100644 --- a/labgrid/driver/gpiodriver.py +++ b/labgrid/driver/gpiodriver.py @@ -1,9 +1,11 @@ """All GPIO-related drivers""" import attr +import time +import gpiod from ..factory import target_factory -from ..protocol import DigitalOutputProtocol -from ..resource.remote import NetworkSysfsGPIO +from ..protocol import DigitalOutputProtocol, ResetProtocol, PowerProtocol, ButtonProtocol +from ..resource.remote import NetworkLibGPIO, NetworkSysfsGPIO from ..step import step from .common import Driver from ..util.agentwrapper import AgentWrapper @@ -11,11 +13,109 @@ @target_factory.reg_driver @attr.s(eq=False) -class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol): +class LibGPIODigitalOutputDriver(Driver, DigitalOutputProtocol, ResetProtocol, PowerProtocol, ButtonProtocol): + + bindings = { + "gpio": {"LibGPIO", "NetworkLibGPIO"}, + } + delay = attr.ib(default=1.0, validator=attr.validators.instance_of(float)) + + def __attrs_post_init__(self): + super().__attrs_post_init__() + + def on_activate(self): + if isinstance(self.gpio, NetworkLibGPIO): + host = self.gpio.host + else: + host = None + + if not gpiod.is_gpiochip_device(self.gpio.gpiochip): + raise ValueError(f'{self.gpio.gpiochip} is not a valid gpiochip') + try: + self.request=gpiod.request_lines(self.gpio.gpiochip, + consumer="labgrid", + config={ + self.gpio.line: gpiod.LineSettings( + direction=gpiod.line.Direction.AS_IS, + active_low=self.gpio.active_low + ) + }, + ) + except Exception as e: + raise type(e)(f'{self.gpio.gpiochip} {self.gpio.line}: {str(e)}') + + line_value=self.request.get_value(self.gpio.line) + self.request.reconfigure_lines(config={ + self.gpio.line: gpiod.LineSettings( + direction=gpiod.line.Direction.OUTPUT, + output_value=line_value, + active_low=self.gpio.active_low)}) + + def on_deactivate(self): + self.request.release() + + @Driver.check_active + @step(args=['status']) + def set(self, status): + self.request.set_value(self.gpio.line, gpiod.line.Value(status)) + + @Driver.check_active + @step(result=True) + def get(self): + return self.request.get_value(self.gpio.line) + + @Driver.check_active + @step(result=True) + def invert(self): + self.set(not self.get()) + + @Driver.check_active + @step(result=True) + def reset(self): + self.cycle() + + @Driver.check_active + @step(result=True) + def on(self): + self.set(True) + + @Driver.check_active + @step(result=True) + def off(self): + self.set(False) + + @Driver.check_active + @step(result=True) + def cycle(self): + self.off() + time.sleep(self.delay) + self.on() + + @Driver.check_active + @step(result=True) + def press(self): + self.set(True) + + @Driver.check_active + @step(result=True) + def release(self): + self.set(False) + + @Driver.check_active + @step(result=True) + def press_for(self): + self.press() + time.sleep(self.delay) + self.release() + +@target_factory.reg_driver +@attr.s(eq=False) +class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol, ResetProtocol, PowerProtocol, ButtonProtocol): bindings = { "gpio": {"SysfsGPIO", "NetworkSysfsGPIO"}, } + delay = attr.ib(default=1.0, validator=attr.validators.instance_of(float)) def __attrs_post_init__(self): super().__attrs_post_init__() @@ -37,9 +137,53 @@ def on_deactivate(self): @Driver.check_active @step(args=['status']) def set(self, status): - self.proxy.set(self.gpio.index, status) + self.proxy.set(self.gpio.index, self.gpio.active_low, status) @Driver.check_active @step(result=True) def get(self): - return self.proxy.get(self.gpio.index) + return self.proxy.get(self.gpio.index, self.gpio.active_low) + + @Driver.check_active + @step(result=True) + def invert(self): + self.set(not self.get()) + + @Driver.check_active + @step(result=True) + def reset(self): + self.cycle() + + @Driver.check_active + @step(result=True) + def on(self): + self.set(True) + + @Driver.check_active + @step(result=True) + def off(self): + self.set(False) + + @Driver.check_active + @step(result=True) + def cycle(self): + self.off() + time.sleep(self.delay) + self.on() + + @Driver.check_active + @step(result=True) + def press(self): + self.set(True) + + @Driver.check_active + @step(result=True) + def release(self): + self.set(False) + + @Driver.check_active + @step(result=True) + def press_for(self): + self.press() + time.sleep(self.delay) + self.release() diff --git a/labgrid/protocol/__init__.py b/labgrid/protocol/__init__.py index 0ac225622..749539c81 100644 --- a/labgrid/protocol/__init__.py +++ b/labgrid/protocol/__init__.py @@ -3,6 +3,7 @@ from .consoleprotocol import ConsoleProtocol from .linuxbootprotocol import LinuxBootProtocol from .powerprotocol import PowerProtocol +from .buttonprotocol import ButtonProtocol from .filetransferprotocol import FileTransferProtocol from .infoprotocol import InfoProtocol from .digitaloutputprotocol import DigitalOutputProtocol diff --git a/labgrid/protocol/buttonprotocol.py b/labgrid/protocol/buttonprotocol.py new file mode 100644 index 000000000..abc4a292e --- /dev/null +++ b/labgrid/protocol/buttonprotocol.py @@ -0,0 +1,25 @@ +import abc + + +class ButtonProtocol(abc.ABC): + """Abstract class providing the ButtonProtocol interface""" + + @abc.abstractmethod + def press(self): + """Implementations should "press and hold" the button.""" + raise NotImplementedError + + @abc.abstractmethod + def release(self): + """Implementations should "release" the button""" + raise NotImplementedError + + @abc.abstractmethod + def press_for(self, time: float): + """Implementations should "press" the button for time seconds and then "release" the button again""" + raise NotImplementedError + + @abc.abstractmethod + def get(self): + """Implementations should return the status of the button""" + raise NotImplementedError diff --git a/labgrid/protocol/digitaloutputprotocol.py b/labgrid/protocol/digitaloutputprotocol.py index 6b1ce7e23..eec3f6b45 100644 --- a/labgrid/protocol/digitaloutputprotocol.py +++ b/labgrid/protocol/digitaloutputprotocol.py @@ -13,3 +13,7 @@ def get(self): def set(self, status): """Implementations should set the status of the digital output""" raise NotImplementedError + + @abc.abstractmethod + def invert(self): + """Implementations should invert the the status of the digital output""" diff --git a/labgrid/remote/client.py b/labgrid/remote/client.py index 5ab4f0683..18eab13c0 100755 --- a/labgrid/remote/client.py +++ b/labgrid/remote/client.py @@ -852,7 +852,7 @@ def power(self): name = self.args.name target = self._get_target(place) from ..resource.power import NetworkPowerPort, PDUDaemonPort - from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort + from ..resource.remote import NetworkUSBPowerPort, NetworkSiSPMPowerPort, NetworkLibGPIO, NetworkSysfsGPIO from ..resource import TasmotaPowerPort, NetworkYKUSHPowerPort drv = None @@ -874,6 +874,10 @@ def power(self): drv = self._get_driver_or_new(target, "TasmotaPowerDriver", name=name) elif isinstance(resource, NetworkYKUSHPowerPort): drv = self._get_driver_or_new(target, "YKUSHPowerDriver", name=name) + elif isinstance(resource, NetworkLibGPIO): + drv = self._get_driver_or_new(target, "LibGPIODigitalOutputDriver", name=name) + elif isinstance(resource, NetworkSysfsGPIO): + drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name) if drv: break @@ -885,13 +889,41 @@ def power(self): if action == "get": print(f"power{' ' + name if name else ''} for place {place.name} is {'on' if res else 'off'}") + def button(self): + place = self.get_acquired_place() + action = self.args.action + delay = self.args.delay + name = self.args.name + target = self._get_target(place) + from ..resource.remote import NetworkLibGPIO, NetworkSysfsGPIO + + drv = None + try: + drv = target.get_driver("ButtonProtocol", name=name) + except NoDriverFoundError: + for resource in target.resources: + if isinstance(resource, NetworkLibGPIO): + drv = self._get_driver_or_new(target, "LibGPIODigitalOutputDriver", name=name) + elif isinstance(resource, NetworkSysfsGPIO): + drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name) + if drv: + break + + if not drv: + raise UserError("target has no compatible resource available") + if delay is not None: + drv.delay = delay + res = getattr(drv, action)() + if action == "get": + print(f"button{' ' + name if name else ''} for place {place.name} is {'pressed' if res else 'released'}") + def digital_io(self): place = self.get_acquired_place() action = self.args.action name = self.args.name target = self._get_target(place) from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput - from ..resource.remote import NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay + from ..resource.remote import NetworkDeditecRelais8, NetworkLibGPIO, NetworkSysfsGPIO, NetworkLXAIOBusPIO, NetworkHIDRelay drv = None try: @@ -906,6 +938,8 @@ def digital_io(self): drv = self._get_driver_or_new(target, "HttpDigitalOutputDriver", name=name) elif isinstance(resource, NetworkDeditecRelais8): drv = self._get_driver_or_new(target, "DeditecRelaisDriver", name=name) + elif isinstance(resource, NetworkLibGPIO): + drv = self._get_driver_or_new(target, "LibGPIODigitalOutputDriver", name=name) elif isinstance(resource, NetworkSysfsGPIO): drv = self._get_driver_or_new(target, "GpioDigitalOutputDriver", name=name) elif isinstance(resource, NetworkLXAIOBusPIO): @@ -923,6 +957,8 @@ def digital_io(self): drv.set(True) elif action == "low": drv.set(False) + elif action == "invert": + drv.invert() async def _console(self, place, target, timeout, *, logfile=None, loop=False, listen_only=False): name = self.args.name @@ -1807,8 +1843,16 @@ def main(): subparser.add_argument("--name", "-n", help="optional resource name") subparser.set_defaults(func=ClientSession.power) + subparser = subparsers.add_parser("button", help="change (or get) a place's button status") + subparser.add_argument("action", choices=["press", "release", "press_for", "get"]) + subparser.add_argument( + "-t", "--delay", type=float, default=None, help="wait time in seconds between the press and release during press_for" + ) + subparser.add_argument("--name", "-n", help="optional resource name") + subparser.set_defaults(func=ClientSession.button) + subparser = subparsers.add_parser("io", help="change (or get) a digital IO status") - subparser.add_argument("action", choices=["high", "low", "get"], help="action") + subparser.add_argument("action", choices=["high", "low", "invert", "get"], help="action") subparser.add_argument("name", help="optional resource name", nargs="?") subparser.set_defaults(func=ClientSession.digital_io) diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index 7831ef8a7..4b4f3e250 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -609,6 +609,31 @@ def _get_params(self): exports["SNMPEthernetPort"] = EthernetPortExport +@attr.s(eq=False) +class LibGPIOExport(ResourceExport): + + """ResourceExport for libgpiod based GPIO lines accessed directly from userspace""" + + def __attrs_post_init__(self): + super().__attrs_post_init__() + from ..resource.base import LibGPIO + + self.local = LibGPIO(target=None, name=None, **self.local_params) + self.data["cls"] = "NetworkLibGPIO" + + def _get_params(self): + """Helper function to return parameters""" + return { + "host": self.host, + "gpiochip": self.local.gpiochip, + "line": self.local.line, + "active_low": self.local.active_low, + } + +exports["LibGPIO"] = LibGPIOExport +exports["MatchedLibGPIO"] = LibGPIOExport + + @attr.s(eq=False) class GPIOSysFSExport(ResourceExport): _gpio_sysfs_path_prefix = "/sys/class/gpio" @@ -634,16 +659,19 @@ def _get_params(self): return { "host": self.host, "index": self.local.index, + "active_low": self.local.active_low, } def _get_start_params(self): return { "index": self.local.index, + "active_low": self.local.active_low, } def _start(self, start_params): """Start a GPIO export to userspace""" index = start_params["index"] + active_low = start_params["active_low"] if self.export_path.exists(): self.system_exported = True @@ -653,6 +681,11 @@ def _start(self, start_params): with open(export_sysfs_path, mode="wb") as export: export.write(str(index).encode("utf-8")) + #active_low_path = os.path.join(GPIOSysFSExport._gpio_sysfs_path_prefix, + # f'gpio{index}/active_low') + #with open(active_low_path, 'wb') as actvie_low_fd: + # active_low_fs.write(active_low) + def _stop(self, start_params): """Disable a GPIO export to userspace""" index = start_params["index"] diff --git a/labgrid/resource/__init__.py b/labgrid/resource/__init__.py index dd7554dff..c7b0ed5a7 100644 --- a/labgrid/resource/__init__.py +++ b/labgrid/resource/__init__.py @@ -1,4 +1,4 @@ -from .base import SerialPort, NetworkInterface, EthernetPort, SysfsGPIO +from .base import SerialPort, NetworkInterface, EthernetPort, LibGPIO, SysfsGPIO from .ethernetport import SNMPEthernetPort from .serialport import RawSerialPort, NetworkSerialPort from .modbus import ModbusTCPCoil @@ -15,6 +15,7 @@ HIDRelay, IMXUSBLoader, LXAUSBMux, + MatchedLibGPIO, MatchedSysfsGPIO, MXSUSBLoader, RKUSBLoader, diff --git a/labgrid/resource/base.py b/labgrid/resource/base.py index d8cdb984c..7cea97eb9 100644 --- a/labgrid/resource/base.py +++ b/labgrid/resource/base.py @@ -35,6 +35,21 @@ class EthernetPort(Resource): switch = attr.ib(default=None) interface = attr.ib(default=None) +@target_factory.reg_resource +@attr.s(eq=False) +class LibGPIO(Resource): + """The basic LibGPIO describes a gpiochip, it's line and active_low setting + + Args: + gpiochip (str): name of the gpiochip device. + line (int): line number on the gpiochip. + active_low (bool) : set to True if active_low should be used. Default False""" + gpiochip = attr.ib(default="/dev/gpiochip0", + validator=attr.validators.instance_of(str)) + line = attr.ib(default=None, + validator=attr.validators.instance_of(int)) + active_low = attr.ib(default=False, + validator=attr.validators.instance_of(bool)) @target_factory.reg_resource @attr.s(eq=False) @@ -42,5 +57,8 @@ class SysfsGPIO(Resource): """The basic SysfsGPIO contains an index Args: - index (int): index of target gpio line.""" + index (int): index of target gpio line. + active_low (bool) : set to True if active_low should be used. Default False""" index = attr.ib(default=None, validator=attr.validators.instance_of(int)) + active_low = attr.ib(default=False, + validator=attr.validators.instance_of(bool)) diff --git a/labgrid/resource/remote.py b/labgrid/resource/remote.py index b8adb2524..e70e50df1 100644 --- a/labgrid/resource/remote.py +++ b/labgrid/resource/remote.py @@ -324,14 +324,31 @@ def __attrs_post_init__(self): self.timeout = 10.0 super().__attrs_post_init__() +@target_factory.reg_resource +@attr.s(eq=False) +class NetworkLibGPIO(NetworkResource, ManagedResource): + manager_cls = RemotePlaceManager + + """The NetworkLibGPIO describes a remotely accessible gpio line using libgpiod""" + gpiochip = attr.ib(default="/dev/gpiochip0", + validator=attr.validators.instance_of(str)) + line = attr.ib(default=None, + validator=attr.validators.instance_of(int)) + active_low = attr.ib(default=False, + validator=attr.validators.instance_of(bool)) + + def __attrs_post_init__(self): + self.timeout = 10.0 + super().__attrs_post_init__() @target_factory.reg_resource @attr.s(eq=False) class NetworkSysfsGPIO(NetworkResource, ManagedResource): manager_cls = RemotePlaceManager - """The NetworkSysfsGPIO describes a remotely accessible gpio line""" + """The NetworkSysfsGPIO describes a remotely accessible gpio line using sysfs (deprecated)""" index = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(int))) + active_low = attr.ib(default=False, validator=attr.validators.instance_of(bool)) def __attrs_post_init__(self): self.timeout = 10.0 super().__attrs_post_init__() diff --git a/labgrid/resource/suggest.py b/labgrid/resource/suggest.py index 707779bf8..a2a7c65dd 100644 --- a/labgrid/resource/suggest.py +++ b/labgrid/resource/suggest.py @@ -23,6 +23,7 @@ HIDRelay, USBDebugger, USBPowerPort, + MatchedLibGPIO, MatchedSysfsGPIO ) from ..util import dump @@ -57,6 +58,7 @@ def __init__(self, args): self.resources.append(HIDRelay(**args)) self.resources.append(USBDebugger(**args)) self.resources.append(USBPowerPort(**args, index=0)) + self.resources.append(MatchedLibGPIO(**args, line=0)) self.resources.append(MatchedSysfsGPIO(**args, pin=0)) def suggest_callback(self, resource, meta, suggestions): @@ -86,6 +88,8 @@ def suggest_callback(self, resource, meta, suggestions): )) if cls == 'USBPowerPort': print(' index: ?') + if cls == 'MatchedLibGPIO': + print(' line: ?') if cls == 'MatchedSysfsGPIO': print(' pin: ?') print(" ---") diff --git a/labgrid/resource/udev.py b/labgrid/resource/udev.py index eb553cfb2..d23cbcf09 100644 --- a/labgrid/resource/udev.py +++ b/labgrid/resource/udev.py @@ -752,14 +752,44 @@ def filter_match(self, device): return super().filter_match(device) +@target_factory.reg_resource +@attr.s(eq=False) +class MatchedLibGPIO(USBResource): + """The MatchedLibGPIO described a gpiochip matched by Udev. + + Args: + line (int): gpio pin number within the matched gpiochip. + active_low (bool): set to True if active_low should be used. Defalut False""" + line = attr.ib(default=None, validator=attr.validators.instance_of(int)) + active_low = attr.ib(default=False, validator=attr.validators.instance_of(bool)) + + def __attrs_post_init__(self): + self.match['SUBSYSTEM'] = 'gpio' + super().__attrs_post_init__() + + def filter_match(self, device): + # Filter out the char device + if device.properties.get('DEVNAME') is not None: + return False + return super().filter_match(device) + + def update(self): + super().update() + if self.device is not None: + if self.line >= int(self.read_attr('ngpio')): + raise ValueError("MatchedLibGPIO line out of bound") + @target_factory.reg_resource @attr.s(eq=False) class MatchedSysfsGPIO(USBResource): - """The MatchedSysfsGPIO described a SysfsGPIO matched by Udev + """The MatchedSysfsGPIO described a SysfsGPIO matched by Udev. SysfsGPIO + is deprecated. Please use MatchedLibGPIO instead. Args: - pin (int): gpio pin number within the matched gpiochip.""" + pin (int): gpio pin number within the matched gpiochip. + active_low (bool): set to True if active_low should be used. Defalut False""" pin = attr.ib(default=None, validator=attr.validators.instance_of(int)) + active_low = attr.ib(default=False, validator=attr.validators.instance_of(bool)) index = None def __attrs_post_init__(self): diff --git a/labgrid/util/agents/sysfsgpio.py b/labgrid/util/agents/sysfsgpio.py index 362eab5ce..dcb6d62ee 100644 --- a/labgrid/util/agents/sysfsgpio.py +++ b/labgrid/util/agents/sysfsgpio.py @@ -2,9 +2,11 @@ This module implements switching GPIOs via sysfs GPIO kernel interface. Takes an integer property 'index' which refers to the already exported GPIO device. +Takes an boolean property 'active_low' which inverts logical values if set to True """ import logging +import warnings import os class GpioDigitalOutput: @@ -23,7 +25,7 @@ def _assert_gpio_line_is_exported(index): if not os.path.exists(gpio_sysfs_path): raise ValueError("Device not found") - def __init__(self, index): + def __init__(self, index, active_low): self._logger = logging.getLogger("Device: ") GpioDigitalOutput._assert_gpio_line_is_exported(index) gpio_sysfs_path = os.path.join(GpioDigitalOutput._gpio_sysfs_path_prefix, @@ -40,6 +42,10 @@ def __init__(self, index): gpio_sysfs_value_path = os.path.join(gpio_sysfs_path, 'value') self.gpio_sysfs_value_fd = os.open(gpio_sysfs_value_path, flags=(os.O_RDWR | os.O_SYNC)) + gpio_sysfs_active_low_path = os.path.join(gpio_sysfs_path, 'active_low') + with open(gpio_sysfs_active_low_path, 'w') as active_low_fd: + active_low_fd.write(str(int(active_low))) + def __del__(self): os.close(self.gpio_sysfs_value_fd) self.gpio_sysfs_value_fd = None @@ -69,18 +75,26 @@ def set(self, status): _gpios = {} -def _get_gpio_line(index): +def _get_gpio_line(index, active_low): if index not in _gpios: - _gpios[index] = GpioDigitalOutput(index=index) + _gpios[index] = GpioDigitalOutput(index=index, active_low=active_low) return _gpios[index] -def handle_set(index, status): - gpio_line = _get_gpio_line(index) +def handle_set(index, active_low, status): + warnings.warn( + "SysfsGPIO has been deprecated. Please use LibGPIO. See https://www.kernel.org/doc/Documentation/gpio/sysfs.txt", + DeprecationWarning, + ) + gpio_line = _get_gpio_line(index, active_low) gpio_line.set(status) -def handle_get(index): - gpio_line = _get_gpio_line(index) +def handle_get(index, active_low): + warnings.warn( + "SysfsGPIO has been deprecated. Please use LibGPIO. See https://www.kernel.org/doc/Documentation/gpio/sysfs.txt", + DeprecationWarning, + ) + gpio_line = _get_gpio_line(index, active_low) return gpio_line.get() diff --git a/pyproject.toml b/pyproject.toml index 33c3a41fa..ad54f0eb1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ classifiers = [ dependencies = [ "ansicolors>=1.1.8", "attrs>=21.4.0", + "gpiod>=2.2.3", "grpcio>=1.64.1, <2.0.0", "grpcio-reflection>=1.64.1, <2.0.0", "protobuf>=5.27.0",