From 13481b583a101fd52cf69309542bbd9e9ab24cf5 Mon Sep 17 00:00:00 2001 From: Shankari Date: Wed, 22 Jan 2025 22:30:24 -0800 Subject: [PATCH 1/2] Revert "Always run the computation of the user profile stats" This reverts commit 9d1f414b067cbb8411214fed88d45aaa8c6d5eae. Always running the computation of the user profile stats meant that we were retrieving all composite trips every hour. This could have possibly led to a significantly greater I/O load, since DocumentDB appears to be very slow for composite trip queries. However, we cannot just change the pipeline range to use confirmed trips instead of composite either, since the UI relies on composite trips. If the pipeline failed after creating confirmed trips but before creating composite trips, and we use composite trips for the pipeline range, the user will not see the trips in the UI, since we only pull draft trips for the time range after the pipeline is done https://github.com/e-mission/e-mission-server/pull/1011 The real fix is to split the user stats into pipeline-related and non-pipeline-related. I will make that change in a subsequent commit in the same PR --- emission/pipeline/intake_stage.py | 13 +++++++------ .../tests/analysisTests/intakeTests/TestUserStat.py | 4 ++-- emission/tests/netTests/TestPipeline.py | 2 -- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 9fb95c49a..d58be2b25 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -76,12 +76,6 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): try: run_intake_pipeline_for_user(uuid, skip_if_no_new_data) - with ect.Timer() as gsr: - logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") - esds.store_pipeline_time(uuid, 'STORE_USER_STATS', - time.time(), gsr.elapsed) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " @@ -206,3 +200,10 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name, time.time(), crt.elapsed) + with ect.Timer() as gsr: + logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + + esds.store_pipeline_time(uuid, 'STORE_USER_STATS', + time.time(), gsr.elapsed) diff --git a/emission/tests/analysisTests/intakeTests/TestUserStat.py b/emission/tests/analysisTests/intakeTests/TestUserStat.py index 207aa0a98..7b2243322 100644 --- a/emission/tests/analysisTests/intakeTests/TestUserStat.py +++ b/emission/tests/analysisTests/intakeTests/TestUserStat.py @@ -44,7 +44,7 @@ def setUp(self): edb.get_profile_db().insert_one({"user_id": self.testUUID}) #etc.runIntakePipeline(self.testUUID) - etc.runIntakePipeline(self.testUUID) + epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) logging.debug("UUID = %s" % (self.testUUID)) def tearDown(self): @@ -118,4 +118,4 @@ def testLastCall(self): if __name__ == '__main__': # Configure logging for the test etc.configLogging() - unittest.main() + unittest.main() \ No newline at end of file diff --git a/emission/tests/netTests/TestPipeline.py b/emission/tests/netTests/TestPipeline.py index d404ac6e2..54f81ba8e 100644 --- a/emission/tests/netTests/TestPipeline.py +++ b/emission/tests/netTests/TestPipeline.py @@ -7,7 +7,6 @@ import emission.core.wrapper.localdate as ecwl import emission.tests.common as etc import emission.pipeline.intake_stage as epi -import emission.analysis.result.user_stat as eaurs from emission.net.api import pipeline @@ -39,7 +38,6 @@ def testNoAnalysisResults(self): def testAnalysisResults(self): self.assertEqual(pipeline.get_range(self.testUUID), (None, None)) epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) - eaurs.get_and_store_user_stats(self.testUUID, "analysis/composite_trip") pr = pipeline.get_range(self.testUUID) self.assertAlmostEqual(pr[0], 1440688739.672) self.assertAlmostEqual(pr[1], 1440729142.709) From 88bb35a79fda4026842e1b59d2e2e85a73893b09 Mon Sep 17 00:00:00 2001 From: Shankari Date: Thu, 23 Jan 2025 00:34:30 -0800 Subject: [PATCH 2/2] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20Refactor=20the=20us?= =?UTF-8?q?er=20stats=20into=20pipeline=20dependent=20and=20pipeline=20ind?= =?UTF-8?q?ependent=20buckets?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - The pipeline dependent stage is based on analysed data and only changes when the pipeline has finished running. Since it is infrequent, it can be more complicated and lower performance. - The pipeline independent stage is read from the raw data, so it has to be run every time the pipeline is run, roughly every hour. So it should make sure to run quick and fast queries. This change splits the existing function to pipeline dependent and independent stages and then invokes both of them properly from the pipeline. Also fixed a few tests: - adjusted `etc.runIntakePipeline` to call both the pipeline dependent and pipeline independent user stats - switched the pipeline call in the setup from `epi.run_intake_pipeline_for_user` to `etc.runIntakePipeline`. The previous call had `etc.runIntakePipeline` commented out, but it was part of the original commit, so I am not sure why it was commented out in the first place. But we need to invoke the pipeline independent stats, which are called outside of `epi.run_intake_pipeline_for_user` since they need to be generated even if the pipeline was skipped, so I think this is a meaningful change for this test. Testing done: - Launched a server with these changes - Connected an emulator to it - Faked a trip using the location simulation ``` START 2025-01-23 00:30:20.575034 POST /usercache/put START 2025-01-23 00:30:20.597320 POST /usercache/get END 2025-01-23 00:30:20.600963 POST /usercache/get 414ee37a-732e-4751-a6f6-0d83b41b27f0 0.002919912338256836 END 2025-01-23 00:30:21.948563 POST /usercache/put 414ee37a-732e-4751-a6f6-0d83b41b27f0 1.373345136642456 ``` There was only one profile ``` >>> edb.get_profile_db().count_documents({}) 1 ``` Before running the pipeline, it did not have any of these user stats ``` >>> edb.get_profile_db().find_one() {'_id': ObjectId('6791fc13b4b3fa6db902c42b'), 'user_id': UUID('414ee37a-732e-4751-a6f6-0d83b41b27f0'), 'mode': {}, 'mpg_array': [32.044384997811726], 'purpose': {}, 'source': 'Shankari', 'update_ts': datetime.datetime(2025, 1, 23, 0, 21, 39, 712000), 'client_app_version': '1.9.6', 'client_os_version': '18.1', 'curr_platform': 'ios', 'manufacturer': 'Apple', 'phone_lang': 'en'} ``` - and after the pipeline ran, it did ``` >>> edb.get_profile_db().find_one() {'_id': ObjectId('6791fc13b4b3fa6db902c42b'), 'user_id': UUID('414ee37a-732e-4751-a6f6-0d83b41b27f0'), 'mode': {}, 'mpg_array': [32.044384997811726], 'purpose': {}, 'source': 'Shankari', 'update_ts': datetime.datetime(2025, 1, 23, 0, 21, 39, 712000), 'client_app_version': '1.9.6', 'client_os_version': '18.1', 'curr_platform': 'ios', 'manufacturer': 'Apple', 'phone_lang': 'en', 'labeled_trips': 0, 'pipeline_range': {'start_ts': 1737620409.0311022, 'end_ts': 1737621020.497178}, 'total_trips': 1, 'last_call_ts': 1737621021.948551} ``` --- emission/analysis/result/user_stat.py | 41 ++++++++++++++----- emission/pipeline/intake_stage.py | 10 ++++- .../analysisTests/intakeTests/TestUserStat.py | 5 +-- emission/tests/common.py | 3 +- 4 files changed, 43 insertions(+), 16 deletions(-) diff --git a/emission/analysis/result/user_stat.py b/emission/analysis/result/user_stat.py index fa1d7ac95..06c820962 100644 --- a/emission/analysis/result/user_stat.py +++ b/emission/analysis/result/user_stat.py @@ -41,9 +41,10 @@ def update_user_profile(user_id: str, data: Dict[str, Any]) -> None: logging.debug(f"New profile: {user.getProfile()}") -def get_and_store_user_stats(user_id: str, trip_key: str) -> None: +def get_and_store_pipeline_dependent_user_stats(user_id: str, trip_key: str) -> None: """ - Aggregates and stores user statistics into the user profile. + Aggregates and stores pipeline dependent into the user profile. + These are statistics based on analysed data such as trips or labels. :param user_id: The UUID of the user. :type user_id: str @@ -52,7 +53,7 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None: :return: None """ try: - logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}") + logging.info(f"Starting get_and_store_pipeline_dependent_user_stats for user_id: {user_id}, trip_key: {trip_key}") ts = esta.TimeSeries.get_time_series(user_id) start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) @@ -68,11 +69,6 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None: ) logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") - logging.info(f"user_id type: {type(user_id)}") - - last_call_ts = get_last_call_timestamp(ts) - logging.info(f"Last call timestamp: {last_call_ts}") - update_data = { "pipeline_range": { "start_ts": start_ts, @@ -80,12 +76,37 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None: }, "total_trips": total_trips, "labeled_trips": labeled_trips, - "last_call_ts": last_call_ts } + logging.info(f"user_id type: {type(user_id)}") update_user_profile(user_id, update_data) logging.debug("User profile updated successfully.") except Exception as e: - logging.error(f"Error in get_and_store_user_stats for user_id {user_id}: {e}") \ No newline at end of file + logging.error(f"Error in get_and_store_dependent_user_stats for user_id {user_id}: {e}") + +def get_and_store_pipeline_independent_user_stats(user_id: str) -> None: + """ + Aggregates and stores pipeline indepedent statistics into the user profile. + These are statistics based on raw data, such as the last call, last push + or last location received. + + :param user_id: The UUID of the user. + :type user_id: str + :return: None + """ + + try: + logging.info(f"Starting get_and_store_pipeline_independent_user_stats for user_id: {user_id}") + ts = esta.TimeSeries.get_time_series(user_id) + last_call_ts = get_last_call_timestamp(ts) + logging.info(f"Last call timestamp: {last_call_ts}") + + update_data = { + "last_call_ts": last_call_ts + } + update_user_profile(user_id, update_data) + + except Exception as e: + logging.error(f"Error in get_and_store_independent_user_stats for user_id {user_id}: {e}") diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index d58be2b25..d430f89d7 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -76,6 +76,12 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): try: run_intake_pipeline_for_user(uuid, skip_if_no_new_data) + with ect.Timer() as gsr: + logging.info("*" * 10 + "UUID %s: storing pipeline independent user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing pipeline independent user stats " % uuid + "*" * 10) + eaurs.get_and_store_pipeline_independent_user_stats(uuid) + esds.store_pipeline_time(uuid, 'STORE_PIPELINE_INDEPENDENT_USER_STATS', + time.time(), gsr.elapsed) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " @@ -203,7 +209,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): with ect.Timer() as gsr: logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + eaurs.get_and_store_pipeline_dependent_user_stats(uuid, "analysis/composite_trip") - esds.store_pipeline_time(uuid, 'STORE_USER_STATS', + esds.store_pipeline_time(uuid, 'STORE_PIPELINE_DEPENDENT_USER_STATS', time.time(), gsr.elapsed) diff --git a/emission/tests/analysisTests/intakeTests/TestUserStat.py b/emission/tests/analysisTests/intakeTests/TestUserStat.py index 7b2243322..0da4f0777 100644 --- a/emission/tests/analysisTests/intakeTests/TestUserStat.py +++ b/emission/tests/analysisTests/intakeTests/TestUserStat.py @@ -43,8 +43,7 @@ def setUp(self): # Initialize the profile if it does not exist edb.get_profile_db().insert_one({"user_id": self.testUUID}) - #etc.runIntakePipeline(self.testUUID) - epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) + etc.runIntakePipeline(self.testUUID) logging.debug("UUID = %s" % (self.testUUID)) def tearDown(self): @@ -118,4 +117,4 @@ def testLastCall(self): if __name__ == '__main__': # Configure logging for the test etc.configLogging() - unittest.main() \ No newline at end of file + unittest.main() diff --git a/emission/tests/common.py b/emission/tests/common.py index 5c3ea8ca0..a5d4575c6 100644 --- a/emission/tests/common.py +++ b/emission/tests/common.py @@ -206,7 +206,8 @@ def runIntakePipeline(uuid): eaue.populate_expectations(uuid) eaum.create_confirmed_objects(uuid) eapcc.create_composite_objects(uuid) - eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + eaurs.get_and_store_pipeline_dependent_user_stats(uuid, "analysis/composite_trip") + eaurs.get_and_store_pipeline_independent_user_stats(uuid) def configLogging():