-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlibpbspyworker.py
129 lines (93 loc) · 2.76 KB
/
libpbspyworker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python
"""
python utilities for TORQUE
libpbspyworker provides a high level front-end to pbs_python.
This module provides wrappers that simplify submission and collection of jobs.
"""
import os
import re
import sys
import uuid
import inspect
# Text of the exception raised when the ``pbs`` module cannot be imported.
# The ``%s`` placeholder receives the message of the underlying error.
err_message = """
The 'libpbspyworker' runner depends on 'pbs_python' which is not installed
or not configured properly. please follow the instructions found at:
https://oss.trac.surfsara.nl/pbs_python
Additional errors may follow:
%s
"""

# Flipped to True only after ``import pbs`` has succeeded.
HAVE_PBS_PYTHON = False
try:
    import pbs
    HAVE_PBS_PYTHON = True
except Exception as ECP:
    # ``except ... as`` is accepted by Python 2.6+ and Python 3; the older
    # comma form is a syntax error on Python 3.
    # Any failure while importing is re-raised with installation
    # instructions, so this module cannot be imported without ``pbs``.
    raise Exception(err_message % str(ECP))
class Job(object):
    """
    Job setup

    Bundles a function with its positional and keyword arguments so that
    the call can be performed later through ``execute()``.

    Attributes assigned by the constructor:
    fn          -- the function to call; only its ``__name__`` is kept when
                   the function is defined in ``__main__``
    args        -- positional arguments for the call
    kwlist      -- keyword arguments for the call (a dict)
    cleanup     -- flag stored as given
    environment -- initialised to None
    working_dir -- initialised to None
    ret         -- return value of the call, None until ``execute()`` succeeds
    outdir      -- initialised to None
    name        -- unique name of the form ``pj_<uuid1>``
    jobid       -- initialised to the empty string
    """

    def __init__(self, fn, args, kwlist=None, param=None, cleanup=True):
        """
        Job construction

        fn      -- function to execute; must expose ``__name__``
        args    -- sequence of positional arguments passed to ``fn``
        kwlist  -- dict of keyword arguments passed to ``fn``; a new empty
                   dict is used when omitted
        param   -- optional dict; each key/value pair is set as an
                   attribute on the job
        cleanup -- stored on the job as ``self.cleanup``
        """
        # Keep the name as a fallback, then let the setter replace it with
        # the callable whenever the function does not come from __main__.
        self.fn = fn.__name__
        self.__set_function(fn)
        self.args = args
        # One dict per instance: a mutable default in the signature would be
        # shared by every job created without explicit keyword arguments.
        self.kwlist = {} if kwlist is None else kwlist
        self.cleanup = cleanup
        self.environment = None
        self.working_dir = None
        if param is not None:
            self.__set_parameters(param)
        # These fields are assigned after ``param`` has been applied, so
        # entries in ``param`` with the same names do not survive.
        self.ret = None
        self.outdir = None
        self.name = 'pj_' + str(uuid.uuid1())
        self.jobid = ""

    def __set_function(self, fn):
        """
        setter for function that carefully take care of namespace, avoiding __main__ as a module

        Stores ``fn`` itself in ``self.fn`` unless ``inspect.getmodule``
        reports that it belongs to ``__main__``; in that case ``self.fn``
        is left untouched.
        """
        module = inspect.getmodule(fn)
        # getmodule() may return None; getattr keeps that case from raising
        # and treats it as "not __main__".
        if getattr(module, "__name__", None) != "__main__":
            self.fn = fn

    def __set_parameters(self, param):
        """
        function to set the parameters from a dict

        param -- dict mapping attribute names to values

        Returns the job itself. Raises ValueError when ``param`` is None.
        """
        # Explicit check instead of ``assert`` so it is not stripped by -O.
        if param is None:
            raise ValueError("param must be a dict, not None")
        for key, val in param.items():
            setattr(self, key, val)
        return self

    def execute(self):
        """
        execute the function fn with given arguments
        return value to ret variable

        Any exception raised by the call is caught and printed; ``ret`` is
        then left unchanged. This also happens when ``self.fn`` still holds
        a function name instead of a callable.
        """
        try:
            # Argument unpacking replaces apply(), which is gone in Python 3.
            self.ret = self.fn(*self.args, **self.kwlist)
        except Exception as e:
            print("Exception encountered")
            print(e)
class PBSJob(Job):
    """
    Job runner

    A ``Job`` that additionally carries PBS resource attributes:
    ``default_pbs_server``, ``mem``, ``nodes``, ``ppn``, ``walltime`` and a
    ``working_dir`` that defaults to the current directory.
    """

    def __init__(self, fn, args, kwlist=None, param=None, cleanup=True):
        """
        Default settings for Job starting

        fn      -- function to execute
        args    -- sequence of positional arguments passed to ``fn``
        kwlist  -- dict of keyword arguments passed to ``fn``; a new empty
                   dict is used when omitted
        param   -- optional dict of attribute overrides, e.g.
                   ``{"mem": "4gb"}``
        cleanup -- stored on the job as ``self.cleanup``
        """
        # Never hand a shared default dict (or None) to the base class.
        if kwlist is None:
            kwlist = {}

        # Resource defaults are assigned BEFORE the base initialiser runs:
        # Job.__init__ applies ``param``, and assigning the defaults
        # afterwards would overwrite the values supplied by the caller.
        self.default_pbs_server = None
        self.mem = ""
        self.nodes = ""
        self.ppn = ""
        self.walltime = ""
        # Attributes not enabled yet: host_name, num_resubmits,
        # pbs_queue_name.

        Job.__init__(self, fn, args, kwlist, param=param, cleanup=cleanup)

        # Job.__init__ resets working_dir to None before applying ``param``;
        # use the current directory only when ``param`` did not provide one.
        if self.working_dir is None:
            self.working_dir = os.getcwd()
def process_jobs(jobs, local=False, maxNumThreads=1):
    """
    function to decide whether a job to run on the cluster or locally

    Only the decision step exists here: nothing is submitted or executed,
    and the function always returns None.

    jobs          -- jobs to process (not used by the current body)
    local         -- truthy to request local execution
    maxNumThreads -- not used by the current body
    """
    if local:
        # Local run requested: the PBS availability flag is not consulted.
        return None
    if HAVE_PBS_PYTHON:
        # NOTE(review): ``local`` is already falsy on this path, so the
        # assignment has no visible effect - confirm the intended logic.
        local = False
    return None