fix: updated longterm and deltat (#57)
stevemandl authored Oct 25, 2024
1 parent b47f581 commit 56d98cb
Showing 4 changed files with 96 additions and 87 deletions.
106 changes: 60 additions & 46 deletions py_src/long_term_variance/handler.py
@@ -6,87 +6,101 @@
 import numpy as np
 import pandas as pd
 from requests.exceptions import ConnectionError as RequestConnectionError, HTTPError
-from python_lib.utils import parse_event, fetch_trends, build_index, build_df, AnomalyError
+from python_lib.utils import parse_event, fetch_trends, build_df, MeterAnomaly, now
 
 # the window size we are analyzing. The assumption is the consumption is
 # normally similar across this period and for the same period one year prior
 RECENT_DAYS = 7
 # the minimum acceptable length of the datapoints array
 MIN_DATAPOINTS_LENGTH = int(RECENT_DAYS * 24 * 0.6)
 # if the magnitude of the z score is greater than this, it's anomalous
 Z_SCORE_THRESHOLD = 3.0
 ANOMALY_PORTION = 0.5
+ALGORITHM = "long_term_variance"
 
 
 def run(event, _context):
     """
     handler.run - the lambda function entry point
     """
 
-    # start out with blank response
-    response = {"statusCode": 200, "body": ""}
+    # start out with blank payload
+    payload = {}
     # parse event and ensure timeStamp and pointName are present
     params = parse_event(event)
-    print(f"long_term_variance run with params {params}")
+    print(f"{ALGORITHM} run with params {params}")
 
-    now = params.get("timeStamp")
-    year_ago = now - timedelta(365)
-    two_years_ago = now - timedelta(365*2)
-    start_time = now - timedelta(RECENT_DAYS)
+    end_time = params.get("timeStamp")
+    year_ago = end_time - timedelta(365)
+    two_years_ago = end_time - timedelta(365 * 2)
+    start_time = end_time - timedelta(RECENT_DAYS)
     one_year_st = year_ago - timedelta(RECENT_DAYS)
     two_year_st = two_years_ago - timedelta(RECENT_DAYS)
     point_name = params.get("pointName")
     try:
         # read recent data, prior year, and prior 2 year
         # fetch recent
-        df = build_df(fetch_trends(
-            point=point_name, start_time=start_time, end_time=now ))
+        df = build_df(
+            fetch_trends(point=point_name, start_time=start_time, end_time=end_time)
+        )
         year1 = fetch_trends(
-            point=point_name, start_time=one_year_st, end_time=year_ago )
+            point=point_name, start_time=one_year_st, end_time=year_ago
+        )
         year2 = fetch_trends(
-            point=point_name, start_time=two_year_st, end_time=two_years_ago )
+            point=point_name, start_time=two_year_st, end_time=two_years_ago
+        )
         a = np.array(year1[0]["datapoints"] + year2[0]["datapoints"])
-        if (a.size < MIN_DATAPOINTS_LENGTH*2) or (df.size < MIN_DATAPOINTS_LENGTH):
+        if (a.size < MIN_DATAPOINTS_LENGTH * 2) or (df.size < MIN_DATAPOINTS_LENGTH):
             # not enough data to determine anomaly
-            return response
-        t_df = pd.DataFrame.from_records(a, columns = ("previous", "ts"), index="ts")
-        t_df.index = pd.to_datetime(t_df.index, unit='ms'
-        )
+            return payload
+        t_df = pd.DataFrame.from_records(a, columns=("previous", "ts"), index="ts")
+        t_df.index = pd.to_datetime(t_df.index, unit="ms")
         df = df.merge(t_df, how="outer", on="ts").interpolate()
         # add column to separate time of day into 8 bins
         df["hour"] = df.index.hour
         # 8 three-hour bins to aggregate the data
-        hourbins = np.array(range(0,24,3))
-        df["hourbin"] = np.digitize(df["hour"], bins= hourbins)
+        hourbins = np.array(range(0, 24, 3))
+        df["hourbin"] = np.digitize(df["hour"], bins=hourbins)
         # compute mean and std of prior, compute z score for recent
-        result = df.groupby(["hourbin"], as_index=False).agg({"previous": ["mean", "std"], point_name: ["mean"]})
-        result.columns=['bin','prev_mean','prev_std','curr_mean']
-        result["z"]=(result["curr_mean"] - result["prev_mean"])/result["prev_std"]
-        anomoly_count = result.loc[lambda x: np.abs(x["z"]) > Z_SCORE_THRESHOLD]["z"].size
+        result = df.groupby(["hourbin"], as_index=False).agg(
+            {"previous": ["mean", "std"], point_name: ["mean"]}
+        )
+        result.columns = ["bin", "prev_mean", "prev_std", "curr_mean"]
+        result["z"] = (result["curr_mean"] - result["prev_mean"]) / result["prev_std"]
+        anomaly_count = result.loc[lambda x: np.abs(x["z"]) > Z_SCORE_THRESHOLD][
+            "z"
+        ].size
         # return anomaly if magnitude of z score is greater than threshold
-        if anomoly_count > (hourbins.size * ANOMALY_PORTION):
-            raise(AnomalyError(f"z score over {Z_SCORE_THRESHOLD} for more than {ANOMALY_PORTION:.0%} over past {RECENT_DAYS} relative to past 2 years"))
+        if anomaly_count > (hourbins.size * ANOMALY_PORTION):
+            calc_score = anomaly_count / hourbins.size
+            desc = f"z score over {Z_SCORE_THRESHOLD} for more than {ANOMALY_PORTION:.0%} over past {RECENT_DAYS} relative to past 2 years"
+            payload = MeterAnomaly(
+                point_name,
+                ALGORITHM,
+                now().isoformat(),
+                desc,
+                calc_score,
+                ANOMALY_PORTION,
+                start_time.isoformat(),
+                end_time.isoformat(),
+            )
     except RequestConnectionError as err:
-        response[
-            "body"
-        ] = f"""{point_name} ConnectionError:
-        {err.response.text} {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
-    except AnomalyError as err:
-        response[
-            "body"
-        ] = f"""{point_name} Anomaly Detected: {err} {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
+        payload["error"] = err.response.text
     except HTTPError as err:
-        response[
-            "body"
-        ] = f"""{point_name} {err.response.text}
-        for the period {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
+        payload["error"] = err.response.text
         if err.response.status_code == 400:
             try:  # have to try decoding json
                 if err.response.json()["error"] == "No data":
-                    response[
-                        "body"
-                    ] = f"""{point_name} has no data
-                    for the period {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
+                    payload = MeterAnomaly(
+                        point_name,
+                        ALGORITHM,
+                        now().isoformat(),
+                        "No data for the period.",
+                        0,
+                        MIN_DATAPOINTS_LENGTH,
+                        start_time.isoformat(),
+                        end_time.isoformat(),
+                    )
             except ValueError:
                 pass
-    return response
+    return payload
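
The detection logic itself is untouched by this commit: recent consumption is compared against the same window one and two years prior, aggregated into eight three-hour bins, and a bin is flagged when the z score of its recent mean against the prior-years distribution exceeds Z_SCORE_THRESHOLD. A minimal, self-contained sketch of that check on fabricated data (the synthetic series and the `current` column name are ours; the real handler builds its frame from fetch_trends and build_df):

```python
import numpy as np
import pandas as pd

Z_SCORE_THRESHOLD = 3.0
ANOMALY_PORTION = 0.5

# fabricate a week of hourly readings: recent consumption shifted well
# above the prior-year baseline, so the check should fire
rng = np.random.default_rng(0)
idx = pd.date_range("2024-10-18", periods=7 * 24, freq="h")
df = pd.DataFrame(
    {
        "previous": rng.normal(100.0, 5.0, idx.size),  # prior-year consumption
        "current": rng.normal(130.0, 5.0, idx.size),   # recent consumption
    },
    index=idx,
)

# 8 three-hour bins: hours [0, 3) land in bin 1, [3, 6) in bin 2, ...
hourbins = np.arange(0, 24, 3)
df["hourbin"] = np.digitize(df.index.hour, bins=hourbins)

# per bin: mean/std of the prior series, mean of the recent series
result = df.groupby("hourbin", as_index=False).agg(
    {"previous": ["mean", "std"], "current": ["mean"]}
)
result.columns = ["bin", "prev_mean", "prev_std", "curr_mean"]
result["z"] = (result["curr_mean"] - result["prev_mean"]) / result["prev_std"]

# count bins whose recent mean sits outside the threshold
anomaly_count = (result["z"].abs() > Z_SCORE_THRESHOLD).sum()
print("anomalous:", anomaly_count > hourbins.size * ANOMALY_PORTION)  # True here
```

Requiring more than ANOMALY_PORTION of the bins to be out of range means a single noisy time-of-day window cannot trip the alert on its own.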
12 changes: 4 additions & 8 deletions py_src/long_term_variance/longterm_spec.py
@@ -120,10 +120,9 @@ def test_onetime(mocker):
         side_effect = [data_noanom_1, data1, data_noanom_3]
     )
     result = run(event, None)
-    assert "statusCode" in result
     # if not enough data is present to be conclusive,
     # we expect the algorithm to return no anomaly
-    assert "" == result.get("body")
+    assert not result
 
 def test_anom(mocker):
     event = {
@@ -141,8 +140,7 @@ def test_anom(mocker):
         side_effect = [data_anom_1, data_anom_2, data_anom_3]
     )
     result = run(event, None)
-    assert "statusCode" in result
-    assert "Anomaly Detected" in result.get("body")
+    assert "z score over" in result["description"]
 
 def test_noanom(mocker):
     event = {
@@ -159,8 +157,7 @@ def test_noanom(mocker):
         side_effect = [data_noanom_1, data_noanom_2, data_noanom_3]
     )
     result = run(event, None)
-    assert "statusCode" in result
-    assert "" == result.get("body")
+    assert not result
 
 def test_http_error_400_no_data(mocker):
     event = {
@@ -182,5 +179,4 @@ def test_http_error_400_no_data(mocker):
     )
 
     result = run(event, None)
-    assert "statusCode" in result
-    assert "TestPoint has no data" in result.get("body")
+    assert "No data" in result["description"]
53 changes: 28 additions & 25 deletions py_src/low_delta_t/handler.py
@@ -5,25 +5,27 @@
 from datetime import timedelta
 import numpy as np
 from requests.exceptions import ConnectionError as RequestConnectionError, HTTPError
-from python_lib.utils import parse_event, fetch_trends, build_index, build_df, AnomalyError
+from python_lib.utils import parse_event, fetch_trends, build_index, build_df, AnomalyError, MeterAnomaly, now
 
 # the minimum acceptable length of the datapoints array
 MIN_DATAPOINTS_LENGTH = int(7 * 24)
 RECENT_DAYS = 2
+ALGORITHM = "low_delta_t"
+ANOMALY_THRESHOLD = 0.9
 
 def run(event, _context):
     """
     handler.run - the lambda function entry point
     """
-    # start out with blank response
-    response = {"statusCode": 200, "body": ""}
+    # start out with blank payload
+    payload = {}
     # parse event and ensure timeStamp and pointName are present
     params = parse_event(event)
-    print(f"low_delta_t run with params {params}")
+    print(f"{ALGORITHM} run with params {params}")
 
-    now = params.get("timeStamp")
-    year_ago = now - timedelta(365)
-    start_time = now - timedelta(RECENT_DAYS)
+    end_time = params.get("timeStamp")
+    year_ago = end_time - timedelta(365)
+    start_time = end_time - timedelta(RECENT_DAYS)
     point_name = params.get("pointName")
     device_name = point_name[:-4]
     if point_name.endswith("TONS"):
@@ -63,30 +65,31 @@ def run(event, _context):
         weighted_actual_dt = np.average(model_df[rtemp_name] - model_df[stemp_name], weights=model_df[flow_name])
         model_dt = np.mean(model_df["DT_PRED"])
         weighted_model_dt = np.average(model_df["DT_PRED"], weights=model_df[flow_name])
-        if actual_dt < model_dt * 0.9:
+        if actual_dt < model_dt * ANOMALY_THRESHOLD:
             actual_pct = actual_dt / model_dt
-            response[ "body" ] = f"""{point_name} actual_pct {actual_pct:.2%} {actual_dt:.2f}/{model_dt:.2f} weighted {weighted_actual_dt:.2f}/{weighted_model_dt:.2f} for the period {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
+            desc = f"DeltaT/ModelDT below expectations {actual_dt:.2f}/{model_dt:.2f} weighted: {weighted_actual_dt:.2f}/{weighted_model_dt:.2f}"
+            payload = MeterAnomaly(point_name, ALGORITHM, now().isoformat(), desc, actual_pct, ANOMALY_THRESHOLD, start_time.isoformat(), end_time.isoformat())
     except RequestConnectionError as err:
-        response[
-            "body"
-        ] = f"""{point_name} ConnectionError:
-        {err.response.text} {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
+        payload["error"] = err.response.text
     except AnomalyError as err:
-        response[
-            "body"
-        ] = f"""{point_name} Error: {err} {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
+        desc = f"Error: {err}"
+        payload = MeterAnomaly(point_name, ALGORITHM, now().isoformat(), desc, start_ts=start_time.isoformat(), end_ts=end_time.isoformat())
     except HTTPError as err:
-        response[
-            "body"
-        ] = f"""{point_name} {err.response.text}
-        for the period {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
+        payload["error"] = err.response.text
         if err.response.status_code == 400:
             try:  # have to try decoding json
                 if err.response.json()["error"] == "No data":
-                    response[
-                        "body"
-                    ] = f"""{point_name} has no data
-                    for the period {start_time:%Y-%m-%d %H:%M} to {now:%Y-%m-%d %H:%M}"""
+                    payload = MeterAnomaly(
+                        point_name,
+                        ALGORITHM,
+                        now().isoformat(),
+                        "No data for the period.",
+                        0,
+                        MIN_DATAPOINTS_LENGTH,
+                        start_time.isoformat(),
+                        end_time.isoformat(),
+                    )
             except ValueError:
                 pass
-    return response
+    return payload
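
The low_delta_t anomaly condition is a plain ratio test: flag the point when the observed delta-T falls below ANOMALY_THRESHOLD (90%) of the model's prediction. The flow-weighted averages carried along in the description down-weight low-flow hours, where the supply/return temperature split is noisiest. A short sketch with fabricated readings (all numbers invented for illustration):

```python
import numpy as np

ANOMALY_THRESHOLD = 0.9  # flag when actual delta-T < 90% of the model's

supply = np.array([44.0, 45.0, 44.5, 46.0])    # supply temps (fabricated)
ret = np.array([53.0, 55.0, 52.0, 56.0])       # return temps (fabricated)
flow = np.array([120.0, 300.0, 80.0, 500.0])   # flow per reading (fabricated)
model_dt = np.array([11.0, 11.5, 10.5, 12.0])  # model-predicted delta-T

actual_dt = np.mean(ret - supply)                            # 9.12
weighted_actual_dt = np.average(ret - supply, weights=flow)  # low flow counts less
model_mean = np.mean(model_dt)                               # 11.25

if actual_dt < model_mean * ANOMALY_THRESHOLD:
    print(f"low delta-T: {actual_dt:.2f}/{model_mean:.2f} "
          f"(weighted {weighted_actual_dt:.2f})")
```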
12 changes: 4 additions & 8 deletions py_src/low_delta_t/low_delta_t_spec.py
@@ -62,8 +62,7 @@ def test_handle400(mocker):
     mocker.patch("low_delta_t.handler.fetch_trends", side_effect=HTTPError(response=r))
     event = {"body": {"pointName": "foo/TONS"}}
     result = run(event, None)
-    assert "statusCode" in result
-    assert "no data" in result.get("body")
+    assert "No data" in result["description"]
 
 
 def test_barf(mocker):
@@ -73,8 +72,7 @@ def test_barf(mocker):
     mocker.patch("low_delta_t.handler.fetch_trends", side_effect=HTTPError(response=r))
     event = {"body": {"pointName": "foo/Tons"}}
     result = run(event, None)
-    assert "statusCode" in result
-    assert "qwerty" in result.get("body")
+    assert "qwerty" in result["error"]
 
 
 def test_normal(mocker):
@@ -92,8 +90,7 @@ def test_normal(mocker):
         side_effect=[normal_tons_data, normal_temps_flows_data],
     )
     result = run(event, None)
-    assert "statusCode" in result
-    assert "" == result.get("body")
+    assert not result
 
 
 def test_anomaly(mocker):
@@ -110,5 +107,4 @@ def test_anomaly(mocker):
         side_effect=[anom_tons_data, anom_temps_flows_data],
     )
     result = run(event, None)
-    assert "statusCode" in result
-    assert "ClarkHall.CW.FP/TONS actual_pct" in result.get("body")
+    assert "DeltaT/ModelDT" in result["description"]
