diff --git a/core/controller/relval_controller.py b/core/controller/relval_controller.py index c695d7b..a1e7a41 100644 --- a/core/controller/relval_controller.py +++ b/core/controller/relval_controller.py @@ -244,7 +244,7 @@ def get_task_dict(self, relval, step, step_index): "Getting step %s dict for %s", step_index, relval.get_prepid() ) task_dict = {} - # If it's firtst step and not input file - it is generator + # If it's first step and not input file - it is generator # set Seeding to AutomaticSeeding, RequestNumEvets, EventsPerJob and EventsPerLumi # It expects --relval attribute if step_index == 0: @@ -264,6 +264,9 @@ def get_task_dict(self, relval, step, step_index): task_dict["EventsPerLumi"] = int(events_per_job) else: input_step = relval.get("steps")[step.get_input_step_index()] + + task_dict["SplittingAlgo"] = "LumiBased" + if input_step.get_step_type() == "input_file": input_dict = input_step.get("input") # Input file step is not a task @@ -273,6 +276,7 @@ def get_task_dict(self, relval, step, step_index): task_dict["LumiList"] = input_dict["lumisection"] elif input_dict["run"]: task_dict["RunWhitelist"] = input_dict["run"] + else: task_dict["InputTask"] = input_step.get_short_name() _, input_module = step.get_input_eventcontent(input_step) @@ -281,8 +285,6 @@ def get_task_dict(self, relval, step, step_index): if step.get("lumis_per_job") != "": task_dict["LumisPerJob"] = int(step.get("lumis_per_job")) - task_dict["SplittingAlgo"] = "LumiBased" - task_dict["TaskName"] = step.get_short_name() task_dict["ConfigCacheID"] = step.get("config_id") task_dict["KeepOutput"] = step.get("keep_output") @@ -328,7 +330,7 @@ def get_request_priority(self, relval: RelVal) -> int: priority ) return priority - + if "HighPrio" in campaign: priority = 500000 self.logger.info( @@ -336,7 +338,7 @@ def get_request_priority(self, relval: RelVal) -> int: priority ) return priority - + return 450000 def get_job_dict(self, relval): diff --git a/core/controller/ticket_controller.py b/core/controller/ticket_controller.py index 7e45d71..c625ae3 100644 --- a/core/controller/ticket_controller.py +++ b/core/controller/ticket_controller.py @@ -1,6 +1,7 @@ """ Module that contains TicketController class """ + import json from copy import deepcopy from environment import ( @@ -365,12 +366,28 @@ def generate_workflows(self, ticket, ssh_executor): "core/utils/run_the_matrix_pdmv.py", f"{remote_directory}/run_the_matrix_pdmv.py", ) + + ssh_executor.upload_file( + "core/utils/dqm.py", + f"{remote_directory}/dqm.py", + ) + ssh_executor.upload_file( + "core/utils/das.py", + f"{remote_directory}/das.py", + ) + command = [ + f"cd {remote_directory}", + "voms-proxy-init -voms cms --valid 4:00 --out $(pwd)/proxy.txt", + ] + ssh_executor.execute_command(command) + # Defined a name for output file file_name = f"{ticket_prepid}.json" # Execute run_the_matrix_pdmv.py matrix_command = run_commands_in_cmsenv( [ f"cd {remote_directory}", + "export X509_USER_PROXY=$(pwd)/proxy.txt", "$PYTHON_INT run_the_matrix_pdmv.py " f"-l={workflow_ids} " f"-w={matrix} " diff --git a/core/model/relval_step.py b/core/model/relval_step.py index e0ebc99..b9a563a 100644 --- a/core/model/relval_step.py +++ b/core/model/relval_step.py @@ -1,9 +1,10 @@ """ Module that contains RelValStep class """ -import weakref import json +import weakref from copy import deepcopy + from core.model.model_base import ModelBase from core_lib.utils.common_utils import get_scram_arch @@ -111,14 +112,14 @@ def __init__(self, json_input=None, parent=None, 
check_attributes=True):
             json_input['gpu'] = schema.get('gpu')
             json_input['gpu']['requires'] = 'forbidden'
             step_input = json_input['input']
-            
+
             for key, default_value in schema['input'].items():
                 if key not in step_input:
                     step_input[key] = default_value
         else:
             json_input['driver'] = {k.lstrip('-'): v for k, v in json_input['driver'].items()}
             json_input['input'] = schema.get('input')
-            
+
             if json_input.get('gpu', {}).get('requires') not in ('optional', 'required'):
                 json_input['gpu'] = schema.get('gpu')
                 json_input['gpu']['requires'] = 'forbidden'
@@ -283,25 +284,6 @@ def __build_das_command(self, step_index):
             command += f'>> {files_name}\n'
         return (comment + '\n' + command).strip()
 
-        events = input_dict['events']
-
-        ## N.B. das-up-to-nevents.py exists only from 14_1_0_pre7
-        cmssw_components = lambda x: x.strip().split("_")
-        cmssw_release = cmssw_components(self.get_release())
-        check_das_up_to_nevents = cmssw_release >= cmssw_components("14_1_0_pre7")
-
-        if events > 0 and check_das_up_to_nevents:
-            self.logger.info('Making a DAS command for step %s with max events', step_index)
-            files_name = f'step{step_index + 1}_files.txt'
-            comment = f'# Arguments for step {step_index + 1}:\n'
-            command = f'# Command for step {step_index + 1}:\n'
-            comment += f'# dataset: {dataset}\n'
-            comment += f'# events : {events}\n'
-            command += f'echo "" > {files_name}\n'
-            command += f'das-up-to-nevents.py -d {dataset} -e {events} '
-            command += f'>> {files_name}\n'
-            return (comment + '\n' + command).strip()
-
         return f'# Step {step_index + 1} is input dataset for next step: {dataset}'
 
     def get_command(self, custom_fragment=None, for_submission=False):
diff --git a/core/utils/das.py b/core/utils/das.py
new file mode 100755
index 0000000..192f5a9
--- /dev/null
+++ b/core/utils/das.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python3
+import itertools
+import subprocess
+
+import numpy as np  # pylint: disable=import-error
+import pandas as pd  # pylint: disable=import-error
+
+
+def get_lumi_ranges(i):
+    """
+    A helper to transform a list of lumisections into a list of lists (ranges).
+    It groups contiguous elements into single range-like lists.
+
+    Args:
+        i: a list of ints.
+
+    Returns:
+        list[list[int]]: a list of [first, last] lumisection ranges.
+    """
+    result = []
+    for _, b in itertools.groupby(enumerate(i), lambda pair: pair[1] - pair[0]):
+        b = list(b)
+        result.append([b[0][1], b[-1][1]])
+    return result
+
+
+def das_do_command(query):
+    """
+    A simple wrapper for dasgoclient.
+
+    Args:
+        query: a dasgoclient query.
+
+    Returns:
+        list[str]: the dasgoclient command output split by newlines.
+
+    """
+    cmd = 'dasgoclient --query="%s"' % (query)
+    out = subprocess.check_output(cmd, shell=True, executable="/bin/bash").decode(
+        "utf8"
+    )
+    return out.split("\n")
+
+
+def das_file_data(dataset):
+    """
+    Given a dataset, create a pandas DataFrame with the
+    list of file names and number of events per file.
+
+    Args:
+        dataset: the dataset name '/PD/GTString/DATA-TIER'
+
+    Returns:
+        A pandas DataFrame having for each row a single file and as columns:
+        - the file name;
+        - the number of events in each file.
+    """
+    query = "file dataset=%s | grep file.name, file.nevents" % (dataset)
+    out = das_do_command(query)
+    out = [np.array(r.split(" "))[[0, 3]] for r in out if len(r) > 0]
+
+    df = pd.DataFrame(out, columns=["file", "events"])
+    df.events = df.events.values.astype(int)
+
+    return df
+
+
+def das_lumi_data(dataset):
+    """
+    Produces a file-by-file pandas DataFrame with run and lumisection
+    information.
+
+    Args:
+        dataset: the dataset name '/PD/GTString/DATA-TIER'
+
+    Returns:
+        A pandas DataFrame having for each row a single file and as columns:
+        - the file name;
+        - the run;
+        - the lumisections.
+
+    """
+    query = "file,lumi,run dataset=%s " % (dataset)
+
+    out = das_do_command(query)
+    out = [r.split(" ") for r in out if len(r) > 0]
+
+    df = pd.DataFrame(out, columns=["file", "run", "lumis"])
+
+    return df
+
+
+def get_events_df(golden, dataset, events):
+    """
+    Produces a file-by-file pandas DataFrame for the files in a dataset
+    whose lumisections are all certified golden, keeping files only up
+    to the requested number of events.
+
+    Args:
+        golden: a run by run certification json
+        dataset: the dataset name as a string '/PD/GTString/DATA-TIER'
+        events: max number of events (an int).
+
+    Returns:
+        A pandas DataFrame having for each row a single file and as columns:
+        - the file name;
+        - the lumisections;
+        - the run;
+        - the number of events.
+
+    """
+
+    lumi_df = das_lumi_data(dataset)
+    file_df = das_file_data(dataset)
+
+    df = lumi_df.merge(
+        file_df, on="file", how="inner"
+    )  # merge file information with run and lumis
+    df["lumis"] = [
+        [int(ff) for ff in f.replace("[", "").replace("]", "").split(",")]
+        for f in df.lumis.values
+    ]
+
+    df_rs = []
+
+    for r in golden:
+        cut = df["run"] == r
+        if not any(cut):
+            continue
+
+        df_r = df[cut]
+
+        # skip runs with a very low event count
+        if df_r["events"].sum() < 10000:
+            continue
+
+        # keep only files whose lumisections are all in the golden json
+        good_lumis = np.array(
+            [len([ll for ll in l if ll in golden[r]]) for l in df_r.lumis]
+        )
+        n_lumis = np.array([len(l) for l in df_r.lumis])
+        df_rs.append(df_r[good_lumis == n_lumis])
+
+    if len(df_rs) == 0:
+        return pd.DataFrame([])
+    if len(df_rs) == 1:
+        df = df_rs[0]
+    else:
+        df = pd.concat(df_rs)
+
+    ## lumi sorting
+    df.loc[:, "min_lumi"] = [min(f) for f in df.lumis]
+    df.loc[:, "max_lumi"] = [max(f) for f in df.lumis]
+    df = df.sort_values(["run", "min_lumi", "max_lumi"])
+
+    ## events skimming
+    df = df[df["events"] <= events]  # drop files larger than the events budget
+    df.loc[:, "sum_evs"] = df.loc[:, "events"].cumsum()
+    df = df[df["sum_evs"] < events]
+
+    return df
+
+
+def get_run_lumi(df):
+    """
+    Produces the lumi mask dict starting from a pandas DataFrame
+
+    Args:
+        df: a pandas DataFrame having for each row a single file and as columns:
+        - the file name;
+        - the lumisections;
+        - the run;
+        - the number of events.
+
+    Returns:
+        A "CMS"-like lumi mask dict mapping:
+        - the run number;
+        - to the list of good lumisection ranges.
+
+        E.g. {run : [[lumi_1,lumi_2],[lumi_3,lumi_4]]}
+    """
+    if len(df) == 0:
+        return {}
+
+    run_list = np.unique(df.run.values).tolist()
+    lumi_list = [
+        get_lumi_ranges(
+            np.sort(
+                np.concatenate(df.loc[df["run"] == r, "lumis"].values).ravel()
+            ).tolist()
+        )
+        for r in run_list
+    ]
+
+    lumi_ranges = dict(zip(run_list, lumi_list))
+
+    return lumi_ranges
+
+
+def get_lumi_dict(golden, dataset, events):
+    """
+    Produces a lumi mask for a given dataset, up to a maximum number of
+    events, using a certification json.
+
+    Args:
+        golden: a run by run certification json
+        dataset: the dataset name '/PD/GTString/DATA-TIER'
+        events: max number of events (an int).
+
+    Returns:
+        A "CMS"-like lumi mask dict mapping:
+        - the run number;
+        - to the list of good lumisection ranges.
+
+        E.g. {run : [[lumi_1,lumi_2],[lumi_3,lumi_4]]}
+    """
+
+    df = get_events_df(golden, dataset, events)
+    lumi = get_run_lumi(df)
+
+    return lumi
diff --git a/core/utils/dqm.py b/core/utils/dqm.py
new file mode 100644
index 0000000..f4da920
--- /dev/null
+++ b/core/utils/dqm.py
@@ -0,0 +1,181 @@
+import codecs
+import json
+import os
+
+# pylint: disable=import-error
+import numpy as np
+import requests
+from bs4 import BeautifulSoup
+from requests.exceptions import HTTPError
+
+# pylint: enable=import-error
+
+base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/"
+
+
+def list_certification_files(cert_type):
+    """
+    List all the certification files related to a certification type
+    in the CMS DQM certification server.
+
+    Args:
+        cert_type (str): Certification type. This corresponds to the folder
+            name available in the server.
+
+    Returns:
+        list[str]: All the JSON certification file names.
+
+    Raises:
+        HTTPError: If it is not possible to retrieve the index HTML
+            page related to the certification type from the server.
+    """
+    dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification"
+    url = "%s/%s/" % (dqm_cert_url, cert_type)
+    page_content = requests.get(url=url, timeout=30)
+    if page_content.status_code != 200:
+        error_msg = f"Unable to retrieve the content related to {cert_type}"
+        raise HTTPError(error_msg, response=page_content)
+
+    # Parse the HTML and retrieve the file names
+    page_content = BeautifulSoup(page_content.text, "lxml").text
+
+    file_names = []
+    for file_line in page_content.strip().split("\n"):
+        if file_line and ".json" in file_line:
+            metadata = file_line.strip().split(" ")
+            file_name = metadata[0]
+            if file_name.endswith(".json"):
+                file_names.append(file_name)
+
+    return file_names
+
+
+def get_certification_file(path):
+    """
+    Get a certification file from the CMS DQM certification
+    server.
+
+    Args:
+        path (str): Path to the certification file on the server.
+
+    Returns:
+        dict: Golden JSON file.
+    """
+    dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification"
+    url = "%s/%s" % (dqm_cert_url, path)
+    file = requests.get(url=url, timeout=30)
+    if file.status_code != 200:
+        error_msg = f"Unable to retrieve the content related to {path}"
+        raise HTTPError(error_msg, response=file)
+
+    return file.json()
+
+
+def get_cert_type(dataset):
+    """
+    Determine the certification type corresponding to a dataset.
+
+    Args:
+        dataset: the dataset name as a string '/PD/GTString/DATA-TIER'.
+
+    Returns:
+        str: The type of certification we seek (Collisions, HI, Cosmics
+        or Commisioning).
+
+    """
+    year = dataset.split("Run")[1][2:4]  # from 20XX to XX
+    PD = dataset.split("/")[1]  # pylint: disable=invalid-name
+    cert_type = "Collisions" + str(year)
+    if "Cosmics" in dataset:
+        cert_type = "Cosmics" + str(year)
+    elif "Commisioning" in dataset:
+        cert_type = "Commisioning2020"
+    elif "HI" in PD:
+        cert_type = "Collisions" + str(year) + "HI"
+
+    return cert_type
+
+
+def get_json_list(cert_type, web_fallback):
+    """
+    List all the certification files related to a certification type,
+    stored either on CMS DQM EOS or, as a fallback, on the CMS DQM
+    certification server.
+
+    Args:
+        cert_type: the certification type, as returned by get_cert_type.
+        web_fallback: a bool flag enabling the lookup on the CMS DQM server.
+
+    Returns:
+        list[str]: All the JSON certification file names.
+
+    """
+
+    ## if we have access to EOS we get the list from there ...
+    if not web_fallback:
+        cert_path = base_cert_path + cert_type + "/"
+        json_list = os.listdir(cert_path)
+        json_list = [c for c in json_list if "Golden" in c and "era" not in c]
+        json_list = [
+            c for c in json_list if c.startswith("Cert_C") and c.endswith("json")
+        ]
+    ## ... if not we go to the website
+    else:
+        json_list = list_certification_files(cert_type=cert_type)
+        json_list = [
+            file_name
+            for file_name in json_list
+            if "Golden" in file_name
+            and "Cert_C" in file_name
+            and "era" not in file_name
+        ]
+
+    return json_list
+
+
+def get_golden_json(dataset):
+    """
+    Output the golden certification dictionary (JSON) for a specific dataset.
+    If multiple JSON files are available, the one covering the widest run
+    range is selected. The dictionary maps each run number to the complete
+    list of the corresponding golden lumisections.
+
+    Args:
+        dataset: the dataset name as a string '/PD/GTString/DATA-TIER'
+
+    Returns:
+        dict: Golden Run-Lumisection dictionary.
+
+    """
+
+    golden_flat = {}
+
+    cert_type = get_cert_type(dataset)
+    cert_path = base_cert_path + cert_type + "/"
+    web_fallback = not os.path.isdir(cert_path)
+
+    json_list = get_json_list(cert_type, web_fallback)
+
+    # the wider the run range the better, assuming the file naming schema
+    # Cert_X_RunStart_RunFinish_Type.json
+    run_ranges = [int(c.split("_")[3]) - int(c.split("_")[2]) for c in json_list]
+    best_json = str(json_list[np.argmax(run_ranges)])
+    if not web_fallback:
+        with codecs.open(cert_path + "/" + best_json, encoding="utf-8") as js:
+            golden = json.load(js)
+    else:
+        path = "%s/%s" % (cert_type, best_json)
+        golden = get_certification_file(path=path)
+
+    # flatten the golden json, listing the lumisections one by one
+    for k in golden:
+        R = []  # pylint: disable=invalid-name
+        for r in golden[k]:
+            R = R + list(range(r[0], r[1] + 1))  # pylint: disable=invalid-name
+        golden_flat[k] = R
+
+    return golden_flat
diff --git a/core/utils/requirements.txt b/core/utils/requirements.txt
new file mode 100644
index 0000000..9f58652
--- /dev/null
+++ b/core/utils/requirements.txt
@@ -0,0 +1,9 @@
+# Just for reference, the following are the packages
+# and versions used for the development of some modules.
+# However, the runtime environment may contain different
+# versions pre-included in the `cms-sw` container.
+#
+# For releases other than `el9` it is not possible to
+# update the installed packages to match these versions.
+pandas==2.0.1
+numpy==1.24.3
\ No newline at end of file
diff --git a/core/utils/run_the_matrix_pdmv.py b/core/utils/run_the_matrix_pdmv.py
index e8a64ca..a1ec2eb 100644
--- a/core/utils/run_the_matrix_pdmv.py
+++ b/core/utils/run_the_matrix_pdmv.py
@@ -2,15 +2,20 @@
 PdmV's simplified implementation of runTheMatrix.py
 """
 from __future__ import print_function
-import sys
+
 import argparse
-import json
 import importlib
 import inspect
+import json
 import re
+import sys
+
 #pylint: disable=wrong-import-position,import-error
 import Configuration.PyReleaseValidation.relval_steps as steps_module
 from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector
+from das import get_lumi_dict
+from dqm import get_golden_json
+
 #pylint: enable=wrong-import-position,import-error
 
 
@@ -153,11 +158,21 @@ def merge_additional_command(workflow_step, command):
     print('Merging to %s' % (workflow_step))
     return steps_module.merge([command_dict, workflow_step])
 
+def get_lumi_ranges_from_dict(step_input):
+    """
+    Given the dict of an INPUT step, output the lumi mask to be used,
+    taking into account the max number of events to be processed.
+    """
+    golden = get_golden_json(step_input.dataSet)
+    lumi = get_lumi_dict(golden, step_input.dataSet, step_input.events)
+
+    return lumi
 
 def make_relval_step(workflow_step, workflow_step_name, wmsplit):
     """
     Build one workflow step - either input dataset or cmsDriver
     """
+    print(workflow_step)
     step = {'name': workflow_step_name}
     if workflow_step_name in wmsplit:
         step['lumis_per_job'] = wmsplit[workflow_step_name]
@@ -170,8 +185,13 @@ def make_relval_step(workflow_step, workflow_step_name, wmsplit):
     if 'INPUT' in workflow_step:
         # This step has input dataset
         step_input = workflow_step['INPUT']
+        lumisections = step_input.ls
+
+        if not step_input.ls and step_input.events and step_input.label:
+            lumisections = get_lumi_ranges_from_dict(step_input)
+
         step['input'] = {'dataset': step_input.dataSet,
-                         'lumisection': step_input.ls,
+                         'lumisection': lumisections,
                          'run': step_input.run,
                          'label': step_input.label,
                          'events': step_input.events}
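
Reviewer note: a minimal sketch of how the two new helper modules compose, mirroring what get_lumi_ranges_from_dict() in run_the_matrix_pdmv.py does for an INPUT step. The dataset name and event budget below are made-up placeholders, and both calls assume dasgoclient plus a valid grid proxy (the X509_USER_PROXY exported by the new ticket_controller.py commands) are available in the environment:

    # Hypothetical usage example; dataset and event budget are illustrative only.
    from dqm import get_golden_json
    from das import get_lumi_dict

    dataset = "/JetMET0/Run2024C-v1/RAW"  # placeholder dataset name
    events = 250000                       # placeholder event budget

    # run -> flat list of golden lumisections, from the widest certification JSON
    golden = get_golden_json(dataset)
    # run -> [[lumi_first, lumi_last], ...], truncated near the event budget
    lumi_mask = get_lumi_dict(golden, dataset, events)
    print(lumi_mask)  # this dict ends up in the step's 'lumisection' field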