Skip to content

Commit

Permalink
Fix location of published files
Browse files Browse the repository at this point in the history
  • Loading branch information
Hassen Riahi authored and Hassen Riahi committed Jul 21, 2012
1 parent a3792be commit 280bdac
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions src/python/AsyncStageOut/PublisherWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from AsyncStageOut import getHashLfn
from WMCore.Services.UserFileCache.UserFileCache import UserFileCache
from WMCore.DataStructs.Run import Run
from WMCore.Services.PhEDEx.PhEDEx import PhEDEx

def getProxy(userdn, group, role, defaultDelegation, logger):
"""
Expand Down Expand Up @@ -133,6 +134,7 @@ def __init__(self, user, config):
self.userProxy = config.serviceCert
self.ufc = UserFileCache({'endpoint': self.userFileCacheEndpoint})
os.environ['X509_USER_PROXY'] = self.userProxy
self.phedexApi = PhEDEx(responseType='json')

def __call__(self):
"""
Expand Down Expand Up @@ -160,7 +162,17 @@ def __call__(self):
if (( time.time() - wf_jobs_endtime[len(wf_jobs_endtime) - 1] )/3600) < self.config.workflow_expiration_time:
continue
if lfn_ready:
failed_files, good_files = self.publish( str(file['key'][4]), str(file['value'][2]), str(file['key'][3]), str(file['key'][0]), str(file['value'][3]), str(file['value'][4]), lfn_ready )
try:
seName = self.phedexApi.getNodeSE( str(file['value'][0]) )
if not seName:
continue
except Exception, ex:
msg = "SE of %s cannot be retrieved" % str(file['value'][0])
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(msg)
continue
failed_files, good_files = self.publish( str(file['key'][4]), str(file['value'][2]), str(file['key'][3]), str(file['key'][0]), str(file['value'][3]), str(file['value'][4]), str(seName), lfn_ready )
self.mark_failed( failed_files )
self.mark_good( good_files )

Expand Down Expand Up @@ -236,7 +248,7 @@ def mark_failed( self, files=[] ):
msg += str(traceback.format_exc())
self.logger.error(msg)

def publish(self, workflow, dbsurl, userdn, userhn, inputDataset, sourceurl, lfn_ready):
def publish(self, workflow, dbsurl, userdn, userhn, inputDataset, sourceurl, targetSE, lfn_ready):
"""Perform the data publication of the workflow result.
:arg str workflow: a workflow name
:arg str dbsurl: the DBS URL endpoint where to publish
Expand All @@ -252,7 +264,7 @@ def publish(self, workflow, dbsurl, userdn, userhn, inputDataset, sourceurl, lfn
self.logger.info("Starting data publication for: " + str(workflow))
failed, done, dbsResults = self.publishInDBS(userdn=userdn, sourceURL=sourceurl,
inputDataset=inputDataset, toPublish=toPublish,
destURL=dbsurl)
destURL=dbsurl, targetSE=targetSE)
self.logger.debug("DBS publication results %s" % dbsResults)
return failed, done

Expand Down Expand Up @@ -296,7 +308,7 @@ def decodeAsString(a):
toPublish[datasetName].append(lfn)
else:
toPublish[datasetName] = copy.deepcopy(requestPublish[datasetName])
# Clean toPublish keeping only completed files
# Clean toPublish keeping only completed files
fail_files, toPublish = self.clean(lfn_ready, toPublish)
tgz.close()
shutil.rmtree(tmpDir, ignore_errors=True)
Expand Down Expand Up @@ -341,7 +353,7 @@ def dbsFiles_to_failed(self, dbsFiles):
failed.append(file['LogicalFileName'])
return failed

def publishInDBS(self, userdn, sourceURL, inputDataset, toPublish, destURL):
def publishInDBS(self, userdn, sourceURL, inputDataset, toPublish, destURL, targetSE):
"""
Actually do the publishing
"""
Expand Down Expand Up @@ -403,8 +415,7 @@ def getParentLFNs(self):
appName = files[0]["appName"]
appVer = files[0]["appVer"]
appFam = files[0]["appFam"]
seName = {'Name': files[0]["locations"][0]}
seName = str(files[0]["locations"][0])
seName = targetSE

empty, primName, procName, tier = datasetPath.split('/')

Expand Down

0 comments on commit 280bdac

Please sign in to comment.