From 0ca18636ba33cff9f785ae2783e0922284cfd450 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Thu, 25 Oct 2018 13:05:23 +0200
Subject: [PATCH 01/16] Start working on slurm backend for new isdc cluster

---
 erna/automatic_processing/job_submitter.py |  22 ++--
 erna/automatic_processing/slurm.py         | 131 +++++++++++++++++++++
 erna/automatic_processing/utils.py         |   5 +-
 3 files changed, 143 insertions(+), 15 deletions(-)
 create mode 100644 erna/automatic_processing/slurm.py

diff --git a/erna/automatic_processing/job_submitter.py b/erna/automatic_processing/job_submitter.py
index c522c8d..f019004 100644
--- a/erna/automatic_processing/job_submitter.py
+++ b/erna/automatic_processing/job_submitter.py
@@ -13,16 +13,16 @@ class JobSubmitter(Thread):
 
     def __init__(
-            self,
-            interval,
-            max_queued_jobs,
-            data_directory,
-            host,
-            port,
-            group,
-            mail_address=None,
-            mail_settings='a',
-            ):
+        self,
+        interval,
+        max_queued_jobs,
+        data_directory,
+        host,
+        port,
+        group,
+        mail_address=None,
+        mail_settings='a',
+    ):
         '''
         Parametrs
         ----------
@@ -61,7 +61,7 @@ def run(self):
                 self.process_pending_jobs()
             except peewee.OperationalError:
                 log.warning('Lost database connection')
-            except:
+            except Exception as e:
                 log.exception('Error during submission')
 
             self.event.wait(self.interval)
diff --git a/erna/automatic_processing/slurm.py b/erna/automatic_processing/slurm.py
new file mode 100644
index 0000000..6c099c6
--- /dev/null
+++ b/erna/automatic_processing/slurm.py
@@ -0,0 +1,131 @@
+import subprocess as sp
+import os
+import logging
+
+from .utils import get_aux_dir
+from .database import ProcessingState
+from .database_utils import (
+    build_output_base_name, build_output_directory_name,
+    save_xml, save_jar
+)
+
+
+log = logging.getLogger(__name__)
+
+
+def build_sbatch_command(
+    executable,
+    *args,
+    stdout=None,
+    stderr=None,
+    job_name=None,
+    queue=None,
+    mail_address=None,
+    mail_settings='a',
+    resources=None,
+):
+    command = []
+    command.append('sbatch')
+
+    if job_name:
+        command.extend(['-J', job_name])
+
+    if queue:
+        command.extend(['-p', queue])
+
+    if mail_address:
+        command.append('--mail-user={}'.format(mail_address))
+
+    command.append('--mail-type={}'.format(mail_settings))
+
+    if stdout:
+        command.extend(['-o', stdout])
+
+    if stderr:
+        command.extend(['-e', stderr])
+
+    if resources:
+        command.append('-l')
+        command.append(','.join(
+            '{}={}'.format(k, v)
+            for k, v in resources.items()
+        ))
+
+    command.append(executable)
+    command.extend(args)
+
+    return command
+
+
+def build_automatic_processing_sbatch_command(
+    queue,
+    **kwargs
+):
+
+    executable = sp.check_output(
+        ['which', 'erna_automatic_processing_executor']
+    ).decode().strip()
+
+    cmd = build_sbatch_command(
+        executable=executable,
+        queue=queue,
+        **kwargs,
+    )
+
+    return cmd
+
+
+def submit_job(
+    job,
+    output_base_dir,
+    data_dir,
+    submitter_host,
+    submitter_port,
+    group,
+    **kwargs
+):
+
+    jar_file = save_jar(job.jar_id, data_dir)
+    xml_file = save_xml(job.xml_id, data_dir)
+
+    aux_dir = get_aux_dir(job.raw_data_file.night)
+    output_dir = build_output_directory_name(job, output_base_dir)
+    output_basename = build_output_base_name(job)
+
+    log_dir = os.path.join(data_dir, 'logs')
+    os.makedirs(log_dir, exist_ok=True)
+
+    cmd = build_automatic_processing_sbatch_command(
+        output_basename=output_basename,
+        output_dir=output_dir,
+        job_name='erna_{}'.format(job.id),
+        stdout=os.path.join(log_dir, 'erna_{:08d}.log'.format(job.id)),
+        queue=job.queue.name,
+        walltime=job.queue.walltime,
+        group=group,
+        **kwargs,
+    )
+
+    env = os.environ.copy()
+    env.update({
+        'JARFILE': jar_file,
+        'XMLFILE': xml_file,
+        'OUTPUTDIR': output_dir,
+        'WALLTIME': job.queue.walltime,
+        'SUBMITTER_HOST': submitter_host,
+        'SUBMITTER_PORT': submitter_port,
+        'facttools_infile': 'file:' + job.raw_data_file.get_path(),
+        'facttools_drsfile': 'file:' + job.drs_file.get_path(),
+        'facttools_aux_dir': 'file:' + aux_dir,
+        'facttools_output_basename': output_basename,
+        'ERNA_GROUP': group,
+    })
+
+    output = sp.check_output(
+        cmd,
+        env=env,
+    )
+    log.debug(output.decode().strip())
+
+    job.status = ProcessingState.get(description='queued')
+    job.save()
diff --git a/erna/automatic_processing/utils.py b/erna/automatic_processing/utils.py
index 0be865a..815eee4 100644
--- a/erna/automatic_processing/utils.py
+++ b/erna/automatic_processing/utils.py
@@ -19,10 +19,7 @@ def parse_path(path):
     return date(year, month, day), run_id
 
 
-def get_aux_dir(night):
-
-    basepath = '/fact/aux'
-
+def get_aux_dir(night, basepath='/fact/aux'):
    return os.path.join(
         basepath,
         '{:04d}'.format(night.year),
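A minimal sketch of the call this first slurm.py draft composes (all values here are illustrative, not taken from a real configuration). Note that the `-l key=value` resource syntax and the `mail_settings='a'` default are SGE leftovers; sbatch has no `-l` resource flag, and the next patch switches the mail setting to the Slurm value FAIL and adds `--time` for the walltime:

    from erna.automatic_processing.slurm import build_sbatch_command

    # illustrative values only
    cmd = build_sbatch_command(
        '/path/to/erna_automatic_processing_executor',
        job_name='erna_42',
        queue='fact_medium',
        mail_address='user@example.org',
        stdout='/scratch/erna/logs/erna_00000042.log',
    )
    print(' '.join(cmd))
    # sbatch -J erna_42 -p fact_medium --mail-user=user@example.org
    #     --mail-type=a -o /scratch/erna/logs/erna_00000042.log
    #     /path/to/erna_automatic_processing_executor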
From 469737395ce92340b1a296b66271d4294f13470c Mon Sep 17 00:00:00 2001
From: Maximilian Noethe
Date: Fri, 26 Oct 2018 15:46:56 +0200
Subject: [PATCH 02/16] Should work on new slurm cluster at Geneva now

---
 erna/automatic_processing/__main__.py       |  15 +-
 erna/automatic_processing/database.py       |  29 +--
 erna/automatic_processing/database_utils.py |  22 +--
 erna/automatic_processing/job_submitter.py  |  25 ++-
 erna/automatic_processing/qsub.py           | 200 --------------------
 erna/automatic_processing/slurm.py          |  84 ++++----
 erna/automatic_processing/utils.py          |  10 -
 erna/scripts/submit_runlist.py              |   8 +-
 screenrc_erna                               |   9 +-
 setup.py                                    |   3 +-
 10 files changed, 90 insertions(+), 315 deletions(-)
 delete mode 100644 erna/automatic_processing/qsub.py

diff --git a/erna/automatic_processing/__main__.py b/erna/automatic_processing/__main__.py
index 6e9b37f..4165645 100644
--- a/erna/automatic_processing/__main__.py
+++ b/erna/automatic_processing/__main__.py
@@ -28,7 +28,7 @@ def main(config, verbose):
         logging.getLogger('erna').setLevel(logging.DEBUG)
 
     stream_handler = logging.StreamHandler()
-    file_handler = logging.FileHandler(config['submitter']['logfile'])
+    file_handler = logging.FileHandler(config['submitter'].pop('logfile'))
     formatter = logging.Formatter(
         '%(asctime)s|%(levelname)s|%(name)s|%(message)s'
     )
@@ -44,16 +44,7 @@ def main(config, verbose):
     database.close()
 
     job_monitor = JobMonitor(port=config['submitter']['port'])
-    job_submitter = JobSubmitter(
-        interval=config['submitter']['interval'],
-        max_queued_jobs=config['submitter']['max_queued_jobs'],
-        data_directory=config['submitter']['data_directory'],
-        host=config['submitter']['host'],
-        port=config['submitter']['port'],
-        group=config['submitter']['group'],
-        mail_address=config['submitter']['mail_address'],
-        mail_settings=config['submitter']['mail_settings'],
-    )
+    job_submitter = JobSubmitter(**config['submitter'])
 
     log.info('Starting main loop')
     try:
@@ -75,6 +66,6 @@ def main(config, verbose):
     inserted = ProcessingState.get(description='inserted')
 
     for job in Job.select().where((Job.status == running) | (Job.status == queued)):
-        sp.run(['qdel', 'erna_{}'.format(job.id)])
+        sp.run(['scancel', '--jobname=erna_{}'.format(job.id)])
         job.status = inserted
         job.save()
diff --git a/erna/automatic_processing/database.py b/erna/automatic_processing/database.py
index 2fc3ab6..9844697 100644
--- a/erna/automatic_processing/database.py
+++ b/erna/automatic_processing/database.py
@@ -14,7 +14,7 @@
 __all__ = [
     'RawDataFile', 'DrsFile',
-    'Jar', 'XML', 'Job', 'Queue',
+    'Jar', 'XML', 'Job',
     'ProcessingState',
     'database', 'setup_database',
 ]
@@ -31,12 +31,6 @@
     'walltime_exceeded',
 ]
 
-WALLTIMES = {
-    'fact_short': 60 * 60,
-    'fact_medium': 6 * 60 * 60,
-    'fact_long': 7 * 24 * 60 * 60,
-}
-
 
 class RetryMySQLDatabase(RetryOperationalError, MySQLDatabase):
     ''' Automatically reconnect when connection went down'''
@@ -71,9 +65,6 @@ def setup_database(database, drop=False):
     for description in PROCESSING_STATES:
         ProcessingState.get_or_create(description=description)
 
-    for name, walltime in WALLTIMES.items():
-        Queue.get_or_create(name=name, walltime=walltime)
-
 
 class File(Model):
     night = NightField()
@@ -85,9 +76,9 @@ class Meta:
         database = database
         indexes = ((('night', 'run_id'), True), )  # unique index
 
-    def get_path(self):
+    def get_path(self, basepath='/fact/raw'):
         return os.path.join(
-            '/fact/raw',
+            basepath,
             str(self.night.year),
             '{:02d}'.format(self.night.month),
             '{:02d}'.format(self.night.day),
@@ -166,15 +157,6 @@ def __repr__(self):
         return '{}'.format(self.description)
 
 
-class Queue(Model):
-    name = CharField(unique=True)
-    walltime = IntegerField()
-
-    class Meta:
-        database = database
-        db_table = 'queues'
-
-
 class Job(Model):
     raw_data_file = ForeignKeyField(RawDataFile, related_name='raw_data_file')
     drs_file = ForeignKeyField(DrsFile, related_name='drs_file')
@@ -182,9 +164,9 @@ class Job(Model):
     result_file = CharField(null=True)
     status = ForeignKeyField(ProcessingState, related_name='status')
     priority = IntegerField(default=5)
+    walltime = IntegerField(default=60)
     xml = ForeignKeyField(XML)
     md5hash = FixedCharField(32, null=True)
-    queue = ForeignKeyField(Queue, related_name='queue')
 
     class Meta:
         database = database
@@ -193,7 +175,8 @@ class Meta:
         (('raw_data_file', 'jar', 'xml'), True),  # unique constraint
     )
 
-MODELS = [RawDataFile, DrsFile, Jar, XML, Job, ProcessingState, Queue]
+
+MODELS = [RawDataFile, DrsFile, Jar, XML, Job, ProcessingState]
diff --git a/erna/automatic_processing/database_utils.py b/erna/automatic_processing/database_utils.py
index 09c6de9..7543268 100644
--- a/erna/automatic_processing/database_utils.py
+++ b/erna/automatic_processing/database_utils.py
@@ -99,7 +99,6 @@ def find_drs_file(raw_data_file, closest=True):
     '''
     query = DrsFile.select()
     query = query.where(DrsFile.night == raw_data_file.night)
-    query = query.where(DrsFile.available)
 
     if raw_data_file.roi == 300:
         query = query.where((DrsFile.drs_step == 2) & (DrsFile.roi == 300))
@@ -129,7 +128,7 @@ def insert_new_job(
     raw_data_file,
     jar,
     xml,
-    queue,
+    walltime,
     priority=5,
     closest_drs_file=True,
 ):
@@ -144,8 +143,8 @@ def insert_new_job(
         the fact-tools jar to use
     xml: XML
         the xml to use
-    queue: Queue
-        the queue to use
+    walltime: int
+        the walltime to use
     priority: int
         Priority for the Job. Lower numbers mean more important.
     closest_drs_file: bool
@@ -169,7 +168,7 @@ def insert_new_job(
         raw_data_file=raw_data_file,
         drs_file=drs_file,
         jar=jar,
-        queue=queue,
+        walltime=walltime,
         status=ProcessingState.get(description='inserted'),
         priority=priority,
         xml=xml,
@@ -179,7 +178,7 @@
 
 @requires_database_connection
-def insert_new_jobs(raw_data_files, jar, xml, queue, progress=True, **kwargs):
+def insert_new_jobs(raw_data_files, jar, xml, walltime, progress=True, **kwargs):
 
     if isinstance(raw_data_files, list):
         total = len(raw_data_files)
@@ -189,7 +188,7 @@
     failed_files = []
     for f in tqdm(raw_data_files, total=total, disable=not progress):
         try:
-            insert_new_job(f, jar=jar, xml=xml, queue=queue, **kwargs)
+            insert_new_job(f, jar=jar, xml=xml, walltime=walltime, **kwargs)
         except peewee.IntegrityError:
             log.warning('Job already submitted: {}_{:03d}'.format(f.night, f.run_id))
         except ValueError as e:
@@ -281,17 +280,12 @@ def build_output_base_name(job):
 
 @requires_database_connection
-def resubmit_walltime_exceeded(old_queue, new_queue):
+def resubmit_walltime_exceeded(factor=1.5):
     '''
     Resubmit jobs where walltime was exceeded.
-    Change queue from old_queue to new_queue
     '''
-    if old_queue.walltime >= new_queue.walltime:
-        raise ValueError('New queue must have longer walltime for this to make sense')
-
     return (
         Job
-        .update(queue=new_queue, status=ProcessingState.get(description='inserted'))
+        .update(walltime=factor * Job.walltime, status=ProcessingState.get(description='inserted'))
         .where(Job.status == ProcessingState.get(description='walltime_exceeded'))
-        .where(Job.queue == old_queue)
     ).execute()
diff --git a/erna/automatic_processing/job_submitter.py b/erna/automatic_processing/job_submitter.py
index f019004..6471df7 100644
--- a/erna/automatic_processing/job_submitter.py
+++ b/erna/automatic_processing/job_submitter.py
@@ -5,7 +5,7 @@
 
 from .database import ProcessingState, requires_database_connection
 from .database_utils import count_jobs, get_pending_jobs
-from .qsub import submit_job, get_current_jobs
+from .slurm import submit_job, get_current_jobs
 
 log = logging.getLogger(__name__)
@@ -16,7 +16,10 @@ def __init__(
         self,
         interval,
         max_queued_jobs,
-        data_directory,
+        raw_dir,
+        aux_dir,
+        erna_dir,
+        script,
         host,
         port,
         group,
@@ -32,7 +35,7 @@ def __init__(
             Maximum number of jobs in the queue of the grid engine
             No new jobs are submitted if the number of jobs in the queue is
             higher than this value
-        data_directory: str
+        erna_directory: str
             patch to the basic structure for erna. Logfiles, jars, xmls and
             analysis output are stored in subdirectories to this directory.
         host: str
@@ -48,12 +51,15 @@ def __init__(
         self.event = Event()
         self.interval = interval
         self.max_queued_jobs = max_queued_jobs
-        self.data_directory = data_directory
+        self.erna_dir = erna_dir
+        self.aux_dir = aux_dir
+        self.raw_dir = raw_dir
         self.host = host
         self.port = port
         self.group = group
         self.mail_settings = mail_settings
         self.mail_address = mail_address
+        self.script = script
 
     def run(self):
         while not self.event.is_set():
@@ -62,7 +68,7 @@ def run(self):
             except peewee.OperationalError:
                 log.warning('Lost database connection')
             except Exception as e:
-                log.exception('Error during submission')
+                log.exception('Error during submission: {}'.format(e))
 
             self.event.wait(self.interval)
 
     def terminate(self):
@@ -92,8 +98,11 @@ def process_pending_jobs(self):
                 try:
                     submit_job(
                         job,
-                        output_base_dir=os.path.join(self.data_directory, 'fact-tools'),
-                        data_dir=self.data_directory,
+                        script=self.script,
+                        output_base_dir=os.path.join(self.erna_dir, 'fact-tools'),
+                        raw_dir=self.raw_dir,
+                        aux_dir=self.aux_dir,
+                        erna_dir=self.erna_dir,
                         mail_address=self.mail_address,
                         mail_settings=self.mail_settings,
                         submitter_host=self.host,
@@ -103,5 +112,5 @@
                     log.info('New job with id {} queued'.format(job.id))
                 except:
                     log.exception('Could not submit job')
-                    job.status = ProcessingState.get(description='error')
+                    job.status = ProcessingState.get(description='failed')
                     job.save()
diff --git a/erna/automatic_processing/qsub.py b/erna/automatic_processing/qsub.py
deleted file mode 100644
index c2f0817..0000000
--- a/erna/automatic_processing/qsub.py
+++ /dev/null
@@ -1,200 +0,0 @@
-import xmltodict
-import subprocess as sp
-import os
-import pandas as pd
-import logging
-
-from .utils import get_aux_dir
-from .database_utils import (
-    build_output_base_name, build_output_directory_name,
-    save_xml, save_jar
-)
-from .database import ProcessingState
-
-log = logging.getLogger(__name__)
-
-
-def get_current_jobs(user=None):
-    ''' Return a dataframe with current jobs of user '''
-    user = user or os.environ['USER']
-    xml = sp.check_output(['qstat', '-u', user, '-xml']).decode()
-    data = xmltodict.parse(xml)
-    job_info = data['job_info']
-    queue_info = job_info['queue_info']
-    job_info = job_info['job_info']
-    queued_jobs = queue_info['job_list'] if queue_info else []
-    running_jobs = job_info['job_list'] if job_info else []
-
-    df = pd.DataFrame(columns=[
-        '@state', 'JB_job_number', 'JAT_prio', 'JB_name', 'JB_owner',
-        'state', 'JB_submission_time', 'queue_name', 'slots', 'JAT_start_time'
-    ])
-
-    if not isinstance(running_jobs, list):
-        running_jobs = [running_jobs]
-    if not isinstance(queued_jobs, list):
-        queued_jobs = [queued_jobs]
-
-    df = df.append(pd.DataFrame(running_jobs + queued_jobs), ignore_index=True)
-
-    if len(df) == 0:
-        return df
-
-    df.drop('state', axis=1, inplace=True)
-    df.rename(inplace=True, columns={
-        '@state': 'state',
-        'JB_owner': 'owner',
-        'JB_name': 'name',
-        'JB_job_number': 'job_number',
-        'JB_submission_time': 'submission_time',
-        'JAT_prio': 'priority',
-        'JAT_start_time': 'start_time',
-    })
-
-    df = df.astype({'job_number': int, 'priority': float})
-    df['start_time'] = pd.to_datetime(df['start_time'])
-    df['submission_time'] = pd.to_datetime(df['submission_time'])
-    return df
-
-
-def build_qsub_command(
-    executable,
-    stdout=None,
-    stderr=None,
-    job_name=None,
-    queue=None,
-    mail_address=None,
-    mail_settings='a',
-    environment=None,
-    resources=None,
-    engine='SGE',
-):
-    command = []
-    command.append('qsub')
-
-    if job_name:
-        command.extend(['-N', job_name])
-
-    if queue:
-        command.extend(['-q', queue])
-
-    if mail_address:
-        command.extend(['-M', mail_address])
-
-    command.extend(['-m', mail_settings])
-
-    # allow a binary executable
-    if engine == 'SGE':
-        command.extend(['-b', 'yes'])
-
-    if stdout:
-        command.extend(['-o', stdout])
-
-    if stderr:
-        command.extend(['-e', stderr])
-
-    if environment:
-        command.append('-v')
-        command.append(','.join(
-            '{}={}'.format(k, v)
-            for k, v in environment.items()
-        ))
-
-    if resources:
-        command.append('-l')
-        command.append(','.join(
-            '{}={}'.format(k, v)
-            for k, v in resources.items()
-        ))
-
-    command.append(executable)
-
-    return command
-
-
-def build_automatic_processing_qsub_command(
-    jar_file,
-    xml_file,
-    in_file,
-    drs_file,
-    aux_dir,
-    output_dir,
-    output_basename,
-    submitter_host,
-    submitter_port,
-    queue,
-    walltime,
-    group,
-    **kwargs
-):
-
-    executable = sp.check_output(
-        ['which', 'erna_automatic_processing_executor']
-    ).decode().strip()
-
-    cmd = build_qsub_command(
-        executable=executable,
-        environment={
-            'JARFILE': jar_file,
-            'XMLFILE': xml_file,
-            'OUTPUTDIR': output_dir,
-            'WALLTIME': walltime,
-            'SUBMITTER_HOST': submitter_host,
-            'SUBMITTER_PORT': submitter_port,
-            'facttools_infile': 'file:' + in_file,
-            'facttools_drsfile': 'file:' + drs_file,
-            'facttools_aux_dir': 'file:' + aux_dir,
-            'facttools_output_basename': output_basename,
-            'ERNA_GROUP': group,
-        },
-        queue=queue,
-        **kwargs,
-    )
-
-    return cmd
-
-
-def submit_job(
-    job,
-    output_base_dir,
-    data_dir,
-    submitter_host,
-    submitter_port,
-    group,
-    **kwargs
-):
-
-    jar_file = save_jar(job.jar_id, data_dir)
-    xml_file = save_xml(job.xml_id, data_dir)
-
-    aux_dir = get_aux_dir(job.raw_data_file.night)
-    output_dir = build_output_directory_name(job, output_base_dir)
-    output_basename = build_output_base_name(job)
-
-    log_dir = os.path.join(data_dir, 'logs')
-    os.makedirs(log_dir, exist_ok=True)
-
-    cmd = build_automatic_processing_qsub_command(
-        jar_file=jar_file,
-        xml_file=xml_file,
-        in_file=job.raw_data_file.get_path(),
-        drs_file=job.drs_file.get_path(),
-        aux_dir=aux_dir,
-        output_basename=output_basename,
-        output_dir=output_dir,
-        submitter_host=submitter_host,
-        submitter_port=submitter_port,
-        job_name='erna_{}'.format(job.id),
-        stdout=os.path.join(log_dir, 'erna_{:08d}.o'.format(job.id)),
-        stderr=os.path.join(log_dir, 'erna_{:08d}.e'.format(job.id)),
-        queue=job.queue.name,
-        walltime=job.queue.walltime,
-        group=group,
-        **kwargs,
-    )
-
-    output = sp.check_output(cmd)
-    log.debug(output.decode().strip())
-
-    job.status = ProcessingState.get(description='queued')
-    job.save()
diff --git a/erna/automatic_processing/slurm.py b/erna/automatic_processing/slurm.py
index 6c099c6..5db67c1 100644
--- a/erna/automatic_processing/slurm.py
+++ b/erna/automatic_processing/slurm.py
@@ -1,18 +1,45 @@
 import subprocess as sp
 import os
 import logging
+import pandas as pd
 
-from .utils import get_aux_dir
 from .database import ProcessingState
 from .database_utils import (
     build_output_base_name, build_output_directory_name,
     save_xml, save_jar
 )
+from io import StringIO
 
 
 log = logging.getLogger(__name__)
 
 
+def get_current_jobs(user=None):
+    ''' Return a dataframe with current jobs of user '''
+    user = user or os.environ['USER']
+    fmt = '%i,%j,%P,%S,%T,%p,%u,%V'
+    csv = StringIO(sp.check_output([
+        'squeue', '-u', user, '-o', fmt
+    ]).decode())
+
+    df = pd.read_csv(csv)
+    df.rename(inplace=True, columns={
+        'STATE': 'state',
+        'USER': 'owner',
+        'NAME': 'name',
+        'JOBID': 'job_number',
+        'SUBMIT_TIME': 'submission_time',
+        'PRIORITY': 'priority',
+        'START_TIME': 'start_time',
+        'PARTITION': 'queue',
+    })
+    df['state'] = df['state'].str.lower()
+    df['start_time'] = pd.to_datetime(df['start_time'])
+    df['submission_time'] = pd.to_datetime(df['submission_time'])
+
+    return df
+
+
 def build_sbatch_command(
     executable,
     *args,
@@ -21,8 +48,9 @@ def build_sbatch_command(
     job_name=None,
     queue=None,
     mail_address=None,
-    mail_settings='a',
+    mail_settings='FAIL',
     resources=None,
+    walltime=None,
 ):
     command = []
     command.append('sbatch')
@@ -51,58 +79,42 @@ def build_sbatch_command(
             for k, v in resources.items()
         ))
 
+    if walltime is not None:
+        command.append('--time={}'.format(walltime))
+
     command.append(executable)
     command.extend(args)
 
     return command
 
 
-def build_automatic_processing_sbatch_command(
-    queue,
-    **kwargs
-):
-
-    executable = sp.check_output(
-        ['which', 'erna_automatic_processing_executor']
-    ).decode().strip()
-
-    cmd = build_sbatch_command(
-        executable=executable,
-        queue=queue,
-        **kwargs,
-    )
-
-    return cmd
-
-
 def submit_job(
     job,
+    script,
+    raw_dir,
+    aux_dir,
+    erna_dir,
     output_base_dir,
-    data_dir,
     submitter_host,
     submitter_port,
     group,
     **kwargs
 ):
 
-    jar_file = save_jar(job.jar_id, data_dir)
-    xml_file = save_xml(job.xml_id, data_dir)
+    jar_file = save_jar(job.jar_id, erna_dir)
+    xml_file = save_xml(job.xml_id, erna_dir)
 
-    aux_dir = get_aux_dir(job.raw_data_file.night)
     output_dir = build_output_directory_name(job, output_base_dir)
     output_basename = build_output_base_name(job)
 
-    log_dir = os.path.join(data_dir, 'logs')
+    log_dir = os.path.join(erna_dir, 'logs')
     os.makedirs(log_dir, exist_ok=True)
 
-    cmd = build_automatic_processing_sbatch_command(
-        output_basename=output_basename,
-        output_dir=output_dir,
+    cmd = build_sbatch_command(
+        script,
         job_name='erna_{}'.format(job.id),
         stdout=os.path.join(log_dir, 'erna_{:08d}.log'.format(job.id)),
-        queue=job.queue.name,
-        walltime=job.queue.walltime,
-        group=group,
+        walltime=job.walltime,
         **kwargs,
     )
 
@@ -111,14 +123,14 @@ def submit_job(
         'JARFILE': jar_file,
         'XMLFILE': xml_file,
         'OUTPUTDIR': output_dir,
-        'WALLTIME': job.queue.walltime,
+        'WALLTIME': str(job.walltime),
         'SUBMITTER_HOST': submitter_host,
-        'SUBMITTER_PORT': submitter_port,
+        'SUBMITTER_PORT': str(submitter_port),
-        'facttools_infile': 'file:' + job.raw_data_file.get_path(),
+        'facttools_infile': 'file:' + job.raw_data_file.get_path(basepath=raw_dir),
-        'facttools_drsfile': 'file:' + job.drs_file.get_path(),
+        'facttools_drsfile': 'file:' + job.drs_file.get_path(basepath=raw_dir),
         'facttools_aux_dir': 'file:' + aux_dir,
         'facttools_output_basename': output_basename,
-        'ERNA_GROUP': group,
+        'ERNA_GROUP': str(group),
     })
diff --git a/erna/automatic_processing/utils.py b/erna/automatic_processing/utils.py
index 815eee4..515a90b 100644
--- a/erna/automatic_processing/utils.py
+++ b/erna/automatic_processing/utils.py
@@ -1,6 +1,5 @@
 import re
 from datetime import date
-import os
 
 
 datafile_re = re.compile(r'(?:.*/)?([0-9]{4})([0-9]{2})([0-9]{2})_([0-9]{3})\.fits(?:\.[fg]z)?$')
@@ -17,12 +16,3 @@ def parse_path(path):
     year, month, day, run_id = map(int, match.groups())
 
     return date(year, month, day), run_id
-
-
-def get_aux_dir(night, basepath='/fact/aux'):
-    return os.path.join(
-        basepath,
-        '{:04d}'.format(night.year),
-        '{:02d}'.format(night.month),
-        '{:02d}'.format(night.day)
-    )
diff --git a/erna/scripts/submit_runlist.py b/erna/scripts/submit_runlist.py
index 0302f10..6e2c8ee 100644
--- a/erna/scripts/submit_runlist.py
+++ b/erna/scripts/submit_runlist.py
@@ -10,7 +10,6 @@
     RawDataFile,
     Jar,
     XML,
-    Queue,
     Job,
 )
 
@@ -24,8 +23,8 @@
     help='Priority of the jobs, lower value means more important'
 )
 @click.option(
-    '-q', '--queue', default='fact_short',
-    help='Name of the queue to use'
+    '-w', '--walltime', default=60,
+    help='Walltime for the jobs'
 )
 @click.option('--config', '-c', help='Path to the yaml config file')
 def main(runlist, jar, xml, priority, queue, config):
@@ -46,7 +45,6 @@ def main(runlist, jar, xml, priority, queue, config):
 
     jar = Jar.select(Jar.id, Jar.version).where(Jar.version == jar).get()
     xml = XML.get(name=xml, jar=jar)
-    queue = Queue.get(name=queue)
 
     runs = pd.read_csv(runlist)
     runs['year'] = runs['night'] // 10000
@@ -58,7 +56,7 @@ def main(runlist, jar, xml, priority, queue, config):
         for row in runs.itertuples()
     ]
 
-    insert_new_jobs(files, xml=xml, jar=jar, queue=queue)
+    insert_new_jobs(files, xml=xml, jar=jar, walltime=walltime)
 
 
 if __name__ == '__main__':
diff --git a/screenrc_erna b/screenrc_erna
index 8a61714..d139928 100644
--- a/screenrc_erna
+++ b/screenrc_erna
@@ -7,13 +7,10 @@
 
 hardstatus alwayslastline "%-Lw%{= BW}%50>%n%f* %t%{-}%+Lw%<"
 
-screen -t erna_port_forwarding
-stuff "ssh -N -L 3306:localhost:3306 isdc-nx00^M"
-
 screen -t erna_proxy
-stuff "ssh isdc-nx00^M"
+stuff "ssh login01^M"
 sleep 5
-stuff "/swdev_nfs/socat-2.0.0-b9/socat TCP-LISTEN:3306,fork TCP:fact-mysql.app.tu-dortmund.de:3306^M"
+stuff "socat TCP-LISTEN:3306,fork TCP:fact-mysql.app.tu-dortmund.de:3306^M"
 
 screen -t erna_submitter
-stuff "/swdev_nfs/anaconda3/bin/erna_automatic_processing -v^M"
+stuff "erna_automatic_processing -v^M"
diff --git a/setup.py b/setup.py
index e7d1adc..6f019c3 100644
--- a/setup.py
+++ b/setup.py
@@ -34,12 +34,13 @@
         'click',
         'drmaa',
         'pyzmq',
-        'peewee',
+        'peewee==2.*',
         'numexpr',
         'pyyaml',
         'pytest',  # also in conda
         'xmltodict',
         'wrapt',
+        'retrying',
         # 'gridmap>=0.13.1', install from https://github.com/mackaiver/gridmap'
         # 'fact_condition', install from https://github.com/fact-project/fact_conditions
     ],
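The new get_current_jobs relies on squeue's machine-readable output instead of qstat's XML. A sketch of what the call produces, assuming a single pending job for the user (the data row is made up; the header columns are what squeue prints for the '%i,%j,%P,%S,%T,%p,%u,%V' format string used above):

    import subprocess as sp

    fmt = '%i,%j,%P,%S,%T,%p,%u,%V'
    out = sp.check_output(['squeue', '-u', 'fact', '-o', fmt]).decode()
    print(out)
    # JOBID,NAME,PARTITION,START_TIME,STATE,PRIORITY,USER,SUBMIT_TIME
    # 1234,erna_42,short,N/A,PENDING,0.00012,fact,2018-10-26T15:59:30

pd.read_csv parses this directly, and the rename in slurm.py maps the squeue column names onto the ones the job submitter already expected from the old qsub implementation.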
int(os.environ['JOB_NAME'].replace('erna_', '')) + job_id = int(os.environ['SLURM_JOB_NAME'].replace('erna_', '')) socket.send_pyobj({'job_id': job_id, 'status': 'running'}) socket.recv() @@ -78,7 +78,7 @@ def main(): sp.run([java, '-Xmx512m', '-version'], check=True) log.info('Calling fact-tools with call: {}'.format(call)) - timeout = walltime - (time.perf_counter() - start_time) - 300 + timeout = walltime - (time.monotonic() - start_time) - 300 log.info('Setting fact-tools timout to %.0f', timeout) sp.run(call, cwd=tmp_dir, check=True, timeout=timeout) except sp.CalledProcessError: diff --git a/erna/automatic_processing/job_submitter.py b/erna/automatic_processing/job_submitter.py index 6471df7..c3b2d3d 100644 --- a/erna/automatic_processing/job_submitter.py +++ b/erna/automatic_processing/job_submitter.py @@ -19,7 +19,7 @@ def __init__( raw_dir, aux_dir, erna_dir, - script, + script, host, port, group, @@ -98,7 +98,7 @@ def process_pending_jobs(self): try: submit_job( job, - script=self.script, + script=self.script, output_base_dir=os.path.join(self.erna_dir, 'fact-tools'), raw_dir=self.raw_dir, aux_dir=self.aux_dir, diff --git a/erna/automatic_processing/slurm.py b/erna/automatic_processing/slurm.py index 5db67c1..645d0e9 100644 --- a/erna/automatic_processing/slurm.py +++ b/erna/automatic_processing/slurm.py @@ -123,7 +123,7 @@ def submit_job( 'JARFILE': jar_file, 'XMLFILE': xml_file, 'OUTPUTDIR': output_dir, - 'WALLTIME': str(job.walltime), + 'WALLTIME': str(job.walltime * 60), 'SUBMITTER_HOST': submitter_host, 'SUBMITTER_PORT': str(submitter_port), 'facttools_infile': 'file:' + job.raw_data_file.get_path(basepath=raw_dir), From c2de916839856fd5cd4d6403c212fd7bf1e2c235 Mon Sep 17 00:00:00 2001 From: Maximilian Noethe Date: Fri, 26 Oct 2018 18:20:30 +0200 Subject: [PATCH 04/16] Update config template, add sh file --- erna.sh | 3 +++ erna_template.yaml | 30 ++++++++++++++++-------------- 2 files changed, 19 insertions(+), 14 deletions(-) create mode 100755 erna.sh diff --git a/erna.sh b/erna.sh new file mode 100755 index 0000000..4d7f4e8 --- /dev/null +++ b/erna.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +erna_automatic_processing_executor diff --git a/erna_template.yaml b/erna_template.yaml index 0172913..5f71f48 100644 --- a/erna_template.yaml +++ b/erna_template.yaml @@ -1,23 +1,25 @@ processing_database: - host: fact-mysql.app.tu-dortmund.de - user: - password: - database: erna + host: login01.astro.unige.ch + user: + password: + database: erna_slurm fact_database: database: factdata - host: 129.194.168.95 - user: - password: + host: lp-fact + password: + user: submitter: interval: 15 - data_directory: /gpfs1/scratch/fact/erna + raw_dir: /hpcstorage/lyard/fact/fact-archive/rev_1/raw + aux_dir: /hpcstorage/lyard/fact/fact-archive/rev_1/aux + erna_dir: /hpcstorage/fact_tools/erna + logfile: /hpcstorage/fact_tools/erna/erna_submitter.log max_queued_jobs: 200 - logfile: /gpfs1/scratch/fact/erna/erna_submitter.log - location: isdc - mail_address: - mail_settings: a - port: 12700 - host: isdc-in04 + mail_address: maximilian.noethe@tu-dortmund.de,neised@phys.ethz.ch,jens.buss@tu-dortmund.de + mail_settings: FAIL + port: 12800 + host: lesta02 group: fact + script: /home/projects/fact_tools/erna.sh From a098f5cf9a9f24773d1bfec2938bc3639a18b444 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20N=C3=B6the?= Date: Tue, 30 Oct 2018 10:52:02 +0100 Subject: [PATCH 05/16] Set default walltime to 180 --- erna/automatic_processing/database.py | 2 +- erna/automatic_processing/database_utils.py | 1 - 
From a098f5cf9a9f24773d1bfec2938bc3639a18b444 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Tue, 30 Oct 2018 10:52:02 +0100
Subject: [PATCH 05/16] Set default walltime to 180

---
 erna/automatic_processing/database.py       | 2 +-
 erna/automatic_processing/database_utils.py | 1 -
 erna/scripts/submit_runlist.py              | 2 +-
 3 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/erna/automatic_processing/database.py b/erna/automatic_processing/database.py
index 9844697..1064495 100644
--- a/erna/automatic_processing/database.py
+++ b/erna/automatic_processing/database.py
@@ -164,7 +164,7 @@ class Job(Model):
     result_file = CharField(null=True)
     status = ForeignKeyField(ProcessingState, related_name='status')
     priority = IntegerField(default=5)
-    walltime = IntegerField(default=60)
+    walltime = IntegerField(default=180)
     xml = ForeignKeyField(XML)
     md5hash = FixedCharField(32, null=True)
 
diff --git a/erna/automatic_processing/database_utils.py b/erna/automatic_processing/database_utils.py
index 7543268..4ac376b 100644
--- a/erna/automatic_processing/database_utils.py
+++ b/erna/automatic_processing/database_utils.py
@@ -51,7 +51,6 @@ def fill_drs_runs(df, database):
     if len(df) == 0:
         return
     df = df.copy()
-    print(df.columns)
     df.drop(['fRunTypeKey', 'fRunTypeName'], axis=1, inplace=True)
     df.rename(
         columns={
diff --git a/erna/scripts/submit_runlist.py b/erna/scripts/submit_runlist.py
index 6e2c8ee..c7a263e 100644
--- a/erna/scripts/submit_runlist.py
+++ b/erna/scripts/submit_runlist.py
@@ -23,7 +23,7 @@
     help='Priority of the jobs, lower value means more important'
 )
 @click.option(
-    '-w', '--walltime', default=60,
+    '-w', '--walltime', default=180,
     help='Walltime for the jobs'
 )
 @click.option('--config', '-c', help='Path to the yaml config file')

From c3b4f4614e6c69e31f28df002daa558af0d63c4e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Tue, 30 Oct 2018 10:52:33 +0100
Subject: [PATCH 06/16] Improve executor logging

---
 erna/automatic_processing/executor.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/erna/automatic_processing/executor.py b/erna/automatic_processing/executor.py
index d8c238f..1daf485 100644
--- a/erna/automatic_processing/executor.py
+++ b/erna/automatic_processing/executor.py
@@ -17,8 +17,11 @@
 socket = context.socket(zmq.REQ)
 
 log = logging.getLogger('erna')
-log.setLevel(logging.DEBUG)
-logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
+log.setLevel(logging.INFO)
+handler = logging.StreamHandler(sys.stdout)
+fmt = logging.Formatter(fmt='%(asctime)s [%(levelname)-8s] %(message)s')
+handler.setFormatter(fmt)
+logging.getLogger().addHandler(handler)
 
 
 def main():
@@ -88,7 +91,7 @@ def main():
         sys.exit(1)
     except sp.TimeoutExpired:
         socket.send_pyobj({'job_id': job_id, 'status': 'walltime_exceeded'})
-        log.exception('FACT Tools about to run into wall-time, terminating')
+        log.error('FACT Tools about to run into wall-time, terminating')
         socket.recv()
         sys.exit(1)

From 284c678f1413102aabae19b99b1636bd5188154a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Tue, 30 Oct 2018 10:54:11 +0100
Subject: [PATCH 07/16] Save logs in same folder structure as output files

---
 erna/automatic_processing/job_submitter.py | 1 -
 erna/automatic_processing/slurm.py         | 7 +++----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/erna/automatic_processing/job_submitter.py b/erna/automatic_processing/job_submitter.py
index c3b2d3d..a12d856 100644
--- a/erna/automatic_processing/job_submitter.py
+++ b/erna/automatic_processing/job_submitter.py
@@ -99,7 +99,6 @@ def process_pending_jobs(self):
                     submit_job(
                         job,
                         script=self.script,
-                        output_base_dir=os.path.join(self.erna_dir, 'fact-tools'),
                         raw_dir=self.raw_dir,
                         aux_dir=self.aux_dir,
                         erna_dir=self.erna_dir,
diff --git a/erna/automatic_processing/slurm.py b/erna/automatic_processing/slurm.py
index 645d0e9..ec394ff 100644
--- a/erna/automatic_processing/slurm.py
+++ b/erna/automatic_processing/slurm.py
@@ -94,7 +94,6 @@ def submit_job(
     raw_dir,
     aux_dir,
     erna_dir,
-    output_base_dir,
     submitter_host,
     submitter_port,
     group,
@@ -104,16 +103,16 @@ def submit_job(
     jar_file = save_jar(job.jar_id, erna_dir)
     xml_file = save_xml(job.xml_id, erna_dir)
 
-    output_dir = build_output_directory_name(job, output_base_dir)
+    output_dir = build_output_directory_name(job, os.path.join(erna_dir, 'fact-tools'))
     output_basename = build_output_base_name(job)
 
-    log_dir = os.path.join(erna_dir, 'logs')
+    log_dir = build_output_directory_name(job, os.path.join(erna_dir, 'logs'))
     os.makedirs(log_dir, exist_ok=True)
 
     cmd = build_sbatch_command(
         script,
         job_name='erna_{}'.format(job.id),
-        stdout=os.path.join(log_dir, 'erna_{:08d}.log'.format(job.id)),
+        stdout=os.path.join(log_dir, output_basename + '.log'),
         walltime=job.walltime,
         **kwargs,
     )

From 5350dfcc612798b1f7c182e002c706c78f079ced Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Tue, 30 Oct 2018 10:55:01 +0100
Subject: [PATCH 08/16] Improve error reporting in fill_database

---
 erna/scripts/fill_database.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/erna/scripts/fill_database.py b/erna/scripts/fill_database.py
index 77d37d8..8296381 100644
--- a/erna/scripts/fill_database.py
+++ b/erna/scripts/fill_database.py
@@ -60,8 +60,8 @@ def main(start, end, config):
     # fill all non drs runs into raw_data_files
     data_runs = runs.query('fDrsStep != 2').drop('fDrsStep', axis=1)
     nan_entries = data_runs.isnull().any(axis=1)
-    if len(data_runs[nan_entries]) != 0:
-        print('Found invalid entries, skipping:')
+    if nan_entries.sum() > 0:
+        print('Found invalid data runs, skipping:')
         print(data_runs[nan_entries])
     data_runs.dropna(inplace=True)
 
@@ -70,8 +70,8 @@ def main(start, end, config):
     # fill all drs runs into drs_files
     drs_runs = runs.query('(fRunTypeKey == 2) & (fDrsStep == 2)')
     nan_entries = drs_runs.isnull().any(axis=1)
-    if len(drs_runs[nan_entries]) != 0:
-        print('Found invalid entries, skipping:')
+    if nan_entries.sum() > 0:
+        print('Found invalid drs runs, skipping:')
         print(drs_runs[nan_entries])
     drs_runs.dropna(inplace=True)
     fill_drs_runs(drs_runs, database=database)
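The reworked check counts rows with at least one missing value instead of materialising the filtered frame first. A toy sketch of the pattern (made-up data, not real run lists):

    import numpy as np
    import pandas as pd

    runs = pd.DataFrame({'night': [20181101, 20181101], 'run_id': [42, np.nan]})
    nan_entries = runs.isnull().any(axis=1)   # boolean mask, one entry per row
    if nan_entries.sum() > 0:                 # number of invalid rows
        print(runs[nan_entries])              # show only the offending rows
    runs.dropna(inplace=True)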
From 56b06f684c0afe29a0e5068eb775bf2a1ece4d5c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Tue, 30 Oct 2018 11:25:13 +0100
Subject: [PATCH 09/16] Improve executor logging

---
 erna/automatic_processing/executor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/erna/automatic_processing/executor.py b/erna/automatic_processing/executor.py
index 1daf485..4d2502b 100644
--- a/erna/automatic_processing/executor.py
+++ b/erna/automatic_processing/executor.py
@@ -80,7 +80,7 @@ def main():
         sp.run(['free', '-m'], check=True)
         sp.run([java, '-Xmx512m', '-version'], check=True)
 
-        log.info('Calling fact-tools with call: {}'.format(call))
+        log.info('Calling fact-tools with call: "{}"'.format(' '.join(call)))
         timeout = walltime - (time.monotonic() - start_time) - 300
         log.info('Setting fact-tools timout to %.0f', timeout)
         sp.run(call, cwd=tmp_dir, check=True, timeout=timeout)

From a1823536bd64d7ac3137b1192bcee528f861cfaf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Tue, 30 Oct 2018 16:42:51 +0100
Subject: [PATCH 10/16] Use lsuffix to be able to have overlapping columns

---
 erna/scripts/gather_fits.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/erna/scripts/gather_fits.py b/erna/scripts/gather_fits.py
index 87df499..f53fb55 100644
--- a/erna/scripts/gather_fits.py
+++ b/erna/scripts/gather_fits.py
@@ -136,6 +136,7 @@ def main(xml_name, ft_version, outputfile, config, start, end, source, datacheck
         jobs.set_index(['night', 'run_id']),
         on=['night', 'run_id'],
         how='inner',
+        lsuffix='user_input_',
     )
 
     successful_jobs = jobs.query('status == "success"')
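Why the lsuffix is needed: DataFrame.join raises a ValueError when both frames carry a column of the same name and no suffix is given. A toy sketch with made-up columns (the real frames share, e.g., the source name):

    import pandas as pd

    datacheck = pd.DataFrame({'night': [20181101], 'run_id': [42], 'source': ['Crab']})
    jobs = pd.DataFrame(
        {'night': [20181101], 'run_id': [42], 'source': ['Crab'], 'status': ['success']}
    )

    # without lsuffix this raises "columns overlap but no suffix specified"
    merged = datacheck.join(
        jobs.set_index(['night', 'run_id']),
        on=['night', 'run_id'],
        how='inner',
        lsuffix='user_input_',
    )
    # the overlapping 'source' column of the left frame becomes 'sourceuser_input_'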
From 1bf34451693140a03698fa0f64d95fd6aedb85f6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Mon, 5 Nov 2018 11:43:24 +0100
Subject: [PATCH 11/16] Set file log level to INFO for automatic processing

---
 erna/automatic_processing/__main__.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/erna/automatic_processing/__main__.py b/erna/automatic_processing/__main__.py
index 1838bdc..af93d36 100644
--- a/erna/automatic_processing/__main__.py
+++ b/erna/automatic_processing/__main__.py
@@ -29,8 +29,9 @@ def main(config, verbose):
 
     stream_handler = logging.StreamHandler()
     file_handler = logging.FileHandler(config['submitter'].pop('logfile'))
+    file_handler.setLevel(logging.INFO)
     formatter = logging.Formatter(
-        '%(asctime)s|%(levelname)s|%(name)s|%(message)s'
+        '%(asctime)s|%(levelname)s|%(message)s'
     )

From e77d5672a83bbbbc416dd54839a54c7376c79963 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Mon, 29 Oct 2018 18:02:30 +0100
Subject: [PATCH 12/16] Fix submit runlist

---
 erna/scripts/submit_runlist.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/erna/scripts/submit_runlist.py b/erna/scripts/submit_runlist.py
index c7a263e..734b149 100644
--- a/erna/scripts/submit_runlist.py
+++ b/erna/scripts/submit_runlist.py
@@ -27,7 +27,7 @@
     help='Walltime for the jobs'
 )
 @click.option('--config', '-c', help='Path to the yaml config file')
-def main(runlist, jar, xml, priority, queue, config):
+def main(runlist, jar, xml, priority, walltime, config):
     '''
     Submit automatic processing jobs for a given runlist

From 88c1a8961e5fa4be49fd0d594fe2761353121563 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Mon, 5 Nov 2018 15:38:55 +0100
Subject: [PATCH 13/16] Add xml for ft 1.1.0

---
 ..._data.xml => std_analysis_data_v1.0.0.xml} |  0
 .../xmls/std_analysis_data_v1.1.0.xml         | 50 +++++++++++++++++++
 2 files changed, 50 insertions(+)
 rename erna/automatic_processing/xmls/{std_analysis_data.xml => std_analysis_data_v1.0.0.xml} (100%)
 create mode 100644 erna/automatic_processing/xmls/std_analysis_data_v1.1.0.xml

diff --git a/erna/automatic_processing/xmls/std_analysis_data.xml b/erna/automatic_processing/xmls/std_analysis_data_v1.0.0.xml
similarity index 100%
rename from erna/automatic_processing/xmls/std_analysis_data.xml
rename to erna/automatic_processing/xmls/std_analysis_data_v1.0.0.xml
diff --git a/erna/automatic_processing/xmls/std_analysis_data_v1.1.0.xml b/erna/automatic_processing/xmls/std_analysis_data_v1.1.0.xml
new file mode 100644
index 0000000..b512d08
--- /dev/null
+++ b/erna/automatic_processing/xmls/std_analysis_data_v1.1.0.xml
@@ -0,0 +1,50 @@
+ [... 50 lines of fact-tools XML configuration; the tags were stripped in extraction and are not recoverable here ...]

From 029aa88e5b9bcce73b8fc44b65aa7afe3f282296 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Wed, 14 Nov 2018 11:16:05 +0100
Subject: [PATCH 14/16] Add xml for ft 1.1.1

---
 .../xmls/std_analysis_data_v1.1.1.xml | 50 +++++++++++++++++++
 1 file changed, 50 insertions(+)
 create mode 100644 erna/automatic_processing/xmls/std_analysis_data_v1.1.1.xml

diff --git a/erna/automatic_processing/xmls/std_analysis_data_v1.1.1.xml b/erna/automatic_processing/xmls/std_analysis_data_v1.1.1.xml
new file mode 100644
index 0000000..5fa657a
--- /dev/null
+++ b/erna/automatic_processing/xmls/std_analysis_data_v1.1.1.xml
@@ -0,0 +1,50 @@
+ [... 50 lines of fact-tools XML configuration; the tags were stripped in extraction and are not recoverable here ...]
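A note on the walltime bookkeeping accumulated so far: resubmit_walltime_exceeded (patch 2) scales the stored walltime by factor=1.5 on every pass, so starting from the default of 180 minutes (patch 5) repeated resubmissions grow as:

    walltime = 180        # minutes, the Job.walltime default
    for _ in range(3):
        walltime *= 1.5
        print(walltime)   # 270.0, 405.0, 607.5

The executor then works in seconds (the WALLTIME variable is job.walltime * 60 since patch 3) and reserves 300 seconds of headroom before the fact-tools timeout.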
From f6f81579df90c9aaafbc0238b73fe1f2b1a3448a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20N=C3=B6the?=
Date: Tue, 12 Feb 2019 14:18:23 +0100
Subject: [PATCH 15/16] Upgrade to peewee 3.8

---
 erna/automatic_processing/custom_fields.py  |  2 +-
 erna/automatic_processing/database.py       | 18 +------------
 erna/automatic_processing/database_utils.py | 26 ++++++++++-----------
 erna/automatic_processing/job_monitor.py    |  4 ++--
 erna/automatic_processing/job_submitter.py  | 10 ++++----
 erna/scripts/gather_fits.py                 |  5 +++-
 setup.py                                    |  2 +-
 7 files changed, 28 insertions(+), 39 deletions(-)

diff --git a/erna/automatic_processing/custom_fields.py b/erna/automatic_processing/custom_fields.py
index d65b56c..82aec4a 100644
--- a/erna/automatic_processing/custom_fields.py
+++ b/erna/automatic_processing/custom_fields.py
@@ -7,7 +7,7 @@
 
 
 class NightField(Field):
-    db_field = 'night'
+    db_field = 'integer'
 
     def db_value(self, value):
         return date_to_night_int(value)
diff --git a/erna/automatic_processing/database.py b/erna/automatic_processing/database.py
index 1064495..6d92d2e 100644
--- a/erna/automatic_processing/database.py
+++ b/erna/automatic_processing/database.py
@@ -2,10 +2,8 @@
     Model, CharField, IntegerField, BooleanField,
     ForeignKeyField, FixedCharField, TextField, MySQLDatabase
 )
-from playhouse.shortcuts import RetryOperationalError
 import os
 import logging
-import wrapt
 
 from .utils import parse_path
 from .custom_fields import NightField, LongBlobField
@@ -32,15 +30,7 @@
 ]
 
 
-class RetryMySQLDatabase(RetryOperationalError, MySQLDatabase):
-    ''' Automatically reconnect when connection went down'''
-    pass
-
-
-database = RetryMySQLDatabase(None, fields={
-    'night': 'INTEGER',
-    'longblob': 'LONGBLOB',
-})
+database = MySQLDatabase(None)
 
 
 def setup_database(database, drop=False):
@@ -177,9 +167,3 @@ class Meta:
 
 
 MODELS = [RawDataFile, DrsFile, Jar, XML, Job, ProcessingState]
-
-
-@wrapt.decorator
-def requires_database_connection(wrapped, instance, args, kwargs):
-    database.get_conn()
-    return wrapped(*args, **kwargs)
diff --git a/erna/automatic_processing/database_utils.py b/erna/automatic_processing/database_utils.py
index 4ac376b..4f5f45c 100644
--- a/erna/automatic_processing/database_utils.py
+++ b/erna/automatic_processing/database_utils.py
@@ -6,7 +6,7 @@
 
 from .database import (
     RawDataFile, DrsFile, Job, ProcessingState, Jar, XML,
-    requires_database_connection
+    database
 )
 
 
@@ -19,7 +19,7 @@
 ]
 
 
-@requires_database_connection
+@database.connection_context()
 def fill_data_runs(df, database):
     if len(df) == 0:
         return
@@ -46,7 +46,7 @@ def fill_data_runs(df, database):
     database.execute_sql(sql, params=params)
 
 
-@requires_database_connection
+@database.connection_context()
 def fill_drs_runs(df, database):
     if len(df) == 0:
         return
@@ -73,7 +73,7 @@ def fill_drs_runs(df, database):
     database.execute_sql(sql, params=params)
 
 
-@requires_database_connection
+@database.connection_context()
 def get_pending_jobs(limit=None):
     runs = (
         Job
@@ -89,7 +89,7 @@
     return runs
 
 
-@requires_database_connection
+@database.connection_context()
 def find_drs_file(raw_data_file, closest=True):
     '''
     Find a drs file for the give raw data file.
@@ -122,7 +122,7 @@
     return drs_file
 
 
-@requires_database_connection
+@database.connection_context()
 def insert_new_job(
     raw_data_file,
     jar,
@@ -176,7 +176,7 @@
     job.save()
 
 
-@requires_database_connection
+@database.connection_context()
 def insert_new_jobs(raw_data_files, jar, xml, walltime, progress=True, **kwargs):
 
     if isinstance(raw_data_files, list):
@@ -198,7 +198,7 @@
     return failed_files
 
 
-@requires_database_connection
+@database.connection_context()
 def count_jobs(state=None):
     query = Job.select()
 
@@ -209,7 +209,7 @@
     return query.count()
 
 
-@requires_database_connection
+@database.connection_context()
 def save_xml(xml_id, data_dir):
     if not os.path.exists(data_dir):
         os.makedirs(data_dir)
@@ -232,7 +232,7 @@
     return xml_file
 
 
-@requires_database_connection
+@database.connection_context()
 def save_jar(jar_id, data_dir):
     if not os.path.exists(data_dir):
         os.makedirs(data_dir)
@@ -254,7 +254,7 @@
     return jar_file
 
 
-@requires_database_connection
+@database.connection_context()
 def build_output_directory_name(job, output_base_dir):
     version = Jar.select(Jar.version).where(Jar.id == job.jar_id).get().version
     return os.path.join(
@@ -267,7 +267,7 @@
 
 
-@requires_database_connection
+@database.connection_context()
 def build_output_base_name(job):
     version = Jar.select(Jar.version).where(Jar.id == job.jar_id).get().version
     return '{night:%Y%m%d}_{run_id:03d}_{version}_{name}'.format(
@@ -278,7 +278,7 @@
 
 
-@requires_database_connection
+@database.connection_context()
 def resubmit_walltime_exceeded(factor=1.5):
     '''
     Resubmit jobs where walltime was exceeded.
diff --git a/erna/automatic_processing/job_monitor.py b/erna/automatic_processing/job_monitor.py
index b25989d..1945440 100644
--- a/erna/automatic_processing/job_monitor.py
+++ b/erna/automatic_processing/job_monitor.py
@@ -4,7 +4,7 @@
 from retrying import retry
 import peewee
 
-from .database import Job, ProcessingState, requires_database_connection
+from .database import Job, ProcessingState, database
 
 log = logging.getLogger(__name__)
 
@@ -42,7 +42,7 @@ def run(self):
                 self.update_job(status_update)
 
     @retry(retry_on_exception=is_operational_error)
-    @requires_database_connection
+    @database.connection_context()
     def update_job(self, status_update):
         job = Job.get(id=status_update['job_id'])
         status = status_update['status']
diff --git a/erna/automatic_processing/job_submitter.py b/erna/automatic_processing/job_submitter.py
index a12d856..6ce3d1c 100644
--- a/erna/automatic_processing/job_submitter.py
+++ b/erna/automatic_processing/job_submitter.py
@@ -1,9 +1,8 @@
 from threading import Thread, Event
 import logging
 import peewee
-import os
 
-from .database import ProcessingState, requires_database_connection
+from .database import ProcessingState, database
 from .database_utils import count_jobs, get_pending_jobs
 from .slurm import submit_job, get_current_jobs
 
@@ -24,7 +23,8 @@ def __init__(
         port,
         group,
         mail_address=None,
-        mail_settings='a',
+        mail_settings='FAIL',
+        queue=None,
     ):
         '''
         Parametrs
@@ -60,6 +60,7 @@ def __init__(
         self.mail_settings = mail_settings
         self.mail_address = mail_address
         self.script = script
+        self.queue = queue
 
     def run(self):
         while not self.event.is_set():
@@ -74,7 +75,7 @@ def run(self):
     def terminate(self):
         self.event.set()
 
-    @requires_database_connection
+    @database.connection_context()
     def process_pending_jobs(self):
         '''
         Fetches pending runs from the processing database
@@ -107,6 +108,7 @@ def process_pending_jobs(self):
                     submitter_host=self.host,
                     submitter_port=self.port,
                     group=self.group,
+                    queue=self.queue,
                 )
                 log.info('New job with id {} queued'.format(job.id))
             except:
diff --git a/erna/scripts/gather_fits.py b/erna/scripts/gather_fits.py
index f53fb55..22f0616 100644
--- a/erna/scripts/gather_fits.py
+++ b/erna/scripts/gather_fits.py
@@ -182,6 +182,9 @@ def main(xml_name, ft_version, outputfile, config, start, end, source, datacheck
     to_h5py(successful_jobs[columns], outputfile, key='runs', mode='w')
 
     with h5py.File(outputfile, 'a') as f:
-        f['runs'].attrs['datacheck'] = ' AND '.join(conditions)
+        if runlist is not None:
+            f['runs'].attrs['datacheck'] = 'RUNLIST'
+        else:
+            f['runs'].attrs['datacheck'] = ' AND '.join(conditions)
 
     write_fits_to_hdf5(outputfile, successful_jobs.result_file, mode='a')
diff --git a/setup.py b/setup.py
index 6f019c3..897f473 100644
--- a/setup.py
+++ b/setup.py
@@ -34,7 +34,7 @@
         'click',
         'drmaa',
         'pyzmq',
-        'peewee==2.*',
+        'peewee~=3.0',
         'numexpr',
         'pyyaml',
         'pytest',  # also in conda

From 05d7febaf770211c1ee60c1f9937c356afbc4f79 Mon Sep 17 00:00:00 2001
From: Jens Buss
Date: Tue, 12 Feb 2019 14:22:38 +0100
Subject: [PATCH 16/16] Update erna/automatic_processing/job_submitter.py

Co-Authored-By: MaxNoe
---
 erna/automatic_processing/job_submitter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/erna/automatic_processing/job_submitter.py b/erna/automatic_processing/job_submitter.py
index 6ce3d1c..8096f64 100644
--- a/erna/automatic_processing/job_submitter.py
+++ b/erna/automatic_processing/job_submitter.py
@@ -36,7 +36,7 @@ def __init__(
             No new jobs are submitted if the number of jobs in the queue is
             higher than this value
         erna_directory: str
-            patch to the basic structure for erna. Logfiles, jars, xmls and
+            path to the basic structure for erna. Logfiles, jars, xmls and
             analysis output are stored in subdirectories to this directory.
         host: str
             hostname of the submitter node
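After the peewee 3 upgrade, the custom requires_database_connection wrapper is gone; peewee's own Database.connection_context() covers the same need. A minimal sketch of the pattern now used throughout database_utils (model names from this repo; requires peewee >= 3):

    from erna.automatic_processing.database import database, Job

    @database.connection_context()
    def count_all_jobs():
        # opens a connection if necessary and closes it when the call returns
        return Job.select().count()

The same object also works as a context manager, i.e. `with database.connection_context(): ...`, which is handy for grouping several queries under one connection.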