Skip to content

Commit

Permalink
Merge pull request #38 from asam-ev/2024-07-30
Browse files Browse the repository at this point in the history
Add 5 ASAM OpenScenario XML checks
  • Loading branch information
andreaskern74 authored Aug 5, 2024
2 parents 5a0a1a3 + 821caf1 commit a830ded
Show file tree
Hide file tree
Showing 37 changed files with 3,020 additions and 26 deletions.
4 changes: 4 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from qc_openscenario.checks.basic_checker import basic_checker
from qc_openscenario.checks.reference_checker import reference_checker
from qc_openscenario.checks.parameters_checker import parameters_checker
from qc_openscenario.checks.data_type_checker import data_type_checker

logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)

Expand Down Expand Up @@ -59,6 +60,9 @@ def main():
# 4. Run parameters checks
parameters_checker.run_checks(checker_data)

# 5. Run data_type checks
data_type_checker.run_checks(checker_data)

result.write_to_file(
config.get_checker_bundle_param(
checker_bundle_name=constants.BUNDLE_NAME, param_name="resultFile"
Expand Down
7 changes: 7 additions & 0 deletions qc_openscenario/checks/data_type_checker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from . import data_type_constants as data_type_constants
from . import data_type_checker as data_type_checker
from . import allowed_operators as allowed_operators
from . import (
non_negative_transition_time_in_light_state_action as non_negative_transition_time_in_light_state_action,
)
from . import positive_duration_in_phase as positive_duration_in_phase
182 changes: 182 additions & 0 deletions qc_openscenario/checks/data_type_checker/allowed_operators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import os, logging

from dataclasses import dataclass
from typing import List, Union

from lxml import etree

from qc_baselib import Configuration, Result, IssueSeverity

from qc_openscenario import constants
from qc_openscenario.schema import schema_files
from qc_openscenario.checks import utils, models

from qc_openscenario.checks.data_type_checker import data_type_constants

import re
import enum

MIN_RULE_VERSION = "1.2.0"
RULE_SEVERITY = IssueSeverity.ERROR
ALLOWED_OPERANDS = set()
ALLOWED_OPERANDS.add("-")
ALLOWED_OPERANDS.add("round")
ALLOWED_OPERANDS.add("floor")
ALLOWED_OPERANDS.add("ceil")
ALLOWED_OPERANDS.add("sqrt")
ALLOWED_OPERANDS.add("pow")
ALLOWED_OPERANDS.add("*")
ALLOWED_OPERANDS.add("/")
ALLOWED_OPERANDS.add("%")
ALLOWED_OPERANDS.add("+")
ALLOWED_OPERANDS.add("-")
ALLOWED_OPERANDS.add("not")
ALLOWED_OPERANDS.add("and")
ALLOWED_OPERANDS.add("or")


class ExpressionMember(enum.IntEnum):
INVALID = 0
VARIABLE = 1
OPERATOR = 2
NUMBER = 3
PARENTHESIS = 4


@dataclass
class QueueNode:
element: etree._ElementTree
xpath: Union[str, None]


@dataclass
class AttributeInfo:
name: str
value: str
xpath: str


def get_all_attributes(tree: etree._ElementTree, root: etree._Element):
attributes = []
stack = [
QueueNode(root, tree.getpath(root))
] # Initialize stack with the root element

while stack:
current_node = stack.pop()
current_element = current_node.element
current_xpath = current_node.xpath

# Process attributes of the current element
for attr, value in current_element.attrib.items():
attributes.append(AttributeInfo(attr, value, current_xpath))

# Push children to the stack for further processing
stack.extend(
reversed(
[QueueNode(x, tree.getpath(x)) for x in current_element.getchildren()]
)
)

return attributes


def filter_expressions(attribute):
return (
attribute.value.startswith("$")
and utils.get_attribute_type(attribute.value) != models.AttributeType.PARAMETER
)


def check_rule(checker_data: models.CheckerData) -> None:
"""
Rule ID: asam.net:xosc:1.2.0:data_type.allowed_operators
Description: Expressions in OpenSCENARIO must only use the following operands:
-, round, floor, ceil, sqrt, pow, *, /, %, +, -, not, and, or.
Severity: ERROR
Version range: [1.2.0, )
Remark:
None
More info at
- https://github.com/asam-ev/qc-openscenarioxml/issues/31
"""
logging.info("Executing allowed_operators check")

schema_version = checker_data.schema_version
if schema_version is None:
logging.info(f"- Version not found in the file. Skipping check")
return

if utils.compare_versions(schema_version, MIN_RULE_VERSION) < 0:
logging.info(
f"- Version {schema_version} is less than minimum required version {MIN_RULE_VERSION}. Skipping check"
)
return

rule_uid = checker_data.result.register_rule(
checker_bundle_name=constants.BUNDLE_NAME,
checker_id=data_type_constants.CHECKER_ID,
emanating_entity="asam.net",
standard="xosc",
definition_setting=MIN_RULE_VERSION,
rule_full_name="data_type.allowed_operators",
)

tree = checker_data.input_file_xml_root
root = tree.getroot()
attributes = get_all_attributes(tree, root)
filtered_attributes = list(filter(filter_expressions, attributes))

logging.debug(f"attributes: {attributes}")
logging.debug(f"filtered_attributes: {filtered_attributes}")

for attribute in filtered_attributes:
# Remove starting "${" and trailing "}"
expression_candidate = attribute.value[2:-1]
logging.debug(f"expression_candidate: {expression_candidate}")

# Tokenize the expression using regular expressions
token_pattern = r"\$[A-Za-z_]\w*|\d+\.\d+|\d+|[\+\-\*/\^%()**//\{\}]|\w+"
tokens = re.findall(token_pattern, expression_candidate)

token_type = None
logging.debug(f"tokens: {tokens}")
for token in tokens:
if token.startswith("$"):
token_type = ExpressionMember.VARIABLE
elif token in ALLOWED_OPERANDS:
token_type = ExpressionMember.OPERATOR
elif re.match(r"\d+\.\d+", token) or token.isdigit():
token_type = ExpressionMember.NUMBER
elif token in ["(", ")"]:
token_type = ExpressionMember.PARENTHESIS
else:
token_type = ExpressionMember.INVALID

logging.debug(f"token {token} - type {token_type}")
if token_type is None:
continue

has_issue = token_type == ExpressionMember.INVALID
if has_issue:
logging.debug(f"Invalid operand {token}")
xpath = attribute.xpath

issue_id = checker_data.result.register_issue(
checker_bundle_name=constants.BUNDLE_NAME,
checker_id=data_type_constants.CHECKER_ID,
description="Issue flagging invalid operand is used within expression",
level=RULE_SEVERITY,
rule_uid=rule_uid,
)
checker_data.result.add_xml_location(
checker_bundle_name=constants.BUNDLE_NAME,
checker_id=data_type_constants.CHECKER_ID,
issue_id=issue_id,
xpath=xpath,
description=f"Invalid operand {token} used",
)
70 changes: 70 additions & 0 deletions qc_openscenario/checks/data_type_checker/data_type_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import logging

from lxml import etree

from qc_baselib import Configuration, Result, StatusType

from qc_openscenario import constants
from qc_openscenario.checks import utils, models
from qc_openscenario.schema import schema_files

from qc_openscenario.checks.data_type_checker import (
data_type_constants,
allowed_operators,
non_negative_transition_time_in_light_state_action,
positive_duration_in_phase,
)


def run_checks(checker_data: models.CheckerData) -> None:
logging.info("Executing data_type checks")

checker_data.result.register_checker(
checker_bundle_name=constants.BUNDLE_NAME,
checker_id=data_type_constants.CHECKER_ID,
description="Check if data_type properties of input file are properly set",
summary="",
)

if checker_data.input_file_xml_root is None:
logging.error(
f"Invalid xml input file. Checker {data_type_constants.CHECKER_ID} skipped"
)
checker_data.result.set_checker_status(
checker_bundle_name=constants.BUNDLE_NAME,
checker_id=data_type_constants.CHECKER_ID,
status=StatusType.SKIPPED,
)
return

if checker_data.schema_version not in schema_files.SCHEMA_FILES:

logging.error(
f"Version {checker_data.schema_version} unsupported. Checker {data_type_constants.CHECKER_ID} skipped"
)
checker_data.result.set_checker_status(
checker_bundle_name=constants.BUNDLE_NAME,
checker_id=data_type_constants.CHECKER_ID,
status=StatusType.SKIPPED,
)
return

rule_list = [
allowed_operators.check_rule,
non_negative_transition_time_in_light_state_action.check_rule,
positive_duration_in_phase.check_rule,
]

for rule in rule_list:
rule(checker_data=checker_data)

logging.info(
f"Issues found - {checker_data.result.get_checker_issue_count(checker_bundle_name=constants.BUNDLE_NAME, checker_id=data_type_constants.CHECKER_ID)}"
)

# TODO: Add logic to deal with error or to skip it
checker_data.result.set_checker_status(
checker_bundle_name=constants.BUNDLE_NAME,
checker_id=data_type_constants.CHECKER_ID,
status=StatusType.COMPLETED,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CHECKER_ID = "data_type_xosc"
Loading

0 comments on commit a830ded

Please sign in to comment.