diff --git a/circus/arbiter.py b/circus/arbiter.py index f76dff7a3..d507e6628 100644 --- a/circus/arbiter.py +++ b/circus/arbiter.py @@ -242,7 +242,7 @@ def running(self): def _init_context(self, context): self.context = context or zmq.Context.instance() if self.loop is None: - self.loop = ioloop.IOLoop.instance() + self.loop = ioloop.IOLoop.current() self.ctrl = Controller(self.endpoint, self.multicast_endpoint, self.context, self.loop, self, self.check_delay, self.endpoint_owner) diff --git a/circus/client.py b/circus/client.py index ff2a0d098..75ae0cf9e 100644 --- a/circus/client.py +++ b/circus/client.py @@ -36,7 +36,7 @@ def __init__(self, context=None, endpoint=DEFAULT_ENDPOINT_DEALER, get_connection(self.socket, endpoint, ssh_server, ssh_keyfile) self._timeout = timeout self.timeout = timeout * 1000 - self.stream = ZMQStream(self.socket, tornado.ioloop.IOLoop.instance()) + self.stream = ZMQStream(self.socket, tornado.ioloop.IOLoop.current()) def _init_context(self, context): self.context = context or zmq.Context.instance() diff --git a/circus/controller.py b/circus/controller.py index ad7b5e7b9..c76d62f81 100644 --- a/circus/controller.py +++ b/circus/controller.py @@ -109,7 +109,7 @@ def start(self): # so with no period callback to manage_watchers # is probably "unit tests only" self.caller = ioloop.PeriodicCallback(self.manage_watchers, - self.check_delay, self.loop) + self.check_delay) self.caller.start() self.started = True diff --git a/circus/green/arbiter.py b/circus/green/arbiter.py index 966cde45e..2bb477de4 100644 --- a/circus/green/arbiter.py +++ b/circus/green/arbiter.py @@ -8,6 +8,6 @@ class Arbiter(_Arbiter): def _init_context(self, context): self.context = context or Context.instance() - self.loop = ioloop.IOLoop.instance() + self.loop = ioloop.IOLoop.current() self.ctrl = Controller(self.endpoint, self.multicast_endpoint, self.context, self.loop, self, self.check_delay) diff --git a/circus/green/controller.py b/circus/green/controller.py index c5599269d..73a9c021f 100644 --- a/circus/green/controller.py +++ b/circus/green/controller.py @@ -14,7 +14,8 @@ def _init_stream(self): self.stream.on_recv(self.handle_message) def start(self): + self.loop.make_current() self.initialize() self.caller = ioloop.PeriodicCallback(self.arbiter.manage_watchers, - self.check_delay, self.loop) + self.check_delay) self.caller.start() diff --git a/circus/plugins/__init__.py b/circus/plugins/__init__.py index e5c7d2467..4c0fa98f7 100644 --- a/circus/plugins/__init__.py +++ b/circus/plugins/__init__.py @@ -58,6 +58,7 @@ def initialize(self): @debuglog def start(self): + self.loop.make_current() if not self.active: raise ValueError('Will not start an inactive plugin') self.handle_init() diff --git a/circus/plugins/command_reloader.py b/circus/plugins/command_reloader.py index 7057668bd..2522ba932 100644 --- a/circus/plugins/command_reloader.py +++ b/circus/plugins/command_reloader.py @@ -48,8 +48,7 @@ def look_after(self): def handle_init(self): self.period = ioloop.PeriodicCallback(self.look_after, - self.loop_rate * 1000, - self.loop) + self.loop_rate * 1000) self.period.start() def handle_stop(self): diff --git a/circus/plugins/statsd.py b/circus/plugins/statsd.py index 777eeef21..11cd5eb1f 100644 --- a/circus/plugins/statsd.py +++ b/circus/plugins/statsd.py @@ -81,7 +81,7 @@ def __init__(self, *args, **config): def handle_init(self): self.period = ioloop.PeriodicCallback(self.look_after, - self.loop_rate * 1000, self.loop) + self.loop_rate * 1000) self.period.start() def handle_stop(self): diff --git a/circus/plugins/watchdog.py b/circus/plugins/watchdog.py index ad30c030f..3f9701b1f 100644 --- a/circus/plugins/watchdog.py +++ b/circus/plugins/watchdog.py @@ -79,8 +79,7 @@ def handle_init(self): - create the listening UDP socket """ self.period = ioloop.PeriodicCallback(self.look_after, - self.loop_rate * 1000, - self.loop) + self.loop_rate * 1000) self.period.start() self._bind_socket() diff --git a/circus/stats/collector.py b/circus/stats/collector.py index 4784c9e83..af2165214 100644 --- a/circus/stats/collector.py +++ b/circus/stats/collector.py @@ -12,7 +12,7 @@ class BaseStatsCollector(ioloop.PeriodicCallback): def __init__(self, streamer, name, callback_time=1., io_loop=None): ioloop.PeriodicCallback.__init__(self, self._callback, - callback_time * 1000, io_loop) + callback_time * 1000) self.streamer = streamer self.name = name @@ -110,8 +110,7 @@ def __init__(self, streamer, name, callback_time=1., io_loop=None): callback_time, io_loop) self._rstats = defaultdict(int) self.sockets = [sock for sock, address, fd in self.streamer.sockets] - self._p = ioloop.PeriodicCallback(self._select, _LOOP_RES, - io_loop=io_loop) + self._p = ioloop.PeriodicCallback(self._select, _LOOP_RES) def start(self): self._p.start() diff --git a/circus/stats/streamer.py b/circus/stats/streamer.py index 569a6c7ed..e5f96a945 100644 --- a/circus/stats/streamer.py +++ b/circus/stats/streamer.py @@ -27,7 +27,7 @@ def __init__(self, endpoint, pubsub_endoint, stats_endpoint, self.sub_socket = self.ctx.socket(zmq.SUB) self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.topic) self.sub_socket.connect(self.pubsub_endpoint) - self.loop = loop or ioloop.IOLoop.instance() + self.loop = loop or ioloop.IOLoop.current() self.substream = zmqstream.ZMQStream(self.sub_socket, self.loop) self.substream.on_recv(self.handle_recv) self.client = CircusClient(context=self.ctx, endpoint=endpoint, diff --git a/circus/stream/redirector.py b/circus/stream/redirector.py index 39ef99b41..312153410 100644 --- a/circus/stream/redirector.py +++ b/circus/stream/redirector.py @@ -42,7 +42,7 @@ def __init__(self, stdout_redirect, stderr_redirect, buffer=1024, self._active = {} self.redirect = {'stdout': stdout_redirect, 'stderr': stderr_redirect} self.buffer = buffer - self.loop = loop or ioloop.IOLoop.instance() + self.loop = loop or ioloop.IOLoop.current() def _start_one(self, fd, stream_name, process, pipe): if fd not in self._active: diff --git a/circus/tests/support.py b/circus/tests/support.py index 42688e829..999632a6f 100644 --- a/circus/tests/support.py +++ b/circus/tests/support.py @@ -51,7 +51,7 @@ def __init__(self, name): def get_ioloop(): from tornado import ioloop - return ioloop.IOLoop.instance() + return ioloop.IOLoop.current() def get_available_port(): diff --git a/circus/tests/test_reloadconfig.py b/circus/tests/test_reloadconfig.py index 4a57feaca..9912e8d3b 100644 --- a/circus/tests/test_reloadconfig.py +++ b/circus/tests/test_reloadconfig.py @@ -55,10 +55,10 @@ def _tear_down_arbiter(self, a): a.sockets.close_all() def get_new_ioloop(self): - return tornado.ioloop.IOLoop.instance() + return tornado.ioloop.IOLoop.current() def _load_base_arbiter(self, name='reload_base'): - loop = tornado.ioloop.IOLoop.instance() + loop = tornado.ioloop.IOLoop.current() a = Arbiter.load_from_config(_CONF[name], loop=loop) a.evpub_socket = FakeSocket() # initialize watchers diff --git a/circus/tests/test_stats_collector.py b/circus/tests/test_stats_collector.py index ece51f705..eca60eeb7 100644 --- a/circus/tests/test_stats_collector.py +++ b/circus/tests/test_stats_collector.py @@ -72,6 +72,7 @@ def __init__(this, streamer): this.daemon = True def run(self): + self.loop.make_current() collector = collector_class( self.streamer, 'sockets', callback_time=0.1, io_loop=self.loop) diff --git a/circus/tests/test_watcher.py b/circus/tests/test_watcher.py index d0a771629..52b4e749a 100644 --- a/circus/tests/test_watcher.py +++ b/circus/tests/test_watcher.py @@ -317,7 +317,7 @@ def __init__(self, stream=False, loop=None, **kw): self.watcher = None self.kw = kw if loop is None: - self.loop = tornado.ioloop.IOLoop.instance() + self.loop = tornado.ioloop.IOLoop.current() else: self.loop = loop diff --git a/circus/watcher.py b/circus/watcher.py index 3a843cac2..38d03b07b 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -242,7 +242,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., self.close_child_stdout = close_child_stdout self.close_child_stderr = close_child_stderr self.use_papa = use_papa and papa is not None - self.loop = loop or ioloop.IOLoop.instance() + self.loop = loop or ioloop.IOLoop.current() if singleton and self.numprocesses not in (0, 1): raise ValueError("Cannot have %d processes with a singleton "