Skip to content
This repository has been archived by the owner on Jun 12, 2018. It is now read-only.

Add metrics resource to Go sandbox #121

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions go/apps/jsbox/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
# -*- test-case-name: go.apps.jsbox.tests.test_metrics -*-
# -*- coding: utf-8 -*-

"""Metrics for JS Box sandboxes"""

import time
import json
import math

from twisted.internet.defer import inlineCallbacks, returnValue, maybeDeferred
from twisted.internet.task import LoopingCall

from vumi.application.sandbox import SandboxResource
from vumi.persist.fields import Unicode, ForeignKey, Json
from vumi.persist.model import Model
from vumi import log
from go.vumitools.account import UserAccount, PerAccountStore


class MetricsResource(SandboxResource):
"""Resource that provides metric storing."""

# default number of seconds between metric updates
DEFAULT_METRIC_INTERVAL = 300

def setup_resource(self):
metric_interval = self.config.get('metric_interval',
self.DEFAULT_METRIC_INTERVAL)
self.metrics_manager = MetricStoreManager(
self.app_worker.vumi_api, metric_interval,
update_callback=self._update_holodeck)

@inlineCallbacks
def teardown_resource(self):
yield self.metrics_manager.stop()

def _update_holodeck(self, user_account_key, store):
# TODO: implement
pass

def _event_for_command(self, etype, command):
return MetricEvent(event=etype, store=command.get('store', 'default'),
metric=command['metric'],
value=float(command['value']))

@inlineCallbacks
def _process_event_command(self, etype, user_account_key, command):
try:
event = self._event_for_command(etype, command)
except Exception, e:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just a KeyError here?

returnValue(self.reply(command, success=False, reason=str(e)))
yield self.metrics_manager.push_event(user_account_key, event)
returnValue(self.reply(command, success=True))

def handle_inc(self, api, command):
return self._process_event_command(MetricEvent.INC, command)

def handle_set(self, api, command):
return self._process_event_command(MetricEvent.SET, command)


class MetricsBundle(Model):
"""A metric store attached to an account"""
user_account = ForeignKey(UserAccount)
name = Unicode(max_length=255, index=True)
metrics = Json()


class MetricsBundleStore(PerAccountStore):
def setup_proxies(self):
self.metrics_bundles = self.manager.proxy(MetricsBundle)


class MetricEvent(object):

INC, SET = "inc", "set"

def __init__(self, event, store, metric, value, timestamp=None):
self.event = event
self.store = store
self.metric = metric
self.value = value
if timestamp is None:
timestamp = time.time()
self.timestamp = timestamp

def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return all((self.event == other.event, self.store == other.store,
self.metric == other.metric, self.value == other.value,
self.timestamp == other.timestamp))

def to_json(self):
return json.dumps({'event': self.event, 'store': self.store,
'metric': self.metric, 'value': self.value,
'timestamp': self.timestamp})

@classmethod
def from_json(cls, data):
return cls(**json.loads(data))


class MetricStoreManager(object):

looping_call = LoopingCall

def __init__(self, api, metric_interval, polling_interval=None,
update_callback=None):
self.api = api
self.redis = api.redis.sub_manager("metrics_store_manager")
self.metric_interval = metric_interval
self.polling_interval = (polling_interval
if polling_interval is not None
else metric_interval)
self.update_callback = update_callback

self.update_loop = self.looping_call(self.process_updated_stores)
self.update_loop_done = self.update_loop.start(self.polling_interval)

@inlineCallbacks
def stop(self):
if self.update_loop.running:
self.update_loop.stop()
yield self.update_loop_done

def metrics_bundle_store(self, user_account_key):
return MetricsBundleStore(self.api.manager, user_account_key)

def rkey(self, *args):
return ":".join(args)

def store_id(self, user_account_key, store):
return self.rkey(user_account_key, store)

def parse_store_id(self, store_id):
user_account_key, store = store_id.split(':')
return user_account_key, store

def events_key(self, store_id):
return self.rkey("stores", store_id, "events")

def last_timebucket_key(self, store_id):
return self.rkey("stores", store_id, "last_timebucket")

def store_updated(self, store_id):
return self.redis.sadd("updated_stores", store_id)

def pop_updated(self):
return self.redis.spop("updated_stores")

def get_set_last_timebucket(self, store_id, new_timebucket):
last_timebucket_key = self.last_timebucket_key(store_id)
return self.redis.getset(last_timebucket_key, new_timebucket)

def timebucket(self, timestamp):
intervals = math.floor(timestamp / self.metric_interval)
return intervals * self.metric_interval

def push_event(self, store_id, event):
events_key = self.events_key(store_id)
timestamp = event['timestamp']
return self.redis.zadd(events_key, timestamp, event.to_json())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we float() the timestamp here just to be safe? It can be provided manually to the MetricEvent and isn't checked anywhere.


@inlineCallbacks
def pull_events(self, store_id, start, end):
events_key = self.events_key(store_id)
start = "(%f" % start
end = "%f" % end
raw_events = yield self.redis.zrangebyscore(events_key, start, end)
returnValue([MetricEvent.from_json(ev) for ev in raw_events])

def delete_events(self, store_id, start, end):
events_key = self.events_key(store_id)
start = "(%f" % start
end = "%f" % end
return self.redis.zremrangebyscore(events_key, start, end)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible that by the time delete_events() is called, new stuff has been added in between this start and end range that wasn't processed yet? Since the timestamp is used as the weight in theory it shouldn't be possible but I'm thinking of a scenario where our queues are backing up and we're using the timestamp value from a message that arrived late from the queue. (sorry commented on the commit earlier, copied & pasted to the PR now)


@inlineCallbacks
def first_event(self, store_id):
events_key = self.events_key(store_id)
events = yield self.redis.zrange(events_key, 0, 1)
if not events:
returnValue(None)
returnValue(json.loads(events[0]))

@inlineCallbacks
def add_event(self, user_account_id, event):
store_id = self.store_id(user_account_id, event.store)
yield self.mark_store_updated(store_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function doesn't seem to exist?

yield self.push_event(store_id, event)

@inlineCallbacks
def process_updated_stores(self):
while True:
store_id = yield self.pop_updated()
if store_id is None:
break
# most recently completed metric interval
final_timebucket = self.timebucket(time.time() -
self.metric_interval)

current_timebucket = self.get_set_last_timebucket(store_id,
final_timebucket)
if current_timebucket is None:
# the first time progressing we start from timebucket
# containing the first unprocessed event
first_event = yield self.first_event(store_id)
if first_event is None:
continue
current_timebucket = self.timebucket(first_event['timestamp'])

while (current_timebucket <
final_timebucket + 0.5 * self.metric_interval):
next_timebucket = current_timebucket + self.metric_interval
yield self.update_store(store_id, current_timebucket,
next_timebucket)
current_timebucket = next_timebucket

@inlineCallbacks
def update_store(self, store_id, start, end):
user_account_key, store = self.parse_store_id(store_id)
metrics_bundle_store = self.metrics_bundle_store(user_account_key)
metrics_bundles = metrics_bundle_store.metrics_bundles
bundles = yield metrics_bundles.index_lookup('name', store)
if not bundles:
metrics_bundle = metrics_bundles(user_account=user_account_key,
name=store, metrics={})
elif len(bundles) == 1:
metrics_bundle = bundles[0]
else:
log.error("Found multiple bundles for store id %r" % (store_id,))
return

events = yield self.pull_events(store_id, start, end)
for ev in events:
self.apply_event(metrics_bundle, ev)

yield metrics_bundle.save()
yield self.delete_events(store_id, start, end)
if self.update_callback is not None:
yield maybeDeferred(self.update_callback, user_account_key, store)

def apply_event(self, metrics_bundle, event):
method_name = 'apply_%s_event' % event['event']
method = getattr(self, method_name, self.apply_unknown_event)
return method(metrics_bundle, event)

def apply_unknown_event(self, mb, ev):
store_id = self.store_id(mb.user_account_key, mb.name)
log.warn("Unknown event for store_id %r: %r" % (store_id, ev))

def apply_inc_event(self, mb, ev):
mb.metrics[ev.metric] = mb.metrics.get(ev.metric, 0) + ev.value

def apply_set_event(self, mb, ev):
mb.metrics[ev.metric] = ev.value
72 changes: 72 additions & 0 deletions go/apps/jsbox/tests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Tests for go.apps.jsbox.metrics."""

import time
import json

from twisted.trial.unittest import TestCase
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import Clock, LoopingCall

from go.vumitools.tests.utils import GoPersistenceMixin
from go.vumitools.api import VumiApi

from go.apps.jsbox.metrics import MetricEvent, MetricStoreManager


class TestMetricEvent(TestCase):
def test_to_json(self):
now = time.time()
ev = MetricEvent(event=MetricEvent.INC, store='default',
metric='my.metric', value=1.5, timestamp=now)
self.assertEqual(ev.to_json(), json.dumps({
'event': MetricEvent.INC, 'store': 'default',
'metric': 'my.metric', 'value': 1.5,
'timestamp': now
}))

def test_from_json(self):
now = time.time()
ev = MetricEvent.from_json(json.dumps({
'event': MetricEvent.INC, 'store': 'default',
'metric': 'my.metric', 'value': 1.5,
'timestamp': now
}))
expected_ev = MetricEvent(event=MetricEvent.INC, store='default',
metric='my.metric', value=1.5, timestamp=now)
self.assertEqual(ev, expected_ev)


class TestMetricStoreManager(GoPersistenceMixin, TestCase):

use_riak = True
metric_interval = 300

@inlineCallbacks
def setUp(self):
yield super(TestMetricStoreManager, self).setUp()
yield self._persist_setUp()
self.clock = Clock()
self.patch(MetricStoreManager, 'looping_call',
self.looping_call)
self.vumi_api = yield VumiApi.from_config_async(self.mk_config({}))
self.user_account = yield self.mk_user(self.vumi_api, u'testuser')
self.msm = MetricStoreManager(self.vumi_api, self.metric_interval)

@inlineCallbacks
def tearDown(self):
yield super(TestMetricStoreManager, self).tearDown()
yield self._persist_tearDown()
yield self.msm.stop()

def looping_call(self, *args, **kw):
lc = LoopingCall(*args, **kw)
lc.clock = self.clock
return lc

def test_store_id(self):
self.assertEqual(self.msm.store_id("12345", "default"),
"12345:default")

def test_parse_store_id(self):
self.assertEqual(self.msm.parse_store_id("12345:default"),
("12345", "default"))