Skip to content

Commit

Permalink
x411: mpm: add support for GPSDO from USRP N2x0 (JacksonLabs Firefly)
Browse files Browse the repository at this point in the history
The GPSDO has to be configured before use through RS232 with following
commands:

SYST:COMM:SER:ECHO OFF
SYST:COMM:SER:PRO OFF
GPS:GPGGA 1
GPS:GGAST 0
GPS:GPRMC 1
SERV:TRAC 1
SYSTem:COMMunicate:SERial:BAUD 38400

The code is totally not clean, but at this stage it works.
  • Loading branch information
ptrkrysik committed May 23, 2023
1 parent e07a4b0 commit ad3f782
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 17 deletions.
15 changes: 13 additions & 2 deletions mpm/python/usrp_mpm/gpsd_iface.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def __init__(self):
# Make a socket to connect to GPSD
self.gpsd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.gpsd_sockfile = self.gpsd_socket.makefile(encoding='ascii')
self.gps_available = False

def __enter__(self):
self.open()
Expand All @@ -155,8 +156,15 @@ def close(self):
def enable_watch(self):
"""Send a WATCH command, which starts operation"""
self.gpsd_socket.sendall(b'?WATCH={"enable":true};')
self.log.trace(self.read_class("DEVICES"))
self.log.trace(self.read_class("WATCH"))
devices = self.read_class("DEVICES")
self.log.info(devices)
self.log.info(self.read_class("WATCH"))
if devices["devices"] is not None:
self.gps_available = True
else:
self.gps_available = False
def is_gps_enabled(self):
return self.gps_available

def poll_request(self, socket_timeout=60, num_retry=10):
"""Send a POLL command
Expand Down Expand Up @@ -413,6 +421,9 @@ def get_gps_lock(self):
# https://gpsd.gitlab.io/gpsd/gpsd_json.html
return gps_info.get("mode", 0) >= 2

def is_gps_enabled(self):
self._log.info("GPS enabled "+str(self._gpsd_iface.is_gps_enabled()))
return self._gpsd_iface.is_gps_enabled()

def main():
"""Test functionality of the GPSDIface class"""
Expand Down
26 changes: 18 additions & 8 deletions mpm/python/usrp_mpm/periph_manager/x4xx.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from usrp_mpm.periph_manager.x4xx_clk_aux import ClockingAuxBrdControl
from usrp_mpm.periph_manager.x4xx_clk_mgr import X4xxClockMgr
from usrp_mpm.periph_manager.zcu111_clk_mgr import ZCU111ClockMgr
from usrp_mpm.periph_manager.zcu111_gps_mgr import ZCU111GPSMgr
from usrp_mpm.periph_manager.x4xx_gps_mgr import X4xxGPSMgr
from usrp_mpm.periph_manager.x4xx_rfdc_ctrl import X4xxRfdcCtrl
from usrp_mpm.periph_manager.zcu111_dio_control import ZCU111DioControl
Expand Down Expand Up @@ -392,7 +393,10 @@ def _init_gps_mgr(self):
is pushed into the GPS manager class.
"""
self.log.debug("Found GPS, adding sensors.")
gps_mgr = X4xxGPSMgr(self._clocking_auxbrd, self.log)
if self.mboard_info.get('product') == 'x411':
gps_mgr = ZCU111GPSMgr(self.log)
else:
gps_mgr = X4xxGPSMgr(self._clocking_auxbrd, self.log)
# We can't use _add_public_methods(), because we only want a subset of
# the public methods. Also, we want to know which sensors were added so
# we can also add them to mboard_sensor_callback_map.
Expand Down Expand Up @@ -469,6 +473,9 @@ def _init_peripherals(self, args):
self.log.warning(
"GPIO I2C bus could not be found for the Clocking Aux Board, "
"disabling Clocking Aux Board functionality.")
else:
has_gps = True

self._clocking_auxbrd = None
self._safe_sync_source = {
'clock_source': X4xxClockMgr.CLOCK_SOURCE_MBOARD,
Expand Down Expand Up @@ -1361,10 +1368,13 @@ def get_gps_sensor_status(self):
"""
assert self._gps_mgr
self.log.trace("Reading all GPS status pins")
return f"""
{self.get_gps_lock_sensor()}
{self.get_gps_alarm_sensor()}
{self.get_gps_warmup_sensor()}
{self.get_gps_survey_sensor()}
{self.get_gps_phase_lock_sensor()}
"""
if (self.mboard_info.get('product') == 'x411'):
return ""
else:
return f"""
{self.get_gps_lock_sensor()}
{self.get_gps_alarm_sensor()}
{self.get_gps_warmup_sensor()}
{self.get_gps_survey_sensor()}
{self.get_gps_phase_lock_sensor()}
"""
17 changes: 10 additions & 7 deletions mpm/python/usrp_mpm/periph_manager/zcu111_clk_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
# this is not the frequency out of the GPSDO(GPS Lite, 20MHz) itself but
# the GPSDO on the CLKAUX board is used to fine tune the OCXO via EFC
# which is running at 10MHz
# X400_GPSDO_OCXO_CLOCK_FREQ = 10e6
X400_GPSDO_OCXO_CLOCK_FREQ = 10e6
# X400_RPLL_I2C_LABEL = 'rpll_i2c'
# X400_DEFAULT_RPLL_REF_SOURCE = '100M_reliable_clk'
# X400_DEFAULT_MGT_CLOCK_RATE = 156.25e6
Expand All @@ -47,12 +47,12 @@ class ZCU111ClockMgr:
CLOCK_SOURCE_MBOARD = "mboard"
# CLOCK_SOURCE_INTERNAL = ClockingAuxBrdControl.SOURCE_INTERNAL
CLOCK_SOURCE_EXTERNAL = "external"
# CLOCK_SOURCE_GPSDO = ClockingAuxBrdControl.SOURCE_GPSDO
CLOCK_SOURCE_GPSDO = "gpsdo"
# CLOCK_SOURCE_NSYNC = ClockingAuxBrdControl.SOURCE_NSYNC

TIME_SOURCE_INTERNAL = "internal"
TIME_SOURCE_EXTERNAL = "external"
# TIME_SOURCE_GPSDO = "gpsdo"
TIME_SOURCE_GPSDO = "gpsdo"
# TIME_SOURCE_QSFP0 = "qsfp0"

# All valid sync_sources for X4xx in the form of (clock_source, time_source)
Expand All @@ -61,7 +61,7 @@ class ZCU111ClockMgr:
# (CLOCK_SOURCE_INTERNAL, TIME_SOURCE_INTERNAL),
(CLOCK_SOURCE_EXTERNAL, TIME_SOURCE_EXTERNAL),
# (CLOCK_SOURCE_EXTERNAL, TIME_SOURCE_INTERNAL),
# (CLOCK_SOURCE_GPSDO, TIME_SOURCE_GPSDO),
(CLOCK_SOURCE_GPSDO, TIME_SOURCE_GPSDO),
# (CLOCK_SOURCE_GPSDO, TIME_SOURCE_INTERNAL),
# (CLOCK_SOURCE_NSYNC, TIME_SOURCE_INTERNAL),
}
Expand Down Expand Up @@ -133,7 +133,8 @@ def _init_available_srcs(self):
"""
Initialize the available clock and time sources.
"""
# has_gps = self._clocking_auxbrd and self._clocking_auxbrd.is_gps_supported()
has_gps = True #TODO PK: maybe try to check it somehow
#self._clocking_auxbrd and self._clocking_auxbrd.is_gps_supported()
self._avail_clk_sources = [self.CLOCK_SOURCE_MBOARD, self.CLOCK_SOURCE_EXTERNAL]
# if self._clocking_auxbrd:
# self._avail_clk_sources.extend([
Expand All @@ -143,13 +144,15 @@ def _init_available_srcs(self):
# self._avail_clk_sources.append(self.CLOCK_SOURCE_NSYNC)
# if has_gps:
# self._avail_clk_sources.append(self.CLOCK_SOURCE_GPSDO)
if has_gps:
self._avail_clk_sources.append(self.CLOCK_SOURCE_GPSDO)
self.log.trace(f"Available clock sources are: {self._avail_clk_sources}")
self._avail_time_sources = [
self.TIME_SOURCE_INTERNAL
, self.TIME_SOURCE_EXTERNAL]
#, self.TIME_SOURCE_QSFP0]
# if has_gps:
# self._avail_time_sources.append(self.TIME_SOURCE_GPSDO)
if has_gps:
self._avail_time_sources.append(self.TIME_SOURCE_GPSDO)
self.log.trace("Available time sources are: {}".format(self._avail_time_sources))

def _init_clk_peripherals(self):
Expand Down
193 changes: 193 additions & 0 deletions mpm/python/usrp_mpm/periph_manager/zcu111_gps_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#
# Copyright 2021 Ettus Research, a National Instruments Company
# Copyright 2023 Piotr Krysik <[email protected]>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
X4XX GPS Manager
Handles GPS-related tasks
"""

import re
from usrp_mpm.gpsd_iface import GPSDIfaceExtension

class ZCU111GPSMgr:
"""
Manager class for GPS-related actions for the X4XX.
This also "disables" the sensors when the GPS is not enabled.
"""
def __init__(self, log):
# assert clk_aux_board and clk_aux_board.is_gps_supported()
# self._clocking_auxbrd = clk_aux_board
self.log = log.getChild('GPS')
self.log.trace("Initializing GPSd interface")
self._gpsd = GPSDIfaceExtension()
# To disable sensors, we simply return an empty value if GPS is disabled.
# For TPV, SKY, and GPGGA, we can do this in the same fashion (they are
# very similar). gps_time is different (it returns an int) so for sake
# of simplicity it's defined separately below.
for sensor_name in ('gps_tpv', 'gps_sky', 'gps_gpgga'):
sensor_api = f'get_{sensor_name}_sensor'
setattr(
self, sensor_api,
lambda sensor_name=sensor_name: {
'name': sensor_name, 'type': 'STRING',
'unit': '', 'value': 'n/a'} \
if not self.is_gps_enabled() \
else getattr(self._gpsd, f'get_{sensor_name}_sensor')()
)

def extend(self, context):
"""
Extend 'context' with the sensor methods of this class (get_gps_*_sensor).
If 'context' already has such a method, it is skipped.
Returns a dictionary compatible to mboard_sensor_callback_map.
"""
new_methods = {
re.search(r"get_(.*)_sensor", method_name).group(1): method_name
for method_name in dir(self)
if not method_name.startswith('_') \
and callable(getattr(self, method_name)) \
and method_name.endswith("sensor")}
for method_name in new_methods.values():
if hasattr(context, method_name):
continue
new_method = getattr(self, method_name)
self.log.trace("%s: Adding %s method", context, method_name)
setattr(context, method_name, new_method)
return new_methods

def is_gps_enabled(self):
"""
Return True if the GPS is enabled/active.
"""
# return self._clocking_auxbrd.is_gps_enabled()
return self._gpsd.is_gps_enabled()

def get_gps_enabled_sensor(self):
"""
Get enabled status of GPS as a sensor dict
"""
gps_enabled = self.is_gps_enabled()
return {
'name': 'gps_enabled',
'type': 'BOOLEAN',
'unit': 'enabled' if gps_enabled else 'disabled',
'value': str(gps_enabled).lower(),
}

def get_gps_locked_sensor(self):
"""
Get lock status of GPS as a sensor dict
"""
gps_locked = self._gpsd.get_gps_lock()

# self.is_gps_enabled() and \
# bool(self._clocking_auxbrd.get_gps_lock())
return {
'name': 'gps_lock',
'type': 'BOOLEAN',
'unit': 'locked' if gps_locked else 'unlocked',
'value': str(gps_locked).lower(),
}
# TODO PK: connect lock pin of GPSDO to a GPIO pin?

# return {
# 'name': 'gps_lock',
# 'type': 'BOOLEAN',
# 'unit': 'locked',
# 'value': 'true',
# }

# def get_gps_alarm_sensor(self):
# """
# Get alarm status of GPS as a sensor dict
# """
# # gps_alarm = self.is_gps_enabled() and \
# # bool(self._clocking_auxbrd.get_gps_alarm())
# # return {
# # 'name': 'gps_alarm',
# # 'type': 'BOOLEAN',
# # 'unit': 'active' if gps_alarm else 'not active',
# # 'value': str(gps_alarm).lower(),
# # }
# return {
# 'name': 'gps_alarm',
# 'type': 'BOOLEAN',
# 'unit': 'not active',
# 'value': 'false',
# }

# def get_gps_warmup_sensor(self):
# """
# Get warmup status of GPS as a sensor dict
# """
# # gps_warmup = self.is_gps_enabled() and \
# # bool(self._clocking_auxbrd.get_gps_warmup())
# # return {
# # 'name': 'gps_warmup',
# # 'type': 'BOOLEAN',
# # 'unit': 'warming up' if gps_warmup else 'warmup done',
# # 'value': str(gps_warmup).lower(),
# # }
# return {
# 'name': 'gps_warmup',
# 'type': 'BOOLEAN',
# 'unit': 'warmup done',
# 'value': 'false',
# }

# def get_gps_survey_sensor(self):
# """
# Get survey status of GPS as a sensor dict
# """
# # gps_survey = self.is_gps_enabled() and \
# # bool(self._clocking_auxbrd.get_gps_survey())
# # return {
# # 'name': 'gps_survey',
# # 'type': 'BOOLEAN',
# # 'unit': 'survey active' if gps_survey else 'survey not active',
# # 'value': str(gps_survey).lower(),
# # }
# return {
# 'name': 'gps_survey',
# 'type': 'BOOLEAN',
# 'unit': 'survey not active',
# 'value': 'False',
# }

# def get_gps_phase_lock_sensor(self):
# """
# Get phase_lock status of GPS as a sensor dict
# """
# # gps_phase_lock = self.is_gps_enabled() and \
# # bool(self._clocking_auxbrd.get_gps_phase_lock())
# # return {
# # 'name': 'gps_phase_lock',
# # 'type': 'BOOLEAN',
# # 'unit': 'phase locked' if gps_phase_lock else 'no phase lock',
# # 'value': str(gps_phase_lock).lower(),
# # }
# return {
# 'name': 'gps_phase_lock',
# 'type': 'BOOLEAN',
# 'unit': 'phase locked',
# 'value': 'true'
# }

def get_gps_time_sensor(self):
"""
Get GPS time in integer seconds as a sensor dict
"""
if not self.is_gps_enabled():
return {
'name': 'gps_time',
'type': 'INTEGER',
'unit': 'seconds',
'value': str(-1),
}
return self._gpsd.get_gps_time_sensor()

0 comments on commit ad3f782

Please sign in to comment.