From 4b695770445d6bc85e76594f44e3dc398a3ea50f Mon Sep 17 00:00:00 2001 From: adiflori Date: Mon, 2 Dec 2024 12:28:11 +0100 Subject: [PATCH 1/9] A simplified das-up-to-nevents.py for precomputing lumisections when nevents limit is needed --- core/controller/relval_controller.py | 10 +- core/controller/ticket_controller.py | 16 ++++ core/model/relval_step.py | 17 ---- core/utils/das.py | 133 +++++++++++++++++++++++++++ core/utils/dqm.py | 91 ++++++++++++++++++ core/utils/run_the_matrix_pdmv.py | 20 +++- 6 files changed, 265 insertions(+), 22 deletions(-) create mode 100755 core/utils/das.py create mode 100644 core/utils/dqm.py diff --git a/core/controller/relval_controller.py b/core/controller/relval_controller.py index c695d7b..365876e 100644 --- a/core/controller/relval_controller.py +++ b/core/controller/relval_controller.py @@ -244,7 +244,7 @@ def get_task_dict(self, relval, step, step_index): "Getting step %s dict for %s", step_index, relval.get_prepid() ) task_dict = {} - # If it's firtst step and not input file - it is generator + # If it's first step and not input file - it is generator # set Seeding to AutomaticSeeding, RequestNumEvets, EventsPerJob and EventsPerLumi # It expects --relval attribute if step_index == 0: @@ -264,6 +264,9 @@ def get_task_dict(self, relval, step, step_index): task_dict["EventsPerLumi"] = int(events_per_job) else: input_step = relval.get("steps")[step.get_input_step_index()] + + task_dict["SplittingAlgo"] = "LumiBased" + if input_step.get_step_type() == "input_file": input_dict = input_step.get("input") # Input file step is not a task @@ -273,6 +276,7 @@ def get_task_dict(self, relval, step, step_index): task_dict["LumiList"] = input_dict["lumisection"] elif input_dict["run"]: task_dict["RunWhitelist"] = input_dict["run"] + else: task_dict["InputTask"] = input_step.get_short_name() _, input_module = step.get_input_eventcontent(input_step) @@ -280,9 +284,7 @@ def get_task_dict(self, relval, step, step_index): if step.get("lumis_per_job") != "": task_dict["LumisPerJob"] = int(step.get("lumis_per_job")) - - task_dict["SplittingAlgo"] = "LumiBased" - + task_dict["TaskName"] = step.get_short_name() task_dict["ConfigCacheID"] = step.get("config_id") task_dict["KeepOutput"] = step.get("keep_output") diff --git a/core/controller/ticket_controller.py b/core/controller/ticket_controller.py index 7e45d71..43215e1 100644 --- a/core/controller/ticket_controller.py +++ b/core/controller/ticket_controller.py @@ -365,12 +365,28 @@ def generate_workflows(self, ticket, ssh_executor): "core/utils/run_the_matrix_pdmv.py", f"{remote_directory}/run_the_matrix_pdmv.py", ) + + ssh_executor.upload_file( + "core/utils/dqm.py", + f"{remote_directory}/dqm.py", + ) + ssh_executor.upload_file( + "core/utils/das.py", + f"{remote_directory}/das.py", + ) + command = [ + f"cd {remote_directory}", + "voms-proxy-init -voms cms --valid 4:00 --out $(pwd)/proxy.txt", + ] + ssh_executor.execute_command(command) + # Defined a name for output file file_name = f"{ticket_prepid}.json" # Execute run_the_matrix_pdmv.py matrix_command = run_commands_in_cmsenv( [ f"cd {remote_directory}", + "export X509_USER_PROXY=$(pwd)/proxy.txt", "$PYTHON_INT run_the_matrix_pdmv.py " f"-l={workflow_ids} " f"-w={matrix} " diff --git a/core/model/relval_step.py b/core/model/relval_step.py index e0ebc99..d91fabb 100644 --- a/core/model/relval_step.py +++ b/core/model/relval_step.py @@ -284,23 +284,6 @@ def __build_das_command(self, step_index): return (comment + '\n' + command).strip() events = input_dict['events'] - - 
## N.B. das-up-to-nevents.py exists only from 14_1_0_pre7 - cmssw_components = lambda x: x.strip().split("_") - cmssw_release = cmssw_components(self.get_release()) - check_das_up_to_nevents = cmssw_release >= cmssw_components("14_1_0_pre7") - - if events > 0 and check_das_up_to_nevents: - self.logger.info('Making a DAS command for step %s with max events', step_index) - files_name = f'step{step_index + 1}_files.txt' - comment = f'# Arguments for step {step_index + 1}:\n' - command = f'# Command for step {step_index + 1}:\n' - comment += f'# dataset: {dataset}\n' - comment += f'# events : {events}\n' - command += f'echo "" > {files_name}\n' - command += f'das-up-to-nevents.py -d {dataset} -e {events} ' - command += f'>> {files_name}\n' - return (comment + '\n' + command).strip() return f'# Step {step_index + 1} is input dataset for next step: {dataset}' diff --git a/core/utils/das.py b/core/utils/das.py new file mode 100755 index 0000000..2537efa --- /dev/null +++ b/core/utils/das.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +import pandas as pd +import subprocess +import itertools +import numpy as np + +def get_lumi_ranges(i): + result = [] + for _, b in itertools.groupby(enumerate(i), lambda pair: pair[1] - pair[0]): + b = list(b) + result.append([b[0][1],b[-1][1]]) + return result + +def das_do_command(cmd): + out = subprocess.check_output(cmd, shell=True, executable="/bin/bash").decode('utf8') + return out.split("\n") + +def das_file_site(dataset, site): + cmd = "dasgoclient --query='file dataset=%s site=%s'"%(dataset,site) + out = das_do_command(cmd) + df = pd.DataFrame(out,columns=["file"]) + + return df + +def das_file_data(dataset,opt=""): + cmd = "dasgoclient --query='file dataset=%s %s| grep file.name, file.nevents'"%(dataset,opt) + out = das_do_command(cmd) + out = [np.array(r.split(" "))[[0,3]] for r in out if len(r) > 0] + + df = pd.DataFrame(out,columns=["file","events"]) + df.events = df.events.values.astype(int) + + return df + +def das_lumi_data(dataset,opt=""): + cmd = "dasgoclient --query='file,lumi,run dataset=%s %s'"%(dataset,opt) + + out = das_do_command(cmd) + out = [r.split(" ") for r in out if len(r)>0] + + df = pd.DataFrame(out,columns=["file","run","lumis"]) + + return df + +def das_run_events_data(dataset,run,opt=""): + cmd = "dasgoclient --query='file dataset=%s run=%s %s | sum(file.nevents) '"%(dataset,run,opt) + out = das_do_command(cmd)[0] + + out = [o for o in out.split(" ") if "sum" not in o] + out = int([r.split(" ") for r in out if len(r)>0][0][0]) + + return out + +def das_run_data(dataset,opt=""): + cmd = "dasgoclient --query='run dataset=%s %s '"%(dataset,opt) + out = das_do_command(cmd) + + return out + +def get_events_df(golden,dataset,events): + + ''' + Given in input: + - a run by run certification json + - the dataset name + - the number of desired events + this produces a pandas dataframe with a file per row. + + For each row it has: the file name, lumisections, run and number + of events and the cumulative sum of the events. 
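+
+    E.g. a sketch of the intended use (the dataset name is illustrative):
+    > golden = get_golden_json("/Muon0/Run2024B-v1/RAW")  # from dqm.py
+    > df = get_events_df(golden, "/Muon0/Run2024B-v1/RAW", 100000)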
+ ''' + + lumi_df = das_lumi_data(dataset) + file_df = das_file_data(dataset) + + df = lumi_df.merge(file_df,on="file",how="inner") # merge file informations with run and lumis + df["lumis"] = [[int(ff) for ff in f.replace("[","").replace("]","").split(",")] for f in df.lumis.values] + + df_rs = [] + + for r in golden: + cut = (df["run"] == r) + if not any(cut): + continue + + df_r = df[cut] + + # jumping very low event count runs + if df_r["events"].sum() < 10000: + continue + + good_lumis = np.array([len([ll for ll in l if ll in golden[r]]) for l in df_r.lumis]) + n_lumis = np.array([len(l) for l in df_r.lumis]) + df_rs.append(df_r[good_lumis==n_lumis]) + + if len(df_rs)==0: + return pd.DataFrame([]) + elif len(df_rs)==1: + df = df_rs + else: + df = pd.concat(df_rs) + + ## lumi sorting + df.loc[:,"min_lumi"] = [min(f) for f in df.lumis] + df.loc[:,"max_lumi"] = [max(f) for f in df.lumis] + df = df.sort_values(["run","min_lumi","max_lumi"]) + + ## events skimming + df = df[df["events"] <= events] #jump too big files + df.loc[:,"sum_evs"] = df.loc[:,"events"].cumsum() + df = df[df["sum_evs"] < events] + + return df + +def get_run_lumi(df): + + if len(df) == 0: + return {} + + run_list = np.unique(df.run.values).tolist() + lumi_list = [get_lumi_ranges(np.sort(np.concatenate(df.loc[df["run"]==r,"lumis"].values).ravel()).tolist()) for r in run_list] + + lumi_ranges = dict(zip(run_list,lumi_list)) + + return lumi_ranges + +def get_lumi_dict(golden,dataset,events): + + df = get_events_df(golden,dataset,events) + lumi = get_run_lumi(df) + + return lumi + diff --git a/core/utils/dqm.py b/core/utils/dqm.py new file mode 100644 index 0000000..e3a5860 --- /dev/null +++ b/core/utils/dqm.py @@ -0,0 +1,91 @@ +from bs4 import BeautifulSoup +import pycurl +from io import BytesIO +import os +import ast +import numpy as np +import json + +base_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification/" +base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/" + +def get_url_clean(url): + + buffer = BytesIO() + c = pycurl.Curl() + c.setopt(c.URL, url) + c.setopt(c.WRITEDATA, buffer) + c.perform() + c.close() + + return BeautifulSoup(buffer.getvalue(), "lxml").text + +def get_cert_type(dataset): + + year = dataset.split("Run")[1][2:4] # from 20XX to XX + PD = dataset.split("/")[1] + cert_type = "Collisions" + str(year) + if "Cosmics" in dataset: + cert_type = "Cosmics" + str(year) + elif "Commisioning" in dataset: + cert_type = "Commisioning2020" + elif "HI" in PD: + cert_type = "Collisions" + str(year) + "HI" + + return cert_type + +def get_json_list(dataset,cert_type,web_fallback): + + ## if we have access to eos we get from there ... + if not web_fallback: + cert_path = base_cert_path + cert_type + "/" + json_list = os.listdir(cert_path) + if len(json_list) == 0: + web_fallback == True + json_list = [c for c in json_list if "Golden" in c and "era" not in c] + json_list = [c for c in json_list if c.startswith("Cert_C") and c.endswith("json")] + ## ... if not we go to the website + else: + cert_url = base_cert_url + cert_type + "/" + json_list = get_url_clean(cert_url).split("\n") + json_list = [c for c in json_list if "Golden" in c and "era" not in c and "Cert_C" in c] + json_list = [[cc for cc in c.split(" ") if cc.startswith("Cert_C") and cc.endswith("json")][0] for c in json_list] + + return json_list + +def get_golden_json(dataset): + ''' + Get the flattened golden json with highest + lumi range based on the dataset name. 
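+
+    E.g. (sketch; dataset name and run numbers are illustrative):
+    > get_golden_json("/Muon0/Run2024B-v1/RAW")
+    {"380115": [1, 2, 3, ...], "380120": [1, 2, ...], ...}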
+ ''' + + golden_flat = {} + + cert_type = get_cert_type(dataset) + cert_path = base_cert_path + cert_type + "/" + cert_url = base_cert_url + cert_type + "/" + web_fallback = not os.path.isdir(cert_path) + + json_list = get_json_list(dataset,cert_type,web_fallback) + + # the larger the better, assuming file naming schema + # Cert_X_RunStart_RunFinish_Type.json + run_ranges = [int(c.split("_")[3]) - int(c.split("_")[2]) for c in json_list] + latest_json = np.array(json_list[np.argmax(run_ranges)]).reshape(1,-1)[0].astype(str) + best_json = str(latest_json[0]) + if not web_fallback: + with open(cert_path + "/" + best_json) as js: + golden = json.load(js) + else: + golden = get_url_clean(cert_url + best_json) + golden = ast.literal_eval(golden) #converts string to dict + + # golden json with all the lumisections one by one + for k in golden: + R = [] + for r in golden[k]: + R = R + [f for f in range(r[0],r[1]+1)] + golden_flat[k] = R + + return golden_flat + diff --git a/core/utils/run_the_matrix_pdmv.py b/core/utils/run_the_matrix_pdmv.py index e8a64ca..df26544 100644 --- a/core/utils/run_the_matrix_pdmv.py +++ b/core/utils/run_the_matrix_pdmv.py @@ -8,6 +8,9 @@ import importlib import inspect import re +from das import get_lumi_dict +from dqm import get_golden_json +from os import environ as env #pylint: disable=wrong-import-position,import-error import Configuration.PyReleaseValidation.relval_steps as steps_module from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector @@ -153,11 +156,21 @@ def merge_additional_command(workflow_step, command): print('Merging to %s' % (workflow_step)) return steps_module.merge([command_dict, workflow_step]) +def get_lumi_ranges_from_dict(step_input): + """ + Given the dict of an INPUT step outputs the lumi mask to be used + taking into account the max number of events to be processed + """ + golden = get_golden_json(step_input.dataSet) + lumi = get_lumi_dict(golden,step_input.dataSet,step_input.events) + + return lumi def make_relval_step(workflow_step, workflow_step_name, wmsplit): """ Build one workflow step - either input dataset or cmsDriver """ + print(workflow_step) step = {'name': workflow_step_name} if workflow_step_name in wmsplit: step['lumis_per_job'] = wmsplit[workflow_step_name] @@ -170,8 +183,13 @@ def make_relval_step(workflow_step, workflow_step_name, wmsplit): if 'INPUT' in workflow_step: # This step has input dataset step_input = workflow_step['INPUT'] + lumisections = step_input.ls + + if not step_input.ls and step_input.events: + lumisections = get_lumi_ranges_from_dict(step_input) + step['input'] = {'dataset': step_input.dataSet, - 'lumisection': step_input.ls, + 'lumisection': lumisections, 'run': step_input.run, 'label': step_input.label, 'events': step_input.events} From 641889abe2ad87feee3a90cf7293ad69044aa4c2 Mon Sep 17 00:00:00 2001 From: Geovanny Gonzalez Date: Thu, 12 Dec 2024 01:16:37 +0100 Subject: [PATCH 2/9] Suggestion: Remove `get_url_clean` function Instead provide two new functions `list_certification_files` and `get_certification_file` for listing all the files related to a certification type and get a specific golden JSON file. Also, sort the imports using `isort`. 
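
A minimal usage sketch of the two helpers (the certification type and
file names below are illustrative, not taken from the server):

    files = list_certification_files(cert_type="Collisions24")
    golden = [f for f in files if "Golden" in f]
    if golden:
        # Parsed golden JSON, i.e. {run: [[first_lumi, last_lumi], ...]}
        data = get_certification_file(path=f"Collisions24/{golden[0]}")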
--- core/utils/dqm.py | 95 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 73 insertions(+), 22 deletions(-) diff --git a/core/utils/dqm.py b/core/utils/dqm.py index e3a5860..28065c0 100644 --- a/core/utils/dqm.py +++ b/core/utils/dqm.py @@ -1,24 +1,73 @@ -from bs4 import BeautifulSoup -import pycurl -from io import BytesIO +import json import os -import ast + import numpy as np -import json +import requests +from bs4 import BeautifulSoup +from requests.exceptions import HTTPError base_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification/" base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/" -def get_url_clean(url): - - buffer = BytesIO() - c = pycurl.Curl() - c.setopt(c.URL, url) - c.setopt(c.WRITEDATA, buffer) - c.perform() - c.close() - - return BeautifulSoup(buffer.getvalue(), "lxml").text +def list_certification_files(cert_type: str) -> list[str]: + """ + List all the certification files related to a certification type + in the CMS DQM certification server. + + Args: + cert_type: Certification type. This corresponds to the folder + name available in the server. + + Returns: + All the JSON certification file names. + + Raises: + HTTPError: If it is not possible to retrieve the index HTML + page related to the certification type from the server. + """ + dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification" + page_content = requests.get(url=f"{dqm_cert_url}/{cert_type}/") + if page_content.status_code != 200: + raise HTTPError( + "Unable to retrieve the content related to: %s", + cert_type, + response=page_content + ) + + # Parse the HTML and retrieve the file names + page_content = BeautifulSoup(page_content.text, "lxml").text + + file_names = [] + for file_line in page_content.strip().split("\n"): + if file_line and ".json" in file_line: + metadata = file_line.strip().split(" ") + file_name = metadata[0] + if file_name.endswith(".json"): + file_names.append(file_name) + + return file_names + +def get_certification_file(path: str) -> dict: + """ + Get a certification file from the CMS DQM certification + server. + + Args: + path: Path to the certification file on the server. + + Returns: + Golden JSON file + """ + dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification" + file = requests.get(url=f"{dqm_cert_url}/{path}") + if file.status_code != 200: + raise HTTPError( + "Unable to retrieve the content related to: %s", + path, + response=file + ) + + return file.json() def get_cert_type(dataset): @@ -46,10 +95,14 @@ def get_json_list(dataset,cert_type,web_fallback): json_list = [c for c in json_list if c.startswith("Cert_C") and c.endswith("json")] ## ... 
if not we go to the website else: - cert_url = base_cert_url + cert_type + "/" - json_list = get_url_clean(cert_url).split("\n") - json_list = [c for c in json_list if "Golden" in c and "era" not in c and "Cert_C" in c] - json_list = [[cc for cc in c.split(" ") if cc.startswith("Cert_C") and cc.endswith("json")][0] for c in json_list] + json_list = list_certification_files(cert_type=cert_type) + json_list = [ + file_name + for file_name in json_list + if "Golden" in file_name + and "Cert_C" in file_name + and "era" not in file_name + ] return json_list @@ -63,7 +116,6 @@ def get_golden_json(dataset): cert_type = get_cert_type(dataset) cert_path = base_cert_path + cert_type + "/" - cert_url = base_cert_url + cert_type + "/" web_fallback = not os.path.isdir(cert_path) json_list = get_json_list(dataset,cert_type,web_fallback) @@ -77,8 +129,7 @@ def get_golden_json(dataset): with open(cert_path + "/" + best_json) as js: golden = json.load(js) else: - golden = get_url_clean(cert_url + best_json) - golden = ast.literal_eval(golden) #converts string to dict + golden = get_certification_file(path=f"{cert_type}/{best_json}") # golden json with all the lumisections one by one for k in golden: From b20f7d01524f6cbbd87ffb5c82156559a6e6f7d6 Mon Sep 17 00:00:00 2001 From: Geovanny Gonzalez Date: Thu, 12 Dec 2024 01:54:24 +0100 Subject: [PATCH 3/9] Fix: Backward compatibility with old releases Remove type hints as old Python version do not support them --- core/utils/dqm.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/core/utils/dqm.py b/core/utils/dqm.py index 28065c0..a1a8ead 100644 --- a/core/utils/dqm.py +++ b/core/utils/dqm.py @@ -9,24 +9,25 @@ base_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification/" base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/" -def list_certification_files(cert_type: str) -> list[str]: +def list_certification_files(cert_type): """ List all the certification files related to a certification type in the CMS DQM certification server. Args: - cert_type: Certification type. This corresponds to the folder + cert_type (str): Certification type. This corresponds to the folder name available in the server. Returns: - All the JSON certification file names. + list[str]: All the JSON certification file names. Raises: HTTPError: If it is not possible to retrieve the index HTML page related to the certification type from the server. """ dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification" - page_content = requests.get(url=f"{dqm_cert_url}/{cert_type}/") + url = "%s/%s/" % (dqm_cert_url, cert_type) + page_content = requests.get(url=url) if page_content.status_code != 200: raise HTTPError( "Unable to retrieve the content related to: %s", @@ -47,19 +48,20 @@ def list_certification_files(cert_type: str) -> list[str]: return file_names -def get_certification_file(path: str) -> dict: +def get_certification_file(path): """ Get a certification file from the CMS DQM certification server. Args: - path: Path to the certification file on the server. + path (str): Path to the certification file on the server. 
Returns: - Golden JSON file + dict: Golden JSON file """ dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification" - file = requests.get(url=f"{dqm_cert_url}/{path}") + url = "%s/%s" % (dqm_cert_url, path) + file = requests.get(url=url) if file.status_code != 200: raise HTTPError( "Unable to retrieve the content related to: %s", @@ -129,7 +131,8 @@ def get_golden_json(dataset): with open(cert_path + "/" + best_json) as js: golden = json.load(js) else: - golden = get_certification_file(path=f"{cert_type}/{best_json}") + path = "%s/%s" % (cert_type, best_json) + golden = get_certification_file(path=path) # golden json with all the lumisections one by one for k in golden: From 63dbb5e40ff807963efe69eab676de5df62c0c8b Mon Sep 17 00:00:00 2001 From: adiflori Date: Thu, 12 Dec 2024 11:04:01 +0100 Subject: [PATCH 4/9] Add some docs --- core/utils/das.py | 105 ++++++++++++++++++++++++++++++++-------------- core/utils/dqm.py | 33 +++++++++++---- 2 files changed, 97 insertions(+), 41 deletions(-) diff --git a/core/utils/das.py b/core/utils/das.py index 2537efa..ea8021e 100755 --- a/core/utils/das.py +++ b/core/utils/das.py @@ -5,6 +5,13 @@ import numpy as np def get_lumi_ranges(i): + """ + Having in input a list of lumis this outputs a list of list + grouping contigous elements in a single rangel-like list. + E.g. + > input = [4,5,6,7,8,200,201,202,222] + > output = [[4, 8], [200, 202], [222, 222]] + """ result = [] for _, b in itertools.groupby(enumerate(i), lambda pair: pair[1] - pair[0]): b = list(b) @@ -12,17 +19,18 @@ def get_lumi_ranges(i): return result def das_do_command(cmd): + """ + A simple wrapper for dasgoclient + """ out = subprocess.check_output(cmd, shell=True, executable="/bin/bash").decode('utf8') return out.split("\n") -def das_file_site(dataset, site): - cmd = "dasgoclient --query='file dataset=%s site=%s'"%(dataset,site) - out = das_do_command(cmd) - df = pd.DataFrame(out,columns=["file"]) - - return df - def das_file_data(dataset,opt=""): + """ + Given a dataset create a pandas DataFrame + with the list of file names and number + of events per file + """ cmd = "dasgoclient --query='file dataset=%s %s| grep file.name, file.nevents'"%(dataset,opt) out = das_do_command(cmd) out = [np.array(r.split(" "))[[0,3]] for r in out if len(r) > 0] @@ -33,6 +41,18 @@ def das_file_data(dataset,opt=""): return df def das_lumi_data(dataset,opt=""): + """ + Produces a file by file+lumi+run pandas DataFrame + + Args: + dataset: the dataset anme '/PD/GTString/DATA-TIER' + + Returns: + A pandas DataFrame having for each row a single file and as columns: + - the file name; + - the lumisections; + + """ cmd = "dasgoclient --query='file,lumi,run dataset=%s %s'"%(dataset,opt) out = das_do_command(cmd) @@ -42,33 +62,24 @@ def das_lumi_data(dataset,opt=""): return df -def das_run_events_data(dataset,run,opt=""): - cmd = "dasgoclient --query='file dataset=%s run=%s %s | sum(file.nevents) '"%(dataset,run,opt) - out = das_do_command(cmd)[0] - - out = [o for o in out.split(" ") if "sum" not in o] - out = int([r.split(" ") for r in out if len(r)>0][0][0]) - - return out - -def das_run_data(dataset,opt=""): - cmd = "dasgoclient --query='run dataset=%s %s '"%(dataset,opt) - out = das_do_command(cmd) +def get_events_df(golden,dataset,events): - return out + """ + Produces a file by file pandas DataFrame -def get_events_df(golden,dataset,events): + Args: + golden: a run by run certification json + dataset: the dataset anme '/PD/GTString/DATA-TIER' + events: max number of events (an int). 
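+
+    E.g. (sketch; golden comes from dqm.get_golden_json):
+    > df = get_events_df(golden, dataset, 100000)
+    > list(df.columns)
+    ['file', 'run', 'lumis', 'events', 'min_lumi', 'max_lumi', 'sum_evs']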
- ''' - Given in input: - - a run by run certification json - - the dataset name - - the number of desired events - this produces a pandas dataframe with a file per row. + Returns: + A pandas DataFrame having for each row a single file and as columns: + - the file name; + - the lumisections; + - the run; + - the number of events. - For each row it has: the file name, lumisections, run and number - of events and the cumulative sum of the events. - ''' + """ lumi_df = das_lumi_data(dataset) file_df = das_file_data(dataset) @@ -113,7 +124,22 @@ def get_events_df(golden,dataset,events): return df def get_run_lumi(df): - + """ + Produces the lumi mask dict starting from a pandas DataFrame + + Args: + df: a pandas DataFrame having for each row a single file and as columns: + - the file name; + - the lumisections; + - the run; + - the number of events. + Returns: + A "CMS"-like lumi mask dict mapping: + - the run number; + - to the list of good lumisection ranges. + + E.g. {run : [[lumi_1,lumi_2],[lumi_3,lumi_4]]} + """ if len(df) == 0: return {} @@ -125,7 +151,22 @@ def get_run_lumi(df): return lumi_ranges def get_lumi_dict(golden,dataset,events): - + """ + Produces a lumi mask for a given datasets, up to envets, using a certification json + + Args: + golden: a run by run certification json + dataset: the dataset anme '/PD/GTString/DATA-TIER' + events: max number of events (an int). + + Returns: + A "CMS"-like lumi mask dict mapping: + - the run number; + - to the list of good lumisection ranges. + + E.g. {run : [[lumi_1,lumi_2],[lumi_3,lumi_4]]} + """ + df = get_events_df(golden,dataset,events) lumi = get_run_lumi(df) diff --git a/core/utils/dqm.py b/core/utils/dqm.py index e3a5860..da0b6cf 100644 --- a/core/utils/dqm.py +++ b/core/utils/dqm.py @@ -1,5 +1,5 @@ from bs4 import BeautifulSoup -import pycurl +import requests from io import BytesIO import os import ast @@ -10,7 +10,9 @@ base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/" def get_url_clean(url): - + """ + A simple cleaner for pycurl + """ buffer = BytesIO() c = pycurl.Curl() c.setopt(c.URL, url) @@ -21,7 +23,9 @@ def get_url_clean(url): return BeautifulSoup(buffer.getvalue(), "lxml").text def get_cert_type(dataset): - + """ + Get the type of certification linked to a dataset + """ year = dataset.split("Run")[1][2:4] # from 20XX to XX PD = dataset.split("/")[1] cert_type = "Collisions" + str(year) @@ -34,8 +38,11 @@ def get_cert_type(dataset): return cert_type -def get_json_list(dataset,cert_type,web_fallback): - +def get_json_list(cert_type,web_fallback): + """ + Get the list of Golden jsons given the type of + certification we are looking for (Collisions,Cosmics,Commisioning,HI) + """ ## if we have access to eos we get from there ... if not web_fallback: cert_path = base_cert_path + cert_type + "/" @@ -47,17 +54,25 @@ def get_json_list(dataset,cert_type,web_fallback): ## ... 
if not we go to the website else: cert_url = base_cert_url + cert_type + "/" - json_list = get_url_clean(cert_url).split("\n") + + response = requests.get(url=cert_url) + if response.status_code != 200: + raise RuntimeError(f"Unable to retrieve the content from: {url}") + + page_content = response.text + json_list = BeautifulSoup(buffer.getvalue(), "lxml").text + + #json_list = get_url_clean(cert_url).split("\n") json_list = [c for c in json_list if "Golden" in c and "era" not in c and "Cert_C" in c] json_list = [[cc for cc in c.split(" ") if cc.startswith("Cert_C") and cc.endswith("json")][0] for c in json_list] return json_list def get_golden_json(dataset): - ''' + """ Get the flattened golden json with highest lumi range based on the dataset name. - ''' + """ golden_flat = {} @@ -66,7 +81,7 @@ def get_golden_json(dataset): cert_url = base_cert_url + cert_type + "/" web_fallback = not os.path.isdir(cert_path) - json_list = get_json_list(dataset,cert_type,web_fallback) + json_list = get_json_list(cert_type,web_fallback) # the larger the better, assuming file naming schema # Cert_X_RunStart_RunFinish_Type.json From b6a78c66f461deb88a192bf4356565cd5d0288d3 Mon Sep 17 00:00:00 2001 From: adiflori Date: Mon, 16 Dec 2024 15:55:28 +0100 Subject: [PATCH 5/9] Docs and preselection for data wfs with input (via label) --- core/utils/das.py | 54 ++++++++++++++++++++----------- core/utils/dqm.py | 41 +++++++++++++++++++---- core/utils/requirements.txt | 2 ++ core/utils/run_the_matrix_pdmv.py | 2 +- 4 files changed, 73 insertions(+), 26 deletions(-) create mode 100644 core/utils/requirements.txt diff --git a/core/utils/das.py b/core/utils/das.py index ea8021e..830cb3c 100755 --- a/core/utils/das.py +++ b/core/utils/das.py @@ -6,11 +6,14 @@ def get_lumi_ranges(i): """ - Having in input a list of lumis this outputs a list of list - grouping contigous elements in a single rangel-like list. - E.g. - > input = [4,5,6,7,8,200,201,202,222] - > output = [[4, 8], [200, 202], [222, 222]] + An helper to transform a list of lumisections into a list of lists (ranges). + It groups contigous elements in a single rangel-like list. + + Args: + i: a list of ints. + + Returns: + list[list[int]]: a single rangel-like list. """ result = [] for _, b in itertools.groupby(enumerate(i), lambda pair: pair[1] - pair[0]): @@ -18,21 +21,36 @@ def get_lumi_ranges(i): result.append([b[0][1],b[-1][1]]) return result -def das_do_command(cmd): +def das_do_command(query): """ - A simple wrapper for dasgoclient + A simple wrapper for dasgoclient. + + Args: + cmd: a dasgoclient query. + + Returns: + list[str]: the dasgoclient command output split by newlines. + """ + cmd = 'dasgoclient --query="%s"'%(query) out = subprocess.check_output(cmd, shell=True, executable="/bin/bash").decode('utf8') return out.split("\n") -def das_file_data(dataset,opt=""): +def das_file_data(dataset): """ - Given a dataset create a pandas DataFrame - with the list of file names and number - of events per file + Given a dataset create a pandas DataFrame with the + list of file names and number of events per file. + + Args: + dataset: the dataset anme '/PD/GTString/DATA-TIER' + + Returns: + A pandas DataFrame having for each row a single file and as columns: + - the file name; + - the number of events in each file. 
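+
+    E.g. (sketch, dataset name illustrative):
+    > das_file_data("/Muon0/Run2024B-v1/RAW").columns
+    Index(['file', 'events'], dtype='object')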
""" - cmd = "dasgoclient --query='file dataset=%s %s| grep file.name, file.nevents'"%(dataset,opt) - out = das_do_command(cmd) + query = 'file dataset=%s | grep file.name, file.nevents'%(dataset) + out = das_do_command(query) out = [np.array(r.split(" "))[[0,3]] for r in out if len(r) > 0] df = pd.DataFrame(out,columns=["file","events"]) @@ -40,7 +58,7 @@ def das_file_data(dataset,opt=""): return df -def das_lumi_data(dataset,opt=""): +def das_lumi_data(dataset): """ Produces a file by file+lumi+run pandas DataFrame @@ -50,12 +68,12 @@ def das_lumi_data(dataset,opt=""): Returns: A pandas DataFrame having for each row a single file and as columns: - the file name; - - the lumisections; + - the lumisections. """ - cmd = "dasgoclient --query='file,lumi,run dataset=%s %s'"%(dataset,opt) + query = 'file,lumi,run dataset=%s '%(dataset) - out = das_do_command(cmd) + out = das_do_command(query) out = [r.split(" ") for r in out if len(r)>0] df = pd.DataFrame(out,columns=["file","run","lumis"]) @@ -69,7 +87,7 @@ def get_events_df(golden,dataset,events): Args: golden: a run by run certification json - dataset: the dataset anme '/PD/GTString/DATA-TIER' + dataset: the dataset name as a string '/PD/GTString/DATA-TIER' events: max number of events (an int). Returns: diff --git a/core/utils/dqm.py b/core/utils/dqm.py index 2c08ad8..0d99cff 100644 --- a/core/utils/dqm.py +++ b/core/utils/dqm.py @@ -6,7 +6,6 @@ from bs4 import BeautifulSoup from requests.exceptions import HTTPError -base_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification/" base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/" def list_certification_files(cert_type): @@ -57,7 +56,7 @@ def get_certification_file(path): path (str): Path to the certification file on the server. Returns: - dict: Golden JSON file + dict: Golden JSON file. """ dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification" url = "%s/%s" % (dqm_cert_url, path) @@ -73,7 +72,16 @@ def get_certification_file(path): def get_cert_type(dataset): """ - Get the type of certification linked to a dataset + List all the certification files related to a certification type + in the CMS DQM certification server. + + Args: + dataset: the dataset name as a string '/PD/GTString/DATA-TIER'. + + Returns: + str: The type of certification we seek (Collisions, HI, Cosmics + or Commisioning). + """ year = dataset.split("Run")[1][2:4] # from 20XX to XX PD = dataset.split("/")[1] @@ -89,9 +97,19 @@ def get_cert_type(dataset): def get_json_list(cert_type,web_fallback): """ - Get the list of Golden jsons given the type of - certification we are looking for (Collisions,Cosmics,Commisioning,HI) + List all the certification files related to a certification type + either stored on CMS DQM EOS either, as a fallback, + in the CMS DQM certification server. + + Args: + dataset: the dataset name as a string '/PD/GTString/DATA-TIER'. + web_fallback: a bool flag enabling looking for the list on CMS DQM server. + + Returns: + list[str]: All the JSON certification file names. + """ + ## if we have access to eos we get from there ... if not web_fallback: cert_path = base_cert_path + cert_type + "/" @@ -116,8 +134,17 @@ def get_json_list(cert_type,web_fallback): def get_golden_json(dataset): """ - Get the flattened golden json with highest - lumi range based on the dataset name. + Output a the golden certification dictionary (json) for a specific datasets. + In case of multiple json files available, the one with the highest + lumi range is selected. 
The dictionary maps each run number with a complete list + of the correspondinig golden lumisections. + + Args: + dataset: the dataset name as a string '/PD/GTString/DATA-TIER' + + Returns: + dict: Golden Run-Lumisection dictionary. + """ golden_flat = {} diff --git a/core/utils/requirements.txt b/core/utils/requirements.txt new file mode 100644 index 0000000..fab4563 --- /dev/null +++ b/core/utils/requirements.txt @@ -0,0 +1,2 @@ +pandas==2.0.1 +numpy==1.24.3 \ No newline at end of file diff --git a/core/utils/run_the_matrix_pdmv.py b/core/utils/run_the_matrix_pdmv.py index df26544..1c9ef0e 100644 --- a/core/utils/run_the_matrix_pdmv.py +++ b/core/utils/run_the_matrix_pdmv.py @@ -185,7 +185,7 @@ def make_relval_step(workflow_step, workflow_step_name, wmsplit): step_input = workflow_step['INPUT'] lumisections = step_input.ls - if not step_input.ls and step_input.events: + if not step_input.ls and step_input.events and step_input.label: lumisections = get_lumi_ranges_from_dict(step_input) step['input'] = {'dataset': step_input.dataSet, From 37b0947d8d5e6924effc2c02809fee497d6d3da6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geovanny=20Gonz=C3=A1lez-Rodr=C3=ADguez?= Date: Mon, 20 Jan 2025 05:04:37 -0500 Subject: [PATCH 6/9] Fix typos Fix some typos for docstrings --- core/utils/das.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/utils/das.py b/core/utils/das.py index 830cb3c..8fa714b 100755 --- a/core/utils/das.py +++ b/core/utils/das.py @@ -26,7 +26,7 @@ def das_do_command(query): A simple wrapper for dasgoclient. Args: - cmd: a dasgoclient query. + query: a dasgoclient query. Returns: list[str]: the dasgoclient command output split by newlines. @@ -42,7 +42,7 @@ def das_file_data(dataset): list of file names and number of events per file. Args: - dataset: the dataset anme '/PD/GTString/DATA-TIER' + dataset: the dataset name '/PD/GTString/DATA-TIER' Returns: A pandas DataFrame having for each row a single file and as columns: @@ -63,7 +63,7 @@ def das_lumi_data(dataset): Produces a file by file+lumi+run pandas DataFrame Args: - dataset: the dataset anme '/PD/GTString/DATA-TIER' + dataset: the dataset name '/PD/GTString/DATA-TIER' Returns: A pandas DataFrame having for each row a single file and as columns: @@ -170,11 +170,11 @@ def get_run_lumi(df): def get_lumi_dict(golden,dataset,events): """ - Produces a lumi mask for a given datasets, up to envets, using a certification json + Produces a lumi mask for a given dataset, up to events, using a certification json Args: golden: a run by run certification json - dataset: the dataset anme '/PD/GTString/DATA-TIER' + dataset: the dataset name '/PD/GTString/DATA-TIER' events: max number of events (an int). Returns: From 2d1956e65cd4bf77d18c4e169a8d0ceb162306b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geovanny=20Gonz=C3=A1lez-Rodr=C3=ADguez?= Date: Mon, 20 Jan 2025 13:27:24 +0100 Subject: [PATCH 7/9] Clarify the purpose of this `requirements.txt` file List the packages and versions used for the development of remote modules located in `utils/` --- core/utils/requirements.txt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/utils/requirements.txt b/core/utils/requirements.txt index fab4563..9f58652 100644 --- a/core/utils/requirements.txt +++ b/core/utils/requirements.txt @@ -1,2 +1,9 @@ +# Just for reference, the following are the packages +# and versions used for the development of some modules. 
+# However, the runtime environment may contain different +# versions pre-included in the `cms-sw` container. +# +# For releases different than `el9` it is not possible to +# update the installed packages to match these versions. pandas==2.0.1 numpy==1.24.3 \ No newline at end of file From 3b292fb3e255985f213bef688dd48e3bf02021a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geovanny=20Gonz=C3=A1lez-Rodr=C3=ADguez?= Date: Mon, 20 Jan 2025 15:26:13 +0100 Subject: [PATCH 8/9] Apply `pylint` suggestions --- core/controller/relval_controller.py | 6 +-- core/controller/ticket_controller.py | 2 +- core/model/relval_step.py | 9 ++-- core/utils/das.py | 65 ++++++++++++++++------------ core/utils/dqm.py | 54 ++++++++++------------- core/utils/run_the_matrix_pdmv.py | 14 +++--- 6 files changed, 78 insertions(+), 72 deletions(-) diff --git a/core/controller/relval_controller.py b/core/controller/relval_controller.py index 365876e..a1e7a41 100644 --- a/core/controller/relval_controller.py +++ b/core/controller/relval_controller.py @@ -284,7 +284,7 @@ def get_task_dict(self, relval, step, step_index): if step.get("lumis_per_job") != "": task_dict["LumisPerJob"] = int(step.get("lumis_per_job")) - + task_dict["TaskName"] = step.get_short_name() task_dict["ConfigCacheID"] = step.get("config_id") task_dict["KeepOutput"] = step.get("keep_output") @@ -330,7 +330,7 @@ def get_request_priority(self, relval: RelVal) -> int: priority ) return priority - + if "HighPrio" in campaign: priority = 500000 self.logger.info( @@ -338,7 +338,7 @@ def get_request_priority(self, relval: RelVal) -> int: priority ) return priority - + return 450000 def get_job_dict(self, relval): diff --git a/core/controller/ticket_controller.py b/core/controller/ticket_controller.py index 43215e1..edad349 100644 --- a/core/controller/ticket_controller.py +++ b/core/controller/ticket_controller.py @@ -365,7 +365,7 @@ def generate_workflows(self, ticket, ssh_executor): "core/utils/run_the_matrix_pdmv.py", f"{remote_directory}/run_the_matrix_pdmv.py", ) - + ssh_executor.upload_file( "core/utils/dqm.py", f"{remote_directory}/dqm.py", diff --git a/core/model/relval_step.py b/core/model/relval_step.py index d91fabb..b9a563a 100644 --- a/core/model/relval_step.py +++ b/core/model/relval_step.py @@ -1,9 +1,10 @@ """ Module that contains RelValStep class """ -import weakref import json +import weakref from copy import deepcopy + from core.model.model_base import ModelBase from core_lib.utils.common_utils import get_scram_arch @@ -111,14 +112,14 @@ def __init__(self, json_input=None, parent=None, check_attributes=True): json_input['gpu'] = schema.get('gpu') json_input['gpu']['requires'] = 'forbidden' step_input = json_input['input'] - + for key, default_value in schema['input'].items(): if key not in step_input: step_input[key] = default_value else: json_input['driver'] = {k.lstrip('-'): v for k, v in json_input['driver'].items()} json_input['input'] = schema.get('input') - + if json_input.get('gpu', {}).get('requires') not in ('optional', 'required'): json_input['gpu'] = schema.get('gpu') json_input['gpu']['requires'] = 'forbidden' @@ -283,8 +284,6 @@ def __build_das_command(self, step_index): command += f'>> {files_name}\n' return (comment + '\n' + command).strip() - events = input_dict['events'] - return f'# Step {step_index + 1} is input dataset for next step: {dataset}' def get_command(self, custom_fragment=None, for_submission=False): diff --git a/core/utils/das.py b/core/utils/das.py index 8fa714b..2ce39e3 100755 --- a/core/utils/das.py +++ 
b/core/utils/das.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 -import pandas as pd -import subprocess import itertools -import numpy as np +import subprocess + +import numpy as np # pylint: disable=import-error +import pandas as pd # pylint: disable=import-error + def get_lumi_ranges(i): """ @@ -11,14 +13,14 @@ def get_lumi_ranges(i): Args: i: a list of ints. - + Returns: list[list[int]]: a single rangel-like list. """ result = [] for _, b in itertools.groupby(enumerate(i), lambda pair: pair[1] - pair[0]): b = list(b) - result.append([b[0][1],b[-1][1]]) + result.append([b[0][1],b[-1][1]]) return result def das_do_command(query): @@ -27,10 +29,10 @@ def das_do_command(query): Args: query: a dasgoclient query. - + Returns: list[str]: the dasgoclient command output split by newlines. - + """ cmd = 'dasgoclient --query="%s"'%(query) out = subprocess.check_output(cmd, shell=True, executable="/bin/bash").decode('utf8') @@ -38,14 +40,14 @@ def das_do_command(query): def das_file_data(dataset): """ - Given a dataset create a pandas DataFrame with the + Given a dataset create a pandas DataFrame with the list of file names and number of events per file. Args: dataset: the dataset name '/PD/GTString/DATA-TIER' Returns: - A pandas DataFrame having for each row a single file and as columns: + A pandas DataFrame having for each row a single file and as columns: - the file name; - the number of events in each file. """ @@ -55,7 +57,7 @@ def das_file_data(dataset): df = pd.DataFrame(out,columns=["file","events"]) df.events = df.events.values.astype(int) - + return df def das_lumi_data(dataset): @@ -64,20 +66,20 @@ def das_lumi_data(dataset): Args: dataset: the dataset name '/PD/GTString/DATA-TIER' - + Returns: - A pandas DataFrame having for each row a single file and as columns: + A pandas DataFrame having for each row a single file and as columns: - the file name; - the lumisections. - + """ query = 'file,lumi,run dataset=%s '%(dataset) - + out = das_do_command(query) out = [r.split(" ") for r in out if len(r)>0] - + df = pd.DataFrame(out,columns=["file","run","lumis"]) - + return df def get_events_df(golden,dataset,events): @@ -91,7 +93,7 @@ def get_events_df(golden,dataset,events): events: max number of events (an int). 
Returns: - A pandas DataFrame having for each row a single file and as columns: + A pandas DataFrame having for each row a single file and as columns: - the file name; - the lumisections; - the run; @@ -103,12 +105,15 @@ def get_events_df(golden,dataset,events): file_df = das_file_data(dataset) df = lumi_df.merge(file_df,on="file",how="inner") # merge file informations with run and lumis - df["lumis"] = [[int(ff) for ff in f.replace("[","").replace("]","").split(",")] for f in df.lumis.values] - + df["lumis"] = [ + [int(ff) for ff in f.replace("[","").replace("]","").split(",")] + for f in df.lumis.values + ] + df_rs = [] for r in golden: - cut = (df["run"] == r) + cut = df["run"] == r if not any(cut): continue @@ -124,7 +129,7 @@ def get_events_df(golden,dataset,events): if len(df_rs)==0: return pd.DataFrame([]) - elif len(df_rs)==1: + if len(df_rs)==1: df = df_rs else: df = pd.concat(df_rs) @@ -140,13 +145,13 @@ def get_events_df(golden,dataset,events): df = df[df["sum_evs"] < events] return df - + def get_run_lumi(df): """ Produces the lumi mask dict starting from a pandas DataFrame Args: - df: a pandas DataFrame having for each row a single file and as columns: + df: a pandas DataFrame having for each row a single file and as columns: - the file name; - the lumisections; - the run; @@ -162,8 +167,15 @@ def get_run_lumi(df): return {} run_list = np.unique(df.run.values).tolist() - lumi_list = [get_lumi_ranges(np.sort(np.concatenate(df.loc[df["run"]==r,"lumis"].values).ravel()).tolist()) for r in run_list] - + lumi_list = [ + get_lumi_ranges( + np.sort( + np.concatenate(df.loc[df["run"]==r,"lumis"].values).ravel() + ).tolist() + ) + for r in run_list + ] + lumi_ranges = dict(zip(run_list,lumi_list)) return lumi_ranges @@ -181,7 +193,7 @@ def get_lumi_dict(golden,dataset,events): A "CMS"-like lumi mask dict mapping: - the run number; - to the list of good lumisection ranges. - + E.g. 
{run : [[lumi_1,lumi_2],[lumi_3,lumi_4]]} """ @@ -189,4 +201,3 @@ def get_lumi_dict(golden,dataset,events): lumi = get_run_lumi(df) return lumi - diff --git a/core/utils/dqm.py b/core/utils/dqm.py index 0d99cff..f7cd030 100644 --- a/core/utils/dqm.py +++ b/core/utils/dqm.py @@ -1,11 +1,15 @@ +import codecs import json import os +#pylint: disable=import-error import numpy as np import requests from bs4 import BeautifulSoup from requests.exceptions import HTTPError +#pylint: enable=import-error + base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/" def list_certification_files(cert_type): @@ -26,13 +30,10 @@ def list_certification_files(cert_type): """ dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification" url = "%s/%s/" % (dqm_cert_url, cert_type) - page_content = requests.get(url=url) + page_content = requests.get(url=url, timeout=30) if page_content.status_code != 200: - raise HTTPError( - "Unable to retrieve the content related to: %s", - cert_type, - response=page_content - ) + error_msg = f"Unable to retrieve the content related to {cert_type}" + raise HTTPError(error_msg, response=page_content) # Parse the HTML and retrieve the file names page_content = BeautifulSoup(page_content.text, "lxml").text @@ -60,13 +61,10 @@ def get_certification_file(path): """ dqm_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification" url = "%s/%s" % (dqm_cert_url, path) - file = requests.get(url=url) + file = requests.get(url=url, timeout=30) if file.status_code != 200: - raise HTTPError( - "Unable to retrieve the content related to: %s", - path, - response=file - ) + error_msg = f"Unable to retrieve the content related to {path}" + raise HTTPError(error_msg, response=file) return file.json() @@ -79,12 +77,12 @@ def get_cert_type(dataset): dataset: the dataset name as a string '/PD/GTString/DATA-TIER'. Returns: - str: The type of certification we seek (Collisions, HI, Cosmics + str: The type of certification we seek (Collisions, HI, Cosmics or Commisioning). """ year = dataset.split("Run")[1][2:4] # from 20XX to XX - PD = dataset.split("/")[1] + PD = dataset.split("/")[1] # pylint: disable=invalid-name cert_type = "Collisions" + str(year) if "Cosmics" in dataset: cert_type = "Cosmics" + str(year) @@ -92,7 +90,7 @@ def get_cert_type(dataset): cert_type = "Commisioning2020" elif "HI" in PD: cert_type = "Collisions" + str(year) + "HI" - + return cert_type def get_json_list(cert_type,web_fallback): @@ -109,35 +107,32 @@ def get_json_list(cert_type,web_fallback): list[str]: All the JSON certification file names. """ - + ## if we have access to eos we get from there ... if not web_fallback: cert_path = base_cert_path + cert_type + "/" json_list = os.listdir(cert_path) - if len(json_list) == 0: - web_fallback == True json_list = [c for c in json_list if "Golden" in c and "era" not in c] json_list = [c for c in json_list if c.startswith("Cert_C") and c.endswith("json")] ## ... if not we go to the website else: - json_list = list_certification_files(cert_type=cert_type) json_list = [ file_name for file_name in json_list - if "Golden" in file_name + if "Golden" in file_name and "Cert_C" in file_name - and "era" not in file_name + and "era" not in file_name ] return json_list def get_golden_json(dataset): - """ - Output a the golden certification dictionary (json) for a specific datasets. - In case of multiple json files available, the one with the highest + """ + Output a the golden certification dictionary (json) for a specific datasets. 
+ In case of multiple json files available, the one with the highest lumi range is selected. The dictionary maps each run number with a complete list - of the correspondinig golden lumisections. + of the correspondinig golden lumisections. Args: dataset: the dataset name as a string '/PD/GTString/DATA-TIER' @@ -161,18 +156,17 @@ def get_golden_json(dataset): latest_json = np.array(json_list[np.argmax(run_ranges)]).reshape(1,-1)[0].astype(str) best_json = str(latest_json[0]) if not web_fallback: - with open(cert_path + "/" + best_json) as js: + with codecs.open(cert_path + "/" + best_json, encoding="utf-8") as js: golden = json.load(js) else: path = "%s/%s" % (cert_type, best_json) golden = get_certification_file(path=path) - + # golden json with all the lumisections one by one for k in golden: - R = [] + R = [] # pylint: disable=invalid-name for r in golden[k]: - R = R + [f for f in range(r[0],r[1]+1)] + R = R + list(range(r[0], r[1] + 1)) # pylint: disable=invalid-name golden_flat[k] = R return golden_flat - diff --git a/core/utils/run_the_matrix_pdmv.py b/core/utils/run_the_matrix_pdmv.py index 1c9ef0e..a1ec2eb 100644 --- a/core/utils/run_the_matrix_pdmv.py +++ b/core/utils/run_the_matrix_pdmv.py @@ -2,18 +2,20 @@ PdmV's simplified implementation of runTheMatrix.py """ from __future__ import print_function -import sys + import argparse -import json import importlib import inspect +import json import re -from das import get_lumi_dict -from dqm import get_golden_json -from os import environ as env +import sys + #pylint: disable=wrong-import-position,import-error import Configuration.PyReleaseValidation.relval_steps as steps_module from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector +from das import get_lumi_dict +from dqm import get_golden_json + #pylint: enable=wrong-import-position,import-error @@ -163,7 +165,7 @@ def get_lumi_ranges_from_dict(step_input): """ golden = get_golden_json(step_input.dataSet) lumi = get_lumi_dict(golden,step_input.dataSet,step_input.events) - + return lumi def make_relval_step(workflow_step, workflow_step_name, wmsplit): From 80ef044a7c48f5a10c5056dbc46129230e2d3457 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geovanny=20Gonz=C3=A1lez-Rodr=C3=ADguez?= Date: Mon, 20 Jan 2025 15:32:18 +0100 Subject: [PATCH 9/9] Format code with `black` Format some modules, in special the new ones. --- core/controller/ticket_controller.py | 1 + core/utils/das.py | 61 ++++++++++++++++------------ core/utils/dqm.py | 29 ++++++++----- 3 files changed, 56 insertions(+), 35 deletions(-) diff --git a/core/controller/ticket_controller.py b/core/controller/ticket_controller.py index edad349..c625ae3 100644 --- a/core/controller/ticket_controller.py +++ b/core/controller/ticket_controller.py @@ -1,6 +1,7 @@ """ Module that contains TicketController class """ + import json from copy import deepcopy from environment import ( diff --git a/core/utils/das.py b/core/utils/das.py index 2ce39e3..192f5a9 100755 --- a/core/utils/das.py +++ b/core/utils/das.py @@ -20,9 +20,10 @@ def get_lumi_ranges(i): result = [] for _, b in itertools.groupby(enumerate(i), lambda pair: pair[1] - pair[0]): b = list(b) - result.append([b[0][1],b[-1][1]]) + result.append([b[0][1], b[-1][1]]) return result + def das_do_command(query): """ A simple wrapper for dasgoclient. @@ -34,10 +35,13 @@ def das_do_command(query): list[str]: the dasgoclient command output split by newlines. 
""" - cmd = 'dasgoclient --query="%s"'%(query) - out = subprocess.check_output(cmd, shell=True, executable="/bin/bash").decode('utf8') + cmd = 'dasgoclient --query="%s"' % (query) + out = subprocess.check_output(cmd, shell=True, executable="/bin/bash").decode( + "utf8" + ) return out.split("\n") + def das_file_data(dataset): """ Given a dataset create a pandas DataFrame with the @@ -51,15 +55,16 @@ def das_file_data(dataset): - the file name; - the number of events in each file. """ - query = 'file dataset=%s | grep file.name, file.nevents'%(dataset) + query = "file dataset=%s | grep file.name, file.nevents" % (dataset) out = das_do_command(query) - out = [np.array(r.split(" "))[[0,3]] for r in out if len(r) > 0] + out = [np.array(r.split(" "))[[0, 3]] for r in out if len(r) > 0] - df = pd.DataFrame(out,columns=["file","events"]) + df = pd.DataFrame(out, columns=["file", "events"]) df.events = df.events.values.astype(int) return df + def das_lumi_data(dataset): """ Produces a file by file+lumi+run pandas DataFrame @@ -73,17 +78,17 @@ def das_lumi_data(dataset): - the lumisections. """ - query = 'file,lumi,run dataset=%s '%(dataset) + query = "file,lumi,run dataset=%s " % (dataset) out = das_do_command(query) - out = [r.split(" ") for r in out if len(r)>0] + out = [r.split(" ") for r in out if len(r) > 0] - df = pd.DataFrame(out,columns=["file","run","lumis"]) + df = pd.DataFrame(out, columns=["file", "run", "lumis"]) return df -def get_events_df(golden,dataset,events): +def get_events_df(golden, dataset, events): """ Produces a file by file pandas DataFrame @@ -104,9 +109,11 @@ def get_events_df(golden,dataset,events): lumi_df = das_lumi_data(dataset) file_df = das_file_data(dataset) - df = lumi_df.merge(file_df,on="file",how="inner") # merge file informations with run and lumis + df = lumi_df.merge( + file_df, on="file", how="inner" + ) # merge file informations with run and lumis df["lumis"] = [ - [int(ff) for ff in f.replace("[","").replace("]","").split(",")] + [int(ff) for ff in f.replace("[", "").replace("]", "").split(",")] for f in df.lumis.values ] @@ -123,29 +130,32 @@ def get_events_df(golden,dataset,events): if df_r["events"].sum() < 10000: continue - good_lumis = np.array([len([ll for ll in l if ll in golden[r]]) for l in df_r.lumis]) + good_lumis = np.array( + [len([ll for ll in l if ll in golden[r]]) for l in df_r.lumis] + ) n_lumis = np.array([len(l) for l in df_r.lumis]) - df_rs.append(df_r[good_lumis==n_lumis]) + df_rs.append(df_r[good_lumis == n_lumis]) - if len(df_rs)==0: + if len(df_rs) == 0: return pd.DataFrame([]) - if len(df_rs)==1: + if len(df_rs) == 1: df = df_rs else: df = pd.concat(df_rs) ## lumi sorting - df.loc[:,"min_lumi"] = [min(f) for f in df.lumis] - df.loc[:,"max_lumi"] = [max(f) for f in df.lumis] - df = df.sort_values(["run","min_lumi","max_lumi"]) + df.loc[:, "min_lumi"] = [min(f) for f in df.lumis] + df.loc[:, "max_lumi"] = [max(f) for f in df.lumis] + df = df.sort_values(["run", "min_lumi", "max_lumi"]) ## events skimming - df = df[df["events"] <= events] #jump too big files - df.loc[:,"sum_evs"] = df.loc[:,"events"].cumsum() + df = df[df["events"] <= events] # jump too big files + df.loc[:, "sum_evs"] = df.loc[:, "events"].cumsum() df = df[df["sum_evs"] < events] return df + def get_run_lumi(df): """ Produces the lumi mask dict starting from a pandas DataFrame @@ -170,17 +180,18 @@ def get_run_lumi(df): lumi_list = [ get_lumi_ranges( np.sort( - np.concatenate(df.loc[df["run"]==r,"lumis"].values).ravel() + np.concatenate(df.loc[df["run"] == r, 
"lumis"].values).ravel() ).tolist() ) for r in run_list ] - lumi_ranges = dict(zip(run_list,lumi_list)) + lumi_ranges = dict(zip(run_list, lumi_list)) return lumi_ranges -def get_lumi_dict(golden,dataset,events): + +def get_lumi_dict(golden, dataset, events): """ Produces a lumi mask for a given dataset, up to events, using a certification json @@ -197,7 +208,7 @@ def get_lumi_dict(golden,dataset,events): E.g. {run : [[lumi_1,lumi_2],[lumi_3,lumi_4]]} """ - df = get_events_df(golden,dataset,events) + df = get_events_df(golden, dataset, events) lumi = get_run_lumi(df) return lumi diff --git a/core/utils/dqm.py b/core/utils/dqm.py index f7cd030..f4da920 100644 --- a/core/utils/dqm.py +++ b/core/utils/dqm.py @@ -2,16 +2,17 @@ import json import os -#pylint: disable=import-error +# pylint: disable=import-error import numpy as np import requests from bs4 import BeautifulSoup from requests.exceptions import HTTPError -#pylint: enable=import-error +# pylint: enable=import-error base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/" + def list_certification_files(cert_type): """ List all the certification files related to a certification type @@ -48,6 +49,7 @@ def list_certification_files(cert_type): return file_names + def get_certification_file(path): """ Get a certification file from the CMS DQM certification @@ -68,6 +70,7 @@ def get_certification_file(path): return file.json() + def get_cert_type(dataset): """ List all the certification files related to a certification type @@ -81,8 +84,8 @@ def get_cert_type(dataset): or Commisioning). """ - year = dataset.split("Run")[1][2:4] # from 20XX to XX - PD = dataset.split("/")[1] # pylint: disable=invalid-name + year = dataset.split("Run")[1][2:4] # from 20XX to XX + PD = dataset.split("/")[1] # pylint: disable=invalid-name cert_type = "Collisions" + str(year) if "Cosmics" in dataset: cert_type = "Cosmics" + str(year) @@ -93,7 +96,8 @@ def get_cert_type(dataset): return cert_type -def get_json_list(cert_type,web_fallback): + +def get_json_list(cert_type, web_fallback): """ List all the certification files related to a certification type either stored on CMS DQM EOS either, as a fallback, @@ -113,7 +117,9 @@ def get_json_list(cert_type,web_fallback): cert_path = base_cert_path + cert_type + "/" json_list = os.listdir(cert_path) json_list = [c for c in json_list if "Golden" in c and "era" not in c] - json_list = [c for c in json_list if c.startswith("Cert_C") and c.endswith("json")] + json_list = [ + c for c in json_list if c.startswith("Cert_C") and c.endswith("json") + ] ## ... if not we go to the website else: json_list = list_certification_files(cert_type=cert_type) @@ -127,6 +133,7 @@ def get_json_list(cert_type,web_fallback): return json_list + def get_golden_json(dataset): """ Output a the golden certification dictionary (json) for a specific datasets. 
@@ -148,12 +155,14 @@ def get_golden_json(dataset): cert_path = base_cert_path + cert_type + "/" web_fallback = not os.path.isdir(cert_path) - json_list = get_json_list(cert_type,web_fallback) + json_list = get_json_list(cert_type, web_fallback) # the larger the better, assuming file naming schema # Cert_X_RunStart_RunFinish_Type.json run_ranges = [int(c.split("_")[3]) - int(c.split("_")[2]) for c in json_list] - latest_json = np.array(json_list[np.argmax(run_ranges)]).reshape(1,-1)[0].astype(str) + latest_json = ( + np.array(json_list[np.argmax(run_ranges)]).reshape(1, -1)[0].astype(str) + ) best_json = str(latest_json[0]) if not web_fallback: with codecs.open(cert_path + "/" + best_json, encoding="utf-8") as js: @@ -164,9 +173,9 @@ def get_golden_json(dataset): # golden json with all the lumisections one by one for k in golden: - R = [] # pylint: disable=invalid-name + R = [] # pylint: disable=invalid-name for r in golden[k]: - R = R + list(range(r[0], r[1] + 1)) # pylint: disable=invalid-name + R = R + list(range(r[0], r[1] + 1)) # pylint: disable=invalid-name golden_flat[k] = R return golden_flat
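
For reference, the end-to-end flow these utilities implement, as driven by
run_the_matrix_pdmv.py (a sketch, assuming a CMSSW-like environment with
dasgoclient available and EOS or network access to the DQM certification
area; the dataset name and event count are illustrative):

    from dqm import get_golden_json
    from das import get_lumi_dict

    dataset = "/Muon0/Run2024B-v1/RAW"  # illustrative dataset name
    golden = get_golden_json(dataset)   # run -> flat list of golden lumis
    lumi_mask = get_lumi_dict(golden, dataset, 100000)
    # lumi_mask maps run -> [[first_lumi, last_lumi], ...], the same shape
    # consumed by get_lumi_ranges_from_dict() in run_the_matrix_pdmv.py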