Skip to content

Commit

Permalink
♻️ Refactor the user stats into pipeline dependent and pipeline indep…
Browse files Browse the repository at this point in the history
…endent buckets

- The pipeline dependent stage is based on analysed data and only changes when
  the pipeline has finished running. Since it is infrequent, it can be more
  complicated and lower performance.
- The pipeline independent stage is read from the raw data, so it has to be run
  every time the pipeline is run, roughly every hour, so it must be limited to
  quick, cheap queries.

This change splits the existing function to pipeline dependent and independent
stages and then invokes both of them properly from the pipeline.

Testing done:
- Launched a server with these changes
- Connected an emulator to it
- Faked a trip using the location simulation

```
START 2025-01-23 00:30:20.575034 POST /usercache/put
START 2025-01-23 00:30:20.597320 POST /usercache/get
END 2025-01-23 00:30:20.600963 POST /usercache/get 414ee37a-732e-4751-a6f6-0d83b41b27f0 0.002919912338256836
END 2025-01-23 00:30:21.948563 POST /usercache/put 414ee37a-732e-4751-a6f6-0d83b41b27f0 1.373345136642456
```

There was only one profile

```
>>> edb.get_profile_db().count_documents({})
1
```

Before running the pipeline, it did not have any of these user stats

```
>>> edb.get_profile_db().find_one()
{'_id': ObjectId('6791fc13b4b3fa6db902c42b'), 'user_id': UUID('414ee37a-732e-4751-a6f6-0d83b41b27f0'), 'mode': {}, 'mpg_array': [32.044384997811726], 'purpose': {}, 'source': 'Shankari', 'update_ts': datetime.datetime(2025, 1, 23, 0, 21, 39, 712000), 'client_app_version': '1.9.6', 'client_os_version': '18.1', 'curr_platform': 'ios', 'manufacturer': 'Apple', 'phone_lang': 'en'}
```

- and after the pipeline ran, it did

```
>>> edb.get_profile_db().find_one()
{'_id': ObjectId('6791fc13b4b3fa6db902c42b'), 'user_id': UUID('414ee37a-732e-4751-a6f6-0d83b41b27f0'), 'mode': {}, 'mpg_array': [32.044384997811726], 'purpose': {}, 'source': 'Shankari', 'update_ts': datetime.datetime(2025, 1, 23, 0, 21, 39, 712000), 'client_app_version': '1.9.6', 'client_os_version': '18.1', 'curr_platform': 'ios', 'manufacturer': 'Apple', 'phone_lang': 'en', 'labeled_trips': 0, 'pipeline_range': {'start_ts': 1737620409.0311022, 'end_ts': 1737621020.497178}, 'total_trips': 1, 'last_call_ts': 1737621021.948551}
```
  • Loading branch information
shankari committed Jan 23, 2025
1 parent 13481b5 commit da7795e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
41 changes: 31 additions & 10 deletions emission/analysis/result/user_stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ def update_user_profile(user_id: str, data: Dict[str, Any]) -> None:
logging.debug(f"New profile: {user.getProfile()}")


def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
def get_and_store_pipeline_dependent_user_stats(user_id: str, trip_key: str) -> None:
"""
Aggregates and stores user statistics into the user profile.
    Aggregates and stores pipeline dependent statistics into the user profile.
These are statistics based on analysed data such as trips or labels.
:param user_id: The UUID of the user.
:type user_id: str
Expand All @@ -52,7 +53,7 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
:return: None
"""
try:
logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}")
logging.info(f"Starting get_and_store_pipeline_dependent_user_stats for user_id: {user_id}, trip_key: {trip_key}")

ts = esta.TimeSeries.get_time_series(user_id)
start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
Expand All @@ -68,24 +69,44 @@ def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
)

logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
logging.info(f"user_id type: {type(user_id)}")

last_call_ts = get_last_call_timestamp(ts)
logging.info(f"Last call timestamp: {last_call_ts}")

update_data = {
"pipeline_range": {
"start_ts": start_ts,
"end_ts": end_ts
},
"total_trips": total_trips,
"labeled_trips": labeled_trips,
"last_call_ts": last_call_ts
}

logging.info(f"user_id type: {type(user_id)}")
update_user_profile(user_id, update_data)

logging.debug("User profile updated successfully.")

except Exception as e:
logging.error(f"Error in get_and_store_user_stats for user_id {user_id}: {e}")
        logging.error(f"Error in get_and_store_pipeline_dependent_user_stats for user_id {user_id}: {e}")

def get_and_store_pipeline_independent_user_stats(user_id: str) -> None:
    """
    Aggregates and stores pipeline independent statistics into the user profile.
    These are statistics based on raw data, such as the last call, last push
    or last location received. Because this runs on every pipeline invocation
    (roughly hourly), it is limited to quick queries against raw data.

    :param user_id: The UUID of the user.
    :type user_id: str
    :return: None
    """
    try:
        logging.info(f"Starting get_and_store_pipeline_independent_user_stats for user_id: {user_id}")
        ts = esta.TimeSeries.get_time_series(user_id)
        # The last API call timestamp comes straight from raw server-call
        # entries, so it is available even before any analysis has run.
        last_call_ts = get_last_call_timestamp(ts)
        logging.info(f"Last call timestamp: {last_call_ts}")

        update_user_profile(user_id, {"last_call_ts": last_call_ts})

    except Exception as e:
        # Best-effort: a stats failure must not abort the intake pipeline.
        # Fixed: log the actual function name (was
        # "get_and_store_independent_user_stats") so messages are grep-able.
        logging.error(f"Error in get_and_store_pipeline_independent_user_stats for user_id {user_id}: {e}")
10 changes: 8 additions & 2 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):

try:
run_intake_pipeline_for_user(uuid, skip_if_no_new_data)
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing pipeline independent user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing pipeline independent user stats " % uuid + "*" * 10)
eaurs.get_and_store_pipeline_independent_user_stats(uuid)
esds.store_pipeline_time(uuid, 'STORE_PIPELINE_INDEPENDENT_USER_STATS',
time.time(), gsr.elapsed)
except Exception as e:
esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None)
logging.exception("Found error %s while processing pipeline "
Expand Down Expand Up @@ -203,7 +209,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
with ect.Timer() as gsr:
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")
eaurs.get_and_store_pipeline_dependent_user_stats(uuid, "analysis/composite_trip")

esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
esds.store_pipeline_time(uuid, 'STORE_PIPELINE_DEPENDENT_USER_STATS',
time.time(), gsr.elapsed)

0 comments on commit da7795e

Please sign in to comment.