Skip to content
This repository has been archived by the owner on Jun 12, 2018. It is now read-only.

Switch to new message store #1310

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/apps/http_api/tests/test_vumi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ def test_invalid_in_reply_to_with_missing_conversation_key(self):
# create a message with no conversation
inbound_msg = self.app_helper.make_inbound('in 1', message_id='msg-1')
vumi_api = self.app_helper.vumi_helper.get_vumi_api()
yield vumi_api.mdb.add_inbound_message(inbound_msg)
opms = vumi_api.get_operational_message_store()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's with the _operational_? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new message store is split into three pieces:

  • The batch manager does stuff related to batches.
  • The operational message store reads and writes messages by key only and is used by middleware and routing code.
  • The query message store is read-only and is used by everything that needs to list, search, filter, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's the split interface and the other is _query_? /me keeps reading.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Ideally, most things only need to use one of the three. (Tests quite often need an extra operational store to create messages that the query store looks at.)

yield opms.add_inbound_message(inbound_msg)

msg = {
'content': 'foo',
Expand Down
3 changes: 2 additions & 1 deletion go/apps/http_api_nostream/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ def handle_PUT_in_reply_to(self, request, payload, in_reply_to):
user_account = request.getUser()
conversation = yield self.get_conversation(user_account)

reply_to = yield self.vumi_api.mdb.get_inbound_message(in_reply_to)
opms = self.vumi_api.get_operational_message_store()
reply_to = yield opms.get_inbound_message(in_reply_to)
if reply_to is None:
self.client_error_response(request, 'Invalid in_reply_to value')
return
Expand Down
3 changes: 2 additions & 1 deletion go/apps/http_api_nostream/tests/test_vumi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,8 @@ def test_invalid_in_reply_to_with_missing_conversation_key(self):
# create a message with no (None) conversation
inbound_msg = self.app_helper.make_inbound('in 1', message_id='msg-1')
vumi_api = self.app_helper.vumi_helper.get_vumi_api()
yield vumi_api.mdb.add_inbound_message(inbound_msg)
opms = vumi_api.get_operational_message_store()
yield opms.add_inbound_message(inbound_msg)

msg = {
'content': 'foo',
Expand Down
8 changes: 4 additions & 4 deletions go/apps/jsbox/tests/test_message_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,24 @@ def setUp(self):
self.user_helper = yield self.vumi_helper.make_user(u"user")
self.app_worker.user_api = self.user_helper.user_api

self.message_store = self.vumi_helper.get_vumi_api().mdb
opms = self.vumi_helper.get_vumi_api().get_operational_message_store()

self.conversation = yield self.user_helper.create_conversation(
u'jsbox', started=True)

# store inbound
yield self.message_store.add_inbound_message(
yield opms.add_inbound_message(
self.msg_helper.make_inbound('hello'),
batch_ids=[self.conversation.batch.key])

# store outbound
outbound_msg = self.msg_helper.make_outbound('hello')
yield self.message_store.add_outbound_message(
yield opms.add_outbound_message(
outbound_msg, batch_ids=[self.conversation.batch.key])

# ack outbound
event = self.msg_helper.make_ack(outbound_msg)
yield self.message_store.add_event(event)
yield opms.add_event(event, batch_ids=[self.conversation.batch.key])

# monkey patch for when no conversation_key is provided
self.app_worker.conversation_for_api = lambda *a: self.conversation
Expand Down
4 changes: 2 additions & 2 deletions go/apps/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ def __init__(self, worker_class, vumi_helper=None, **msg_helper_args):
self.vumi_helper = vumi_helper
self._app_helper = ApplicationHelper(
self._conversation_type(), self.vumi_helper)
self.msg_helper = GoMessageHelper(**msg_helper_args)
self.msg_helper = GoMessageHelper(
vumi_helper=vumi_helper, **msg_helper_args)
self.transport_name = self.msg_helper.transport_name
self.worker_helper = self.vumi_helper.get_worker_helper(
self.transport_name)
Expand Down Expand Up @@ -141,7 +142,6 @@ def get_app_worker(self, config=None, start=True, extra_worker=False):
# Set up our other bits of helper.
if not extra_worker:
self.vumi_helper.set_vumi_api(worker.vumi_api)
self.msg_helper.mdb = worker.vumi_api.mdb
returnValue(worker)

@inlineCallbacks
Expand Down
18 changes: 9 additions & 9 deletions go/base/management/commands/go_account_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ def handle_stats(self, user, api, options):
return
conv_key = options[0]
conversation = api.get_wrapped_conversation(conv_key)
message_store = api.api.mdb
qms = api.api.get_query_message_store()
self.out(u'Conversation: %s\n' % (conversation.name,))

batch_key = conversation.batch.key
self.do_batch_key(message_store, batch_key)
self.do_batch_key_breakdown(message_store, batch_key)
self.do_batch_key(qms, batch_key)
self.do_batch_key_breakdown(qms, batch_key)

def _count_results(self, index_page):
count = 0
Expand All @@ -99,11 +99,11 @@ def _count_results(self, index_page):
index_page = index_page.next_page()
return count

def do_batch_key(self, message_store, batch_key):
def do_batch_key(self, qms, batch_key):
in_count = self._count_results(
message_store.batch_inbound_keys_page(batch_key))
qms.list_batch_inbound_messages(batch_key))
out_count = self._count_results(
message_store.batch_outbound_keys_page(batch_key))
qms.list_batch_outbound_messages(batch_key))
self.out(u'Total Received in batch %s: %s\n' % (batch_key, in_count))
self.out(u'Total Sent in batch %s: %s\n' % (batch_key, out_count))

Expand All @@ -122,10 +122,10 @@ def collect_stats(self, index_page):
index_page = index_page.next_page()
return per_date, uniques

def do_batch_key_breakdown(self, msg_store, batch_key):
inbound = msg_store.batch_inbound_keys_with_addresses(batch_key)
def do_batch_key_breakdown(self, qms, batch_key):
inbound = qms.list_batch_inbound_messages(batch_key)
inbound_per_date, inbound_uniques = self.collect_stats(inbound)
outbound = msg_store.batch_outbound_keys_with_addresses(batch_key)
outbound = qms.list_batch_outbound_messages(batch_key)
outbound_per_date, outbound_uniques = self.collect_stats(outbound)
all_uniques = inbound_uniques.union(outbound_uniques)

Expand Down
3 changes: 2 additions & 1 deletion go/base/management/commands/go_manage_message_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def _apply_command(self, func, dry_run=None):

def handle_command_rebuild(self, *args, **options):
def rebuild(batch_id):
self.vumi_api.mdb.reconcile_cache(batch_id)
qms = self.vumi_api.get_query_message_store()
self.vumi_api.get_batch_manager().rebuild_cache(batch_id, qms)

self._apply_command(rebuild)
50 changes: 27 additions & 23 deletions go/base/management/commands/go_migrate_conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ class Migration(object):
"""Base class for migrations."""
name = None

def __init__(self, dry_run):
def __init__(self, dry_run, vumi_api):
self._dry_run = dry_run
self.vumi_api = vumi_api

@classmethod
def migrator_classes(cls):
Expand Down Expand Up @@ -103,23 +104,25 @@ def applies_to(self, user_api, conv):

def _process_pages(self, index_page, batch_id, get_message, add_message):
while index_page is not None:
for key in index_page:
for key, _timestamp, _addr in index_page:
add_message(get_message(key), batch_ids=[batch_id])
index_page = index_page.next_page()

def _copy_msgs(self, mdb, old_batch, new_batch):
def _copy_msgs(self, old_batch, new_batch):
qms = self.vumi_api.get_query_message_store()
opms = self.vumi_api.get_operational_message_store()
self._process_pages(
mdb.batch_outbound_keys_page(old_batch), new_batch,
mdb.get_outbound_message, mdb.add_outbound_message)
qms.list_batch_outbound_messages(old_batch), new_batch,
opms.get_outbound_message, opms.add_outbound_message)
self._process_pages(
mdb.batch_inbound_keys_page(old_batch), new_batch,
mdb.get_inbound_message, mdb.add_inbound_message)
qms.list_batch_inbound_messages(old_batch), new_batch,
opms.get_inbound_message, opms.add_inbound_message)

def migrate(self, user_api, conv):
conv_batches = conv.batches.keys()
new_batch = user_api.api.mdb.batch_start()
new_batch = user_api.api.get_batch_manager().batch_start()
for batch in conv_batches:
self._copy_msgs(user_api.api.mdb, batch, new_batch)
self._copy_msgs(batch, new_batch)
conv.batches.clear()
conv.batches.add_key(new_batch)
conv.save()
Expand All @@ -133,30 +136,31 @@ class SplitBatches(Migration):
" conversation batches to the new batch.")

def applies_to(self, user_api, conv):
mdb = user_api.api.mdb
tag_keys = mdb.current_tags.index_keys('current_batch', conv.batch.key)
if tag_keys:
return True
return False
batch_manager = user_api.api.get_batch_manager()
batch = batch_manager.get_batch(conv.batch.key)
current_tags = [batch_manager.get_tag_info(tag) for tag in batch.tags]
return any(ct.current_batch.key == batch.key for ct in current_tags)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing we can consider deprecating this code because we no longer have any batches like this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I don't want to make that kind of change in this PR, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps let's make a ticket? Always keen to delete code that we'd otherwise have to maintain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #1323 for this.


def _process_pages(self, index_page, batch_id, get_message, add_message):
while index_page is not None:
for key in index_page:
for key, _timestamp, _addr in index_page:
add_message(get_message(key), batch_ids=[batch_id])
index_page = index_page.next_page()

def _copy_msgs(self, mdb, old_batch, new_batch):
def _copy_msgs(self, old_batch, new_batch):
qms = self.vumi_api.get_query_message_store()
opms = self.vumi_api.get_operational_message_store()
self._process_pages(
mdb.batch_outbound_keys_page(old_batch), new_batch,
mdb.get_outbound_message, mdb.add_outbound_message)
qms.list_batch_outbound_messages(old_batch), new_batch,
opms.get_outbound_message, opms.add_outbound_message)
self._process_pages(
mdb.batch_inbound_keys_page(old_batch), new_batch,
mdb.get_inbound_message, mdb.add_inbound_message)
qms.list_batch_inbound_messages(old_batch), new_batch,
opms.get_inbound_message, opms.add_inbound_message)

def migrate(self, user_api, conv):
old_batch = conv.batch.key
new_batch = user_api.api.mdb.batch_start()
self._copy_msgs(user_api.api.mdb, old_batch, new_batch)
new_batch = user_api.api.get_batch_manager().batch_start()
self._copy_msgs(old_batch, new_batch)
conv.batch.key = new_batch
conv.save()

Expand Down Expand Up @@ -246,7 +250,7 @@ def get_migrator(self, migration_name, dry_run):
migrator_cls = Migration.migrator_class(migration_name)
if migrator_cls is None:
return None
return migrator_cls(dry_run)
return migrator_cls(dry_run, self.vumi_api)

def handle_user(self, user, migrator):
user_api = self.user_api_for_user(user)
Expand Down
4 changes: 2 additions & 2 deletions go/base/management/commands/go_setup_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def setup_routers(self, user, routers):
config = router_info.pop('config', {})
extra_inbound_endpoints = view_def.get_inbound_endpoints(config)
extra_outbound_endpoints = view_def.get_outbound_endpoints(config)
batch_id = user_api.api.mdb.batch_start()
batch_id = user_api.api.get_batch_manager().batch_start()

# We bypass the usual mechanisms so we can set the key ourselves.
router = user_api.router_store.routers(
Expand Down Expand Up @@ -341,7 +341,7 @@ def setup_conversations(self, user, conversations):
conversation_type = conv_info.pop('conversation_type')
view_def = get_conversation_view_definition(conversation_type)
config = conv_info.pop('config', {})
batch_id = user_api.api.mdb.batch_start()
batch_id = user_api.api.get_batch_manager().batch_start()
# We bypass the usual mechanisms so we can set the key ourselves.
conv = user_api.conversation_store.conversations(
conversation_key, user_account=user_api.user_account_key,
Expand Down
18 changes: 9 additions & 9 deletions go/base/management/commands/go_system_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,16 @@ def handle_command_conversation_types_by_month(self, *args, **options):
writer.writerow(row)

def _increment_msg_stats(self, conv, stats):
inbound_stats = conv.mdb.batch_inbound_stats(conv.batch.key)
outbound_stats = conv.mdb.batch_outbound_stats(conv.batch.key)
key = conv.batch.key
qms = conv.qms
from_addr_count = qms.get_batch_from_addr_count(key)
to_addr_count = qms.get_batch_to_addr_count(key)
stats["conversations_started"] += 1
stats["inbound_message_count"] += inbound_stats['total']
stats["outbound_message_count"] += outbound_stats['total']
stats["inbound_uniques"] += inbound_stats['unique_addresses']
stats["outbound_uniques"] += outbound_stats['unique_addresses']
stats["total_uniques"] += max(
inbound_stats['unique_addresses'],
outbound_stats['unique_addresses'])
stats["inbound_message_count"] += qms.get_batch_inbound_count(key)
stats["outbound_message_count"] += qms.get_batch_outbound_count(key)
stats["inbound_uniques"] += from_addr_count
stats["outbound_uniques"] += to_addr_count
stats["total_uniques"] += max(from_addr_count, to_addr_count)

def handle_command_message_counts_by_month(self, *args, **options):
month_stats = {}
Expand Down
17 changes: 7 additions & 10 deletions go/base/tests/test_go_manage_message_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@ class TestGoManageMessageCache(GoCommandTestCase):

def setUp(self):
self.setup_command(go_manage_message_cache.Command)
self.qms = self.vumi_helper.get_vumi_api().get_query_message_store()

def clear_batches(self, batches):
"""
Clear the message cache so that all batches need reconciliation.
"""
vumi_api = self.vumi_helper.get_vumi_api()
for batch_id in batches:
vumi_api.mdb.cache.clear_batch(batch_id)
self.qms.batch_info_cache.clear_batch(batch_id)
self.assert_batches_cleared(batches)

def assert_batches_cleared(self, batches):
vumi_api = self.vumi_helper.get_vumi_api()
for batch_id in batches:
self.assertFalse(vumi_api.mdb.cache.batch_exists(batch_id))
self.assertFalse(self.qms.batch_info_cache.batch_exists(batch_id))

def count_results(self, index_page):
count = 0
Expand All @@ -48,17 +47,15 @@ def needs_rebuild(self, batch_id, delta=0.01):
What an acceptable delta is for the cached values. Defaults to 0.01
If the cached values are off by the delta then this returns True.
"""
vumi_api = self.vumi_helper.get_vumi_api()

inbound = float(self.count_results(
vumi_api.mdb.batch_inbound_keys_page(batch_id)))
cached_inbound = vumi_api.mdb.cache.inbound_message_count(batch_id)
self.qms.list_batch_inbound_messages(batch_id)))
cached_inbound = self.qms.get_batch_inbound_count(batch_id)
if inbound and (abs(cached_inbound - inbound) / inbound) > delta:
return True

outbound = float(self.count_results(
vumi_api.mdb.batch_outbound_keys_page(batch_id)))
cached_outbound = vumi_api.mdb.cache.outbound_message_count(batch_id)
self.qms.list_batch_outbound_messages(batch_id)))
cached_outbound = self.qms.get_batch_outbound_count(batch_id)
if outbound and (abs(cached_outbound - outbound) / outbound) > delta:
return True

Expand Down
Loading