From caf202a0fdbacc89f45a4d9b5432765cabe20eab Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 19 Dec 2012 16:36:55 +0200 Subject: [PATCH 1/3] Draft of new MetricsStore for JS box application. --- go/apps/jsbox/metrics.py | 149 ++++++++++++++++++++++++++++ go/apps/jsbox/tests/test_metrics.py | 1 + 2 files changed, 150 insertions(+) create mode 100644 go/apps/jsbox/metrics.py create mode 100644 go/apps/jsbox/tests/test_metrics.py diff --git a/go/apps/jsbox/metrics.py b/go/apps/jsbox/metrics.py new file mode 100644 index 000000000..cca49c531 --- /dev/null +++ b/go/apps/jsbox/metrics.py @@ -0,0 +1,149 @@ +# -*- test-case-name: go.apps.jsbox.tests.test_metrics -*- +# -*- coding: utf-8 -*- + +"""Metrics for JS Box sandboxes""" + +import time +import json +import math + +from twisted.internet.defer import inlineCallbacks, returnValue + +from vumi.application.sandbox import SandboxResource + + +class MetricsResource(SandboxResource): + """Resource that provides metric storing.""" + + def setup_resource(self): + self.metrics_store = MetricsStore() + + def teardown_resource(self): + pass + + def _event_for_command(self, etype, command): + return { + 'event': etype, + 'metric': command['metric'], + 'value': float(command['value']), + 'timestamp': time.time(), + } + + @inlineCallbacks + def _process_event_command(self, etype, store_id, command): + try: + event = self._event_for_command(etype, command) + except Exception, e: + returnValue(self.reply(command, success=False, reason=str(e))) + yield self.metric_store.push_event(store_id, event) + returnValue(self.reply(command, success=True)) + + def handle_inc(self, api, command): + return self._process_event_command("inc", command) + + def handle_set(self, api, command): + return self._process_event_command("set", command) + + +class MetricsStore(object): + + def __init__(self, manager, redis, metric_interval, polling_interval=None): + self.manager = manager + self.redis = redis + self.metric_interval = metric_interval + self.polling_interval = (polling_interval + if polling_interval is not None + else metric_interval) + + def rkey(self, *args): + return ":".join(args) + + def events_key(self, store_id): + return self.rkey("stores", store_id, "events") + + def last_timebucket_key(self, store_id): + return self.rkey("stores", store_id, "last_timebucket") + + def store_updated(self, store_id): + return self.redis.sadd("updated_stores", store_id) + + def pop_updated(self): + return self.redis.spop("updated_stores") + + def get_set_last_timebucket(self, store_id, new_timebucket): + last_timebucket_key = self.last_timebucket_key(store_id) + return self.redis.getset(last_timebucket_key, new_timebucket) + + def timebucket(self, timestamp): + intervals = math.floor(timestamp / self.metric_interval) + return intervals * self.metric_interval + + def push_event(self, store_id, event): + events_key = self.events_key(store_id) + timestamp = event['timestamp'] + return self.redis.zadd(events_key, timestamp, json.dumps(event)) + + @inlineCallbacks + def pull_events(self, store_id, start, end): + events_key = self.events_key(store_id) + start = "(%f" % start + end = "%f" % end + raw_events = yield self.redis.zrangebyscore(events_key, start, end) + returnValue([json.loads(ev) for ev in raw_events]) + + def delete_events(self, store_id, start, end): + events_key = self.events_key(store_id) + start = "(%f" % start + end = "%f" % end + return self.redis.zremrangebyscore(events_key, start, end) + + @inlineCallbacks + def first_event(self, store_id): + events_key = self.events_key(store_id) + events = yield self.redis.zrange(events_key, 0, 1) + if not events: + return None + return json.loads(events[0]) + + @inlineCallbacks + def add_event(self, store_id, event): + yield self.mark_store_updated(store_id) + yield self.push_event(store_id, event) + + @inlineCallbacks + def process_updated_stores(self): + while True: + store_id = yield self.pop_updated_store() + if store_id is None: + break + # most recently completed metric interval + final_timebucket = self.timebucket(time.time() - + self.metric_interval) + + current_timebucket = self.get_set_last_timebucket(store_id, + final_timebucket) + if current_timebucket is None: + # first time progressing -- start from timebucket containing + # first unprocessed event + first_event = yield self.first_event(store_id) + if first_event is None: + continue + current_timebucket = self.timebucket(first_event['timestamp']) + + while (current_timebucket < + final_timebucket + 0.5 * self.metric_interval): + next_timebucket = current_timebucket + self.metric_interval + yield self.update_store(store_id, current_timebucket, + next_timebucket) + current_timebucket = next_timebucket + + @inlineCallbacks + def update_store(self, store_id, start, end): + events = yield self.pull_events(store_id, start, end) + # TODO: load metrics from riak + for ev in events: + # TODO: update metrics + pass + # TODO: save metrics to riak + yield self.delete_events(store_id, start, end) + # TODO: notify holodeck diff --git a/go/apps/jsbox/tests/test_metrics.py b/go/apps/jsbox/tests/test_metrics.py new file mode 100644 index 000000000..36ac3b770 --- /dev/null +++ b/go/apps/jsbox/tests/test_metrics.py @@ -0,0 +1 @@ +"""Tests for go.apps.jsbox.metrics.""" From 4ddad77981bd046cfe63fd7bcf13d563eb2675e2 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 19 Dec 2012 23:27:44 +0200 Subject: [PATCH 2/3] Flesh out metric store a lot -- now to test the whole thing. --- go/apps/jsbox/metrics.py | 153 ++++++++++++++++++++++++++++++++------- 1 file changed, 126 insertions(+), 27 deletions(-) diff --git a/go/apps/jsbox/metrics.py b/go/apps/jsbox/metrics.py index cca49c531..e7cb5b340 100644 --- a/go/apps/jsbox/metrics.py +++ b/go/apps/jsbox/metrics.py @@ -7,57 +7,127 @@ import json import math -from twisted.internet.defer import inlineCallbacks, returnValue +from twisted.internet.defer import inlineCallbacks, returnValue, maybeDeferred +from twisted.internet.task import LoopingCall from vumi.application.sandbox import SandboxResource +from vumi.persist.fields import Unicode, ForeignKey, JsonField +from vumi.persist.model import Model +from vumi import log +from go.vumitools.account import UserAccount, PerAccountStore class MetricsResource(SandboxResource): """Resource that provides metric storing.""" + # default number of seconds between metric updates + DEFAULT_METRIC_INTERVAL = 300 + def setup_resource(self): - self.metrics_store = MetricsStore() + metric_interval = self.config.get('metric_interval', + self.DEFAULT_METRIC_INTERVAL) + self.metrics_manager = MetricsStoreManager( + self.app_worker.vumi_api, metric_interval, + update_callback=self._update_holodeck) + @inlineCallbacks def teardown_resource(self): + yield self.metrics_manager.stop() + + def _update_holodeck(self, user_account_key, store): + # TODO: implement pass def _event_for_command(self, etype, command): - return { - 'event': etype, - 'metric': command['metric'], - 'value': float(command['value']), - 'timestamp': time.time(), - } + return MetricEvent(event=etype, store=command.get('store', 'default'), + metric=command['metric'], + value=float(command['value'])) @inlineCallbacks - def _process_event_command(self, etype, store_id, command): + def _process_event_command(self, etype, user_account_key, command): try: event = self._event_for_command(etype, command) except Exception, e: returnValue(self.reply(command, success=False, reason=str(e))) - yield self.metric_store.push_event(store_id, event) + yield self.metrics_manager.push_event(user_account_key, event) returnValue(self.reply(command, success=True)) def handle_inc(self, api, command): - return self._process_event_command("inc", command) + return self._process_event_command(MetricEvent.INC, command) def handle_set(self, api, command): - return self._process_event_command("set", command) + return self._process_event_command(MetricEvent.SET, command) + + +class MetricsBundle(Model): + """A metric store attached to an account""" + user_account = ForeignKey(UserAccount) + name = Unicode(max_length=255, index=True) + metrics = JsonField() + + +class MetricsBundleStore(PerAccountStore): + def setup_proxies(self): + self.metrics_bundles = self.manager.proxy(MetricsBundle) + + +class MetricEvent(object): + + INC, SET = "inc", "set" + + def __init__(self, event, store, metric, value, timestamp=None): + self.event = event + self.store = store + self.metric = metric + self.value = value + if timestamp is None: + timestamp = time.time() + self.timestamp = timestamp + + def to_json(self): + return json.dumps({'event': self.event, 'store': self.store, + 'metric': self.metric, 'value': self.value, + 'timestamp': self.timestamp}) + + @classmethod + def from_json(cls, data): + return cls(**json.loads(data)) -class MetricsStore(object): +class MetricsStoreManager(object): - def __init__(self, manager, redis, metric_interval, polling_interval=None): - self.manager = manager - self.redis = redis + def __init__(self, api, metric_interval, polling_interval=None, + update_callback=None): + self.api = api + self.redis = api.redis.sub_manager("metrics_store_manager") self.metric_interval = metric_interval self.polling_interval = (polling_interval if polling_interval is not None else metric_interval) + self.update_callback = update_callback + + self.update_loop = LoopingCall(self.process_updated_stores) + self.update_loop_done = self.update_loop.start(self.polling_interval) + + @inlineCallbacks + def stop(self, stop_redis=True): + if self.update_loop.running: + self.update_loop.stop() + yield self.update_loop_done + + def metrics_bundle_store(self, user_account_key): + return MetricsBundleStore(self.api.manager, user_account_key) def rkey(self, *args): return ":".join(args) + def store_id(self, user_account_key, store): + return self.rkey(user_account_key, store) + + def parse_store_id(self, store_id): + user_account_key, store = store_id.split(':') + return user_account_key, store + def events_key(self, store_id): return self.rkey("stores", store_id, "events") @@ -81,7 +151,7 @@ def timebucket(self, timestamp): def push_event(self, store_id, event): events_key = self.events_key(store_id) timestamp = event['timestamp'] - return self.redis.zadd(events_key, timestamp, json.dumps(event)) + return self.redis.zadd(events_key, timestamp, event.to_json()) @inlineCallbacks def pull_events(self, store_id, start, end): @@ -89,7 +159,7 @@ def pull_events(self, store_id, start, end): start = "(%f" % start end = "%f" % end raw_events = yield self.redis.zrangebyscore(events_key, start, end) - returnValue([json.loads(ev) for ev in raw_events]) + returnValue([MetricEvent.from_json(ev) for ev in raw_events]) def delete_events(self, store_id, start, end): events_key = self.events_key(store_id) @@ -106,7 +176,8 @@ def first_event(self, store_id): return json.loads(events[0]) @inlineCallbacks - def add_event(self, store_id, event): + def add_event(self, user_account_id, event): + store_id = self.store_id(user_account_id, event.store) yield self.mark_store_updated(store_id) yield self.push_event(store_id, event) @@ -118,13 +189,13 @@ def process_updated_stores(self): break # most recently completed metric interval final_timebucket = self.timebucket(time.time() - - self.metric_interval) + self.metric_interval) current_timebucket = self.get_set_last_timebucket(store_id, final_timebucket) if current_timebucket is None: - # first time progressing -- start from timebucket containing - # first unprocessed event + # the first time progressing we start from timebucket + # containing the first unprocessed event first_event = yield self.first_event(store_id) if first_event is None: continue @@ -139,11 +210,39 @@ def process_updated_stores(self): @inlineCallbacks def update_store(self, store_id, start, end): + user_account_key, store = self.parse_store_id(store_id) + metrics_bundle_store = self.metrics_bundle_store(user_account_key) + metrics_bundles = metrics_bundle_store.metrics_bundles + bundles = yield metrics_bundles.index_lookup('name', store) + if not bundles: + metrics_bundle = metrics_bundles(user_account=user_account_key, + name=store, metrics={}) + elif len(bundles) == 1: + metrics_bundle = bundles[0] + else: + log.error("Found multiple bundles for store id %r" % (store_id,)) + return + events = yield self.pull_events(store_id, start, end) - # TODO: load metrics from riak for ev in events: - # TODO: update metrics - pass - # TODO: save metrics to riak + self.apply_event(metrics_bundle, ev) + + yield metrics_bundle.save() yield self.delete_events(store_id, start, end) - # TODO: notify holodeck + if self.update_callback is not None: + yield maybeDeferred(self.update_callback, user_account_key, store) + + def apply_event(self, metrics_bundle, event): + method_name = 'apply_%s_event' % event['event'] + method = getattr(self, method_name, self.apply_unknown_event) + return method(metrics_bundle, event) + + def apply_unknown_event(self, mb, ev): + store_id = self.store_id(mb.user_account_key, mb.name) + log.warn("Unknown event for store_id %r: %r" % (store_id, ev)) + + def apply_inc_event(self, mb, ev): + mb.metrics[ev.metric] = mb.metrics.get(ev.metric, 0) + ev.value + + def apply_set_event(self, mb, ev): + mb.metrics[ev.metric] = ev.value From 51deee3c77403a3c094b544720285e7e1f108b50 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Thu, 20 Dec 2012 00:39:59 +0200 Subject: [PATCH 3/3] First tests for jsbox metrics and some minor fixes. --- go/apps/jsbox/metrics.py | 27 +++++++---- go/apps/jsbox/tests/test_metrics.py | 71 +++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 9 deletions(-) diff --git a/go/apps/jsbox/metrics.py b/go/apps/jsbox/metrics.py index e7cb5b340..e6ca90240 100644 --- a/go/apps/jsbox/metrics.py +++ b/go/apps/jsbox/metrics.py @@ -11,7 +11,7 @@ from twisted.internet.task import LoopingCall from vumi.application.sandbox import SandboxResource -from vumi.persist.fields import Unicode, ForeignKey, JsonField +from vumi.persist.fields import Unicode, ForeignKey, Json from vumi.persist.model import Model from vumi import log from go.vumitools.account import UserAccount, PerAccountStore @@ -26,7 +26,7 @@ class MetricsResource(SandboxResource): def setup_resource(self): metric_interval = self.config.get('metric_interval', self.DEFAULT_METRIC_INTERVAL) - self.metrics_manager = MetricsStoreManager( + self.metrics_manager = MetricStoreManager( self.app_worker.vumi_api, metric_interval, update_callback=self._update_holodeck) @@ -63,7 +63,7 @@ class MetricsBundle(Model): """A metric store attached to an account""" user_account = ForeignKey(UserAccount) name = Unicode(max_length=255, index=True) - metrics = JsonField() + metrics = Json() class MetricsBundleStore(PerAccountStore): @@ -84,6 +84,13 @@ def __init__(self, event, store, metric, value, timestamp=None): timestamp = time.time() self.timestamp = timestamp + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + return all((self.event == other.event, self.store == other.store, + self.metric == other.metric, self.value == other.value, + self.timestamp == other.timestamp)) + def to_json(self): return json.dumps({'event': self.event, 'store': self.store, 'metric': self.metric, 'value': self.value, @@ -94,7 +101,9 @@ def from_json(cls, data): return cls(**json.loads(data)) -class MetricsStoreManager(object): +class MetricStoreManager(object): + + looping_call = LoopingCall def __init__(self, api, metric_interval, polling_interval=None, update_callback=None): @@ -106,11 +115,11 @@ def __init__(self, api, metric_interval, polling_interval=None, else metric_interval) self.update_callback = update_callback - self.update_loop = LoopingCall(self.process_updated_stores) + self.update_loop = self.looping_call(self.process_updated_stores) self.update_loop_done = self.update_loop.start(self.polling_interval) @inlineCallbacks - def stop(self, stop_redis=True): + def stop(self): if self.update_loop.running: self.update_loop.stop() yield self.update_loop_done @@ -172,8 +181,8 @@ def first_event(self, store_id): events_key = self.events_key(store_id) events = yield self.redis.zrange(events_key, 0, 1) if not events: - return None - return json.loads(events[0]) + returnValue(None) + returnValue(json.loads(events[0])) @inlineCallbacks def add_event(self, user_account_id, event): @@ -184,7 +193,7 @@ def add_event(self, user_account_id, event): @inlineCallbacks def process_updated_stores(self): while True: - store_id = yield self.pop_updated_store() + store_id = yield self.pop_updated() if store_id is None: break # most recently completed metric interval diff --git a/go/apps/jsbox/tests/test_metrics.py b/go/apps/jsbox/tests/test_metrics.py index 36ac3b770..5e78691e2 100644 --- a/go/apps/jsbox/tests/test_metrics.py +++ b/go/apps/jsbox/tests/test_metrics.py @@ -1 +1,72 @@ """Tests for go.apps.jsbox.metrics.""" + +import time +import json + +from twisted.trial.unittest import TestCase +from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import Clock, LoopingCall + +from go.vumitools.tests.utils import GoPersistenceMixin +from go.vumitools.api import VumiApi + +from go.apps.jsbox.metrics import MetricEvent, MetricStoreManager + + +class TestMetricEvent(TestCase): + def test_to_json(self): + now = time.time() + ev = MetricEvent(event=MetricEvent.INC, store='default', + metric='my.metric', value=1.5, timestamp=now) + self.assertEqual(ev.to_json(), json.dumps({ + 'event': MetricEvent.INC, 'store': 'default', + 'metric': 'my.metric', 'value': 1.5, + 'timestamp': now + })) + + def test_from_json(self): + now = time.time() + ev = MetricEvent.from_json(json.dumps({ + 'event': MetricEvent.INC, 'store': 'default', + 'metric': 'my.metric', 'value': 1.5, + 'timestamp': now + })) + expected_ev = MetricEvent(event=MetricEvent.INC, store='default', + metric='my.metric', value=1.5, timestamp=now) + self.assertEqual(ev, expected_ev) + + +class TestMetricStoreManager(GoPersistenceMixin, TestCase): + + use_riak = True + metric_interval = 300 + + @inlineCallbacks + def setUp(self): + yield super(TestMetricStoreManager, self).setUp() + yield self._persist_setUp() + self.clock = Clock() + self.patch(MetricStoreManager, 'looping_call', + self.looping_call) + self.vumi_api = yield VumiApi.from_config_async(self.mk_config({})) + self.user_account = yield self.mk_user(self.vumi_api, u'testuser') + self.msm = MetricStoreManager(self.vumi_api, self.metric_interval) + + @inlineCallbacks + def tearDown(self): + yield super(TestMetricStoreManager, self).tearDown() + yield self._persist_tearDown() + yield self.msm.stop() + + def looping_call(self, *args, **kw): + lc = LoopingCall(*args, **kw) + lc.clock = self.clock + return lc + + def test_store_id(self): + self.assertEqual(self.msm.store_id("12345", "default"), + "12345:default") + + def test_parse_store_id(self): + self.assertEqual(self.msm.parse_store_id("12345:default"), + ("12345", "default"))