From bbbbaa5e4a94b8b17d8158414cec1a4ef7a94f54 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Fri, 3 Jul 2015 11:27:14 +0200 Subject: [PATCH 01/10] First stab at using the new message store. --- go/apps/http_api/tests/test_vumi_app.py | 3 +- go/apps/http_api_nostream/resource.py | 3 +- .../http_api_nostream/tests/test_vumi_app.py | 3 +- go/apps/jsbox/tests/test_message_store.py | 8 +-- go/apps/sequential_send/vumi_app.py | 8 ++- go/apps/tests/helpers.py | 4 +- .../management/commands/go_account_stats.py | 18 ++--- .../commands/go_manage_message_cache.py | 2 +- .../commands/go_migrate_conversations.py | 32 ++++----- go/base/management/commands/go_setup_env.py | 4 +- .../management/commands/go_system_stats.py | 4 +- go/base/tests/test_go_manage_message_cache.py | 12 ++-- .../tests/test_go_migrate_conversations.py | 33 +++++---- go/conversation/tasks.py | 18 ++--- go/conversation/tests.py | 4 +- go/conversation/view_definition.py | 14 ++-- go/vumitools/api.py | 34 +++++++--- go/vumitools/app_worker.py | 68 +++++-------------- go/vumitools/conversation/tests/test_utils.py | 6 +- go/vumitools/conversation/utils.py | 25 +++---- go/vumitools/metrics.py | 4 +- go/vumitools/routing.py | 5 +- go/vumitools/tests/helpers.py | 22 +++--- go/vumitools/tests/test_api.py | 11 +-- go/vumitools/tests/test_middleware.py | 22 +++--- go/vumitools/tests/test_routing.py | 39 +++++------ 26 files changed, 201 insertions(+), 205 deletions(-) diff --git a/go/apps/http_api/tests/test_vumi_app.py b/go/apps/http_api/tests/test_vumi_app.py index 846ec2d67..6ad565ee9 100644 --- a/go/apps/http_api/tests/test_vumi_app.py +++ b/go/apps/http_api/tests/test_vumi_app.py @@ -482,7 +482,8 @@ def test_invalid_in_reply_to_with_missing_conversation_key(self): # create a message with no conversation inbound_msg = self.app_helper.make_inbound('in 1', message_id='msg-1') vumi_api = self.app_helper.vumi_helper.get_vumi_api() - yield vumi_api.mdb.add_inbound_message(inbound_msg) + opms = vumi_api.get_operational_message_store() + yield opms.add_inbound_message(inbound_msg) msg = { 'content': 'foo', diff --git a/go/apps/http_api_nostream/resource.py b/go/apps/http_api_nostream/resource.py index df8d123ee..7291be22f 100644 --- a/go/apps/http_api_nostream/resource.py +++ b/go/apps/http_api_nostream/resource.py @@ -214,7 +214,8 @@ def handle_PUT_in_reply_to(self, request, payload, in_reply_to): user_account = request.getUser() conversation = yield self.get_conversation(user_account) - reply_to = yield self.vumi_api.mdb.get_inbound_message(in_reply_to) + opms = self.vumi_api.get_operational_message_store() + reply_to = yield opms.get_inbound_message(in_reply_to) if reply_to is None: self.client_error_response(request, 'Invalid in_reply_to value') return diff --git a/go/apps/http_api_nostream/tests/test_vumi_app.py b/go/apps/http_api_nostream/tests/test_vumi_app.py index 3840cf362..ca75f3259 100644 --- a/go/apps/http_api_nostream/tests/test_vumi_app.py +++ b/go/apps/http_api_nostream/tests/test_vumi_app.py @@ -642,7 +642,8 @@ def test_invalid_in_reply_to_with_missing_conversation_key(self): # create a message with no (None) conversation inbound_msg = self.app_helper.make_inbound('in 1', message_id='msg-1') vumi_api = self.app_helper.vumi_helper.get_vumi_api() - yield vumi_api.mdb.add_inbound_message(inbound_msg) + opms = vumi_api.get_operational_message_store() + yield opms.add_inbound_message(inbound_msg) msg = { 'content': 'foo', diff --git a/go/apps/jsbox/tests/test_message_store.py b/go/apps/jsbox/tests/test_message_store.py index e29f764c4..5492c8d0b 100644 --- a/go/apps/jsbox/tests/test_message_store.py +++ b/go/apps/jsbox/tests/test_message_store.py @@ -31,24 +31,24 @@ def setUp(self): self.user_helper = yield self.vumi_helper.make_user(u"user") self.app_worker.user_api = self.user_helper.user_api - self.message_store = self.vumi_helper.get_vumi_api().mdb + opms = self.vumi_helper.get_vumi_api().get_operational_message_store() self.conversation = yield self.user_helper.create_conversation( u'jsbox', started=True) # store inbound - yield self.message_store.add_inbound_message( + yield opms.add_inbound_message( self.msg_helper.make_inbound('hello'), batch_ids=[self.conversation.batch.key]) # store outbound outbound_msg = self.msg_helper.make_outbound('hello') - yield self.message_store.add_outbound_message( + yield opms.add_outbound_message( outbound_msg, batch_ids=[self.conversation.batch.key]) # ack outbound event = self.msg_helper.make_ack(outbound_msg) - yield self.message_store.add_event(event) + yield opms.add_event(event) # monkey patch for when no conversation_key is provided self.app_worker.conversation_for_api = lambda *a: self.conversation diff --git a/go/apps/sequential_send/vumi_app.py b/go/apps/sequential_send/vumi_app.py index a99003334..29305af1b 100644 --- a/go/apps/sequential_send/vumi_app.py +++ b/go/apps/sequential_send/vumi_app.py @@ -63,10 +63,10 @@ def consume_user_message(self, message): log.msg('WARNING: Received inbound message: %s' % (message,)) def consume_ack(self, event): - return self.vumi_api.mdb.add_event(event) + return self.vumi_api.get_operational_message_store().add_event(event) def consume_delivery_report(self, event): - return self.vumi_api.mdb.add_event(event) + return self.vumi_api.get_operational_message_store().add_event(event) def _get_last_poll_time(self): return self.redis.get('last_poll_time') @@ -152,7 +152,9 @@ def send_scheduled_messages(self, conv): def send_message(self, batch_id, to_addr, content, msg_options): msg = yield self.send_to( to_addr, content, endpoint='default', **msg_options) - yield self.vumi_api.mdb.add_outbound_message(msg, batch_ids=[batch_id]) + # XXX: Do we need to do this? + opms = self.vumi_api.get_operational_message_store() + yield opms.add_outbound_message(msg, batch_ids=[batch_id]) log.info('Stored outbound %s' % (msg,)) @inlineCallbacks diff --git a/go/apps/tests/helpers.py b/go/apps/tests/helpers.py index d438a9872..90e80ee83 100644 --- a/go/apps/tests/helpers.py +++ b/go/apps/tests/helpers.py @@ -103,7 +103,8 @@ def __init__(self, worker_class, vumi_helper=None, **msg_helper_args): self.vumi_helper = vumi_helper self._app_helper = ApplicationHelper( self._conversation_type(), self.vumi_helper) - self.msg_helper = GoMessageHelper(**msg_helper_args) + self.msg_helper = GoMessageHelper( + vumi_helper=vumi_helper, **msg_helper_args) self.transport_name = self.msg_helper.transport_name self.worker_helper = self.vumi_helper.get_worker_helper( self.transport_name) @@ -141,7 +142,6 @@ def get_app_worker(self, config=None, start=True, extra_worker=False): # Set up our other bits of helper. if not extra_worker: self.vumi_helper.set_vumi_api(worker.vumi_api) - self.msg_helper.mdb = worker.vumi_api.mdb returnValue(worker) @inlineCallbacks diff --git a/go/base/management/commands/go_account_stats.py b/go/base/management/commands/go_account_stats.py index da46a6ac4..e8d9c418f 100644 --- a/go/base/management/commands/go_account_stats.py +++ b/go/base/management/commands/go_account_stats.py @@ -85,12 +85,12 @@ def handle_stats(self, user, api, options): return conv_key = options[0] conversation = api.get_wrapped_conversation(conv_key) - message_store = api.api.mdb + qms = api.api.get_query_message_store() self.out(u'Conversation: %s\n' % (conversation.name,)) batch_key = conversation.batch.key - self.do_batch_key(message_store, batch_key) - self.do_batch_key_breakdown(message_store, batch_key) + self.do_batch_key(qms, batch_key) + self.do_batch_key_breakdown(qms, batch_key) def _count_results(self, index_page): count = 0 @@ -99,11 +99,11 @@ def _count_results(self, index_page): index_page = index_page.next_page() return count - def do_batch_key(self, message_store, batch_key): + def do_batch_key(self, qms, batch_key): in_count = self._count_results( - message_store.batch_inbound_keys_page(batch_key)) + qms.list_batch_inbound_keys(batch_key)) out_count = self._count_results( - message_store.batch_outbound_keys_page(batch_key)) + qms.list_batch_outbound_keys(batch_key)) self.out(u'Total Received in batch %s: %s\n' % (batch_key, in_count)) self.out(u'Total Sent in batch %s: %s\n' % (batch_key, out_count)) @@ -122,10 +122,10 @@ def collect_stats(self, index_page): index_page = index_page.next_page() return per_date, uniques - def do_batch_key_breakdown(self, msg_store, batch_key): - inbound = msg_store.batch_inbound_keys_with_addresses(batch_key) + def do_batch_key_breakdown(self, qms, batch_key): + inbound = qms.list_batch_inbound_keys_with_addresses(batch_key) inbound_per_date, inbound_uniques = self.collect_stats(inbound) - outbound = msg_store.batch_outbound_keys_with_addresses(batch_key) + outbound = qms.list_batch_outbound_keys_with_addresses(batch_key) outbound_per_date, outbound_uniques = self.collect_stats(outbound) all_uniques = inbound_uniques.union(outbound_uniques) diff --git a/go/base/management/commands/go_manage_message_cache.py b/go/base/management/commands/go_manage_message_cache.py index 36706c617..44cc71007 100644 --- a/go/base/management/commands/go_manage_message_cache.py +++ b/go/base/management/commands/go_manage_message_cache.py @@ -110,6 +110,6 @@ def _apply_command(self, func, dry_run=None): def handle_command_rebuild(self, *args, **options): def rebuild(batch_id): - self.vumi_api.mdb.reconcile_cache(batch_id) + self.vumi_api.FIXME_mdb.reconcile_cache(batch_id) self._apply_command(rebuild) diff --git a/go/base/management/commands/go_migrate_conversations.py b/go/base/management/commands/go_migrate_conversations.py index b49489b3b..4ffb3c122 100644 --- a/go/base/management/commands/go_migrate_conversations.py +++ b/go/base/management/commands/go_migrate_conversations.py @@ -107,19 +107,19 @@ def _process_pages(self, index_page, batch_id, get_message, add_message): add_message(get_message(key), batch_ids=[batch_id]) index_page = index_page.next_page() - def _copy_msgs(self, mdb, old_batch, new_batch): + def _copy_msgs(self, FIXME_mdb, old_batch, new_batch): self._process_pages( - mdb.batch_outbound_keys_page(old_batch), new_batch, - mdb.get_outbound_message, mdb.add_outbound_message) + FIXME_mdb.batch_outbound_keys_page(old_batch), new_batch, + FIXME_mdb.get_outbound_message, FIXME_mdb.add_outbound_message) self._process_pages( - mdb.batch_inbound_keys_page(old_batch), new_batch, - mdb.get_inbound_message, mdb.add_inbound_message) + FIXME_mdb.batch_inbound_keys_page(old_batch), new_batch, + FIXME_mdb.get_inbound_message, FIXME_mdb.add_inbound_message) def migrate(self, user_api, conv): conv_batches = conv.batches.keys() - new_batch = user_api.api.mdb.batch_start() + new_batch = user_api.api.get_batch_manager().batch_start() for batch in conv_batches: - self._copy_msgs(user_api.api.mdb, batch, new_batch) + self._copy_msgs(user_api.api.FIXME_mdb, batch, new_batch) conv.batches.clear() conv.batches.add_key(new_batch) conv.save() @@ -133,8 +133,8 @@ class SplitBatches(Migration): " conversation batches to the new batch.") def applies_to(self, user_api, conv): - mdb = user_api.api.mdb - tag_keys = mdb.current_tags.index_keys('current_batch', conv.batch.key) + FIXME_mdb = user_api.api.FIXME_mdb + tag_keys = FIXME_mdb.current_tags.index_keys('current_batch', conv.batch.key) if tag_keys: return True return False @@ -145,18 +145,18 @@ def _process_pages(self, index_page, batch_id, get_message, add_message): add_message(get_message(key), batch_ids=[batch_id]) index_page = index_page.next_page() - def _copy_msgs(self, mdb, old_batch, new_batch): + def _copy_msgs(self, FIXME_mdb, old_batch, new_batch): self._process_pages( - mdb.batch_outbound_keys_page(old_batch), new_batch, - mdb.get_outbound_message, mdb.add_outbound_message) + FIXME_mdb.batch_outbound_keys_page(old_batch), new_batch, + FIXME_mdb.get_outbound_message, FIXME_mdb.add_outbound_message) self._process_pages( - mdb.batch_inbound_keys_page(old_batch), new_batch, - mdb.get_inbound_message, mdb.add_inbound_message) + FIXME_mdb.batch_inbound_keys_page(old_batch), new_batch, + FIXME_mdb.get_inbound_message, FIXME_mdb.add_inbound_message) def migrate(self, user_api, conv): old_batch = conv.batch.key - new_batch = user_api.api.mdb.batch_start() - self._copy_msgs(user_api.api.mdb, old_batch, new_batch) + new_batch = user_api.api.get_batch_manager().batch_start() + self._copy_msgs(user_api.api.FIXME_mdb, old_batch, new_batch) conv.batch.key = new_batch conv.save() diff --git a/go/base/management/commands/go_setup_env.py b/go/base/management/commands/go_setup_env.py index dc7c945fb..d26c3e876 100644 --- a/go/base/management/commands/go_setup_env.py +++ b/go/base/management/commands/go_setup_env.py @@ -310,7 +310,7 @@ def setup_routers(self, user, routers): config = router_info.pop('config', {}) extra_inbound_endpoints = view_def.get_inbound_endpoints(config) extra_outbound_endpoints = view_def.get_outbound_endpoints(config) - batch_id = user_api.api.mdb.batch_start() + batch_id = user_api.api.get_batch_manager().batch_start() # We bypass the usual mechanisms so we can set the key ourselves. router = user_api.router_store.routers( @@ -341,7 +341,7 @@ def setup_conversations(self, user, conversations): conversation_type = conv_info.pop('conversation_type') view_def = get_conversation_view_definition(conversation_type) config = conv_info.pop('config', {}) - batch_id = user_api.api.mdb.batch_start() + batch_id = user_api.api.get_batch_manager().batch_start() # We bypass the usual mechanisms so we can set the key ourselves. conv = user_api.conversation_store.conversations( conversation_key, user_account=user_api.user_account_key, diff --git a/go/base/management/commands/go_system_stats.py b/go/base/management/commands/go_system_stats.py index 12448b2f6..6538f1a6c 100644 --- a/go/base/management/commands/go_system_stats.py +++ b/go/base/management/commands/go_system_stats.py @@ -105,8 +105,8 @@ def handle_command_conversation_types_by_month(self, *args, **options): writer.writerow(row) def _increment_msg_stats(self, conv, stats): - inbound_stats = conv.mdb.batch_inbound_stats(conv.batch.key) - outbound_stats = conv.mdb.batch_outbound_stats(conv.batch.key) + inbound_stats = conv.FIXME_mdb.batch_inbound_stats(conv.batch.key) + outbound_stats = conv.FIXME_mdb.batch_outbound_stats(conv.batch.key) stats["conversations_started"] += 1 stats["inbound_message_count"] += inbound_stats['total'] stats["outbound_message_count"] += outbound_stats['total'] diff --git a/go/base/tests/test_go_manage_message_cache.py b/go/base/tests/test_go_manage_message_cache.py index 6e0267de7..6d9ccae51 100644 --- a/go/base/tests/test_go_manage_message_cache.py +++ b/go/base/tests/test_go_manage_message_cache.py @@ -22,13 +22,13 @@ def clear_batches(self, batches): """ vumi_api = self.vumi_helper.get_vumi_api() for batch_id in batches: - vumi_api.mdb.cache.clear_batch(batch_id) + vumi_api.FIXME_mdb.cache.clear_batch(batch_id) self.assert_batches_cleared(batches) def assert_batches_cleared(self, batches): vumi_api = self.vumi_helper.get_vumi_api() for batch_id in batches: - self.assertFalse(vumi_api.mdb.cache.batch_exists(batch_id)) + self.assertFalse(vumi_api.FIXME_mdb.cache.batch_exists(batch_id)) def count_results(self, index_page): count = 0 @@ -48,14 +48,14 @@ def needs_rebuild(self, batch_id, delta=0.01): vumi_api = self.vumi_helper.get_vumi_api() inbound = float(self.count_results( - vumi_api.mdb.batch_inbound_keys_page(batch_id))) - cached_inbound = vumi_api.mdb.cache.inbound_message_count(batch_id) + vumi_api.FIXME_mdb.batch_inbound_keys_page(batch_id))) + cached_inbound = vumi_api.FIXME_mdb.cache.inbound_message_count(batch_id) if inbound and (abs(cached_inbound - inbound) / inbound) > delta: return True outbound = float(self.count_results( - vumi_api.mdb.batch_outbound_keys_page(batch_id))) - cached_outbound = vumi_api.mdb.cache.outbound_message_count(batch_id) + vumi_api.FIXME_mdb.batch_outbound_keys_page(batch_id))) + cached_outbound = vumi_api.FIXME_mdb.cache.outbound_message_count(batch_id) if outbound and (abs(cached_outbound - outbound) / outbound) > delta: return True diff --git a/go/base/tests/test_go_migrate_conversations.py b/go/base/tests/test_go_migrate_conversations.py index f8f561c40..af6f558f0 100644 --- a/go/base/tests/test_go_migrate_conversations.py +++ b/go/base/tests/test_go_migrate_conversations.py @@ -68,7 +68,8 @@ def mkoldconv(self, create_batch=True, **kwargs): conversation = self.old_conv_model(conversation_id, **conv_fields) if create_batch: - conversation.batches.add_key(self.user_api.api.mdb.batch_start()) + batch_manager = self.user_api.api.get_batch_manager() + conversation.batches.add_key(batch_manager.batch_start()) for group in groups: conversation.add_group(group) @@ -133,9 +134,9 @@ def test_migrate_models(self): self.assertEqual(conv.name, loaded_conv.name) def setup_fix_batches(self, tags=(), num_batches=1): - mdb = self.user_api.api.mdb + FIXME_mdb = self.user_api.api.FIXME_mdb msg_helper = GoMessageHelper() # We can't use .store_*(), so no mdb. - batches = [mdb.batch_start(tags=tags) for i in range(num_batches)] + batches = [FIXME_mdb.batch_start(tags=tags) for i in range(num_batches)] conv = self.mkoldconv( create_batch=False, conversation_type=u'dummy_type', @@ -145,9 +146,9 @@ def setup_fix_batches(self, tags=(), num_batches=1): for i, batch_id in enumerate(batches): conv.batches.add_key(batch_id) msg1 = msg_helper.make_inbound("in", message_id=u"msg-%d" % i) - mdb.add_inbound_message(msg1, batch_ids=[batch_id]) + FIXME_mdb.add_inbound_message(msg1, batch_ids=[batch_id]) msg2 = msg_helper.make_outbound("out", message_id=u"msg-%d" % i) - mdb.add_outbound_message(msg2, batch_ids=[batch_id]) + FIXME_mdb.add_outbound_message(msg2, batch_ids=[batch_id]) conv.save() @@ -160,16 +161,20 @@ def assert_batches_fixed(self, old_conv): new_batch = new_conv.batch.key self.assertTrue(new_batch not in old_batches) - mdb = self.user_api.api.mdb - old_out, old_in = set(), set() + FIXME_mdb = self.user_api.api.FIXME_mdb + old_outbound, old_inbound = set(), set() for batch in old_batches: - collect_all_results(mdb.batch_outbound_keys_page(batch), old_out) - collect_all_results(mdb.batch_inbound_keys_page(batch), old_in) - - new_out = collect_all_results(mdb.batch_outbound_keys_page(new_batch)) - new_in = collect_all_results(mdb.batch_inbound_keys_page(new_batch)) - self.assertEqual(new_out, old_out) - self.assertEqual(new_in, old_in) + collect_all_results( + FIXME_mdb.batch_outbound_keys_page(batch), old_outbound) + collect_all_results( + FIXME_mdb.batch_inbound_keys_page(batch), old_inbound) + + new_outbound = collect_all_results( + FIXME_mdb.batch_outbound_keys_page(new_batch)) + new_inbound = collect_all_results( + FIXME_mdb.batch_inbound_keys_page(new_batch)) + self.assertEqual(new_outbound, old_outbound) + self.assertEqual(new_inbound, old_inbound) def _check_fix_batches(self, migration_name, tags, num_batches, migrated): conv = self.setup_fix_batches(tags, num_batches) diff --git a/go/conversation/tasks.py b/go/conversation/tasks.py index 650bfbf4a..b798edeeb 100644 --- a/go/conversation/tasks.py +++ b/go/conversation/tasks.py @@ -51,8 +51,10 @@ def row_for_inbound_message(message): return row -def row_for_outbound_message(message, mdb): - events = sorted(mdb.get_events_for_message(message['message_id']), +def row_for_outbound_message(message, qms): + event_keys = qms.list_message_event_keys(message["message_id"]) + # NOTE: We assume only one page of results here. + events = sorted((qms.get_event(key) for key in event_keys), key=lambda event: event['timestamp'], reverse=True) row = dict((field, unicode(message.payload[field])) @@ -89,13 +91,13 @@ def load_messages_in_chunks(conversation, direction='inbound', modified on the fly. """ if direction == 'inbound': - index_page = conversation.mdb.batch_inbound_keys_page( + index_page = conversation.qms.list_batch_inbound_keys( conversation.batch.key) - get_msg = conversation.mdb.get_inbound_message + get_msg = conversation.qms.get_inbound_message elif direction == 'outbound': - index_page = conversation.mdb.batch_outbound_keys_page( + index_page = conversation.qms.list_batch_outbound_keys( conversation.batch.key) - get_msg = conversation.mdb.get_outbound_message + get_msg = conversation.qms.get_outbound_message else: raise ValueError('Invalid value (%s) received for `direction`. ' 'Only `inbound` and `outbound` are allowed.' % @@ -149,7 +151,7 @@ def export_conversation_messages_unsorted(account_key, conversation_key): for messages in load_messages_in_chunks(conversation, 'outbound'): for message in messages: - mdb = user_api.api.mdb - writer.writerow(row_for_outbound_message(message, mdb)) + qms = user_api.api.get_query_message_store() + writer.writerow(row_for_outbound_message(message, qms)) email_export(user_profile, conversation, io) diff --git a/go/conversation/tests.py b/go/conversation/tests.py index 04a3cf3de..3ff6215f7 100644 --- a/go/conversation/tests.py +++ b/go/conversation/tests.py @@ -1136,11 +1136,11 @@ def test_export_conversation_messages_unsorted(self): reader = csv.DictReader(fp) message_ids = [row['message_id'] for row in reader] all_keys = set() - index_page = conv.mdb.batch_inbound_keys_page(conv.batch.key) + index_page = conv.qms.list_batch_inbound_keys(conv.batch.key) while index_page is not None: all_keys.update(index_page) index_page = index_page.next_page() - index_page = conv.mdb.batch_outbound_keys_page(conv.batch.key) + index_page = conv.qms.list_batch_outbound_keys(conv.batch.key) while index_page is not None: all_keys.update(index_page) index_page = index_page.next_page() diff --git a/go/conversation/view_definition.py b/go/conversation/view_definition.py index 1458f9f31..19003334a 100644 --- a/go/conversation/view_definition.py +++ b/go/conversation/view_definition.py @@ -252,13 +252,14 @@ def add_event_status(msg): msg.event_status = "-" return msg msg.event_status = u"Sending" - get_event_info = conversation.mdb.message_event_keys_with_statuses + qms = conversation.qms + get_event_info = qms.list_message_event_keys_with_statuses for event_id, _, event_type in get_event_info(msg["message_id"]): if event_type == u"ack": msg.event_status = u"Accepted" break if event_type == u"nack": - event = conversation.mdb.get_event(event_id) + event = qms.get_event(event_id) msg.event_status = u"Rejected: %s" % ( event["nack_reason"],) break @@ -307,7 +308,8 @@ def get_sent_messages(start, stop): @staticmethod def send_one_off_reply(user_api, conversation, in_reply_to, content): - inbound_message = user_api.api.mdb.get_inbound_message(in_reply_to) + qms = user_api.api.get_query_message_store() + inbound_message = qms.get_inbound_message(in_reply_to) if inbound_message is None: logger.info('Replying to an unknown message: %s' % (in_reply_to,)) @@ -602,9 +604,9 @@ def get_aggregate_counts(self, conv, direction): Get aggregated total count of messages handled bucketed per day. """ message_callback = { - 'inbound': conv.mdb.batch_inbound_keys_with_timestamps, - 'outbound': conv.mdb.batch_outbound_keys_with_timestamps, - }.get(direction, conv.mdb.batch_inbound_keys_with_timestamps) + 'inbound': conv.qms.list_batch_inbound_keys_with_timestamps, + 'outbound': conv.qms.list_batch_outbound_keys_with_timestamps, + }.get(direction, conv.qms.list_batch_inbound_keys_with_timestamps) aggregates = defaultdict(int) index_page = message_callback(conv.batch.key) diff --git a/go/vumitools/api.py b/go/vumitools/api.py index 4e468458f..dea9b0e02 100644 --- a/go/vumitools/api.py +++ b/go/vumitools/api.py @@ -152,7 +152,7 @@ def get_router(self, router_key): @Manager.calls_manager def get_channel(self, tag): tagpool_meta = yield self.api.tpm.get_metadata(tag[0]) - tag_info = yield self.api.mdb.get_tag_info(tag) + tag_info = yield self.api.get_batch_manager().get_tag_info(tag) channel = yield self.channel_store.get_channel_by_tag( tag, tagpool_meta, tag_info.current_batch.key) returnValue(channel) @@ -277,7 +277,7 @@ def list_groups(self): def new_conversation(self, conversation_type, name, description, config, batch_id=None, **fields): if not batch_id: - batch_id = yield self.api.mdb.batch_start( + batch_id = yield self.api.get_batch_manager().batch_start( tags=[], user_account=self.user_account_key) conv = yield self.conversation_store.new_conversation( conversation_type, name, description, config, batch_id, **fields) @@ -287,7 +287,7 @@ def new_conversation(self, conversation_type, name, description, config, def new_router(self, router_type, name, description, config, batch_id=None, **fields): if not batch_id: - batch_id = yield self.api.mdb.batch_start( + batch_id = yield self.api.get_batch_manager().batch_start( tags=[], user_account=self.user_account_key) router = yield self.router_store.new_router( router_type, name, description, config, batch_id, **fields) @@ -354,9 +354,10 @@ def _update_tag_data_for_acquire(self, user_account, tag): # The batch we create here gets added to the tag_info and we can fish # it out later. When we replace this with proper channel objects we can # stash it there like we do with conversations and routers. - yield self.api.mdb.batch_start([tag], user_account=user_account.key) + yield self.api.get_batch_manager().batch_start( + [tag], user_account=user_account.key) user_account.tags.append(tag) - tag_info = yield self.api.mdb.get_tag_info(tag) + tag_info = yield self.api.get_batch_manager().get_tag_info(tag) tag_info.metadata['user_account'] = user_account.key.decode('utf-8') yield tag_info.save() yield user_account.save() @@ -424,7 +425,7 @@ def release_tag(self, tag): except ValueError, e: log.error("Tag not allocated to account: %s" % (tag,), e) else: - tag_info = yield self.api.mdb.get_tag_info(tag) + tag_info = yield self.api.get_batch_manager().get_tag_info(tag) if 'user_account' in tag_info.metadata: del tag_info.metadata['user_account'] yield tag_info.save() @@ -432,7 +433,8 @@ def release_tag(self, tag): # We should probably refactor the message store to make this # less clumsy. if tag_info.current_batch.key: - yield self.api.mdb.batch_done(tag_info.current_batch.key) + yield self.api.get_batch_manager().batch_done( + tag_info.current_batch.key) # Clean up routing table entries. routing_table = yield self.get_routing_table(user_account) @@ -535,8 +537,9 @@ def __init__(self, manager, redis, sender=None, metric_publisher=None): self.redis = redis self.tpm = TagpoolManager(self.redis.sub_manager('tagpool_store')) - self.mdb = MessageStore( + self.FIXME_mdb = MessageStore( self.manager, self.redis.sub_manager('message_store')) + self.mdb = None self.account_store = AccountStore(self.manager) self.token_manager = TokenManager( self.redis.sub_manager('token_manager')) @@ -585,6 +588,21 @@ def from_config_async(cls, config, command_publisher=None, sender = AsyncMessageSender(command_publisher) returnValue(cls(manager, redis, sender, metric_publisher)) + def get_batch_manager(self): + from vumi_message_store.message_store import MessageStoreBatchManager + return MessageStoreBatchManager( + self.manager, self.redis.sub_manager('message_store')) + + def get_operational_message_store(self): + from vumi_message_store.message_store import OperationalMessageStore + return OperationalMessageStore( + self.manager, self.redis.sub_manager('message_store')) + + def get_query_message_store(self): + from vumi_message_store.message_store import QueryMessageStore + return QueryMessageStore( + self.manager, self.redis.sub_manager('message_store')) + @Manager.calls_manager def user_exists(self, user_account_key): """ diff --git a/go/vumitools/app_worker.py b/go/vumitools/app_worker.py index 890a68758..5fddb32e2 100644 --- a/go/vumitools/app_worker.py +++ b/go/vumitools/app_worker.py @@ -203,7 +203,8 @@ def _find_outboundmessage_for_event(self, event): log.error('Received event without user_message_id: %s' % (event,)) return - msg = yield self.vumi_api.mdb.outbound_messages.load(user_message_id) + opms = self.vumi_api.get_operational_message_store() + msg = yield opms.get_outbound_message(user_message_id) if msg is None: log.error('Unable to find message for event: %s' % (event,)) @@ -211,53 +212,25 @@ def _find_outboundmessage_for_event(self, event): _EVENT_OUTBOUND_CACHE_KEY = "outbound_message_json" - def _get_outbound_from_event_cache( - self, event, key=_EVENT_OUTBOUND_CACHE_KEY): - """ Retrieve outbound message from the cache on an event. - - :type event: - TransportEvent - :param event: - Event to look up the outbound message on. - :param str key: - Cache key to look up the message under. Defaults to - _EVENT_OUTBOUND_CACHE_KEY. - - :returns: - A tuple ``(hit, outbound_message)``. ``hit`` is ``True`` if - the message was found in the cache and ``False`` if there was - a cache miss. + def _get_outbound_from_event_cache(self, event): """ - if key not in event.cache: + Retrieve outbound message from the cache on an event. + """ + if self._EVENT_OUTBOUND_CACHE_KEY not in event.cache: return False, None - outbound_json = event.cache[key] + outbound_json = event.cache[self._EVENT_OUTBOUND_CACHE_KEY] if outbound_json is None: return True, None return True, TransportUserMessage.from_json(outbound_json) - def _store_outbound_in_event_cache( - self, event, outbound, key=_EVENT_OUTBOUND_CACHE_KEY): - """ Store an outbound message in the cache on an event. - - :type event: - TransportEvent - :param event: - Event to look store the outbound message on. - :type outbound: - TransportUserMessage - :param outbound: - Outbound message to cache. - :param str key: - Cache key to store the message under. Defaults to - _EVENT_OUTBOUND_CACHE_KEY. - - :returns: - None + def _store_outbound_in_event_cache(self, event, outbound): + """ + Store an outbound message in the cache on an event. """ if outbound is None: - event.cache[key] = None + event.cache[self._EVENT_OUTBOUND_CACHE_KEY] = None else: - event.cache[key] = outbound.to_json() + event.cache[self._EVENT_OUTBOUND_CACHE_KEY] = outbound.to_json() @inlineCallbacks def find_message_for_event(self, event): @@ -265,30 +238,24 @@ def find_message_for_event(self, event): if hit: returnValue(outbound_msg) - outbound = yield self._find_outboundmessage_for_event(event) - outbound_msg = outbound.msg if outbound is not None else None + outbound_msg = yield self._find_outboundmessage_for_event(event) self._store_outbound_in_event_cache(event, outbound_msg) returnValue(outbound_msg) @inlineCallbacks - def _find_inboundmessage_for_reply(self, reply): + def find_message_for_reply(self, reply): user_message_id = reply.get('in_reply_to') if user_message_id is None: log.error('Received reply without in_reply_to: %s' % (reply,)) return - msg = yield self.vumi_api.mdb.inbound_messages.load(user_message_id) + opms = self.vumi_api.get_operational_message_store() + msg = yield opms.get_inbound_message(user_message_id) if msg is None: log.error('Unable to find message for reply: %s' % (reply,)) returnValue(msg) - @inlineCallbacks - def find_message_for_reply(self, reply): - inbound_message = yield self._find_inboundmessage_for_reply(reply) - if inbound_message: - returnValue(inbound_message.msg) - def event_for_message(self, message, event_type, content): msg_mdh = self.get_metadata_helper(message) return VumiApiEvent.event(msg_mdh.get_account_key(), @@ -423,7 +390,8 @@ def process_command_send_message(self, cmd_id, user_account_key, in_reply_to = msg_options.pop('in_reply_to', None) self.add_conv_to_msg_options(conv, msg_options) if in_reply_to: - msg = yield self.vumi_api.mdb.get_inbound_message(in_reply_to) + opms = self.vumi_api.get_operational_message_store() + msg = yield opms.get_inbound_message(in_reply_to) if msg: yield self.reply_to( msg, content, diff --git a/go/vumitools/conversation/tests/test_utils.py b/go/vumitools/conversation/tests/test_utils.py index b8fc38644..110cecc34 100644 --- a/go/vumitools/conversation/tests/test_utils.py +++ b/go/vumitools/conversation/tests/test_utils.py @@ -94,9 +94,10 @@ def test_count_outbound_uniques(self): def test_collect_messages(self): yield self.conv.start() created_msgs = yield self.msg_helper.add_inbound_to_conv(self.conv, 5) + qms = self.vumi_helper.get_vumi_api().get_query_message_store() collected_msgs = yield self.conv.collect_messages( [msg['message_id'] for msg in created_msgs], - self.conv.mdb.get_inbound_message, + qms.get_inbound_message, include_sensitive=False, scrubber=lambda msg: msg) self.assertEqual( [msg['message_id'] for msg in collected_msgs], @@ -106,9 +107,10 @@ def test_collect_messages(self): def test_collect_messages_with_unknown_key(self): yield self.conv.start() created_msgs = yield self.msg_helper.add_inbound_to_conv(self.conv, 5) + qms = self.vumi_helper.get_vumi_api().get_query_message_store() collected_msgs = yield self.conv.collect_messages( [msg['message_id'] for msg in created_msgs] + [u'unknown-key'], - self.conv.mdb.get_inbound_message, + qms.get_inbound_message, include_sensitive=False, scrubber=lambda msg: msg) self.assertEqual( [msg['message_id'] for msg in collected_msgs], diff --git a/go/vumitools/conversation/utils.py b/go/vumitools/conversation/utils.py index 08df56f04..174e7f234 100644 --- a/go/vumitools/conversation/utils.py +++ b/go/vumitools/conversation/utils.py @@ -20,10 +20,11 @@ def __init__(self, conversation, user_api): self.c = conversation self.user_api = user_api self.api = user_api.api - self.mdb = self.api.mdb + self.qms = self.api.get_query_message_store() self.manager = self.c.manager self.base_manager = self.api.manager self._channels = None + self.FIXME_mdb = self.api.FIXME_mdb @Manager.calls_manager def stop_conversation(self): @@ -115,7 +116,7 @@ def get_progress_status(self): 'delivery_report_delivered', 'delivery_report_failed', 'delivery_report_pending')) - batch_status = yield self.mdb.batch_status(self.batch.key) + batch_status = yield self.FIXME_mdb.batch_status(self.batch.key) for k, v in batch_status.items(): k = k.replace('.', '_') statuses[k] += v @@ -188,28 +189,28 @@ def count_inbound_messages(self): Count the total number of replies received. This is pulled from the cache. """ - return self.mdb.cache.count_inbound_message_keys(self.batch.key) + return self.FIXME_mdb.cache.count_inbound_message_keys(self.batch.key) def count_outbound_messages(self): """ Count the total number of messages sent. This is pulled from the cache. """ - return self.mdb.cache.count_outbound_message_keys(self.batch.key) + return self.FIXME_mdb.cache.count_outbound_message_keys(self.batch.key) def count_inbound_uniques(self): """ Count the total unique `from_addr` values seen for the batch_key. Pulled from the cache. """ - return self.mdb.cache.count_from_addrs(self.batch.key) + return self.FIXME_mdb.cache.count_from_addrs(self.batch.key) def count_outbound_uniques(self): """ Count the total unique `to_addr` values seen for the batch_key. Pulled from the cache. """ - return self.mdb.cache.count_to_addrs(self.batch.key) + return self.FIXME_mdb.cache.count_to_addrs(self.batch.key) @Manager.calls_manager def collect_messages(self, keys, get_msg, include_sensitive, scrubber): @@ -290,11 +291,11 @@ def received_messages_in_cache(self, start=0, limit=100, scrubber = scrubber or (lambda msg: msg) # Redis counts from zero, so we - 1 on the limit. - keys = yield self.mdb.cache.get_inbound_message_keys( + keys = yield self.FIXME_mdb.cache.get_inbound_message_keys( self.batch.key, start, limit - 1) replies = yield self.collect_messages( - keys, self.mdb.get_inbound_message, include_sensitive, scrubber) + keys, self.FIXME_mdb.get_inbound_message, include_sensitive, scrubber) # Preserve order returnValue( @@ -332,11 +333,11 @@ def sent_messages_in_cache(self, start=0, limit=100, """ scrubber = scrubber or (lambda msg: msg) - keys = yield self.mdb.cache.get_outbound_message_keys( + keys = yield self.FIXME_mdb.cache.get_outbound_message_keys( self.batch.key, start, limit - 1) sent_messages = yield self.collect_messages( - keys, self.mdb.get_outbound_message, include_sensitive, scrubber) + keys, self.FIXME_mdb.get_outbound_message, include_sensitive, scrubber) # Preserve order returnValue( @@ -382,7 +383,7 @@ def get_inbound_throughput(self, sample_time=300): Calculate how many inbound messages per minute we've been doing on average. """ - inbounds = yield self.mdb.cache.get_inbound_message_keys( + inbounds = yield self.FIXME_mdb.cache.get_inbound_message_keys( self.batch.key, with_timestamp=True) if not inbounds: returnValue(0.0) @@ -396,7 +397,7 @@ def get_outbound_throughput(self, sample_time=300): Calculate how many outbound messages per minute we've been doing on average. """ - outbounds = yield self.mdb.cache.get_outbound_message_keys( + outbounds = yield self.FIXME_mdb.cache.get_outbound_message_keys( self.batch.key, with_timestamp=True) if not outbounds: returnValue(0.0) diff --git a/go/vumitools/metrics.py b/go/vumitools/metrics.py index de64bc29d..421271a14 100644 --- a/go/vumitools/metrics.py +++ b/go/vumitools/metrics.py @@ -66,7 +66,7 @@ class MessagesSentMetric(ConversationMetric): def get_value(self, user_api): batch_id = self.conv.batch.key - return user_api.api.mdb.cache.count_outbound_message_keys(batch_id) + return user_api.api.FIXME_mdb.cache.count_outbound_message_keys(batch_id) class MessagesReceivedMetric(ConversationMetric): @@ -74,7 +74,7 @@ class MessagesReceivedMetric(ConversationMetric): def get_value(self, user_api): batch_id = self.conv.batch.key - return user_api.api.mdb.cache.count_inbound_message_keys(batch_id) + return user_api.api.FIXME_mdb.cache.count_inbound_message_keys(batch_id) def get_account_metric_prefix(account_key, store): diff --git a/go/vumitools/routing.py b/go/vumitools/routing.py index 113d379fd..9c94a2c85 100644 --- a/go/vumitools/routing.py +++ b/go/vumitools/routing.py @@ -344,6 +344,7 @@ def setup_dispatcher(self): self.transport_connectors -= self.billing_connectors self.optouts = OptOutHelper(self.vumi_api, config.optouts) + self.opms = self.vumi_api.get_operational_message_store() @inlineCallbacks def teardown_dispatcher(self): @@ -381,7 +382,7 @@ def get_config(self, msg): if msg_mdh.has_user_account(): user_account_key = msg_mdh.get_account_key() elif msg_mdh.tag is not None: - tag_info = yield self.vumi_api.mdb.get_tag_info(msg_mdh.tag) + tag_info = yield self.opms.get_tag_info(msg_mdh.tag) user_account_key = tag_info.metadata['user_account'] if user_account_key is None: raise UnownedTagError( @@ -658,7 +659,7 @@ def publish_outbound(self, msg, connector_name, endpoint): """ if connector_name in self.transport_connectors: if self.get_static_config().store_messages_to_transports: - yield self.vumi_api.mdb.add_outbound_message(msg) + yield self.opms.add_outbound_message(msg) yield super(RoutingTableDispatcher, self).publish_outbound( msg, connector_name, endpoint) diff --git a/go/vumitools/tests/helpers.py b/go/vumitools/tests/helpers.py index 4f1be1909..778e5e01d 100644 --- a/go/vumitools/tests/helpers.py +++ b/go/vumitools/tests/helpers.py @@ -52,9 +52,6 @@ def __init__(self, vumi_helper=None, **kw): self._msg_helper = MessageHelper(**kw) self.transport_name = self._msg_helper.transport_name self._vumi_helper = vumi_helper - self.mdb = None - if self._vumi_helper is not None: - self.mdb = self._vumi_helper.get_vumi_api().mdb def setup(self): pass @@ -62,6 +59,11 @@ def setup(self): def cleanup(self): return self._msg_helper.cleanup() + def _get_opms(self): + if self._vumi_helper is None: + raise ValueError("No message store provided.") + return self._vumi_helper.get_vumi_api().get_operational_message_store() + @proxyable def add_router_metadata(self, msg, router): msg.payload.setdefault('helper_metadata', {}) @@ -137,21 +139,17 @@ def make_reply(self, msg, content, **kw): @proxyable def store_inbound(self, conv, msg): - if self.mdb is None: - raise ValueError("No message store provided.") - return self.mdb.add_inbound_message(msg, batch_ids=[conv.batch.key]) + return self._get_opms().add_inbound_message( + msg, batch_ids=[conv.batch.key]) @proxyable def store_outbound(self, conv, msg): - if self.mdb is None: - raise ValueError("No message store provided.") - return self.mdb.add_outbound_message(msg, batch_ids=[conv.batch.key]) + return self._get_opms().add_outbound_message( + msg, batch_ids=[conv.batch.key]) @proxyable def store_event(self, event): - if self.mdb is None: - raise ValueError("No message store provided.") - return self.mdb.add_event(event) + return self._get_opms().add_event(event) @proxyable def make_stored_inbound(self, conv, content, **kw): diff --git a/go/vumitools/tests/test_api.py b/go/vumitools/tests/test_api.py index 83bcf7f36..f540d01d5 100644 --- a/go/vumitools/tests/test_api.py +++ b/go/vumitools/tests/test_api.py @@ -164,13 +164,14 @@ def assert_account_tags(self, expected): @inlineCallbacks def test_declare_acquire_and_release_tags(self): + batch_manager = self.vumi_api.get_batch_manager() tag1, tag2 = ("poolA", "tag1"), ("poolA", "tag2") yield self.vumi_api.tpm.declare_tags([tag1, tag2]) yield self.user_helper.add_tagpool_permission(u"poolA") yield self.user_helper.add_tagpool_permission(u"poolB") yield self.assert_account_tags([]) - tag2_info = yield self.vumi_api.mdb.get_tag_info(tag2) + tag2_info = yield batch_manager.get_tag_info(tag2) self.assertEqual(tag2_info.metadata['user_account'], None) self.assertEqual(tag2_info.current_batch.key, None) self.assertEqual((yield self.user_api.acquire_tag(u"poolA")), tag1) @@ -178,14 +179,14 @@ def test_declare_acquire_and_release_tags(self): self.assertEqual((yield self.user_api.acquire_tag(u"poolA")), None) self.assertEqual((yield self.user_api.acquire_tag(u"poolB")), None) yield self.assert_account_tags([list(tag1), list(tag2)]) - tag2_info = yield self.vumi_api.mdb.get_tag_info(tag2) + tag2_info = yield batch_manager.get_tag_info(tag2) self.assertEqual(tag2_info.metadata['user_account'], self.user_api.user_account_key) self.assertNotEqual(tag2_info.current_batch.key, None) yield self.user_api.release_tag(tag2) yield self.assert_account_tags([list(tag1)]) - tag2_info = yield self.vumi_api.mdb.get_tag_info(tag2) + tag2_info = yield batch_manager.get_tag_info(tag2) self.assertEqual(tag2_info.metadata['user_account'], None) self.assertEqual(tag2_info.current_batch.key, None) self.assertEqual((yield self.user_api.acquire_tag(u"poolA")), tag2) @@ -198,7 +199,7 @@ def test_release_tag_without_owner(self): yield self.user_helper.add_tagpool_permission(u"pool1") yield self.user_api.acquire_specific_tag(tag) - tag_info = yield self.vumi_api.mdb.get_tag_info(tag) + tag_info = yield self.vumi_api.get_batch_manager().get_tag_info(tag) del tag_info.metadata['user_account'] yield tag_info.save() @@ -211,7 +212,7 @@ def test_batch_id_for_specific_tag(self): [tag] = yield self.vumi_helper.setup_tagpool(u"poolA", [u"tag1"]) yield self.user_helper.add_tagpool_permission(u"poolA") yield self.user_api.acquire_specific_tag(tag) - tag_info = yield self.vumi_api.mdb.get_tag_info(tag) + tag_info = yield self.vumi_api.get_batch_manager().get_tag_info(tag) self.assertNotEqual(tag_info.current_batch.key, None) def _set_routing_table(self, user, entries): diff --git a/go/vumitools/tests/test_middleware.py b/go/vumitools/tests/test_middleware.py index a70f884ba..29f2bef8f 100644 --- a/go/vumitools/tests/test_middleware.py +++ b/go/vumitools/tests/test_middleware.py @@ -831,21 +831,20 @@ def setUp(self): self.mw_helper = self.add_helper( MiddlewareHelper(ConversationStoringMiddleware)) yield self.mw_helper.setup_vumi_api() + self.qms = self.mw_helper.get_vumi_api().get_query_message_store() self.user_helper = yield self.mw_helper.make_user(u'user') self.conv = yield self.user_helper.create_conversation(u'dummy_conv') @inlineCallbacks def assert_stored_inbound(self, msgs): - mdb = self.mw_helper.get_vumi_api().mdb - index_page = yield mdb.batch_inbound_keys_page(self.conv.batch.key) - ids = yield collect_all_results(index_page) + page = yield self.qms.list_batch_inbound_keys(self.conv.batch.key) + ids = yield collect_all_results(page) self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) @inlineCallbacks def assert_stored_outbound(self, msgs): - mdb = self.mw_helper.get_vumi_api().mdb - index_page = yield mdb.batch_outbound_keys_page(self.conv.batch.key) - ids = yield collect_all_results(index_page) + page = yield self.qms.list_batch_outbound_keys(self.conv.batch.key) + ids = yield collect_all_results(page) self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) @inlineCallbacks @@ -946,21 +945,20 @@ def setUp(self): self.mw_helper = self.add_helper( MiddlewareHelper(RouterStoringMiddleware)) yield self.mw_helper.setup_vumi_api() + self.qms = self.mw_helper.get_vumi_api().get_query_message_store() self.user_helper = yield self.mw_helper.make_user(u'user') self.router = yield self.user_helper.create_router(u'dummy_conv') @inlineCallbacks def assert_stored_inbound(self, msgs): - mdb = self.mw_helper.get_vumi_api().mdb - index_page = yield mdb.batch_inbound_keys_page(self.router.batch.key) - ids = yield collect_all_results(index_page) + page = yield self.qms.list_batch_inbound_keys(self.router.batch.key) + ids = yield collect_all_results(page) self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) @inlineCallbacks def assert_stored_outbound(self, msgs): - mdb = self.mw_helper.get_vumi_api().mdb - index_page = yield mdb.batch_outbound_keys_page(self.router.batch.key) - ids = yield collect_all_results(index_page) + page = yield self.qms.list_batch_outbound_keys(self.router.batch.key) + ids = yield collect_all_results(page) self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) @inlineCallbacks diff --git a/go/vumitools/tests/test_routing.py b/go/vumitools/tests/test_routing.py index 3c88e6407..088f557e2 100644 --- a/go/vumitools/tests/test_routing.py +++ b/go/vumitools/tests/test_routing.py @@ -301,6 +301,8 @@ def setUp(self): # We use vumi's MessageHelper here rather than our own GoMessageHelper # because we want to handle all the Go metadata stuff ourselves. self.msg_helper = self.add_helper(MessageHelper()) + vumi_api = self.vumi_helper.get_vumi_api() + self.opms = vumi_api.get_operational_message_store() def get_routing_table(self): return RoutingTable({ @@ -405,7 +407,7 @@ def assert_events_equal(self, events1, events2, clear_caches=True): def mk_msg_reply(self, **kw): "Create and store an outbound message, then create a reply for it." msg = self.with_md(self.msg_helper.make_inbound("foo"), **kw) - yield self.vumi_helper.get_vumi_api().mdb.add_inbound_message(msg) + yield self.opms.add_inbound_message(msg) reply = msg.reply(content="Reply") returnValue((msg, reply)) @@ -413,7 +415,7 @@ def mk_msg_reply(self, **kw): def mk_msg_ack(self, **kw): "Create and store an outbound message, then create an ack for it." msg = self.with_md(self.msg_helper.make_outbound("foo"), **kw) - yield self.vumi_helper.get_vumi_api().mdb.add_outbound_message(msg) + yield self.opms.add_outbound_message(msg) ack = self.msg_helper.make_ack(msg) returnValue((msg, ack)) @@ -796,8 +798,7 @@ def test_outbound_message_gets_stored_before_transport(self): msg = self.with_md( self.msg_helper.make_outbound("foo"), conv=('app1', 'conv1')) - mdb = self.vumi_helper.get_vumi_api().mdb - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) yield self.dispatch_outbound(msg, 'app1') @@ -808,7 +809,7 @@ def test_outbound_message_gets_stored_before_transport(self): ]) self.assertEqual([msg], self.get_dispatched_outbound('sphex')) - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, msg) @inlineCallbacks @@ -821,8 +822,7 @@ def test_outbound_message_does_not_get_stored_if_disabled(self): msg = self.with_md( self.msg_helper.make_outbound("foo"), conv=('app1', 'conv1')) - mdb = self.vumi_helper.get_vumi_api().mdb - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) yield self.dispatch_outbound(msg, 'app1') @@ -833,7 +833,7 @@ def test_outbound_message_does_not_get_stored_if_disabled(self): ]) self.assertEqual([msg], self.get_dispatched_outbound('sphex')) - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) @inlineCallbacks @@ -846,8 +846,7 @@ def test_outbound_message_does_not_get_stored_before_router(self): self.msg_helper.make_outbound("foo"), conv=('app1', 'conv1'), endpoint="router") - mdb = self.vumi_helper.get_vumi_api().mdb - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) yield self.dispatch_outbound(msg, 'app1') @@ -859,7 +858,7 @@ def test_outbound_message_does_not_get_stored_before_router(self): ]) self.assertEqual([msg], self.get_dispatched_outbound('router_ro')) - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) @@ -1121,8 +1120,7 @@ def test_outbound_message_gets_stored_before_transport(self): self.msg_helper.make_outbound("foo"), tag=("pool1", "1234"), conv=('app1', 'conv1'), is_paid=True) - mdb = self.vumi_helper.get_vumi_api().mdb - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) yield self.dispatch_outbound(msg, 'billing_dispatcher_ri') @@ -1136,7 +1134,7 @@ def test_outbound_message_gets_stored_before_transport(self): self.with_md(msg, hops=hops) self.assertEqual([msg], self.get_dispatched_outbound('sphex')) - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, msg) @inlineCallbacks @@ -1150,8 +1148,7 @@ def test_outbound_message_does_not_get_stored_if_disabled(self): self.msg_helper.make_outbound("foo"), tag=("pool1", "1234"), conv=('app1', 'conv1'), is_paid=True) - mdb = self.vumi_helper.get_vumi_api().mdb - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) yield self.dispatch_outbound(msg, 'billing_dispatcher_ri') @@ -1165,7 +1162,7 @@ def test_outbound_message_does_not_get_stored_if_disabled(self): self.with_md(msg, hops=hops) self.assertEqual([msg], self.get_dispatched_outbound('sphex')) - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) @inlineCallbacks @@ -1177,8 +1174,7 @@ def test_outbound_message_does_not_get_stored_before_billing(self): msg = self.with_md( self.msg_helper.make_outbound("foo"), conv=('app1', 'conv1')) - mdb = self.vumi_helper.get_vumi_api().mdb - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) yield self.dispatch_outbound(msg, 'app1') @@ -1193,7 +1189,7 @@ def test_outbound_message_does_not_get_stored_before_billing(self): self.assertEqual( [msg], self.get_dispatched_outbound('billing_dispatcher_ro')) - stored_msg = yield mdb.get_outbound_message(msg["message_id"]) + stored_msg = yield self.opms.get_outbound_message(msg["message_id"]) self.assertEqual(stored_msg, None) @@ -1398,6 +1394,5 @@ def test_reply_for_unroutable_inbound_gets_stored(self): optout={'optout': False}) [reply] = self.get_dispatched_outbound('sphex') - mdb = self.vumi_helper.get_vumi_api().mdb - stored_msg = yield mdb.get_outbound_message(reply["message_id"]) + stored_msg = yield self.opms.get_outbound_message(reply["message_id"]) self.assertEqual(stored_msg, reply) From 7d4500754a6675d8101be51551caa8f3b84fe590 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Fri, 3 Jul 2015 17:35:06 +0200 Subject: [PATCH 02/10] Clean up remaining uses of old message store. --- .../commands/go_manage_message_cache.py | 3 +- .../commands/go_migrate_conversations.py | 42 ++++++++++--------- .../management/commands/go_system_stats.py | 18 ++++---- go/base/tests/test_go_manage_message_cache.py | 17 ++++---- .../tests/test_go_migrate_conversations.py | 37 ++++++++-------- go/routers/tests/helpers.py | 1 - go/vumitools/api.py | 4 -- go/vumitools/conversation/utils.py | 30 ++++++------- go/vumitools/metrics.py | 6 ++- 9 files changed, 78 insertions(+), 80 deletions(-) diff --git a/go/base/management/commands/go_manage_message_cache.py b/go/base/management/commands/go_manage_message_cache.py index 44cc71007..8b64eb754 100644 --- a/go/base/management/commands/go_manage_message_cache.py +++ b/go/base/management/commands/go_manage_message_cache.py @@ -110,6 +110,7 @@ def _apply_command(self, func, dry_run=None): def handle_command_rebuild(self, *args, **options): def rebuild(batch_id): - self.vumi_api.FIXME_mdb.reconcile_cache(batch_id) + qms = self.vumi_api.get_query_message_store() + self.vumi_api.get_batch_manager().rebuild_cache(batch_id, qms) self._apply_command(rebuild) diff --git a/go/base/management/commands/go_migrate_conversations.py b/go/base/management/commands/go_migrate_conversations.py index 4ffb3c122..c118a3212 100644 --- a/go/base/management/commands/go_migrate_conversations.py +++ b/go/base/management/commands/go_migrate_conversations.py @@ -12,8 +12,9 @@ class Migration(object): """Base class for migrations.""" name = None - def __init__(self, dry_run): + def __init__(self, dry_run, vumi_api): self._dry_run = dry_run + self.vumi_api = vumi_api @classmethod def migrator_classes(cls): @@ -107,19 +108,21 @@ def _process_pages(self, index_page, batch_id, get_message, add_message): add_message(get_message(key), batch_ids=[batch_id]) index_page = index_page.next_page() - def _copy_msgs(self, FIXME_mdb, old_batch, new_batch): + def _copy_msgs(self, old_batch, new_batch): + qms = self.vumi_api.get_query_message_store() + opms = self.vumi_api.get_operational_message_store() self._process_pages( - FIXME_mdb.batch_outbound_keys_page(old_batch), new_batch, - FIXME_mdb.get_outbound_message, FIXME_mdb.add_outbound_message) + qms.list_batch_outbound_keys(old_batch), new_batch, + opms.get_outbound_message, opms.add_outbound_message) self._process_pages( - FIXME_mdb.batch_inbound_keys_page(old_batch), new_batch, - FIXME_mdb.get_inbound_message, FIXME_mdb.add_inbound_message) + qms.list_batch_inbound_keys(old_batch), new_batch, + opms.get_inbound_message, opms.add_inbound_message) def migrate(self, user_api, conv): conv_batches = conv.batches.keys() new_batch = user_api.api.get_batch_manager().batch_start() for batch in conv_batches: - self._copy_msgs(user_api.api.FIXME_mdb, batch, new_batch) + self._copy_msgs(batch, new_batch) conv.batches.clear() conv.batches.add_key(new_batch) conv.save() @@ -133,11 +136,10 @@ class SplitBatches(Migration): " conversation batches to the new batch.") def applies_to(self, user_api, conv): - FIXME_mdb = user_api.api.FIXME_mdb - tag_keys = FIXME_mdb.current_tags.index_keys('current_batch', conv.batch.key) - if tag_keys: - return True - return False + batch_manager = user_api.api.get_batch_manager() + batch = batch_manager.get_batch(conv.batch.key) + current_tags = [batch_manager.get_tag_info(tag) for tag in batch.tags] + return any(ct.current_batch.key == batch.key for ct in current_tags) def _process_pages(self, index_page, batch_id, get_message, add_message): while index_page is not None: @@ -145,18 +147,20 @@ def _process_pages(self, index_page, batch_id, get_message, add_message): add_message(get_message(key), batch_ids=[batch_id]) index_page = index_page.next_page() - def _copy_msgs(self, FIXME_mdb, old_batch, new_batch): + def _copy_msgs(self, old_batch, new_batch): + qms = self.vumi_api.get_query_message_store() + opms = self.vumi_api.get_operational_message_store() self._process_pages( - FIXME_mdb.batch_outbound_keys_page(old_batch), new_batch, - FIXME_mdb.get_outbound_message, FIXME_mdb.add_outbound_message) + qms.list_batch_outbound_keys(old_batch), new_batch, + opms.get_outbound_message, opms.add_outbound_message) self._process_pages( - FIXME_mdb.batch_inbound_keys_page(old_batch), new_batch, - FIXME_mdb.get_inbound_message, FIXME_mdb.add_inbound_message) + qms.list_batch_inbound_keys(old_batch), new_batch, + opms.get_inbound_message, opms.add_inbound_message) def migrate(self, user_api, conv): old_batch = conv.batch.key new_batch = user_api.api.get_batch_manager().batch_start() - self._copy_msgs(user_api.api.FIXME_mdb, old_batch, new_batch) + self._copy_msgs(old_batch, new_batch) conv.batch.key = new_batch conv.save() @@ -246,7 +250,7 @@ def get_migrator(self, migration_name, dry_run): migrator_cls = Migration.migrator_class(migration_name) if migrator_cls is None: return None - return migrator_cls(dry_run) + return migrator_cls(dry_run, self.vumi_api) def handle_user(self, user, migrator): user_api = self.user_api_for_user(user) diff --git a/go/base/management/commands/go_system_stats.py b/go/base/management/commands/go_system_stats.py index 6538f1a6c..42af7ff9f 100644 --- a/go/base/management/commands/go_system_stats.py +++ b/go/base/management/commands/go_system_stats.py @@ -105,16 +105,16 @@ def handle_command_conversation_types_by_month(self, *args, **options): writer.writerow(row) def _increment_msg_stats(self, conv, stats): - inbound_stats = conv.FIXME_mdb.batch_inbound_stats(conv.batch.key) - outbound_stats = conv.FIXME_mdb.batch_outbound_stats(conv.batch.key) + key = conv.batch.key + qms = conv.qms + from_addr_count = qms.get_batch_from_addr_count(key) + to_addr_count = qms.get_batch_to_addr_count(key) stats["conversations_started"] += 1 - stats["inbound_message_count"] += inbound_stats['total'] - stats["outbound_message_count"] += outbound_stats['total'] - stats["inbound_uniques"] += inbound_stats['unique_addresses'] - stats["outbound_uniques"] += outbound_stats['unique_addresses'] - stats["total_uniques"] += max( - inbound_stats['unique_addresses'], - outbound_stats['unique_addresses']) + stats["inbound_message_count"] += qms.get_batch_inbound_count(key) + stats["outbound_message_count"] += qms.get_batch_outbound_count(key) + stats["inbound_uniques"] += from_addr_count + stats["outbound_uniques"] += to_addr_count + stats["total_uniques"] += max(from_addr_count, to_addr_count) def handle_command_message_counts_by_month(self, *args, **options): month_stats = {} diff --git a/go/base/tests/test_go_manage_message_cache.py b/go/base/tests/test_go_manage_message_cache.py index 6d9ccae51..37fa32e52 100644 --- a/go/base/tests/test_go_manage_message_cache.py +++ b/go/base/tests/test_go_manage_message_cache.py @@ -15,20 +15,19 @@ class TestGoManageMessageCache(GoCommandTestCase): def setUp(self): self.setup_command(go_manage_message_cache.Command) + self.qms = self.vumi_helper.get_vumi_api().get_query_message_store() def clear_batches(self, batches): """ Clear the message cache so that all batches need reconciliation. """ - vumi_api = self.vumi_helper.get_vumi_api() for batch_id in batches: - vumi_api.FIXME_mdb.cache.clear_batch(batch_id) + self.qms.batch_info_cache.clear_batch(batch_id) self.assert_batches_cleared(batches) def assert_batches_cleared(self, batches): - vumi_api = self.vumi_helper.get_vumi_api() for batch_id in batches: - self.assertFalse(vumi_api.FIXME_mdb.cache.batch_exists(batch_id)) + self.assertFalse(self.qms.batch_info_cache.batch_exists(batch_id)) def count_results(self, index_page): count = 0 @@ -45,17 +44,15 @@ def needs_rebuild(self, batch_id, delta=0.01): What an acceptable delta is for the cached values. Defaults to 0.01 If the cached values are off by the delta then this returns True. """ - vumi_api = self.vumi_helper.get_vumi_api() - inbound = float(self.count_results( - vumi_api.FIXME_mdb.batch_inbound_keys_page(batch_id))) - cached_inbound = vumi_api.FIXME_mdb.cache.inbound_message_count(batch_id) + self.qms.list_batch_inbound_keys(batch_id))) + cached_inbound = self.qms.get_batch_inbound_count(batch_id) if inbound and (abs(cached_inbound - inbound) / inbound) > delta: return True outbound = float(self.count_results( - vumi_api.FIXME_mdb.batch_outbound_keys_page(batch_id))) - cached_outbound = vumi_api.FIXME_mdb.cache.outbound_message_count(batch_id) + self.qms.list_batch_outbound_keys(batch_id))) + cached_outbound = self.qms.get_batch_outbound_count(batch_id) if outbound and (abs(cached_outbound - outbound) / outbound) > delta: return True diff --git a/go/base/tests/test_go_migrate_conversations.py b/go/base/tests/test_go_migrate_conversations.py index af6f558f0..861d48dd9 100644 --- a/go/base/tests/test_go_migrate_conversations.py +++ b/go/base/tests/test_go_migrate_conversations.py @@ -134,9 +134,11 @@ def test_migrate_models(self): self.assertEqual(conv.name, loaded_conv.name) def setup_fix_batches(self, tags=(), num_batches=1): - FIXME_mdb = self.user_api.api.FIXME_mdb + batch_manager = self.user_api.api.get_batch_manager() + opms = self.user_api.api.get_operational_message_store() msg_helper = GoMessageHelper() # We can't use .store_*(), so no mdb. - batches = [FIXME_mdb.batch_start(tags=tags) for i in range(num_batches)] + batches = [batch_manager.batch_start(tags=tags) + for i in range(num_batches)] conv = self.mkoldconv( create_batch=False, conversation_type=u'dummy_type', @@ -146,35 +148,32 @@ def setup_fix_batches(self, tags=(), num_batches=1): for i, batch_id in enumerate(batches): conv.batches.add_key(batch_id) msg1 = msg_helper.make_inbound("in", message_id=u"msg-%d" % i) - FIXME_mdb.add_inbound_message(msg1, batch_ids=[batch_id]) + opms.add_inbound_message(msg1, batch_ids=[batch_id]) msg2 = msg_helper.make_outbound("out", message_id=u"msg-%d" % i) - FIXME_mdb.add_outbound_message(msg2, batch_ids=[batch_id]) + opms.add_outbound_message(msg2, batch_ids=[batch_id]) conv.save() return conv def assert_batches_fixed(self, old_conv): + # This assumes we only have one index page of results in each query. + qms = self.user_api.api.get_query_message_store() old_batches = old_conv.batches.keys() new_conv = self.user_api.conversation_store.get_conversation_by_key( old_conv.key) new_batch = new_conv.batch.key self.assertTrue(new_batch not in old_batches) - FIXME_mdb = self.user_api.api.FIXME_mdb - old_outbound, old_inbound = set(), set() + old_out, old_in = set(), set() for batch in old_batches: - collect_all_results( - FIXME_mdb.batch_outbound_keys_page(batch), old_outbound) - collect_all_results( - FIXME_mdb.batch_inbound_keys_page(batch), old_inbound) - - new_outbound = collect_all_results( - FIXME_mdb.batch_outbound_keys_page(new_batch)) - new_inbound = collect_all_results( - FIXME_mdb.batch_inbound_keys_page(new_batch)) - self.assertEqual(new_outbound, old_outbound) - self.assertEqual(new_inbound, old_inbound) + collect_all_results(qms.list_batch_outbound_keys(batch), old_out) + collect_all_results(qms.list_batch_inbound_keys(batch), old_in) + + new_out = collect_all_results(qms.list_batch_outbound_keys(new_batch)) + new_in = collect_all_results(qms.list_batch_inbound_keys(new_batch)) + self.assertEqual(new_out, old_out) + self.assertEqual(new_in, old_in) def _check_fix_batches(self, migration_name, tags, num_batches, migrated): conv = self.setup_fix_batches(tags, num_batches) @@ -220,8 +219,8 @@ def test_split_batches_on_conv_with_single_batch_with_no_tag(self): self.check_split_batches(tags=(), num_batches=1, migrated=False) def test_split_batches_on_conv_with_batch_with_tag(self): - self.check_split_batches(tags=[(u"pool", u"tag")], - num_batches=1, migrated=True) + self.check_split_batches( + tags=[(u"pool", u"tag")], num_batches=1, migrated=True) def test_fix_jsbox_endpoints(self): app_config = { diff --git a/go/routers/tests/helpers.py b/go/routers/tests/helpers.py index ef29b379b..21d25a40a 100644 --- a/go/routers/tests/helpers.py +++ b/go/routers/tests/helpers.py @@ -155,7 +155,6 @@ def get_router_worker(self, config=None, start=True): worker = yield self.ri.get_worker(self._worker_class, config, start) # Set up our other bits of helper. self.vumi_helper.set_vumi_api(worker.vumi_api) - self.msg_helper.mdb = worker.vumi_api.mdb returnValue(worker) @inlineCallbacks diff --git a/go/vumitools/api.py b/go/vumitools/api.py index dea9b0e02..ec12334d1 100644 --- a/go/vumitools/api.py +++ b/go/vumitools/api.py @@ -12,7 +12,6 @@ from vumi.errors import VumiError from vumi.message import Message from vumi.components.tagpool import TagpoolManager -from vumi.components.message_store import MessageStore from vumi.persist.model import Manager from vumi.persist.riak_manager import RiakManager from vumi.persist.txriak_manager import TxRiakManager @@ -537,9 +536,6 @@ def __init__(self, manager, redis, sender=None, metric_publisher=None): self.redis = redis self.tpm = TagpoolManager(self.redis.sub_manager('tagpool_store')) - self.FIXME_mdb = MessageStore( - self.manager, self.redis.sub_manager('message_store')) - self.mdb = None self.account_store = AccountStore(self.manager) self.token_manager = TokenManager( self.redis.sub_manager('token_manager')) diff --git a/go/vumitools/conversation/utils.py b/go/vumitools/conversation/utils.py index 174e7f234..b42bff75b 100644 --- a/go/vumitools/conversation/utils.py +++ b/go/vumitools/conversation/utils.py @@ -24,7 +24,6 @@ def __init__(self, conversation, user_api): self.manager = self.c.manager self.base_manager = self.api.manager self._channels = None - self.FIXME_mdb = self.api.FIXME_mdb @Manager.calls_manager def stop_conversation(self): @@ -116,7 +115,7 @@ def get_progress_status(self): 'delivery_report_delivered', 'delivery_report_failed', 'delivery_report_pending')) - batch_status = yield self.FIXME_mdb.batch_status(self.batch.key) + batch_status = yield self.qms.get_batch_info_status(self.batch.key) for k, v in batch_status.items(): k = k.replace('.', '_') statuses[k] += v @@ -189,28 +188,28 @@ def count_inbound_messages(self): Count the total number of replies received. This is pulled from the cache. """ - return self.FIXME_mdb.cache.count_inbound_message_keys(self.batch.key) + return self.qms.get_batch_inbound_count(self.batch.key) def count_outbound_messages(self): """ Count the total number of messages sent. This is pulled from the cache. """ - return self.FIXME_mdb.cache.count_outbound_message_keys(self.batch.key) + return self.qms.get_batch_outbound_count(self.batch.key) def count_inbound_uniques(self): """ Count the total unique `from_addr` values seen for the batch_key. Pulled from the cache. """ - return self.FIXME_mdb.cache.count_from_addrs(self.batch.key) + return self.qms.get_batch_from_addr_count(self.batch.key) def count_outbound_uniques(self): """ Count the total unique `to_addr` values seen for the batch_key. Pulled from the cache. """ - return self.FIXME_mdb.cache.count_to_addrs(self.batch.key) + return self.qms.get_batch_to_addr_count(self.batch.key) @Manager.calls_manager def collect_messages(self, keys, get_msg, include_sensitive, scrubber): @@ -288,14 +287,14 @@ def received_messages_in_cache(self, start=0, limit=100, content of the message to be scrubbed. By default it is a noop which leaves the content unchanged. """ + # FIXME: limit actually means end, apparently. scrubber = scrubber or (lambda msg: msg) - # Redis counts from zero, so we - 1 on the limit. - keys = yield self.FIXME_mdb.cache.get_inbound_message_keys( - self.batch.key, start, limit - 1) + keys = yield self.qms.list_batch_recent_inbound_keys(self.batch.key) + keys = keys[start:limit] replies = yield self.collect_messages( - keys, self.FIXME_mdb.get_inbound_message, include_sensitive, scrubber) + keys, self.qms.get_inbound_message, include_sensitive, scrubber) # Preserve order returnValue( @@ -331,13 +330,14 @@ def sent_messages_in_cache(self, start=0, limit=100, content of the message to be scrubbed. By default it is a noop which leaves the content unchanged. """ + # FIXME: limit actually means end, apparently. scrubber = scrubber or (lambda msg: msg) - keys = yield self.FIXME_mdb.cache.get_outbound_message_keys( - self.batch.key, start, limit - 1) + keys = yield self.qms.list_batch_recent_outbound_keys(self.batch.key) + keys = keys[start:limit] sent_messages = yield self.collect_messages( - keys, self.FIXME_mdb.get_outbound_message, include_sensitive, scrubber) + keys, self.qms.get_outbound_message, include_sensitive, scrubber) # Preserve order returnValue( @@ -383,7 +383,7 @@ def get_inbound_throughput(self, sample_time=300): Calculate how many inbound messages per minute we've been doing on average. """ - inbounds = yield self.FIXME_mdb.cache.get_inbound_message_keys( + inbounds = yield self.qms.list_batch_recent_inbound_keys( self.batch.key, with_timestamp=True) if not inbounds: returnValue(0.0) @@ -397,7 +397,7 @@ def get_outbound_throughput(self, sample_time=300): Calculate how many outbound messages per minute we've been doing on average. """ - outbounds = yield self.FIXME_mdb.cache.get_outbound_message_keys( + outbounds = yield self.qms.list_batch_recent_outbound_keys( self.batch.key, with_timestamp=True) if not outbounds: returnValue(0.0) diff --git a/go/vumitools/metrics.py b/go/vumitools/metrics.py index 421271a14..d5794e587 100644 --- a/go/vumitools/metrics.py +++ b/go/vumitools/metrics.py @@ -66,7 +66,8 @@ class MessagesSentMetric(ConversationMetric): def get_value(self, user_api): batch_id = self.conv.batch.key - return user_api.api.FIXME_mdb.cache.count_outbound_message_keys(batch_id) + qms = user_api.api.get_query_message_store() + return qms.get_batch_outbound_count(batch_id) class MessagesReceivedMetric(ConversationMetric): @@ -74,7 +75,8 @@ class MessagesReceivedMetric(ConversationMetric): def get_value(self, user_api): batch_id = self.conv.batch.key - return user_api.api.FIXME_mdb.cache.count_inbound_message_keys(batch_id) + qms = user_api.api.get_query_message_store() + return qms.get_batch_inbound_count(batch_id) def get_account_metric_prefix(account_key, store): From ced652c5c5c6e1db1421f1c4456bbb769d140f91 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Wed, 8 Jul 2015 17:29:04 +0200 Subject: [PATCH 03/10] Add vumi-message-store to requirements. --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 1b1785dce..5b3d1b6f3 100644 --- a/setup.py +++ b/setup.py @@ -13,6 +13,7 @@ packages=find_packages(), install_requires=[ 'vumi>=0.5.21', + 'vumi-message-store', 'vxsandbox>=0.5.0', 'vxpolls', 'vumi-wikipedia>=0.2.1', From 1820cfbf03355081bc34d0a3cf8ba69f2bf96f56 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Thu, 9 Jul 2015 09:26:06 +0200 Subject: [PATCH 04/10] Add helper properly, remove obsolete comment. --- go/base/tests/test_go_migrate_conversations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/base/tests/test_go_migrate_conversations.py b/go/base/tests/test_go_migrate_conversations.py index 861d48dd9..548751a8d 100644 --- a/go/base/tests/test_go_migrate_conversations.py +++ b/go/base/tests/test_go_migrate_conversations.py @@ -136,7 +136,7 @@ def test_migrate_models(self): def setup_fix_batches(self, tags=(), num_batches=1): batch_manager = self.user_api.api.get_batch_manager() opms = self.user_api.api.get_operational_message_store() - msg_helper = GoMessageHelper() # We can't use .store_*(), so no mdb. + msg_helper = self.add_helper(GoMessageHelper()) batches = [batch_manager.batch_start(tags=tags) for i in range(num_batches)] From 79bb772d295bd9606703948938db176a923f7490 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Thu, 9 Jul 2015 10:55:10 +0200 Subject: [PATCH 05/10] Reuse batch managers instead of creating new ones. --- go/vumitools/api.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/vumitools/api.py b/go/vumitools/api.py index ec12334d1..ec7984168 100644 --- a/go/vumitools/api.py +++ b/go/vumitools/api.py @@ -353,10 +353,10 @@ def _update_tag_data_for_acquire(self, user_account, tag): # The batch we create here gets added to the tag_info and we can fish # it out later. When we replace this with proper channel objects we can # stash it there like we do with conversations and routers. - yield self.api.get_batch_manager().batch_start( - [tag], user_account=user_account.key) + batch_manager = self.api.get_batch_manager() + yield batch_manager.batch_start([tag], user_account=user_account.key) user_account.tags.append(tag) - tag_info = yield self.api.get_batch_manager().get_tag_info(tag) + tag_info = yield batch_manager.get_tag_info(tag) tag_info.metadata['user_account'] = user_account.key.decode('utf-8') yield tag_info.save() yield user_account.save() @@ -424,7 +424,8 @@ def release_tag(self, tag): except ValueError, e: log.error("Tag not allocated to account: %s" % (tag,), e) else: - tag_info = yield self.api.get_batch_manager().get_tag_info(tag) + batch_manager = self.api.get_batch_manager() + tag_info = yield batch_manager.get_tag_info(tag) if 'user_account' in tag_info.metadata: del tag_info.metadata['user_account'] yield tag_info.save() @@ -432,8 +433,7 @@ def release_tag(self, tag): # We should probably refactor the message store to make this # less clumsy. if tag_info.current_batch.key: - yield self.api.get_batch_manager().batch_done( - tag_info.current_batch.key) + yield batch_manager.batch_done(tag_info.current_batch.key) # Clean up routing table entries. routing_table = yield self.get_routing_table(user_account) From 654cf8eff583dd6baf8ff9184e13a65e430dd316 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Tue, 28 Jul 2015 18:12:56 +0200 Subject: [PATCH 06/10] New message store in storing middleware and explicitly set batch_ids on events. --- go/apps/jsbox/tests/test_message_store.py | 2 +- go/vumitools/conversation/tests/test_utils.py | 2 +- go/vumitools/middleware.py | 140 +++++++- go/vumitools/tests/helpers.py | 24 +- go/vumitools/tests/test_middleware.py | 313 ++++++++++++++++-- 5 files changed, 437 insertions(+), 44 deletions(-) diff --git a/go/apps/jsbox/tests/test_message_store.py b/go/apps/jsbox/tests/test_message_store.py index 5492c8d0b..e689a74bb 100644 --- a/go/apps/jsbox/tests/test_message_store.py +++ b/go/apps/jsbox/tests/test_message_store.py @@ -48,7 +48,7 @@ def setUp(self): # ack outbound event = self.msg_helper.make_ack(outbound_msg) - yield opms.add_event(event) + yield opms.add_event(event, batch_ids=[self.conversation.batch.key]) # monkey patch for when no conversation_key is provided self.app_worker.conversation_for_api = lambda *a: self.conversation diff --git a/go/vumitools/conversation/tests/test_utils.py b/go/vumitools/conversation/tests/test_utils.py index 110cecc34..804ddb462 100644 --- a/go/vumitools/conversation/tests/test_utils.py +++ b/go/vumitools/conversation/tests/test_utils.py @@ -36,7 +36,7 @@ def store_events(self, outbound, event_type, count=None, **kwargs): for message in messages: event_maker = getattr(self.msg_helper, 'make_%s' % (event_type,)) event = event_maker(message, **kwargs) - yield self.msg_helper.store_event(event) + yield self.msg_helper.store_event(self.conv, event) events.append(event) returnValue(events) diff --git a/go/vumitools/middleware.py b/go/vumitools/middleware.py index b46f8c080..be2d89509 100644 --- a/go/vumitools/middleware.py +++ b/go/vumitools/middleware.py @@ -6,12 +6,12 @@ from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks, returnValue +from vumi import log from vumi.config import ( ConfigBool, ConfigDict, ConfigFloat, ConfigInt, ConfigList, ConfigRiak, ConfigText) +from vumi.message import TransportUserMessage from vumi.middleware.base import TransportMiddleware, BaseMiddleware -from vumi.middleware.message_storing import ( - StoringMiddleware, StoringMiddlewareConfig) from vumi.middleware.tagger import TaggingMiddleware from vumi.utils import normalize_msisdn from vumi.blinkenlights.metrics import ( @@ -603,27 +603,64 @@ def handle_failure(self, failure, connector_name): return failure -class GoStoringMiddlewareConfig(StoringMiddlewareConfig): +class GoStoringMiddlewareConfig(BaseMiddleware.CONFIG_CLASS): """ GoStoringMiddleware configuration options. """ - + redis_manager = ConfigDict( + "Redis configuration parameters", default={}, static=True) + riak_manager = ConfigRiak( + "Riak configuration parameters. Must contain at least a bucket_prefix" + " key", required=True, static=True) + store_on_consume = ConfigBool( + "``True`` to store consumed messages as well as published ones, " + "``False`` to store only published messages.", default=True, + static=True) conversation_cache_ttl = ConfigInt( "Time in seconds to cache conversations for.", default=5, static=True) -class GoStoringMiddleware(StoringMiddleware): +class GoStoringMiddleware(BaseMiddleware): + """ + Middleware for storing inbound and outbound messages and events. + + Failures are not stored currently because these are typically + stored by :class:`vumi.transports.FailureWorker` instances. + + Messages are always stored. However, in order for messages to be associated + with a particular batch_id a batch needs to be created in the message store + (typically by an application worker that initiates sending outbound + messages) and messages need to be tagged with a tag associated with the + batch (typically by an application worker or middleware such as + :class:`vumi.middleware.TaggingMiddleware`). + + Configuration options: + + :param string store_prefix: + Prefix for message store keys in key-value store. + Default is 'message_store'. + :param dict redis_manager: + Redis configuration parameters. + :param dict riak_manager: + Riak configuration parameters. Must contain at least + a bucket_prefix key. + :param bool store_on_consume: + ``True`` to store consumed messages as well as published ones, + ``False`` to store only published messages. + Default is ``True``. + """ CONFIG_CLASS = GoStoringMiddlewareConfig @inlineCallbacks def setup_middleware(self): - yield super(GoStoringMiddleware, self).setup_middleware() self.vumi_api = yield VumiApi.from_config_async({ "riak_manager": self.config.riak_manager, "redis_manager": self.config.redis_manager, }) + self.opms = self.vumi_api.get_operational_message_store() + self.store_on_consume = self.config.store_on_consume # We don't have access to our worker's conversation cache (if any), so # we use our own here to avoid duplicate lookups between messages for # the same conversation. @@ -634,23 +671,108 @@ def setup_middleware(self): def teardown_middleware(self): yield self._conversation_cache.cleanup() yield self.vumi_api.close() - yield super(GoStoringMiddleware, self).teardown_middleware() def get_batch_id(self, msg): raise NotImplementedError("Sub-classes should implement .get_batch_id") + def handle_consume_inbound(self, message, connector_name): + if not self.store_on_consume: + return message + return self.handle_inbound(message, connector_name) + + def handle_consume_outbound(self, message, connector_name): + if not self.store_on_consume: + return message + return self.handle_outbound(message, connector_name) + + def handle_consume_event(self, event, connector_name): + if not self.store_on_consume: + return event + return self.handle_event(event, connector_name) + @inlineCallbacks def handle_inbound(self, message, connector_name): batch_id = yield self.get_batch_id(message) - yield self.store.add_inbound_message(message, batch_ids=[batch_id]) + yield self.opms.add_inbound_message(message, batch_ids=[batch_id]) returnValue(message) @inlineCallbacks def handle_outbound(self, message, connector_name): batch_id = yield self.get_batch_id(message) - yield self.store.add_outbound_message(message, batch_ids=[batch_id]) + yield self.opms.add_outbound_message(message, batch_ids=[batch_id]) returnValue(message) + @inlineCallbacks + def handle_event(self, event, connector_name): + outbound = yield self.find_message_for_event(event) + batch_id = yield self.get_batch_id(outbound) + + transport_metadata = event.get('transport_metadata', {}) + # FIXME: The SMPP transport writes a 'datetime' object + # in the 'date' of the transport_metadata. + # json.dumps() that RiakObject uses is unhappy with that. + if 'date' in transport_metadata: + date = transport_metadata['date'] + if not isinstance(date, basestring): + transport_metadata['date'] = date.isoformat() + yield self.opms.add_event(event, batch_ids=[batch_id]) + returnValue(event) + + @inlineCallbacks + def _find_outboundmessage_for_event(self, event): + user_message_id = event.get('user_message_id') + if user_message_id is None: + log.error('Received event without user_message_id: %s' % (event,)) + return + + msg = yield self.opms.get_outbound_message(user_message_id) + if msg is None: + log.error('Unable to find message for event: %s' % (event,)) + + returnValue(msg) + + _EVENT_OUTBOUND_CACHE_KEY = "outbound_message_json" + + def _get_outbound_from_event_cache(self, event): + """ + Retrieve outbound message from the cache on an event. + """ + if self._EVENT_OUTBOUND_CACHE_KEY not in event.cache: + return False, None + outbound_json = event.cache[self._EVENT_OUTBOUND_CACHE_KEY] + if outbound_json is None: + return True, None + return True, TransportUserMessage.from_json(outbound_json) + + def _store_outbound_in_event_cache(self, event, outbound): + """ + Store an outbound message in the cache on an event. + """ + if outbound is None: + event.cache[self._EVENT_OUTBOUND_CACHE_KEY] = None + else: + event.cache[self._EVENT_OUTBOUND_CACHE_KEY] = outbound.to_json() + + @inlineCallbacks + def find_message_for_event(self, event): + hit, outbound_msg = self._get_outbound_from_event_cache(event) + if hit: + returnValue(outbound_msg) + + outbound_msg = yield self._find_outboundmessage_for_event(event) + self._store_outbound_in_event_cache(event, outbound_msg) + returnValue(outbound_msg) + + +class TransportStoringMiddleware(GoStoringMiddleware): + @inlineCallbacks + def get_batch_id(self, msg): + mdh = MessageMetadataHelper(self.vumi_api, msg) + if mdh.tag is None: + returnValue(None) + tag_info = yield self.opms.get_tag_info(mdh.tag) + returnValue(tag_info.current_batch.key) + class ConversationStoringMiddleware(GoStoringMiddleware): @inlineCallbacks diff --git a/go/vumitools/tests/helpers.py b/go/vumitools/tests/helpers.py index 778e5e01d..da8228712 100644 --- a/go/vumitools/tests/helpers.py +++ b/go/vumitools/tests/helpers.py @@ -78,6 +78,12 @@ def add_conversation_metadata(self, msg, conv): md.set_conversation_info(conv.conversation_type, conv.key) md.set_user_account(conv.user_account.key) + @proxyable + def add_tag_metadata(self, msg, tag): + msg.payload.setdefault('helper_metadata', {}) + md = MessageMetadataHelper(None, msg) + md.set_tag(tag) + @proxyable def _add_go_metadata(self, msg, conv, router): if conv is not None: @@ -95,18 +101,22 @@ def _add_go_routing_metadata(self, msg, hops, outbound_hops): @proxyable def make_inbound(self, content, conv=None, router=None, - hops=None, outbound_hops=None, **kw): + hops=None, outbound_hops=None, tag=None, **kw): msg = self._msg_helper.make_inbound(content, **kw) self._add_go_metadata(msg, conv, router) self._add_go_routing_metadata(msg, hops, outbound_hops) + if tag is not None: + self.add_tag_metadata(msg, tag) return msg @proxyable def make_outbound(self, content, conv=None, router=None, - hops=None, outbound_hops=None, **kw): + hops=None, outbound_hops=None, tag=None, **kw): msg = self._msg_helper.make_outbound(content, **kw) self._add_go_metadata(msg, conv, router) self._add_go_routing_metadata(msg, hops, outbound_hops) + if tag is not None: + self.add_tag_metadata(msg, tag) return msg @proxyable @@ -148,8 +158,8 @@ def store_outbound(self, conv, msg): msg, batch_ids=[conv.batch.key]) @proxyable - def store_event(self, event): - return self._get_opms().add_event(event) + def store_event(self, conv, event): + return self._get_opms().add_event(event, batch_ids=[conv.batch.key]) @proxyable def make_stored_inbound(self, conv, content, **kw): @@ -164,17 +174,17 @@ def make_stored_outbound(self, conv, content, **kw): @proxyable def make_stored_ack(self, conv, msg, **kw): event = self.make_ack(msg, conv=conv, **kw) - return maybe_async_return(event, self.store_event(event)) + return maybe_async_return(event, self.store_event(conv, event)) @proxyable def make_stored_nack(self, conv, msg, **kw): event = self.make_nack(msg, conv=conv, **kw) - return maybe_async_return(event, self.store_event(event)) + return maybe_async_return(event, self.store_event(conv, event)) @proxyable def make_stored_delivery_report(self, conv, msg, **kw): event = self.make_delivery_report(msg, conv=conv, **kw) - return maybe_async_return(event, self.store_event(event)) + return maybe_async_return(event, self.store_event(conv, event)) @proxyable def add_inbound_to_conv(self, conv, count, start_date=None, diff --git a/go/vumitools/tests/test_middleware.py b/go/vumitools/tests/test_middleware.py index 29f2bef8f..b5900d864 100644 --- a/go/vumitools/tests/test_middleware.py +++ b/go/vumitools/tests/test_middleware.py @@ -6,7 +6,7 @@ from zope.interface import implements from vumi.transports.failures import FailureMessage -from vumi.message import TransportUserMessage +from vumi.message import TransportUserMessage, format_vumi_date from vumi.middleware.tagger import TaggingMiddleware from vumi.tests.helpers import VumiTestCase, generate_proxies, IHelper from vumi.worker import BaseWorker @@ -14,7 +14,8 @@ from go.vumitools.app_worker import GoWorkerMixin, GoWorkerConfigMixin from go.vumitools.middleware import ( NormalizeMsisdnMiddleware, OptOutMiddleware, MetricsMiddleware, - ConversationStoringMiddleware, RouterStoringMiddleware) + TransportStoringMiddleware, ConversationStoringMiddleware, + RouterStoringMiddleware) from go.vumitools.tests.helpers import VumiApiHelper, GoMessageHelper @@ -824,29 +825,212 @@ def collect_all_results(index_page, results=None): return index_page.next_page().addCallback(collect_all_results, results) -class TestConversationStoringMiddleware(VumiTestCase): +def msg_timestamp(msg): + return format_vumi_date(msg['timestamp'])[:-7] + ".000000" - @inlineCallbacks - def setUp(self): - self.mw_helper = self.add_helper( - MiddlewareHelper(ConversationStoringMiddleware)) - yield self.mw_helper.setup_vumi_api() - self.qms = self.mw_helper.get_vumi_api().get_query_message_store() - self.user_helper = yield self.mw_helper.make_user(u'user') - self.conv = yield self.user_helper.create_conversation(u'dummy_conv') + +def event_tuple(event): + return (event['event_id'], msg_timestamp(event), event['event_type']) + + +class BaseStoringMiddlewareTestCase(VumiTestCase): + """ + Base class for some assertion methods. + """ @inlineCallbacks def assert_stored_inbound(self, msgs): - page = yield self.qms.list_batch_inbound_keys(self.conv.batch.key) + page = yield self.qms.list_batch_inbound_keys(self.batch_id) ids = yield collect_all_results(page) self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) @inlineCallbacks def assert_stored_outbound(self, msgs): - page = yield self.qms.list_batch_outbound_keys(self.conv.batch.key) + page = yield self.qms.list_batch_outbound_keys(self.batch_id) ids = yield collect_all_results(page) self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) + @inlineCallbacks + def assert_stored_event(self, events): + page = yield self.qms.list_batch_events(self.batch_id) + event_tuples = yield collect_all_results(page) + expected_tuples = [event_tuple(e) for e in events] + self.assertEqual(sorted(event_tuples), sorted(expected_tuples)) + + +class TestTransportStoringMiddleware(BaseStoringMiddlewareTestCase): + + @inlineCallbacks + def setUp(self): + self.mw_helper = self.add_helper( + MiddlewareHelper(TransportStoringMiddleware)) + yield self.mw_helper.setup_vumi_api() + self.qms = self.mw_helper.get_vumi_api().get_query_message_store() + self.user_helper = yield self.mw_helper.make_user(u'user') + yield self.mw_helper.setup_tagpool(u'pool', [u'tag']) + yield self.user_helper.add_tagpool_permission(u'pool', 1) + self.tag = yield self.user_helper.user_api.acquire_tag(u'pool') + opms = self.mw_helper.get_vumi_api().get_operational_message_store() + tag_info = yield opms.get_tag_info(self.tag) + self.batch_id = tag_info.current_batch.key + + @inlineCallbacks + def test_inbound_message(self): + """ + When we consume or publish an inbound message, it is added to the + appropriate batch in the message store. + """ + mw = yield self.mw_helper.create_middleware() + + msg1 = self.mw_helper.make_inbound("inbound", tag=self.tag) + yield mw.handle_consume_inbound(msg1, 'default') + yield self.assert_stored_inbound([msg1]) + + msg2 = self.mw_helper.make_inbound("inbound", tag=self.tag) + yield mw.handle_publish_inbound(msg2, 'default') + yield self.assert_stored_inbound([msg1, msg2]) + + @inlineCallbacks + def test_inbound_message_no_tag(self): + """ + When we consume or publish an inbound message with no tag, it is not + added to a batch in the message store. + """ + mw = yield self.mw_helper.create_middleware() + + msg1 = self.mw_helper.make_inbound("inbound") + yield mw.handle_consume_inbound(msg1, 'default') + yield self.assert_stored_inbound([]) + + msg2 = self.mw_helper.make_inbound("inbound") + yield mw.handle_publish_inbound(msg2, 'default') + yield self.assert_stored_inbound([]) + + @inlineCallbacks + def test_inbound_message_no_consume_store(self): + """ + When we consume an inbound message, it is not added to the message + store when `store_on_consume` is disabled. + """ + mw = yield self.mw_helper.create_middleware({ + 'store_on_consume': False, + }) + + msg1 = self.mw_helper.make_inbound("inbound", tag=self.tag) + yield mw.handle_consume_inbound(msg1, 'default') + yield self.assert_stored_inbound([]) + + msg2 = self.mw_helper.make_inbound("inbound", tag=self.tag) + yield mw.handle_publish_inbound(msg2, 'default') + yield self.assert_stored_inbound([msg2]) + + @inlineCallbacks + def test_outbound_message(self): + """ + When we consume or publish an outbound message, it is added to the + appropriate batch in the message store. + """ + mw = yield self.mw_helper.create_middleware() + + msg1 = self.mw_helper.make_outbound("outbound", tag=self.tag) + yield mw.handle_consume_outbound(msg1, 'default') + yield self.assert_stored_outbound([msg1]) + + msg2 = self.mw_helper.make_outbound("outbound", tag=self.tag) + yield mw.handle_publish_outbound(msg2, 'default') + yield self.assert_stored_outbound([msg1, msg2]) + + @inlineCallbacks + def test_outbound_message_no_tag(self): + """ + When we consume or publish an outbound message with no tag, it is not + added to a batch in the message store. + """ + mw = yield self.mw_helper.create_middleware() + + msg1 = self.mw_helper.make_outbound("outbound") + yield mw.handle_consume_outbound(msg1, 'default') + yield self.assert_stored_outbound([]) + + msg2 = self.mw_helper.make_outbound("outbound") + yield mw.handle_publish_outbound(msg2, 'default') + yield self.assert_stored_outbound([]) + + @inlineCallbacks + def test_outbound_message_no_consume_store(self): + """ + When we consume an outbound message, it is not added to the message + store when `store_on_consume` is disabled. + """ + mw = yield self.mw_helper.create_middleware({ + 'store_on_consume': False, + }) + + msg1 = self.mw_helper.make_outbound("outbound", tag=self.tag) + yield mw.handle_consume_outbound(msg1, 'default') + yield self.assert_stored_outbound([]) + + msg2 = self.mw_helper.make_outbound("outbound", tag=self.tag) + yield mw.handle_publish_outbound(msg2, 'default') + yield self.assert_stored_outbound([msg2]) + + @inlineCallbacks + def test_event(self): + """ + When we consume or publish an event, it is added to the appropriate + batch in the message store. + """ + mw = yield self.mw_helper.create_middleware() + + # Store an outbound message so we have something to refer to. + outmsg = self.mw_helper.make_outbound("outbound", tag=self.tag) + yield mw.handle_publish_outbound(outmsg, 'default') + yield self.assert_stored_outbound([outmsg]) + + ack = self.mw_helper.make_ack(outmsg) + yield mw.handle_consume_event(ack, 'default') + yield self.assert_stored_event([ack]) + + nack = self.mw_helper.make_nack(outmsg) + yield mw.handle_publish_event(nack, 'default') + yield self.assert_stored_event([ack, nack]) + + @inlineCallbacks + def test_event_no_consume_store(self): + """ + When we consume an event, it is not added to the message store when + `store_on_consume` is disabled. + """ + mw = yield self.mw_helper.create_middleware({ + 'store_on_consume': False, + }) + + # Store an outbound message so we have something to refer to. + outmsg = self.mw_helper.make_outbound("outbound", tag=self.tag) + yield mw.handle_publish_outbound(outmsg, 'default') + yield self.assert_stored_outbound([outmsg]) + + ack = self.mw_helper.make_ack(outmsg) + yield mw.handle_consume_event(ack, 'default') + yield self.assert_stored_event([]) + + nack = self.mw_helper.make_nack(outmsg) + yield mw.handle_publish_event(nack, 'default') + yield self.assert_stored_event([nack]) + + +class TestConversationStoringMiddleware(BaseStoringMiddlewareTestCase): + + @inlineCallbacks + def setUp(self): + self.mw_helper = self.add_helper( + MiddlewareHelper(ConversationStoringMiddleware)) + yield self.mw_helper.setup_vumi_api() + self.qms = self.mw_helper.get_vumi_api().get_query_message_store() + self.user_helper = yield self.mw_helper.make_user(u'user') + self.conv = yield self.user_helper.create_conversation(u'dummy_conv') + self.batch_id = self.conv.batch.key + @inlineCallbacks def test_conversation_cache_ttl_config(self): """ @@ -911,6 +1095,50 @@ def test_outbound_message_no_consume_store(self): yield mw.handle_publish_outbound(msg2, 'default') yield self.assert_stored_outbound([msg2]) + @inlineCallbacks + def test_event(self): + """ + When we consume or publish an event, it is added to the appropriate + batch in the message store. + """ + mw = yield self.mw_helper.create_middleware() + + # Store an outbound message so we have something to refer to. + outmsg = self.mw_helper.make_outbound("outbound", conv=self.conv) + yield mw.handle_publish_outbound(outmsg, 'default') + yield self.assert_stored_outbound([outmsg]) + + ack = self.mw_helper.make_ack(outmsg) + yield mw.handle_consume_event(ack, 'default') + yield self.assert_stored_event([ack]) + + nack = self.mw_helper.make_nack(outmsg) + yield mw.handle_publish_event(nack, 'default') + yield self.assert_stored_event([ack, nack]) + + @inlineCallbacks + def test_event_no_consume_store(self): + """ + When we consume an event, it is not added to the message store when + `store_on_consume` is disabled. + """ + mw = yield self.mw_helper.create_middleware({ + 'store_on_consume': False, + }) + + # Store an outbound message so we have something to refer to. + outmsg = self.mw_helper.make_outbound("outbound", conv=self.conv) + yield mw.handle_publish_outbound(outmsg, 'default') + yield self.assert_stored_outbound([outmsg]) + + ack = self.mw_helper.make_ack(outmsg) + yield mw.handle_consume_event(ack, 'default') + yield self.assert_stored_event([]) + + nack = self.mw_helper.make_nack(outmsg) + yield mw.handle_publish_event(nack, 'default') + yield self.assert_stored_event([nack]) + @inlineCallbacks def test_conversation_cached_for_inbound_message(self): """ @@ -938,7 +1166,7 @@ def test_conversation_cached_for_outbound_message(self): self.assertEqual(cache._models.keys(), [self.conv.key]) -class TestRouterStoringMiddleware(VumiTestCase): +class TestRouterStoringMiddleware(BaseStoringMiddlewareTestCase): @inlineCallbacks def setUp(self): @@ -948,18 +1176,7 @@ def setUp(self): self.qms = self.mw_helper.get_vumi_api().get_query_message_store() self.user_helper = yield self.mw_helper.make_user(u'user') self.router = yield self.user_helper.create_router(u'dummy_conv') - - @inlineCallbacks - def assert_stored_inbound(self, msgs): - page = yield self.qms.list_batch_inbound_keys(self.router.batch.key) - ids = yield collect_all_results(page) - self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) - - @inlineCallbacks - def assert_stored_outbound(self, msgs): - page = yield self.qms.list_batch_outbound_keys(self.router.batch.key) - ids = yield collect_all_results(page) - self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) + self.batch_id = self.router.batch.key @inlineCallbacks def test_inbound_message(self): @@ -1012,3 +1229,47 @@ def test_outbound_message_no_consume_store(self): msg2 = self.mw_helper.make_outbound("outbound", router=self.router) yield mw.handle_publish_outbound(msg2, 'default') yield self.assert_stored_outbound([msg2]) + + @inlineCallbacks + def test_event(self): + """ + When we consume or publish an event, it is added to the appropriate + batch in the message store. + """ + mw = yield self.mw_helper.create_middleware() + + # Store an outbound message so we have something to refer to. + outmsg = self.mw_helper.make_outbound("outbound", router=self.router) + yield mw.handle_publish_outbound(outmsg, 'default') + yield self.assert_stored_outbound([outmsg]) + + ack = self.mw_helper.make_ack(outmsg) + yield mw.handle_consume_event(ack, 'default') + yield self.assert_stored_event([ack]) + + nack = self.mw_helper.make_nack(outmsg) + yield mw.handle_publish_event(nack, 'default') + yield self.assert_stored_event([ack, nack]) + + @inlineCallbacks + def test_event_no_consume_store(self): + """ + When we consume an event, it is not added to the message store when + `store_on_consume` is disabled. + """ + mw = yield self.mw_helper.create_middleware({ + 'store_on_consume': False, + }) + + # Store an outbound message so we have something to refer to. + outmsg = self.mw_helper.make_outbound("outbound", router=self.router) + yield mw.handle_publish_outbound(outmsg, 'default') + yield self.assert_stored_outbound([outmsg]) + + ack = self.mw_helper.make_ack(outmsg) + yield mw.handle_consume_event(ack, 'default') + yield self.assert_stored_event([]) + + nack = self.mw_helper.make_nack(outmsg) + yield mw.handle_publish_event(nack, 'default') + yield self.assert_stored_event([nack]) From 43d53d09f29a003a52ed3b907776cda4ccc746dc Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Wed, 29 Jul 2015 13:28:45 +0200 Subject: [PATCH 07/10] Use consolidated message store list methods. --- .../management/commands/go_account_stats.py | 8 +++--- .../commands/go_migrate_conversations.py | 12 ++++----- go/base/tests/test_go_manage_message_cache.py | 4 +-- .../tests/test_go_migrate_conversations.py | 12 ++++----- go/conversation/tasks.py | 10 +++---- go/conversation/tests.py | 8 +++--- go/conversation/view_definition.py | 10 +++---- go/vumitools/tests/test_middleware.py | 26 +++++++++---------- setup.py | 2 +- 9 files changed, 46 insertions(+), 46 deletions(-) diff --git a/go/base/management/commands/go_account_stats.py b/go/base/management/commands/go_account_stats.py index e8d9c418f..ed9d974a4 100644 --- a/go/base/management/commands/go_account_stats.py +++ b/go/base/management/commands/go_account_stats.py @@ -101,9 +101,9 @@ def _count_results(self, index_page): def do_batch_key(self, qms, batch_key): in_count = self._count_results( - qms.list_batch_inbound_keys(batch_key)) + qms.list_batch_inbound_messages(batch_key)) out_count = self._count_results( - qms.list_batch_outbound_keys(batch_key)) + qms.list_batch_outbound_messages(batch_key)) self.out(u'Total Received in batch %s: %s\n' % (batch_key, in_count)) self.out(u'Total Sent in batch %s: %s\n' % (batch_key, out_count)) @@ -123,9 +123,9 @@ def collect_stats(self, index_page): return per_date, uniques def do_batch_key_breakdown(self, qms, batch_key): - inbound = qms.list_batch_inbound_keys_with_addresses(batch_key) + inbound = qms.list_batch_inbound_messages(batch_key) inbound_per_date, inbound_uniques = self.collect_stats(inbound) - outbound = qms.list_batch_outbound_keys_with_addresses(batch_key) + outbound = qms.list_batch_outbound_messages(batch_key) outbound_per_date, outbound_uniques = self.collect_stats(outbound) all_uniques = inbound_uniques.union(outbound_uniques) diff --git a/go/base/management/commands/go_migrate_conversations.py b/go/base/management/commands/go_migrate_conversations.py index c118a3212..efbc29670 100644 --- a/go/base/management/commands/go_migrate_conversations.py +++ b/go/base/management/commands/go_migrate_conversations.py @@ -104,7 +104,7 @@ def applies_to(self, user_api, conv): def _process_pages(self, index_page, batch_id, get_message, add_message): while index_page is not None: - for key in index_page: + for key, _timestamp, _addr in index_page: add_message(get_message(key), batch_ids=[batch_id]) index_page = index_page.next_page() @@ -112,10 +112,10 @@ def _copy_msgs(self, old_batch, new_batch): qms = self.vumi_api.get_query_message_store() opms = self.vumi_api.get_operational_message_store() self._process_pages( - qms.list_batch_outbound_keys(old_batch), new_batch, + qms.list_batch_outbound_messages(old_batch), new_batch, opms.get_outbound_message, opms.add_outbound_message) self._process_pages( - qms.list_batch_inbound_keys(old_batch), new_batch, + qms.list_batch_inbound_messages(old_batch), new_batch, opms.get_inbound_message, opms.add_inbound_message) def migrate(self, user_api, conv): @@ -143,7 +143,7 @@ def applies_to(self, user_api, conv): def _process_pages(self, index_page, batch_id, get_message, add_message): while index_page is not None: - for key in index_page: + for key, _timestamp, _addr in index_page: add_message(get_message(key), batch_ids=[batch_id]) index_page = index_page.next_page() @@ -151,10 +151,10 @@ def _copy_msgs(self, old_batch, new_batch): qms = self.vumi_api.get_query_message_store() opms = self.vumi_api.get_operational_message_store() self._process_pages( - qms.list_batch_outbound_keys(old_batch), new_batch, + qms.list_batch_outbound_messages(old_batch), new_batch, opms.get_outbound_message, opms.add_outbound_message) self._process_pages( - qms.list_batch_inbound_keys(old_batch), new_batch, + qms.list_batch_inbound_messages(old_batch), new_batch, opms.get_inbound_message, opms.add_inbound_message) def migrate(self, user_api, conv): diff --git a/go/base/tests/test_go_manage_message_cache.py b/go/base/tests/test_go_manage_message_cache.py index 37fa32e52..907cdfee4 100644 --- a/go/base/tests/test_go_manage_message_cache.py +++ b/go/base/tests/test_go_manage_message_cache.py @@ -45,13 +45,13 @@ def needs_rebuild(self, batch_id, delta=0.01): If the cached values are off by the delta then this returns True. """ inbound = float(self.count_results( - self.qms.list_batch_inbound_keys(batch_id))) + self.qms.list_batch_inbound_messages(batch_id))) cached_inbound = self.qms.get_batch_inbound_count(batch_id) if inbound and (abs(cached_inbound - inbound) / inbound) > delta: return True outbound = float(self.count_results( - self.qms.list_batch_outbound_keys(batch_id))) + self.qms.list_batch_outbound_messages(batch_id))) cached_outbound = self.qms.get_batch_outbound_count(batch_id) if outbound and (abs(cached_outbound - outbound) / outbound) > delta: return True diff --git a/go/base/tests/test_go_migrate_conversations.py b/go/base/tests/test_go_migrate_conversations.py index 548751a8d..2224d3e50 100644 --- a/go/base/tests/test_go_migrate_conversations.py +++ b/go/base/tests/test_go_migrate_conversations.py @@ -12,11 +12,11 @@ from go.vumitools.tests.helpers import GoMessageHelper -def collect_all_results(index_page, results=None): +def collect_all_keys(index_page, results=None): if results is None: results = set() while index_page is not None: - results.update(index_page) + results.update(result[0] for result in index_page) index_page = index_page.next_page() return results @@ -167,11 +167,11 @@ def assert_batches_fixed(self, old_conv): old_out, old_in = set(), set() for batch in old_batches: - collect_all_results(qms.list_batch_outbound_keys(batch), old_out) - collect_all_results(qms.list_batch_inbound_keys(batch), old_in) + collect_all_keys(qms.list_batch_outbound_messages(batch), old_out) + collect_all_keys(qms.list_batch_inbound_messages(batch), old_in) - new_out = collect_all_results(qms.list_batch_outbound_keys(new_batch)) - new_in = collect_all_results(qms.list_batch_inbound_keys(new_batch)) + new_out = collect_all_keys(qms.list_batch_outbound_messages(new_batch)) + new_in = collect_all_keys(qms.list_batch_inbound_messages(new_batch)) self.assertEqual(new_out, old_out) self.assertEqual(new_in, old_in) diff --git a/go/conversation/tasks.py b/go/conversation/tasks.py index b798edeeb..2c6fbec87 100644 --- a/go/conversation/tasks.py +++ b/go/conversation/tasks.py @@ -52,9 +52,9 @@ def row_for_inbound_message(message): def row_for_outbound_message(message, qms): - event_keys = qms.list_message_event_keys(message["message_id"]) + event_keys = qms.list_message_events(message["message_id"]) # NOTE: We assume only one page of results here. - events = sorted((qms.get_event(key) for key in event_keys), + events = sorted((qms.get_event(key[0]) for key in event_keys), key=lambda event: event['timestamp'], reverse=True) row = dict((field, unicode(message.payload[field])) @@ -91,11 +91,11 @@ def load_messages_in_chunks(conversation, direction='inbound', modified on the fly. """ if direction == 'inbound': - index_page = conversation.qms.list_batch_inbound_keys( + index_page = conversation.qms.list_batch_inbound_messages( conversation.batch.key) get_msg = conversation.qms.get_inbound_message elif direction == 'outbound': - index_page = conversation.qms.list_batch_outbound_keys( + index_page = conversation.qms.list_batch_outbound_messages( conversation.batch.key) get_msg = conversation.qms.get_outbound_message else: @@ -104,7 +104,7 @@ def load_messages_in_chunks(conversation, direction='inbound', (direction,)) while index_page is not None: - messages = [get_msg(key) for key in index_page] + messages = [get_msg(key[0]) for key in index_page] yield conversation.filter_and_scrub_messages( messages, include_sensitive=include_sensitive, scrubber=scrubber) index_page = index_page.next_page() diff --git a/go/conversation/tests.py b/go/conversation/tests.py index 3ff6215f7..b94bc95f2 100644 --- a/go/conversation/tests.py +++ b/go/conversation/tests.py @@ -1136,13 +1136,13 @@ def test_export_conversation_messages_unsorted(self): reader = csv.DictReader(fp) message_ids = [row['message_id'] for row in reader] all_keys = set() - index_page = conv.qms.list_batch_inbound_keys(conv.batch.key) + index_page = conv.qms.list_batch_inbound_messages(conv.batch.key) while index_page is not None: - all_keys.update(index_page) + all_keys.update(mt[0] for mt in index_page) index_page = index_page.next_page() - index_page = conv.qms.list_batch_outbound_keys(conv.batch.key) + index_page = conv.qms.list_batch_outbound_messages(conv.batch.key) while index_page is not None: - all_keys.update(index_page) + all_keys.update(mt[0] for mt in index_page) index_page = index_page.next_page() self.assertEqual(set(message_ids), all_keys) diff --git a/go/conversation/view_definition.py b/go/conversation/view_definition.py index 19003334a..7f8e0a130 100644 --- a/go/conversation/view_definition.py +++ b/go/conversation/view_definition.py @@ -253,7 +253,7 @@ def add_event_status(msg): return msg msg.event_status = u"Sending" qms = conversation.qms - get_event_info = qms.list_message_event_keys_with_statuses + get_event_info = qms.list_message_events for event_id, _, event_type in get_event_info(msg["message_id"]): if event_type == u"ack": msg.event_status = u"Accepted" @@ -604,14 +604,14 @@ def get_aggregate_counts(self, conv, direction): Get aggregated total count of messages handled bucketed per day. """ message_callback = { - 'inbound': conv.qms.list_batch_inbound_keys_with_timestamps, - 'outbound': conv.qms.list_batch_outbound_keys_with_timestamps, - }.get(direction, conv.qms.list_batch_inbound_keys_with_timestamps) + 'inbound': conv.qms.list_batch_inbound_messages, + 'outbound': conv.qms.list_batch_outbound_messages, + }.get(direction, conv.qms.list_batch_inbound_messages) aggregates = defaultdict(int) index_page = message_callback(conv.batch.key) while index_page is not None: - for key, timestamp in index_page: + for key, timestamp, _addr in index_page: timestamp = parse_vumi_date(timestamp) aggregates[timestamp.date()] += 1 index_page = index_page.next_page() diff --git a/go/vumitools/tests/test_middleware.py b/go/vumitools/tests/test_middleware.py index b5900d864..c15477f2f 100644 --- a/go/vumitools/tests/test_middleware.py +++ b/go/vumitools/tests/test_middleware.py @@ -829,33 +829,33 @@ def msg_timestamp(msg): return format_vumi_date(msg['timestamp'])[:-7] + ".000000" -def event_tuple(event): - return (event['event_id'], msg_timestamp(event), event['event_type']) - - class BaseStoringMiddlewareTestCase(VumiTestCase): """ Base class for some assertion methods. """ + @inlineCallbacks + def assert_stored_messages(self, page, expected_tuples): + msg_tuples = yield collect_all_results(page) + self.assertEqual(sorted(msg_tuples), sorted(expected_tuples)) + @inlineCallbacks def assert_stored_inbound(self, msgs): - page = yield self.qms.list_batch_inbound_keys(self.batch_id) - ids = yield collect_all_results(page) - self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) + page = yield self.qms.list_batch_inbound_messages(self.batch_id) + totuple = lambda m: (m['message_id'], msg_timestamp(m), m['from_addr']) + self.assert_stored_messages(page, [totuple(m) for m in msgs]) @inlineCallbacks def assert_stored_outbound(self, msgs): - page = yield self.qms.list_batch_outbound_keys(self.batch_id) - ids = yield collect_all_results(page) - self.assertEqual(sorted(ids), sorted(m['message_id'] for m in msgs)) + page = yield self.qms.list_batch_outbound_messages(self.batch_id) + totuple = lambda m: (m['message_id'], msg_timestamp(m), m['to_addr']) + self.assert_stored_messages(page, [totuple(m) for m in msgs]) @inlineCallbacks def assert_stored_event(self, events): page = yield self.qms.list_batch_events(self.batch_id) - event_tuples = yield collect_all_results(page) - expected_tuples = [event_tuple(e) for e in events] - self.assertEqual(sorted(event_tuples), sorted(expected_tuples)) + totuple = lambda e: (e['event_id'], msg_timestamp(e), e['event_type']) + self.assert_stored_messages(page, [totuple(e) for e in events]) class TestTransportStoringMiddleware(BaseStoringMiddlewareTestCase): diff --git a/setup.py b/setup.py index e4a535f42..10e5643f4 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ include_package_data=True, install_requires=[ 'vumi>=0.5.21', - 'vumi-message-store', + 'vumi-message-store>=0.1.4', 'vxsandbox>=0.5.0', 'vxpolls', 'vumi-wikipedia>=0.2.1', From 1883e339c61e35ee16cd18ce703b6a15f5e24a87 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Wed, 29 Jul 2015 15:37:34 +0200 Subject: [PATCH 08/10] Use index queries for recent message list. --- go/conversation/view_definition.py | 6 +++-- go/vumitools/conversation/utils.py | 42 ++++++++++++++---------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/go/conversation/view_definition.py b/go/conversation/view_definition.py index 7f8e0a130..25be31dd2 100644 --- a/go/conversation/view_definition.py +++ b/go/conversation/view_definition.py @@ -269,13 +269,15 @@ def get_sent_messages(start, stop): return [add_event_status(m) for m in conversation.sent_messages_in_cache(start, stop)] + # FIXME: This is horrible and inefficient. + # https://github.com/praekelt/vumi-go/issues/1321 # Paginator starts counting at 1 so 0 would also be invalid inbound_message_paginator = Paginator(PagedMessageCache( - conversation.count_inbound_messages(), + min(conversation.count_inbound_messages(), 2000), lambda start, stop: conversation.received_messages_in_cache( start, stop)), 20) outbound_message_paginator = Paginator(PagedMessageCache( - conversation.count_outbound_messages(), + min(conversation.count_outbound_messages(), 2000), lambda start, stop: get_sent_messages(start, stop)), 20) tag_context = { diff --git a/go/vumitools/conversation/utils.py b/go/vumitools/conversation/utils.py index b42bff75b..362605766 100644 --- a/go/vumitools/conversation/utils.py +++ b/go/vumitools/conversation/utils.py @@ -1,8 +1,6 @@ # -*- test-case-name: go.vumitools.conversation.tests.test_utils -*- # -*- coding: utf-8 -*- -import warnings - from twisted.internet.defer import returnValue from vumi.persist.model import Manager @@ -258,14 +256,6 @@ def filter_and_scrub_messages(self, messages, include_sensitive, scrubber): collection.append(scrubbed_msg) return collection - def received_messages(self, start=0, limit=100, include_sensitive=False, - scrubber=None): - warnings.warn('received_messages() is deprecated. Please use ' - 'received_messages_in_cache() instead.', - category=DeprecationWarning) - return self.received_messages_in_cache(start, limit, include_sensitive, - scrubber) - @Manager.calls_manager def received_messages_in_cache(self, start=0, limit=100, include_sensitive=False, scrubber=None): @@ -273,6 +263,9 @@ def received_messages_in_cache(self, start=0, limit=100, Get a list of replies from the message store. The keys come from the message store's cache. + FIXME: The name of this method and many of its parameters are lies. + See https://github.com/praekelt/vumi-go/issues/1321 + :param int start: Where to start in the result set. :param int limit: @@ -290,8 +283,13 @@ def received_messages_in_cache(self, start=0, limit=100, # FIXME: limit actually means end, apparently. scrubber = scrubber or (lambda msg: msg) - keys = yield self.qms.list_batch_recent_inbound_keys(self.batch.key) - keys = keys[start:limit] + # If we're not actually being asked for anything, return nothing. + if limit <= 0: + returnValue([]) + + page = yield self.qms.list_batch_inbound_messages( + self.batch.key, page_size=limit) + keys = [key for key, _timestamp, _addr in list(page)[start:limit]] replies = yield self.collect_messages( keys, self.qms.get_inbound_message, include_sensitive, scrubber) @@ -301,14 +299,6 @@ def received_messages_in_cache(self, start=0, limit=100, sorted(replies, key=lambda msg: msg['timestamp'], reverse=True)) - def sent_messages(self, start=0, limit=100, include_sensitive=False, - scrubber=None): - warnings.warn('sent_messages() is deprecated. Please use ' - 'sent_messages_in_cache() instead.', - category=DeprecationWarning) - return self.sent_messages_in_cache(start, limit, include_sensitive, - scrubber) - @Manager.calls_manager def sent_messages_in_cache(self, start=0, limit=100, include_sensitive=False, scrubber=None): @@ -316,6 +306,9 @@ def sent_messages_in_cache(self, start=0, limit=100, Get a list of sent_messages from the message store. The keys come from the message store's cache. + FIXME: The name of this method and many of its parameters are lies. + See https://github.com/praekelt/vumi-go/issues/1321 + :param int start: Where to start :param int limit: @@ -333,8 +326,13 @@ def sent_messages_in_cache(self, start=0, limit=100, # FIXME: limit actually means end, apparently. scrubber = scrubber or (lambda msg: msg) - keys = yield self.qms.list_batch_recent_outbound_keys(self.batch.key) - keys = keys[start:limit] + # If we're not actually being asked for anything, return nothing. + if limit <= 0: + returnValue([]) + + page = yield self.qms.list_batch_outbound_messages( + self.batch.key, page_size=limit) + keys = [key for key, _timestamp, _addr in list(page)[start:limit]] sent_messages = yield self.collect_messages( keys, self.qms.get_outbound_message, include_sensitive, scrubber) From 607d27e95ceeb0a56048f79e37d15a3cffc4f740 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Thu, 30 Jul 2015 12:09:40 +0200 Subject: [PATCH 09/10] Use index queries for throughput calculation. --- go/vumitools/conversation/tests/test_utils.py | 4 +-- go/vumitools/conversation/utils.py | 32 +++++++++++-------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/go/vumitools/conversation/tests/test_utils.py b/go/vumitools/conversation/tests/test_utils.py index 804ddb462..65a2595f5 100644 --- a/go/vumitools/conversation/tests/test_utils.py +++ b/go/vumitools/conversation/tests/test_utils.py @@ -393,7 +393,7 @@ def get_contacts(): def test_get_inbound_throughput(self): yield self.conv.start() yield self.msg_helper.add_inbound_to_conv( - self.conv, 20, time_multiplier=0) + self.conv, 20, time_multiplier=0, start_date=datetime.utcnow()) # 20 messages in 5 minutes = 4 messages per minute self.assertEqual( (yield self.conv.get_inbound_throughput()), 4) @@ -405,7 +405,7 @@ def test_get_inbound_throughput(self): def test_get_outbound_throughput(self): yield self.conv.start() yield self.msg_helper.add_outbound_to_conv( - self.conv, 20, time_multiplier=0) + self.conv, 20, time_multiplier=0, start_date=datetime.utcnow()) # 20 messages in 5 minutes = 4 messages per minute self.assertEqual( (yield self.conv.get_outbound_throughput()), 4) diff --git a/go/vumitools/conversation/utils.py b/go/vumitools/conversation/utils.py index 362605766..024296072 100644 --- a/go/vumitools/conversation/utils.py +++ b/go/vumitools/conversation/utils.py @@ -1,8 +1,10 @@ # -*- test-case-name: go.vumitools.conversation.tests.test_utils -*- # -*- coding: utf-8 -*- -from twisted.internet.defer import returnValue +from datetime import datetime, timedelta +from twisted.internet.defer import returnValue +from vumi.message import format_vumi_date from vumi.persist.model import Manager from go.vumitools.opt_out import OptOutStore @@ -380,13 +382,15 @@ def get_inbound_throughput(self, sample_time=300): """ Calculate how many inbound messages per minute we've been doing on average. + + NOTE: This will underestimate if there are more messages within the + sample time than will fit in an index page. """ - inbounds = yield self.qms.list_batch_recent_inbound_keys( - self.batch.key, with_timestamp=True) - if not inbounds: - returnValue(0.0) - threshold = inbounds[0][1] - sample_time - count = sum(1 for _, timestamp in inbounds if timestamp >= threshold) + start = format_vumi_date( + datetime.utcnow() - timedelta(seconds=sample_time)) + inbounds = yield self.qms.list_batch_inbound_messages( + self.batch.key, start=start) + count = sum(1 for _key, ts, _addr in inbounds if ts >= start) returnValue(count / (sample_time / 60.0)) @Manager.calls_manager @@ -394,13 +398,15 @@ def get_outbound_throughput(self, sample_time=300): """ Calculate how many outbound messages per minute we've been doing on average. + + NOTE: This will underestimate if there are more messages within the + sample time than will fit in an index page. """ - outbounds = yield self.qms.list_batch_recent_outbound_keys( - self.batch.key, with_timestamp=True) - if not outbounds: - returnValue(0.0) - threshold = outbounds[0][1] - sample_time - count = sum(1 for _, timestamp in outbounds if timestamp >= threshold) + start = format_vumi_date( + datetime.utcnow() - timedelta(seconds=(sample_time))) + outbounds = yield self.qms.list_batch_outbound_messages( + self.batch.key, start=start) + count = sum(1 for _key, ts, _addr in outbounds if ts >= start) returnValue(count / (sample_time / 60.0)) @Manager.calls_manager From c288cda617eb4d7207f26fdfa112c629658e4c53 Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Fri, 31 Jul 2015 10:33:43 +0200 Subject: [PATCH 10/10] Improve throughput methods. --- go/vumitools/conversation/utils.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/go/vumitools/conversation/utils.py b/go/vumitools/conversation/utils.py index 024296072..951347a5e 100644 --- a/go/vumitools/conversation/utils.py +++ b/go/vumitools/conversation/utils.py @@ -384,14 +384,13 @@ def get_inbound_throughput(self, sample_time=300): average. NOTE: This will underestimate if there are more messages within the - sample time than will fit in an index page. + sample time than will fit in an index page, currently 1000. """ start = format_vumi_date( datetime.utcnow() - timedelta(seconds=sample_time)) inbounds = yield self.qms.list_batch_inbound_messages( self.batch.key, start=start) - count = sum(1 for _key, ts, _addr in inbounds if ts >= start) - returnValue(count / (sample_time / 60.0)) + returnValue(len(inbounds) / (sample_time / 60.0)) @Manager.calls_manager def get_outbound_throughput(self, sample_time=300): @@ -400,14 +399,13 @@ def get_outbound_throughput(self, sample_time=300): average. NOTE: This will underestimate if there are more messages within the - sample time than will fit in an index page. + sample time than will fit in an index page, currently 1000. """ start = format_vumi_date( datetime.utcnow() - timedelta(seconds=(sample_time))) outbounds = yield self.qms.list_batch_outbound_messages( self.batch.key, start=start) - count = sum(1 for _key, ts, _addr in outbounds if ts >= start) - returnValue(count / (sample_time / 60.0)) + returnValue(len(outbounds) / (sample_time / 60.0)) @Manager.calls_manager def get_opted_in_contact_address(self, contact, delivery_class):