diff --git a/.gitignore b/.gitignore
index 1285286312..91a8669f8d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
 doc/*.html
 __pycache__
 *.swp
+*~
+*.pyc
diff --git a/server/pbench/bin/gold/test-11.txt b/server/pbench/bin/gold/test-11.txt
new file mode 100644
index 0000000000..204d3245e3
--- /dev/null
+++ b/server/pbench/bin/gold/test-11.txt
@@ -0,0 +1,1028 @@
++++ Running indexing on /var/tmp/pbench-test-server/test-11
+len(actions) = 4
+[
+    {
+        "_id": "25685d21315d1b5fdd9593be5f1ab773",
+        "_index": "pbench.run.2016-08",
+        "_op_type": "create",
+        "_source": {
+            "@metadata": {
+                "file-date": "2017-03-16",
+                "file-name": "/var/tmp/pbench-test-server/test-11.tar.xz",
+                "generated-by": "index-pbench",
+                "generated-by-version": "0.1.0.0",
+                "md5": "25685d21315d1b5fdd9593be5f1ab773",
+                "pbench-agent-version": ""
+            },
+            "@timestamp": "2016-08-02T22:47:17.596797999",
+            "host_tools_info": [
+                {
+                    "hostname": "localhost",
+                    "hostname-f": "localhost.localdomain\n",
+                    "tools": {
+                        "iostat": "--interval=3",
+                        "mpstat": "--interval=3",
+                        "perf": "--record-opts=record -a --freq=100",
+                        "pidstat": "--interval=3",
+                        "proc-interrupts": "--interval=3",
+                        "proc-vmstat": "--interval=3",
+                        "sar": "--interval=3",
+                        "turbostat": "--interval=3"
+                    }
+                },
+                {
+                    "hostname": "192.168.122.83",
+                    "hostname-f": null,
+                    "label": "sysbenchguest",
+                    "tools": {}
+                }
+            ],
+            "run": {
+                "config": "",
+                "controller": "localhost.localdomain",
+                "date": "2016-08-02T22:47:17",
+                "end_run": "2016-08-02T22:47:58.249377123",
+                "id": "25685d21315d1b5fdd9593be5f1ab773",
+                "name": "pbench-user-benchmark__2016-08-02_22:47:17",
+                "script": "pbench-user-benchmark",
+                "start_run": "2016-08-02T22:47:17.596797999",
+                "tarball-dirname": "test-11",
+                "tarball-toc-prefix": "test-11"
+            },
+            "sosreports": [
+                {
+                    "hostname-f": "localhost.localdomain\n",
+                    "hostname-s": "localhost",
+                    "md5": "72d90c40c542f21b76116189f5b24177 sosreport.tar.xz",
+                    "name": "test-11/sysinfo/localhost/end/sosreport.tar.xz"
+                }
+            ]
+        },
+        "_type": "pbench-run"
+    },
+    {
+        "_id": "39683a1426eb92acccd7bc147a356f84",
+        "_index": "pbench.run.2016-08",
+        "_op_type": "create",
+        "_parent": "25685d21315d1b5fdd9593be5f1ab773",
+        "_source": {
+            "@timestamp": "2016-08-02T22:47:17.596797999",
+            "directory": "/1/reference-result/tools-default/localhost/sar/",
+            "files": [
+                {
+                    "mode": "0o644",
+                    "name": "sar.data",
+                    "size": 43236
+                },
+                {
+                    "mode": "0o775",
+                    "name": "sar.cmd",
+                    "size": 146
+                }
+            ]
+        },
+        "_type": "pbench-run-toc-entry"
+    },
+    {
+        "_id": "91b78182343b2b5b52d72b195ab889da",
+        "_index": "pbench.run.2016-08",
+        "_op_type": "create",
+        "_parent": "25685d21315d1b5fdd9593be5f1ab773",
+        "_source": {
+            "@timestamp": "2016-08-02T22:47:17.596797999",
+            "directory": "/sysinfo/localhost/end/",
+            "files": [
+                {
+                    "mode": "0o664",
+                    "name": "sosreport.tar.xz.md5",
+                    "size": 51
+                },
+                {
+                    "mode": "0o664",
+                    "name": "sosreport.tar.xz",
+                    "size": 888
+                }
+            ]
+        },
+        "_type": "pbench-run-toc-entry"
+    },
+    {
+        "_id": "36208d6dd70b5131e6e2ab7eb7621a5c",
+        "_index": "pbench.run.2016-08",
+        "_op_type": "create",
+        "_parent": "25685d21315d1b5fdd9593be5f1ab773",
+        "_source": {
+            "@timestamp": "2016-08-02T22:47:17.596797999",
+            "directory": "/1/"
+        },
+        "_type": "pbench-run-toc-entry"
+    }
+]
[... roughly 900 lines of "sadf -x ... -- -A" XML output elided here: the XML tags were lost in extraction; the dump covered sysdata-version 3.1, Linux 4.6.3-300.fc24.x86_64 (x86_64, 4 CPUs, file date 2016-08-02 22:47:40) and the per-interval statistics sections (cpu-load, memory, kernel tables, queue), ending with the summary averages ...]
+--- Finished indexing on /var/tmp/pbench-test-server/test-11 (status=0)
++++ pbench tree state
+/var/tmp/pbench-test-server/pbench
+--- pbench tree state
++++ pbench log file contents
+--- pbench log file contents
++++ test-execution.log file contents
+grep: /var/tmp/pbench-test-server/test-execution.log: No such file or directory
+--- test-execution.log file contents
diff --git a/server/pbench/bin/gold/test-7.8.txt b/server/pbench/bin/gold/test-7.8.txt
index 0a87414bb1..7753db4807 100644
--- a/server/pbench/bin/gold/test-7.8.txt
+++ b/server/pbench/bin/gold/test-7.8.txt
@@ -7,7 +7,7 @@ len(actions) = 19
         "_op_type": "create",
         "_source": {
             "@metadata": {
-                "file-date": "2017-02-22",
+                "file-date": "2017-03-20",
                 "file-name": "/var/tmp/pbench-test-server/uperf__2016-10-06_16:34:03.tar.xz",
                 "generated-by": "index-pbench",
                 "generated-by-version": "0.1.0.0",
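The gold files above record the bulk actions index-pbench hands to the Elasticsearch client. A minimal sketch of how one such action is submitted, using the same action shape; the endpoint, sample document, and id seed are hypothetical, and exact client behavior varies across elasticsearch-py versions:

    # Sketch only, not part of the patch: submit one pbench-run style action.
    import hashlib
    from elasticsearch import Elasticsearch, helpers

    es = Elasticsearch([{"host": "localhost", "port": 9200}])   # hypothetical endpoint
    action = {
        "_op_type": "create",   # duplicates come back as 409s instead of overwriting
        "_index": "pbench.run.2016-08",
        "_type": "pbench-run",
        # A stable, content-derived id makes re-indexing the same tar ball idempotent.
        "_id": hashlib.md5(b"example-run").hexdigest(),
        "_source": {"@timestamp": "2016-08-02T22:47:17.596797999",
                    "run": {"name": "example-run"}},
    }
    successes, errors = helpers.bulk(es, [action], raise_on_error=False)
    print("indexed %d, errors: %r" % (successes, errors))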
diff --git a/server/pbench/bin/index-pbench b/server/pbench/bin/index-pbench
index ad4c49d540..2cf1cfa32b 100755
--- a/server/pbench/bin/index-pbench
+++ b/server/pbench/bin/index-pbench
@@ -7,18 +7,32 @@
 from __future__ import print_function

-import sys, os, time, re, stat, copy
-import hashlib, json, glob, csv, tarfile
+import sys
+import os
+import re
+import stat
+import csv
+import copy
+import time
+import json
+import glob
+import pathlib
+import hashlib
+import tarfile
+import subprocess
+from shutil import rmtree
+from pprint import pprint
+from collections import defaultdict
+from datetime import datetime, timedelta
+from satools import tstos, index_sar, extract_sa
+from optparse import OptionParser, make_option
+from urllib3 import exceptions as ul_excs, Timeout
+from elasticsearch import VERSION, Elasticsearch, helpers, exceptions as es_excs

 try:
     from configparser import SafeConfigParser, NoSectionError, NoOptionError
 except ImportError:
     from ConfigParser import SafeConfigParser, NoSectionError, NoOptionError
-from optparse import OptionParser, make_option
-from elasticsearch import VERSION, Elasticsearch, helpers, exceptions as es_excs
-from urllib3 import exceptions as ul_excs, Timeout
-from datetime import datetime, timedelta
-from vos.analysis.lib import tstos

 _VERSION_ = "0.1.0.0"
 _NAME_ = "index-pbench"
@@ -31,6 +45,11 @@
 _dict_const = dict

 _tmpdir = None

+# File names found in a pbench tar ball that need a special indexing
+# method; currently only sar data is supported.
+SPECIAL_PARAMS_LIST = ['sar.data']
+_special_paths = defaultdict(list)
+
 class MappingFileError(Exception):
     pass

@@ -744,6 +763,7 @@ def mk_toc(tb, md5sum, options):
     # by a dictionary with a 'name' key, whose value is the pathname of the directory
     # and a 'files' key, which is a list of the files contained in the directory.
     d = _dict_const()
+    global _special_paths
     for m in members:
         if m.isdir():
             dname = os.path.dirname(m.name)[len(prefix):] + os.path.sep
@@ -770,6 +790,11 @@ def mk_toc(tb, md5sum, options):
                 # the directory entry always preceded the file entry
                 # in the tarball.
                 d[dname] = _dict_const(directory=dname, files=[fentry])
+
+            fname = m.path.split('/')[-1]
+            if fname in SPECIAL_PARAMS_LIST:
+                _special_paths[fname].append(m)
+
         else:
             # if debug: print(repr(m))
             continue
@@ -826,7 +851,6 @@ def search_by_host(sos_d_list, host):
     return None

 def search_by_ip(sos_d_list, ip):
-    # import pdb; pdb.set_trace()
     for sos_d in sos_d_list:
         for l in sos_d.values():
             if type(l) != type([]):
@@ -905,6 +929,7 @@ def ip_address_to_ip_o_addr(s):
     # If not, grovel through the ip_address file, collect the juicy pieces
     # and fake a string that is similar in format to the preferred case -
     # at least similar enough to satisfy the caller of this function.
+
     as_is = True
     pat = re.compile(r'^[0-9]+:')
@@ -1372,6 +1397,43 @@ def main(options, args):
         if options.debug_time_operations:
             ts("es_index")
         res = es_index(es, actions)
+        if _special_paths:
+            # A scratch directory is needed for the extracted sar data;
+            # fall back to /tmp when none was configured.
+            tmpdir = _tmpdir or '/tmp'
+
+            # process contents of _special_paths
+            for special_case in _special_paths:
+                if special_case == 'sar.data':
+                    # Convert each sar.data file into an XML file (placed
+                    # under tmpdir) and index it via index_sar.call_indexer().
+                    for m in _special_paths[special_case]:
+                        uid = m.name.split('/sar/sar.data')[0]
+                        sa_data_file = os.path.join(tmpdir, m.path)
+                        extraction_status, nodename, xml_fp = extract_sa.process_binary(
+                            path=sa_data_file,
+                            cfg=cfg_name,
+                            write_data_to_file=True)
+
+                        if not extraction_status:
+                            continue
+
+                        if not options.debug_unittest:
+                            index_sar.call_indexer(file_path=xml_fp,
+                                                   _nodename=nodename,
+                                                   cfg_name=cfg_name,
+                                                   run_unique_id=uid,
+                                                   run_md5=ptb.md5sum)
+                        else:
+                            print(open(xml_fp, 'r').read())
+
+                    # TODO: generate grafana dashboard using create_dashboard
+                    # Finally, clean up the temporary files (XMLs, binaries).
+                    rmtree(os.path.join(tmpdir, pathlib.Path(m.name).parts[0]))
+
     except ConfigFileNotSpecified as e:
         print(e, file=sys.stderr)
         res = 2
diff --git a/server/pbench/lib/vos/analysis/lib/__init__.py b/server/pbench/bin/satools/__init__.py
similarity index 100%
rename from server/pbench/lib/vos/analysis/lib/__init__.py
rename to server/pbench/bin/satools/__init__.py
diff --git a/server/pbench/bin/satools/extract_sa.py b/server/pbench/bin/satools/extract_sa.py
new file mode 100755
index 0000000000..900ce56022
--- /dev/null
+++ b/server/pbench/bin/satools/extract_sa.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+
+import os
+import re
+import sys
+import subprocess
+from satools import oscode
+from distutils.spawn import find_executable
+from optparse import OptionParser, make_option
+
+try:
+    from configparser import SafeConfigParser, NoSectionError, NoOptionError
+except ImportError:
+    from ConfigParser import SafeConfigParser, NoSectionError, NoOptionError
+
+BASE_DIR = os.path.abspath(os.path.dirname(__file__))
+DEFAULT_SADF_PATH = find_executable('sadf')
+nodename_pattern = re.compile(r'nodename=".*"')
+
+class ConfigFileNotSpecified(Exception):
+    pass
+
+class ConfigFileError(Exception):
+    pass
+
+
+def extract_xml(sa_file_path='/tmp/sa01',
+                sadf_script_path=DEFAULT_SADF_PATH):
+    # Build: sadf -x <sa-binary> -- -A
+    cmd = [sadf_script_path, '-x', sa_file_path, '--', '-A']
+    p1 = subprocess.Popen(
+        cmd,
+        env={'LC_ALL': 'C', 'TZ': 'UTC'},
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        bufsize=1
+    )
+
+    xml_data = ''
+    with p1.stdout:
+        for line in iter(p1.stdout.readline, b''):
+            xml_data += line.decode('utf-8')
+    rc = p1.wait()
+    err = p1.stderr.read().decode()
+    if rc == 0:
+        print(err, file=sys.stdout)
+        nodename = nodename_pattern.findall(xml_data)[0]\
+            .replace("nodename=", "").replace('"', '')
+        return (True, rc, nodename, xml_data)
+    else:
+        print(err, file=sys.stderr)
+        del xml_data
+        if "cannot read the format" in err:
+            return (True, rc, None, None)
+        else:
+            print("ERROR: Supplied path doesn't yield an SA binary file."
+                  " Check your input!", file=sys.stderr)
+            return (False, rc, None, None)
+
+
+def convert_binary(sa_file_path='/tmp/sa01',
+                   sadf_script_path=DEFAULT_SADF_PATH):
+    """
+    From the sadf man page:
+
+    >> Convert an old system activity binary datafile
+    >> (version 9.1.6 and later) to current up-to-date format.
+    >> Use the following syntax:
+    >> $ sadf -c old_datafile > new_datafile
+    """
+    sa_filepath_conv = "%s_conv" % sa_file_path
+    cmd = [sadf_script_path, '-c', sa_file_path]
+    p2 = subprocess.Popen(cmd, stdout=open(sa_filepath_conv, 'w'),
+                          stderr=subprocess.PIPE,
+                          env={'LC_ALL': 'C', 'TZ': 'UTC'})
+    err = p2.stderr.read().decode()
+    p2.wait()
+    if err:
+        print(err, file=sys.stderr)
+        if "Cannot convert" in err:
+            os.remove(sa_filepath_conv)
+            return (False, '')
+
+    print("converted binary to current version of sysstat..")
+    return (True, sa_filepath_conv)
+
+
+def check_sadf_compliance(sa_file_path='/tmp/sa01', cfg_name=None, path=None):
+    """
+    Attempt to determine the os-code for an SA binary which failed to
+    convert to XML on the first try, and re-run the extraction with an
+    OS specific sadf build (named sadf-<os-code>-64) if one is available.
+
+    Returns:
+        - the extract_xml() result tuple when a matching sadf build was
+          found and used
+        - (False, 1, None, None) when no os-code or no matching sadf
+          build could be found for this SA file
+    """
+    res = oscode.determine_version(file_path=sa_file_path)
+    if res[0]:
+        print("compatible OS version for this binary: %s" % res[1])
+        script_path = os.path.join(path, "sadf-%s-64" % res[1])
+        if os.path.exists(script_path):
+            return extract_xml(sa_file_path=sa_file_path,
+                               sadf_script_path=script_path)
+        else:
+            msg = "The sadf-%s-64 script needed to process this file was not found!\n" % res[1]
+            msg += "Please provide a path to a folder containing the sadf script for Red Hat OS: %s\n" % res[1]
+            msg += "The path should be configured under the [SAR] section in %s" % cfg_name
+            print(msg, file=sys.stderr)
+            return (False, 1, None, None)
+    else:
+        print("ERROR: no [oscode/sysstat-version]::[file-magic] record found for this binary.", file=sys.stderr)
+        print("Please file a bug @ https://github.com/distributed-system-analysis/satools. Thanks!", file=sys.stderr)
+        return (False, 1, None, None)
+
+
+def dump_xml(data='', sa_file_path=None):
+    """Write the extracted XML next to the original SA binary file."""
+    xml_dump_file = os.path.basename(sa_file_path) + ".xml"
+    new_path = os.path.join(
+        os.path.dirname(sa_file_path),
+        xml_dump_file
+    )
+    with open(new_path, 'w') as f:
+        f.write(data)
+
+    return new_path
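check_sadf_compliance() above assumes per-OS sadf builds named sadf-<os-code>-64 under the configured sadf_binaries_path. A minimal sketch of that lookup; the directory and os-code values here are made up:

    # Sketch only, not part of the patch: resolve an os-code specific sadf build.
    import os

    sadf_binaries_path = "/opt/satools/sadf-binaries"   # hypothetical [SAR] config value
    os_code = "rhel5"                                   # hypothetical oscode.determine_version() result
    script_path = os.path.join(sadf_binaries_path, "sadf-%s-64" % os_code)
    if os.path.exists(script_path):
        print("would run: %s -x <sa-binary> -- -A" % script_path)
    else:
        print("no per-OS sadf build for %s; this file cannot be decoded" % os_code)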
+
+
+def process_binary(path=None, cfg=None, write_data_to_file=False):
+    """
+    Main entry point for handling an SA binary file.
+
+    Returns:
+        - (status, nodename, path_to_dumped_xml_file) if write_data_to_file == True
+        - (status, nodename, xml_data) if write_data_to_file == False
+    """
+    try:
+        config = SafeConfigParser()
+        config.read(cfg)
+        sadf_binaries_path = config.get('SAR', 'sadf_binaries_path')
+
+        extraction_status, rc, nodename, data = extract_xml(sa_file_path=path)
+        if extraction_status and rc != 0:
+            # Could not extract in one go, so first try converting the
+            # binary to the current sysstat format.
+            conversion_status = convert_binary(sa_file_path=path)
+
+            if conversion_status[0]:
+                extraction_status, rc, nodename, data = extract_xml(sa_file_path=conversion_status[1])
+                os.remove(conversion_status[1])
+                if extraction_status and rc != 0:
+                    # Extraction failed even after converting to the current
+                    # sysstat format, so try to detect the os-code and use an
+                    # OS specific sadf build instead.
+                    print("Failed to extract converted binary. Checking oscode..", file=sys.stderr)
+                    extraction_status, rc, nodename, data = check_sadf_compliance(sa_file_path=path,
+                                                                                  cfg_name=cfg,
+                                                                                  path=sadf_binaries_path)
+            else:
+                # Conversion to the current sysstat format failed as well;
+                # fall back to the os-code detection route directly.
+                extraction_status, rc, nodename, data = check_sadf_compliance(sa_file_path=path,
+                                                                              cfg_name=cfg,
+                                                                              path=sadf_binaries_path)
+        if write_data_to_file and extraction_status:
+            xml_file_path = dump_xml(data=data, sa_file_path=path)
+            del data
+            return (extraction_status, nodename, xml_file_path)
+        else:
+            return (extraction_status, nodename, data)
+
+    except ConfigFileNotSpecified as e:
+        print(e, file=sys.stderr)
+
+    except ConfigFileError as e:
+        print(e, file=sys.stderr)
+
+    except Exception as e:
+        print("Other error", e, file=sys.stderr)
+        import traceback
+        print(traceback.format_exc())
+
+
+if __name__ == '__main__':
+
+    usage = "Usage: extract_sa.py --config <path-to-pbench-index.cfg> <path-to-sa-binary>"
+    parser = OptionParser(usage)
+    o = make_option("-c", "--config", dest="cfg_name", help="Specify config file")
+    parser.add_option(o)
+    (options, args) = parser.parse_args()
+    if not options.cfg_name:
+        parser.error('Path to pbench-index.cfg required.')
+
+    try:
+        sa_filepath = args[0]
+        status, nodename, fp = process_binary(path=sa_filepath,
+                                              cfg=options.cfg_name,
+                                              write_data_to_file=True)
+        if status:
+            print("Nodename: %s" % nodename)
+            print("XML data saved to: %s" % fp)
+        else:
+            sys.exit(1)
+
+    except IndexError:
+        parser.error("No SA binary file supplied to script")
+    except Exception as e:
+        print("Other error", e, file=sys.stderr)
+        import traceback
+        print(traceback.format_exc())
diff --git a/server/pbench/bin/satools/index_sar.py b/server/pbench/bin/satools/index_sar.py
new file mode 100755
index 0000000000..af7699f043
--- /dev/null
+++ b/server/pbench/bin/satools/index_sar.py
@@ -0,0 +1,917 @@
+#!/usr/bin/env python3
+
+from __future__ import print_function
+
+_DEBUG = 1
+_PROFILE = False
+
+# Version of this tool, <major>.<minor>.<rev>-<build>, where:
+#
+#   major: systemic layout change to _metadata or other field structures,
+#          not backward compatible
+#   minor: backward compatible layout changes to names, etc.
+#   rev:   completely compatible changes or additions made
+#   build: identifying build number, should not represent any changes
+#
+# Started at 1.0.0-0 since we initiated this after a trial period.
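A minimal driver sketch of how the two satools modules compose, mirroring the index-pbench hunk earlier; the config path, sar.data location, and run identifiers below are made up:

    # Sketch only, not part of the patch: extract one sar binary and index it.
    from satools import extract_sa, index_sar

    cfg = "/etc/pbench-index.cfg"   # hypothetical config location
    sa_file = "/tmp/test-run/1/sample1/tools-default/hostA/sar/sar.data"

    status, nodename, xml_fp = extract_sa.process_binary(path=sa_file, cfg=cfg,
                                                         write_data_to_file=True)
    if status:
        index_sar.call_indexer(file_path=xml_fp, _nodename=nodename,
                               cfg_name=cfg,
                               run_unique_id="test-run/1/sample1/tools-default/hostA",
                               run_md5="25685d21315d1b5fdd9593be5f1ab773")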
+# +import os +import sys +import hashlib +import logging +import configparser +import json, collections +import lzma, contextlib +import time +import xml.parsers.expat +import mmap +from urllib3 import exceptions as ul_excs, Timeout +from elasticsearch import VERSION, Elasticsearch, helpers, exceptions as es_excs + +if _PROFILE: + import cProfile, cStringIO as StringIO, pstats + +try: + from logging import NullHandler +except ImportError: + class NullHandler(logging.Handler): + def handle(self, record): + pass + def emit(self, record): + pass + def createLock(self): + self.lock = None + +_VERSION_ = "1.1.2-0" +if VERSION < (1, 0, 0): + print("At least v1.0.0 of the ElasticSearch Python client is required," + " found %r" % (VERSION,), file=sys.stderr) + sys.exit(1) + +_read_timeout = 120 +timeoutobj = Timeout(total=1200, connect=10, read=_read_timeout) +# Silence logging messages from the elasticsearch client library +logging.getLogger('elasticsearch').addHandler(NullHandler()) +# to keep track of timestamp +DOCTYPE = 'sar' +_NAME_ = "index-sar" +TS_ALL = [] +_op_type = 'create' + + +def gen_action(index, rec, nodename, ts): + md5 = hashlib.md5((nodename + ts).encode('utf-8')) + TS_ALL.append(ts) + # ix_all.append(index) + action = { + "_op_type": _op_type, + "_index": index, + "_type": DOCTYPE, + "_id": md5.hexdigest(), + "_timestamp": ts, + "_source": rec + } + return action + + +def cvtnum(v): + """ + Convert a string value to an integer or a float. If the given value is + neither, return it quoted. + """ + try: + # Attempt to consider it an int first. + new_v = int(v) + except ValueError: + try: + # Then see if it could be considered a float. + new_v = float(v) + except ValueError: + # Otherwise, just return it as a quoted string. + new_v = v + else: + # ElasticSearch / Lucene have a max range for long types of + # [-9223372036854775808, 9223372036854775807], assumes 64-bit + # platforms! + if new_v > sys.maxsize: + new_v = sys.maxsize + elif new_v < -sys.maxsize: + new_v = -sys.maxsize + return new_v + +def tstos(ts=None): + return time.strftime("%Y-%m-%dT%H:%M:%S-%Z", time.localtime(ts)) + + +class SysStatParse(object): + """ + Parse sysstat XML generated data into discrete JSON documents, with + appropriate meta data applied for indexing in ElasticSearch. + + The operation of this code relies on the order in which elements are + written to the XML from the sadf utility, where the first few elements can + data that applies to all statistics, comments and restarts (machine name, + linux version, etc.). + + For the most part, all the element names and attributes translate over + one-to-one. There are a few exceptions: + + 1. int-proc data has no mapping in the more recent sysstat versions, + so a new key is used, "interrupts-processor" instead (note that + int-global already maps to "interrupts") + 2. the file-sz and inode-sz elements are renamed to file-nr and inode-nr + to match later output + 3. the net-device element is not carried over + 4. the rxbyt and txbyt net-dev values are renamed to rxkB and txkB, and + their values devided by 1,024 to match + + No attempt is made to map older element or attribute names into their more + recent forms, e.g. + + * "processes" and "context-switch" combined into the newer + "process-and-context-switch" element + + @pbench_run_md5: This md5 corresponds to the pbench-run._metadata.md5 + @pbench_run_unique_id: This is a path that represents the unique location + of that SAR data being indexed. 
+ """ + _valid_states = set(( + 'START', 'sysstat', 'host', 'statistics', 'timestamp', 'cpu-load', + 'cpu-load-all', 'io', 'memory', 'hugepages', 'kernel', 'serial', + 'power-management', 'disk', 'network', 'interrupts', 'int-global', + 'int-proc', 'filesystems','cpu-frequency', 'fan-speed', 'comments', + 'voltage-input', 'temperature', 'usb-devices', 'restarts')) + + _int_name_map = { 'int-global': 'interrupts', + 'int-proc': 'interrupts-processor' } + + def __init__(self, fname, target_nodename, es, unique_id, md5, + idx_prefix, blk_action_count): + if _DEBUG > 9: + import pdb; pdb.set_trace() + + # Input filename being considered + self.fname = fname + # set references to a pbench run + self.target_nodename = target_nodename + self.pbench_run_md5 = md5 + self.pbench_run_unique_id = unique_id + self.es = es + self.INDEX_PREFIX = idx_prefix + self.BULK_ACTION_COUNT = blk_action_count + # XML element parsing state and the stack that helps us transitions + # between states; see _valid_states above + self.state = 'START' + self.stack = [] + + # Saved key/value pairs being assembled into a new dictionary, + # currently used to merge "processes" and "context-switch" element + # dictionaries into the one "process-and-context-switch" dictionary. + self.saved = {} + + # Used when in the context of an element who's character data values + # are destined for normal statistics data. + self.curr_element = None + # Used when in the context of an element who's character data values + # are destined for metadata. + self.curr_meta_element = None + + # Metadata dictionary to accumulate top level element values and + # attributes that should be applied as metadata to all statistics, + # comments, and restarts. + self.metadata = {} + + # Dictionary or list representing the current element's context as its + # being processed. + self.curr_element_dict = None + self.curr_element_dict_stack = [] + + # Buffer for accumulating data between elements (all accumulated data + # is stripped of whitespace when the end element is encountered). The + # data stack allows us to accumulate data between elements, pushing a + # new buffer when we encounter a new element start before the previous + # one ends. + self.data_buf = '' + self.data_stack = [] + + # State for handling net-dev and net-edev elements + self.net_devs = [] + self.net_edevs = [] + self.net_device_iface = None + + # For each "host" element encountered, this is the nodename attribute + # value from that element. 
+ self.nodename = '' + + # Accumulated list of ElasticSearch "actions" for bulk indexing + self.actions = [] + + # ElasticSerach indexing counts + self.totals = collections.defaultdict(int) + self.successes = 0 + self.duplicates = 0 + self.errors = 0 + self.exceptions = 0 + + def _error(self, msg, *args): + print(msg % args, file=sys.stderr) + if _DEBUG > 8: + import pdb; pdb.set_trace() + print(repr(self.stack)) + + def _warn(self, msg, *args): + print(msg % args, file=sys.stderr) + if _DEBUG > 9: + import pdb; pdb.set_trace() + print(repr(self.stack)) + + def _dump_actions(self): + for act in self.actions: + print(json.dumps(act, indent=4, sort_keys=True)) + # If we are dumping the actions out, we are consuming them + del self.actions[0:len(self.actions)] + + def _push(self, state=None, noindent=False): + self.stack.append((self.state, self.saved)) + self.state = state if state is not None else self.state + self.saved = {} + + def _pop(self): + self.state, self.saved = self.stack.pop() + + def _push_dict(self, obj, name=None): + self.curr_element_dict_stack.append(self.curr_element_dict) + self.curr_element_dict = obj + if _DEBUG > 8: + print(name, "pushed", self.curr_element_dict, self.curr_element_dict_stack) + + def _pop_dict(self, name=None): + if _DEBUG > 8: + if len(self.curr_element_dict_stack) == 0: + import pdb; pdb.set_trace() + print(name, "popping", self.curr_element_dict, self.curr_element_dict_stack) + else: + assert len(self.curr_element_dict_stack) > 0 + + ced = self.curr_element_dict + assert ced is not None + + self.curr_element_dict = self.curr_element_dict_stack.pop() + if isinstance(self.curr_element_dict, list): + self.curr_element_dict.append(ced) + else: + if _DEBUG > 8: + if name is None or self.curr_element_dict is None: + import pdb; pdb.set_trace() + else: + assert name is not None + self.curr_element_dict[name] = ced + + def _normalize_attrs(self, attrs): + new_attrs = {} + for k, v in attrs.items(): + if k == "per" and v == "second": + continue + new_attrs[k] = cvtnum(v) + return new_attrs + + def _create_record(self, name, attrs): + assert name in ('comment', 'boot', 'timestamp') + timestamp_d = {} + name_val = None + for k, v in attrs.items(): + if k in ('date', 'time', 'utc', 'interval'): + timestamp_d[k] = cvtnum(v) + else: + assert k != 'com' or name_val is None + name_val = cvtnum(v) + if not name_val: + if name == 'boot': + name_val = "recorded" + assert timestamp_d + record = { 'timestamp': timestamp_d } + if name != 'timestamp': + record[name] = name_val + return record + + def _metadata_element(self, name, attrs): + if not attrs: + assert name, "Expected an element name, not: %r" % name + # Remember this element name, and wait for possible + # character data + self.curr_meta_element = name + else: + for k, v in attrs.items(): + if k == "per" and v == "second": + continue + self.metadata[k] = cvtnum(v) + + def _register_action(self, name): + #assert self.curr_element_dict is not None + if self.curr_element_dict is None: + if _DEBUG > 8: + import pdb; pdb.set_trace() + print("bad state: _register_action() called with no action", + file=sys.stderr) + self.exceptions += 1 + return + #assert len(self.curr_element_dict_stack) == 0 + if len(self.curr_element_dict_stack) > 0: + if _DEBUG > 8: + import pdb; pdb.set_trace() + print("bad state: _register_action() called with a non-empty stack", + file=sys.stderr) + self.exceptions += 1 + return + + record = self.curr_element_dict + self.curr_element_dict = None + self.metadata["generated-by"] = _NAME_ + 
self.metadata["generated-by-version"] = _VERSION_ + self.metadata["pbench_run_md5"] = self.pbench_run_md5 + self.metadata["pbench_run_unique_id"] = self.pbench_run_unique_id + record['_metadata'] = self.metadata + try: + timestamp = record['timestamp'] + ts = timestamp['date'] + 'T' + timestamp['time'] + except KeyError: + print("Seems to be an invalid sar XML file, where a" + " '%s' element does not have date and time" + " attributes" % name, file=sys.stderr) + if _DEBUG > 8: + import pdb; pdb.set_trace() + self.exceptons += 1 + return + else: + index = "%s.sar-%s" % (self.INDEX_PREFIX, timestamp['date']) + self.actions.append(gen_action(index, record, self.nodename, ts)) + + self.totals['records'] += 1 + self.totals[self.state] += 1 + + if len(self.actions) >= self.BULK_ACTION_COUNT: + self._bulk_upload() + + def _bulk_upload(self): + if _DEBUG > 1: + self._dump_actions() + if len(self.actions) == 0: + return + + beg, end = time.time(), None + start = beg + if _DEBUG > 0: + print("\tbulk index (beg ts: %s) ..." % tstos(beg)) + sys.stdout.flush() + delay = _read_timeout + tries = 20 + try: + while True: + try: + res = helpers.bulk(self.es, self.actions) + except es_excs.ConnectionError as err: + end = time.time() + if isinstance(err.info, ul_excs.ReadTimeoutError): + tries -= 1 + if tries > 0: + print("\t\tWARNING (end ts: %s, duration: %.2fs):" + " read timeout, delaying %d seconds before" + " retrying (%d attempts remaining)..." % ( + tstos(end), end - beg, delay, tries), + file=sys.stderr) + time.sleep(delay) + delay *= 2 + beg, end = time.time(), None + print("\t\tWARNING (beg ts: %s): retrying..." % ( + tstos(beg)), file=sys.stderr) + continue + if _DEBUG > 8: + import pdb; pdb.set_trace() + print("\tERROR (end ts: %s, duration: %.2fs): %s" % ( + tstos(end), end - start, err), file=sys.stderr) + self.exceptions += 1 + except Exception as err: + end = time.time() + if _DEBUG > 8: + import pdb; pdb.set_trace() + # print("\tERROR (end ts: %s, duration: %.2fs): %s" % ( + # tstos(end), end - start, err), file=sys.stderr) + self.exceptions += 1 + else: + end = time.time() + lcl_successes = res[0] + self.successes += lcl_successes + lcl_duplicates = 0 + lcl_errors = 0 + len_res1 = len(res[1]) + for idx, ires in enumerate(res[1]): + sts = ires[_op_type]['status'] + if sts not in (200, 201): + if _op_type == 'create' and sts == 409: + self.duplicates += 1 + lcl_duplicates += 1 + else: + print("\t\tERRORS (%d of %d): %r" % ( + idx, len_res1, ires[_op_type]['error']), + file=sys.stderr) + self.errors += 1 + lcl_errors += 1 + else: + self.successes += 1 + lcl_successes += 1 + if _DEBUG > 0 or lcl_errors > 0: + print("\tdone (end ts: %s, duration: %.2fs," + " success: %d, duplicates: %d, errors: %d)" % ( + tstos(end), end - start, lcl_successes, + lcl_duplicates, lcl_errors)) + sys.stdout.flush() + break + finally: + del self.actions[0:len(self.actions)] + + def start_element(self, name, attrs): + assert self.state in self._valid_states + assert not (self.curr_element is not None and self.curr_meta_element is not None) + self.data_stack.append(self.data_buf.strip()) + self.data_buf = '' + + if self.state == 'int-global': + if name == 'irq': + if attrs['value'] != "0.00": + self.curr_element_dict.append(self._normalize_attrs(attrs)) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 'int-proc': + if name == 'irqcpu': + if attrs['value'] != "0.00": + self.curr_element_dict.append(self._normalize_attrs(attrs)) + else: + self._error("Ignoring start 
for element: %s, attrs: %r", name, attrs) + elif self.state == 'interrupts': + if name == 'int-global': + self._push(state=name) + self._push_dict([]) + elif name == 'int-proc': + self._push(state=name) + self._push_dict([]) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state in ('cpu-load', 'cpu-load-all'): + if name == 'cpu': + try: + attrs['cpu'] = attrs['number'] + except KeyError: + pass + else: + del attrs['number'] + self.curr_element_dict.append(self._normalize_attrs(attrs)) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 'disk': + if name == 'disk-device': + try: + attrs['disk-device'] = attrs['dev'] + except KeyError: + pass + else: + del attrs['dev'] + self.curr_element_dict.append(self._normalize_attrs(attrs)) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state in ('cpu-frequency', 'fan-speed', 'temperature', + 'voltage-input', 'usb-devices', 'filesystems'): + if self.state == 'filesystems': + try: + attrs['filesystem'] = attrs['fsname'] + except KeyError: + pass + else: + del attrs['fsname'] + self.curr_element_dict.append(self._normalize_attrs(attrs)) + elif self.state == 'network': + if name == 'net-dev': + if self.net_device_iface is not None: + attrs['iface'] = self.net_device_iface + self.net_devs.append(self._normalize_attrs(attrs)) + elif name == 'net-edev': + if self.net_device_iface is not None: + attrs['iface'] = self.net_device_iface + self.net_edevs.append(self._normalize_attrs(attrs)) + elif name == 'net-device': + self.net_device_iface = attrs['iface'] + else: + self.curr_element_dict[name] = self._normalize_attrs(attrs) + elif self.state in ('io', 'memory', 'kernel', 'hugepages'): + if not attrs: + assert name, "Expected an element name, not: %r" % name + # Remember this element name, and wait for possible + # character data + self.curr_element = name + else: + self.curr_element_dict[name] = self._normalize_attrs(attrs) + elif self.state == 'serial': + if name == 'tty': + self.curr_element_dict.append(self._normalize_attrs(attrs)) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 'power-management': + if name in ('cpu-frequency', 'fan-speed', 'temperature', + 'voltage-input', 'usb-devices'): + self._push(state=name) + self._push_dict([]) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 'timestamp': + if name in ('cpu-load-all', 'cpu-load', 'disk', 'serial', 'filesystems'): + self._push(state=name) + self._push_dict([]) + elif name == 'interrupts': + self._push(state=name, noindent=True) + elif name in ('io', 'memory', 'network', 'hugepages', 'power-management'): + self._push(state=name) + self._push_dict({}) + if self.state == 'network': + self.net_devs = [] + self.net_edevs = [] + elif name == 'kernel': + try: + del attrs['per'] + except KeyError: + pass + if attrs: + # Starting with sysstat 10.1.x (maybe earlier) kernel has + # attributes on element + self._push_dict(self._normalize_attrs(attrs)) + else: + # Pre sysstat 10.1.x kernel has sub elements with attributes + self._push(state=name) + self._push_dict({}) + elif name in ('process-and-context-switch', 'swap-pages', 'paging', 'queue'): + self._push_dict(self._normalize_attrs(attrs)) + elif name in ('processes', 'context-switch'): + self.saved[name] = attrs + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 
'statistics': + if name == 'timestamp': + self._push(state='timestamp') + assert self.curr_element_dict == None + assert len(self.curr_element_dict_stack) == 0 + self.curr_element_dict = self._create_record(name, attrs) + else: + self._error("Ignoring element: %s, attrs: %r", name, attrs) + elif self.state == 'restarts': + if name == 'boot': + assert self.curr_element_dict == None + assert len(self.curr_element_dict_stack) == 0 + self.curr_element_dict = self._create_record(name, attrs) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 'comments': + if name == 'comment': + assert self.curr_element_dict == None + assert len(self.curr_element_dict_stack) == 0 + self.curr_element_dict = self._create_record(name, attrs) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 'host': + if name in ('statistics', 'restarts', 'comments'): + self._push(state=name, noindent=True) + elif name in ('sysname', 'release', 'machine', 'number-of-cpus', 'file-date', 'file-utc-time'): + self._metadata_element(name, attrs) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 'sysstat': + if name == 'host': + self._push(state='host', noindent=True) + self._metadata_element(name, attrs) + try: + self.nodename = self.metadata['nodename'] + except KeyError: + print("Seems to be an invalid sar XML file," + " where the host element does not" + " have a nodename attribute", file=sys.stderr) + raise + else: + if self.nodename != self.target_nodename: + raise Exception("Unexpected nodename, %s, expected: %s" % (self.nodename, self.target_nodename)) + elif name in ('sysdata-version'): + self._metadata_element(name, attrs) + else: + self._error("Ignoring start for element: %s, attrs: %r", name, attrs) + elif self.state == 'START': + if name == 'sysstat': + self._push(state='sysstat', noindent=True) + else: + self._warn("Ignoring element: %s, attrs: %r", name, attrs) + else: + self._error("Unexpected state: %s", self.state) + + def end_element(self, name): + assert self.state in self._valid_states + assert not (self.curr_element is not None and self.curr_meta_element is not None) + + data_buf = self.data_buf.strip() + self.data_buf = self.data_stack.pop() + + if self.curr_element is not None: + if self.curr_element != name: + if data_buf != '': + # Encountered the end of another element, when expecting something else + if _DEBUG > 8: + import pdb; pdb.set_trace() + pass + else: + # Quietly swallow other empty elements that don't match this one + pass + return + if data_buf: + if self.curr_element_dict is None: + import pdb; pdb.set_trace() + self.curr_element_dict[name] = cvtnum(data_buf) + else: + if _DEBUG > 8: + import pdb; pdb.set_trace() + pass + else: + # Not debugging, ignore whitespace only element values + pass + self.curr_element = None + elif self.curr_meta_element is not None: + if self.curr_meta_element != name: + if data_buf != '': + # Encountered the end of another element, when expecting something else + if _DEBUG > 8: + import pdb; pdb.set_trace() + pass + else: + # Quietly swallow other empty elements that don't match this one + pass + return + if data_buf: + self.metadata[name] = cvtnum(data_buf) + else: + if _DEBUG > 8: + import pdb; pdb.set_trace() + pass + else: + # Not debugging, ignore whitespace only values elements + pass + self.curr_meta_element = None + else: + # Nothing to do for this ending element. 
+ assert data_buf == '' + + if self.state in ('int-global', 'int-proc'): + if name == self.state: + self._pop_dict(self._int_name_map[name]) + self._pop() + elif self.state in ('cpu-load', 'cpu-load-all'): + if name == self.state: + self._pop_dict(name) + if name == 'cpu-load-all': + # Synthesize a cpu-load entry from a cpu-load-all: + cpu_load = [] + for clad in self.curr_element_dict[name]: + cld = { + 'cpu': clad['cpu'], + 'idle': clad['idle'], + 'iowait': clad['iowait'], + 'nice': clad['nice'], + 'steal': clad['steal'], + 'system': clad['sys'] + clad['irq'] + clad['soft'], + 'user': clad['usr'] + clad['guest'] + } + try: + # with sysstat 10.1.6 and later we might have + # gnice values, which roll up into + # cpu-load.cpu.nice. + cld['nice'] += clad['gnice'] + except KeyError: + pass + cpu_load.append(cld) + self.curr_element_dict['cpu-load'] = cpu_load + self._pop() + elif self.state in ('serial', 'disk', 'cpu-frequency', 'fan-speed', + 'temperature', 'voltage-input', 'usb-devices', + 'filesystems'): + if name == self.state: + self._pop_dict(name) + self._pop() + elif self.state == 'interrupts': + if name == self.state: + self._pop() + elif self.state == 'network': + if name == self.state: + if self.net_devs: + assert self.curr_element_dict is not None + if 'rxbyt' in self.net_devs[0]: + for nd in self.net_devs: + try: + nd['rxkB'] = nd['rxbyt'] / 1024 + nd['txkB'] = nd['txbyt'] / 1024 + except KeyError: + pass + else: + del nd['rxbyt'] + del nd['txbyt'] + self.curr_element_dict['net-dev'] = self.net_devs + self.net_devs = [] + if self.net_edevs: + assert self.curr_element_dict is not None + self.curr_element_dict['net-edev'] = self.net_edevs + self.net_edevs = [] + if self.net_device_iface is not None: + self.net_device_iface = None + self._pop_dict(name) + self._pop() + elif self.state in ('io', 'memory', 'kernel', 'hugepages', 'power-management'): + if name == self.state: + if name == 'kernel': + # Make sysstat-7.x like sysstat-9.x and later + try: + self.curr_element_dict['file-nr'] = self.curr_element_dict['file-sz'] + except KeyError: + pass + else: + del self.curr_element_dict['file-sz'] + try: + self.curr_element_dict['inode-nr'] = self.curr_element_dict['inode-sz'] + except KeyError: + pass + else: + del self.curr_element_dict['inode-sz'] + self._pop_dict(name) + self._pop() + elif self.state == 'timestamp': + if name == self.state: + try: + pattrs = self.saved['processes'] + except KeyError: + combined_attrs = {} + else: + combined_attrs = pattrs + try: + cattrs = self.saved['context-switch'] + except KeyError: + pass + else: + combined_attrs.update(cattrs) + if combined_attrs: + self.curr_element_dict['process-and-context-switch'] = self._normalize_attrs(combined_attrs) + self._pop() + self._register_action(name) + elif name not in ('processes', 'context-switch'): + self._pop_dict(name) + elif self.state == 'statistics': + if name == self.state: + self._pop() + elif self.state in ('restarts', 'comments'): + if name == self.state: + self._pop() + else: + self._register_action(name) + elif self.state == 'host': + # Add any other elements we find, just add + if name == 'host': + self._pop() + elif self.state == 'sysstat': + if name == 'sysstat': + self._pop() + else: + self._error("Unexpected state: %s", self.state) + + def char_data(self, data): + # Simply accumulate all the data given. This method may be called more + # than once between start_element and end_element invocations. 
+ self.data_buf += data + + +def process_fp(fp, p, sparse): + + if _PROFILE: + pr = cProfile.Profile() + pr.enable() + beg = time.time() + try: + p.ParseFile(fp) + except xml.parsers.expat.ExpatError as err: + print("Bad XML: %r" % err, file=sys.stderr) + sparse.exceptions += 1 + # Bulk upload the remainder + sparse._bulk_upload() + end = time.time() + if _PROFILE: + pr.disable() + s = StringIO.StringIO() + sortby = 'cumulative' + ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + ps.print_stats() + print(s.getvalue()) + sys.stdout.flush() + return beg, end + +def call_indexer(file_path=None, _nodename=None, TS_ALL=TS_ALL, + _index_name='', cfg_name=None, run_unique_id=None, + run_md5=None): + if file_path == '-': + fname = '-' + else: + try: + fname = os.path.abspath(file_path) + except IndexError: + print("We need a XML file to process", file=sys.stderr) + sys.exit(1) + else: + # FIXME: This is a bit simplistic + bname = os.path.basename(fname) + if not bname.endswith(('.xml', '.xml.xz')): + print("Are you sure this is an XML file? (%s)" % fname, + file=sys.stderr) + sys.exit(1) + + try: + target_nodename = _nodename.strip() + except IndexError: + print("We need a target nodename to use verifying the XML", file=sys.stderr) + sys.exit(1) + + try: + indexname = _index_name.strip() + except IndexError: + indexname = '' + + # cfg_name = "/etc/pbench-index.cfg" + if cfg_name is None: + print("cfg_name was not supplied. Reading path from VOS_CONFIG_PATH") + cfg_name = os.environ.get('VOS_CONFIG_PATH') + if cfg_name is None: + print("Need VOS_CONFIG_PATH environment variable defined", + file=sys.stderr) + sys.exit(1) + + config = configparser.ConfigParser() + config.read(cfg_name) + + try: + URL = config.get('Server', 'server') + except configparser.NoSectionError: + print("Need a [Server] section with host and port defined in %s" + " configuration file" % cfg_name, file=sys.stderr) + sys.exit(1) + except configparser.NoOptionError: + host = config.get('Server', 'host') + port = config.get('Server', 'port') + else: + host, port = URL.rsplit(':', 1) + hosts = [dict(host=host, port=port, timeout=timeoutobj),] + es = Elasticsearch(hosts, max_retries=0) + + INDEX_PREFIX = config.get('Settings', 'index_prefix') + if indexname: + INDEX_PREFIX += '.%s' % indexname + BULK_ACTION_COUNT = int(config.get('Settings', 'bulk_action_count')) + + # Setup XML element parser + sparse = SysStatParse(fname, target_nodename, es, run_unique_id, + run_md5, INDEX_PREFIX, BULK_ACTION_COUNT) + # Setup XML parser to use our element callback parser + p = xml.parsers.expat.ParserCreate() + p.StartElementHandler = sparse.start_element + p.EndElementHandler = sparse.end_element + p.CharacterDataHandler = sparse.char_data + + if _DEBUG > 0: + print("parsing %s..." 
% (fname if fname != '-' else '',)) + sys.stdout.flush() + + if fname == '-': + beg, end = process_fp(sys.stdin.buffer, p, sparse) + else: + try: + inf = lzma.LZMAFile(fname, "rb") + with contextlib.closing(inf): + beg, end = process_fp(inf, p, sparse) + except (IOError, lzma.LZMAError): + with open(fname, "r+b") as inf: + mm = mmap.mmap(inf.fileno(), 0) + beg, end = process_fp(mm, p, sparse) + + TS_ALL = sorted(TS_ALL) + + + if _DEBUG > 0: + print("grafana_range_begin %s" % (TS_ALL[0])) + print("grafana_range_end %s" % (TS_ALL[-1])) + print("...parsed %s (%.03f secs)" % + (fname if fname != '-' else '', + end - beg)) + sys.stdout.flush() + + if sparse.exceptions + sparse.errors > 0: + print("ERROR(%s) errors encountered during indexing" % ( + _NAME_,), file=sys.stderr) + if sparse.successes + sparse.duplicates == 0: + # Only return a total failure if no records were actually indexed + # successfully + sys.exit(1) diff --git a/server/pbench/bin/satools/oscode.py b/server/pbench/bin/satools/oscode.py new file mode 100755 index 0000000000..9b0cdfb466 --- /dev/null +++ b/server/pbench/bin/satools/oscode.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 + +# From DSA/satools/satools/ +# https://github.com/distributed-system-analysis/satools + +from __future__ import print_function + +import os +import sys + +try: + from sysstat import fetch_fileheader, fetch_os_code, Invalid +except: + from .sysstat import fetch_fileheader, fetch_os_code, Invalid + +def determine_version(file_path=None): + if os.path.getsize(file_path) == 0: + return (False, "Invalid - %s: empty data file" % file_path) + + try: + fm, fh, fa, magic = fetch_fileheader(file_path) + except Invalid as e: + return (False, "Invalid - %s: %s" % (file_path, e)) + + except Exception as e: + return (False, "Error - %s: %s" % (file_path, e)) + + + try: + val = fetch_os_code(magic) + except Invalid as e: + return (False, "Invalid - %s: %s" % (file_path, e)) + + except Exception as e: + return (False, "Error - %s: %s" % (file_path, e)) + + else: + return (True, val) + +if __name__ == '__main__': + res = determine_version(file_path=sys.argv[1]) + if res[0]: + print(res[1]) + else: + print(res[1], file=sys.stderr) + sys.exit(1) diff --git a/server/pbench/bin/satools/sysstat.py b/server/pbench/bin/satools/sysstat.py new file mode 100755 index 0000000000..269f671934 --- /dev/null +++ b/server/pbench/bin/satools/sysstat.py @@ -0,0 +1,1582 @@ +#!/usr/bin/env python3 + +# From DSA/satools/satools/ +# https://github.com/distributed-system-analysis/satools + +""" +A simple module to facilitate reading sa binary data files, providing classes +appropriate to different versions of the various binary data file formats, and +simple methods for dumping the data. + +Test in this tree via: + + for i in samples/sa.* ; do echo $i; python sysstat.py $i ; done + +/* + *************************************************************************** + * Definitions of header structures. + * + * Format of system activity data files (from at least FORMAT_MAGIC 0x2170 and later): + * __ + * | + * | file_magic structure + * | + * |-- + * | + * | file_header structure + * | + * |-- --| + * | | + * | file_activity structure | * sa_nr_act + * | | + * |-- --| + * | | + * | record_header structure | + * | | + * |-- | * + * | | + * | Statistics structures...(*) | + * | | + * |-- --| + * + * (*)Note: If it's a special record, we may find a comment instead of + * statistics (R_COMMENT record type) or the number of CPU items (R_RESTART + * record type). 
+ *************************************************************************** + */ +""" + +import sys, time, os.path, os, lzma +from datetime import datetime +from ctypes import Structure, c_int, c_uint, c_ushort, c_uint8, c_ulong, c_char, c_ulonglong +from abc import ABCMeta, abstractmethod +from contextlib import closing + + +class Corruption(Exception): + """ + sa binary data file is corrupted somehow. + """ + pass + + +class Invalid(Exception): + """ + sa binary data file is corrupted somehow. + """ + pass + + +class Truncated(Exception): + """ + sa binary data file is corrupted somehow. + """ + pass + + +DEBUG = False + +TWO_DAYS_SECONDS = (2 * 24 * 60 * 60) + +#/* Maximum length of a comment */ +MAX_COMMENT_LEN = 64 + +#define UTSNAME_LEN 65 +UTSNAME_LEN = 65 + +#/* Get IFNAMSIZ */ +#include +#ifndef IFNAMSIZ +#define IFNAMSIZ 16 +#endif +#define MAX_IFACE_LEN IFNAMSIZ +MAX_IFACE_LEN = 16 + +#/* +# * Sysstat magic number. Should never be modified. +# * Indicate that the file was created by sysstat. +# */ +##define SYSSTAT_MAGIC 0xd596 +SYSSTAT_MAGIC = 0xd596 + +#/* Record type */ +#/* +# * R_STATS means that this is a record of statistics. +# */ +#define R_STATS 1 +R_STATS = 1 +#/* +# * R_RESTART means that this is a special record containing +# * a LINUX RESTART message. +# */ +#define R_RESTART 2 +R_RESTART = 2 +R_DUMMY = 2 # Under 2169 formats +#/* +# * R_LAST_STATS warns sar that this is the last record to be written +# * to file before a file rotation, and that the next data to come will +# * be a header file. +# * Such a record is tagged R_STATS anyway before being written to file. +# */ +#define R_LAST_STATS 3 +R_LAST_STATS = 3 +#/* +# * R_COMMENT means that this is a special record containing +# * a comment. +# */ +#define R_COMMENT 4 +R_COMMENT = 4 + +# 0x2169 based formats, add "_B" to differentiate. +#/* Define activities */ +A_PROC_B = 0x000001 +A_CTXSW_B = 0x000002 +A_CPU_B = 0x000004 +A_IRQ_B = 0x000008 +A_ONE_IRQ_B = 0x000010 +A_SWAP_B = 0x000020 +A_IO_B = 0x000040 +A_MEMORY_B = 0x000080 +A_SERIAL_B = 0x000100 +A_NET_DEV_B = 0x000200 +A_NET_EDEV_B = 0x000400 +A_DISK_B = 0x000800 +A_PID_B = 0x001000 +A_CPID_B = 0x002000 +A_NET_NFS_B = 0x004000 +A_NET_NFSD_B = 0x008000 +A_PAGE_B = 0x010000 +A_MEM_AMT_B = 0x020000 +A_KTABLES_B = 0x040000 +A_NET_SOCK_B = 0x080000 +A_QUEUE_B = 0x100000 +#define A_LAST 0x100000 + +#/* Activities */ +A_CPU = 1 +A_PCSW = 2 +A_IRQ = 3 +A_SWAP = 4 +A_PAGE = 5 +A_IO = 6 +A_MEMORY = 7 +A_KTABLES = 8 +A_QUEUE = 9 +A_SERIAL = 10 +A_DISK = 11 +A_NET_DEV = 12 +A_NET_EDEV = 13 +A_NET_NFS = 14 +A_NET_NFSD = 15 +A_NET_SOCK = 16 +A_NET_IP = 17 +A_NET_EIP = 18 +A_NET_ICMP = 19 +A_NET_EICMP = 20 +A_NET_TCP = 21 +A_NET_ETCP = 22 +A_NET_UDP = 23 +A_NET_SOCK6 = 24 +A_NET_IP6 = 25 +A_NET_EIP6 = 26 +A_NET_ICMP6 = 27 +A_NET_EICMP6 = 28 +A_NET_UDP6 = 29 +A_PWR_CPUFREQ = 30 + + +class FileMagic(Structure): + """ +As of the final git v10.2.1: + +This structure should only be extended in the future, so can serve as the +generic base structure from which we can interrupt the initial bytes of a file +to determine what the actual format version is for the rest of the file. If it +changes in size, which is likely for 10.3.x and later, we can define another +version for that specific number and insert it in the table below. + +/* + * Datafile format magic number. + * Modified to indicate that the format of the file is + * no longer compatible with that of previous sysstat versions. 
+ */ +#define FORMAT_MAGIC 0x2171 + +/* Structure for file magic header data */ +struct file_magic { + /* + * This field identifies the file as a file created by sysstat. + */ + unsigned short sysstat_magic; + /* + * The value of this field varies whenever datafile format changes. + */ + unsigned short format_magic; + /* + * Sysstat version used to create the file. + */ + unsigned char sysstat_version; + unsigned char sysstat_patchlevel; + unsigned char sysstat_sublevel; + unsigned char sysstat_extraversion; +}; + +#define FILE_MAGIC_SIZE (sizeof(struct file_magic)) + """ + _fields_ = [ ('sysstat_magic', c_ushort), + ('format_magic', c_ushort), + ('sysstat_version', c_uint8), + ('sysstat_patchlevel', c_uint8), + ('sysstat_sublevel', c_uint8), + ('sysstat_extraversion', c_uint8) + ] + #FORMAT_MAGIC = 0x2171 + SIZE = ((2 * 2) + + (4 * 1)) + + def dump(self, *args, **kwargs): + print("file_magic:") + print("\tsysstat_magic = 0x%04x" % self.sysstat_magic) + print("\tformat_magic = 0x%04x" % self.format_magic) + print("\tsysstat_version", repr(self.sysstat_version)) + print("\tsysstat_patchlevel", repr(self.sysstat_patchlevel)) + print("\tsysstat_sublevel", repr(self.sysstat_sublevel)) + print("\tsysstat_extraversion", repr(self.sysstat_extraversion)) + + +class FileHeader(Structure): + """ + File Header structure shared by format versions under a 0xd596 + SYSSTAT_MAGIC value: + + 0x2170 (9.0.0, RHEL 6.x), + 0x1170 (RHEL 6.5+), + 0x2171 (10.1.5, RHEL 7-Beta1, Fedora 19+) + +/* Header structure for system activity data file */ +struct file_header { + /* + * Timestamp in seconds since the epoch. + */ + unsigned long sa_ust_time __attribute__ ((aligned (8))); + /* + * Number of activities saved in the file + */ + unsigned int sa_nr_act __attribute__ ((aligned (8))); + /* + * Current day, month and year. + * No need to save DST (Daylight Saving Time) flag, since it is not taken + * into account by the strftime() function used to print the timestamp. + */ + unsigned char sa_day; + unsigned char sa_month; + unsigned char sa_year; + /* + * Size of a long integer. Useful to know the architecture on which + * the datafile was created. + */ + char sa_sizeof_long; + /* + * Operating system name. + */ + char sa_sysname[UTSNAME_LEN]; + /* + * Machine hostname. + */ + char sa_nodename[UTSNAME_LEN]; + /* + * Operating system release number. + */ + char sa_release[UTSNAME_LEN]; + /* + * Machine architecture. + */ + char sa_machine[UTSNAME_LEN]; +}; + +#define FILE_HEADER_SIZE (sizeof(struct file_header)) + """ + _fields_ = [ ('sa_ust_time', c_ulong), + ('sa_nr_act', c_uint), + ('sa_day', c_uint8), + ('sa_month', c_uint8), + ('sa_year', c_uint8), + ('sa_sizeof_long', c_char), + ('sa_sysname', c_char * UTSNAME_LEN), + ('sa_nodename', c_char * UTSNAME_LEN), + ('sa_release', c_char * UTSNAME_LEN), + ('sa_machine', c_char * UTSNAME_LEN), + ('_alignment_padding', c_uint), # Padding due to alignment of first element? 
+ ] + SIZE = ((1 * 8) + + (1 * 4) + + (4 * 1) + + (4 * UTSNAME_LEN) + + 4) + + def dump(self, format_magic, *args, **kwargs): + print("file_header (0x%04x):" % format_magic) + print("\tsa_ust_time", repr(self.sa_ust_time), datetime.utcfromtimestamp(self.sa_ust_time)) + print("\tsa_nr_act", repr(self.sa_nr_act)) + print("\tsa_day", repr(self.sa_day)) + print("\tsa_month", repr(self.sa_month)) + print("\tsa_year", repr(self.sa_year)) + print("\tsa_sizeof_long", repr(self.sa_sizeof_long)) + print("\tsa_sysname", repr(self.sa_sysname)) + print("\tsa_nodename", repr(self.sa_nodename)) + print("\tsa_release", repr(self.sa_release)) + print("\tsa_machine", repr(self.sa_machine)) + + +class FileStats2169(Structure): + """ + File Header layout for sysstat 7.x.x versions (RHEL 5.x, bascially), that + is, those with a file magic number of 0x2169. + +struct file_stats { + /* --- LONG LONG --- */ + /* Machine uptime (multiplied by the # of proc) */ + unsigned long long uptime __attribute__ ((aligned (16))); + /* Uptime reduced to one processor. Set *only* on SMP machines */ + unsigned long long uptime0 __attribute__ ((aligned (16))); + unsigned long long context_swtch __attribute__ ((aligned (16))); + unsigned long long cpu_user __attribute__ ((aligned (16))); + unsigned long long cpu_nice __attribute__ ((aligned (16))); + unsigned long long cpu_system __attribute__ ((aligned (16))); + unsigned long long cpu_idle __attribute__ ((aligned (16))); + unsigned long long cpu_iowait __attribute__ ((aligned (16))); + unsigned long long cpu_steal __attribute__ ((aligned (16))); + unsigned long long irq_sum __attribute__ ((aligned (16))); + /* --- LONG --- */ + /* Time stamp (number of seconds since the epoch) */ + unsigned long ust_time __attribute__ ((aligned (16))); + unsigned long processes __attribute__ ((aligned (8))); + unsigned long pgpgin __attribute__ ((aligned (8))); + unsigned long pgpgout __attribute__ ((aligned (8))); + unsigned long pswpin __attribute__ ((aligned (8))); + unsigned long pswpout __attribute__ ((aligned (8))); + /* Memory stats in kB */ + unsigned long frmkb __attribute__ ((aligned (8))); + unsigned long bufkb __attribute__ ((aligned (8))); + unsigned long camkb __attribute__ ((aligned (8))); + unsigned long tlmkb __attribute__ ((aligned (8))); + unsigned long frskb __attribute__ ((aligned (8))); + unsigned long tlskb __attribute__ ((aligned (8))); + unsigned long caskb __attribute__ ((aligned (8))); + unsigned long nr_running __attribute__ ((aligned (8))); + unsigned long pgfault __attribute__ ((aligned (8))); + unsigned long pgmajfault __attribute__ ((aligned (8))); + /* --- INT --- */ + unsigned int dk_drive __attribute__ ((aligned (8))); + unsigned int dk_drive_rio __attribute__ ((packed)); + unsigned int dk_drive_wio __attribute__ ((packed)); + unsigned int dk_drive_rblk __attribute__ ((packed)); + unsigned int dk_drive_wblk __attribute__ ((packed)); + unsigned int file_used __attribute__ ((packed)); + unsigned int inode_used __attribute__ ((packed)); + unsigned int super_used __attribute__ ((packed)); + unsigned int super_max __attribute__ ((packed)); + unsigned int dquot_used __attribute__ ((packed)); + unsigned int dquot_max __attribute__ ((packed)); + unsigned int rtsig_queued __attribute__ ((packed)); + unsigned int rtsig_max __attribute__ ((packed)); + unsigned int sock_inuse __attribute__ ((packed)); + unsigned int tcp_inuse __attribute__ ((packed)); + unsigned int udp_inuse __attribute__ ((packed)); + unsigned int raw_inuse __attribute__ ((packed)); + unsigned int 
frag_inuse __attribute__ ((packed)); + unsigned int dentry_stat __attribute__ ((packed)); + unsigned int load_avg_1 __attribute__ ((packed)); + unsigned int load_avg_5 __attribute__ ((packed)); + unsigned int load_avg_15 __attribute__ ((packed)); + unsigned int nr_threads __attribute__ ((packed)); + unsigned int nfs_rpccnt __attribute__ ((packed)); + unsigned int nfs_rpcretrans __attribute__ ((packed)); + unsigned int nfs_readcnt __attribute__ ((packed)); + unsigned int nfs_writecnt __attribute__ ((packed)); + unsigned int nfs_accesscnt __attribute__ ((packed)); + unsigned int nfs_getattcnt __attribute__ ((packed)); + unsigned int nfsd_rpccnt __attribute__ ((packed)); + unsigned int nfsd_rpcbad __attribute__ ((packed)); + unsigned int nfsd_netcnt __attribute__ ((packed)); + unsigned int nfsd_netudpcnt __attribute__ ((packed)); + unsigned int nfsd_nettcpcnt __attribute__ ((packed)); + unsigned int nfsd_rchits __attribute__ ((packed)); + unsigned int nfsd_rcmisses __attribute__ ((packed)); + unsigned int nfsd_readcnt __attribute__ ((packed)); + unsigned int nfsd_writecnt __attribute__ ((packed)); + unsigned int nfsd_accesscnt __attribute__ ((packed)); + unsigned int nfsd_getattcnt __attribute__ ((packed)); + /* --- CHAR --- */ + /* Record type: R_STATS or R_DUMMY */ + unsigned char record_type __attribute__ ((packed)); + /* + * Time stamp: hour, minute and second. + * Used to determine TRUE time (immutable, non locale dependent time). + */ + unsigned char hour /* (0-23) */ __attribute__ ((packed)); + unsigned char minute /* (0-59) */ __attribute__ ((packed)); + unsigned char second /* (0-59) */ __attribute__ ((packed)); +}; + """ + _fields_ = [ + # --- LONG LONG --- + ('uptime', c_ulonglong), # 8 bytes, 16 byte aligned + ('uptime_padding', c_ulong), # ... 8 bytes of padding + # Uptime reduced to one processor. Set *only* on SMP machines + ('uptime0', c_ulonglong), + ('uptime0_padding', c_ulong), # ... 8 bytes of padding + ('context_swtch', c_ulonglong), + ('context_swtch_padding', c_ulong), # ... 8 bytes of padding + ('cpu_user', c_ulonglong), + ('cpu_user_padding', c_ulong), # ... 8 bytes of padding + + ('cpu_nice', c_ulonglong), + ('cpu_nice_padding', c_ulong), # ... 8 bytes of padding + ('cpu_system', c_ulonglong), + ('cpu_system_padding', c_ulong), # ... 8 bytes of padding + ('cpu_idle', c_ulonglong), + ('cpu_idle_padding', c_ulong), # ... 8 bytes of padding + + ('cpu_iowait', c_ulonglong), + ('cpu_iowait_padding', c_ulong), # ... 8 bytes of padding + ('cpu_steal', c_ulonglong), + ('cpu_steal_padding', c_ulong), # ... 8 bytes of padding + ('irq_sum', c_ulonglong), + ('irq_sum_padding', c_ulong), # ... 
8 bytes of padding + # --- LONG --- + # Time stamp (number of seconds since the epoch) + ('ust_time', c_ulong), # 8 bytes, 8 byte aligned + ('processes', c_ulong), + ('pgpgin', c_ulong), + + ('pgpgout', c_ulong), + ('pswpin', c_ulong), + ('pswpout', c_ulong), + # Memory stats in kB + ('frmkb', c_ulong), + ('bufkb', c_ulong), + ('camkb', c_ulong), + ('tlmkb', c_ulong), + ('frskb', c_ulong), + + ('tlskb', c_ulong), + ('caskb', c_ulong), + ('nr_running', c_ulong), + ('pgfault', c_ulong), + ('pgmajfault', c_ulong), + # --- INT --- + ('dk_drive', c_uint), # 4 bytes, packed + ('dk_drive_rio', c_uint), + ('dk_drive_wio', c_uint), + ('dk_drive_rblk', c_uint), + ('dk_drive_wblk', c_uint), + + ('file_used', c_uint), + ('inode_used', c_uint), + ('super_used', c_uint), + ('super_max', c_uint), + ('dquot_used', c_uint), + + ('dquot_max', c_uint), + ('rtsig_queued', c_uint), + ('rtsig_max', c_uint), + ('sock_inuse', c_uint), + ('tcp_inuse', c_uint), + + ('udp_inuse', c_uint), + ('raw_inuse', c_uint), + ('frag_inuse', c_uint), + ('dentry_stat', c_uint), + ('load_avg_1', c_uint), + + ('load_avg_5', c_uint), + ('load_avg_15', c_uint), + ('nr_threads', c_uint), + ('nfs_rpccnt', c_uint), + ('nfs_rpcretrans', c_uint), + + ('nfs_readcnt', c_uint), + ('nfs_writecnt', c_uint), + ('nfs_accesscnt', c_uint), + ('nfs_getattcnt', c_uint), + ('nfsd_rpccnt', c_uint), + + ('nfsd_rpcbad', c_uint), + ('nfsd_netcnt', c_uint), + ('nfsd_netudpcnt', c_uint), + ('nfsd_nettcpcnt', c_uint), + ('nfsd_rchits', c_uint), + + ('nfsd_rcmisses', c_uint), + ('nfsd_readcnt', c_uint), + ('nfsd_writecnt', c_uint), + ('nfsd_accesscnt', c_uint), + ('nfsd_getattcnt', c_uint), + # --- CHAR (uint8) --- + # Record type: R_STATS or R_DUMMY + ('record_type', c_uint8), # 1 byte, packed + # + # Time stamp: hour, minute and second. + # Used to determine TRUE time (immutable, non locale dependent time). + # + ('hour', c_uint8), + ('minute', c_uint8), + ('second', c_uint8), + # + # 12 bytes of padding follow because of initial "__attribute__ ((aligned (16)))" + # + ('_alignment_padding0', c_uint), + ('_alignment_padding1', c_uint), + ] + SIZE = ((10 * 16) # 10 unsigned long longs + + (16 * 8) # 16 unsigned longs + + (40 * 4) # 40 unsigned ints + + (4 * 1) # 4 unsigned chars + + 12) # 12 bytes of padding + + def integrity(self, offset=-1, *args, **kwargs): + for f in self._fields_: + if not f[0].endswith('_padding'): + continue + val = getattr(self, f[0]) + if val == 0: + continue + if DEBUG: + print(repr(f)) + self.dump(verbose=True) + import pdb; pdb.set_trace() + raise Corruption("non-zero filled padding: fs.%s = 0x%0x, offset: %d" % (f[0], val, offset)) + + def dump(self, verbose=False, *args, **kwargs): + print("file_stats: type %d, ts %r" % (self.record_type, time.gmtime(self.ust_time))) + if not verbose: + return + for f in self._fields_: + print("\t%s: %r" % (f[0], repr(getattr(self, f[0])))) + + +class StatsOneCpu2169(Structure): + """ + CPU stats layout for a single CPU for sysstat 7.x.x versions (RHEL 5.x, + basically).
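+ + Each 16-byte-aligned "unsigned long long" below is mapped as a + (c_ulonglong, c_ulong) pair so that the ctypes layout matches the + on-disk 16-byte stride. A quick, illustrative sanity check of that + assumption: + + from ctypes import sizeof + assert sizeof(StatsOneCpu2169) == StatsOneCpu2169.SIZE # 7 * 16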
+ +struct stats_one_cpu { + unsigned long long per_cpu_idle __attribute__ ((aligned (16))); + unsigned long long per_cpu_iowait __attribute__ ((aligned (16))); + unsigned long long per_cpu_user __attribute__ ((aligned (16))); + unsigned long long per_cpu_nice __attribute__ ((aligned (16))); + unsigned long long per_cpu_system __attribute__ ((aligned (16))); + unsigned long long per_cpu_steal __attribute__ ((aligned (16))); + unsigned long long pad __attribute__ ((aligned (16))); +}; + """ + _fields_ = [ ('per_cpu_idle', c_ulonglong), + ('per_cpu_idle_padding', c_ulong), + ('per_cpu_iowait', c_ulonglong), + ('per_cpu_iowait_padding', c_ulong), + ('per_cpu_user', c_ulonglong), + ('per_cpu_user_padding', c_ulong), + ('per_cpu_nice', c_ulonglong), + ('per_cpu_nice_padding', c_ulong), + ('per_cpu_system', c_ulonglong), + ('per_cpu_system_padding', c_ulong), + ('per_cpu_steal', c_ulonglong), + ('per_cpu_steal_padding', c_ulong), + ('pad', c_ulonglong), + ('pad_padding', c_ulong), + ] + SIZE = (7 * 16) + + def dump(self): + pass + + +class StatsSerial2169(Structure): + """ + Serial stats layout for sysstat 7.x.x versions (RHEL 5.x, basically). + +struct stats_serial { + unsigned int rx __attribute__ ((aligned (8))); + unsigned int tx __attribute__ ((packed)); + unsigned int frame __attribute__ ((packed)); + unsigned int parity __attribute__ ((packed)); + unsigned int brk __attribute__ ((packed)); + unsigned int overrun __attribute__ ((packed)); + unsigned int line __attribute__ ((packed)); + unsigned char pad[4] __attribute__ ((packed)); +}; + """ + _fields_ = [ ('rx', c_uint), + ('tx', c_uint), + ('frame', c_uint), + ('parity', c_uint), + ('brk', c_uint), + ('overrun', c_uint), + ('line', c_uint), + ('pad', c_uint8 * 4), + ] + SIZE = (7 * 4) + 4 + + def dump(self): + pass + + +NR_IRQS = 256 + +class StatInterrupt2169(Structure): + """ + Interrupt stats layout for sysstat 7.x.x versions (RHEL 5.x, basically): + one counter for each of the NR_IRQS possible interrupts. + """ + _fields_ = [ ('interrupt', c_uint * NR_IRQS) ] + + SIZE = 4 * NR_IRQS + + def dump(self): + pass + + +class StatsIrqCpu2169(Structure): + """ + IRQ CPU stats layout for sysstat 7.x.x versions (RHEL 5.x, basically). + +struct stats_irq_cpu { + unsigned int interrupt __attribute__ ((aligned (8))); + unsigned int irq __attribute__ ((packed)); +}; + """ + _fields_ = [ ('interrupt', c_uint), + ('irq', c_uint), + ] + SIZE = (2 * 4) + + def dump(self): + pass + + +class StatsNetDev2169(Structure): + """ + Net Dev stats layout for sysstat 7.x.x versions (RHEL 5.x, basically).
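+ + Note that the "unsigned long" fields are mapped with c_ulong, so the + SIZE computation below ((16 * 8) + MAX_IFACE_LEN) presumes an LP64 + platform where sizeof(long) == 8; the file header's sa_sizeof_long + field records the long size the data file itself was created with.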
+ +struct stats_net_dev { + unsigned long rx_packets __attribute__ ((aligned (8))); + unsigned long tx_packets __attribute__ ((aligned (8))); + unsigned long rx_bytes __attribute__ ((aligned (8))); + unsigned long tx_bytes __attribute__ ((aligned (8))); + unsigned long rx_compressed __attribute__ ((aligned (8))); + unsigned long tx_compressed __attribute__ ((aligned (8))); + unsigned long multicast __attribute__ ((aligned (8))); + unsigned long collisions __attribute__ ((aligned (8))); + unsigned long rx_errors __attribute__ ((aligned (8))); + unsigned long tx_errors __attribute__ ((aligned (8))); + unsigned long rx_dropped __attribute__ ((aligned (8))); + unsigned long tx_dropped __attribute__ ((aligned (8))); + unsigned long rx_fifo_errors __attribute__ ((aligned (8))); + unsigned long tx_fifo_errors __attribute__ ((aligned (8))); + unsigned long rx_frame_errors __attribute__ ((aligned (8))); + unsigned long tx_carrier_errors __attribute__ ((aligned (8))); + char interface[MAX_IFACE_LEN] __attribute__ ((aligned (8))); +}; + """ + _fields_ = [ ('rx_packets', c_ulong), + ('tx_packets', c_ulong), + ('rx_bytes', c_ulong), + ('tx_bytes', c_ulong), + ('rx_compressed', c_ulong), + ('tx_compressed', c_ulong), + ('multicast', c_ulong), + ('collisions', c_ulong), + ('rx_errors', c_ulong), + ('tx_errors', c_ulong), + ('rx_dropped', c_ulong), + ('tx_dropped', c_ulong), + ('rx_fifo_errors', c_ulong), + ('tx_fifo_errors', c_ulong), + ('rx_frame_errors', c_ulong), + ('tx_carrier_errors', c_ulong), + ('interface', c_char * MAX_IFACE_LEN), + ] + SIZE = ((16 * 8) + + MAX_IFACE_LEN) + + def dump(self): + pass + + +class DiskStats2169(Structure): + """ + Disk stats layout for sysstat 7.x.x versions (RHEL 5.x, basically). + +struct disk_stats { + unsigned long long rd_sect __attribute__ ((aligned (16))); + unsigned long long wr_sect __attribute__ ((aligned (16))); + unsigned long rd_ticks __attribute__ ((aligned (16))); + unsigned long wr_ticks __attribute__ ((aligned (8))); + unsigned long tot_ticks __attribute__ ((aligned (8))); + unsigned long rq_ticks __attribute__ ((aligned (8))); + unsigned long nr_ios __attribute__ ((aligned (8))); + unsigned int major __attribute__ ((aligned (8))); + unsigned int minor __attribute__ ((packed)); +}; + """ + _fields_ = [ ('rd_sect', c_ulonglong), + ('rd_sect_padding', c_ulong), + ('wr_sect', c_ulonglong), + ('wr_sect_padding', c_ulong), + ('rd_ticks', c_ulong), + ('wr_ticks', c_ulong), + ('tot_ticks', c_ulong), + ('rq_ticks', c_ulong), + ('nr_ios', c_ulong), + ('major', c_uint), + ('minor', c_uint), + ] + SIZE = ((2 * 16) + + (5 * 8) + + (2 * 4)) + + def dump(self): + pass + + +class FileHeader2169(Structure): + """ + File Header layout for sysstat 7.x.x versions (RHEL 5.x, basically). + +/* System activity data file header */ +struct file_hdr { + /* --- LONG --- */ + /* Time stamp in seconds since the epoch */ + unsigned long sa_ust_time __attribute__ ((aligned (8))); + /* --- INT --- */ + /* Activity flag */ + unsigned int sa_actflag __attribute__ ((aligned (8))); + /* Number of processes to monitor ( {-x | -X } ALL) */ + unsigned int sa_nr_pid __attribute__ ((packed)); + /* Number of interrupts per processor: 2 means two interrupts */ + unsigned int sa_irqcpu __attribute__ ((packed)); + /* Number of disks */ + unsigned int sa_nr_disk __attribute__ ((packed)); + /* Number of processors: + * 0 means 1 proc and non SMP machine + * 1 means 1 proc and SMP machine + * 2 means two proc, etc.
+ */ + unsigned int sa_proc __attribute__ ((packed)); + /* Number of serial lines: 2 means two lines (ttyS00 and ttyS01) */ + unsigned int sa_serial __attribute__ ((packed)); + /* Number of network devices (interfaces): 2 means two lines */ + unsigned int sa_iface __attribute__ ((packed)); + /* --- SHORT --- */ + /* System activity data file magic number */ + unsigned short sa_magic __attribute__ ((packed)); + /* file_stats structure size */ + unsigned short sa_st_size __attribute__ ((packed)); + /* --- CHAR --- */ + /* + * Current day, month and year. + * No need to save DST (daylight saving time) flag, since it is not taken + * into account by the strftime() function used to print the timestamp. + */ + unsigned char sa_day __attribute__ ((packed)); + unsigned char sa_month __attribute__ ((packed)); + unsigned char sa_year __attribute__ ((packed)); + /* + * Size of a long integer. Useful to know the architecture on which + * the datafile was created. + */ + char sa_sizeof_long __attribute__ ((packed)); + /* Operating system name */ + char sa_sysname[UTSNAME_LEN] __attribute__ ((packed)); + /* Machine hostname */ + char sa_nodename[UTSNAME_LEN] __attribute__ ((packed)); + /* Operating system release number */ + char sa_release[UTSNAME_LEN] __attribute__ ((packed)); +}; + +#define FILE_HDR_SIZE (sizeof(struct file_hdr)) + """ + _fields_ = [ ('sa_ust_time', c_ulong), + ('sa_actflag', c_uint), + ('sa_nr_pid', c_uint), + ('sa_irqcpu', c_uint), + ('sa_nr_disk', c_uint), + ('sa_proc', c_uint), + ('sa_serial', c_uint), + ('sa_iface', c_uint), + ('sa_magic', c_ushort), + ('sa_st_size', c_ushort), + ('sa_day', c_uint8), + ('sa_month', c_uint8), + ('sa_year', c_uint8), + ('sa_sizeof_long', c_char), + ('sa_sysname', c_char * UTSNAME_LEN), + ('sa_nodename', c_char * UTSNAME_LEN), + ('sa_release', c_char * UTSNAME_LEN), + ('padding', c_char), + ] + SIZE = (8 + + (7 * 4) + + (2 * 2) + + (4 * 1) + + (3 * UTSNAME_LEN) + + 1) + + def dump(self, *args, **kwargs): + print("file_header (0x%04x):" % (self.sa_magic,)) + print("\tsa_ust_time", repr(self.sa_ust_time), datetime.utcfromtimestamp(self.sa_ust_time)) + print("\tsa_actflag", repr(self.sa_actflag)) + print("\tsa_nr_pid", repr(self.sa_nr_pid)) + print("\tsa_irqcpu", repr(self.sa_irqcpu)) + print("\tsa_nr_disk", repr(self.sa_nr_disk)) + print("\tsa_proc", repr(self.sa_proc)) + print("\tsa_serial", repr(self.sa_serial)) + print("\tsa_iface", repr(self.sa_iface)) + print("\tsa_magic 0x%04x" % self.sa_magic) + print("\tsa_st_size", repr(self.sa_st_size)) + print("\tsa_day", repr(self.sa_day)) + print("\tsa_month", repr(self.sa_month)) + print("\tsa_year", repr(self.sa_year)) + print("\tsa_sizeof_long", repr(self.sa_sizeof_long)) + print("\tsa_sysname", repr(self.sa_sysname)) + print("\tsa_nodename", repr(self.sa_nodename)) + print("\tsa_release", repr(self.sa_release)) + + +class FileHeaderOldGeneric(Structure): + """ + Old style sa datafiles have the magic value at offset 36 (both for 32 and + 64 bits). + """ + _fields_ = [ ('padding', c_char * 36), + ('sa_magic', c_ushort), + ] + SIZE = 36 + 2 + + +def check_readinto(obj, ret): + if ret != obj.SIZE: + if DEBUG: + import pdb; pdb.set_trace() + raise Truncated("Read %d, expected to read %d" % (ret, obj.SIZE)) + + +def check_timestamp(fh, rh, prev_rh): + if rh.ust_time == 0: + raise Corruption("Record timestamp is zero") + if (rh.ust_time - fh.sa_ust_time) < -60: + # We have seen cases where the file header is one second later than + # the first record, which is odd, but okay.
So we only consider this + # to be invalid if the header is more than a minute later than a + # record. + raise Invalid("Binary data file record, %s, earlier than header, %s" % ( + time.strftime("%c", time.gmtime(rh.ust_time)), time.strftime("%c", time.gmtime(fh.sa_ust_time)))) + if prev_rh: + if rh.ust_time < prev_rh.ust_time: + raise Invalid("Binary data file record, %s, earlier than previous, %s" % ( + time.strftime("%c", time.gmtime(rh.ust_time)), time.strftime("%c", time.gmtime(prev_rh.ust_time)))) + if (rh.ust_time - prev_rh.ust_time) > TWO_DAYS_SECONDS: + raise Invalid("Binary data file record, %s, greater than two days from previous, %s" % ( + time.strftime("%c", time.gmtime(rh.ust_time)), time.strftime("%c", time.gmtime(prev_rh.ust_time)))) + else: + if (rh.ust_time - fh.sa_ust_time) > TWO_DAYS_SECONDS: + raise Invalid("Binary data file record, %s, greater than two days from header, %s" % ( + time.strftime("%c", time.gmtime(rh.ust_time)), time.strftime("%c", time.gmtime(fh.sa_ust_time)))) + + +def read_extra_stats2169(fp, fh, wl=None): + """ + These structures appear optionally, but always in this order. + """ + total_read = 0 + if fh.sa_proc: + c_read = 0 + for i in range(fh.sa_proc): + cpu_stats = StatsOneCpu2169() + ret = fp.readinto(cpu_stats) + check_readinto(cpu_stats, ret) + total_read += ret + c_read += ret + cpu_stats.dump() + if wl is not None: + wl.append(cpu_stats) + if DEBUG: + print("c_read = ", c_read) + if (fh.sa_actflag & A_ONE_IRQ_B) == A_ONE_IRQ_B: + interrupt_stats = StatInterrupt2169() + ret = fp.readinto(interrupt_stats) + check_readinto(interrupt_stats, ret) + if DEBUG: + print("int_read = ", ret) + total_read += ret + interrupt_stats.dump() + if wl is not None: + wl.append(interrupt_stats) + if fh.sa_serial: + s_read = 0 + for i in range(fh.sa_serial): + serial_stats = StatsSerial2169() + ret = fp.readinto(serial_stats) + check_readinto(serial_stats, ret) + s_read += ret + total_read += ret + serial_stats.dump() + if wl is not None: + wl.append(serial_stats) + if DEBUG: + print("s_read = ", s_read) + if fh.sa_irqcpu: + i_read = 0 + for i in range(fh.sa_proc * fh.sa_irqcpu): + irq_cpu_stats = StatsIrqCpu2169() + ret = fp.readinto(irq_cpu_stats) + check_readinto(irq_cpu_stats, ret) + i_read += ret + total_read += ret + irq_cpu_stats.dump() + if wl is not None: + wl.append(irq_cpu_stats) + if DEBUG: + print("i_read = ", i_read) + if fh.sa_iface: + if_read = 0 + for i in range(fh.sa_iface): + net_dev_stats = StatsNetDev2169() + ret = fp.readinto(net_dev_stats) + check_readinto(net_dev_stats, ret) + if_read += ret + total_read += ret + net_dev_stats.dump() + if wl is not None: + wl.append(net_dev_stats) + if DEBUG: + print("if_read = ", if_read) + if fh.sa_nr_disk: + d_read = 0 + for i in range(fh.sa_nr_disk): + disk_stats = DiskStats2169() + ret = fp.readinto(disk_stats) + check_readinto(disk_stats, ret) + d_read += ret + total_read += ret + disk_stats.dump() + if wl is not None: + wl.append(disk_stats) + if DEBUG: + print("d_read = ", d_read) + return total_read + + +def process_file_2169(fp, fm, fh, fa, magic, callback=None): + assert hasattr(fp, 'readinto') + assert fm is None + assert isinstance(fh, Structure) + assert fa is None + assert magic == 0x2169 + + assert FileHeader2169.SIZE == 240, "FileHeader2169.SIZE (%d) != 240" % FileHeader2169.SIZE + assert FileStats2169.SIZE == 464, "FileStats2169.SIZE (%d) != 464" % FileStats2169.SIZE + assert StatsOneCpu2169.SIZE == 112, "StatsOneCpu2169.SIZE (%d) != 112" % StatsOneCpu2169.SIZE + assert 
StatsSerial2169.SIZE == 32, "StatsSerial2169.SIZE (%d) != 32" % StatsSerial2169.SIZE + assert StatsIrqCpu2169.SIZE == 8, "StatsIrqCpu2169.SIZE (%d) != 8" % StatsIrqCpu2169.SIZE + assert StatsNetDev2169.SIZE == 144, "StatsNetDev2169.SIZE (%d) != 144" % StatsNetDev2169.SIZE + assert DiskStats2169.SIZE == 80, "DiskStats2169.SIZE (%d) != 80" % DiskStats2169.SIZE + + if FileStats2169.SIZE != fh.sa_st_size: + # If the file header is not valid, we're done + if DEBUG: + import pdb; pdb.set_trace() + raise Invalid( + "Invalid file header structure encountered," + " expected sizeof(struct file_stats) == %d" + " for magic 0x2169, but found .sa_st_size = %d" % ( + FileStats2169.SIZE, fh.sa_st_size)) + + if callback is not None: + callback.start(file_header=fh) + + try: + prev_fs = None + fs = None + while True: + prev_fs = fs + fs = FileStats2169() + try: + ret = fp.readinto(fs) + except Exception: + if DEBUG: + import pdb; pdb.set_trace() + raise + else: + if ret == 0: + # Indicates EOF + break + else: + check_readinto(fs, ret) + fs.integrity(fp.tell() - fs.SIZE) + check_timestamp(fh, fs, prev_fs) + if fs.record_type == R_DUMMY: + if callback is not None: + callback.handle_record(fs, record_payload=None) + continue + if callback is not None: + write_list = [] + else: + write_list = None + ret = read_extra_stats2169(fp, fh, write_list) + if callback is not None: + callback.handle_record(fs, record_payload=write_list) + finally: + if callback is not None: + callback.end() + + +class RecordHeader2170(Structure): + """ +/* Header structure for every record */ +struct record_header { + /* + * Machine uptime (multiplied by the # of proc). + */ + unsigned long long uptime __attribute__ ((aligned (16))); + /* + * Uptime reduced to one processor. Always set, even on UP machines. + */ + unsigned long long uptime0 __attribute__ ((aligned (16))); + /* + * Timestamp (number of seconds since the epoch). + */ + unsigned long ust_time __attribute__ ((aligned (16))); + /* + * Record type: R_STATS, R_RESTART,... + */ + unsigned char record_type __attribute__ ((aligned (8))); + /* + * Timestamp: Hour (0-23), minute (0-59) and second (0-59). + * Used to determine TRUE time (immutable, non locale dependent time). + */ + unsigned char hour; + unsigned char minute; + unsigned char second; +}; + +#define RECORD_HEADER_SIZE (sizeof(struct record_header)) + """ + _fields_ = [ ('uptime', c_ulonglong), # 8 bytes, __attribute__ ((aligned (16))) + ('uptime_padding', c_ulong), # ... 8 bytes of padding for next alignment + ('uptime0', c_ulonglong), # 8 bytes, __attribute__ ((aligned (16))) + ('uptime0_padding', c_ulong), # ... 8 bytes of padding for next alignment + ('ust_time', c_ulong), # 8 bytes, __attribute__ ((aligned (16))) + ('record_type', c_uint8), # 1 byte, __attribute__ ((aligned (8))); + ('hour', c_uint8), # 1 byte + ('minute', c_uint8), # 1 byte + ('second', c_uint8), # 1 byte + ('_alignment_padding', c_uint), # 4 bytes of padding due to alignment? 
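+ # With the explicit padding fields above, ctypes should compute + # sizeof(RecordHeader2170) as 48 bytes, agreeing with SIZE below: + # (2 * 16) + 8 + (4 * 1) + 4 == 48.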
+ ] + SIZE = ((2 * 16) + + (1 * 8) + + (4 * 1) + + (1 * 4)) + + def integrity(self, offset=-1, *args, **kwargs): + for f in self._fields_: + if not f[0].endswith('_padding'): + continue + val = getattr(self, f[0]) + if val == 0: + continue + if DEBUG: + print(repr(f)) + self.dump(verbose=True) + import pdb; pdb.set_trace() + raise Corruption("non-zero filled padding: rh.%s = 0x%0x, offset: %d" % (f[0], val, offset)) + + def dump(self, verbose=False, *args, **kwargs): + print("record_header: type %d, ts %r" % (self.record_type, time.gmtime(self.ust_time))) + if not verbose: + return + for f in self._fields_: + print("\t%s: %r" % (f[0], repr(getattr(self, f[0])))) + + +class FileActivitySummary(object): + def __init__(self, fa, total_len): + self.fa = fa + self.total_len = total_len + + def dump(self, *args, **kwargs): + print("file activity summary, total length: %d" % (self.total_len,)) + + + +class FileActivity2170(Structure): + """ +/* List of activities saved in file */ +struct file_activity { + /* + * Identification value of activity. + */ + unsigned int id __attribute__ ((aligned (4))); + /* + * Number of items for this activity. + */ + __nr_t nr __attribute__ ((packed)); + /* + * Size of an item structure. + */ + int size __attribute__ ((packed)); +}; + """ + _fields_ = [ ('id', c_uint), + ('nr', c_int), + ('size', c_int), + ] + SIZE = 3 * 4 + + +def get_file_activity_2170(fp, fh): + # Read file activities + a_cpu = False + file_activities = [] + total_len = 0 + for i in range(fh.sa_nr_act): + act = FileActivity2170() + ret = fp.readinto(act) + check_readinto(act, ret) + if act.nr <= 0: + if DEBUG: + import pdb; pdb.set_trace() + print(repr(act)) + raise Invalid("activity count %d <= 0" % act.nr) + file_activities.append(act) + if act.id == A_CPU: + a_cpu = True + total_len += (act.nr * act.size) + + if not a_cpu: + if DEBUG: + import pdb; pdb.set_trace() + raise Invalid("expected CPU activity") + + return FileActivitySummary(file_activities, total_len) + + +def get_file_activity_1170(fp, fh): + return get_file_activity_2170(fp, fh) + + +def process_file_2170(fp, fm, fh, fa, magic, callback=None): + assert hasattr(fp, 'readinto') + assert isinstance(fm, Structure) + assert isinstance(fh, Structure) + assert isinstance(fa, FileActivitySummary) + assert magic == 0x1170 or magic > 0x2169 + + if callback is not None: + callback.start(file_magic=fm, file_header=fh, file_activities=fa) + + try: + prev_rh = None + rh = None + while True: + prev_rh = rh + rh = RecordHeader2170() + try: + ret = fp.readinto(rh) + except Exception: + if DEBUG: + import pdb; pdb.set_trace() + raise + else: + if ret == 0: + # Indicates EOF + break + else: + check_readinto(rh, ret) + rh.integrity(fp.tell() - rh.SIZE) + try: + check_timestamp(fh, rh, prev_rh) + except Invalid: + if callback is not None: + do_raise = callback.handle_invalid(rh, prev_rh) + else: + do_raise = True + if do_raise: + raise + + if rh.record_type == R_COMMENT: + fc = bytearray(MAX_COMMENT_LEN) + ret = fp.readinto(fc) + if ret != MAX_COMMENT_LEN: + if DEBUG: + import pdb; pdb.set_trace() + raise Truncated("Could not read entire comment," + " read %d, expected %d" % (ret, MAX_COMMENT_LEN)) + if callback is not None: + callback.handle_record(rh, record_payload=fc) + continue + elif rh.record_type == R_RESTART: + if callback is not None: + callback.handle_record(rh, record_payload=None) + continue + act_buf = bytearray(fa.total_len) + ret = fp.readinto(act_buf) + if ret != fa.total_len: + if DEBUG: + import pdb; pdb.set_trace() + raise 
Truncated("Could not read all activities," + " read %d, expected records of size %d" % (ret, fa.total_len)) + if callback is not None: + callback.handle_record(rh, record_payload=act_buf) + finally: + if callback is not None: + callback.end() + + +def process_file_1170(fp, fm, fh, fa, magic, callback=None): + """ + For some reason, RHEL 6.5 patch sysstat to mark a changed file format + using 0x1170. It is not clear where the format change came from, or what + the difference is, but it did not affect sizing or layouts, as far as we + can tell. + """ + process_file_2170(fp, fm, fh, fa, magic, callback=callback) + + +ACTIVITY_MAGIC_BASE = 0x8a +ACTIVITY_MAGIC_UNKNOWN = 0x89 + +class FileActivity2171(Structure): + """ +/* + * Base magical number for activities. + */ +#define ACTIVITY_MAGIC_BASE 0x8a +/* + * Magical value used for activities with + * unknown format (used for sadf -H only). + */ +#define ACTIVITY_MAGIC_UNKNOWN 0x89 + +/* List of activities saved in file */ +struct file_activity { + /* + * Identification value of activity. + */ + unsigned int id __attribute__ ((aligned (4))); + /* + * Activity magical number. + */ + unsigned int magic __attribute__ ((packed)); + /* + * Number of items for this activity. + */ + __nr_t nr __attribute__ ((packed)); + /* + * Number of sub-items for this activity. + */ + __nr_t nr2 __attribute__ ((packed)); + /* + * Size of an item structure. + */ + int size __attribute__ ((packed)); +}; + +#define FILE_ACTIVITY_SIZE (sizeof(struct file_activity)) + """ + _fields_ = [ ('id', c_uint), + ('magic', c_int), + ('nr', c_int), + ('nr2', c_int), + ('size', c_int), + ] + SIZE = 5 * 4 + + +def get_file_activity_2171(fp, fh): + # Read file activities + a_cpu = False + file_activities = [] + total_len = 0 + for i in range(fh.sa_nr_act): + act = FileActivity2171() + ret = fp.readinto(act) + check_readinto(act, ret) + if act.nr <= 0 or act.nr2 <= 0: + if DEBUG: + import pdb; pdb.set_trace() + print(repr(act)) + raise Invalid("activity counts: (nr %d or nr2 %d) <= 0" % (act.nr, act.nr2)) + file_activities.append(act) + if act.id == A_CPU: + a_cpu = True + total_len += (act.nr * act.nr2 * act.size) + + if not a_cpu: + if DEBUG: + import pdb; pdb.set_trace() + raise Invalid("expected CPU activity") + + return FileActivitySummary(file_activities, total_len) + + +def process_file_2171(fp, fm, fh, fa, magic, callback=None): + """ + While the format magic has changed, the actual on-disk format is not so + different since the activities have already been processed. Since all the + record reads are performed using the values stored in the activity set, + the processing remains the same. 
+ """ + process_file_2170(fp, fm, fh, fa, magic, callback=callback) + + +class_map = { + 0x2169: { + "file_magic": None, + "file_header": FileHeader2169, + "process_file": process_file_2169, + "file_activity": None, + "os-code": "5x", + "rpm-versions": ("sysstat-7.0.0-3.el5", + "sysstat-7.0.2-1.el5", + "sysstat-7.0.2-3.el5", + "sysstat-7.0.2-3.el5_5.1", + "sysstat-7.0.2-11.el5", + "sysstat-7.0.2-12.el5",), + }, + 0x2170: { + "file_magic": FileMagic, + "file_header": FileHeader, + "process_file": process_file_2170, + "file_activity": get_file_activity_2170, + "os-code": "64", + "rpm-versions": ("sysstat-9.0.4-11.el6", + "sysstat-9.0.4-18.el6", + "sysstat-9.0.4-20.el6",), + }, + 0x1170: { + "file_magic": FileMagic, + "file_header": FileHeader, + "process_file": process_file_1170, + "file_activity": get_file_activity_1170, + "os-code": "65", + "rpm-versions": ("sysstat-9.0.4-22.el6",), + }, + 0x2171: { + "file_magic": FileMagic, + "file_header": FileHeader, + "process_file": process_file_2171, + "file_activity": get_file_activity_2171, + "os-code": "f19", + "rpm-versions": ("sysstat-10.1.5-1.el7",), + }, + } + + +def fetch_fileheader_with_fp(fp): + fm = FileMagic() + ret = fp.readinto(fm) + check_readinto(fm, ret) + + fp.seek(0) # Reset to the beginning to read into the proper structure below. + if fm.sysstat_magic == SYSSTAT_MAGIC: + # We have a 9.0.0 and later version + try: + the_class_map = class_map[fm.format_magic] + except KeyError: + raise Invalid("Unrecognized new format magic: 0x%04d" % fm.format_magic) + else: + magic = fm.format_magic + else: + # Now we have an old style sa binary data file, where the file + # header comes first, and the sa_magic field is at a defined + # offset inside that header. + fh = FileHeaderOldGeneric() + ret = fp.readinto(fh) + check_readinto(fh, ret) + try: + the_class_map = class_map[fh.sa_magic] + except KeyError: + raise Invalid("Unrecognized old sa magic: 0x%04d" % fh.sa_magic) + else: + magic = fh.sa_magic + # Will need to re-read from the beginning of the file to get the right + # mappings. + fp.seek(0) + + try: + fm = the_class_map['file_magic']() + except TypeError: + fm = None + else: + try: + ret = fp.readinto(fm) + check_readinto(fm, ret) + except Exception as err: + raise Invalid("Error reading file header: %s" % err) + + try: + fh = the_class_map['file_header']() + ret = fp.readinto(fh) + check_readinto(fh, ret) + except Exception as err: + raise Invalid("Error reading file header: %s" % err) + + try: + fa = the_class_map['file_activity'](fp, fh) + except TypeError: + fa = None + except Exception as err: + raise Invalid("Error reading file activities: %s" % err) + + return fm, fh, fa, magic + + +class ContentAction(object): + """ + The callback object argument of the module method, verify_contents(), + expects with these four methods. + """ + __metaclass__ = ABCMeta + + @abstractmethod + def start(self, file_magic=None, file_header=None, file_activities=None): + """ + Start the handling of a binary data file. The caller optionally + provides the file_magic record, always provides the required + file_header record, and optionally provides the file activities. + """ + pass + + @abstractmethod + def handle_record(self, record_header, record_payload=None): + """ + Handle a record header, along with its optional payload. + """ + pass + + @abstractmethod + def handle_invalid(self, record_header, prev_record_header): + """ + An invalid record header was encountered, the previous one is also + provided for the callee to inspect. 
If this method returns True, the + exception will be re-raised and processing will stop; if False, the + exception will be swallowed and processing will continue. + """ + pass + + @abstractmethod + def end(self): + """ + By hook, or by crook, we have reached the end of the binary data + file. No other methods will be invoked on the given object instance + after this invocation. + """ + pass + + +def verify_contents_fp(fp, tgt_hostname, callback): + fm, fh, fa, magic = fetch_fileheader_with_fp(fp) + try: + the_class_map = class_map[magic] + except KeyError: + raise Invalid("Unrecognized sa magic: 0x%04x" % magic) + else: + if tgt_hostname and (tgt_hostname != fh.sa_nodename.decode('utf-8')): + raise Invalid("Target host name, %s, does not match file header node name, %s" % (tgt_hostname, fh.sa_nodename)) + process_file = the_class_map['process_file'] + process_file(fp, fm, fh, fa, magic, callback=callback) + + +def verify_contents(thefile, tgt_hostname=None, callback=None): + """ + Given a sysstat binary data file verify that it contains a set of well + formed data values. + + The optional 'tgt_hostname' argument is checked against the file header's + stored hostname value. + + The optional 'callback' argument, if provided, should be an instance of + the ContentAction class; the appropriate method will be invoked for each + file magic structure, file header, file activity set, record header and + record payload read, with the 'end' method invoked at the end. + + One of the following exceptions will be raised if a problem is found with + the file: + + Invalid: The file header or record header metadata values do not make + sense in relation to each other + + Corruption: The file appears to be corrupted in some way + + Truncated: The file does not appear to contain all the data as + described by the file header or a given record header + """ + try: + with lzma.open(thefile, "rb") as fp: + verify_contents_fp(fp, tgt_hostname, callback) + except lzma.LZMAError: + with open(thefile, "rb") as fp: + verify_contents_fp(fp, tgt_hostname, callback) + + +def fetch_os_code(magic): + """ + Given a sysstat magic number value, return the "OS code" that maps to the + version of Fedora or RHEL. + """ + try: + the_class_map = class_map[magic] + except KeyError: + raise Invalid("Unrecognized sa magic: 0x%04x" % magic) + else: + return the_class_map['os-code'] + + +def fetch_fileheader(thefile): + """ + Fetch the sysstat FileHeader object for the given file path. + """ + try: + with lzma.open(thefile, "rb") as fp: + res = fetch_fileheader_with_fp(fp) + except lzma.LZMAError: + with open(thefile, "rb") as fp: + res = fetch_fileheader_with_fp(fp) + return res + + +if __name__ == "__main__": + # When invoked as a program, we'll just check the first argument to see + # that it has data in it, and if so, we'll process the header, fetch the + # OS code, and verify its contents.
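+ # + # A minimal (hypothetical) ContentAction implementation, shown only + # for illustration -- none of these names exist in this module: + # + # class CountingAction(ContentAction): + # def start(self, file_magic=None, file_header=None, file_activities=None): + # self.records = 0 + # def handle_record(self, record_header, record_payload=None): + # self.records += 1 + # def handle_invalid(self, record_header, prev_record_header): + # return True # re-raise the Invalid exception + # def end(self): + # print("records handled:", self.records) + # + # verify_contents(sys.argv[1], callback=CountingAction())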
+ + if os.path.getsize(sys.argv[1]) == 0: + print("Invalid - %s: empty data file" % (sys.argv[1],), file=sys.stderr) + sys.exit(1) + + try: + fm, fh, fa, magic = fetch_fileheader(sys.argv[1]) + except Invalid as e: + print("Invalid - %s: %s" % (sys.argv[1], e), file=sys.stderr) + sys.exit(1) + except Exception as e: + print("Error - %s: %s" % (sys.argv[1], e), file=sys.stderr) + sys.exit(1) + else: + if DEBUG or 1: + fm.dump() + fh.dump(fm.format_magic) + fa.dump() + val = fetch_os_code(magic) + print("os_code = ", val) + + try: + verify_contents(sys.argv[1]) + except Invalid as e: + print("Invalid - %s: %s" % (sys.argv[1], e), file=sys.stderr) + sys.exit(1) + except Corruption as e: + print("Corrupted - %s: %s" % (sys.argv[1], e), file=sys.stderr) + sys.exit(1) + except Truncated as e: + print("Truncated - %s: %s" % (sys.argv[1], e), file=sys.stderr) + sys.exit(1) diff --git a/server/pbench/bin/state/test-11.tar.xz b/server/pbench/bin/state/test-11.tar.xz new file mode 100644 index 0000000000..4426cfe407 Binary files /dev/null and b/server/pbench/bin/state/test-11.tar.xz differ diff --git a/server/pbench/bin/state/test-7.8.tar.xz b/server/pbench/bin/state/test-7.8.tar.xz index 54da30e831..58089a69d8 100644 Binary files a/server/pbench/bin/state/test-7.8.tar.xz and b/server/pbench/bin/state/test-7.8.tar.xz differ diff --git a/server/pbench/bin/unittests b/server/pbench/bin/unittests index 8f62de1fad..90af2425fa 100755 --- a/server/pbench/bin/unittests +++ b/server/pbench/bin/unittests @@ -169,6 +169,7 @@ declare -A cmds=( [test-7.5]="_run_index $_testroot/test-7.5" [test-7.6]="_run_index $_testroot/test-7.6" [test-7.7]="_run_index $_testroot/test-7.7" + # uperf tarball [test-7.8]="_run_index $_testroot/test-7.8" @@ -189,6 +190,9 @@ declare -A cmds=( # pbench-rsync-satellite [test-10]="_run pbench-rsync-satellite TEST foo.bar.com $_testroot/pbench/archive" + + # verify _special_paths in 'toc' (like sar.data) are processed + [test-11]="_run_index $_testroot/test-11" ) declare -A links=( @@ -218,4 +222,3 @@ for test in $tests ;do _reset_state done exit $failures - diff --git a/server/pbench/lib/config/pbench-index.cfg.example b/server/pbench/lib/config/pbench-index.cfg.example index 56024d4e34..bbc70c9615 100644 --- a/server/pbench/lib/config/pbench-index.cfg.example +++ b/server/pbench/lib/config/pbench-index.cfg.example @@ -11,3 +11,6 @@ bulk_action_count = 2000 # defaults for shards per index or replica count per index.
#number_of_shards = 1 #number_of_replicas = 0 + +[SAR] +sadf_binaries_path = /usr/local/bin/ diff --git a/server/pbench/lib/mappings/sar.json b/server/pbench/lib/mappings/sar.json new file mode 100644 index 0000000000..84b7f5d7b4 --- /dev/null +++ b/server/pbench/lib/mappings/sar.json @@ -0,0 +1,300 @@ +{ + "sar": { + "_timestamp": { + "enabled": true, + "store": true, + "fielddata": { + "format": "doc_values" + } + }, + "properties": { + "_metadata": { + "properties": { + "file-date": { + "type": "date", + "format": "yyyy-MM-dd", + "fielddata": { + "format": "doc_values" + } + }, + "pbench_run_unique_id": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + }, + "pbench_run_md5": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + }, + "machine": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + }, + "nodename": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + }, + "number-of-cpus": { + "type": "integer", + "fielddata": { + "format": "doc_values" + } + }, + "release": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + }, + "sysdata-version": { + "type": "float", + "fielddata": { + "format": "doc_values" + } + }, + "sysname": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "timestamp": { + "properties": { + "date": { + "type": "date", + "format": "yyyy-MM-dd", + "fielddata": { + "format": "doc_values" + } + }, + "time": { + "type": "date", + "format": "HH:mm:ss", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "cpu-load": { + "type": "nested", + "properties": { + "cpu": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "cpu-load-all": { + "type": "nested", + "properties": { + "cpu": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "disk": { + "type": "nested", + "properties": { + "disk-device": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "interrupts": { + "type": "nested", + "properties": { + "intr": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "interrupts-processor": { + "type": "nested", + "properties": { + "intr": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + }, + "cpu": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "filesystems": { + "type": "nested", + "properties": { + "filesystem": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "network": { + "properties": { + "net-dev": { + "type": "nested", + "properties": { + "iface": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "net-edev": { + "type": "nested", + "properties": { + "iface": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + } + } + }, + "power-management": { + "properties": { + "cpu-frequency": { + "type": "nested", + "properties": { + "number": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "fan-speed": { + "type": "nested", + 
"properties": { + "device": { + "type": "string" + }, + "number": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "temperature": { + "type": "nested", + "properties": { + "device": { + "type": "string" + }, + "number": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "voltage-input": { + "type": "nested", + "properties": { + "device": { + "type": "string" + }, + "number": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + }, + "usb-devices": { + "type": "nested", + "properties": { + "bus_number": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + }, + "idvendor": { + "type": "string" + }, + "idprod": { + "type": "string" + }, + "manufact": { + "type": "string" + }, + "product": { + "type": "string" + } + } + } + } + }, + "serial": { + "type": "nested", + "properties": { + "line": { + "type": "string", + "index": "not_analyzed", + "fielddata": { + "format": "doc_values" + } + } + } + } + } + } +} diff --git a/server/pbench/lib/vos/__init__.py b/server/pbench/lib/vos/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/server/pbench/lib/vos/analysis/__init__.py b/server/pbench/lib/vos/analysis/__init__.py deleted file mode 100644 index e69de29bb2..0000000000