Merge branch 'master' of ssh://github.com/dmwm/CRABClient into resubmit_push
emaszs committed Oct 5, 2016
2 parents dc23dfc + 0991fde commit 11e8179
Showing 4 changed files with 234 additions and 12 deletions.
4 changes: 3 additions & 1 deletion src/python/CRABClient/ClientMapping.py
@@ -124,7 +124,9 @@
'checkusername' : {'acceptsArguments': False, 'requiresREST': False, 'initializeProxy': True, 'requiresDirOption': False, 'useCache': False, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': False},
'checkwrite' : {'acceptsArguments': False, 'requiresREST': False, 'initializeProxy': True, 'requiresDirOption': False, 'useCache': False, 'requiresProxyVOOptions': True, 'doProxyGroupRoleCheck': True, 'requiresLocalCache': False},
'getlog' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': True, 'doProxyGroupRoleCheck': True, 'requiresLocalCache': True },
+'getlog2' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': True, 'doProxyGroupRoleCheck': True, 'requiresLocalCache': True },
'getoutput' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': True, 'doProxyGroupRoleCheck': True, 'requiresLocalCache': True },
+'getoutput2' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': True, 'doProxyGroupRoleCheck': True, 'requiresLocalCache': True },
'kill' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': False, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': True },
'proceed' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': True },
'purge' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': False, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': True },
@@ -135,7 +137,7 @@
'resubmit' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': True },
'resubmit2' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': True },
'status' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': True },
-'status2' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': True },
+'status2' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': True },
'submit' : {'acceptsArguments': True, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': False, 'useCache': False, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': True , 'requiresLocalCache': False},
'tasks' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': False, 'useCache': False, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': False},
'uploadlog' : {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True, 'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': False, 'doProxyGroupRoleCheck': False, 'requiresLocalCache': False}
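For orientation: each entry in this mapping ties a CRAB command to the client-side checks and setup it needs (proxy initialization, the -d/--dir option, REST access, and so on), and the two new entries simply give 'getlog2' and 'getoutput2' the same flags as their predecessors. A minimal sketch of how such a flag table might be consumed at dispatch time (the helper name and validation logic here are hypothetical, not CRABClient's actual bootstrap code):

COMMAND_FLAGS = {
    'getlog2': {'acceptsArguments': False, 'requiresREST': True, 'initializeProxy': True,
                'requiresDirOption': True, 'useCache': True, 'requiresProxyVOOptions': True,
                'doProxyGroupRoleCheck': True, 'requiresLocalCache': True},
    # ... one entry per command, as in the mapping above ...
}

def prepare_command(name, args, options):
    """Validate an invocation against the command's flag table (sketch only)."""
    flags = COMMAND_FLAGS[name]
    if args and not flags['acceptsArguments']:
        raise ValueError("command %r takes no positional arguments" % name)
    if flags['requiresDirOption'] and getattr(options, 'projdir', None) is None:
        raise ValueError("command %r requires the -d/--dir option" % name)
    return flags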
80 changes: 69 additions & 11 deletions src/python/CRABClient/Commands/getcommand.py
@@ -3,6 +3,9 @@
from CRABClient.ClientExceptions import ConfigurationException , RESTCommunicationException
from CRABClient.ClientUtilities import validateJobids, colors
from CRABClient import __version__

+from WMCore.Services.PhEDEx.PhEDEx import PhEDEx

import CRABClient.Emulator

import os
@@ -20,12 +23,16 @@ class getcommand(SubCommand):


def __call__(self, **argv):
+# TODO: remove this 'if' once transition to status2 is complete
+if argv.get('subresource') in ['data2', 'logs2']:
+self.processAndStoreJobIds()

## Retrieve the transferLogs parameter from the task database.
taskdbparam, configparam = '', ''
-if argv.get('subresource') == 'logs':
+if argv.get('subresource') in ['logs', 'logs2']:
taskdbparam = 'tm_save_logs'
configparam = "General.transferLogs"
-elif argv.get('subresource') == 'data':
+elif argv.get('subresource') in ['data', 'data2']:
taskdbparam = 'tm_transfer_outputs'
configparam = "General.transferOutputs"

@@ -34,22 +41,22 @@ def __call__(self, **argv):
serverFactory = CRABClient.Emulator.getEmulator('rest')
server = serverFactory(self.serverurl, self.proxyfilename, self.proxyfilename, version=__version__)
uri = self.getUrl(self.instance, resource = 'task')
-dictresult, status, reason = server.get(uri, data = inputlist)
+dictresult, status, _ = server.get(uri, data = inputlist)
self.logger.debug('Server result: %s' % dictresult)
if status == 200:
if 'desc' in dictresult and 'columns' in dictresult['desc']:
-position = dictresult['desc']['columns'].index(taskdbparam)
+position = dictresult['desc']['columns'].index(taskdbparam)
transferFlag = dictresult['result'][position] #= 'T' or 'F'
else:
self.logger.debug("Unable to locate %s in server result." % (taskdbparam))
## If transferFlag = False, there is nothing to retrieve.
if transferFlag == 'F':
msg = "No files to retrieve. Files not transferred to storage since task configuration parameter %s is False." % (configparam)
msg = "No files to retrieve. Files not transferred to storage since task configuration parameter %s is False." % (configparam)
self.logger.info(msg)
return {'success': {}, 'failed': {}}

## Retrieve tm_edm_outfiles, tm_tfile_outfiles and tm_outfiles from the task database and check if they are empty.
-if argv.get('subresource') == 'data' and status == 200:
+if argv.get('subresource') in ['data', 'data2'] and status == 200:
if 'desc' in dictresult and 'columns' in dictresult['desc']:
position = dictresult['desc']['columns'].index('tm_edm_outfiles')
tm_edm_outfiles = dictresult['result'][position]
@@ -86,20 +93,25 @@ def __call__(self, **argv):
raise RESTCommunicationException(msg)

totalfiles = len(dictresult['result'])
-workflow = dictresult['result']
-if len(workflow) > 0:
+fileInfoList = dictresult['result']
+
+# TODO: remove this 'if' once transition to status2 is complete
+if argv.get('subresource') in ['data2', 'logs2']:
+self.insertPfns(fileInfoList)
+
+if len(fileInfoList) > 0:
if self.options.dump or self.options.xroot:
self.logger.debug("Getting url info")
else:
self.setDestination()
self.logger.info("Setting the destination to %s " % self.dest)
if self.options.xroot:
self.logger.debug("XRootD urls are requested")
xrootlfn = ["root://cms-xrd-global.cern.ch/%s" % link['lfn'] for link in workflow]
xrootlfn = ["root://cms-xrd-global.cern.ch/%s" % link['lfn'] for link in fileInfoList]
self.logger.info("\n".join(xrootlfn))
returndict = {'xrootd': xrootlfn}
elif self.options.dump:
-jobid_pfn_lfn_list = sorted(map(lambda x: (x['jobid'], x['pfn'], x['lfn']), workflow))
+jobid_pfn_lfn_list = sorted(map(lambda x: (x['jobid'], x['pfn'], x['lfn']), fileInfoList))
lastjobid = -1
filecounter = 1
msg = ""
@@ -115,7 +127,7 @@ def __call__(self, **argv):
returndict = {'pfn': [pfn for _, pfn, _ in jobid_pfn_lfn_list], 'lfn': [lfn for _, _, lfn in jobid_pfn_lfn_list]}
else:
self.logger.info("Retrieving %s files" % (totalfiles))
-arglist = ['--destination', self.dest, '--input', workflow, '--dir', self.options.projdir, \
+arglist = ['--destination', self.dest, '--input', fileInfoList, '--dir', self.options.projdir, \
'--proxy', self.proxyfilename, '--parallel', self.options.nparallel, '--wait', self.options.waittime, \
'--checksum', self.checksum, '--command', self.command]
copyoutput = remote_copy(self.logger, arglist)
@@ -134,6 +146,52 @@ def __call__(self, **argv):

return returndict

+def processAndStoreJobIds(self):
+"""
+Call the status command to check that the jobids passed by the user are in a valid
+state to retrieve files. Otherwise, if no jobids are passed by the user, populate the
+list with all possible jobids.
+Also store some information which is used later when deciding the correct pfn.
+"""
+mod = __import__('CRABClient.Commands.status2', fromlist='status2')
+
+cmdobj = getattr(mod, 'status2')(self.logger)
+_, jobList = cmdobj.__call__()
+jobList = jobList['jobList']
+transferringIds = [x[1] for x in jobList if x[0] in ['transferring', 'cooloff', 'held']]
+finishedIds = [x[1] for x in jobList if x[0] in ['finished', 'failed', 'transferred']]
+possibleJobIds = transferringIds + finishedIds
+
+if self.options.jobids:
+for jobid in self.options.jobids:
+if not str(jobid[1]) in possibleJobIds:
+raise ConfigurationException("The job with id %s is not in a valid state to retrieve output files" % jobid[1])
+else:
+## If the user does not give us jobids, set them to all possible ids.
+self.options.jobids = []
+for jobid in possibleJobIds:
+self.options.jobids.append(('jobids', jobid))
+
+self.transferringIds = transferringIds
+
+def insertPfns(self, fileInfoList):
+"""
+Query phedex to retrieve the pfn for each file and store it in the passed fileInfoList.
+"""
+phedex = PhEDEx({'cert': self.proxyfilename, 'key': self.proxyfilename, 'logger': self.logger, 'pycurl': True})
+
+# Pick out the correct lfns and sites
+if len(fileInfoList) > 0:
+for fileInfo in fileInfoList:
+if str(fileInfo['jobid']) in self.transferringIds:
+lfn = fileInfo['tmplfn']
+site = fileInfo['tmpsite']
+else:
+lfn = fileInfo['lfn']
+site = fileInfo['site']
+pfn = phedex.getPFN(site, lfn)[(site, lfn)]
+fileInfo['pfn'] = pfn
+
def setDestination(self):
#Setting default destination if -o is not provided
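The heart of the new logic above is a two-step selection: job ids reported by status2 as still transferring ('transferring', 'cooloff', 'held') must be resolved against their temporary site and LFN, while finished ones ('finished', 'failed', 'transferred') use the final location; PhEDEx then translates each (site, lfn) pair into a PFN. A standalone sketch of that selection, with the PhEDEx call stubbed out (resolve_pfns and get_pfn are hypothetical names; the real code calls PhEDEx.getPFN, which returns a dict keyed by (site, lfn) tuples):

TRANSFERRING_STATES = {'transferring', 'cooloff', 'held'}

def resolve_pfns(job_list, file_info_list, get_pfn):
    """job_list: (state, jobid) pairs as returned by status2.
    get_pfn(site, lfn) -> pfn stands in for the PhEDEx lookup."""
    transferring = set(jobid for state, jobid in job_list if state in TRANSFERRING_STATES)
    for info in file_info_list:
        if str(info['jobid']) in transferring:
            site, lfn = info['tmpsite'], info['tmplfn']  # file still at the temporary site
        else:
            site, lfn = info['site'], info['lfn']        # file already at its destination
        info['pfn'] = get_pfn(site, lfn)
    return file_info_list

# e.g., with a real PhEDEx instance:
# resolve_pfns(jobList['jobList'], files, lambda s, l: phedex.getPFN(s, l)[(s, l)])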
128 changes: 128 additions & 0 deletions src/python/CRABClient/Commands/getlog2.py
@@ -0,0 +1,128 @@
from __future__ import print_function
from __future__ import division

import CRABClient.Emulator
from CRABClient import __version__
from CRABClient.ClientUtilities import colors
from CRABClient.UserUtilities import getFileFromURL
from CRABClient.Commands.getcommand import getcommand
from CRABClient.ClientExceptions import RESTCommunicationException, ClientException, MissingOptionException

from ServerUtilities import getProxiedWebDir


class getlog2(getcommand):
"""
Important: code here is identical to the old getlog implementation (aside from setting the subresource to
'logs2' when calling getcommand and the names of the command/class themselves). This was done because trying to
avoid copy-paste code isn't worth the effort in this case. Once status2 is working correctly, the old code can
easily be removed and replaced with the 'getlog2' version. Also, the 'getlog' command itself is deprecated and
we don't expect to make any changes to it until it's removed.
Class description:
Retrieve the log files of a number of jobs specified by the -q/--quantity option.
-q logfiles per exit code are returned if transferLogs = False; otherwise all the log files
collected by the LogCollect job are returned. The task is identified by the -d/--dir option.
"""
name = 'getlog2'
shortnames = ['log2']
visible = True #overwrite getcommand

def __call__(self):
if self.options.short:
taskname = self.cachedinfo['RequestName']
inputlist = {'subresource': 'webdir', 'workflow': taskname}
serverFactory = CRABClient.Emulator.getEmulator('rest')
server = serverFactory(self.serverurl, self.proxyfilename, self.proxyfilename, version=__version__)
uri = self.getUrl(self.instance, resource = 'task')
webdir = getProxiedWebDir(taskname, self.serverurl, uri, self.proxyfilename, self.logger.debug)
if not webdir:
dictresult, status, reason = server.get(uri, data = inputlist)
webdir = dictresult['result'][0]
self.logger.info('Server result: %s' % webdir)
if status != 200:
msg = "Problem retrieving information from the server:\ninput:%s\noutput:%s\nreason:%s" % (str(inputlist), str(dictresult), str(reason))
raise RESTCommunicationException(msg)
self.setDestination()
self.logger.info("Setting the destination to %s " % self.dest)
failed, success = self.retrieveShortLogs(webdir, self.proxyfilename)
if failed:
msg = "%sError%s: Failed to retrieve the following files: %s" % (colors.RED,colors.NORMAL,failed)
self.logger.info(msg)
else:
self.logger.info("%sSuccess%s: All files successfully retrieved." % (colors.GREEN,colors.NORMAL))
returndict = {'success': success, 'failed': failed}
else:
# Different from the old getlog code: set 'logs2' as subresource so that 'getcommand' uses the new logic.
returndict = getcommand.__call__(self, subresource = 'logs2')
if ('success' in returndict and not returndict['success']) or \
('failed' in returndict and returndict['failed']):
msg = "You can use the --short option to retrieve a short version of the log files from the Grid scheduler."
self.logger.info(msg)

return returndict


def setOptions(self):
"""
__setOptions__
This allows to set specific command options
"""
self.parser.add_option( '--quantity',
dest = 'quantity',
help = 'The number of logs you want to retrieve (or "all"). Ignored if --jobids is used.' )
self.parser.add_option( '--parallel',
dest = 'nparallel',
help = 'Number of parallel downloads; default is 10.',)
self.parser.add_option( '--wait',
dest = 'waittime',
help = 'Increase the sendreceive-timeout, in seconds.',)
self.parser.add_option( '--short',
dest = 'short',
default = False,
action = 'store_true',
help = 'Get the short version of the log file. Use with --dir and --jobids.',)
getcommand.setOptions(self)


def validateOptions(self):
getcommand.validateOptions(self)
if self.options.short:
if self.options.jobids is None:
msg = "%sError%s: Please specify the job ids for which to retrieve the logs." % (colors.GREEN, colors.NORMAL)
msg += " Use the --jobids option."
ex = MissingOptionException(msg)
ex.missingOption = "jobids"
raise ex


def retrieveShortLogs(self, webdir, proxyfilename):
self.logger.info("Retrieving...")
success = []
failed = []
for _, jobid in self.options.jobids:
## We don't know a priori how many retries the job had. So we start with retry 0
## and increase it by 1 until we are unable to retrieve a log file (interpreting
## this as the fact that we reached the highest retry already).
retry = 0
succeeded = True
while succeeded:
filename = 'job_out.%s.%s.txt' % (jobid, retry)
url = webdir + '/' + filename
try:
getFileFromURL(url, self.dest + '/' + filename, proxyfilename)
self.logger.info('Retrieved %s' % (filename))
success.append(filename)
retry += 1 #To retrieve retried job log, if there is any.
except ClientException as ex:
succeeded = False
## Ignore the exception if the HTTP status code is 404. Status 404 means file
## not found (see http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html). File
## not found error is expected, since we try all the job retries.
if not hasattr(ex, "status") or ex.status!=404:
self.logger.debug(str(ex))
failed.append(filename)

return failed, success

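retrieveShortLogs discovers how many retries a job had by probing filenames of the form job_out.<jobid>.<retry>.txt until the server answers 404. A simplified sketch of that pattern (fetch is a hypothetical stand-in for getFileFromURL, assumed to raise an exception carrying .status == 404 when the file does not exist; unlike the real method, this version re-raises other errors instead of recording them as failures):

def collect_retry_logs(webdir, jobid, fetch):
    retrieved = []
    retry = 0
    while True:
        filename = 'job_out.%s.%s.txt' % (jobid, retry)
        try:
            fetch(webdir + '/' + filename)
        except Exception as ex:
            if getattr(ex, 'status', None) == 404:
                break  # no log for this retry: we walked past the last one
            raise  # a real error, not the end of the retry sequence
        retrieved.append(filename)
        retry += 1
    return retrieved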
34 changes: 34 additions & 0 deletions src/python/CRABClient/Commands/getoutput2.py
@@ -0,0 +1,34 @@
from __future__ import print_function
from __future__ import division

from CRABClient.Commands.getcommand import getcommand

class getoutput2(getcommand):
""" Retrieve the output files of a number of jobs specified by the -q/--quantity option. The task
is identified by the -d/--dir option
"""
name = 'getoutput2'
shortnames = ['output2', 'out2']
visible = True #overwrite getcommand

def __call__(self):
returndict = getcommand.__call__(self, subresource = 'data2')

return returndict

def setOptions(self):
"""
__setOptions__
This allows to set specific command options
"""
self.parser.add_option( '--quantity',
dest = 'quantity',
help = 'The number of output files you want to retrieve (or "all"). Ignored if --jobids is used.' )
self.parser.add_option( '--parallel',
dest = 'nparallel',
help = 'Number of parallel downloads; default is 10.',)
self.parser.add_option( '--wait',
dest = 'waittime',
help = 'Increase the sendreceive-timeout, in seconds.',)
getcommand.setOptions(self)
