Skip to content

Commit

Permalink
Skip dinamically some checks for the validation procedure
Browse files Browse the repository at this point in the history
For a given request prepid set in the `validation_skip_check`, skip if
required some checks related to the `time_per_event`, `size_per_event`
`memory` and `filter_efficiency`.
  • Loading branch information
ggonzr committed Mar 6, 2025
1 parent 438ec71 commit eb13eb4
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 1 deletion.
63 changes: 63 additions & 0 deletions mcm/automatic_scripts/validation/validation_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ def get_reports(self, validation_name, threads, expected):
return reports

def check_time_per_event(self, request_name, expected, report):
if self.skip_check(request_name, "time_per_event"):
self.logger.warning("Skipping the time_per_event check for request %s", request_name)
return True, 'Check for time_per_event has been skipped!'

time_per_event_margin = settings.get_value('timing_fraction')
for sequence_index in range(len(expected)):
expected_time_per_event = expected[sequence_index]['time_per_event']
Expand Down Expand Up @@ -287,6 +291,10 @@ def check_time_per_event(self, request_name, expected, report):
return True, ''

def check_size_per_event(self, request_name, expected, report):
if self.skip_check(request_name, "size_per_event"):
self.logger.warning("Skipping the size_per_event check for request %s", request_name)
return True, 'Check for size_per_event has been skipped!'

size_per_event_margin = 0.5
for sequence_index in range(len(expected)):
expected_size_per_event = expected[sequence_index]['size_per_event']
Expand All @@ -313,6 +321,10 @@ def check_size_per_event(self, request_name, expected, report):
return True, ''

def check_memory(self, request_name, expected, report):
if self.skip_check(request_name, "memory"):
self.logger.warning("Skipping the memory check for request %s", request_name)
return True, 'Check for memory has been skipped!'

for sequence_index in range(len(expected)):
expected_memory = expected[sequence_index]['memory']
actual_memory = report[sequence_index]['peak_value_rss']
Expand All @@ -332,6 +344,10 @@ def check_memory(self, request_name, expected, report):
return True, ''

def check_filter_efficiency(self, request_name, expected, report):
if self.skip_check(request_name, "filter_efficiency"):
self.logger.warning("Skipping the filter_efficiency check for request %s", request_name)
return True, 'Check for filter_efficiency has been skipped!'

expected_filter_efficiency = expected[0]['filter_efficiency'] # Because all have same expected eff
actual_filter_efficiency = self.list_prod([r['filter_efficiency'] for r in report])
expected_events = report[0]['expected_events']
Expand Down Expand Up @@ -585,6 +601,7 @@ def process_done_validation(self, validation_name, threads):
self.json_dumps(report))
else:
attempt_number = threads_dict['attempt_number']
checks_skipped = []
self.logger.info('This was attempt number %s for %s thread validation', attempt_number, threads)
for request_name, report in reports.items():
# Check report only for single core validation
Expand Down Expand Up @@ -621,6 +638,9 @@ def process_done_validation(self, validation_name, threads):
self.notify_validation_failed(validation_name, message)
return False

if self._message_related_to_skip(message):
checks_skipped.append(message)

# Check size per event
passed, message = self.check_size_per_event(request_name, expected_dict, report)
if not passed:
Expand All @@ -636,6 +656,9 @@ def process_done_validation(self, validation_name, threads):
self.notify_validation_failed(validation_name, message)
return False

if self._message_related_to_skip(message):
checks_skipped.append(message)

# Check memory usage
passed, message = self.check_memory(request_name, expected_dict, report)
if not passed:
Expand All @@ -644,6 +667,9 @@ def process_done_validation(self, validation_name, threads):
self.notify_validation_failed(validation_name, message)
return False

if self._message_related_to_skip(message):
checks_skipped.append(message)

# Check filter efficiency
passed, message = self.check_filter_efficiency(request_name, expected_dict, report)
if not passed:
Expand All @@ -652,6 +678,9 @@ def process_done_validation(self, validation_name, threads):
self.notify_validation_failed(validation_name, message)
return False

if self._message_related_to_skip(message):
checks_skipped.append(message)

# Add CPU name
cpu_name = self.extract_cpu_name(out_file)
if cpu_name:
Expand All @@ -667,6 +696,7 @@ def process_done_validation(self, validation_name, threads):
# If there was no break in the loop - nothing was resubmitted
del running[threads]
storage_item['done'][threads] = reports
storage_item['skip_check'] = checks_skipped
self.storage.save(validation_name, storage_item)
return True

Expand Down Expand Up @@ -701,6 +731,7 @@ def notify_validation_suceeded(self, validation_name):
subject = 'Validation succeeded for %s' % (validation_name)
message = 'Hello,\n\nValidation of %s succeeded.\nMeasured values:\n' % (validation_name)
storage_item = self.storage.get(validation_name)['done']
skip_check = self.storage.get(validation_name).get('skip_check', [])
for threads in sorted(storage_item.keys()):
threads_dict = storage_item[threads]
message += '\nThreads: %s\n' % (threads)
Expand All @@ -713,6 +744,11 @@ def notify_validation_suceeded(self, validation_name):
value = sequence[key]
message += ' %s: %s\n' % (key, value)

# Notify about checks skipped for the validation
if skip_check:
message += '\nSome checks have been skipped for this validation:'
message += '\n'.join(skip_check)

message = re.sub(r'[^\x00-\x7f]', '?', message)
item.notify(subject, message)

Expand Down Expand Up @@ -1133,6 +1169,33 @@ def get_requests_from_chained_request(self, chained_request):

return requests

def skip_check(self, request_name, to_skip):
"""
Determines if a given check should be bypassed for a request.
"""
allowed_to_skip = Request.get_allowed_skip_validation_check()
if request_name not in allowed_to_skip:
self.logger.debug("Request %s not in the 'validation_skip_check' object", request_name)
return False

checks_to_skip = allowed_to_skip.get(request_name)
if "all" in checks_to_skip:
self.logger.debug("All checks are disabled for request: %s", request_name)
return True

if to_skip not in checks_to_skip:
self.logger.debug(
"Is it not allowed to skip the check %s for request %s. Values allowed: %s",
to_skip, request_name, checks_to_skip
)
return False

# It is allowed
return True

def _message_related_to_skip(self, message):
return re.match(r"^Check for [a-z|_]+ has been skipped!$", message)


if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
Expand Down
28 changes: 27 additions & 1 deletion mcm/json_layer/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -3025,11 +3025,19 @@ def request_to_tasks(self, base, depend):

self.logger.info('Validation info of %s:\n%s', self.get_attribute('prepid'), dumps(validation_info, indent=2, sort_keys=True))
filter_efficiency = self.get_efficiency()
request_skip_check = self.get_allowed_skip_validation_check().get(self.get_attribute("prepid"))
filter_eff_skipped = False
if request_skip_check:
# If any of the following options is set
skipped_if = set(("filter_efficiency", "all"))
if skipped_if.intersection(set(request_skip_check)):
filter_eff_skipped = True

if validation_info:
# Average filter efficiency
filter_efficiency = sum([x['filter_efficiency'] for x in validation_info]) / len(validation_info)
filter_efficiency_threshold = 0.001
if filter_efficiency < filter_efficiency_threshold:
if filter_efficiency < filter_efficiency_threshold and not filter_eff_skipped:
# If filter eff < 10^-3, then filter based on events/lumi first and choose highest cpu efficiency
self.logger.info('Filter efficiency lower than %s (%s), choosing cores based on evens/lumi',
filter_efficiency_threshold,
Expand Down Expand Up @@ -3260,3 +3268,21 @@ def _pileup_only_from_site(self):
pass

return pileup_from

@staticmethod
def get_allowed_skip_validation_check():
"""
Get the requests allowed to skip some validation checks
Returns:
dict[str, list[str]]: A dict with the request's ids and the checks
allowed to be skipped per each request.
"""
allowed_to_skip = {}
try:
allowed_to_skip = settings.get_value("validation_skip_check")
except TypeError:
# The key does not exists in the database
pass

return allowed_to_skip

0 comments on commit eb13eb4

Please sign in to comment.