diff --git a/circus/arbiter.py b/circus/arbiter.py index f76dff7a3..d507e6628 100644 --- a/circus/arbiter.py +++ b/circus/arbiter.py @@ -242,7 +242,7 @@ def running(self): def _init_context(self, context): self.context = context or zmq.Context.instance() if self.loop is None: - self.loop = ioloop.IOLoop.instance() + self.loop = ioloop.IOLoop.current() self.ctrl = Controller(self.endpoint, self.multicast_endpoint, self.context, self.loop, self, self.check_delay, self.endpoint_owner) diff --git a/circus/client.py b/circus/client.py index e95d39243..490742eb7 100644 --- a/circus/client.py +++ b/circus/client.py @@ -5,6 +5,7 @@ import zmq.utils.jsonapi as json from zmq.eventloop.zmqstream import ZMQStream import tornado +from tornado import concurrent from circus.exc import CallError from circus.util import DEFAULT_ENDPOINT_DEALER, get_connection, to_bytes @@ -35,7 +36,7 @@ def __init__(self, context=None, endpoint=DEFAULT_ENDPOINT_DEALER, get_connection(self.socket, endpoint, ssh_server, ssh_keyfile) self._timeout = timeout self.timeout = timeout * 1000 - self.stream = ZMQStream(self.socket, tornado.ioloop.IOLoop.instance()) + self.stream = ZMQStream(self.socket, tornado.ioloop.IOLoop.current()) def _init_context(self, context): self.context = context or zmq.Context.instance() @@ -65,12 +66,20 @@ def call(self, cmd): raise CallError(str(e)) try: - yield tornado.gen.Task(self.stream.send, cmd) + future = concurrent.Future() + + def cb(msg, status): + future.set_result(msg) + self.stream.send(cmd, callback=cb) + yield future except zmq.ZMQError as e: raise CallError(str(e)) while True: - messages = yield tornado.gen.Task(self.stream.on_recv) + future = concurrent.Future() + self.stream.on_recv(future.set_result) + messages = yield future + for message in messages: try: res = json.loads(message) diff --git a/circus/controller.py b/circus/controller.py index ad7b5e7b9..c76d62f81 100644 --- a/circus/controller.py +++ b/circus/controller.py @@ -109,7 +109,7 @@ def start(self): # so with no period callback to manage_watchers # is probably "unit tests only" self.caller = ioloop.PeriodicCallback(self.manage_watchers, - self.check_delay, self.loop) + self.check_delay) self.caller.start() self.started = True diff --git a/circus/green/arbiter.py b/circus/green/arbiter.py index 966cde45e..2bb477de4 100644 --- a/circus/green/arbiter.py +++ b/circus/green/arbiter.py @@ -8,6 +8,6 @@ class Arbiter(_Arbiter): def _init_context(self, context): self.context = context or Context.instance() - self.loop = ioloop.IOLoop.instance() + self.loop = ioloop.IOLoop.current() self.ctrl = Controller(self.endpoint, self.multicast_endpoint, self.context, self.loop, self, self.check_delay) diff --git a/circus/green/controller.py b/circus/green/controller.py index c5599269d..73a9c021f 100644 --- a/circus/green/controller.py +++ b/circus/green/controller.py @@ -14,7 +14,8 @@ def _init_stream(self): self.stream.on_recv(self.handle_message) def start(self): + self.loop.make_current() self.initialize() self.caller = ioloop.PeriodicCallback(self.arbiter.manage_watchers, - self.check_delay, self.loop) + self.check_delay) self.caller.start() diff --git a/circus/plugins/__init__.py b/circus/plugins/__init__.py index e5c7d2467..4c0fa98f7 100644 --- a/circus/plugins/__init__.py +++ b/circus/plugins/__init__.py @@ -58,6 +58,7 @@ def initialize(self): @debuglog def start(self): + self.loop.make_current() if not self.active: raise ValueError('Will not start an inactive plugin') self.handle_init() diff --git a/circus/plugins/command_reloader.py b/circus/plugins/command_reloader.py index 7057668bd..2522ba932 100644 --- a/circus/plugins/command_reloader.py +++ b/circus/plugins/command_reloader.py @@ -48,8 +48,7 @@ def look_after(self): def handle_init(self): self.period = ioloop.PeriodicCallback(self.look_after, - self.loop_rate * 1000, - self.loop) + self.loop_rate * 1000) self.period.start() def handle_stop(self): diff --git a/circus/plugins/statsd.py b/circus/plugins/statsd.py index 777eeef21..11cd5eb1f 100644 --- a/circus/plugins/statsd.py +++ b/circus/plugins/statsd.py @@ -81,7 +81,7 @@ def __init__(self, *args, **config): def handle_init(self): self.period = ioloop.PeriodicCallback(self.look_after, - self.loop_rate * 1000, self.loop) + self.loop_rate * 1000) self.period.start() def handle_stop(self): diff --git a/circus/plugins/watchdog.py b/circus/plugins/watchdog.py index ad30c030f..3f9701b1f 100644 --- a/circus/plugins/watchdog.py +++ b/circus/plugins/watchdog.py @@ -79,8 +79,7 @@ def handle_init(self): - create the listening UDP socket """ self.period = ioloop.PeriodicCallback(self.look_after, - self.loop_rate * 1000, - self.loop) + self.loop_rate * 1000) self.period.start() self._bind_socket() diff --git a/circus/stats/collector.py b/circus/stats/collector.py index 4784c9e83..af2165214 100644 --- a/circus/stats/collector.py +++ b/circus/stats/collector.py @@ -12,7 +12,7 @@ class BaseStatsCollector(ioloop.PeriodicCallback): def __init__(self, streamer, name, callback_time=1., io_loop=None): ioloop.PeriodicCallback.__init__(self, self._callback, - callback_time * 1000, io_loop) + callback_time * 1000) self.streamer = streamer self.name = name @@ -110,8 +110,7 @@ def __init__(self, streamer, name, callback_time=1., io_loop=None): callback_time, io_loop) self._rstats = defaultdict(int) self.sockets = [sock for sock, address, fd in self.streamer.sockets] - self._p = ioloop.PeriodicCallback(self._select, _LOOP_RES, - io_loop=io_loop) + self._p = ioloop.PeriodicCallback(self._select, _LOOP_RES) def start(self): self._p.start() diff --git a/circus/stats/streamer.py b/circus/stats/streamer.py index 569a6c7ed..e5f96a945 100644 --- a/circus/stats/streamer.py +++ b/circus/stats/streamer.py @@ -27,7 +27,7 @@ def __init__(self, endpoint, pubsub_endoint, stats_endpoint, self.sub_socket = self.ctx.socket(zmq.SUB) self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.topic) self.sub_socket.connect(self.pubsub_endpoint) - self.loop = loop or ioloop.IOLoop.instance() + self.loop = loop or ioloop.IOLoop.current() self.substream = zmqstream.ZMQStream(self.sub_socket, self.loop) self.substream.on_recv(self.handle_recv) self.client = CircusClient(context=self.ctx, endpoint=endpoint, diff --git a/circus/stream/redirector.py b/circus/stream/redirector.py index 39ef99b41..312153410 100644 --- a/circus/stream/redirector.py +++ b/circus/stream/redirector.py @@ -42,7 +42,7 @@ def __init__(self, stdout_redirect, stderr_redirect, buffer=1024, self._active = {} self.redirect = {'stdout': stdout_redirect, 'stderr': stderr_redirect} self.buffer = buffer - self.loop = loop or ioloop.IOLoop.instance() + self.loop = loop or ioloop.IOLoop.current() def _start_one(self, fd, stream_name, process, pipe): if fd not in self._active: diff --git a/circus/tests/support.py b/circus/tests/support.py index bfd22eef5..c2e77f31f 100644 --- a/circus/tests/support.py +++ b/circus/tests/support.py @@ -11,11 +11,12 @@ import multiprocessing import socket import sysconfig +import concurrent from unittest import skip, skipIf, TestCase, TestSuite, findTestCases # noqa: F401 from tornado.testing import AsyncTestCase -import mock +from unittest import mock import tornado from circus import get_arbiter @@ -50,7 +51,7 @@ def __init__(self, name): def get_ioloop(): from tornado import ioloop - return ioloop.IOLoop.instance() + return ioloop.IOLoop.current() def get_available_port(): @@ -473,7 +474,7 @@ def stop(self): pass -class MagicMockFuture(mock.MagicMock, tornado.concurrent.Future): +class MagicMockFuture(mock.MagicMock, concurrent.futures.Future): def cancel(self): return False diff --git a/circus/tests/test_arbiter.py b/circus/tests/test_arbiter.py index d4c73a5c6..3198d9719 100644 --- a/circus/tests/test_arbiter.py +++ b/circus/tests/test_arbiter.py @@ -5,7 +5,7 @@ from tempfile import mkstemp from time import time import zmq.utils.jsonapi as json -import mock +from unittest import mock from urllib.parse import urlparse from circus.arbiter import Arbiter @@ -500,6 +500,7 @@ def test_plugins(self): loop=get_ioloop()) def incr_processes(cli): + # return a coroutine if cli is Async return cli.send_message('incr', name='test') # wait for the plugin to be started @@ -511,7 +512,7 @@ def incr_processes(cli): res = yield cli.send_message('list', name='test') self.assertEqual(len(res.get('pids')), 1) - incr_processes(cli) + yield incr_processes(cli) res = yield cli.send_message('list', name='test') self.assertEqual(len(res.get('pids')), 2) # wait for the plugin to receive the signal @@ -519,7 +520,7 @@ def incr_processes(cli): self.assertTrue(res) truncate_file(datafile) - incr_processes(cli) + yield incr_processes(cli) res = yield cli.send_message('list', name='test') self.assertEqual(len(res.get('pids')), 3) @@ -595,8 +596,7 @@ def _test_start_watchers_warmup_delay(self): @tornado.gen.coroutine def _sleep(duration): called.append(duration) - loop = get_ioloop() - yield tornado.gen.Task(loop.add_timeout, time() + duration) + yield tornado.gen.sleep(duration) watcher_mod.tornado_sleep = _sleep @@ -633,6 +633,21 @@ def callback(*args): self.assertEqual(callee.call_count, 1) + def test_start_with_callback_delay(self): + controller = "tcp://127.0.0.1:%d" % get_available_port() + sub = "tcp://127.0.0.1:%d" % get_available_port() + arbiter = Arbiter([], controller, sub, check_delay=1) + + callee = mock.MagicMock() + + def callback(*args): + callee() + arbiter.stop() + + arbiter.start(cb=callback) + + self.assertEqual(callee.call_count, 1) + @tornado.testing.gen_test def test_start_with_callback_and_given_loop(self): controller = "tcp://127.0.0.1:%d" % get_available_port() diff --git a/circus/tests/test_circusctl.py b/circus/tests/test_circusctl.py index 90289e23d..b95e7b3b7 100644 --- a/circus/tests/test_circusctl.py +++ b/circus/tests/test_circusctl.py @@ -1,6 +1,6 @@ import subprocess import shlex -from mock import patch +from unittest.mock import patch from multiprocessing import Process, Queue from tornado.testing import gen_test diff --git a/circus/tests/test_config.py b/circus/tests/test_config.py index be9a2b6cb..8304c231c 100644 --- a/circus/tests/test_config.py +++ b/circus/tests/test_config.py @@ -1,6 +1,6 @@ import os import signal -from mock import patch +from unittest.mock import patch from circus import logger from circus.arbiter import Arbiter diff --git a/circus/tests/test_controller.py b/circus/tests/test_controller.py index b988fb6ea..bfea57cfc 100644 --- a/circus/tests/test_controller.py +++ b/circus/tests/test_controller.py @@ -4,7 +4,7 @@ from circus import logger import circus.controller -import mock +from unittest import mock class TestController(TestCase): diff --git a/circus/tests/test_plugin_command_reloader.py b/circus/tests/test_plugin_command_reloader.py index dcbe1c678..5a8433d0c 100644 --- a/circus/tests/test_plugin_command_reloader.py +++ b/circus/tests/test_plugin_command_reloader.py @@ -1,4 +1,4 @@ -from mock import patch +from unittest.mock import patch from circus.plugins.command_reloader import CommandReloader from circus.tests.support import TestCircus, EasyTestSuite diff --git a/circus/tests/test_plugin_flapping.py b/circus/tests/test_plugin_flapping.py index 97925e84d..db7a9dad6 100644 --- a/circus/tests/test_plugin_flapping.py +++ b/circus/tests/test_plugin_flapping.py @@ -1,4 +1,4 @@ -from mock import patch +from unittest.mock import patch from circus.tests.support import TestCircus, EasyTestSuite from circus.plugins.flapping import Flapping diff --git a/circus/tests/test_reloadconfig.py b/circus/tests/test_reloadconfig.py index 4a57feaca..9912e8d3b 100644 --- a/circus/tests/test_reloadconfig.py +++ b/circus/tests/test_reloadconfig.py @@ -55,10 +55,10 @@ def _tear_down_arbiter(self, a): a.sockets.close_all() def get_new_ioloop(self): - return tornado.ioloop.IOLoop.instance() + return tornado.ioloop.IOLoop.current() def _load_base_arbiter(self, name='reload_base'): - loop = tornado.ioloop.IOLoop.instance() + loop = tornado.ioloop.IOLoop.current() a = Arbiter.load_from_config(_CONF[name], loop=loop) a.evpub_socket = FakeSocket() # initialize watchers diff --git a/circus/tests/test_sockets.py b/circus/tests/test_sockets.py index e5df45700..37142afde 100644 --- a/circus/tests/test_sockets.py +++ b/circus/tests/test_sockets.py @@ -5,7 +5,7 @@ import IN except ImportError: pass -import mock +from unittest import mock import fcntl from circus.tests.support import TestCase, skipIf, EasyTestSuite, IS_WINDOWS diff --git a/circus/tests/test_stats_collector.py b/circus/tests/test_stats_collector.py index ece51f705..eca60eeb7 100644 --- a/circus/tests/test_stats_collector.py +++ b/circus/tests/test_stats_collector.py @@ -72,6 +72,7 @@ def __init__(this, streamer): this.daemon = True def run(self): + self.loop.make_current() collector = collector_class( self.streamer, 'sockets', callback_time=0.1, io_loop=self.loop) diff --git a/circus/tests/test_stats_publisher.py b/circus/tests/test_stats_publisher.py index 2dd79b349..df25cd763 100644 --- a/circus/tests/test_stats_publisher.py +++ b/circus/tests/test_stats_publisher.py @@ -1,4 +1,4 @@ -import mock +from unittest import mock import zmq import zmq.utils.jsonapi as json diff --git a/circus/tests/test_stats_streamer.py b/circus/tests/test_stats_streamer.py index 947cf5f39..f0c0a40d2 100644 --- a/circus/tests/test_stats_streamer.py +++ b/circus/tests/test_stats_streamer.py @@ -1,7 +1,7 @@ import os import tempfile -import mock +from unittest import mock from circus.tests.support import TestCircus, EasyTestSuite from circus.stats.streamer import StatsStreamer diff --git a/circus/tests/test_util.py b/circus/tests/test_util.py index e74f48e13..6235cb37d 100644 --- a/circus/tests/test_util.py +++ b/circus/tests/test_util.py @@ -13,7 +13,7 @@ import psutil from psutil import Popen -import mock +from unittest import mock from circus.tests.support import (TestCase, EasyTestSuite, skipIf, IS_WINDOWS, SLEEP) diff --git a/circus/tests/test_validate_option.py b/circus/tests/test_validate_option.py index 48f79a644..b7eef3053 100644 --- a/circus/tests/test_validate_option.py +++ b/circus/tests/test_validate_option.py @@ -1,5 +1,5 @@ from circus.tests.support import TestCase, EasyTestSuite, IS_WINDOWS -from mock import patch +from unittest.mock import patch from circus.commands.util import validate_option from circus.exc import MessageError diff --git a/circus/tests/test_watcher.py b/circus/tests/test_watcher.py index 6c9222c78..52b4e749a 100644 --- a/circus/tests/test_watcher.py +++ b/circus/tests/test_watcher.py @@ -16,7 +16,7 @@ captured_output = None # NOQA import tornado -import mock +from unittest import mock from circus import logger from circus.process import RUNNING, UNEXISTING @@ -317,7 +317,7 @@ def __init__(self, stream=False, loop=None, **kw): self.watcher = None self.kw = kw if loop is None: - self.loop = tornado.ioloop.IOLoop.instance() + self.loop = tornado.ioloop.IOLoop.current() else: self.loop = loop diff --git a/circus/util.py b/circus/util.py index 29205ff7d..8b59125b0 100644 --- a/circus/util.py +++ b/circus/util.py @@ -26,7 +26,6 @@ fcntl = None grp = None pwd = None -from tornado.ioloop import IOLoop from tornado import gen from tornado import concurrent @@ -1033,7 +1032,7 @@ def wrapper(self, *args, **kwargs): finally: if isinstance(resp, concurrent.Future): cb = functools.partial(_synchronized_cb, arbiter) - resp.add_done_callback(cb) + concurrent.future_add_done_callback(resp, cb) else: if arbiter is not None: arbiter._exclusive_running_command = None @@ -1048,7 +1047,7 @@ def tornado_sleep(duration): To use with a gen.coroutines decorated function Thanks to http://stackoverflow.com/a/11135204/433050 """ - return gen.Task(IOLoop.instance().add_timeout, time.time() + duration) + return gen.sleep(duration) class TransformableFuture(concurrent.Future): diff --git a/circus/watcher.py b/circus/watcher.py index 3a843cac2..38d03b07b 100644 --- a/circus/watcher.py +++ b/circus/watcher.py @@ -242,7 +242,7 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0., self.close_child_stdout = close_child_stdout self.close_child_stderr = close_child_stderr self.use_papa = use_papa and papa is not None - self.loop = loop or ioloop.IOLoop.instance() + self.loop = loop or ioloop.IOLoop.current() if singleton and self.numprocesses not in (0, 1): raise ValueError("Cannot have %d processes with a singleton " diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 8aa15f09c..74971b697 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -4,6 +4,7 @@ Changelog history unreleased ---------- +- Drop support for tornado<5 and start using asyncio eventl loop - #1129 - Fix mem_info readings to be more reliable - #1128 - Drop support for Python 2.7 & 3.4 - #1126 - Speedup reloadconfig for large number of sockets - #1121 diff --git a/examples/test.sh b/examples/test.sh index fae14ff24..61b584faa 100755 --- a/examples/test.sh +++ b/examples/test.sh @@ -1,2 +1,3 @@ #!/bin/sh -;sleep 1 +sleep 1 +echo "@@@" diff --git a/setup.py b/setup.py index 9a784defb..db7319816 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ raise SystemExit("Circus requires Python 3.5 or higher.") -install_requires = ['psutil', 'pyzmq>=17.0', 'tornado>=3.0,<5.0'] +install_requires = ['psutil', 'pyzmq>=17.0', 'tornado>=5.0.2'] try: import argparse # NOQA @@ -36,6 +36,20 @@ "License :: OSI Approved :: Apache Software License" ], install_requires=install_requires, + extras_require={ + 'test': [ + 'nose', + 'nose-cov', + 'coverage', + 'mock', + 'circus-web', + 'gevent', + 'papa', + 'PyYAML', + 'pyzmq>=17.0', + 'flake8==2.1.0', + ], + }, test_suite='circus.tests', entry_points=""" [console_scripts] diff --git a/tox.ini b/tox.ini index 900c9c3fb..7ec5b51ab 100644 --- a/tox.ini +++ b/tox.ini @@ -12,7 +12,7 @@ deps = gevent papa PyYAML - tornado>=3.0,<5.0 + tornado>=5.0.2 pyzmq>=17.0 setenv =