Precomputing Lumi Mask For Event Based Data RelVals #138

Merged
merged 10 commits on Jan 20, 2025
10 changes: 6 additions & 4 deletions core/controller/relval_controller.py
@@ -244,7 +244,7 @@ def get_task_dict(self, relval, step, step_index):
"Getting step %s dict for %s", step_index, relval.get_prepid()
)
task_dict = {}
# If it's firtst step and not input file - it is generator
# If it's first step and not input file - it is generator
# set Seeding to AutomaticSeeding, RequestNumEvents, EventsPerJob and EventsPerLumi
# It expects --relval attribute
if step_index == 0:
@@ -264,6 +264,9 @@ def get_task_dict(self, relval, step, step_index):
task_dict["EventsPerLumi"] = int(events_per_job)
else:
input_step = relval.get("steps")[step.get_input_step_index()]

task_dict["SplittingAlgo"] = "LumiBased"

if input_step.get_step_type() == "input_file":
input_dict = input_step.get("input")
# Input file step is not a task
@@ -273,16 +276,15 @@ def get_task_dict(self, relval, step, step_index):
task_dict["LumiList"] = input_dict["lumisection"]
elif input_dict["run"]:
task_dict["RunWhitelist"] = input_dict["run"]

else:
task_dict["InputTask"] = input_step.get_short_name()
_, input_module = step.get_input_eventcontent(input_step)
task_dict["InputFromOutputModule"] = f"{input_module}output"

if step.get("lumis_per_job") != "":
task_dict["LumisPerJob"] = int(step.get("lumis_per_job"))

task_dict["SplittingAlgo"] = "LumiBased"


task_dict["TaskName"] = step.get_short_name()
task_dict["ConfigCacheID"] = step.get("config_id")
task_dict["KeepOutput"] = step.get("keep_output")
16 changes: 16 additions & 0 deletions core/controller/ticket_controller.py
@@ -365,12 +365,28 @@ def generate_workflows(self, ticket, ssh_executor):
"core/utils/run_the_matrix_pdmv.py",
f"{remote_directory}/run_the_matrix_pdmv.py",
)

ssh_executor.upload_file(
"core/utils/dqm.py",
f"{remote_directory}/dqm.py",
)
ssh_executor.upload_file(
"core/utils/das.py",
f"{remote_directory}/das.py",
)
command = [
f"cd {remote_directory}",
"voms-proxy-init -voms cms --valid 4:00 --out $(pwd)/proxy.txt",
]
ssh_executor.execute_command(command)

# Define a name for the output file
file_name = f"{ticket_prepid}.json"
# Execute run_the_matrix_pdmv.py
matrix_command = run_commands_in_cmsenv(
[
f"cd {remote_directory}",
"export X509_USER_PROXY=$(pwd)/proxy.txt",
"$PYTHON_INT run_the_matrix_pdmv.py "
f"-l={workflow_ids} "
f"-w={matrix} "
17 changes: 0 additions & 17 deletions core/model/relval_step.py
@@ -284,23 +284,6 @@ def __build_das_command(self, step_index):
return (comment + '\n' + command).strip()

events = input_dict['events']

## N.B. das-up-to-nevents.py exists only from 14_1_0_pre7
cmssw_components = lambda x: x.strip().split("_")
cmssw_release = cmssw_components(self.get_release())
check_das_up_to_nevents = cmssw_release >= cmssw_components("14_1_0_pre7")

if events > 0 and check_das_up_to_nevents:
self.logger.info('Making a DAS command for step %s with max events', step_index)
files_name = f'step{step_index + 1}_files.txt'
comment = f'# Arguments for step {step_index + 1}:\n'
command = f'# Command for step {step_index + 1}:\n'
comment += f'# dataset: {dataset}\n'
comment += f'# events : {events}\n'
command += f'echo "" > {files_name}\n'
command += f'das-up-to-nevents.py -d {dataset} -e {events} '
command += f'>> {files_name}\n'
return (comment + '\n' + command).strip()

return f'# Step {step_index + 1} is input dataset for next step: {dataset}'

133 changes: 133 additions & 0 deletions core/utils/das.py
@@ -0,0 +1,133 @@
#!/usr/bin/env python3
import pandas as pd
import subprocess
import itertools
import numpy as np

def get_lumi_ranges(i):
result = []
for _, b in itertools.groupby(enumerate(i), lambda pair: pair[1] - pair[0]):
b = list(b)
result.append([b[0][1],b[-1][1]])
return result
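# Illustrative example, assuming the input is a sorted iterable of
# lumisection numbers:
#   get_lumi_ranges([1, 2, 3, 7, 8, 10]) -> [[1, 3], [7, 8], [10, 10]]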

def das_do_command(cmd):
out = subprocess.check_output(cmd, shell=True, executable="/bin/bash").decode('utf8')
return out.split("\n")

def das_file_site(dataset, site):
cmd = "dasgoclient --query='file dataset=%s site=%s'"%(dataset,site)
out = das_do_command(cmd)
df = pd.DataFrame(out,columns=["file"])

return df

def das_file_data(dataset,opt=""):
cmd = "dasgoclient --query='file dataset=%s %s| grep file.name, file.nevents'"%(dataset,opt)
out = das_do_command(cmd)
out = [np.array(r.split(" "))[[0,3]] for r in out if len(r) > 0]

df = pd.DataFrame(out,columns=["file","events"])
df.events = df.events.values.astype(int)

return df

def das_lumi_data(dataset,opt=""):
cmd = "dasgoclient --query='file,lumi,run dataset=%s %s'"%(dataset,opt)

out = das_do_command(cmd)
out = [r.split(" ") for r in out if len(r)>0]

df = pd.DataFrame(out,columns=["file","run","lumis"])

return df

def das_run_events_data(dataset,run,opt=""):
cmd = "dasgoclient --query='file dataset=%s run=%s %s | sum(file.nevents) '"%(dataset,run,opt)
out = das_do_command(cmd)[0]

out = [o for o in out.split(" ") if "sum" not in o]
out = int([r.split(" ") for r in out if len(r)>0][0][0])

return out

def das_run_data(dataset,opt=""):
cmd = "dasgoclient --query='run dataset=%s %s '"%(dataset,opt)
out = das_do_command(cmd)

return out

def get_events_df(golden,dataset,events):

'''
Given as input:
- a run-by-run certification json
- the dataset name
- the number of desired events
this produces a pandas dataframe with one file per row.

Each row holds the file name, lumisections, run, number
of events and the cumulative sum of the events.
'''

lumi_df = das_lumi_data(dataset)
file_df = das_file_data(dataset)

df = lumi_df.merge(file_df,on="file",how="inner") # merge file information with run and lumis
df["lumis"] = [[int(ff) for ff in f.replace("[","").replace("]","").split(",")] for f in df.lumis.values]

df_rs = []

for r in golden:
cut = (df["run"] == r)
if not any(cut):
continue

df_r = df[cut]

# skip runs with a very low event count
if df_r["events"].sum() < 10000:
continue

good_lumis = np.array([len([ll for ll in l if ll in golden[r]]) for l in df_r.lumis])
n_lumis = np.array([len(l) for l in df_r.lumis])
df_rs.append(df_r[good_lumis==n_lumis])

if len(df_rs)==0:
return pd.DataFrame([])
elif len(df_rs)==1:
df = df_rs[0]
else:
df = pd.concat(df_rs)

## lumi sorting
df.loc[:,"min_lumi"] = [min(f) for f in df.lumis]
df.loc[:,"max_lumi"] = [max(f) for f in df.lumis]
df = df.sort_values(["run","min_lumi","max_lumi"])

## events skimming
df = df[df["events"] <= events] # skip files that alone exceed the requested events
df.loc[:,"sum_evs"] = df.loc[:,"events"].cumsum()
df = df[df["sum_evs"] < events]

return df
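# The dataframe returned above has one row per kept file, with columns
# file, run, lumis, events, min_lumi, max_lumi and sum_evs (the cumulative
# event count), sorted by run and lumi range and capped at the requested events.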

def get_run_lumi(df):

if len(df) == 0:
return {}

run_list = np.unique(df.run.values).tolist()
lumi_list = [get_lumi_ranges(np.sort(np.concatenate(df.loc[df["run"]==r,"lumis"].values).ravel()).tolist()) for r in run_list]

lumi_ranges = dict(zip(run_list,lumi_list))

return lumi_ranges
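# Illustrative output (runs and ranges are made up): a dict keyed by run,
# e.g. {'367094': [[1, 24], [27, 53]], '367095': [[1, 112]]}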

def get_lumi_dict(golden,dataset,events):

df = get_events_df(golden,dataset,events)
lumi = get_run_lumi(df)

return lumi
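# Minimal usage sketch (assumes DAS access via dasgoclient and that dqm.py
# is importable; the dataset name below is hypothetical):
#   from dqm import get_golden_json
#   dataset = "/Muon0/Run2023C-v1/RAW"
#   golden = get_golden_json(dataset)
#   lumi_mask = get_lumi_dict(golden, dataset, 100000)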

91 changes: 91 additions & 0 deletions core/utils/dqm.py
@@ -0,0 +1,91 @@
from bs4 import BeautifulSoup
import pycurl
from io import BytesIO
import os
import ast
import numpy as np
import json

base_cert_url = "https://cms-service-dqmdc.web.cern.ch/CAF/certification/"
base_cert_path = "/eos/user/c/cmsdqm/www/CAF/certification/"

def get_url_clean(url):

buffer = BytesIO()
c = pycurl.Curl()
c.setopt(c.URL, url)
c.setopt(c.WRITEDATA, buffer)
c.perform()
c.close()

return BeautifulSoup(buffer.getvalue(), "lxml").text

def get_cert_type(dataset):

year = dataset.split("Run")[1][2:4] # from 20XX to XX
PD = dataset.split("/")[1]
cert_type = "Collisions" + str(year)
if "Cosmics" in dataset:
cert_type = "Cosmics" + str(year)
elif "Commisioning" in dataset:
cert_type = "Commisioning2020"
elif "HI" in PD:
cert_type = "Collisions" + str(year) + "HI"

return cert_type
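# Illustrative mapping (dataset names are hypothetical):
#   "/Muon0/Run2023C-v1/RAW"   -> "Collisions23"
#   "/Cosmics/Run2022F-v1/RAW" -> "Cosmics22"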

def get_json_list(dataset,cert_type,web_fallback):

## if we have access to eos we get from there ...
if not web_fallback:
cert_path = base_cert_path + cert_type + "/"
json_list = os.listdir(cert_path)
if len(json_list) == 0:
web_fallback = True
json_list = [c for c in json_list if "Golden" in c and "era" not in c]
json_list = [c for c in json_list if c.startswith("Cert_C") and c.endswith("json")]
## ... if not, or if EOS gave nothing, we go to the website
if web_fallback:
cert_url = base_cert_url + cert_type + "/"
json_list = get_url_clean(cert_url).split("\n")
json_list = [c for c in json_list if "Golden" in c and "era" not in c and "Cert_C" in c]
json_list = [[cc for cc in c.split(" ") if cc.startswith("Cert_C") and cc.endswith("json")][0] for c in json_list]

return json_list
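# The entries returned above are certification file names following the
# Cert_X_RunStart_RunFinish_Type.json schema, e.g. (illustrative)
# "Cert_Collisions2023_366442_370790_Golden.json".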

def get_golden_json(dataset):
'''
Get the flattened golden json covering the
widest run range, based on the dataset name.
'''

golden_flat = {}

cert_type = get_cert_type(dataset)
cert_path = base_cert_path + cert_type + "/"
cert_url = base_cert_url + cert_type + "/"
web_fallback = not os.path.isdir(cert_path)

json_list = get_json_list(dataset,cert_type,web_fallback)

# the larger the better, assuming file naming schema
# Cert_X_RunStart_RunFinish_Type.json
run_ranges = [int(c.split("_")[3]) - int(c.split("_")[2]) for c in json_list]
latest_json = np.array(json_list[np.argmax(run_ranges)]).reshape(1,-1)[0].astype(str)
best_json = str(latest_json[0])
if not web_fallback:
with open(cert_path + "/" + best_json) as js:
golden = json.load(js)
else:
golden = get_url_clean(cert_url + best_json)
golden = ast.literal_eval(golden) #converts string to dict

# golden json with all the lumisections one by one
for k in golden:
R = []
for r in golden[k]:
R = R + [f for f in range(r[0],r[1]+1)]
golden_flat[k] = R

return golden_flat
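# Illustrative flattening (run number and ranges are made up):
#   {"362433": [[1, 5], [8, 9]]} becomes {"362433": [1, 2, 3, 4, 5, 8, 9]}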

20 changes: 19 additions & 1 deletion core/utils/run_the_matrix_pdmv.py
@@ -8,6 +8,9 @@
import importlib
import inspect
import re
from das import get_lumi_dict
from dqm import get_golden_json
from os import environ as env
#pylint: disable=wrong-import-position,import-error
import Configuration.PyReleaseValidation.relval_steps as steps_module
from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector
@@ -153,11 +156,21 @@ def merge_additional_command(workflow_step, command):
print('Merging to %s' % (workflow_step))
return steps_module.merge([command_dict, workflow_step])

def get_lumi_ranges_from_dict(step_input):
"""
Given the INPUT step of a workflow, return the lumi mask to be used,
taking into account the max number of events to be processed.
"""
golden = get_golden_json(step_input.dataSet)
lumi = get_lumi_dict(golden,step_input.dataSet,step_input.events)

return lumi
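# Sketch of the intended result (runs and ranges are made up): a dict such as
#   {"367094": [[1, 24], [27, 53]]}
# which below replaces an empty step_input.ls when events are requested.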

def make_relval_step(workflow_step, workflow_step_name, wmsplit):
"""
Build one workflow step - either input dataset or cmsDriver
"""
print(workflow_step)
step = {'name': workflow_step_name}
if workflow_step_name in wmsplit:
step['lumis_per_job'] = wmsplit[workflow_step_name]
@@ -170,8 +183,13 @@ def make_relval_step(workflow_step, workflow_step_name, wmsplit):
if 'INPUT' in workflow_step:
# This step has input dataset
step_input = workflow_step['INPUT']
lumisections = step_input.ls

if not step_input.ls and step_input.events:
lumisections = get_lumi_ranges_from_dict(step_input)

step['input'] = {'dataset': step_input.dataSet,
'lumisection': step_input.ls,
'lumisection': lumisections,
'run': step_input.run,
'label': step_input.label,
'events': step_input.events}