Skip to content

Commit

Permalink
running event loop in corresponding thread
Browse files Browse the repository at this point in the history
event loop is no longer passed as a parameter e.g. in `ioloop.PeriodicCallback`.

 - before PeriodicCallback start specifies the event loop by make_current().
 - IOLoop.instance() to IOLoop.current()
  • Loading branch information
unkcpz committed Jun 13, 2020
1 parent 2b9cd7f commit bc19b02
Show file tree
Hide file tree
Showing 17 changed files with 20 additions and 20 deletions.
2 changes: 1 addition & 1 deletion circus/arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def running(self):
def _init_context(self, context):
self.context = context or zmq.Context.instance()
if self.loop is None:
self.loop = ioloop.IOLoop.instance()
self.loop = ioloop.IOLoop.current()
self.ctrl = Controller(self.endpoint, self.multicast_endpoint,
self.context, self.loop, self, self.check_delay,
self.endpoint_owner)
Expand Down
2 changes: 1 addition & 1 deletion circus/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, context=None, endpoint=DEFAULT_ENDPOINT_DEALER,
get_connection(self.socket, endpoint, ssh_server, ssh_keyfile)
self._timeout = timeout
self.timeout = timeout * 1000
self.stream = ZMQStream(self.socket, tornado.ioloop.IOLoop.instance())
self.stream = ZMQStream(self.socket, tornado.ioloop.IOLoop.current())

def _init_context(self, context):
self.context = context or zmq.Context.instance()
Expand Down
2 changes: 1 addition & 1 deletion circus/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def start(self):
# so with no period callback to manage_watchers
# is probably "unit tests only"
self.caller = ioloop.PeriodicCallback(self.manage_watchers,
self.check_delay, self.loop)
self.check_delay)
self.caller.start()
self.started = True

Expand Down
2 changes: 1 addition & 1 deletion circus/green/arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
class Arbiter(_Arbiter):
def _init_context(self, context):
self.context = context or Context.instance()
self.loop = ioloop.IOLoop.instance()
self.loop = ioloop.IOLoop.current()
self.ctrl = Controller(self.endpoint, self.multicast_endpoint,
self.context, self.loop, self, self.check_delay)
3 changes: 2 additions & 1 deletion circus/green/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ def _init_stream(self):
self.stream.on_recv(self.handle_message)

def start(self):
self.loop.make_current()
self.initialize()
self.caller = ioloop.PeriodicCallback(self.arbiter.manage_watchers,
self.check_delay, self.loop)
self.check_delay)
self.caller.start()
1 change: 1 addition & 0 deletions circus/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def initialize(self):

@debuglog
def start(self):
self.loop.make_current()
if not self.active:
raise ValueError('Will not start an inactive plugin')
self.handle_init()
Expand Down
3 changes: 1 addition & 2 deletions circus/plugins/command_reloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def look_after(self):

def handle_init(self):
self.period = ioloop.PeriodicCallback(self.look_after,
self.loop_rate * 1000,
self.loop)
self.loop_rate * 1000)
self.period.start()

def handle_stop(self):
Expand Down
2 changes: 1 addition & 1 deletion circus/plugins/statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(self, *args, **config):

def handle_init(self):
self.period = ioloop.PeriodicCallback(self.look_after,
self.loop_rate * 1000, self.loop)
self.loop_rate * 1000)
self.period.start()

def handle_stop(self):
Expand Down
3 changes: 1 addition & 2 deletions circus/plugins/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ def handle_init(self):
- create the listening UDP socket
"""
self.period = ioloop.PeriodicCallback(self.look_after,
self.loop_rate * 1000,
self.loop)
self.loop_rate * 1000)
self.period.start()
self._bind_socket()

Expand Down
5 changes: 2 additions & 3 deletions circus/stats/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class BaseStatsCollector(ioloop.PeriodicCallback):

def __init__(self, streamer, name, callback_time=1., io_loop=None):
ioloop.PeriodicCallback.__init__(self, self._callback,
callback_time * 1000, io_loop)
callback_time * 1000)
self.streamer = streamer
self.name = name

Expand Down Expand Up @@ -110,8 +110,7 @@ def __init__(self, streamer, name, callback_time=1., io_loop=None):
callback_time, io_loop)
self._rstats = defaultdict(int)
self.sockets = [sock for sock, address, fd in self.streamer.sockets]
self._p = ioloop.PeriodicCallback(self._select, _LOOP_RES,
io_loop=io_loop)
self._p = ioloop.PeriodicCallback(self._select, _LOOP_RES)

def start(self):
self._p.start()
Expand Down
2 changes: 1 addition & 1 deletion circus/stats/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, endpoint, pubsub_endoint, stats_endpoint,
self.sub_socket = self.ctx.socket(zmq.SUB)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.topic)
self.sub_socket.connect(self.pubsub_endpoint)
self.loop = loop or ioloop.IOLoop.instance()
self.loop = loop or ioloop.IOLoop.current()
self.substream = zmqstream.ZMQStream(self.sub_socket, self.loop)
self.substream.on_recv(self.handle_recv)
self.client = CircusClient(context=self.ctx, endpoint=endpoint,
Expand Down
2 changes: 1 addition & 1 deletion circus/stream/redirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, stdout_redirect, stderr_redirect, buffer=1024,
self._active = {}
self.redirect = {'stdout': stdout_redirect, 'stderr': stderr_redirect}
self.buffer = buffer
self.loop = loop or ioloop.IOLoop.instance()
self.loop = loop or ioloop.IOLoop.current()

def _start_one(self, fd, stream_name, process, pipe):
if fd not in self._active:
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, name):

def get_ioloop():
from tornado import ioloop
return ioloop.IOLoop.instance()
return ioloop.IOLoop.current()


def get_available_port():
Expand Down
4 changes: 2 additions & 2 deletions circus/tests/test_reloadconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def _tear_down_arbiter(self, a):
a.sockets.close_all()

def get_new_ioloop(self):
return tornado.ioloop.IOLoop.instance()
return tornado.ioloop.IOLoop.current()

def _load_base_arbiter(self, name='reload_base'):
loop = tornado.ioloop.IOLoop.instance()
loop = tornado.ioloop.IOLoop.current()
a = Arbiter.load_from_config(_CONF[name], loop=loop)
a.evpub_socket = FakeSocket()
# initialize watchers
Expand Down
1 change: 1 addition & 0 deletions circus/tests/test_stats_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(this, streamer):
this.daemon = True

def run(self):
self.loop.make_current()
collector = collector_class(
self.streamer, 'sockets', callback_time=0.1,
io_loop=self.loop)
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def __init__(self, stream=False, loop=None, **kw):
self.watcher = None
self.kw = kw
if loop is None:
self.loop = tornado.ioloop.IOLoop.instance()
self.loop = tornado.ioloop.IOLoop.current()
else:
self.loop = loop

Expand Down
2 changes: 1 addition & 1 deletion circus/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
self.close_child_stdout = close_child_stdout
self.close_child_stderr = close_child_stderr
self.use_papa = use_papa and papa is not None
self.loop = loop or ioloop.IOLoop.instance()
self.loop = loop or ioloop.IOLoop.current()

if singleton and self.numprocesses not in (0, 1):
raise ValueError("Cannot have %d processes with a singleton "
Expand Down

0 comments on commit bc19b02

Please sign in to comment.