Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DFReader: Create DFMetaData class to handle logger metadata #929

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 152 additions & 1 deletion DFReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import time

import struct
import sys
import gzip
import io

from . import mavutil

try:
Expand Down Expand Up @@ -555,6 +557,154 @@ def set_message_timestamp(self, m):
m._timestamp = self.timebase + count/rate


class DFMetaData(object):
'''handle dataflash messages metadata'''
def __init__(self, parent):
self.parent = parent
self.data = None
self.metadata_load_attempted = False

def reset(self):
'''clear cached data'''
self.data = None
self.metadata_load_attempted = False

@staticmethod
def dot_pymavlink(*args):
'''return a path to store pymavlink data'''
if 'HOME' not in os.environ:
dir = os.path.join(os.environ['LOCALAPPDATA'], '.pymavlink')
else:
dir = os.path.join(os.environ['HOME'], '.pymavlink')
if len(args) == 0:
return dir
return os.path.join(dir, *args)

@staticmethod
def download_url(url):
'''download a URL and return the content'''
if sys.version_info.major < 3:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably don't need this compatability code. IIRC the "exists_ok" used below makes this stuff Py3 only anyway...

from urllib2 import urlopen as url_open
from urllib2 import URLError as url_error
else:
from urllib.request import urlopen as url_open
from urllib.error import URLError as url_error
try:
resp = url_open(url)
except url_error as e:
print('Error downloading %s : %s' % (url, e))
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we really should be doing this rather than making the caller catch the exceptions...

return resp.read()

@staticmethod
def download():
# Make sure the folder to store XML in has been created
os.makedirs(DFMetaData.dot_pymavlink('LogMessages'), exist_ok=True)
# Loop through vehicles to download
for vehicle in ['Rover', 'Copter', 'Plane', 'Tracker', 'Blimp', 'Sub']:
url = 'http://autotest.ardupilot.org/LogMessages/%s/LogMessages.xml.gz' % vehicle
file = DFMetaData.dot_pymavlink('LogMessages', "%s.xml" % vehicle)
print("Downloading %s as %s" % (url, file))
data = DFMetaData.download_url(url)
if data is None:
continue
# decompress it...
with gzip.GzipFile(fileobj=io.BytesIO(data)) as gz:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually also produce .xz for these... they're about 75% the size of the .gz

data = gz.read()
try:
open(file, mode='wb').write(data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've become rather partial to pathlib myself, but I do understand this is just swiped from MAVProxy.

except Exception as e:
print("Failed to save to %s : %s" % (file, e))

def metadata_tree(self, verbose=False):
''' return a map between a log message and its metadata. May return
None if data is not available '''
# If we've already tried loading data, use it if we have it
# This avoid repeated attempts, when the file is not there
if self.metadata_load_attempted:
return self.data
self.metadata_load_attempted = True
# Get file name, based on vehicle type
mapping = {mavutil.mavlink.MAV_TYPE_GROUND_ROVER : "Rover",
mavutil.mavlink.MAV_TYPE_FIXED_WING : "Plane",
mavutil.mavlink.MAV_TYPE_QUADROTOR : "Copter",
mavutil.mavlink.MAV_TYPE_HELICOPTER : "Copter",
mavutil.mavlink.MAV_TYPE_ANTENNA_TRACKER : "Tracker",
mavutil.mavlink.MAV_TYPE_SUBMARINE : "Sub",
mavutil.mavlink.MAV_TYPE_AIRSHIP : "Blimp",
}
if self.parent.mav_type not in mapping:
return None
path = DFMetaData.dot_pymavlink("LogMessages", "%s.xml" % mapping[self.parent.mav_type])
# Does the file exist?
if not os.path.exists(path):
if verbose:
print("Can't find '%s'" % path)
print("Please run 'logmessage download' from MAVExplorer, or call")
print("DFMetaData.download() from Python.")
return None
# Read in the XML
xml = open(path, 'rb').read()
from lxml import objectify
objectify.enable_recursive_str()
tree = objectify.fromstring(xml)
data = {}
for p in tree.logformat:
n = p.get('name')
data[n] = p
# Cache and return data
self.data = data
return self.data

def print_help(self, msg):
'''print help for a log message'''
data = self.metadata_tree(verbose=True)
if data is None:
return
if msg not in data:
print("No help found for message: %s" % msg)
return
node = data[msg]
# Message name and description
print("Log Message: %s\n%s\n" % (msg, node.description.text))
# Protect against replay messages which dont list their fields
if not hasattr(node.fields, 'field'):
return
namelist = []
unitlist = []
# Loop through fields to build list of name/units
for f in node.fields.field:
namelist.append(f.get('name'))
units = f.get('units')
dtype = f.get('type')
if units:
unitlist.append("[%s] " % units)
elif 'char' in dtype:
unitlist.append("[%s] " % dtype)
elif hasattr(f, 'enum'):
unitlist.append("[enum] ")
elif hasattr(f, 'bitmask'):
unitlist.append("[bitmask] ")
else:
unitlist.append("")
# Now get the max string length from each list
namelen = len(max(namelist, key=len))
unitlen = len(max(unitlist, key=len))
# Loop through fields again to do the actual printing
for i in range(0, len(namelist)):
desc = node.fields.field[i].description.text
print("%-*s %-*s: %s" % (namelen, namelist[i], unitlen, unitlist[i], desc))

def get_description(self, msg):
'''get the description of a log message'''
data = self.metadata_tree()
if data is None:
return None
if msg in data:
return data[msg].description.text
return ""


class DFReader(object):
'''parse a generic dataflash file'''
def __init__(self):
Expand All @@ -572,6 +722,7 @@ def __init__(self):
self.percent = 0
self.unit_lookup = {} # lookup table of units defined by UNIT messages
self.mult_lookup = {} # lookup table of multipliers defined by MULT messages
self.metadata = DFMetaData(self)

def _rewind(self):
'''reset state on rewind'''
Expand Down
Loading