Skip to content

Commit

Permalink
work in progress about circus-tent#987
Browse files Browse the repository at this point in the history
  • Loading branch information
thefab committed Jun 15, 2016
1 parent 41e0495 commit 6211774
Showing 1 changed file with 40 additions and 20 deletions.
60 changes: 40 additions & 20 deletions circus/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from psutil import NoSuchProcess, TimeoutExpired
import zmq.utils.jsonapi as json
from zmq.eventloop import ioloop
from functools import partial

from circus.process import Process, DEAD_OR_ZOMBIE, UNEXISTING
from circus.papa_process_proxy import PapaProcessProxy
Expand Down Expand Up @@ -284,6 +285,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,

self.working_dir = working_dir
self.processes = {}
self.async_killing_futures = {}
self.shell = shell
self.shell_args = shell_args
self.uid = uid
Expand Down Expand Up @@ -527,8 +529,9 @@ def reap_processes(self):
for pid in list(self.processes.keys()):
self.reap_process(pid)

def _async_kill_cb(self, future):
pass
def _async_kill_cb(self, pid, future):
logger.debug("_async_kill_cb called")
self.async_killing_futures.pop(pid)

@gen.coroutine
@util.debuglog
Expand All @@ -549,8 +552,10 @@ def manage_processes(self):
if len(self.processes) < self.numprocesses and not self.is_stopping():
if self.respawn:
yield self.spawn_processes()
elif not len(self.processes) and not self.on_demand:
yield self._stop()
elif not len(self.processes) and \
not self.on_demand and \
len(self.async_killing_futures) == 0:
yield self._stop(True)

# removing extra processes
if len(self.processes) > self.numprocesses:
Expand All @@ -564,8 +569,12 @@ def manage_processes(self):
processes_to_kill.append(process)
if self.async_kill:
for process in processes_to_kill:
self.loop.add_future(self.kill_process(process),
self._async_kill_cb)
self.processes.pop(process.pid)
future = self.kill_process(process)
self.async_killing_futures[process.pid] = future
self.loop.add_future(future,
partial(self._async_kill_cb,
process.pid))
else:
removes = yield [self.kill_process(process)
for process in processes_to_kill]
Expand All @@ -582,8 +591,11 @@ def remove_expired_processes(self):
if self.async_kill:
for process in expired_processes:
self.processes.pop(process.pid)
self.loop.add_future(self.kill_process(process),
self._async_kill_cb)
future = self.kill_process(process)
self.async_killing_futures[process.pid] = future
self.loop.add_future(future,
partial(self._async_kill_cb,
process.pid))
else:
removes = yield [self.kill_process(x) for x in expired_processes]
for i, process in enumerate(expired_processes):
Expand Down Expand Up @@ -617,7 +629,8 @@ def spawn_processes(self):
for i in range(self.numprocesses - len(self.processes)):
res = self.spawn_process()
if res is False:
yield self._stop()
if len(self.async_killing_futures) == 0:
yield self._stop(True)
break
delay = self.warmup_delay
if isinstance(res, float):
Expand Down Expand Up @@ -746,7 +759,7 @@ def kill_process(self, process, stop_signal=None, graceful_timeout=None):
if self.stop_children:
self.send_signal_process(process, stop_signal)
else:
self.send_signal(process.pid, stop_signal)
self._send_signal(process, stop_signal)
self.notify_event("kill", {"process_pid": process.pid,
"time": time.time()})
except NoSuchProcess:
Expand Down Expand Up @@ -788,18 +801,22 @@ def kill_processes(self, stop_signal=None, graceful_timeout=None):
raise

@util.debuglog
def send_signal(self, pid, signum):
def _send_signal(self, process, signum):
is_sigkill = hasattr(signal, 'SIGKILL') and signum == signal.SIGKILL
pid = process.pid
hook_result = self.call_hook("before_signal",
pid=pid, signum=signum)
if not is_sigkill and not hook_result:
logger.debug("before_signal hook didn't return True "
"=> signal %i is not sent to %i" % (signum, pid))
else:
process.send_signal(signum)
self.call_hook("after_signal", pid=pid, signum=signum)

@util.debuglog
def send_signal(self, pid, signum):
if pid in self.processes:
process = self.processes[pid]
hook_result = self.call_hook("before_signal",
pid=pid, signum=signum)
if not is_sigkill and not hook_result:
logger.debug("before_signal hook didn't return True "
"=> signal %i is not sent to %i" % (signum, pid))
else:
process.send_signal(signum)
self.call_hook("after_signal", pid=pid, signum=signum)
self._send_signal(self.processes[pid], signum)
else:
logger.debug('process %s does not exist' % pid)

Expand Down Expand Up @@ -866,6 +883,9 @@ def _stop(self, close_output_streams=False, for_shutdown=False):
# We ignore the hook result
self.call_hook('before_stop')
yield self.kill_processes()
if len(self.async_killing_futures) > 0:
yield self.async_killing_futures.values()
self.async_killing_futures = {}
self.reap_processes()

# stop redirectors
Expand Down

0 comments on commit 6211774

Please sign in to comment.