Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scalability fixes - Load model once per user #944

Merged
merged 18 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions emission/core/get_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pymongo
import os
import json
import logging

try:
config_file = open('conf/storage/db.conf')
Expand Down Expand Up @@ -37,9 +38,18 @@
_current_db = MongoClient(url, uuidRepresentation='pythonLegacy')[db_name]
#config_file.close()

# Module-level cache for the handle that get_model_db() returns.
# It starts out as None; get_model_db() is meant to populate it through
# _set_model_db() on its first call, so that the create_index calls made
# there are not repeated on every later call.
_model_db = None

shankari marked this conversation as resolved.
Show resolved Hide resolved
def _get_current_db():
    """
    :return: the database handle stored in the module-level `_current_db`
    """
    return _current_db

def _get_model_db():
    """
    :return: whatever is currently held in the module-level `_model_db`
        cache (None until something has been stored there)
    """
    return _model_db

def _set_model_db(model_db):
    """
    Record `model_db` in the module-level `_model_db` cache so that
    `_get_model_db()` returns it on later calls.

    :param model_db: the handle to cache
    """
    # Without this declaration the assignment below would only bind a
    # function-local name and the module-level cache would never change.
    global _model_db
    _model_db = model_db

shankari marked this conversation as resolved.
Show resolved Hide resolved
def get_token_db():
    """
    :return: the `Stage_Tokens` attribute of the current database handle
    """
    return _get_current_db().Stage_Tokens
Expand Down Expand Up @@ -100,7 +110,6 @@ def update_routeDistanceMatrix_db(user_id, method, updatedMatrix):
f.write(json.dumps(updatedMatrix))
f.close()


def get_client_db():
# current_db=MongoClient().Stage_database
Clients = _get_current_db().Stage_clients
Expand Down Expand Up @@ -231,10 +240,16 @@ def get_model_db():
" will eventually delete them. This means that the elements are essentially
" getting updated, only over time and as a log-structured filesystem.
"""
ModelDB = _get_current_db().Stage_updateable_models
ModelDB.create_index([("user_id", pymongo.ASCENDING)])
ModelDB.create_index([("metadata.key", pymongo.ASCENDING)])
ModelDB.create_index([("metadata.write_ts", pymongo.DESCENDING)])
ModelDB = _get_model_db()
if ModelDB == None:
logging.debug("Started model load in edb.get_model_db()...")
ModelDB = _get_current_db().Stage_updateable_models
ModelDB.create_index([("user_id", pymongo.ASCENDING)])
ModelDB.create_index([("metadata.key", pymongo.ASCENDING)])
ModelDB.create_index([("metadata.write_ts", pymongo.DESCENDING)])
_set_model_db(ModelDB)
logging.debug("Finished model load in edb.get_model_db()...")
logging.debug("Fetched model in edb.get_model_db()")
return ModelDB

def _create_analysis_result_indices(tscoll):
Expand Down
25 changes: 19 additions & 6 deletions emission/storage/modifiable/builtin_model_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ def __init__(self, user_id):
super(BuiltinModelStorage, self).__init__(user_id)
self.key_query = lambda key: {"metadata.key": key}
self.user_query = {"user_id": self.user_id} # UUID is mandatory for this version
self.current_model = None
shankari marked this conversation as resolved.
Show resolved Hide resolved

def _get_model(self):
    """
    :return: the value currently held in `self.current_model`
    """
    return self.current_model

def _set_model(self, model):
    """
    Remember `model` on this instance as `self.current_model`.

    :param model: the value to store
    """
    self.current_model = model

def upsert_model(self, key:str, model: ecwb.WrapperBase):
"""
Expand All @@ -34,12 +41,18 @@ def get_current_model(self, key:str) -> Optional[Dict]:
:return: the most recent database entry for this key
"""
find_query = {"user_id": self.user_id, "metadata.key": key}
result_it = edb.get_model_db().find(find_query).sort("metadata.write_ts", -1).limit(1)
# this differs from the timeseries `get_first_entry` only in the find query
# and the fact that the sort key and sort order are hardcoded
# everything below this point is identical
# but it is also fairly trivial, so I am not sure it is worth pulling
# out into common code at this point
result_it = self._get_model()
if result_it == None:
logging.debug("Started model load in builtin_model_storage.get_current_model()...")
result_it = edb.get_model_db().find(find_query).sort("metadata.write_ts", -1).limit(1)
# this differs from the timeseries `get_first_entry` only in the find query
# and the fact that the sort key and sort order are hardcoded
# everything below this point is identical
# but it is also fairly trivial, so I am not sure it is worth pulling
# out into common code at this point
self._set_model(result_it)
logging.debug("Finished model load in builtin_model_storage.get_current_model()...")
logging.debug("Fetched model in builtin_model_storage.get_current_model()...")
result_list = list(result_it)
if len(result_list) == 0:
return None
Expand Down