diff --git a/emission/analysis/result/user_stat.py b/emission/analysis/result/user_stat.py index fa1d7ac95..06c820962 100644 --- a/emission/analysis/result/user_stat.py +++ b/emission/analysis/result/user_stat.py @@ -41,9 +41,10 @@ def update_user_profile(user_id: str, data: Dict[str, Any]) -> None: logging.debug(f"New profile: {user.getProfile()}") -def get_and_store_user_stats(user_id: str, trip_key: str) -> None: +def get_and_store_pipeline_dependent_user_stats(user_id: str, trip_key: str) -> None: """ - Aggregates and stores user statistics into the user profile. + Aggregates and stores pipeline dependent statistics into the user profile. + These are statistics based on analysed data such as trips or labels. :param user_id: The UUID of the user. :type user_id: str @@ -52,7 +53,7 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None: :return: None """ try: - logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}") + logging.info(f"Starting get_and_store_pipeline_dependent_user_stats for user_id: {user_id}, trip_key: {trip_key}") ts = esta.TimeSeries.get_time_series(user_id) start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) @@ -68,11 +69,6 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None: ) logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") - logging.info(f"user_id type: {type(user_id)}") - - last_call_ts = get_last_call_timestamp(ts) - logging.info(f"Last call timestamp: {last_call_ts}") - update_data = { "pipeline_range": { "start_ts": start_ts, @@ -80,12 +76,37 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None: }, "total_trips": total_trips, "labeled_trips": labeled_trips, - "last_call_ts": last_call_ts } + logging.info(f"user_id type: {type(user_id)}") update_user_profile(user_id, update_data) logging.debug("User profile updated successfully.") except Exception as e: - logging.error(f"Error in 
get_and_store_user_stats for user_id {user_id}: {e}") \ No newline at end of file + logging.error(f"Error in get_and_store_dependent_user_stats for user_id {user_id}: {e}") + +def get_and_store_pipeline_independent_user_stats(user_id: str) -> None: + """ + Aggregates and stores pipeline independent statistics into the user profile. + These are statistics based on raw data, such as the last call, last push + or last location received. + + :param user_id: The UUID of the user. + :type user_id: str + :return: None + """ + + try: + logging.info(f"Starting get_and_store_pipeline_independent_user_stats for user_id: {user_id}") + ts = esta.TimeSeries.get_time_series(user_id) + last_call_ts = get_last_call_timestamp(ts) + logging.info(f"Last call timestamp: {last_call_ts}") + + update_data = { + "last_call_ts": last_call_ts + } + update_user_profile(user_id, update_data) + + except Exception as e: + logging.error(f"Error in get_and_store_independent_user_stats for user_id {user_id}: {e}") diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 9fb95c49a..d430f89d7 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -77,10 +77,10 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): try: run_intake_pipeline_for_user(uuid, skip_if_no_new_data) with ect.Timer() as gsr: - logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") - esds.store_pipeline_time(uuid, 'STORE_USER_STATS', + logging.info("*" * 10 + "UUID %s: storing pipeline independent user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing pipeline independent user stats " % uuid + "*" * 10) + eaurs.get_and_store_pipeline_independent_user_stats(uuid) + esds.store_pipeline_time(uuid, 'STORE_PIPELINE_INDEPENDENT_USER_STATS', 
time.time(), gsr.elapsed) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) @@ -206,3 +206,10 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name, time.time(), crt.elapsed) + with ect.Timer() as gsr: + logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + eaurs.get_and_store_pipeline_dependent_user_stats(uuid, "analysis/composite_trip") + + esds.store_pipeline_time(uuid, 'STORE_PIPELINE_DEPENDENT_USER_STATS', + time.time(), gsr.elapsed) diff --git a/emission/tests/analysisTests/intakeTests/TestUserStat.py b/emission/tests/analysisTests/intakeTests/TestUserStat.py index 207aa0a98..0da4f0777 100644 --- a/emission/tests/analysisTests/intakeTests/TestUserStat.py +++ b/emission/tests/analysisTests/intakeTests/TestUserStat.py @@ -43,7 +43,6 @@ def setUp(self): # Initialize the profile if it does not exist edb.get_profile_db().insert_one({"user_id": self.testUUID}) - #etc.runIntakePipeline(self.testUUID) etc.runIntakePipeline(self.testUUID) logging.debug("UUID = %s" % (self.testUUID)) diff --git a/emission/tests/common.py b/emission/tests/common.py index 5c3ea8ca0..a5d4575c6 100644 --- a/emission/tests/common.py +++ b/emission/tests/common.py @@ -206,7 +206,8 @@ def runIntakePipeline(uuid): eaue.populate_expectations(uuid) eaum.create_confirmed_objects(uuid) eapcc.create_composite_objects(uuid) - eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + eaurs.get_and_store_pipeline_dependent_user_stats(uuid, "analysis/composite_trip") + eaurs.get_and_store_pipeline_independent_user_stats(uuid) def configLogging(): diff --git a/emission/tests/netTests/TestPipeline.py b/emission/tests/netTests/TestPipeline.py index d404ac6e2..54f81ba8e 100644 --- a/emission/tests/netTests/TestPipeline.py +++ 
b/emission/tests/netTests/TestPipeline.py @@ -7,7 +7,6 @@ import emission.core.wrapper.localdate as ecwl import emission.tests.common as etc import emission.pipeline.intake_stage as epi -import emission.analysis.result.user_stat as eaurs from emission.net.api import pipeline @@ -39,7 +38,6 @@ def testNoAnalysisResults(self): def testAnalysisResults(self): self.assertEqual(pipeline.get_range(self.testUUID), (None, None)) epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) - eaurs.get_and_store_user_stats(self.testUUID, "analysis/composite_trip") pr = pipeline.get_range(self.testUUID) self.assertAlmostEqual(pr[0], 1440688739.672) self.assertAlmostEqual(pr[1], 1440729142.709)