Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow plugins to have [env:name] and find plugins relatively based on… #1016

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
10 changes: 8 additions & 2 deletions circus/arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from circus.util import DictDiffer, synchronized, tornado_sleep, papa
from circus.util import IS_WINDOWS
from circus.config import get_config
from circus.plugins import get_plugin_cmd
from circus.plugins import CircusPlugin, get_plugin_cmd
from circus.sockets import CircusSocket, CircusSockets


Expand Down Expand Up @@ -429,6 +429,10 @@ def load_from_config(cls, config_file, loop=None):
for socket_ in cfg.get('sockets', []):
sockets.append(CircusSocket.load_from_config(socket_))

plugins = []
for plugin in cfg.get('plugins', []):
plugins.append(CircusPlugin.load_from_config(plugin))

httpd = cfg.get('httpd', False)
if httpd:
# controlling that we have what it takes to run the web UI
Expand All @@ -446,7 +450,9 @@ def load_from_config(cls, config_file, loop=None):
statsd=cfg.get('statsd', False),
stats_endpoint=cfg.get('stats_endpoint'),
multicast_endpoint=cfg.get('multicast_endpoint'),
plugins=cfg.get('plugins'), sockets=sockets,
plugins=plugins,
# plugins=cfg.get('plugins'),
sockets=sockets,
warmup_delay=cfg.get('warmup_delay', 0),
httpd=httpd,
loop=loop,
Expand Down
30 changes: 27 additions & 3 deletions circus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ def watcher_defaults():
'use_papa': False}


def plugin_defaults():
return {
'name': '',
'stderr_stream': dict(),
'stdout_stream': dict(),
'priority': 0,
}


class DefaultConfigParser(StrictConfigParser):

def __init__(self, *args, **kw):
Expand Down Expand Up @@ -201,10 +210,25 @@ def get_config(config_file):
sockets.append(sock)

if section.startswith("plugin:"):
plugin = section_items
# plugin = section_items
plugin = plugin_defaults()
plugin.update(section_items)
plugin['name'] = section
if 'priority' in plugin:
plugin['priority'] = int(plugin['priority'])

# create watcher options
for opt, val in cfg.items(section, noreplace=True):
if opt == 'priority':
plugin['priority'] = int(plugin['priority'])
elif opt.startswith('stderr_stream') or \
opt.startswith('stdout_stream'):
stream_name, stream_opt = opt.split(".", 1)
plugin[stream_name][stream_opt] = val

if plugin.get('copy_env'):
plugin['env'] = dict(global_env)
else:
plugin['env'] = dict(local_env)

plugins.append(plugin)

if section.startswith("watcher:"):
Expand Down
61 changes: 40 additions & 21 deletions circus/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
""" Base class to create Circus subscribers plugins.
"""
import sys
import argparse
import copy
import errno
import os
import site
import sys
import uuid
import argparse

import zmq
import zmq.utils.jsonapi as json
Expand All @@ -12,9 +15,10 @@
from circus import logger, __version__
from circus.client import make_message, cast_message
from circus.py3compat import b, s
from circus.stream import get_stream
from circus.util import (debuglog, to_bool, resolve_name, configure_logger,
DEFAULT_ENDPOINT_DEALER, DEFAULT_ENDPOINT_SUB,
get_connection)
get_connection, get_python_version, parse_env_dict)


class CircusPlugin(object):
Expand All @@ -30,7 +34,8 @@ class CircusPlugin(object):
"""
name = ''

def __init__(self, endpoint, pubsub_endpoint, check_delay, ssh_server=None,
def __init__(self, endpoint, pubsub_endpoint, check_delay,
ssh_server=None, stdout_stream=None, stderr_stream=None,
**config):
self.daemon = True
self.config = config
Expand All @@ -41,6 +46,12 @@ def __init__(self, endpoint, pubsub_endpoint, check_delay, ssh_server=None,
self.ssh_server = ssh_server
self._id = b(uuid.uuid4().hex)
self.running = False

self.stdout_stream_conf = copy.copy(stdout_stream)
self.stderr_stream_conf = copy.copy(stderr_stream)
self.stdout_stream = get_stream(self.stdout_stream_conf)
self.stderr_stream = get_stream(self.stderr_stream_conf)

self.loop = ioloop.IOLoop()

@debuglog
Expand Down Expand Up @@ -157,31 +168,29 @@ def split_data(data):
def load_message(msg):
return json.loads(msg)

@classmethod
def load_from_config(cls, config):
if 'env' in config:
config['env'] = parse_env_dict(config['env'])
return config


def _cfg2str(cfg):
return ':::'.join(['%s:%s' % (key, val) for key, val in cfg.items()])
json_cfg = json.dumps(cfg, separators=(',', ':'))
if get_python_version() < (3, 0, 0):
return json_cfg.encode('unicode-escape')
else:
# zmq in py3 returns bytes
return json_cfg.decode("utf-8")


def _str2cfg(data):
cfg = {}
if data is None:
return cfg

for item in data.split(':::'):
item = item.split(':', 1)
if len(item) != 2:
continue
key, value = item
cfg[key.strip()] = value.strip()

return cfg
return json.loads(data)


def get_plugin_cmd(config, endpoint, pubsub, check_delay, ssh_server,
debug=False, loglevel=None, logoutput=None):
fqn = config['use']
# makes sure the name exists
resolve_name(fqn)

# we're good, serializing the config
del config['use']
Expand All @@ -192,7 +201,7 @@ def get_plugin_cmd(config, endpoint, pubsub, check_delay, ssh_server,
if ssh_server is not None:
cmd += ' --ssh %s' % ssh_server
if len(config) > 0:
cmd += ' --config %s' % config
cmd += ' --config %r' % config
if debug:
cmd += ' --log-level DEBUG'
elif loglevel:
Expand Down Expand Up @@ -245,6 +254,16 @@ def main():
parser.print_usage()
sys.exit(0)

cfg = _str2cfg(args.config)

# load directories in PYTHONPATH if provided
# so if a hook is there, it can be loaded
if 'env' in cfg and 'PYTHONPATH' in cfg['env']:
for path in cfg['env']['PYTHONPATH'].split(os.pathsep):
if path in sys.path:
continue
site.addsitedir(path)

factory = resolve_name(args.plugin)

# configure the logger
Expand All @@ -256,7 +275,7 @@ def main():
logger.info('Pub/sub: %r' % args.pubsub)
plugin = factory(args.endpoint, args.pubsub,
args.check_delay, args.ssh,
**_str2cfg(args.config))
**cfg)
logger.info('Starting')
try:
plugin.start()
Expand Down
9 changes: 9 additions & 0 deletions circus/tests/config/find_plugin_in_pythonpath.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[plugin:relative_plugin]
priority = 20
use = plugins_uniquename.my_plugin.MyPlugin
stdout_stream.class = StdoutStream
stderr_stream.class = StdoutStream
# copy_env = True

[env:relative_plugin]
PYTHONPATH = $PWD/circus/tests/config
Empty file.
7 changes: 7 additions & 0 deletions circus/tests/config/plugins_uniquename/my_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from circus.tests.test_arbiter import EventLoggingTestPlugin


# Plugin is the same as the EventLoggingTestPlugin,
# just in a directory outside of the circus modules.
class MyPlugin(EventLoggingTestPlugin):
pass
57 changes: 53 additions & 4 deletions circus/tests/test_arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
EasyTestSuite, skipIf, get_ioloop, SLEEP,
PYTHON)
from circus.util import (DEFAULT_ENDPOINT_DEALER, DEFAULT_ENDPOINT_MULTICAST,
DEFAULT_ENDPOINT_SUB)
DEFAULT_ENDPOINT_SUB, parse_env_dict)
from circus.tests.support import (MockWatcher, has_circusweb,
poll_for_callable, get_available_port)
from circus import watcher as watcher_mod
Expand All @@ -28,11 +28,12 @@
_GENERIC = os.path.join(os.path.dirname(__file__), 'generic.py')


class Plugin(CircusPlugin):
class EventLoggingTestPlugin(CircusPlugin):
name = 'dummy'

def __init__(self, *args, **kwargs):
super(Plugin, self).__init__(*args, **kwargs)
super(EventLoggingTestPlugin, self).__init__(*args, **kwargs)
self.name = kwargs.get('name')
with open(self.config['file'], 'a+') as f:
f.write('PLUGIN STARTED')

Expand Down Expand Up @@ -495,7 +496,7 @@ def test_plugins(self):
os.close(fd)

# setting up a circusd with a plugin
plugin = 'circus.tests.test_arbiter.Plugin'
plugin = 'circus.tests.test_arbiter.EventLoggingTestPlugin'
plugins = [{'use': plugin, 'file': datafile}]

yield self.start_arbiter(graceful_timeout=0, plugins=plugins,
Expand Down Expand Up @@ -531,6 +532,54 @@ def incr_processes(cli):
os.remove(datafile)
yield self.stop_arbiter()

@tornado.testing.gen_test
def test_relative_plugin(self):
fd, datafile = mkstemp()
os.close(fd)

# setting up a circusd with a plugin
plugin = 'plugins_uniquename.my_plugin.MyPlugin'
plugins = [{
'use': plugin,
'file': datafile,
'env': parse_env_dict({
'PYTHONPATH': '$PWD/circus/tests/config'
}),
}]

yield self.start_arbiter(graceful_timeout=0, plugins=plugins,
loop=get_ioloop())

def incr_processes(cli):
return cli.send_message('incr', name='test')

# wait for the plugin to be started
res = yield async_poll_for(datafile, 'PLUGIN STARTED')
self.assertTrue(res)

cli = AsyncCircusClient(endpoint=self.arbiter.endpoint)

res = yield cli.send_message('list', name='test')
self.assertEqual(len(res.get('pids')), 1)

incr_processes(cli)
res = yield cli.send_message('list', name='test')
self.assertEqual(len(res.get('pids')), 2)
# wait for the plugin to receive the signal
res = yield async_poll_for(datafile, 'test:spawn')
self.assertTrue(res)
truncate_file(datafile)

incr_processes(cli)
res = yield cli.send_message('list', name='test')
self.assertEqual(len(res.get('pids')), 3)

# wait for the plugin to receive the signal
res = yield async_poll_for(datafile, 'test:spawn')
self.assertTrue(res)
os.remove(datafile)
yield self.stop_arbiter()

@tornado.testing.gen_test
def test_singleton(self):
# yield self._stop_runners()
Expand Down
8 changes: 8 additions & 0 deletions circus/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
'hooks': os.path.join(CONFIG_DIR, 'hooks.ini'),
'find_hook_in_pythonpath': os.path.join(CONFIG_DIR,
'find_hook_in_pythonpath.ini'),
'find_plugin_in_pythonpath': os.path.join(CONFIG_DIR,
'find_plugin_in_pythonpath.ini'),
'env_var': os.path.join(CONFIG_DIR, 'env_var.ini'),
'env_section': os.path.join(CONFIG_DIR, 'env_section.ini'),
'multiple_wildcard': os.path.join(CONFIG_DIR, 'multiple_wildcard.ini'),
Expand Down Expand Up @@ -230,6 +232,12 @@ def test_watcher_env_var(self):
self.assertEqual("%s:/bin" % os.getenv('PATH'), watcher.env['PATH'])
watcher.stop()

def test_find_plugin_in_pythonpath(self):
# this tests that the config was loaded, NOT that the config works.
arbiter = Arbiter.load_from_config(_CONF['find_plugin_in_pythonpath'])
watchers = arbiter.iter_watchers()
self.assertEqual(watchers[0].name, 'plugin:relative_plugin')

def test_env_section(self):
conf = get_config(_CONF['env_section'])
watchers_conf = {}
Expand Down
28 changes: 28 additions & 0 deletions circus/tests/test_plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from circus.plugins import _cfg2str, _str2cfg
from circus.util import get_python_version
from circus.tests.support import TestCase, EasyTestSuite


class TestPluginUtils(TestCase):

def test_cfg_str(self):
data_obj = {
'use': 'derp',
'copy_env': True,
'number': 1234,
'env': {
'PYTHONPATH': 'dot.path.to.whereever',
'more_truth': True,
},
}
data_strung = _cfg2str(data_obj)

# need to decode, like what would happen automatically when
# passing an arg into and out of the commandline
if get_python_version() < (3, 0, 0):
return data_strung.decode('unicode-escape')
data_unstrung = _str2cfg(data_strung)
self.assertEqual(data_obj, data_unstrung)


test_suite = EasyTestSuite(__name__)
1 change: 1 addition & 0 deletions circus/tests/test_reloadconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,6 @@ def test_reload_ignorearbiterwatchers(self):
statsd = a.get_watcher('circusd-stats')
yield a.reload_from_config(_CONF['reload_statsd'])
self.assertEqual(statsd, a.get_watcher('circusd-stats'))
yield self._tear_down_arbiter(a)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think actually fixed the tests, but I think that it did help stop having dangling plugin processes (as in #1017 (comment) )


test_suite = EasyTestSuite(__name__)
15 changes: 15 additions & 0 deletions docs/source/for-devs/writing-plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ by Python.
For example, :class:`Logger` may be found in a *plugins* module within a
*myproject* package.

If you'd like to use a plugin that is defined in a relative python module
(as opposed to a globally installed module) then you need to define the
PYTHONPATH in *[env:pluginname]*.

.. code-block:: ini

[plugin:foo]
use = 'myplugin.MyPlugin'

[env:foo]
PYTHONPATH = $PYTHONPATH:$PWD

You can use environment variables like *$PWD* in the *PYTHONPATH*.


Async requests
--------------

Expand Down