Fixed failing analysis pipeline tests related to Label Inference
Two pipeline tests were failing: TestLabelInferencePipeline and TestExpectationPipeline.
They failed because they call the placeholder predictor functions in eacili inferrers, which still expected a single trip.
Updated those functions to receive a trip_list instead of a single trip.
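In short, each inferrer moved from a per-trip signature to a batched one. A minimal sketch of the interface change (user_id, trip_a, and trip_b are illustrative placeholders, not values from this commit):

# Before: one trip in, one list of {"labels": ..., "p": ...} candidates out
predictions = placeholder_predictor_0(trip_a)

# After: a user id plus a list of trips in, one candidate list per trip out
predictions_list = placeholder_predictor_0(user_id, [trip_a, trip_b])
assert len(predictions_list) == 2  # one prediction list per input trip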
Authored and committed by Mahadik, Mukul Chandrakant on Nov 22, 2023
1 parent 433b40a commit 9032ce0
Showing 5 changed files with 114 additions and 98 deletions.
4 changes: 2 additions & 2 deletions bin/monitor/delete_single_pipeline_state_and_objects.py
@@ -64,8 +64,8 @@ def _email_2_user_list(email_list):
help="reset the pipeline for all on the specified platform")
# Add this back when we have the ability to turn off the pipeline
# or we handle parallel pipeline runs (by checking `curr_run_ts` properly below)
group.add_argument("-a", "--all", action="store_true", default=False,
help="reset the pipeline for all users")
# group.add_argument("-a", "--all", action="store_true", default=False,
# help="reset the pipeline for all users")
group.add_argument("-u", "--user_list", nargs='+',
help="user ids to reset the pipeline for")
group.add_argument("-e", "--email_list", nargs='+',
195 changes: 107 additions & 88 deletions emission/analysis/classification/inference/labels/inferrers.py
@@ -20,109 +20,128 @@
# the user walks to the location it is to shop (e.g., because they don't have a basket on
# their bike), and the user bikes to the location four times more than they walk there.
# Obviously, it is a simplification.
-def placeholder_predictor_0(trip):
-    return [
-        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-    ]
+def placeholder_predictor_0(user_id, trip_list):
+    predictions_list = []
+    for trip in trip_list:
+        predictions = [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+        ]
+        predictions_list.append(predictions)
+    return predictions_list


# The next placeholder scenario provides that same set of labels in 75% of cases and no
# labels in the rest.
-def placeholder_predictor_1(trip):
-    return [
-        {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-        {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-    ] if random.random() > 0.25 else []
+def placeholder_predictor_1(user_id, trip_list):
+    predictions_list = []
+    for trip in trip_list:
+        predictions = [
+            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+        ] if random.random() > 0.25 else []
+        predictions_list.append(predictions)
+    return predictions_list



# This third scenario provides labels designed to test the soundness and resilience of
# the client-side inference processing algorithms.
-def placeholder_predictor_2(trip):
-    # Timestamp2index gives us a deterministic way to match test trips with labels
-    # Hardcoded to match "test_july_22" -- clearly, this is just for testing
-    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
-    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
-    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
-    return [
-        [
-
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-        ],
-        [
-            {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ]
-    ][index]
+def placeholder_predictor_2(user_id, trip_list):
+    predictions_list = []
+    for trip in trip_list:
+        # Timestamp2index gives us a deterministic way to match test trips with labels
+        # Hardcoded to match "test_july_22" -- clearly, this is just for testing
+        timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
+        timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
+        index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
+        predictions = [
+            [
+
+            ],
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+            ],
+            [
+                {"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
+            ],
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
+            ],
+            [
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+            ],
+            [
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+            ]
+        ][index]
+        predictions_list.append(predictions)
+    return predictions_list
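A quick worked example of the deterministic lookup above (the trip start time is invented; the map values come straight from the code):

# A test trip starting at 13:25 local time:
timestamp = 13*60 + 25  # = 805
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
index = timestamp2index[timestamp] if timestamp in timestamp2index else 0  # = 2
# index 2 selects the single drove_alone candidate; a start time not in the
# map falls through to index 0, the empty prediction list.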


# This fourth scenario provides labels designed to test the expectation and notification system.
-def placeholder_predictor_3(trip):
-    timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
-    timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
-    index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
-    return [
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
-        ],
-        [
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
-        ],
-        [
-            {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
-        ],
-        [
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
-            {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
-            {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
-            {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
-        ]
-    ][index]
+def placeholder_predictor_3(user_id, trip_list):
+    predictions_list = []
+    for trip in trip_list:
+        timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
+        timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
+        index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
+        predictions = [
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
+            ],
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
+            ],
+            [
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
+            ],
+            [
+                {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
+            ],
+            [
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
+                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
+            ],
+            [
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
+                {"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
+                {"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
+                {"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
+            ]
+        ][index]
+        predictions_list.append(predictions)
+    return predictions_list

# Placeholder that is suitable for a demo.
# Finds all unique label combinations for this user and picks one randomly
-def placeholder_predictor_demo(trip):
+def placeholder_predictor_demo(user_id, trip_list):
    import random

    import emission.core.get_database as edb
-    user = trip["user_id"]
-    unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user}).distinct("data.user_input")
+    unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user_id}).distinct("data.user_input")
+    predictions_list = []
    if len(unique_user_inputs) == 0:
-        return []
+        predictions_list.append([])
+        return predictions_list
    random_user_input = random.choice(unique_user_inputs) if random.randrange(0,10) > 0 else []

-    logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user}, returning value {random_user_input}")
-    return [{"labels": random_user_input, "p": random.random()}]
+    logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user_id}, returning value {random_user_input}")
+    predictions_list.append([{"labels": random_user_input, "p": random.random()}])
+    return predictions_list

# Non-placeholder implementation. First bins the trips, and then clusters every bin
# See emission.analysis.modelling.tour_model for more details
@@ -146,15 +165,15 @@ def predict_cluster_confidence_discounting(user_id, trip_list, max_confidence=No
    model_type = eamtc.get_model_type()
    model_storage = eamtc.get_model_storage()
    labels_n_list = eamur.predict_labels_with_n(user_id, trip_list, model_type, model_storage)
-    labels_list = []
+    predictions_list = []
    for labels, n in labels_n_list:
        if n <= 0: # No model data or trip didn't match a cluster
            logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
-            labels_list.append(labels)
+            predictions_list.append(labels)
            continue
        confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
        logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
        labels = copy.deepcopy(labels)
        for l in labels: l["p"] *= confidence_coeff
-        labels_list.append(labels)
-    return labels_list
+        predictions_list.append(labels)
+    return predictions_list
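To make the renamed return value concrete, here is a rough sketch of one discounting step (the coefficient 0.75 and the single-trip input are invented; in the real code n_to_confidence_coeff supplies the value):

# Suppose predict_labels_with_n returned [(labels, n)] for one trip, with n > 0:
labels = [{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8}]
confidence_coeff = 0.75  # illustrative stand-in for n_to_confidence_coeff(n, ...)
discounted = [{**l, "p": l["p"] * confidence_coeff} for l in labels]
predictions_list = [discounted]  # one prediction set per input trip
# discounted[0]["p"] is now roughly 0.6 (0.8 * 0.75)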
3 changes: 0 additions & 3 deletions emission/analysis/modelling/trip_model/run_model.py
@@ -112,12 +112,9 @@ def predict_labels_with_n(
:param model_config: optional configuration for model, for debugging purposes
:return: a list of predictions
"""
-# user_id = trip['user_id']
-# Start timer
start_model_load_time = time.process_time()
model = _load_stored_trip_model(user_id, model_type, model_storage, model_config)
print(f"{arrow.now()} Inside predict_labels_n: Model load time = {time.process_time() - start_model_load_time}")
-# End timer
predictions_list = []
print(f"{arrow.now()} Inside predict_labels_n: Predicting...")
start_predict_time = time.process_time()
8 changes: 4 additions & 4 deletions emission/core/get_database.py
@@ -24,14 +24,14 @@
try:
parsed=pymongo.uri_parser.parse_uri(url)
except:
print("URL not formatted, defaulting to \"openpath_stage\"")
db_name = "openpath_stage"
print("URL not formatted, defaulting to \"Stage_database\"")
db_name = "Stage_database"
else:
if parsed['database']:
db_name = parsed['database']
else:
print("URL does not specify a DB name, defaulting to \"openpath_stage\"")
db_name = "openpath_stage"
print("URL does not specify a DB name, defaulting to \"Stage_database\"")
db_name = "Stage_database"

print("Connecting to database URL "+url)
_current_db = MongoClient(url, uuidRepresentation='pythonLegacy')[db_name]
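As a quick illustration of the fallback logic (the URLs and database name below are invented for the example):

import pymongo.uri_parser

parsed = pymongo.uri_parser.parse_uri("mongodb://localhost:27017/my_openpath_db")
print(parsed['database'])  # 'my_openpath_db' -> used as db_name
parsed = pymongo.uri_parser.parse_uri("mongodb://localhost:27017")
print(parsed['database'])  # None -> the code above falls back to "Stage_database"
# A URL that fails to parse raises pymongo.errors.InvalidURI, which the bare
# except also maps to the "Stage_database" default.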
2 changes: 1 addition & 1 deletion emission/tests/pipelineTests/TestLabelInferencePipeline.py
@@ -63,7 +63,7 @@ def testIndividualAlgorithms(self):
self.assertEqual(entry["data"]["trip_id"], trip.get_id())
this_algorithm = ecwl.AlgorithmTypes(entry["data"]["algorithm_id"])
self.assertIn(this_algorithm, self.test_algorithms)
self.assertEqual(entry["data"]["prediction"], self.test_algorithms[this_algorithm](trip))
self.assertEqual(entry["data"]["prediction"], self.test_algorithms[this_algorithm](self.testUUID, [trip])[0])
self.assertEqual(entry["data"]["start_ts"], trip["data"]["start_ts"])
self.assertEqual(entry["data"]["end_ts"], trip["data"]["end_ts"])
