Fix block creation algorithm
Hassen Riahi authored and committed Jul 23, 2012
1 parent 280bdac commit 02eb3a7
Showing 2 changed files with 32 additions and 11 deletions.
2 changes: 1 addition & 1 deletion configuration/Example.py
@@ -87,6 +87,6 @@
config.DBSPublisher.serverDN = hostDN
config.DBSPublisher.serviceCert = serviceCert
config.DBSPublisher.serviceKey = "/path/to/valid/host-key"
-config.DBSPublisher.min_files_per_block = 1
+config.DBSPublisher.max_files_per_block = 100
config.DBSPublisher.workflow_expiration_time = 3
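
For context, this is roughly how the DBSPublisher settings touched above are declared, assuming the WMCore-style Configuration API that Example.py appears to use; the component_() call and surrounding scaffolding are illustrative, not part of the commit:

    from WMCore.Configuration import Configuration

    config = Configuration()
    config.component_('DBSPublisher')
    # Upper bound on the number of files packed into a single DBS block.
    config.DBSPublisher.max_files_per_block = 100
    # Hours to wait after the last finished job before force-publishing a short workflow.
    config.DBSPublisher.workflow_expiration_time = 3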

41 changes: 31 additions & 10 deletions src/python/AsyncStageOut/PublisherWorker.py
@@ -135,10 +135,12 @@ def __init__(self, user, config):
self.ufc = UserFileCache({'endpoint': self.userFileCacheEndpoint})
os.environ['X509_USER_PROXY'] = self.userProxy
self.phedexApi = PhEDEx(responseType='json')
+        self.max_files_per_block = self.config.max_files_per_block


def __call__(self):
"""
-        1- check the number of files in wf to publish if it is > min_files_per_block
+        1- check the number of files in wf to publish if it is < max_files_per_block
2- check in wf if now - last_finished_job > max_publish_time
3- then call publish, mark_good, mark_failed for each wf
"""
@@ -156,8 +158,8 @@ def __call__(self):
wf_jobs_endtime.append(file['value'][5])
lfn_ready.append(file['value'][1])
self.logger.debug('LFNs ready %s' %lfn_ready)
-            # If the number of files < min_files_per_block then check the age of the workflow
-            if wf['value'] < self.config.min_files_per_block:
+            # If the number of files < max_files_per_block then check the age of the workflow
+            if wf['value'] < self.max_files_per_block:
wf_jobs_endtime.sort()
if (( time.time() - wf_jobs_endtime[len(wf_jobs_endtime) - 1] )/3600) < self.config.workflow_expiration_time:
continue
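
The check above boils down to: publish once a workflow has accumulated max_files_per_block ready files, or once its newest job finished more than workflow_expiration_time hours ago. A standalone sketch of that predicate (hypothetical helper, not part of the commit):

    import time

    def should_publish(n_files_ready, job_end_times, max_files_per_block, expiration_hours):
        # Enough files accumulated: publish immediately.
        if n_files_ready >= max_files_per_block:
            return True
        # Otherwise publish only if the newest job ended long enough ago.
        hours_idle = (time.time() - max(job_end_times)) / 3600
        return hours_idle >= expiration_hours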
@@ -308,7 +310,7 @@ def decodeAsString(a):
toPublish[datasetName].append(lfn)
else:
toPublish[datasetName] = copy.deepcopy(requestPublish[datasetName])
-        # Clean toPublish keeping only completed files
+            # Clean toPublish keeping only completed files
fail_files, toPublish = self.clean(lfn_ready, toPublish)
tgz.close()
shutil.rmtree(tmpDir, ignore_errors=True)
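
The clean step is only visible here through its call site. A hypothetical sketch of such a filter, assuming toPublish maps dataset names to lists of file records carrying an 'lfn' key (the real record layout is not shown in this hunk):

    def clean(lfn_ready, to_publish):
        # Keep only files whose transfer completed; report the rest as failed.
        ready = set(lfn_ready)
        fail_files, cleaned = [], {}
        for dataset, files in to_publish.items():
            done = [f for f in files if f.get('lfn') in ready]
            fail_files.extend(f.get('lfn') for f in files if f.get('lfn') not in ready)
            if done:
                cleaned[dataset] = done
        return fail_files, cleaned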
@@ -387,6 +389,7 @@ def getParentLFNs(self):
parents.append({'LogicalFileName': str(lfn)})
return parents

+        publish_next_iteration = []
failed = []
published = []
results = {}
@@ -463,23 +466,41 @@ def getParentLFNs(self):

count = 0
blockCount = 0
-        while count < len(dbsFiles):
+        if len(dbsFiles) < self.max_files_per_block:
try:
block = createFileBlock(apiRef=destApi, datasetPath=processed, seName=seName)
-                status = insertFiles(apiRef=destApi, datasetPath=str(datasetPath), files=dbsFiles[count:count+blockSize], block=block, maxFiles=100)
+                status = insertFiles(apiRef=destApi, datasetPath=str(datasetPath), files=dbsFiles[count:count+blockSize], block=block, maxFiles=blockSize)
count += blockSize
blockCount += 1
status = closeBlock(apiRef=destApi, block=block)
-                # TODO: check here last files and decide whether a new block is required
-                # New block is required only max_files_per_block + 50 % > remaining_files > max_files_per_block
except Exception, ex:
failed = self.dbsFiles_to_failed(dbsFiles)
msg = "Error when publishing"
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(msg)
+        else:
+            while count < len(dbsFiles):
+                try:
+                    if len(dbsFiles[count:len(dbsFiles)]) < self.max_files_per_block:
+                        for file in dbsFiles[count:len(dbsFiles)]:
+                            publish_next_iteration.append(file['LogicalFileName'])
+                        count += blockSize
+                        continue
+                    block = createFileBlock(apiRef=destApi, datasetPath=processed, seName=seName)
+                    status = insertFiles(apiRef=destApi, datasetPath=str(datasetPath), files=dbsFiles[count:count+blockSize], block=block, maxFiles=blockSize)
+                    count += blockSize
+                    blockCount += 1
+                    status = closeBlock(apiRef=destApi, block=block)
+                except Exception, ex:
+                    failed = self.dbsFiles_to_failed(dbsFiles)
+                    msg = "Error when publishing"
+                    msg += str(ex)
+                    msg += str(traceback.format_exc())
+                    self.logger.error(msg)

results[datasetPath]['files'] = len(dbsFiles)
results[datasetPath]['blocks'] = blockCount
-        published = filter(lambda x: x not in failed, published)
-        self.logger.info("end of publication failed %s published %s results %s" %(failed, published, results))
+        published = filter(lambda x: x not in failed + publish_next_iteration, published)
+        self.logger.info("end of publication failed %s published %s publish_next_iteration %s results %s" %(failed, published, publish_next_iteration, results))
return failed, published, results
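
Distilled, the fixed algorithm publishes a small workflow in one short block right away and, for larger ones, closes only full blocks, deferring the short tail to the next polling cycle. A condensed, self-contained sketch with hypothetical names, DBS calls replaced by plain list handling:

    def plan_blocks(dbs_files, max_files_per_block, block_size):
        blocks, deferred = [], []
        if len(dbs_files) < max_files_per_block:
            # Few enough files: publish them all in one short block now.
            blocks.append(list(dbs_files))
            return blocks, deferred
        count = 0
        while count < len(dbs_files):
            remaining = dbs_files[count:]
            if len(remaining) < max_files_per_block:
                # Too few files left for a full block: retry them next cycle.
                deferred.extend(remaining)
                break
            blocks.append(dbs_files[count:count + block_size])
            count += block_size
        return blocks, deferred

With max_files_per_block = block_size = 100, for example, 250 input files yield two full blocks and 50 deferred files, rather than a dangling 50-file block.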
