Add porting files for Kafka v0.45.0 #400

Open
wants to merge 12 commits into base: main
2 changes: 1 addition & 1 deletion acto/engine.py
@@ -888,7 +888,7 @@ def __init__(

         self.sequence_base = 0

-        self.custom_oracle: Optional[type[CheckerInterface]] = None
+        self.custom_checker: Optional[type[CheckerInterface]] = None
         self.custom_on_init: Optional[Callable] = None
         if operator_config.custom_oracle is not None:
             module = importlib.import_module(operator_config.custom_oracle)
20 changes: 20 additions & 0 deletions acto/input/test_generators/primitive.py
@@ -18,6 +18,7 @@
     OpaqueSchema,
     StringSchema,
 )
+from acto.schema.oneof import OneOfSchema
 from acto.utils.thread_logger import get_thread_logger


@@ -49,8 +50,8 @@

     ret: list[TestCase] = []
     if schema.enum is not None:
         for case in schema.enum:
             ret.append(EnumTestCase(case, primitive=True))

     else:
         for sub_schema in schema.possibilities:
             testcases = resolve_testcases(sub_schema)
@@ -62,6 +63,25 @@
     return ret


+@test_generator(property_type="OneOf", priority=Priority.PRIMITIVE)
+def one_of_tests(schema: OneOfSchema):
+    """Generate testcases for OneOf type"""
+
+    ret: list[TestCase] = []

+    if schema.enum is not None:
+        for case in schema.enum:
+            ret.append(EnumTestCase(case, primitive=True))
+    else:
+        for sub_schema in schema.possibilities:
+            testcases = resolve_testcases(sub_schema)
+            for testcase in testcases:
+                testcase.add_precondition(
+                    SchemaPrecondition(sub_schema).precondition
+                )
+            ret.extend(testcases)
+    return ret


 @test_generator(property_type="Array", priority=Priority.PRIMITIVE)
 def array_tests(schema: ArraySchema):
     """Representation of an array node
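
For context, a minimal sketch of what the new generator consumes. The property path and spec below are hypothetical, and the sketch assumes the acto package from this repo is importable:

from acto.schema.oneof import OneOfSchema

# A property that may be declared either as an integer or as a string --
# a typical use of oneOf in operator CRDs (hypothetical example).
spec = {
    "oneOf": [
        {"type": "integer"},
        {"type": "string"},
    ]
}

schema = OneOfSchema(["spec", "replicas"], spec)
# get_possibilities() holds one sub-schema per oneOf branch; one_of_tests()
# emits each branch's testcases guarded by a SchemaPrecondition so that,
# e.g., string testcases only run when the string branch is in effect.
print(schema.get_possibilities())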
223 changes: 130 additions & 93 deletions acto/parse_log/parse_log.py
@@ -1,61 +1,73 @@
 import json
 import logging
 import re

 from acto.common import get_thread_logger

-klog_regex = r'^\s*'
-klog_regex += r'(\w)' # group 1: level
+KLOG_REGEX = r"^\s*"
+KLOG_REGEX += r"(\w)"  # group 1: level
 # group 2-7: timestamp
-klog_regex += r'(\d{2})(\d{2})\s(\d{2}):(\d{2}):(\d{2})\.(\d{6})'
-klog_regex += r'\s+'
-klog_regex += r'(\d+)' # group 8
-klog_regex += r'\s'
-klog_regex += r'(.+):' # group 9: filename
-klog_regex += r'(\d+)' # group 10: lineno
-klog_regex += r'\]\s'
-klog_regex += r'(.*?)' # group 11: message
-klog_regex += r'\s*$'

-logr_regex = r'^\s*'
+KLOG_REGEX += r"(\d{2})(\d{2})\s(\d{2}):(\d{2}):(\d{2})\.(\d{6})"
+KLOG_REGEX += r"\s+"
+KLOG_REGEX += r"(\d+)"  # group 8
+KLOG_REGEX += r"\s"
+KLOG_REGEX += r"(.+):"  # group 9: filename
+KLOG_REGEX += r"(\d+)"  # group 10: lineno
+KLOG_REGEX += r"\]\s"
+KLOG_REGEX += r"(.*?)"  # group 11: message
+KLOG_REGEX += r"\s*$"

+LOGR_REGEX = r"^\s*"
 # group 1: timestamp
-logr_regex += r'(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)'
-logr_regex += r'\s+([A-Z]+)' # group 2: level
-logr_regex += r'\s+(\S+)' # group 3: source
-logr_regex += r'\s+(.*?)' # group 4: message
-logr_regex += r'\s*$'
+LOGR_REGEX += r"(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)"
+LOGR_REGEX += r"\s+([A-Z]+)"  # group 2: level
+LOGR_REGEX += r"\s+(\S+)"  # group 3: source
+LOGR_REGEX += r"\s+(.*?)"  # group 4: message
+LOGR_REGEX += r"\s*$"

 # 1.6599427639039357e+09 INFO controllers.CassandraDatacenter Reconcile loop completed {"cassandradatacenter": "cass-operator/test-cluster", "requestNamespace": "cass-operator", "requestName": "test-cluster", "loopID": "be419d0c-c7d0-4dfa-8596-af94ea15d4f6", "duration": 0.253729569}
-logr_special_regex = r'^\s*'
-logr_special_regex += r'(\d{1}\.\d+e\+\d{2})' # group 1: timestamp
-logr_special_regex += r'\s+([A-Z]+)' # group 2: level
-logr_special_regex += r'\s+(\S+)' # group 3: source
-logr_special_regex += r'\s+(.*?)' # group 4: message
-logr_special_regex += r'\s*$'
+LOGR_SPECIAL_REGEX = r"^\s*"
+LOGR_SPECIAL_REGEX += r"(\d{1}\.\d+e\+\d{2})"  # group 1: timestamp
+LOGR_SPECIAL_REGEX += r"\s+([A-Z]+)"  # group 2: level
+LOGR_SPECIAL_REGEX += r"\s+(\S+)"  # group 3: source
+LOGR_SPECIAL_REGEX += r"\s+(.*?)"  # group 4: message
+LOGR_SPECIAL_REGEX += r"\s*$"

 # time="2022-08-08T03:21:28Z" level=debug msg="Sentinel is not monitoring the correct master, changing..." src="checker.go:175"
 # time="2022-08-08T03:21:56Z" level=info msg="deployment updated" deployment=rfs-test-cluster namespace=acto-namespace service=k8s.deployment src="deployment.go:102"
-logrus_regex = r'^\s*'
+LOGRUS_REGEX = r"^\s*"
 # group 1: timestamp
-logrus_regex += r'time="(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}Z)"'
-logrus_regex += r'\s+level=([a-z]+)' # group 2: level
-logrus_regex += r'\s+msg="(.*?[^\\])"' # group 3: message
-logrus_regex += r'.*'
-logrus_regex += r'\s+(src="(.*?)")?' # group 4: src
-logrus_regex += r'\s*$'
+LOGRUS_REGEX += r'time="(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}Z)"'
+LOGRUS_REGEX += r"\s+level=([a-z]+)"  # group 2: level
+LOGRUS_REGEX += r'\s+msg="(.*?[^\\])"'  # group 3: message
+LOGRUS_REGEX += r".*"
+LOGRUS_REGEX += r'\s+(src="(.*?)")?'  # group 4: src
+LOGRUS_REGEX += r"\s*$"
 # this is semi-auto generated by copilot, holy moly

-# This one is similar to logr_special_regex and logrus_regex, but with some differences
+# This one is similar to LOGR_SPECIAL_REGEX and LOGRUS_REGEX, but with some differences
 # 2024-03-05T10:07:17Z ERROR GrafanaReconciler reconciler error in stage {"controller": "grafana", "controllerGroup": "grafana.integreatly.org", "controllerKind": "Grafana", "Grafana": {"name":"test-cluster","namespace":"grafana"}, "namespace": "grafana", "name": "test-cluster", "reconcileID": "5aa39e3e-d5d3-47fc-848d-c3d15dfbcc3d", "stage": "deployment", "error": "Deployment.apps \"test-cluster-deployment\" is invalid: [spec.template.spec.containers[0].image: Required value, spec.template.spec.affinity.podAntiAffinity.requiredDuringSchedulingIgnoredDuringExecution[0].topologyKey: Required value: can not be empty, spec.template.spec.affinity.podAntiAffinity.requiredDuringSchedulingIgnoredDuringExecution[0].topologyKey: Invalid value: \"\": name part must be non-empty, spec.template.spec.affinity.podAntiAffinity.requiredDuringSchedulingIgnoredDuringExecution[0].topologyKey: Invalid value: \"\": name part must consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyName', or 'my.name', or '123-abc', regex used for validation is '([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]')]"}
-grafana_logr_regex = r'^\s*'
-grafana_logr_regex += r'(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}Z)' # Group 1: timestamp
-grafana_logr_regex += r'\s+([A-Z]+)' # Group 2: level
-grafana_logr_regex += r'\s+(\S+)' # Group 3: Source
-grafana_logr_regex += r'\s+(.*?)' # Group 4: Message
-grafana_logr_regex += r'\s*$' # Take up any remaining whitespace
+GRAFANA_LOGR_REGEX = r"^\s*"
+GRAFANA_LOGR_REGEX += (
+    r"(\d{4}\-\d{2}\-\d{2}T\d{2}:\d{2}:\d{2}Z)"  # Group 1: timestamp
+)
+GRAFANA_LOGR_REGEX += r"\s+([A-Z]+)"  # Group 2: level
+GRAFANA_LOGR_REGEX += r"\s+(\S+)"  # Group 3: Source
+GRAFANA_LOGR_REGEX += r"\s+(.*?)"  # Group 4: Message
+GRAFANA_LOGR_REGEX += r"\s*$"  # Take up any remaining whitespace

+# Kafka log format
+# 2025-01-24 22:52:03 WARN AbstractConfiguration:93 - Reconciliation #27(watch) Kafka(acto-namespace/test-cluster): Configuration option "process.roles" is forbidden and will be ignored
+KAFKA_LOG_REGEX = r"^\s*"
+KAFKA_LOG_REGEX += (
+    r"(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s+"  # Group 1: timestamp
+)
+KAFKA_LOG_REGEX += r"(INFO|WARN|ERROR|DEBUG)\s+"  # Group 2: level
+KAFKA_LOG_REGEX += r"(\S+):(\d+)\s+-"  # Groups 3-4: source and line number
+KAFKA_LOG_REGEX += r"\s+(.*?)$"  # Group 5: message
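
A quick sanity check of the new Kafka pattern against the sample line from the comment above -- a standalone sketch, not part of the diff:

import re

KAFKA_LOG_REGEX = (
    r"^\s*"
    r"(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s+"  # group 1: timestamp
    r"(INFO|WARN|ERROR|DEBUG)\s+"  # group 2: level
    r"(\S+):(\d+)\s+-"  # groups 3-4: source and line number
    r"\s+(.*?)$"  # group 5: message
)

sample = (
    "2025-01-24 22:52:03 WARN AbstractConfiguration:93 - "
    "Reconciliation #27(watch) Kafka(acto-namespace/test-cluster): "
    'Configuration option "process.roles" is forbidden and will be ignored'
)
match = re.search(KAFKA_LOG_REGEX, sample)
assert match is not None
assert match.group(2).lower() == "warn"  # stored as log_line["level"]
assert match.group(5).startswith("Reconciliation")  # stored as log_line["msg"]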


 def parse_log(line: str) -> dict:
-    '''Try to parse the log line with some predefined format
+    """Try to parse the log line with some predefined format

     Currently only support three formats:
     - klog
@@ -65,72 +77,97 @@
     Returns:
         a dict containing 'level' and 'message'
         'level' will always be a lowercase string
-    '''
+    """
     logger = get_thread_logger(with_prefix=True)

     log_line = {}
-    if re.search(klog_regex, line) != None:
+    if re.search(KLOG_REGEX, line) is not None:
         # log is in klog format
-        match = re.search(klog_regex, line)
-        if match.group(1) == 'E':
-            log_line['level'] = 'error'
-        elif match.group(1) == 'I':
-            log_line['level'] = 'info'
-        elif match.group(1) == 'W':
-            log_line['level'] = 'warn'
-        elif match.group(1) == 'F':
-            log_line['level'] = 'fatal'
-
-        log_line['msg'] = match.group(11)
-    elif re.search(logr_regex, line) != None:
+        match = re.search(KLOG_REGEX, line)
+        if match is None:
+            logger.debug("parse_log() cannot parse line %s", line)
+            return {}

+        if match.group(1) == "E":
+            log_line["level"] = "error"
+        elif match.group(1) == "I":
+            log_line["level"] = "info"
+        elif match.group(1) == "W":
+            log_line["level"] = "warn"
+        elif match.group(1) == "F":
+            log_line["level"] = "fatal"


+        log_line["msg"] = match.group(11)
+    elif re.search(LOGR_REGEX, line) is not None:
         # log is in logr format
-        match = re.search(logr_regex, line)
-        log_line['level'] = match.group(2).lower()
-        log_line['msg'] = match.group(4)
-    elif re.search(logr_special_regex, line) != None:
+        match = re.search(LOGR_REGEX, line)
+        if match is None:
+            logger.debug("parse_log() cannot parse line %s", line)
+            return {}

+        log_line["level"] = match.group(2).lower()
+        log_line["msg"] = match.group(4)
+    elif re.search(LOGR_SPECIAL_REGEX, line) is not None:
         # log is in logr special format
-        match = re.search(logr_special_regex, line)
-        log_line['level'] = match.group(2).lower()
-        log_line['msg'] = match.group(4)
-    elif re.search(logrus_regex, line) != None:
+        match = re.search(LOGR_SPECIAL_REGEX, line)
+        if match is None:
+            logger.debug("parse_log() cannot parse line %s", line)
+            return {}
+        log_line["level"] = match.group(2).lower()
+        log_line["msg"] = match.group(4)

+    elif re.search(LOGRUS_REGEX, line) is not None:
         # log is in logrus format
-        match = re.search(logrus_regex, line)
-        log_line['level'] = match.group(2)
-        log_line['msg'] = match.group(3)
-    elif re.search(grafana_logr_regex, line) != None:
-        match = re.search(grafana_logr_regex, line)
-        log_line['level'] = match.group(2).lower()
-        log_line['msg'] = match.group(4)
+        match = re.search(LOGRUS_REGEX, line)
+        if match is None:
+            logger.debug("parse_log() cannot parse line %s", line)
+            return {}
+        log_line["level"] = match.group(2)
+        log_line["msg"] = match.group(3)

+    elif re.search(GRAFANA_LOGR_REGEX, line) is not None:
+        match = re.search(GRAFANA_LOGR_REGEX, line)
+        if match is None:
+            logger.debug("parse_log() cannot parse line %s", line)
+            return {}
+        log_line["level"] = match.group(2).lower()
+        log_line["msg"] = match.group(4)

+    elif re.search(KAFKA_LOG_REGEX, line) is not None:
+        match = re.search(KAFKA_LOG_REGEX, line)
+        if match is None:
+            logger.debug("parse_log() cannot parse line %s", line)
+            return {}
+        log_line["level"] = match.group(2).lower()
+        log_line["msg"] = match.group(5)

     else:
         try:
             log_line = json.loads(line)
-            if 'level' not in log_line:
-                log_line['level'] = log_line['severity']
+            if "level" not in log_line:
+                log_line["level"] = log_line["severity"]

-                del log_line['severity']
-            log_line['level'] = log_line['level'].lower()
+                del log_line["severity"]

+            log_line["level"] = log_line["level"].lower()
         except Exception as e:
-            logger.debug(f"parse_log() cannot parse line {line} due to {e}")
-            pass
+            logger.debug("parse_log() cannot parse line %s due to %s", line, e)

     return log_line


-if __name__ == '__main__':
-    # line = '   Ports: []v1.ServicePort{'
-    # line = 'E0714 23:11:19.386396 1 pd_failover.go:70] PD failover replicas (0) reaches the limit (0), skip failover'
-    # line = '{"level":"error","ts":1655678404.9488907,"logger":"controller-runtime.injectors-warning","msg":"Injectors are deprecated, and will be removed in v0.10.x"}'
-
-    # line = 'time="2022-08-08T03:21:56Z" level=info msg="deployment updated" deployment=rfs-test-cluster namespace=acto-namespace service=k8s.deployment src="deployment.go:102"'
-    # print(logrus_regex)
-    # print(parse_log(line)['msg'])
-
-    with open('testrun-2022-08-10-15-59/trial-01-0000/operator-0.log', 'r') as f:
-        for line in f.readlines():
-            print(f"Parsing log: {line}")
-
-            if parse_log(line) == {} or parse_log(line)['level'].lower() != 'error' and parse_log(line)['level'].lower() != 'fatal':
-                print(f'Test passed: {line} {parse_log(line)}')
-            else:
-                print(f"Message Raw: {line}, Parsed {parse_log(line)}")
-                break
+# if __name__ == "__main__":
+#     line = '   Ports: []v1.ServicePort{'
+#     line = 'E0714 23:11:19.386396 1 pd_failover.go:70] PD failover replicas (0) reaches the limit (0), skip failover'
+#     line = '{"level":"error","ts":1655678404.9488907,"logger":"controller-runtime.injectors-warning","msg":"Injectors are deprecated, and will be removed in v0.10.x"}'

+#     line = 'time="2022-08-08T03:21:56Z" level=info msg="deployment updated" deployment=rfs-test-cluster namespace=acto-namespace service=k8s.deployment src="deployment.go:102"'
+#     print(LOGRUS_REGEX)
+#     print(parse_log(line)['msg'])

+#     with open("testrun-kafka-config/trial-00-0020/operator-002.log", "r") as f:
+#         for line in f.readlines():
+#             print(f"Parsing log: {line}")

+#             if (
+#                 parse_log(line) == {}
+#                 or parse_log(line)["level"].lower() != "error"
+#                 and parse_log(line)["level"].lower() != "fatal"
+#             ):
+#                 print(f"Test passed: {line} {parse_log(line)}")
+#             else:
+#                 print(f"Message Raw: {line}, Parsed {parse_log(line)}")
+#                 break
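
With the old __main__ smoke test commented out, here is a rough usage sketch of the parser (assuming the package layout in this diff, i.e. acto.parse_log.parse_log is importable):

from acto.parse_log.parse_log import parse_log

# klog format: single level character + timestamp (sample from the old block)
klog = (
    "E0714 23:11:19.386396 1 pd_failover.go:70] "
    "PD failover replicas (0) reaches the limit (0), skip failover"
)
print(parse_log(klog))  # expected: {'level': 'error', 'msg': 'PD failover ...'}

# JSON logs fall through to the json.loads() branch
print(parse_log('{"level":"ERROR","msg":"Injectors are deprecated"}'))
# expected: 'level' lowercased to 'error'

# Unparseable lines yield {} -- callers must handle the empty dict
print(parse_log("   Ports: []v1.ServicePort{"))  # expected: {}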
48 changes: 44 additions & 4 deletions acto/schema/oneof.py
@@ -2,7 +2,9 @@
 from copy import deepcopy
 from typing import List, Tuple

+from .array import ArraySchema
 from .base import BaseSchema, TreeNode
+from .object import ObjectSchema


 class OneOfSchema(BaseSchema):
@@ -18,10 +20,20 @@ def __init__(self, path: list, schema: dict) -> None:
         for index, v in enumerate(schema["oneOf"]):
             base_schema = deepcopy(schema)
             del base_schema["oneOf"]
-            base_schema.update(v)
-            self.possibilities.append(
-                extract_schema(self.path + [str(index)], base_schema)
-            )
+            self.__recursive_update(base_schema, v)
+            self.possibilities.append(extract_schema(self.path, base_schema))

+    def __recursive_update(self, left: dict, right: dict):
+        """Recursively update left dict with right dict"""
+        for key, value in right.items():
+            if (
+                key in left
+                and isinstance(left[key], dict)
+                and isinstance(value, dict)
+            ):
+                self.__recursive_update(left[key], value)
+            else:
+                left[key] = value

     def get_possibilities(self):
         """Return all possibilities of the anyOf schema"""
@@ -70,3 +82,31 @@ def __str__(self) -> str:
             ret += ", "
         ret += "]"
         return ret

+    def __getitem__(self, key):
+        if isinstance(key, int):
+            for i in self.possibilities:
+                if isinstance(i, ArraySchema):
+                    return i[key]
+            raise RuntimeError("No array schema found in oneOf")
+        if isinstance(key, str):
+            for i in self.possibilities:
+                if isinstance(i, ObjectSchema):
+                    return i[key]
+            raise RuntimeError("No object schema found in oneOf")
+        raise TypeError("Key must be either int or str")
+
+    def __setitem__(self, key, value):
+        if isinstance(key, int):
+            for i in self.possibilities:
+                if isinstance(i, ArraySchema):
+                    i[key] = value
+                    return
+            raise RuntimeError("No array schema found in oneOf")
+        if isinstance(key, str):
+            for i in self.possibilities:
+                if isinstance(i, ObjectSchema):
+                    i[key] = value
+                    return
+            raise RuntimeError("No object schema found in oneOf")
+        raise TypeError("Key must be either int or str")
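
The reason for __recursive_update over a plain dict.update is easiest to see with nested sibling keys. A standalone sketch with plain dicts (no acto imports needed; the schema contents are made up for illustration):

def recursive_update(left: dict, right: dict) -> None:
    # Mirrors OneOfSchema.__recursive_update: descend into dicts present
    # on both sides instead of replacing the whole sub-dict.
    for key, value in right.items():
        if key in left and isinstance(left[key], dict) and isinstance(value, dict):
            recursive_update(left[key], value)
        else:
            left[key] = value

base = {"type": "object", "properties": {"replicas": {"type": "integer"}}}
variant = {"properties": {"template": {"type": "object"}}}

shallow = {**base, **variant}    # dict.update semantics: 'replicas' is lost
recursive_update(base, variant)  # new behavior: sibling keys are merged

assert "replicas" not in shallow["properties"]
assert set(base["properties"]) == {"replicas", "template"}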