Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python3 Support, replacing SocketIO with SockJS #65

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions circusweb/circushttpd.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from __future__ import print_function
from __future__ import print_function, unicode_literals

import argparse
import os
import os.path
import sys
import json
from base64 import b64encode, b64decode
from base64 import b64encode as _b64encode
from base64 import b64decode as _b64decode
from zmq.eventloop import ioloop
import socket

from six import string_types, iteritems
# Install zmq.eventloop to replace tornado.ioloop
ioloop.install()

Expand All @@ -22,7 +23,7 @@
from tornado.options import define, options # NOQA
from tornado.web import URLSpec

import tornadio2
from sockjs.tornado import SockJSRouter

from mako import exceptions
from tomako import MakoTemplateLoader
Expand All @@ -37,7 +38,7 @@
from circus.exc import CallError
from circus.util import configure_logger, LOG_LEVELS

from circusweb.namespace import SocketIOConnection
from circusweb.namespace import SocketConnection
from circusweb import __version__, logger
from circusweb.util import (run_command, AutoDiscovery)
from circusweb.session import (
Expand All @@ -60,18 +61,39 @@
}


def require_logged_user(func):
def b64encode(s):
return _b64encode(s.encode('ascii'))


def b64decode(s):
return _b64decode(s).decode('utf-8')


def _decode(e):
try:
e = e.decode('utf-8')
except AttributeError:
pass
return e


def jsonify_list(s):
try:
s = s.decode('utf-8')
except AttributeError:
if not isinstance(s, string_types):
s = [_decode(e) for e in s]
return json.dumps(s)


def require_logged_user(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
controller = get_controller()

if not self.session.connected or not controller:
self.clean_user_session()
return self.redirect(self.application.reverse_url('connect'))

return func(self, *args, **kwargs)

return wrapped


Expand Down Expand Up @@ -99,7 +121,7 @@ def render_template(self, template_path, **data):
namespace.update({'controller': get_controller(),
'version': __version__,
'b64encode': b64encode,
'dumps': json.dumps,
'dumps': jsonify_list,
'session': self.session, 'messages': messages,
'SERVER': server})

Expand Down Expand Up @@ -186,7 +208,6 @@ def post(self):
for endpoint in endpoints_list:
if endpoint not in endpoints:
self.session.endpoints.remove(endpoint)

self.redirect(self.reverse_url('index'))


Expand All @@ -207,11 +228,10 @@ class WatcherAddHandler(BaseHandler):
def post(self, endpoint):
url = yield self.run_command(
'add_watcher',
kwargs=dict((k, v[0]) for k, v in
self.request.arguments.iteritems()),
kwargs=dict((k, v[0]) for k, v in iteritems(self.request.arguments)),
message='added a new watcher', endpoint=b64decode(endpoint),
redirect_url=self.reverse_url('watcher',
self.get_argument('name').lower()),
redirect_url=self.reverse_url(
'watcher', self.get_argument('name').lower()),
redirect_on_error=self.reverse_url('index'))
self.redirect(url)

Expand Down Expand Up @@ -364,7 +384,7 @@ def __init__(self):
]

self.loader = MakoTemplateLoader(TMPLDIR)
self.router = tornadio2.TornadioRouter(SocketIOConnection)
self.router = SockJSRouter(SocketConnection)
handlers += self.router.urls

settings = {
Expand Down
6 changes: 4 additions & 2 deletions circusweb/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -
from __future__ import unicode_literals

import uuid

import zmq
Expand Down Expand Up @@ -59,7 +61,7 @@ def call(self, cmd, callback):
raise CallError(str(e))

socket = self.context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, uuid.uuid4().hex)
socket.setsockopt_string(zmq.IDENTITY, uuid.uuid4().hex)
socket.setsockopt(zmq.LINGER, 0)
get_connection(socket, self.endpoint, self.ssh_server,
self.ssh_keyfile)
Expand All @@ -84,7 +86,7 @@ def recv_callback(msg):
stream.on_recv(recv_callback)

try:
socket.send(cmd)
socket.send_string(cmd)
except zmq.ZMQError as e:
raise CallError(str(e))

Expand Down
7 changes: 4 additions & 3 deletions circusweb/controller.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import unicode_literals

from circus.commands import get_commands
from circusweb.client import AsynchronousCircusClient
from circusweb.stats_client import AsynchronousStatsConsumer
from circusweb.namespace import SocketIOConnection
from circusweb.namespace import SocketConnection

from tornado import gen

Expand Down Expand Up @@ -43,9 +45,8 @@ def connect_to_stats_endpoint(self, stats_endpoint):

stats_client = AsynchronousStatsConsumer(
['stat.'], self.loop,
SocketIOConnection.consume_stats, endpoint=stats_endpoint,
SocketConnection.consume_stats, endpoint=stats_endpoint,
ssh_server=self.ssh_server)

stats_client.count += 1
self.stats_clients[stats_endpoint] = stats_client

Expand Down
201 changes: 201 additions & 0 deletions circusweb/media/chuckt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/**
* License: MIT
* @see https://github.com/epixa/chuckt/blob/master/LICENSE.md
* @version 0.2.0
*/

window.epixa || (window.epixa = {});

(function(){
/**
* Convenience method for applying chuckt event functionality to a socket
*
* IMPORTANT NOTE: Using this method will override any existing onmessage
* callback on the given socket, so this should only be used if the only
* messages you expect to receive through the socket are generated by a
* chuckt server implementation.
*
* @param socket
*/
epixa.chucktify = function(socket) {
var chuckt = new ChuckT(socket);
socket.onmessage = function(e) {
chuckt.process(e.data);
};
return chuckt;
};

/**
* Enables ChuckT event functionality on the given socket
*
* In terms of receiving responses from the backend, no assumptions are made.
* Since the websocket API currently only supports a single onmessage
* callback, the ChuckT client app is responsible for calling chuck.process()
* when a message is received from the backend. Messages do not have to be
* pre-filtered -- if chuckt.process() is called on a non-chuckt message, it
* will simply be ignored.
*
* The only hard requirement of the given socket is that it implements a
* send() method that takes in a single message string. It is assumed, but
* not required, that the send() method will transport the encoded message to
* the backend.
*
* @constructor
*/
var ChuckT = epixa.ChuckT = function(socket) {
this.socket = socket;
this.callbacks = new Callbacks();
this.listeners = {};
};

/**
* Emits an event that proxies through the socket connection
*
* In addition to the name of the event to fire, you can pass any number of
* arguments and they will be sent along with the event.
*
* If the last argument passed is a function, then it is registered as a
* callback and is invoked whenever receipt is acknowledged on the backend.
*
* Usage:
* chuckt.emit('my-event', 'foo', 'bar');
* sends: {"chuckt":{"event":"my-event","args":["foo","bar"]}}
*
* chuckt.emit('my-event', 'foo', console.log);
* sends: {"chuckt":{"event":"my-event","args":["foo","bar"],"callbackid":0}}
* and console.log() is called if the backend returns a callback response
*
* @param event
*/
ChuckT.prototype.emit = function(event) {
var data = { event: event };
var args = Array.prototype.slice.call(arguments).slice(1);

if (args.length > 0 && typeof args[args.length - 1] == 'function') {
var callback = args.pop();
data.callbackid = this.callbacks.register(callback);
}

if (args.length > 0) {
data.args = args;
}
this.socket.send(this.serialize(data));
};

/**
* Adds the given callback as a listener to the given event
*
* When an event is received from the backend, all of its listeners are fired
* in order. Any arguments passed with the event will be passed to each
* listener.
*
* @param event
* @param callback
*/
ChuckT.prototype.on = function(event, callback) {
if (!this.listeners[event]) {
this.listeners[event] = [];
}
this.listeners[event].push(callback);
};

/**
* Removes event listeners
*
* If an event is specified, then only event listeners for that specific
* event will be removed. If no event is specified, then all listeners
* are removed.
*
* @param event
*/
ChuckT.prototype.removeListeners = function(event) {
if (typeof event === 'undefined') {
this.listeners = {};
} else {
delete this.listeners[event];
}
};

/**
* Processes the given socket message
*
* Generally speaking, this method will be manually called from within the
* socket's onmessage() callback.
*
* If the message is not a "chuckt" message, then it is ignored.
*
* If the message is a callback message, then the callback matching the
* callbackid is invoked with any arguments that may also have been passed.
*
* If the message is an event message, then all listeners bound to that event
* are fired in order with any arguments that may also have been passed.
*
* @param message
*/
ChuckT.prototype.process = function(message) {
var parsed = JSON.parse(message);
// don't handle non-chuckt messages
if (typeof parsed.chuckt !== 'object') return;
var chuckt = parsed.chuckt;

// message is a callback, so execute it
if (typeof chuckt.callbackid !== 'undefined') {
this.callbacks.use(chuckt.callbackid, chuckt.args);
return;
}

// message is an emitted event, so execute all corresponding listeners
if (typeof chuckt.event === 'string') {
if (!this.listeners[chuckt.event]) return;
for (var i in this.listeners[chuckt.event]) {
this.listeners[chuckt.event][i].apply(this, chuckt.args);
}
}
};

/**
* Serializes the given data as a json string with the chuckt prefix
*
* @param data
* @return {*}
*/
ChuckT.prototype.serialize = function(data) {
return JSON.stringify({ chuckt: data });
};

/**
* Callback collection
*
* Since callbacks are inherently volatile (once a registered callback is
* fired, it is permanently deleted), some minor callback management is
* necessary to minimize CPU and memory impact.
*
* @constructor
*/
var Callbacks = function(){
this.callbacks = [];
this.max = -1;
};

/**
* Registers the given callback and returns the callback's id
*
* @param callback
* @return {*}
*/
Callbacks.prototype.register = function(callback){
this.callbacks[++this.max] = callback;
return this.max;
};

/**
* Invokes the callback identified by the callbackid with any arguments
*
* @param callbackid
* @param args
*/
Callbacks.prototype.use = function(callbackid, args){
this.callbacks[callbackid].apply(this, args || {});
delete this.callbacks[callbackid];
};
})();
Loading