-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrunner.py
60 lines (50 loc) · 1.44 KB
/
runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
from threading import Thread
from subprocess import Popen, PIPE
from select import select
from multiprocessing import Queue
class job:
    """
    Handle for one launched command.

    Attributes:
        p: the process object passed to the constructor.
        q: the queue object passed to the constructor.
    """

    def __init__(self, p, q):
        """
        Store the process handle and its message queue.

        Inputs: process object (p) and queue object (q)
        """
        # Plain attributes are the whole interface. The former p()/q()
        # accessor methods were removed: they returned undefined names
        # (NameError) and were shadowed on every instance by these
        # attributes, so they could never be called successfully.
        self.p = p
        self.q = q
class Runner:
    """
    Launch external commands and relay their output through a queue.

    Every call to run() starts one subprocess and one reader thread; the
    thread forwards the command's stdout/stderr lines, followed by its exit
    status, as dict messages on a queue.

    NOTE(review): the previous docstring described this class as the
    container interface for Docker; nothing Docker-specific is visible in
    this class - confirm the intended role with the callers.
    """

    def __init__(self):
        """
        Create a runner with no processes or reader threads started yet.
        """
        self.procs = []    # process objects started by run(), in start order
        self.threads = []  # reader threads started by run(), in start order

    def _readio(self, p, q):
        """
        Forward the output of process p to queue q until the process ends.

        Inputs: process object with stdout/stderr pipes (p), queue (q)

        Messages put on q:
            {'msg': 'output', 'line': <text>, 'error': <bool>}
                one per line read; 'error' is True for stderr lines.
            {'msg': 'finished', 'exit': <return code>}
                exactly one, after all output messages.

        Note: readline() blocks until a full line (or EOF) is available, so
        a partial line without a trailing newline is delivered only once it
        is completed or the pipe is closed.
        """
        # Pipes that have not reached EOF yet, mapped to the 'error' flag
        # their lines are tagged with.
        streams = {p.stdout: False, p.stderr: True}
        exited = False
        while streams:
            # While the process runs, wait up to a second for output. Once
            # it has exited, only drain what is already sitting in the pipes.
            timeout = 0 if exited else 1
            ready = select(list(streams), [], [], timeout)[0]
            for f in ready:
                raw = f.readline()
                if not raw:
                    # EOF: stop selecting on this pipe, otherwise select()
                    # reports it readable forever and the loop spins.
                    del streams[f]
                    continue
                q.put({
                    'msg': 'output',
                    # Undecodable bytes are replaced rather than raising, so
                    # the thread survives and 'finished' is still sent.
                    'line': raw.decode('utf-8', errors='replace'),
                    'error': streams[f],
                })
            if exited:
                if not ready:
                    # Process is gone and nothing is pending. A pipe may
                    # still be held open (e.g. by a grandchild), so do not
                    # wait for EOF.
                    break
            elif p.poll() is not None:
                exited = True
        # Both pipes may hit EOF before the process exits; wait() guarantees
        # a real return code in that case.
        exit_code = p.wait()
        p.stdout.close()
        p.stderr.close()
        q.put({'msg': 'finished', 'exit': exit_code})

    def run(self, cmd):
        """
        Start cmd as a subprocess and stream its output in the background.

        Inputs: command to execute, passed unchanged to Popen
        Returns: a job holding the process object and the message queue
        """
        q = Queue()
        # bufsize=0 keeps the pipes unbuffered, so data never hides in a
        # Python-level buffer where select() in _readio could not see it.
        proc = Popen(cmd, bufsize=0, stdout=PIPE, stderr=PIPE)
        out = Thread(target=self._readio, args=(proc, q))
        self.threads.append(out)
        out.start()
        self.procs.append(proc)
        return job(proc, q)