diff --git a/Configuration/PyReleaseValidation/python/MatrixReader.py b/Configuration/PyReleaseValidation/python/MatrixReader.py
index 15bbc4fc31655..e625805d0de25 100644
--- a/Configuration/PyReleaseValidation/python/MatrixReader.py
+++ b/Configuration/PyReleaseValidation/python/MatrixReader.py
@@ -2,7 +2,7 @@
 from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
 from Configuration.PyReleaseValidation.MatrixUtil import InputInfo
-
+from Configuration.PyReleaseValidation.upgradeWorkflowComponents import defaultDataSets,undefInput
 
 # ================================================================================
 
 class MatrixException(Exception):
@@ -25,8 +25,9 @@ def __init__(self, opt):
         self.apply=opt.apply
         self.commandLineWf=opt.workflow
         self.overWrite=opt.overWrite
-
+
         self.noRun = opt.noRun
+        self.checkInputs = opt.checkInputs
         return
 
     def reset(self, what='all'):
@@ -127,6 +128,21 @@ def makeStep(self,step,overrides):
             else:
                 return step
 
+    def verifyDefaultInputs(self):
+        for wf in self.workFlowSteps.values():
+            undefs = [driver for driver in wf[2] if isinstance(driver,str) and undefInput in driver ]
+            if len(undefs)>0:
+                raise ValueError("""in MatrixReader.py:{0}
+=============================================================================
+For wf {1}(*) the default dataset is not defined in the defaultDataSets dictionary.
+With the --checkInputs option this throws an error.
+
+(*)
+{2}
+
+=============================================================================
+                """.format(sys._getframe(1).f_lineno - 1,wf[0],wf))
+
     def readMatrix(self, fileNameIn, useInput=None, refRel=None, fromScratch=None):
 
         prefix = self.filesPrefMap[fileNameIn]
@@ -332,6 +348,8 @@ def showRaw(self, useInput, refRel=None, fromScratch=None, what='all',step1Only=
 
         try:
             self.readMatrix(matrixFile, useInput, refRel, fromScratch)
+            if self.checkInputs:
+                self.verifyDefaultInputs()
         except Exception as e:
             print("ERROR reading file:", matrixFile, str(e))
             raise
@@ -507,6 +525,8 @@ def prepare(self, useInput=None, refRel='', fromScratch=None):
 
         try:
             self.readMatrix(matrixFile, useInput, refRel, fromScratch)
+            if self.checkInputs:
+                self.verifyDefaultInputs()
         except Exception as e:
             print("ERROR reading file:", matrixFile, str(e))
             raise
diff --git a/Configuration/PyReleaseValidation/python/MatrixRunner.py b/Configuration/PyReleaseValidation/python/MatrixRunner.py
index 752272ab11cff..b11517fd8b33e 100644
--- a/Configuration/PyReleaseValidation/python/MatrixRunner.py
+++ b/Configuration/PyReleaseValidation/python/MatrixRunner.py
@@ -1,19 +1,21 @@
 import os, sys, time
 
-from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
-from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
+from collections import Counter
+from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
+from Configuration.PyReleaseValidation.MatrixUtil import check_dups
 
 # ================================================================================
 
 class MatrixRunner(object):
 
-    def __init__(self, wfIn=None, nThrMax=4, nThreads=1):
+    def __init__(self, wfIn=None, nThrMax=4, nThreads=1, gpus=None):
 
         self.workFlows = wfIn
         self.threadList = []
         self.maxThreads = nThrMax
         self.nThreads = nThreads
+        self.gpus = gpus
 
         #the directories in which it happened
         self.runDirs={}
@@ -43,12 +45,20 @@ def runTests(self, opt):
                     print('resetting to default number of process threads = %s' % self.maxThreads)
 
         print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))
-
-
-        for wf in self.workFlows:
-
-            if testList and float(wf.numId) not in [float(x) for x in testList]: continue
-
+
+        njob = None
+        wfs_to_run = self.workFlows
+        withDups = False
+        if testList:
+            if opt.allowDuplicates:
+                withDups = len(check_dups(testList))>0
+            wfs_to_run = [wf for wf in self.workFlows if float(wf.numId) in testList for i in range(Counter(testList)[wf.numId])]
+
+        for n,wf in enumerate(wfs_to_run):
+
+            if opt.allowDuplicates and withDups and opt.nProcs > 1: # to avoid overwriting the work areas
+                njob = n
+
             item = wf.nameId
             if os.path.islink(item) : continue # ignore symlinks
@@ -58,7 +68,10 @@ def runTests(self, opt):
             print('\nPreparing to run %s %s' % (wf.numId, item))
             sys.stdout.flush()
 
-            current = WorkFlowRunner(wf,noRun,dryRun,cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.nStreams, opt.maxSteps, opt.nEvents)
+            gpu_cmd = None
+            if self.gpus is not None:
+                gpu_cmd = next(self.gpus).gpuBind()
+            current = WorkFlowRunner(wf,opt,noRun,dryRun,cafVeto,njob,gpu_cmd)
             self.threadList.append(current)
             current.start()
             if not dryRun:
diff --git a/Configuration/PyReleaseValidation/python/MatrixUtil.py b/Configuration/PyReleaseValidation/python/MatrixUtil.py
index bd10cf37a2079..42256047a5098 100644
--- a/Configuration/PyReleaseValidation/python/MatrixUtil.py
+++ b/Configuration/PyReleaseValidation/python/MatrixUtil.py
@@ -1,4 +1,6 @@
 import os
+import subprocess
+
 class Matrix(dict):
     def __setitem__(self,key,value):
         if key in self:
@@ -281,3 +283,58 @@ def check_dups(input):
     dups = set(x for x in input if x in seen or seen.add(x))
 
     return dups
+
+class AvailableGPU():
+
+    def __init__(self, make, counter, id, capability, name):
+        self.make = make
+        self.counter = counter
+        self.id = id
+        self.capability = capability
+        self.name = name
+
+    def __str__(self):
+        return "> GPU no.{0}: {1} - {2} - {3} - {4}".format(self.counter,self.make,self.id,self.capability,self.name)
+
+    def isCUDA(self):
+        return self.make == 'CUDA'
+    def isROCM(self):
+        return self.make == 'ROCM'
+
+    def gpuBind(self):
+
+        cmd = ''
+        if self.make == 'CUDA':
+            cmd = 'CUDA_VISIBLE_DEVICES=' + str(self.id) + " HIP_VISIBLE_DEVICES= "
+        elif self.make == 'ROCM':
+            cmd = 'CUDA_VISIBLE_DEVICES= HIP_VISIBLE_DEVICES=' + str(self.id) + " "
+
+        return cmd
+
+def cleanComputeCapabilities(make, offset = 0):
+
+    # Building on top of {cuda|rocm}ComputeCapabilities
+    # with output:
+    # ID computeCapability Architecture Model Info
+
+    out = subprocess.run(make + "ComputeCapabilities", capture_output = True, text = True)
+
+    if out.returncode > 0:
+        return []
+
+    gpus = []
+    for f in out.stdout.split("\n"):
+
+        if not len(f)>0:
+            continue
+
+        if "unsupported" in f:
+            print("> Warning! Unsupported GPU:")
+            print(" > " + f)
+            continue
+
+        gpus.append(f.split())
+
+    gpus = [AvailableGPU(make.upper(), i + offset, int(f[0]),f[1]," ".join(f[2:])) for i,f in enumerate(gpus)]
+
+    return gpus
\ No newline at end of file
diff --git a/Configuration/PyReleaseValidation/python/WorkFlowRunner.py b/Configuration/PyReleaseValidation/python/WorkFlowRunner.py
index 2cf5b8a0d4184..447e1ffede80b 100644
--- a/Configuration/PyReleaseValidation/python/WorkFlowRunner.py
+++ b/Configuration/PyReleaseValidation/python/WorkFlowRunner.py
@@ -7,26 +7,33 @@
 from datetime import datetime
 
 class WorkFlowRunner(Thread):
-    def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
+    def __init__(self, wf, opt, noRun=False, dryRun=False, cafVeto=True, jobNumber=None, gpu = None):
         Thread.__init__(self)
         self.wf = wf
 
-        self.status=-1
-        self.report=''
-        self.nfail=0
-        self.npass=0
-        self.noRun=noRun
-        self.dryRun=dryRun
-        self.cafVeto=cafVeto
-        self.dasOptions=dasOptions
-        self.jobReport=jobReport
-        self.nThreads=nThreads
-        self.nStreams=nStreams
-        self.maxSteps=maxSteps
-        self.nEvents=nEvents
-        self.recoOutput=''
+        self.status = -1
+        self.report = ''
+        self.nfail = 0
+        self.npass = 0
+        self.noRun = noRun
+        self.dryRun = dryRun
+        self.cafVeto = cafVeto
+        self.gpu = gpu
+
+        self.dasOptions = opt.dasOptions
+        self.jobReport = opt.jobReports
+        self.nThreads = opt.nThreads
+        self.nStreams = opt.nStreams
+        self.maxSteps = opt.maxSteps
+        self.nEvents = opt.nEvents
+        self.recoOutput = ''
+        self.startFrom = opt.startFrom
+        self.recycle = opt.recycle
 
         self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
+        if jobNumber is not None:
+            self.wfDir = self.wfDir + '_job' + str(jobNumber)
+
         return
 
     def doCmd(self, cmd):
@@ -98,6 +105,9 @@ def closeCmd(i,ID):
                 self.stat.append('NOTRUN')
                 continue
             if not isinstance(com,str):
+                if self.recycle:
+                    inFile = self.recycle
+                    continue
                 if self.cafVeto and (com.location == 'CAF' and not onCAF):
                     print("You need to be no CAF to run",self.wf.numId)
                     self.npass.append(0)
@@ -146,7 +156,19 @@ def closeCmd(i,ID):
                 else:
                     #chaining IO , which should be done in WF object already and not using stepX.root but .root
+                    if self.gpu is not None:
+                        cmd = cmd + self.gpu
+
                     cmd += com
+
+                    if self.startFrom:
+                        steps = cmd.split("-s ")[1].split(" ")[0]
+                        if self.startFrom not in steps:
+                            continue
+                        else:
+                            self.startFrom = False
+                            inFile = self.recycle
+
                     if self.noRun:
                         cmd +=' --no_exec'
                     # in case previous step used DAS query (either filelist of das:)
@@ -191,6 +213,7 @@ def closeCmd(i,ID):
                         cmd = split[0] + event_token + '%s ' % self.nEvents + pos_cmd
                 cmd+=closeCmd(istep,self.wf.nameId)
                 retStep = 0
+
                 if istep>self.maxSteps:
                     wf_stats = open("%s/wf_steps.txt" % self.wfDir,"a")
                     wf_stats.write('step%s:%s\n' % (istep, cmd))
diff --git a/Configuration/PyReleaseValidation/python/relval_data_highstats.py b/Configuration/PyReleaseValidation/python/relval_data_highstats.py
index f683ae5ab935e..9c4ac9f00c946 100644
--- a/Configuration/PyReleaseValidation/python/relval_data_highstats.py
+++ b/Configuration/PyReleaseValidation/python/relval_data_highstats.py
@@ -59,7 +59,7 @@
             wf_number = wf_number + offset_pd * p_n
             wf_number = wf_number + offset_events * evs
             wf_number = round(wf_number,6)
-            step_name = "Run" + pd + era.split("Run")[1] + "_10k"
+            step_name = "Run" + pd + era.split("Run")[1] + "_" + e_key
            y = str(int(base_wf))
             suff = "ZB_" if "ZeroBias" in step_name else ""
             workflows[wf_number] = ['',[step_name,'HLTDR3_' + y,'RECONANORUN3_' + suff + 'reHLT_'+y,'HARVESTRUN3_' + suff + y]]
diff --git a/Configuration/PyReleaseValidation/python/relval_steps.py b/Configuration/PyReleaseValidation/python/relval_steps.py
index 451570814f969..4ba55252a9b3c 100644
--- a/Configuration/PyReleaseValidation/python/relval_steps.py
+++ b/Configuration/PyReleaseValidation/python/relval_steps.py
@@ -1,9 +1,11 @@
+import sys
+
 from .MatrixUtil import *
 
 from Configuration.HLT.autoHLT import autoHLT
 from Configuration.AlCa.autoPCL import autoPCL
 from Configuration.Skimming.autoSkim import autoSkim
-from .upgradeWorkflowComponents import step3_trackingOnly
+from Configuration.PyReleaseValidation.upgradeWorkflowComponents import step3_trackingOnly,undefInput
 
 # step1 gensim: for run1
 step1Defaults = {'--relval' : None, # need to be explicitly set
@@ -4579,13 +4581,17 @@ def gen2024HiMix(fragment,howMuch):
 for gen in upgradeFragments:
     for ds in defaultDataSets:
         key=gen[:-4]+'_'+ds
-        version='1'
+        version = "1"
+        if defaultDataSets[ds] == '' or ds == 'Run4D98':
+            version = undefInput
         if key in versionOverrides:
             version = versionOverrides[key]
         baseDataSetReleaseBetter[key]=defaultDataSets[ds]+version
 
 PUDataSets={}
 for ds in defaultDataSets:
+    if "GenOnly" in ds:
+        continue
     key='MinBias_14TeV_pythia8_TuneCP5'+'_'+ds
     name=baseDataSetReleaseBetter[key]
     if '2017' in ds:
@@ -4605,7 +4611,6 @@ def gen2024HiMix(fragment,howMuch):
     #PUDataSets[ds]={'-n':10,'--pileup':'AVE_50_BX_25ns','--pileup_input':'das:/RelValMinBias_13/%s/GEN-SIM'%(name,)}
     #PUDataSets[ds]={'-n':10,'--pileup':'AVE_70_BX_25ns','--pileup_input':'das:/RelValMinBias_13/%s/GEN-SIM'%(name,)}
-
 upgradeStepDict={}
 for specialType,specialWF in upgradeWFs.items():
     specialWF.init(upgradeStepDict)
diff --git a/Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py b/Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py
index 89e94819ac303..45a233da94037 100644
--- a/Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py
+++ b/Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py
@@ -3,6 +3,8 @@
 from .MatrixUtil import merge, Kby, Mby, check_dups
 import re
 
+undefInput = "UNDEF"
+
 U2000by1={'--relval': '2000,1'}
 
 # DON'T CHANGE THE ORDER, only append new keys. Otherwise the numbering for the runTheMatrix tests will change.
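
Note (not part of the patch): a minimal sketch of how the UNDEF sentinel introduced above is meant to surface missing default inputs. The dictionary contents, dataset names and driver strings below are hypothetical stand-ins; only undefInput and the general flow mirror the diff (relval_steps.py embeds the sentinel into the release label, MatrixReader.verifyDefaultInputs() later greps for it).

undefInput = "UNDEF"

# Hypothetical defaults: one dataset name is known, one was never filled in.
defaultDataSets = {'Run4D110': 'CMSSW_15_0_0-SomeGT-v', 'Run4D98': ''}

# relval_steps.py builds the release label; an empty default picks up the sentinel.
baseDataSetReleaseBetter = {}
for ds, rel in defaultDataSets.items():
    version = undefInput if rel == '' else '1'
    baseDataSetReleaseBetter[ds] = rel + version

# verifyDefaultInputs() then flags any driver string still carrying the sentinel.
drivers = ['das:/RelValTTbar_14TeV/%s/GEN-SIM' % baseDataSetReleaseBetter['Run4D98']]
undefs = [d for d in drivers if isinstance(d, str) and undefInput in d]
if undefs:
    raise ValueError('default dataset not defined for: %s' % undefs)
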
@@ -3023,6 +3025,8 @@ def condition(self, fragment, stepList, key, hasHarvest):
 
 # standard PU sequences
 for key in list(upgradeProperties[2017].keys()):
+    if "GenOnly" in key:
+        continue
     upgradeProperties[2017][key+'PU'] = deepcopy(upgradeProperties[2017][key])
     if 'FS' not in key:
         # update ScenToRun list
@@ -3251,6 +3255,8 @@ def condition(self, fragment, stepList, key, hasHarvest):
 
 # standard PU sequences
 for key in list(upgradeProperties['Run4'].keys()):
+    if "GenOnly" in key:
+        continue
     upgradeProperties['Run4'][key+'PU'] = deepcopy(upgradeProperties['Run4'][key])
     upgradeProperties['Run4'][key+'PU']['ScenToRun'] = ['GenSimHLBeamSpot','DigiTriggerPU','RecoGlobalPU',
                                                         'HARVESTGlobalPU']
diff --git a/Configuration/PyReleaseValidation/scripts/runTheMatrix.py b/Configuration/PyReleaseValidation/scripts/runTheMatrix.py
index 0be99cf5f46ca..b8231da7d3edb 100755
--- a/Configuration/PyReleaseValidation/scripts/runTheMatrix.py
+++ b/Configuration/PyReleaseValidation/scripts/runTheMatrix.py
@@ -1,10 +1,12 @@
 #!/usr/bin/env python3
 import sys, os
 
+from itertools import cycle
+
 from Configuration.PyReleaseValidation.MatrixReader import MatrixReader
 from Configuration.PyReleaseValidation.MatrixRunner import MatrixRunner
 from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector,performInjectionOptionTest
-
+from Configuration.PyReleaseValidation.MatrixUtil import cleanComputeCapabilities
 # ================================================================================
 
 def showRaw(opt):
@@ -28,15 +30,14 @@ def runSelected(opt):
     testSet = set(opt.testList)
     undefSet = testSet - definedSet
     if len(undefSet)>0: raise ValueError('Undefined workflows: '+', '.join(map(str,list(undefSet))))
-    duplicates = [wf for wf in testSet if definedWf.count(wf)>1 ]
-    if len(duplicates)>0: raise ValueError('Duplicated workflows: '+', '.join(map(str,list(duplicates))))
-
+    if not opt.allowDuplicates:
+        testList = testSet
     ret = 0
     if opt.show:
         mrd.show(opt.testList, opt.extended, opt.cafVeto)
         if opt.testList : print('selected items:', opt.testList)
     else:
-        mRunnerHi = MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
+        mRunnerHi = MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads, opt.selected_gpus)
         ret = mRunnerHi.runTests(opt)
 
     if opt.wmcontrol:
@@ -173,6 +174,25 @@ def runSelected(opt):
                         dest='memoryOffset',
                         type=int,
                         default=3000)
+
+    parser.add_argument('--startFrom',
+                        help='Start from a specific step (e.g. GEN,SIM,DIGI,RECO)',
+                        dest='startFrom',
+                        type=str,
+                        default=None)
+
+    parser.add_argument('--recycle',
+                        help='Input file to recycle. To be used if the first step is an input step or together with --startFrom. '
+                             'N.B.: runTheMatrix.py will create its own work directory, so if you use a relative path, be careful.',
+                        dest='recycle',
+                        type=str,
+                        default=None)
+
+    parser.add_argument('--allowDuplicates',
+                        help='Allow duplicate workflow numbers in the list',
+                        dest='allowDuplicates',
+                        default=False,
+                        action='store_true')
 
     parser.add_argument('--addMemPerCore',
                         help='increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
@@ -216,6 +236,12 @@ def runSelected(opt):
                         default=False,
                         action='store_true')
 
+    parser.add_argument('-c','--checkInputs',
+                        help='Check if the default inputs are well defined. To be used with --show',
+                        dest='checkInputs',
+                        default=False,
+                        action='store_true')
+
     parser.add_argument('-e','--extended',
                         help='Show details of workflows, used with --show',
                         dest='extended',
@@ -402,7 +428,7 @@ def runSelected(opt):
                         help='Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
                         dest='CUDACapabilities',
                         type=lambda x: x.split(','),
-                        default='6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
+                        default='6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6,8.7,8.9,9.0,12.0')
 
     # read the CUDA runtime version included in CMSSW
     cudart_version = None
@@ -431,6 +457,43 @@ def runSelected(opt):
                         default='')
 
     opt = parser.parse_args()
+    opt.selected_gpus = None
+
+    if opt.gpu != 'forbidden':
+
+        print(">> Running with --gpu option. Checking the available and supported GPUs.")
+        gpus = cleanComputeCapabilities("cuda")
+        gpus = gpus + cleanComputeCapabilities("rocm", len(gpus))
+        available_gpus = gpus
+
+        if len(available_gpus) == 0:
+            print(">> No GPU available!")
+            opt.gpu = 'forbidden'
+        else:
+            print(">> GPUs available:")
+            [print(f) for f in available_gpus]
+
+            # Filtering ONLY CUDA GPUs on capability
+            gpus = [g for g in gpus if not g.isCUDA() or (g.isCUDA() and g.capability in opt.CUDACapabilities)]
+
+            # Filtering by name (if parsed)
+            if len(opt.GPUName) > 0:
+                gpus = [g for g in gpus if g.name == opt.GPUName]
+
+            if available_gpus != gpus:
+                if len(gpus) > 0:
+                    print(">> GPUs selected:")
+                    [print(f) for f in gpus]
+                else:
+                    print(">> No GPU selected!")
+            else:
+                print(">> All selected!")
+
+            if len(gpus) > 0:
+                opt.selected_gpus = cycle(gpus)
+            else:
+                opt.gpu = 'forbidden'
+
     if opt.command: opt.command = ' '.join(opt.command)
     os.environ["CMSSW_DAS_QUERY_SITES"]=opt.dasSites
     if opt.failed_from:
@@ -478,7 +541,7 @@ def stepOrIndex(s):
         except:
             print(entry,'is not a possible selected entry')
 
-    opt.testList = list(set(testList))
+    opt.testList = list(testList)
 
     if opt.wmcontrol:
         performInjectionOptionTest(opt)
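
Note (not part of the patch): a minimal sketch of how the GPU round-robin added above fits together, assuming the AvailableGPU/gpuBind() interface from MatrixUtil.py; the device list and workflow names are made up for illustration.

from itertools import cycle

class AvailableGPU:                      # trimmed stand-in for the class added to MatrixUtil.py
    def __init__(self, make, id):
        self.make, self.id = make, id
    def gpuBind(self):
        # Same idea as in the patch: expose only the chosen device to the job.
        if self.make == 'CUDA':
            return 'CUDA_VISIBLE_DEVICES=' + str(self.id) + ' HIP_VISIBLE_DEVICES= '
        return 'CUDA_VISIBLE_DEVICES= HIP_VISIBLE_DEVICES=' + str(self.id) + ' '

# runTheMatrix.py stores cycle(gpus) in opt.selected_gpus; MatrixRunner then pulls
# one GPU per workflow and prepends the environment prefix to the job command.
selected_gpus = cycle([AvailableGPU('CUDA', 0), AvailableGPU('CUDA', 1)])
for wf in ['wf_a', 'wf_b', 'wf_c']:      # hypothetical workflow list
    cmd = next(selected_gpus).gpuBind() + 'cmsDriver.py <step options>'
    print(wf, '->', cmd)                 # wf_c wraps back to device 0
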