Skip to content

Commit

Permalink
Merge pull request #21 from byuccl/netbooter
Browse files Browse the repository at this point in the history
Netbooter
  • Loading branch information
jgoeders authored Jun 20, 2024
2 parents 76b4c86 + 4f30c55 commit 574ba27
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ Repository for python code used to control lab test and measurement equipment.
The following equipment is supported in this repository:

* Keysite Oscilliscope
* [Synaccess NetBooter](./Pyswitch/Readme.md)
* [Synaccess NetBooter](./pdu/README.md)

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
setup(
name="yinstruments",
packages=find_packages(),
version="1.3.4",
version="1.4.0",
description="Experiment device control scripts for BYU's Configurable Computing Lab (https://ccl.byu.edu/)",
author="Jeff Goeders",
author_email="[email protected]",
Expand Down
5 changes: 3 additions & 2 deletions yinstruments/pdu/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ classes have the following callable functions: get_status, reboot, off, and on.

To instance a version of your pdu in a different file, add the following code:

from yinstruments.yinstruments.pdu.lindy import Lindy
from yinstruments.yinstruments.pdu.netbooter import Netbooter
from yinstruments.pdu.lindy import Lindy
from yinstruments.pdu.netbooter import Netbooter


Example of calling an instance of a pdu:

Expand Down
203 changes: 154 additions & 49 deletions yinstruments/pdu/netbooter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""This file contians the Netbooter class which inherits
from the PDU class"""
""" This file contians the Netbooter class which inherits
from the PDU class.
"""

import telnetlib
import time
Expand All @@ -8,69 +10,172 @@


class Netbooter(PDU):
"""This is the Netbooter class"""

def __init__(self, ip_address, port, timeout=3.0):
super().__init__(ip_address, port)
"""This is the Netbooter class
List of commands available to the Netbooter
>help
ip Sets static IP addr. "ip xx.xx.xx.xx"
gw Sets Static gateway IP.
mask Sets Static network mask.
dhcp v Sets IP in static or DHCP mode. "off"-Static. "on"-DHCP.
emailsend Sends a test mail.
hp Sets HTTP port #.
tp Sets TELNET port #.
help or ? Displays Help menu.
login Enters user login.
logout Exits current login.
mac Displays Ethernet port Mac address.
nwset Restarts Ethernet network interface.
nwshow Displays network Status.
lc Turns an outlet ON/OFF with loop control. See Help in web.
ping Pings a host. E.g.: ping 192.168.0.1, or ping yahoo.com.
pset n v Sets outlet #n to v(value 1-on,0-off).
gpset n v Sets outlet group #n to v(value 1-on,0-off).
ps v Sets all power outlets to v(value 1-on,0-off).
pshow Displays outlet status.
reset Reloads default settings.
rb n Reboots outlet #n.
grb n Reboots outlet group #n.
sysshow Displays system information.
time Displays current time.
ver Displays hardware and software versions.
web v Turns Web access ON/OFF. "1"=ON. "0"-OFF.
"""

DEFAULT_NETBOOTER_PORT = 23
DEFAULT_TIMEOUT_TIME = 3.0 # 3 seconds
DEFAULT_NETBOOTER_DELAY = 0.05 # 50 ms

def __init__(
self,
ip_address,
port=DEFAULT_NETBOOTER_PORT,
timeout=PDU.DEFAULT_TIMEOUT_TIME,
command_delay=DEFAULT_NETBOOTER_DELAY,
):
"""Netbooter constructor.
ip_address: IP address of the netbooter
port: the TCP port used for the telnet session."""
super().__init__(ip_address, port, timeout, command_delay)
self.telnet = None

def __str__(self):
return f"{self.ip_address}:{self.port}"

def reboot(self, port_num):
telnet = telnetlib.Telnet(self.ip_address, self.port, timeout=self.timeout)

string = telnet.read_some()
time.sleep(self.sleep_time)

string = ("rb " + str(port_num)).encode("ascii") + b"\r\n\r\n"
telnet.write(string)
time.sleep(self.sleep_time)
telnet.close()

def on(self, port_num):
telnet = telnetlib.Telnet(self.ip_address, self.port, timeout=self.timeout)

string = telnet.read_some()
time.sleep(self.sleep_time)

string = ("pset " + str(port_num) + " 1").encode("ascii") + b"\r\n\r\n"
telnet.write(string)
def create_telnet_session(self):
"""Creates a telnet session to the Netbooter."""
self.telnet = telnetlib.Telnet(self.ip_address, self.port, timeout=self.timeout)
return self.telnet

def close_telnet_session(self):
"""Close Netbooter telnet session."""
if self.telnet is not None:
self.telnet.close()

def read_some(self):
"""Read from the Netbooter."""
if self.telnet is None:
return None
string = self.telnet.read_some()
# Short time needed before next command
time.sleep(self.sleep_time)
telnet.close()

def off(self, port_num):
telnet = telnetlib.Telnet(self.ip_address, self.port, timeout=self.timeout)

string = telnet.read_some()

time.sleep(self.sleep_time)

string = ("pset " + str(port_num) + " 0").encode("ascii") + b"\r\n\r\n"
telnet.write(string)
time.sleep(self.sleep_time)
telnet.close()

def get_status(self):
telnet = telnetlib.Telnet(self.ip_address, self.port, timeout=self.timeout)
return string

string = telnet.read_some()
def write(self, command):
"""Write data to the Netbooter."""
if self.telnet is None:
return
self.telnet.write(command)
# Short time needed before next command
time.sleep(self.sleep_time)

string = "pshow".encode("ascii") + b"\r\n"
telnet.write(string)
time.sleep(self.sleep_time)
def encode_command(self, cmd: str):
return cmd.encode("ascii") + b"\r\n\r\n"

def encode_request(self, req: str):
return req.encode("ascii") + b"\r\n"

def send_command(self, cmd: str):
"""Send an aribitray command to the Netbooter."""
self.create_telnet_session()
string = self.read_some()
string = self.encode_command(cmd)
self.write(string)
self.close_telnet_session()

def request_response(self, req: str):
"""Send a request for a response from the Netbooter. Returns an array of strings.
The initial and ending prompt are removed. In addition, the response is split into
strings based on line termination."""
self.create_telnet_session()
string = self.read_some()
string = self.encode_request(req)
# Send request
self.write(string)
string = ""
while True:
text = telnet.read_eager()
text = self.telnet.read_eager()
string += text.decode()
if len(text) == 0:
break
self.close_telnet_session()
# Remove the initial prompt "\r\n>" from string
string = string[3:]
# Remove the command and the following "\n\r" from teh string
chars_to_remove = len(req) + 2
string = string[chars_to_remove:]
# Remove the ending prompt "\r\n>" from string
string = string[:-3]
# Split response into strings.
strings = string.split("\n\r")
return strings

telnet.close()
# returns a organized graphic of the ports and the status of the ports
return string
def reboot(self, port_num):
"""Issue 'reboot' command to a Netbooter port."""
self.send_command("rb " + str(port_num))

def on(self, port_num):
"""Turn on a Netbooter port."""
self.send_command("pset " + str(port_num) + " 1")

def off(self, port_num):
"""Turn off a Netbooter port."""
self.send_command("pset " + str(port_num) + " 0")

def get_status(self):
"""Executes the status command and resturns the string output."""
return self.request_response("pshow")

def get_port_status(self):
"""Returns a dictionary between the port number (int) and a boolean (True=ON, False = Off)"""
status_str = self.get_status()
status = {}
for port_status_str in status_str:
status_tuple = self._str_to_port_status(port_status_str)
if status_tuple is not None:
status[status_tuple[0]] = status_tuple[1]
return status

def _str_to_port_status(self, status_str):
"""Parses a status string and returns the tuple (port:int,status:Boolean)
Returns None if the string doesn't match"""
# Example String
# 1 | ZCU102 | ON |
status_re = r"\s+(\d+)\s+\|.+\|\s+(\w+).+"
match = re.match(status_re, status_str)
if match:
port_num = int(match.group(1))
if match.group(2) == "ON":
value = True
else:
value = False
return (port_num, value)
return None

def is_on(self, port_num):
"""Working?"""
text = self.get_status()
lines = text.splitlines()

Expand Down
11 changes: 9 additions & 2 deletions yinstruments/pdu/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ class PDUType(Enum):
class PDU:
"""Generic class for PDU"""

DEFAULT_TIMEOUT_TIME = 3.0 # 3 seconds
DEFAULT_COMMAND_DELAY = (
1.0 # This is the original delay that may be needed by the Lindy. It is probably excessive
)

# initializes your PDU with callable characteristics
@abstractmethod
def __init__(self, ip_address, port, timeout=3.0):
self.sleep_time = 1.0
def __init__(
self, ip_address, port, timeout=DEFAULT_TIMEOUT_TIME, command_delay=DEFAULT_COMMAND_DELAY
):
self.sleep_time = command_delay
self.timeout = timeout
self.ip_address = ip_address
self.port = port
Expand Down
89 changes: 88 additions & 1 deletion yinstruments/usb_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,95 @@
import pyudev


class USBTTYDevice:
"""Represents a USB TTY device as captured by the 'udevadm' Linux command."""

def __init__(self, roothub_num, phys_port, configuration, interface, device_path):
"""Initializes the object based on the 'udevadm' path"""
self.roothub_num = roothub_num
self.phys_port = phys_port
self.configuration = configuration
self.interface = interface
self.device_path = device_path


class USBFindError(Exception):
"""Empty exception for USB finding"""

pass


def createUSBTTYDevice(dev_path_str):
"""Creates a USBTTYDevice from the given device string (i.e., /dev/ttyUSB0).
If the device is not a USB tty device then None is returned.
The "udevadm" command is called querying information in the 'udev'
database ('udev' is the Linux subsystem for managing device events like plugging things in).
An example of the command that is used is as follows:
udevadm info -q path -n /dev/ttyUSB0
The "info" command command option provides information about a device in the database (i.e.,
a device that is currently hooked up). The "-q path" option provides the "sysfs" device path
and the -n /dev/ttyUSB0 option specifies the device path to query.
The result of such a command is a string such as the following:
/devices/pci0000:00/0000:00:14.0/usb1/1-10/1-10.1/1-10.1:1.1/ttyUSB3/tty/ttyUSB3
see: http://www.linux-usb.org/FAQ.html#i6
"""

p = subprocess.run(
["udevadm", "info", "-q", "path", "-n", dev_path_str], stdout=subprocess.PIPE
)

USB_DEVICE_REGEX = (
f"\/devices\/.*?\/usb(\d+)\/.+\/(\d+\-\d+\.\d+)\:(\d+)\.(\d+).+\/(tty\w+\d+)$"
)
USB_ROOTHUB_NUM_RE_GROUP = 1
PHYS_PORT_RE_GROUP = 2
CONFIGURATION_RE_GROUP = 3
INTERFACE_RE_GROUP = 4
DEVICE_PATH_RE_GROUP = 5

m = re.match(USB_DEVICE_REGEX, p.stdout.decode())
if not m:
return None
match_rootnubnum = m.group(USB_ROOTHUB_NUM_RE_GROUP)
match_phys_port = m.group(PHYS_PORT_RE_GROUP)
match_configuration = int(m.group(CONFIGURATION_RE_GROUP))
match_interface = int(m.group(INTERFACE_RE_GROUP))
match_device_path = m.group(DEVICE_PATH_RE_GROUP)
return USBTTYDevice(
match_rootnubnum, match_phys_port, match_configuration, match_interface, match_device_path
)


def find_usbtty_devices():
usbtty_devices = {} # dictionary between device file and usb device
# Iterate over all files in /dev file system
for f in Path("/dev").iterdir():
device_str = str(f)
# Only query those that match a 'tty' device and have at least two letters after tty
# (i.e., USB or ACM). Ignore the tty\d+ and \ttyS\d+ entries
if re.match("/dev/tty[a-zA-Z][a-zA-Z]+\d+", device_str):
device = createUSBTTYDevice(device_str)
if device is not None:
usbtty_devices[device_str] = device
return usbtty_devices


def find_dev_file_tty(usb_phys_port, interface=0):
devices = find_usbtty_devices()
for dev_file_str in devices:
device = devices[dev_file_str]
if device.phys_port == usb_phys_port and device.interface == interface:
return dev_file_str
# No match found. Return None
return None


def find_dev_file_ttyUSB(usb_phys_port, interface):
"""This will find the correct /dev/ttyUSBX file for a given physical USB port and interface.
Expand Down Expand Up @@ -40,7 +125,9 @@ def _find_dev_file(usb_phys_port, ttyType, match_str, interface):
match_found = None
p = None
for f in Path("/dev").iterdir():
# Only query those that match the given 'ttyType' (i.e., USB, ACM, etc.)
if re.match("/dev/tty" + ttyType + "\d+", str(f)):
# Run the `udevadm` command for the given matching device
p = subprocess.run(
["udevadm", "info", "-q", "path", "-n", str(f)], stdout=subprocess.PIPE
)
Expand Down Expand Up @@ -74,7 +161,7 @@ def _find_dev_file(usb_phys_port, ttyType, match_str, interface):

def find_dev_file_usb_bus(usb_phys_port):
"""
For a given usb physical port, this function finds the USB bus device file.
This function finds the USB bus device file for a given usb physical port.
For example, a USB device at physical port 1-7.1 may be mapped to bus=3,device=7, which
would mean that the associated usb bus device file would be
Expand Down

0 comments on commit 574ba27

Please sign in to comment.