Skip to content

Commit

Permalink
Merge pull request #68 from andrewm4894/add-change-detection
Browse files Browse the repository at this point in the history
Add change detection
  • Loading branch information
andrewm4894 authored Dec 13, 2023
2 parents f4e1c95 + 0719f80 commit 9522674
Show file tree
Hide file tree
Showing 27 changed files with 402 additions and 20 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Here is a list of features of Anomstack (emoji alert warning!)
13. 📦 - Dockerized for easy deployment.
14. 🔔 - Scores & Alerts saved to database so you can query them and do whatever you want with them.
15. 🏷️ - Add custom metric tags for more complex alert routing e.g. priority or subject area based.
16. 🔄 - Change detection jobs out of the box.

### Architecture

Expand All @@ -175,6 +176,7 @@ flowchart LR;
train[[train]]
score[[score]]
alert[[alert]]
change[[change]]
llmalert[[llmalert]]
plot[[plot]]
dashboardpy["dashboard.py"]
Expand All @@ -191,6 +193,7 @@ flowchart LR;
train
score
alert
change
llmalert
plot
end
Expand Down Expand Up @@ -235,6 +238,8 @@ flowchart LR;
score --> alert
score --> llmalert
score --> plot
ingest --> change
change --> alert
metric_batch --> dagster_jobs
Expand Down
8 changes: 3 additions & 5 deletions anomstack/alerts/asciiart.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ def make_alert_message(
normal_symbol=" ",
alert_float_format="{:,.2f}",
tags=None,
score_col="metric_score_smooth",
):
df_alert_metric = df_alert_metric.sort_values(
by="metric_timestamp", ascending=False
Expand All @@ -522,7 +523,6 @@ def make_alert_message(
df_alert_metric["metric_timestamp"]
)
x = df_alert_metric["metric_value"].round(2).values.tolist()
metric_batch = df_alert_metric["metric_batch"].unique()[0]
metric_name = df_alert_metric["metric_name"].unique()[0]
metric_timestamp_from = (
df_alert_metric["metric_timestamp"].min().strftime("%Y-%m-%d %H:%M")
Expand All @@ -532,11 +532,9 @@ def make_alert_message(
)
labels = (
np.where(df_alert_metric["metric_alert"] == 1, anomaly_symbol, normal_symbol)
+ (df_alert_metric["metric_score_smooth"].round(2) * 100)
.astype("int")
.astype("str")
+ (df_alert_metric[score_col].round(2) * 100).astype("int").astype("str")
+ "% "
) # + df_alert_metric['metric_timestamp'].astype('str').values).to_list()
)
data = zip(labels, x)
graph_title = f"{metric_name} ({metric_timestamp_from} to {metric_timestamp_to})"

Expand Down
5 changes: 3 additions & 2 deletions anomstack/alerts/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


def send_email_with_plot(
df, metric_name, subject, body, attachment_name, threshold=0.8
df, metric_name, subject, body, attachment_name, threshold=0.8, score_col="metric_score_smooth"
) -> None:
"""
Sends an email with a plot attached.
Expand All @@ -29,6 +29,7 @@ def send_email_with_plot(
body (str): The body of the email.
attachment_name (str): The name of the attachment.
threshold (float, optional): The threshold for the anomaly detection. Defaults to 0.8.
score_col (str, optional): The name of the column containing the anomaly scores. Defaults to 'metric_score_smooth'.
Returns:
None
Expand All @@ -45,7 +46,7 @@ def send_email_with_plot(
with tempfile.NamedTemporaryFile(
prefix=attachment_name, suffix=".png", delete=False
) as temp:
fig = make_alert_plot(df, metric_name, threshold)
fig = make_alert_plot(df, metric_name, threshold, score_col)
fig.savefig(temp.name)

msg = MIMEMultipart()
Expand Down
6 changes: 5 additions & 1 deletion anomstack/alerts/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def send_alert(
threshold: float = 0.8,
description: str = "",
tags=None,
score_col: str = "metric_score_smooth",
) -> pd.DataFrame:
"""
Sends an alert using the specified alert methods.
Expand All @@ -34,7 +35,9 @@ def send_alert(
"""
logger = get_dagster_logger()
logger.info(f"alerts to send: \n{df}")
message = make_alert_message(df, description=description, tags=tags)
message = make_alert_message(
df, description=description, tags=tags, score_col=score_col
)
if "slack" in alert_methods:
send_alert_slack(title=title, message=message)
if "email" in alert_methods:
Expand All @@ -45,6 +48,7 @@ def send_alert(
body=message,
attachment_name=metric_name,
threshold=threshold,
score_col=score_col,
)

return df
2 changes: 1 addition & 1 deletion anomstack/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def process_yaml_file(yaml_file):
metric_batch = metric_specs["metric_batch"]
merged_specs = {**defaults, **metric_specs}

if merged_specs["disable_batch"] == True:
if merged_specs["disable_batch"]:
return None
for env_var in env_vars:
if env_var in os.environ:
Expand Down
222 changes: 222 additions & 0 deletions anomstack/jobs/change.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
"""
Generate change detection jobs and schedules.
"""

import os

import pandas as pd
from dagster import (
MAX_RUNTIME_SECONDS_TAG,
DefaultScheduleStatus,
JobDefinition,
ScheduleDefinition,
get_dagster_logger,
job,
op,
)

from anomstack.alerts.send import send_alert
from anomstack.config import specs
from anomstack.df.save import save_df
from anomstack.df.wrangle import wrangle_df
from anomstack.jinja.render import render
from anomstack.ml.change import detect_change
from anomstack.sql.read import read_sql
from anomstack.validate.validate import validate_df

# Per-run timeout (seconds) applied via Dagster's MAX_RUNTIME_SECONDS_TAG.
# Cast to int so the value has a consistent type whether it comes from the
# environment (always a str) or from the int default.
ANOMSTACK_MAX_RUNTIME_SECONDS_TAG = int(
    os.getenv("ANOMSTACK_MAX_RUNTIME_SECONDS_TAG", 3600)
)


def build_change_job(spec) -> JobDefinition:
    """
    Build a Dagster job definition for a change detection job.

    Args:
        spec (dict): Specification for the change job. Required keys:
            metric_batch, db, alert_methods, table_key. Optional keys:
            disable_change, metric_tags, change_threshold,
            change_detect_last_n.

    Returns:
        JobDefinition: The change detection job, or a no-op placeholder
            job when the spec has `disable_change` set.
    """

    if spec.get("disable_change"):
        # Change detection is disabled for this batch: return a no-op job
        # so the batch still has a uniformly-named job registered.

        @job(
            name=f'{spec["metric_batch"]}_change_disabled',
            tags={MAX_RUNTIME_SECONDS_TAG: ANOMSTACK_MAX_RUNTIME_SECONDS_TAG},
        )
        def _dummy_job():
            @op(name=f'{spec["metric_batch"]}_change_noop')
            def noop():
                pass

            noop()

        return _dummy_job

    logger = get_dagster_logger()

    metric_batch = spec["metric_batch"]
    db = spec["db"]
    alert_methods = spec["alert_methods"]
    table_key = spec["table_key"]
    # Optional per-metric tag dict keyed by metric name; may be empty.
    metric_tags = spec.get("metric_tags", {})
    # Tuning knobs forwarded to detect_change() — defaults mirror the spec.
    change_threshold = spec.get("change_threshold", 3.5)
    change_detect_last_n = spec.get("change_detect_last_n", 1)

    @job(
        name=f"{metric_batch}_change",
        tags={MAX_RUNTIME_SECONDS_TAG: ANOMSTACK_MAX_RUNTIME_SECONDS_TAG},
    )
    def _job():
        """
        Change detection job: read recent metric data, run change detection
        per metric, alert on detected changes, and save them to the db.
        """

        @op(name=f"{metric_batch}_get_change_data")
        def get_change_data() -> pd.DataFrame:
            """
            Read the data used for change detection.

            Returns:
                pd.DataFrame: Result of the rendered `change_sql` query.
            """
            df_change = read_sql(render("change_sql", spec), db)
            return df_change

        @op(name=f"{metric_batch}_detect_changes")
        def detect_changes(df_change) -> pd.DataFrame:
            """
            Run change detection separately for each metric in the batch.

            Args:
                df_change (pd.DataFrame): Metric data for the whole batch.

            Returns:
                pd.DataFrame: Per-metric results of detect_change(),
                    concatenated across all metrics.
            """
            logger.info(f"running change detection on {len(df_change)} rows")
            df_change_alerts = pd.DataFrame()
            for metric_name in df_change["metric_name"].unique():
                df_metric = df_change.query(
                    f"metric_name=='{metric_name}'"
                ).sort_values("metric_timestamp")
                df_metric = detect_change(
                    df_metric,
                    threshold=change_threshold,
                    detect_last_n=change_detect_last_n,
                )
                df_change_alerts = pd.concat([df_change_alerts, df_metric])
            return df_change_alerts

        @op(name=f"{metric_batch}_change_alerts_op")
        def alert(df_change_alerts) -> pd.DataFrame:
            """
            Send an alert for each metric present in the change results.

            Args:
                df_change_alerts (pd.DataFrame): Output of detect_changes().

            Returns:
                pd.DataFrame: The input frame unchanged (alerting is a
                    side effect).
            """

            if len(df_change_alerts) == 0:
                logger.info("no alerts to send")
            else:
                for metric_name in df_change_alerts["metric_name"].unique():
                    logger.info(f"alerting on {metric_name}")
                    # .copy() so the timestamp cast below does not hit a
                    # pandas chained-assignment warning on a query() slice.
                    df_alert = df_change_alerts.query(
                        f"metric_name=='{metric_name}'"
                    ).copy()
                    df_alert["metric_timestamp"] = pd.to_datetime(
                        df_alert["metric_timestamp"]
                    )
                    metric_timestamp_max = (
                        df_alert["metric_timestamp"].max().strftime("%Y-%m-%d %H:%M")
                    )
                    alert_title = (
                        f"Δ [{metric_name}] looks changed ({metric_timestamp_max}) Δ"
                    )
                    tags = {
                        "metric_batch": metric_batch,
                        "metric_name": metric_name,
                        "metric_timestamp": metric_timestamp_max,
                        "alert_type": "change",
                        # .get() so metrics without custom tags don't KeyError.
                        **metric_tags.get(metric_name, {}),
                    }
                    logger.debug(f"metric tags:\n{tags}")
                    df_alert = send_alert(
                        metric_name=metric_name,
                        title=alert_title,
                        df=df_alert,
                        threshold=change_threshold,
                        alert_methods=alert_methods,
                        tags=tags,
                        score_col="metric_score",
                    )

            return df_change_alerts

        @op(name=f"{metric_batch}_save_change_alerts")
        def save_alerts(df_change_alerts: pd.DataFrame) -> pd.DataFrame:
            """
            Save triggered change alerts (metric_alert == 1) to the db.

            Args:
                df_change_alerts (pd.DataFrame): Change-detection results.

            Returns:
                pd.DataFrame: The frame that was saved (reshaped with
                    metric_alert renamed to metric_value), or the input
                    unchanged if there was nothing to save.
            """

            if len(df_change_alerts) == 0:
                logger.info("no alerts to save")
                return df_change_alerts

            # .copy() so the column writes below operate on an owned frame.
            df_change_alerts = df_change_alerts.query("metric_alert == 1").copy()

            if len(df_change_alerts) > 0:
                df_change_alerts["metric_type"] = "change"
                df_change_alerts["metric_alert"] = df_change_alerts[
                    "metric_alert"
                ].astype(float)
                df_change_alerts = df_change_alerts[
                    [
                        "metric_timestamp",
                        "metric_batch",
                        "metric_name",
                        "metric_type",
                        "metric_alert",
                    ]
                ]
                # Stored alerts reuse the metric_value column for the flag.
                df_change_alerts = df_change_alerts.rename(
                    columns={"metric_alert": "metric_value"}
                )
                df_change_alerts = wrangle_df(df_change_alerts)
                df_change_alerts = validate_df(df_change_alerts)
                logger.info(
                    f"saving {len(df_change_alerts)} change alerts to {db} {table_key}"
                )
                df_change_alerts = save_df(df_change_alerts, db, table_key)
            else:
                logger.info("no alerts to save")

            return df_change_alerts

        save_alerts(alert(detect_changes(get_change_data())))

    return _job


# Build change detection jobs and schedules, one per configured metric batch.
change_jobs = []
change_schedules = []
for spec_name, spec in specs.items():
    change_job = build_change_job(spec)
    change_jobs.append(change_job)
    # Map the spec's string status onto Dagster's enum; anything other than
    # "RUNNING" leaves the schedule stopped by default.
    if spec["change_default_schedule_status"] == "RUNNING":
        change_default_schedule_status = DefaultScheduleStatus.RUNNING
    else:
        change_default_schedule_status = DefaultScheduleStatus.STOPPED
    change_schedule = ScheduleDefinition(
        job=change_job,
        cron_schedule=spec["change_cron_schedule"],
        default_status=change_default_schedule_status,
    )
    change_schedules.append(change_schedule)
12 changes: 11 additions & 1 deletion anomstack/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster import Definitions

from anomstack.jobs.alert import alert_jobs, alert_schedules
from anomstack.jobs.change import change_jobs, change_schedules
from anomstack.jobs.ingest import ingest_jobs, ingest_schedules
from anomstack.jobs.llmalert import llmalert_jobs, llmalert_schedules
from anomstack.jobs.plot import plot_jobs, plot_schedules
Expand All @@ -13,7 +14,15 @@

# from anomstack.sensors.failure import email_on_run_failure

jobs = ingest_jobs + train_jobs + score_jobs + alert_jobs + llmalert_jobs + plot_jobs
jobs = (
ingest_jobs
+ train_jobs
+ score_jobs
+ alert_jobs
+ llmalert_jobs
+ plot_jobs
+ change_jobs
)
# sensors = [email_on_run_failure]
schedules = (
ingest_schedules
Expand All @@ -22,6 +31,7 @@
+ alert_schedules
+ llmalert_schedules
+ plot_schedules
+ change_schedules
)

defs = Definitions(
Expand Down
Loading

0 comments on commit 9522674

Please sign in to comment.