Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates for runTheMatrix.py: input checks, GPUs repartition, input recycling #47377

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions Configuration/PyReleaseValidation/python/MatrixReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
from Configuration.PyReleaseValidation.MatrixUtil import InputInfo

from Configuration.PyReleaseValidation.upgradeWorkflowComponents import defaultDataSets,undefInput
# ================================================================================

class MatrixException(Exception):
Expand All @@ -25,8 +25,9 @@ def __init__(self, opt):
self.apply=opt.apply
self.commandLineWf=opt.workflow
self.overWrite=opt.overWrite

self.noRun = opt.noRun
self.checkInputs = opt.checkInputs
return

def reset(self, what='all'):
Expand Down Expand Up @@ -127,6 +128,21 @@ def makeStep(self,step,overrides):
else:
return step

def verifyDefaultInputs(self):
    """Raise if any workflow still carries the undefined-input marker.

    Scans every entry of ``self.workFlowSteps`` and looks, in the third
    element of each entry, for string items containing ``undefInput``.
    The first entry with such an item aborts the scan.

    Raises:
        ValueError: with a message naming the workflow (first element of
            the entry) and dumping the whole entry.
    """
    # NOTE(review): wf[2] is presumably the list of step commands - confirm against readMatrix.
    for wf in self.workFlowSteps.values():
        flagged = any(isinstance(item, str) and undefInput in item for item in wf[2])
        if not flagged:
            continue

        # The reported line is taken from the caller's frame (one level up), minus one.
        caller_line = sys._getframe(1).f_lineno - 1
        raise ValueError("""in MatrixReader.py:{0}
=============================================================================
For wf {1}(*) the default dataset not defined in defaultDataSets dictionary.
With --checkInputs option this throws an error.

(*)
{2}

=============================================================================
""".format(caller_line, wf[0], wf))

def readMatrix(self, fileNameIn, useInput=None, refRel=None, fromScratch=None):

prefix = self.filesPrefMap[fileNameIn]
Expand Down Expand Up @@ -332,6 +348,8 @@ def showRaw(self, useInput, refRel=None, fromScratch=None, what='all',step1Only=

try:
self.readMatrix(matrixFile, useInput, refRel, fromScratch)
if self.checkInputs:
self.verifyDefaultInputs()
except Exception as e:
print("ERROR reading file:", matrixFile, str(e))
raise
Expand Down Expand Up @@ -507,6 +525,8 @@ def prepare(self, useInput=None, refRel='', fromScratch=None):

try:
self.readMatrix(matrixFile, useInput, refRel, fromScratch)
if self.checkInputs:
self.verifyDefaultInputs()
except Exception as e:
print("ERROR reading file:", matrixFile, str(e))
raise
Expand Down
33 changes: 23 additions & 10 deletions Configuration/PyReleaseValidation/python/MatrixRunner.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import os, sys, time

from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
from collections import Counter

from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
from Configuration.PyReleaseValidation.MatrixUtil import check_dups
# ================================================================================

class MatrixRunner(object):

def __init__(self, wfIn=None, nThrMax=4, nThreads=1):
def __init__(self, wfIn=None, nThrMax=4, nThreads=1, gpus=None):

self.workFlows = wfIn

self.threadList = []
self.maxThreads = nThrMax
self.nThreads = nThreads
self.gpus = gpus

#the directories in which it happened
self.runDirs={}
Expand Down Expand Up @@ -43,12 +45,20 @@ def runTests(self, opt):
print('resetting to default number of process threads = %s' % self.maxThreads)

print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))


for wf in self.workFlows:

if testList and float(wf.numId) not in [float(x) for x in testList]: continue


njob = None
wfs_to_run = self.workFlows
withDups = False
if testList:
if opt.allowDuplicates:
withDups = len(check_dups(testList))>0
wfs_to_run = [wf for wf in self.workFlows if float(wf.numId) in testList for i in range(Counter(testList)[wf.numId])]

for n,wf in enumerate(wfs_to_run):

if opt.allowDuplicates and withDups and opt.nProcs > 1: # to avoid overwriting the work areas
njob = n

item = wf.nameId
if os.path.islink(item) : continue # ignore symlinks

Expand All @@ -58,7 +68,10 @@ def runTests(self, opt):

print('\nPreparing to run %s %s' % (wf.numId, item))
sys.stdout.flush()
current = WorkFlowRunner(wf,noRun,dryRun,cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.nStreams, opt.maxSteps, opt.nEvents)
gpu_cmd = None
if self.gpus is not None:
gpu_cmd = next(self.gpus).gpuBind()
current = WorkFlowRunner(wf,opt,noRun,dryRun,cafVeto,njob,gpu_cmd)
self.threadList.append(current)
current.start()
if not dryRun:
Expand Down
57 changes: 57 additions & 0 deletions Configuration/PyReleaseValidation/python/MatrixUtil.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import subprocess

class Matrix(dict):
def __setitem__(self,key,value):
if key in self:
Expand Down Expand Up @@ -281,3 +283,58 @@ def check_dups(input):
dups = set(x for x in input if x in seen or seen.add(x))

return dups

class AvailableGPU:
    """Record describing one GPU: make, running counter, device id, capability and name."""

    def __init__(self, make, counter, id, capability, name):
        # `id` shadows the builtin, but it is part of the constructor interface and is kept.
        self.make, self.counter, self.id = make, counter, id
        self.capability, self.name = capability, name

    def __str__(self):
        """One-line, human-readable summary of this GPU."""
        return f"> GPU no.{self.counter}: {self.make} - {self.id} - {self.capability} - {self.name}"

    def isCUDA(self):
        """True when the make is exactly 'CUDA'."""
        return 'CUDA' == self.make

    def isROCM(self):
        """True when the make is exactly 'ROCM'."""
        return 'ROCM' == self.make

    def gpuBind(self):
        """Return the environment-variable prefix selecting this device.

        The device id is assigned to the variable matching the make and the
        other variable is set empty; any other make yields an empty string.
        The returned prefix ends with a space.
        """
        if self.make == 'CUDA':
            return f"CUDA_VISIBLE_DEVICES={self.id!s} HIP_VISIBLE_DEVICES= "
        if self.make == 'ROCM':
            return f"CUDA_VISIBLE_DEVICES= HIP_VISIBLE_DEVICES={self.id!s} "
        return ''

def cleanComputeCapabilities(make, offset = 0):
    """List the supported GPUs reported by the ``<make>ComputeCapabilities`` tool.

    Builds on top of {cuda|rocm}ComputeCapabilities, whose output has one
    GPU per line:
        ID computeCapability Architecture Model Info

    Parameters:
        make:   tool prefix (e.g. "cuda" or "rocm"); its upper-cased form is
                passed to AvailableGPU as the make.
        offset: added to the running index to form each GPU's counter.

    Returns:
        A list of AvailableGPU objects. The list is empty when the tool
        cannot be started or exits with a non-zero status. Lines containing
        "unsupported" are reported on stdout and skipped.
    """
    try:
        out = subprocess.run(make + "ComputeCapabilities", capture_output = True, text = True)
    except OSError:
        # Tool missing or not executable: treat it as "no GPU of this make"
        # instead of crashing the caller.
        return []

    # Any non-zero status is a failure; a negative one means killed by a signal.
    if out.returncode != 0:
        return []

    rows = []
    for line in out.stdout.splitlines():

        # skip empty and whitespace-only lines (the latter would split to [])
        if not line.strip():
            continue

        if "unsupported" in line:
            print("> Warning! Unsupported GPU:")
            # print the line itself, with its columns separated by single spaces
            print(" > " + " ".join(line.split()))
            continue

        rows.append(line.split())

    return [AvailableGPU(make.upper(), i + offset, int(f[0]), f[1], " ".join(f[2:])) for i, f in enumerate(rows)]
53 changes: 38 additions & 15 deletions Configuration/PyReleaseValidation/python/WorkFlowRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,33 @@
from datetime import datetime

class WorkFlowRunner(Thread):
def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
def __init__(self, wf, opt, noRun=False, dryRun=False, cafVeto=True, jobNumber=None, gpu = None):
Thread.__init__(self)
self.wf = wf

self.status=-1
self.report=''
self.nfail=0
self.npass=0
self.noRun=noRun
self.dryRun=dryRun
self.cafVeto=cafVeto
self.dasOptions=dasOptions
self.jobReport=jobReport
self.nThreads=nThreads
self.nStreams=nStreams
self.maxSteps=maxSteps
self.nEvents=nEvents
self.recoOutput=''
self.status = -1
self.report =''
self.nfail = 0
self.npass = 0
self.noRun = noRun
self.dryRun = dryRun
self.cafVeto = cafVeto
self.gpu = gpu

self.dasOptions = opt.dasOptions
self.jobReport = opt.jobReports
self.nThreads = opt.nThreads
self.nStreams = opt.nStreams
self.maxSteps = opt.maxSteps
self.nEvents = opt.nEvents
self.recoOutput = ''
self.startFrom = opt.startFrom
self.recycle = opt.recycle

self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
if jobNumber is not None:
self.wfDir = self.wfDir + '_job' + str(jobNumber)

return

def doCmd(self, cmd):
Expand Down Expand Up @@ -98,6 +105,9 @@ def closeCmd(i,ID):
self.stat.append('NOTRUN')
continue
if not isinstance(com,str):
if self.recycle:
inFile = self.recycle
continue
if self.cafVeto and (com.location == 'CAF' and not onCAF):
print("You need to be no CAF to run",self.wf.numId)
self.npass.append(0)
Expand Down Expand Up @@ -146,7 +156,19 @@ def closeCmd(i,ID):

else:
#chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
if self.gpu is not None:
cmd = cmd + self.gpu

cmd += com

if self.startFrom:
steps = cmd.split("-s ")[1].split(" ")[0]
if self.startFrom not in steps:
continue
else:
self.startFrom = False
inFile = self.recycle

if self.noRun:
cmd +=' --no_exec'
# in case previous step used DAS query (either filelist of das:)
Expand Down Expand Up @@ -191,6 +213,7 @@ def closeCmd(i,ID):
cmd = split[0] + event_token + '%s ' % self.nEvents + pos_cmd
cmd+=closeCmd(istep,self.wf.nameId)
retStep = 0

if istep>self.maxSteps:
wf_stats = open("%s/wf_steps.txt" % self.wfDir,"a")
wf_stats.write('step%s:%s\n' % (istep, cmd))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
wf_number = wf_number + offset_pd * p_n
wf_number = wf_number + offset_events * evs
wf_number = round(wf_number,6)
step_name = "Run" + pd + era.split("Run")[1] + "_10k"
step_name = "Run" + pd + era.split("Run")[1] + "_" + e_key
y = str(int(base_wf))
suff = "ZB_" if "ZeroBias" in step_name else ""
workflows[wf_number] = ['',[step_name,'HLTDR3_' + y,'RECONANORUN3_' + suff + 'reHLT_'+y,'HARVESTRUN3_' + suff + y]]
Expand Down
11 changes: 8 additions & 3 deletions Configuration/PyReleaseValidation/python/relval_steps.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import sys

from .MatrixUtil import *

from Configuration.HLT.autoHLT import autoHLT
from Configuration.AlCa.autoPCL import autoPCL
from Configuration.Skimming.autoSkim import autoSkim
from .upgradeWorkflowComponents import step3_trackingOnly
from Configuration.PyReleaseValidation.upgradeWorkflowComponents import step3_trackingOnly,undefInput

# step1 gensim: for run1
step1Defaults = {'--relval' : None, # need to be explicitly set
Expand Down Expand Up @@ -4579,13 +4581,17 @@ def gen2024HiMix(fragment,howMuch):
for gen in upgradeFragments:
for ds in defaultDataSets:
key=gen[:-4]+'_'+ds
version='1'
version = "1"
if defaultDataSets[ds] == '' or ds =='Run4D98':
version = undefInput
if key in versionOverrides:
version = versionOverrides[key]
baseDataSetReleaseBetter[key]=defaultDataSets[ds]+version

PUDataSets={}
for ds in defaultDataSets:
if "GenOnly" in ds:
continue
key='MinBias_14TeV_pythia8_TuneCP5'+'_'+ds
name=baseDataSetReleaseBetter[key]
if '2017' in ds:
Expand All @@ -4605,7 +4611,6 @@ def gen2024HiMix(fragment,howMuch):
#PUDataSets[ds]={'-n':10,'--pileup':'AVE_50_BX_25ns','--pileup_input':'das:/RelValMinBias_13/%s/GEN-SIM'%(name,)}
#PUDataSets[ds]={'-n':10,'--pileup':'AVE_70_BX_25ns','--pileup_input':'das:/RelValMinBias_13/%s/GEN-SIM'%(name,)}


upgradeStepDict={}
for specialType,specialWF in upgradeWFs.items():
specialWF.init(upgradeStepDict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from .MatrixUtil import merge, Kby, Mby, check_dups
import re

undefInput = "UNDEF"

U2000by1={'--relval': '2000,1'}

# DON'T CHANGE THE ORDER, only append new keys. Otherwise the numbering for the runTheMatrix tests will change.
Expand Down Expand Up @@ -3023,6 +3025,8 @@ def condition(self, fragment, stepList, key, hasHarvest):

# standard PU sequences
for key in list(upgradeProperties[2017].keys()):
if "GenOnly" in key:
continue
upgradeProperties[2017][key+'PU'] = deepcopy(upgradeProperties[2017][key])
if 'FS' not in key:
# update ScenToRun list
Expand Down Expand Up @@ -3251,6 +3255,8 @@ def condition(self, fragment, stepList, key, hasHarvest):

# standard PU sequences
for key in list(upgradeProperties['Run4'].keys()):
if "GenOnly" in key:
continue
upgradeProperties['Run4'][key+'PU'] = deepcopy(upgradeProperties['Run4'][key])
upgradeProperties['Run4'][key+'PU']['ScenToRun'] = ['GenSimHLBeamSpot','DigiTriggerPU','RecoGlobalPU', 'HARVESTGlobalPU']

Expand Down
Loading