From ebe64f83633a56d5e360e62ea57bc09e5c268728 Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Tue, 14 Nov 2023 12:12:23 +0000 Subject: [PATCH 1/2] add pre-commit changes --- .devcontainer/devcontainer.json | 2 +- .devcontainer/post_create_command.sh | 2 +- .github/workflows/branch_deployments.yml | 7 ++- .github/workflows/deploy.yml | 7 ++- .gitignore | 2 +- .streamlit/config.toml | 2 +- .vscode/settings.json | 2 +- Dockerfile.anomstack_code | 2 +- Dockerfile.anomstack_dashboard | 2 +- Dockerfile.dagster | 2 +- Makefile | 2 - anomstack/alerts/asciiart.py | 16 +++--- anomstack/alerts/email.py | 10 ++-- anomstack/alerts/send.py | 5 +- anomstack/alerts/slack.py | 3 +- anomstack/config.py | 7 +-- anomstack/df/resample.py | 11 ++-- anomstack/df/save.py | 13 ++--- anomstack/df/wrangle.py | 4 +- anomstack/external/aws/credentials.py | 9 ++-- anomstack/external/aws/s3.py | 23 +++++---- anomstack/external/duckdb/duckdb.py | 51 ++++++++++--------- anomstack/external/gcp/bigquery.py | 14 ++--- anomstack/external/gcp/credentials.py | 21 +++++--- anomstack/external/gcp/gcs.py | 25 ++++----- anomstack/external/snowflake/credentials.py | 17 +++---- anomstack/external/snowflake/snowflake.py | 31 +++++------ anomstack/fn/run.py | 9 ++-- anomstack/io/load.py | 17 ++++--- anomstack/io/save.py | 28 +++++----- anomstack/jobs/alert.py | 17 ++++--- anomstack/jobs/ingest.py | 18 ++++--- anomstack/jobs/llmalert.py | 30 ++++++----- anomstack/jobs/plot.py | 24 +++++---- anomstack/jobs/score.py | 23 +++++---- anomstack/jobs/train.py | 35 ++++++------- anomstack/llm/completion.py | 9 +++- anomstack/main.py | 8 +-- anomstack/ml/preprocess.py | 22 ++++---- anomstack/ml/train.py | 9 ++-- anomstack/plots/plot.py | 6 +-- anomstack/sensors/failure.py | 4 +- anomstack/sql/read.py | 5 +- anomstack/validate/validate.py | 27 ++++++---- dagster_docker.yaml | 2 +- dashboard.py | 10 ++-- docker-compose.yaml | 4 +- docs/deployment/README.md | 2 +- docs/deployment/gcp.md | 2 +- metrics/defaults/python/preprocess.py | 13 +++-- metrics/defaults/python/prompt.py | 4 +- metrics/defaults/sql/alerts.sql | 10 ++-- metrics/defaults/sql/plot.sql | 6 +-- metrics/defaults/sql/train.sql | 2 +- .../bigquery_example_simple.yaml | 10 ++-- .../example_simple/example_simple.yaml | 2 +- metrics/examples/freq/freq.yaml | 12 ++--- metrics/examples/gsod/gsod.sql | 32 ++++++------ metrics/examples/gsod/gsod.yaml | 2 +- metrics/examples/gtrends/gtrends.sql | 18 +++---- .../hackernews/hn_top_stories_scores.yaml | 2 +- metrics/examples/netdata/netdata.py | 4 +- metrics/examples/netdata/netdata.yaml | 2 +- .../python/python_ingest_simple/README.md | 2 +- .../python/python_ingest_simple/ingest.py | 12 ++--- .../python_ingest_simple.yaml | 2 +- .../s3_example_simple/s3_example_simple.yaml | 12 ++--- metrics/examples/sales/sales.sql | 8 +-- .../snowflake_example_simple.yaml | 12 ++--- metrics/examples/users/users.sql | 6 +-- metrics/examples/weather/ingest_weather.py | 4 +- metrics/examples/weather/weather.yaml | 2 +- .../weather_forecast/weather_forecast.sql | 8 ++- .../weather_forecast/weather_forecast.yaml | 2 +- metrics/examples/yfinance/yfinance.py | 9 ++-- metrics/examples/yfinance/yfinance.yaml | 2 +- requirements.txt | 2 +- workspace.yaml | 2 +- 78 files changed, 433 insertions(+), 374 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index baa5b04..a85330a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -3,4 +3,4 @@ "forwardPorts": [3000], "remoteUser": "root", 
"postCreateCommand": "bash .devcontainer/post_create_command.sh" -} \ No newline at end of file +} diff --git a/.devcontainer/post_create_command.sh b/.devcontainer/post_create_command.sh index 20ceee8..a1a9292 100644 --- a/.devcontainer/post_create_command.sh +++ b/.devcontainer/post_create_command.sh @@ -10,4 +10,4 @@ cp .example.env .env # docker compose up docker compose up -d -echo "done post create command" \ No newline at end of file +echo "done post create command" diff --git a/.github/workflows/branch_deployments.yml b/.github/workflows/branch_deployments.yml index 8f80144..80c3799 100644 --- a/.github/workflows/branch_deployments.yml +++ b/.github/workflows/branch_deployments.yml @@ -2,7 +2,7 @@ name: Serverless Branch Deployments on: pull_request: types: [opened, synchronize, reopened, closed] - + concurrency: # Cancel in-progress deploys to same branch group: ${{ github.ref }}/branch_deployments @@ -20,7 +20,7 @@ jobs: runs-on: ubuntu-20.04 outputs: build_info: ${{ steps.parse-workspace.outputs.build_info }} - + steps: - name: Prerun Checks id: prerun @@ -39,7 +39,7 @@ jobs: with: ref: ${{ github.head_ref }} path: project-repo - + - name: Python Executable Deploy if: steps.prerun.outputs.result == 'pex-deploy' uses: dagster-io/dagster-cloud-action/actions/build_deploy_python_executable@v0.1 @@ -75,4 +75,3 @@ jobs: organization_id: ${{ secrets.ORGANIZATION_ID }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index c856eb6..0f9faf2 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -4,7 +4,7 @@ on: branches: - "main" - "master" - + concurrency: # Cancel in-progress deploys to same branch group: ${{ github.ref }}/deploy @@ -22,7 +22,7 @@ jobs: runs-on: ubuntu-20.04 outputs: build_info: ${{ steps.parse-workspace.outputs.build_info }} - + steps: - name: Prerun Checks id: prerun @@ -41,7 +41,7 @@ jobs: with: ref: ${{ github.head_ref }} path: project-repo - + - name: Python Executable Deploy if: steps.prerun.outputs.result == 'pex-deploy' uses: dagster-io/dagster-cloud-action/actions/build_deploy_python_executable@v0.1 @@ -77,4 +77,3 @@ jobs: organization_id: ${{ secrets.ORGANIZATION_ID }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - diff --git a/.gitignore b/.gitignore index 3db6511..a405328 100644 --- a/.gitignore +++ b/.gitignore @@ -176,4 +176,4 @@ models/* # tmp tmp -tmpdata \ No newline at end of file +tmpdata diff --git a/.streamlit/config.toml b/.streamlit/config.toml index 8844683..049dc7b 100644 --- a/.streamlit/config.toml +++ b/.streamlit/config.toml @@ -2,4 +2,4 @@ primaryColor="#ff4b4b" backgroundColor="#0e1117" secondaryBackgroundColor="#262730" -textColor="#fafafa" \ No newline at end of file +textColor="#fafafa" diff --git a/.vscode/settings.json b/.vscode/settings.json index 77e50a5..d7cb347 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,4 +8,4 @@ "llmalert", "openai" ] -} \ No newline at end of file +} diff --git a/Dockerfile.anomstack_code b/Dockerfile.anomstack_code index 55578b1..b554e41 100644 --- a/Dockerfile.anomstack_code +++ b/Dockerfile.anomstack_code @@ -17,4 +17,4 @@ EXPOSE 4000 # CMD allows this to be overridden from run launchers or executors that want # to run other commands against your repository -CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "anomstack/main.py"] \ No newline at end of file +CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "anomstack/main.py"] diff --git 
a/Dockerfile.anomstack_dashboard b/Dockerfile.anomstack_dashboard index 5e38584..dd7d540 100644 --- a/Dockerfile.anomstack_dashboard +++ b/Dockerfile.anomstack_dashboard @@ -25,4 +25,4 @@ COPY .streamlit /opt/dagster/app/.streamlit EXPOSE 8501 -ENTRYPOINT ["streamlit", "run", "/opt/dagster/app/dashboard.py", "--server.port=8501", "--server.address=0.0.0.0"] \ No newline at end of file +ENTRYPOINT ["streamlit", "run", "/opt/dagster/app/dashboard.py", "--server.port=8501", "--server.address=0.0.0.0"] diff --git a/Dockerfile.dagster b/Dockerfile.dagster index bb34d36..43d8994 100644 --- a/Dockerfile.dagster +++ b/Dockerfile.dagster @@ -22,4 +22,4 @@ COPY dagster_docker.yaml $DAGSTER_HOME/dagster.yaml COPY workspace.yaml $DAGSTER_HOME -WORKDIR $DAGSTER_HOME \ No newline at end of file +WORKDIR $DAGSTER_HOME diff --git a/Makefile b/Makefile index 17f9161..2050a4e 100644 --- a/Makefile +++ b/Makefile @@ -79,5 +79,3 @@ show-help: printf "\n"; \ }' \ | more $(shell test $(shell uname) == Darwin && echo '--no-init --raw-control-chars') - - diff --git a/anomstack/alerts/asciiart.py b/anomstack/alerts/asciiart.py index 9601eb5..afe3af5 100644 --- a/anomstack/alerts/asciiart.py +++ b/anomstack/alerts/asciiart.py @@ -1,13 +1,15 @@ from __future__ import unicode_literals +import copy +import re +import sys + +import numpy as np +import pandas as pd + # copied from: https://raw.githubusercontent.com/kakwa/py-ascii-graph/09ca5901be94ec3563bdcc25d6396e18fd8ca5df/ascii_graph/__init__.py # copied from: https://raw.githubusercontent.com/nyurik/py-ascii-graph/fix-python310/ascii_graph/__init__.py -import pandas as pd -import numpy as np -import sys -import re -import copy if ( sys.version < "3" @@ -515,7 +517,9 @@ def make_alert_message( df_alert_metric = df_alert_metric.sort_values( by="metric_timestamp", ascending=False ).dropna() - df_alert_metric["metric_timestamp"] = pd.to_datetime(df_alert_metric["metric_timestamp"]) + df_alert_metric["metric_timestamp"] = pd.to_datetime( + df_alert_metric["metric_timestamp"] + ) x = df_alert_metric["metric_value"].round(2).values.tolist() metric_batch = df_alert_metric["metric_batch"].unique()[0] metric_name = df_alert_metric["metric_name"].unique()[0] diff --git a/anomstack/alerts/email.py b/anomstack/alerts/email.py index bb46cd9..3733a2a 100644 --- a/anomstack/alerts/email.py +++ b/anomstack/alerts/email.py @@ -5,14 +5,16 @@ import os import smtplib import ssl +import tempfile +from email import encoders +from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText -from email.mime.base import MIMEBase -from email import encoders -import tempfile -from anomstack.plots.plot import make_alert_plot + from dagster import get_dagster_logger +from anomstack.plots.plot import make_alert_plot + def send_email_with_plot( df, metric_name, subject, body, attachment_name, threshold=0.8 diff --git a/anomstack/alerts/send.py b/anomstack/alerts/send.py index 167ce27..bbac9b5 100644 --- a/anomstack/alerts/send.py +++ b/anomstack/alerts/send.py @@ -2,11 +2,12 @@ Helper functions to send alerts. 
""" -from dagster import get_dagster_logger import pandas as pd +from dagster import get_dagster_logger + from anomstack.alerts.asciiart import make_alert_message -from anomstack.alerts.slack import send_alert_slack from anomstack.alerts.email import send_email_with_plot +from anomstack.alerts.slack import send_alert_slack def send_alert( diff --git a/anomstack/alerts/slack.py b/anomstack/alerts/slack.py index f110569..71eff38 100644 --- a/anomstack/alerts/slack.py +++ b/anomstack/alerts/slack.py @@ -2,9 +2,10 @@ Helper functions for sending alerts via Slack. """ -import requests import json import os + +import requests from dagster import get_dagster_logger diff --git a/anomstack/config.py b/anomstack/config.py index 6882f19..8eb406c 100644 --- a/anomstack/config.py +++ b/anomstack/config.py @@ -3,9 +3,10 @@ """ import os -import yaml from pathlib import Path +import yaml + # environment variables that can be used to override the configuration. env_vars = ["ANOMSTACK_GCP_PROJECT_ID", "ANOMSTACK_MODEL_PATH", "ANOMSTACK_TABLE_KEY"] @@ -28,7 +29,7 @@ def process_yaml_file(yaml_file): None """ - with open(yaml_file, "r", encoding='utf-8') as f: + with open(yaml_file, "r", encoding="utf-8") as f: metric_specs = yaml.safe_load(f) metric_batch = metric_specs["metric_batch"] merged_specs = {**defaults, **metric_specs} @@ -45,7 +46,7 @@ def process_yaml_file(yaml_file): # load defaults -with open(defaults_dir / "defaults.yaml", "r", encoding='utf-8') as file: +with open(defaults_dir / "defaults.yaml", "r", encoding="utf-8") as file: defaults = yaml.safe_load(file) # load all the YAML files diff --git a/anomstack/df/resample.py b/anomstack/df/resample.py index a978754..d848e80 100644 --- a/anomstack/df/resample.py +++ b/anomstack/df/resample.py @@ -2,14 +2,13 @@ def resample(df, freq, freq_agg): - logger = get_dagster_logger() - df = df.set_index('metric_timestamp') - if freq_agg == 'mean': - df = df.groupby(['metric_batch', 'metric_name']).resample(freq).mean() - elif freq_agg == 'sum': - df = df.groupby(['metric_batch', 'metric_name']).resample(freq).sum() + df = df.set_index("metric_timestamp") + if freq_agg == "mean": + df = df.groupby(["metric_batch", "metric_name"]).resample(freq).mean() + elif freq_agg == "sum": + df = df.groupby(["metric_batch", "metric_name"]).resample(freq).sum() else: raise ValueError(f"Unsupported aggregation method: {freq_agg}") df = df.reset_index() diff --git a/anomstack/df/save.py b/anomstack/df/save.py index d956c66..6ac7b62 100644 --- a/anomstack/df/save.py +++ b/anomstack/df/save.py @@ -6,12 +6,13 @@ """ import pandas as pd -from anomstack.external.gcp.bigquery import save_df_bigquery + from anomstack.external.duckdb.duckdb import save_df_duckdb +from anomstack.external.gcp.bigquery import save_df_bigquery from anomstack.external.snowflake.snowflake import save_df_snowflake -def save_df(df, db, table_key, if_exists='append') -> pd.DataFrame: +def save_df(df, db, table_key, if_exists="append") -> pd.DataFrame: """ Save a Pandas DataFrame to a database. @@ -24,13 +25,13 @@ def save_df(df, db, table_key, if_exists='append') -> pd.DataFrame: Returns: - The Pandas DataFrame that was saved. 
""" - if db=='bigquery': + if db == "bigquery": df = save_df_bigquery(df, table_key, if_exists) - elif db=='snowflake': + elif db == "snowflake": df = save_df_snowflake(df, table_key) - elif db=='duckdb': + elif db == "duckdb": df = save_df_duckdb(df, table_key) else: - raise ValueError(f'Unknown db: {db}') + raise ValueError(f"Unknown db: {db}") return df diff --git a/anomstack/df/wrangle.py b/anomstack/df/wrangle.py index c9e2d1c..73342bb 100644 --- a/anomstack/df/wrangle.py +++ b/anomstack/df/wrangle.py @@ -28,7 +28,9 @@ def wrangle_df(df: pd.DataFrame, rounding: int = 4) -> pd.DataFrame: # if we have any nan metric_values then drop them and log how many nan rows we dropped if df["metric_value"].isnull().sum() > 0: - logger.warning(f"dropping {df['metric_value'].isnull().sum()} nan metric_value rows") + logger.warning( + f"dropping {df['metric_value'].isnull().sum()} nan metric_value rows" + ) df = df[~df["metric_value"].isnull()] # round metric_value diff --git a/anomstack/external/aws/credentials.py b/anomstack/external/aws/credentials.py index cb47eca..fb75609 100644 --- a/anomstack/external/aws/credentials.py +++ b/anomstack/external/aws/credentials.py @@ -2,13 +2,12 @@ def get_aws_credentials(): - - aws_access_key_id = os.getenv('ANOMSTACK_AWS_ACCESS_KEY_ID') - aws_secret_access_key = os.getenv('ANOMSTACK_AWS_SECRET_ACCESS_KEY') + aws_access_key_id = os.getenv("ANOMSTACK_AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.getenv("ANOMSTACK_AWS_SECRET_ACCESS_KEY") aws_credentials = { - 'aws_access_key_id': aws_access_key_id, - 'aws_secret_access_key': aws_secret_access_key + "aws_access_key_id": aws_access_key_id, + "aws_secret_access_key": aws_secret_access_key, } return aws_credentials diff --git a/anomstack/external/aws/s3.py b/anomstack/external/aws/s3.py index 348ee96..5b91b61 100644 --- a/anomstack/external/aws/s3.py +++ b/anomstack/external/aws/s3.py @@ -1,8 +1,10 @@ -import boto3 import pickle from typing import List, Tuple -from pyod.models.base import BaseDetector + +import boto3 from dagster import get_dagster_logger +from pyod.models.base import BaseDetector + from anomstack.external.aws.credentials import get_aws_credentials @@ -16,9 +18,9 @@ def split_model_path(model_path: str) -> Tuple[str, str]: def get_s3_client(): aws_credentials = get_aws_credentials() return boto3.client( - 's3', - aws_access_key_id=aws_credentials['aws_access_key_id'], - aws_secret_access_key=aws_credentials['aws_secret_access_key'], + "s3", + aws_access_key_id=aws_credentials["aws_access_key_id"], + aws_secret_access_key=aws_credentials["aws_secret_access_key"], ) @@ -36,7 +38,7 @@ def save_models_s3(models, model_path, metric_batch) -> List[Tuple[str, BaseDete s3_client.put_object( Body=model_byte_stream, Bucket=model_path_bucket, - Key=f"{model_path_prefix}/{metric_batch}/{model_name}" + Key=f"{model_path_prefix}/{metric_batch}/{model_name}", ) return models @@ -46,17 +48,16 @@ def load_model_s3(metric_name, model_path, metric_batch) -> BaseDetector: logger = get_dagster_logger() model_path_bucket, model_path_prefix = split_model_path(model_path) - model_name = f'{metric_name}.pkl' - logger.info(f'loading {model_name} from {model_path}') + model_name = f"{metric_name}.pkl" + logger.info(f"loading {model_name} from {model_path}") s3_client = get_s3_client() model_obj = s3_client.get_object( - Bucket=model_path_bucket, - Key=f'{model_path_prefix}/{metric_batch}/{model_name}' + Bucket=model_path_bucket, Key=f"{model_path_prefix}/{metric_batch}/{model_name}" ) - model_byte_stream = 
model_obj['Body'].read() + model_byte_stream = model_obj["Body"].read() model = pickle.loads(model_byte_stream) return model diff --git a/anomstack/external/duckdb/duckdb.py b/anomstack/external/duckdb/duckdb.py index a857494..320f7a1 100644 --- a/anomstack/external/duckdb/duckdb.py +++ b/anomstack/external/duckdb/duckdb.py @@ -2,32 +2,33 @@ Some utility functions. """ -from dagster import get_dagster_logger -import pandas as pd -import jinja2 -from jinja2 import FileSystemLoader -import requests import json import os + import duckdb +import jinja2 +import pandas as pd +import requests +from dagster import get_dagster_logger +from jinja2 import FileSystemLoader def read_sql_duckdb(sql) -> pd.DataFrame: """ Read data from SQL. """ - + logger = get_dagster_logger() - - duckdb_path = os.environ.get('ANOMSTACK_DUCKDB_PATH','tmpdata/anomstack.db') - logger.info(f'duckdb_path:{duckdb_path}') - + + duckdb_path = os.environ.get("ANOMSTACK_DUCKDB_PATH", "tmpdata/anomstack.db") + logger.info(f"duckdb_path:{duckdb_path}") + conn = duckdb.connect(duckdb_path) - - logger.debug(f'sql:\n{sql}') + + logger.debug(f"sql:\n{sql}") df = duckdb.query(connection=conn, query=sql).df() - logger.debug(f'df:\n{df}') - + logger.debug(f"df:\n{df}") + return df @@ -35,19 +36,21 @@ def save_df_duckdb(df, table_key) -> pd.DataFrame: """ Save df to db. """ - + logger = get_dagster_logger() - - duckdb_path = os.environ.get('ANOMSTACK_DUCKDB_PATH','tmpdata/anomstack.db') - logger.info(f'duckdb_path:{duckdb_path}') + + duckdb_path = os.environ.get("ANOMSTACK_DUCKDB_PATH", "tmpdata/anomstack.db") + logger.info(f"duckdb_path:{duckdb_path}") conn = duckdb.connect(duckdb_path) try: - if '.' in table_key: - schema, _ = table_key.split('.') - duckdb.query(connection=conn, query=f'CREATE SCHEMA IF NOT EXISTS {schema}') - duckdb.query(connection=conn, query=f'INSERT INTO {table_key} SELECT * FROM df') + if "." 
in table_key: + schema, _ = table_key.split(".") + duckdb.query(connection=conn, query=f"CREATE SCHEMA IF NOT EXISTS {schema}") + duckdb.query(connection=conn, query=f"INSERT INTO {table_key} SELECT * FROM df") except: - duckdb.query(connection=conn, query=f'CREATE TABLE {table_key} AS SELECT * FROM df') - + duckdb.query( + connection=conn, query=f"CREATE TABLE {table_key} AS SELECT * FROM df" + ) + return df diff --git a/anomstack/external/gcp/bigquery.py b/anomstack/external/gcp/bigquery.py index 8e22964..568e6ac 100644 --- a/anomstack/external/gcp/bigquery.py +++ b/anomstack/external/gcp/bigquery.py @@ -1,16 +1,18 @@ """ """ -from dagster import get_dagster_logger -import pandas as pd -from anomstack.external.gcp.credentials import get_google_credentials import os -import time import random +import time + +import pandas as pd +from dagster import get_dagster_logger from google.api_core.exceptions import Forbidden from google.cloud import bigquery from google.cloud.exceptions import TooManyRequests +from anomstack.external.gcp.credentials import get_google_credentials + def read_sql_bigquery(sql) -> pd.DataFrame: """ @@ -98,9 +100,7 @@ def save_df_bigquery(df, table_key, if_exists="append", max_retries=5) -> pd.Dat for attempt in range(max_retries): try: job = client.load_table_from_dataframe( - dataframe=df, - destination=destination_table, - job_config=job_config + dataframe=df, destination=destination_table, job_config=job_config ) job.result() # Wait for the job to complete break # Success, exit the retry loop diff --git a/anomstack/external/gcp/credentials.py b/anomstack/external/gcp/credentials.py index 0abb6fe..fd94775 100644 --- a/anomstack/external/gcp/credentials.py +++ b/anomstack/external/gcp/credentials.py @@ -1,6 +1,7 @@ -from dagster import get_dagster_logger -import os import json +import os + +from dagster import get_dagster_logger from google.oauth2 import service_account @@ -19,24 +20,30 @@ def get_google_credentials(): logger = get_dagster_logger() # Check for file path credentials - credentials_path = os.getenv('ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS') + credentials_path = os.getenv("ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS") credentials = None if credentials_path: try: - credentials = service_account.Credentials.from_service_account_file(credentials_path) + credentials = service_account.Credentials.from_service_account_file( + credentials_path + ) logger.info(f"Loaded credentials from file path: {credentials_path}") except Exception as e: - logger.info(f"Failed to load credentials from file with: {str(e)}. Trying to load from JSON string...") + logger.info( + f"Failed to load credentials from file with: {str(e)}. Trying to load from JSON string..." 
+ ) # If credentials could not be loaded from file path, try JSON string if credentials is None: - raw_credentials = os.getenv('ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS_JSON') + raw_credentials = os.getenv("ANOMSTACK_GOOGLE_APPLICATION_CREDENTIALS_JSON") if raw_credentials: try: credentials_json = json.loads(raw_credentials) - credentials = service_account.Credentials.from_service_account_info(credentials_json) + credentials = service_account.Credentials.from_service_account_info( + credentials_json + ) logger.info("Loaded credentials from JSON string") except json.JSONDecodeError as e: logger.info(f"Failed to parse JSON credentials with: {str(e)}") diff --git a/anomstack/external/gcp/gcs.py b/anomstack/external/gcp/gcs.py index 3b507a6..e9ea174 100644 --- a/anomstack/external/gcp/gcs.py +++ b/anomstack/external/gcp/gcs.py @@ -1,11 +1,12 @@ -from google.cloud import storage +import json +import os +import pickle from typing import List, Tuple -from pyod.models.base import BaseDetector + from dagster import get_dagster_logger -import pickle -import os -import json +from google.cloud import storage from google.oauth2 import service_account +from pyod.models.base import BaseDetector def split_model_path(model_path) -> Tuple[str, str]: @@ -31,7 +32,9 @@ def get_credentials(): if credentials_path: return service_account.Credentials.from_service_account_file(credentials_path) elif credentials_json: - return service_account.Credentials.from_service_account_info(json.loads(credentials_json)) + return service_account.Credentials.from_service_account_info( + json.loads(credentials_json) + ) else: return None @@ -50,7 +53,6 @@ def save_models_gcs(models, model_path, metric_batch) -> List[Tuple[str, BaseDet bucket = storage_client.get_bucket(model_path_bucket) for metric, model in models: - model_name = f"{metric}.pkl" logger.info(f"saving {model_name} to {model_path}") @@ -75,13 +77,12 @@ def load_model_gcs(metric_name, model_path, metric_batch) -> BaseDetector: storage_client = storage.Client(credentials=credentials) bucket = storage_client.get_bucket(model_path_bucket) - model_name = f'{metric_name}.pkl' - logger.info(f'loading {model_name} from {model_path}') - - blob = bucket.blob(f'{model_path_prefix}/{metric_batch}/{model_name}') + model_name = f"{metric_name}.pkl" + logger.info(f"loading {model_name} from {model_path}") - with blob.open('rb') as f: + blob = bucket.blob(f"{model_path_prefix}/{metric_batch}/{model_name}") + with blob.open("rb") as f: model = pickle.load(f) return model diff --git a/anomstack/external/snowflake/credentials.py b/anomstack/external/snowflake/credentials.py index ea95bd2..6f52a39 100644 --- a/anomstack/external/snowflake/credentials.py +++ b/anomstack/external/snowflake/credentials.py @@ -2,17 +2,16 @@ def get_snowflake_credentials(): - - snowflake_account = os.getenv('ANOMSTACK_SNOWFLAKE_ACCOUNT') - snowflake_user = os.getenv('ANOMSTACK_SNOWFLAKE_USER') - snowflake_password = os.getenv('ANOMSTACK_SNOWFLAKE_PASSWORD') - snowflake_warehouse = os.getenv('ANOMSTACK_SNOWFLAKE_WAREHOUSE') + snowflake_account = os.getenv("ANOMSTACK_SNOWFLAKE_ACCOUNT") + snowflake_user = os.getenv("ANOMSTACK_SNOWFLAKE_USER") + snowflake_password = os.getenv("ANOMSTACK_SNOWFLAKE_PASSWORD") + snowflake_warehouse = os.getenv("ANOMSTACK_SNOWFLAKE_WAREHOUSE") snowflake_credentials = { - 'snowflake_account': snowflake_account, - 'snowflake_user': snowflake_user, - 'snowflake_password': snowflake_password, - 'snowflake_warehouse': snowflake_warehouse, + "snowflake_account": snowflake_account, + 
"snowflake_user": snowflake_user, + "snowflake_password": snowflake_password, + "snowflake_warehouse": snowflake_warehouse, } return snowflake_credentials diff --git a/anomstack/external/snowflake/snowflake.py b/anomstack/external/snowflake/snowflake.py index 92d06a1..f218dea 100644 --- a/anomstack/external/snowflake/snowflake.py +++ b/anomstack/external/snowflake/snowflake.py @@ -1,10 +1,11 @@ """ """ -from dagster import get_dagster_logger +import pandas as pd import snowflake.connector +from dagster import get_dagster_logger from snowflake.connector.pandas_tools import write_pandas -import pandas as pd + from anomstack.external.snowflake.credentials import get_snowflake_credentials @@ -15,15 +16,15 @@ def read_sql_snowflake(sql, cols_lowercase=True) -> pd.DataFrame: logger = get_dagster_logger() - logger.debug(f'sql:\n{sql}') + logger.debug(f"sql:\n{sql}") credentials = get_snowflake_credentials() conn = snowflake.connector.connect( - account=credentials['snowflake_account'], - user=credentials['snowflake_user'], - password=credentials['snowflake_password'], - warehouse=credentials['snowflake_warehouse'], + account=credentials["snowflake_account"], + user=credentials["snowflake_user"], + password=credentials["snowflake_password"], + warehouse=credentials["snowflake_warehouse"], ) cur = conn.cursor() cur.execute(sql) @@ -32,7 +33,7 @@ def read_sql_snowflake(sql, cols_lowercase=True) -> pd.DataFrame: if cols_lowercase: df.columns = df.columns.str.lower() - logger.debug(f'df:\n{df}') + logger.debug(f"df:\n{df}") return df @@ -44,21 +45,21 @@ def save_df_snowflake(df, table_key, cols_lowercase=True) -> pd.DataFrame: logger = get_dagster_logger() - table_key_parts = table_key.split('.') + table_key_parts = table_key.split(".") credentials = get_snowflake_credentials() conn = snowflake.connector.connect( - account=credentials['snowflake_account'], - user=credentials['snowflake_user'], - password=credentials['snowflake_password'], - warehouse=credentials['snowflake_warehouse'], + account=credentials["snowflake_account"], + user=credentials["snowflake_user"], + password=credentials["snowflake_password"], + warehouse=credentials["snowflake_warehouse"], ) # convert metric timestamp to string # fixes: nowflake.connector.errors.ProgrammingError: 002023 (22000): SQL compilation error: Expression type does not match column data type, expecting TIMESTAMP_NTZ(9) but got NUMBER(38,0) for column METRIC_TIMESTAMP # TODO: why do i have to do this? 
- df['metric_timestamp'] = df['metric_timestamp'].astype(str) + df["metric_timestamp"] = df["metric_timestamp"].astype(str) success, nchunks, nrows, output = write_pandas( conn, @@ -66,7 +67,7 @@ def save_df_snowflake(df, table_key, cols_lowercase=True) -> pd.DataFrame: database=table_key_parts[0], schema=table_key_parts[1], table_name=table_key_parts[2], - auto_create_table=True + auto_create_table=True, ) conn.close() diff --git a/anomstack/fn/run.py b/anomstack/fn/run.py index 112bc2a..a9e9d24 100644 --- a/anomstack/fn/run.py +++ b/anomstack/fn/run.py @@ -3,8 +3,9 @@ """ import ast -from dagster import get_dagster_logger + import pandas as pd +from dagster import get_dagster_logger def validate_function_definition(code_str: str, function_name: str) -> bool: @@ -28,8 +29,8 @@ def define_fn(fn_name: str, fn: str) -> str: logger = get_dagster_logger() - logger.info(f'fn_name: {fn_name}') - logger.info(f'fn: {fn}') + logger.info(f"fn_name: {fn_name}") + logger.info(f"fn: {fn}") namespace = {} exec(fn, globals(), namespace) @@ -42,7 +43,7 @@ def run_df_fn(fn_name: str, fn: str) -> pd.DataFrame: Run a python function. """ - #fn_name = define_fn(fn_name, fn) + # fn_name = define_fn(fn_name, fn) exec(fn) df = locals()[fn_name]() diff --git a/anomstack/io/load.py b/anomstack/io/load.py index fb2b235..56de877 100644 --- a/anomstack/io/load.py +++ b/anomstack/io/load.py @@ -1,8 +1,10 @@ +import pickle from typing import List, Tuple + from pyod.models.base import BaseDetector -from anomstack.external.gcp.gcs import load_model_gcs + from anomstack.external.aws.s3 import load_model_s3 -import pickle +from anomstack.external.gcp.gcs import load_model_gcs def load_model(metric_name, model_path, metric_batch) -> BaseDetector: @@ -10,11 +12,11 @@ def load_model(metric_name, model_path, metric_batch) -> BaseDetector: Load model. """ - if model_path.startswith('gs://'): + if model_path.startswith("gs://"): model = load_model_gcs(metric_name, model_path, metric_batch) - elif model_path.startswith('s3://'): + elif model_path.startswith("s3://"): model = load_model_s3(metric_name, model_path, metric_batch) - elif model_path.startswith('local://'): + elif model_path.startswith("local://"): model = load_model_local(metric_name, model_path, metric_batch) else: raise ValueError(f"model_path {model_path} not supported") @@ -27,10 +29,9 @@ def load_model_local(metric_name, model_path, metric_batch) -> BaseDetector: Load model locally. """ - model_path = model_path.replace('local://', '') - - with open(f'{model_path}/{metric_batch}/{metric_name}.pkl', 'rb') as f: + model_path = model_path.replace("local://", "") + with open(f"{model_path}/{metric_batch}/{metric_name}.pkl", "rb") as f: model = pickle.load(f) return model diff --git a/anomstack/io/save.py b/anomstack/io/save.py index 57a3779..64486f8 100644 --- a/anomstack/io/save.py +++ b/anomstack/io/save.py @@ -1,25 +1,27 @@ +import os +import pickle from typing import List, Tuple + from pyod.models.base import BaseDetector -from anomstack.external.gcp.gcs import save_models_gcs + from anomstack.external.aws.s3 import save_models_s3 -import pickle -import os +from anomstack.external.gcp.gcs import save_models_gcs -def save_models_local(models, model_path, metric_batch) -> List[Tuple[str, BaseDetector]]: +def save_models_local( + models, model_path, metric_batch +) -> List[Tuple[str, BaseDetector]]: """ Save trained models locally. 
""" - model_path = model_path.replace('local://', '') + model_path = model_path.replace("local://", "") - if not os.path.exists(f'{model_path}/{metric_batch}'): - os.makedirs(f'{model_path}/{metric_batch}') + if not os.path.exists(f"{model_path}/{metric_batch}"): + os.makedirs(f"{model_path}/{metric_batch}") for metric_name, model in models: - - with open(f'{model_path}/{metric_batch}/{metric_name}.pkl', 'wb') as f: - + with open(f"{model_path}/{metric_batch}/{metric_name}.pkl", "wb") as f: pickle.dump(model, f) return models @@ -30,11 +32,11 @@ def save_models(models, model_path, metric_batch) -> List[Tuple[str, BaseDetecto Save trained models. """ - if model_path.startswith('gs://'): + if model_path.startswith("gs://"): models = save_models_gcs(models, model_path, metric_batch) - elif model_path.startswith('s3://'): + elif model_path.startswith("s3://"): models = save_models_s3(models, model_path, metric_batch) - elif model_path.startswith('local://'): + elif model_path.startswith("local://"): models = save_models_local(models, model_path, metric_batch) else: raise ValueError(f"model_path {model_path} not supported") diff --git a/anomstack/jobs/alert.py b/anomstack/jobs/alert.py index db86afb..361bf45 100644 --- a/anomstack/jobs/alert.py +++ b/anomstack/jobs/alert.py @@ -4,15 +4,16 @@ import pandas as pd from dagster import ( + DefaultScheduleStatus, + JobDefinition, + ScheduleDefinition, get_dagster_logger, job, op, - ScheduleDefinition, - JobDefinition, - DefaultScheduleStatus, ) -from anomstack.config import specs + from anomstack.alerts.send import send_alert +from anomstack.config import specs from anomstack.jinja.render import render from anomstack.sql.read import read_sql @@ -85,13 +86,13 @@ def alert(df_alerts) -> pd.DataFrame: for metric_name in df_alerts["metric_name"].unique(): logger.info(f"alerting on {metric_name}") df_alert = df_alerts.query(f"metric_name=='{metric_name}'") - df_alert["metric_timestamp"] = pd.to_datetime(df_alert["metric_timestamp"]) + df_alert["metric_timestamp"] = pd.to_datetime( + df_alert["metric_timestamp"] + ) metric_timestamp_max = ( df_alert["metric_timestamp"].max().strftime("%Y-%m-%d %H:%M") ) - alert_title = ( - f"🔥 [{metric_name}] looks anomalous ({metric_timestamp_max}) 🔥" - ) + alert_title = f"🔥 [{metric_name}] looks anomalous ({metric_timestamp_max}) 🔥" df_alert = send_alert( metric_name=metric_name, title=alert_title, diff --git a/anomstack/jobs/ingest.py b/anomstack/jobs/ingest.py index 951a045..b02e595 100644 --- a/anomstack/jobs/ingest.py +++ b/anomstack/jobs/ingest.py @@ -3,22 +3,24 @@ """ from typing import Dict + import pandas as pd from dagster import ( - job, - op, - ScheduleDefinition, - JobDefinition, DefaultScheduleStatus, - get_dagster_logger, + JobDefinition, + ScheduleDefinition, asset, + get_dagster_logger, + job, + op, ) + from anomstack.config import specs -from anomstack.jinja.render import render -from anomstack.sql.read import read_sql -from anomstack.fn.run import run_df_fn from anomstack.df.save import save_df from anomstack.df.wrangle import wrangle_df +from anomstack.fn.run import run_df_fn +from anomstack.jinja.render import render +from anomstack.sql.read import read_sql from anomstack.validate.validate import validate_df diff --git a/anomstack/jobs/llmalert.py b/anomstack/jobs/llmalert.py index 20aaa42..1ff6190 100644 --- a/anomstack/jobs/llmalert.py +++ b/anomstack/jobs/llmalert.py @@ -3,22 +3,24 @@ """ import os + +import openai import pandas as pd from dagster import ( - job, - op, - ScheduleDefinition, - 
JobDefinition, DefaultScheduleStatus, + JobDefinition, + ScheduleDefinition, get_dagster_logger, + job, + op, ) -import openai + +from anomstack.alerts.send import send_alert from anomstack.config import specs +from anomstack.fn.run import define_fn from anomstack.jinja.render import render -from anomstack.sql.read import read_sql -from anomstack.alerts.send import send_alert from anomstack.llm.completion import get_completion -from anomstack.fn.run import define_fn +from anomstack.sql.read import read_sql def build_llmalert_job(spec) -> JobDefinition: @@ -96,17 +98,21 @@ def llmalert(context, df: pd.DataFrame) -> None: .sort_values(by="metric_timestamp", ascending=True) .reset_index(drop=True) ).dropna() - df_metric["metric_timestamp"] = pd.to_datetime(df_metric["metric_timestamp"]) + df_metric["metric_timestamp"] = pd.to_datetime( + df_metric["metric_timestamp"] + ) if llmalert_smooth_n > 0: df_metric["metric_value"] = ( df_metric["metric_value"].rolling(llmalert_smooth_n).mean() ) - df_metric['metric_recency'] = 'baseline' - df_metric.iloc[-llmalert_recent_n:, df_metric.columns.get_loc('metric_recency')] = 'recent' + df_metric["metric_recency"] = "baseline" + df_metric.iloc[ + -llmalert_recent_n:, df_metric.columns.get_loc("metric_recency") + ] = "recent" df_prompt = ( - df_metric[["metric_value","metric_recency"]] + df_metric[["metric_value", "metric_recency"]] .dropna() .round(llmalert_metric_rounding) ) diff --git a/anomstack/jobs/plot.py b/anomstack/jobs/plot.py index 51c7530..6ff69f2 100644 --- a/anomstack/jobs/plot.py +++ b/anomstack/jobs/plot.py @@ -3,26 +3,28 @@ import base64 from io import BytesIO -import pandas as pd +from typing import List, Tuple + import matplotlib.pyplot as plt +import pandas as pd import seaborn as sns from dagster import ( AssetExecutionContext, + DefaultScheduleStatus, + JobDefinition, MetadataValue, - job, - op, ScheduleDefinition, - JobDefinition, - DefaultScheduleStatus, asset, - get_dagster_logger + get_dagster_logger, + job, + op, ) -from typing import List, Tuple + from anomstack.config import specs +from anomstack.df.resample import resample from anomstack.jinja.render import render -from anomstack.sql.read import read_sql from anomstack.plots.plot import make_batch_plot -from anomstack.df.resample import resample +from anomstack.sql.read import read_sql def build_plot_job(spec) -> JobDefinition: @@ -45,8 +47,8 @@ def noop(): metric_batch = spec["metric_batch"] db = spec["db"] preprocess_params = spec["preprocess_params"] - freq = preprocess_params.get('freq') - freq_agg = preprocess_params.get('freq_agg') + freq = preprocess_params.get("freq") + freq_agg = preprocess_params.get("freq_agg") @job(name=f"{metric_batch}_plot_job") def _job(): diff --git a/anomstack/jobs/score.py b/anomstack/jobs/score.py index 91130b6..2c68b18 100644 --- a/anomstack/jobs/score.py +++ b/anomstack/jobs/score.py @@ -4,21 +4,22 @@ import pandas as pd from dagster import ( + DefaultScheduleStatus, + JobDefinition, + ScheduleDefinition, get_dagster_logger, job, op, - ScheduleDefinition, - JobDefinition, - DefaultScheduleStatus, ) + from anomstack.config import specs from anomstack.df.save import save_df +from anomstack.df.wrangle import wrangle_df +from anomstack.fn.run import define_fn +from anomstack.io.load import load_model from anomstack.jinja.render import render from anomstack.sql.read import read_sql -from anomstack.io.load import load_model -from anomstack.fn.run import define_fn from anomstack.validate.validate import validate_df -from anomstack.df.wrangle 
import wrangle_df def build_score_job(spec) -> JobDefinition: @@ -101,7 +102,9 @@ def score(df) -> pd.DataFrame: X = preprocess(df_metric, **preprocess_params) if len(X) == 0: - logger.debug(f"X is empty for {metric_name} in {metric_batch} score job.") + logger.debug( + f"X is empty for {metric_name} in {metric_batch} score job." + ) continue logger.debug(f"X:\n{X.head()}") @@ -148,7 +151,7 @@ def score(df) -> pd.DataFrame: if len(df_scores) == 0: logger.debug(f"df_scores is empty for {metric_batch} score job.") return df_scores - + df_scores = wrangle_df(df_scores, rounding=score_metric_rounding) df_scores = validate_df(df_scores) @@ -171,7 +174,9 @@ def save_scores(df) -> pd.DataFrame: if len(df) > 0: df = save_df(df, db, table_key) else: - logger.debug(f"no scores to save, df is empty for {metric_batch} score job.") + logger.debug( + f"no scores to save, df is empty for {metric_batch} score job." + ) return df diff --git a/anomstack/jobs/train.py b/anomstack/jobs/train.py index 76303e2..5bcedcd 100644 --- a/anomstack/jobs/train.py +++ b/anomstack/jobs/train.py @@ -2,23 +2,25 @@ Generate train jobs and schedules. """ +from typing import List, Tuple + import pandas as pd -from pyod.models.base import BaseDetector from dagster import ( + DefaultScheduleStatus, + JobDefinition, + ScheduleDefinition, get_dagster_logger, job, op, - ScheduleDefinition, - JobDefinition, - DefaultScheduleStatus, ) -from typing import List, Tuple +from pyod.models.base import BaseDetector + from anomstack.config import specs -from anomstack.jinja.render import render -from anomstack.sql.read import read_sql +from anomstack.fn.run import define_fn from anomstack.io.save import save_models +from anomstack.jinja.render import render from anomstack.ml.train import train_model -from anomstack.fn.run import define_fn +from anomstack.sql.read import read_sql def build_train_job(spec) -> JobDefinition: @@ -99,24 +101,17 @@ def train(df) -> List[Tuple[str, BaseDetector]]: else: for metric_name in df["metric_name"].unique(): df_metric = df[df["metric_name"] == metric_name] - logger.debug(f"preprocess {metric_name} in {metric_batch} train job.") - logger.debug(f"df_metric:\n{df_metric.head()}") - X = preprocess( - df_metric, - shuffle=True, - **preprocess_params + logger.debug( + f"preprocess {metric_name} in {metric_batch} train job." ) + logger.debug(f"df_metric:\n{df_metric.head()}") + X = preprocess(df_metric, shuffle=True, **preprocess_params) logger.debug(f"X:\n{X.head()}") if len(X) > 0: logger.info( f"training {metric_name} in {metric_batch} train job. len(X)={len(X)}" ) - model = train_model( - X, - metric_name, - model_name, - model_params - ) + model = train_model(X, metric_name, model_name, model_params) models.append((metric_name, model)) else: logger.info( diff --git a/anomstack/llm/completion.py b/anomstack/llm/completion.py index 5fc8c85..8faae4f 100644 --- a/anomstack/llm/completion.py +++ b/anomstack/llm/completion.py @@ -1,7 +1,8 @@ import json -import openai import time +import openai + def get_completion(prompt: str, model="gpt-3.5-turbo", max_retries=5): """ @@ -46,7 +47,11 @@ def get_completion(prompt: str, model="gpt-3.5-turbo", max_retries=5): "description": "Confidence level in the `is_anomalous` flag. 
'high' if very confident in the anomaly decision, 'medium' if somewhat confident, 'low' if not confident.", }, }, - "required": ["is_anomalous", "decision_reasoning", "decision_confidence_level"], + "required": [ + "is_anomalous", + "decision_reasoning", + "decision_confidence_level", + ], }, } ], diff --git a/anomstack/main.py b/anomstack/main.py index c7d5b22..0a96fc8 100644 --- a/anomstack/main.py +++ b/anomstack/main.py @@ -3,15 +3,15 @@ """ from dagster import Definitions -from anomstack.jobs.ingest import ingest_jobs, ingest_schedules -from anomstack.jobs.train import train_jobs, train_schedules -from anomstack.jobs.score import score_jobs, score_schedules + from anomstack.jobs.alert import alert_jobs, alert_schedules +from anomstack.jobs.ingest import ingest_jobs, ingest_schedules from anomstack.jobs.llmalert import llmalert_jobs, llmalert_schedules from anomstack.jobs.plot import plot_jobs, plot_schedules +from anomstack.jobs.score import score_jobs, score_schedules +from anomstack.jobs.train import train_jobs, train_schedules from anomstack.sensors.failure import email_on_run_failure - jobs = ingest_jobs + train_jobs + score_jobs + alert_jobs + llmalert_jobs + plot_jobs sensors = [email_on_run_failure] schedules = ( diff --git a/anomstack/ml/preprocess.py b/anomstack/ml/preprocess.py index 26f86b9..b9ada23 100644 --- a/anomstack/ml/preprocess.py +++ b/anomstack/ml/preprocess.py @@ -2,7 +2,7 @@ from dagster import get_dagster_logger -def make_x(df, mode='train', diff_n=0, smooth_n=0, lags_n=0, score_n=1) -> pd.DataFrame: +def make_x(df, mode="train", diff_n=0, smooth_n=0, lags_n=0, score_n=1) -> pd.DataFrame: """ Prepare data for model training and scoring. @@ -14,23 +14,27 @@ def make_x(df, mode='train', diff_n=0, smooth_n=0, lags_n=0, score_n=1) -> pd.Da logger = get_dagster_logger() - X = df.sort_values(by=['metric_timestamp']).reset_index(drop=True).set_index('metric_timestamp') + X = ( + df.sort_values(by=["metric_timestamp"]) + .reset_index(drop=True) + .set_index("metric_timestamp") + ) X = df[["metric_value"]] if diff_n > 0: - X['metric_value'] = X['metric_value'].diff(periods=diff_n).dropna() + X["metric_value"] = X["metric_value"].diff(periods=diff_n).dropna() if smooth_n > 0: - X['metric_value'] = X['metric_value'].rolling(window=smooth_n).mean().dropna() + X["metric_value"] = X["metric_value"].rolling(window=smooth_n).mean().dropna() if lags_n > 0: - for lag in range(1,lags_n+1): - X[f'lag_{lag}'] = X['metric_value'].shift(lag) + for lag in range(1, lags_n + 1): + X[f"lag_{lag}"] = X["metric_value"].shift(lag) - if mode == 'train': + if mode == "train": X = X.sample(frac=1) - elif mode == 'score': + elif mode == "score": X = X.tail(score_n) else: @@ -38,6 +42,6 @@ def make_x(df, mode='train', diff_n=0, smooth_n=0, lags_n=0, score_n=1) -> pd.Da X = X.dropna() - logger.info(f'X=\n{X}') + logger.info(f"X=\n{X}") return X diff --git a/anomstack/ml/train.py b/anomstack/ml/train.py index 2e06496..42daa36 100644 --- a/anomstack/ml/train.py +++ b/anomstack/ml/train.py @@ -1,7 +1,8 @@ -import time import importlib -from pyod.models.base import BaseDetector +import time + from dagster import get_dagster_logger +from pyod.models.base import BaseDetector def train_model(X, metric, model_name, model_params) -> BaseDetector: @@ -11,7 +12,9 @@ def train_model(X, metric, model_name, model_params) -> BaseDetector: logger = get_dagster_logger() - model_class = getattr(importlib.import_module(f'pyod.models.{model_name.lower()}'), model_name) + model_class = getattr( + 
importlib.import_module(f"pyod.models.{model_name.lower()}"), model_name + ) model = model_class(**model_params) time_start_train = time.time() diff --git a/anomstack/plots/plot.py b/anomstack/plots/plot.py index b6b6393..2739d23 100644 --- a/anomstack/plots/plot.py +++ b/anomstack/plots/plot.py @@ -1,8 +1,8 @@ """ """ -import matplotlib.pyplot as plt import matplotlib.dates as mdates +import matplotlib.pyplot as plt import pandas as pd import seaborn as sns @@ -27,7 +27,7 @@ def make_alert_plot(df: pd.DataFrame, metric_name: str, threshold: float = 0.8) n = len(df_plot) ax1 = df_plot["metric_value"].plot( - title=f'{metric_name} (n={n})', ax=axes[0], style="-o", color="royalblue" + title=f"{metric_name} (n={n})", ax=axes[0], style="-o", color="royalblue" ) if "metric_value_smooth" in df_plot.columns: df_plot["metric_value_smooth"].plot( @@ -62,7 +62,7 @@ def make_alert_plot(df: pd.DataFrame, metric_name: str, threshold: float = 0.8) ax2.set_ylim(0, 1) ax2.legend(loc="upper left") ax2.grid(False) - ax2.locator_params(axis='x', nbins=25) + ax2.locator_params(axis="x", nbins=25) for idx in alert_points.index: ax1.axvline(idx, color="yellow", alpha=0.3) diff --git a/anomstack/sensors/failure.py b/anomstack/sensors/failure.py index 4180b63..72eb14d 100644 --- a/anomstack/sensors/failure.py +++ b/anomstack/sensors/failure.py @@ -1,6 +1,6 @@ import os -from dagster import make_email_on_run_failure_sensor +from dagster import make_email_on_run_failure_sensor email_from = os.getenv( "ANOMSTACK_FAILURE_EMAIL_FROM", os.getenv("ANOMSTACK_ALERT_EMAIL_FROM") @@ -16,5 +16,5 @@ email_from=email_from, email_password=email_password, email_to=email_to, - monitor_all_repositories=True + monitor_all_repositories=True, ) diff --git a/anomstack/sql/read.py b/anomstack/sql/read.py index b4f6dcf..3f807be 100644 --- a/anomstack/sql/read.py +++ b/anomstack/sql/read.py @@ -2,10 +2,11 @@ This module provides functions for reading data from SQL databases using different database connectors. 
""" -from dagster import get_dagster_logger import pandas as pd -from anomstack.external.gcp.bigquery import read_sql_bigquery +from dagster import get_dagster_logger + from anomstack.external.duckdb.duckdb import read_sql_duckdb +from anomstack.external.gcp.bigquery import read_sql_bigquery from anomstack.external.snowflake.snowflake import read_sql_snowflake diff --git a/anomstack/validate/validate.py b/anomstack/validate/validate.py index d9f095c..712fa92 100644 --- a/anomstack/validate/validate.py +++ b/anomstack/validate/validate.py @@ -1,4 +1,5 @@ from io import StringIO + import pandas as pd from dagster import get_dagster_logger @@ -16,19 +17,25 @@ def validate_df(df: pd.DataFrame) -> pd.DataFrame: logger.debug(f"df.info(): \n{info_str}") # validate the dataframe - assert 'metric_type' in df.columns.str.lower(), 'metric_type column missing' - assert 'metric_batch' in df.columns.str.lower(), 'metric_batch column missing' - assert 'metric_name' in df.columns.str.lower(), 'metric_name column missing' - assert 'metric_value' in df.columns.str.lower(), 'metric_value column missing' - assert 'metric_timestamp' in df.columns.str.lower(), 'metric_timestamp column missing' - assert len(df.columns) == 5, f'expected 5 columns, got {len(df.columns)}' - assert len(df) > 0, 'no data returned' + assert "metric_type" in df.columns.str.lower(), "metric_type column missing" + assert "metric_batch" in df.columns.str.lower(), "metric_batch column missing" + assert "metric_name" in df.columns.str.lower(), "metric_name column missing" + assert "metric_value" in df.columns.str.lower(), "metric_value column missing" + assert ( + "metric_timestamp" in df.columns.str.lower() + ), "metric_timestamp column missing" + assert len(df.columns) == 5, f"expected 5 columns, got {len(df.columns)}" + assert len(df) > 0, "no data returned" # metric_name is string - assert df['metric_name'].dtype == 'object', 'metric_name is not string' + assert df["metric_name"].dtype == "object", "metric_name is not string" # metric_value is numeric - assert pd.api.types.is_numeric_dtype(df['metric_value']), 'metric_value is not numeric' + assert pd.api.types.is_numeric_dtype( + df["metric_value"] + ), "metric_value is not numeric" # metric_timestamp is timestamp - assert pd.api.types.is_datetime64_any_dtype(df['metric_timestamp']), 'metric_timestamp is not timestamp' + assert pd.api.types.is_datetime64_any_dtype( + df["metric_timestamp"] + ), "metric_timestamp is not timestamp" return df diff --git a/dagster_docker.yaml b/dagster_docker.yaml index 0b3f15c..20372e5 100644 --- a/dagster_docker.yaml +++ b/dagster_docker.yaml @@ -80,4 +80,4 @@ event_log_storage: env: DAGSTER_POSTGRES_PASSWORD db_name: env: DAGSTER_POSTGRES_DB - port: 5432 \ No newline at end of file + port: 5432 diff --git a/dashboard.py b/dashboard.py index 0d9eba6..a5aaffb 100644 --- a/dashboard.py +++ b/dashboard.py @@ -6,14 +6,14 @@ """ import pandas as pd -import streamlit as st import plotly.graph_objs as go +import streamlit as st +from dotenv import load_dotenv from plotly.subplots import make_subplots + from anomstack.config import specs from anomstack.jinja.render import render from anomstack.sql.read import read_sql -from dotenv import load_dotenv - load_dotenv() @@ -93,7 +93,9 @@ def get_data(sql: str, db: str) -> pd.DataFrame: # data based inputs metric_names = ["ALL"] -unique_metrics = sorted(list(df[df["metric_batch"] == batch_selection]["metric_name"].unique())) +unique_metrics = sorted( + list(df[df["metric_batch"] == 
batch_selection]["metric_name"].unique()) +) metric_names.extend(unique_metrics) metric_selection = st.sidebar.selectbox("Metric Name:", metric_names) diff --git a/docker-compose.yaml b/docker-compose.yaml index 9b375a0..c13c759 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -162,7 +162,7 @@ services: depends_on: - anomstack_postgresql - anomstack_code - + # This service runs the Streamlit dashboard anomstack_dashboard: build: @@ -200,4 +200,4 @@ networks: anomstack_network: driver: bridge - name: anomstack_network \ No newline at end of file + name: anomstack_network diff --git a/docs/deployment/README.md b/docs/deployment/README.md index 81750be..b643986 100644 --- a/docs/deployment/README.md +++ b/docs/deployment/README.md @@ -1,3 +1,3 @@ # Deployment -WIP \ No newline at end of file +WIP diff --git a/docs/deployment/gcp.md b/docs/deployment/gcp.md index 5e0ef17..a5e72fd 100644 --- a/docs/deployment/gcp.md +++ b/docs/deployment/gcp.md @@ -122,7 +122,7 @@ sudo docker run hello-world # clone anomstack git clone https://github.com/andrewm4894/anomstack.git -# or +# or # clone repo at specific release tag # git clone -b v0.0.1 https://github.com/andrewm4894/anomstack.git diff --git a/metrics/defaults/python/preprocess.py b/metrics/defaults/python/preprocess.py index e292fbf..8528e14 100644 --- a/metrics/defaults/python/preprocess.py +++ b/metrics/defaults/python/preprocess.py @@ -2,7 +2,14 @@ def preprocess( - df, diff_n=1, smooth_n=3, lags_n=5, shuffle=False, dropna=True, freq=None, freq_agg='mean' + df, + diff_n=1, + smooth_n=3, + lags_n=5, + shuffle=False, + dropna=True, + freq=None, + freq_agg="mean", ) -> pd.DataFrame: """ Prepare data for model training and scoring. @@ -25,9 +32,9 @@ def preprocess( X = X[["metric_value"]] if freq is not None: - if freq_agg == 'mean': + if freq_agg == "mean": X = X.resample(freq).mean() - elif freq_agg == 'sum': + elif freq_agg == "sum": X = X.resample(freq).sum() # Add other aggregation methods as needed else: diff --git a/metrics/defaults/python/prompt.py b/metrics/defaults/python/prompt.py index 59a8765..e167ebb 100644 --- a/metrics/defaults/python/prompt.py +++ b/metrics/defaults/python/prompt.py @@ -12,7 +12,9 @@ def make_prompt(df, llmalert_recent_n) -> str: from tabulate import tabulate - text_representation = tabulate(df.reset_index(), headers='keys', tablefmt='pipe', showindex=False) + text_representation = tabulate( + df.reset_index(), headers="keys", tablefmt="pipe", showindex=False + ) prompt = f""" You are a seasoned time series expert who has worked with time series data for many years and are very accomplished at spotting and explaining anomalies in time series data.
diff --git a/metrics/defaults/sql/alerts.sql b/metrics/defaults/sql/alerts.sql index 50620ab..6f1dc94 100644 --- a/metrics/defaults/sql/alerts.sql +++ b/metrics/defaults/sql/alerts.sql @@ -66,7 +66,7 @@ from metric_value_data ), -data_ranked as +data_ranked as ( select m.metric_timestamp, @@ -80,7 +80,7 @@ from metric_value_recency_ranked m left outer join metric_score_recency_ranked s -on +on m.metric_name = s.metric_name and m.metric_batch = s.metric_batch @@ -100,7 +100,7 @@ select metric_score_recency_rank, -- smooth the metric score over the last {{ alert_smooth_n }} values avg(metric_score) over (partition by metric_name order by metric_score_recency_rank rows between {{ alert_smooth_n }} preceding and current row) as metric_score_smooth -from +from data_ranked ), @@ -128,7 +128,7 @@ select metric_batch, metric_name, max(metric_alert) as metric_alert_tmp -from +from data_alerts group by 1,2 having max(metric_alert) = 1 @@ -151,4 +151,4 @@ on data_alerts.metric_batch = metrics_triggered.metric_batch and data_alerts.metric_name = metrics_triggered.metric_name -; \ No newline at end of file +; diff --git a/metrics/defaults/sql/plot.sql b/metrics/defaults/sql/plot.sql index 215f73c..367da72 100644 --- a/metrics/defaults/sql/plot.sql +++ b/metrics/defaults/sql/plot.sql @@ -66,7 +66,7 @@ from metric_value_data ), -data_ranked as +data_ranked as ( select m.metric_timestamp, @@ -100,7 +100,7 @@ select metric_score_recency_rank, -- smooth the metric score over the last {{ alert_smooth_n }} values avg(metric_score) over (partition by metric_batch, metric_name order by metric_score_recency_rank rows between {{ alert_smooth_n }} preceding and current row) as metric_score_smooth -from +from data_ranked ), @@ -132,4 +132,4 @@ select metric_alert from data_alerts -; \ No newline at end of file +; diff --git a/metrics/defaults/sql/train.sql b/metrics/defaults/sql/train.sql index 0900ebd..dc132f5 100644 --- a/metrics/defaults/sql/train.sql +++ b/metrics/defaults/sql/train.sql @@ -39,4 +39,4 @@ where and -- must be at least {{ train_min_n }} records metric_recency_rank >= {{ train_min_n }} -; \ No newline at end of file +; diff --git a/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml b/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml index d4ab50e..b4ec59b 100644 --- a/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml +++ b/metrics/examples/bigquery/bigquery_example_simple/bigquery_example_simple.yaml @@ -18,7 +18,7 @@ ingest_sql: > * from ( - + -- random_1 select @@ -36,9 +36,9 @@ ingest_sql: > else rand() -- Normal value end as metric_value union all - + -- random_2 - + select current_timestamp() as metric_timestamp, 'random_2' as metric_name, @@ -54,9 +54,9 @@ ingest_sql: > else rand() -- Normal value end as metric_value union all - + -- random_3 - + select current_timestamp() as metric_timestamp, 'random_3' as metric_name, diff --git a/metrics/examples/example_simple/example_simple.yaml b/metrics/examples/example_simple/example_simple.yaml index cde6efa..a888dc8 100644 --- a/metrics/examples/example_simple/example_simple.yaml +++ b/metrics/examples/example_simple/example_simple.yaml @@ -33,4 +33,4 @@ ingest_sql: > union all select * from metric_2 ) - ; \ No newline at end of file + ; diff --git a/metrics/examples/freq/freq.yaml b/metrics/examples/freq/freq.yaml index 03019ed..5d46f6e 100644 --- a/metrics/examples/freq/freq.yaml +++ b/metrics/examples/freq/freq.yaml @@ -23,7 +23,7 @@ ingest_sql: > * from ( - + 
-- freq_random_1 select @@ -41,9 +41,9 @@ ingest_sql: > else rand() -- Normal value end as metric_value union all - + -- freq_random_2 - + select current_timestamp() as metric_timestamp, 'freq_random_2' as metric_name, @@ -59,9 +59,9 @@ ingest_sql: > else rand() -- Normal value end as metric_value union all - + -- freq_random_3 - + select current_timestamp() as metric_timestamp, 'freq_random_3' as metric_name, @@ -77,4 +77,4 @@ ingest_sql: > else rand() -- Normal value end as metric_value - ); \ No newline at end of file + ); diff --git a/metrics/examples/gsod/gsod.sql b/metrics/examples/gsod/gsod.sql index 977a64e..53322d3 100644 --- a/metrics/examples/gsod/gsod.sql +++ b/metrics/examples/gsod/gsod.sql @@ -2,29 +2,29 @@ with stations_selected as ( -select +select usaf, wban, country, name, -from +from `bigquery-public-data.noaa_gsod.stations` where country in ('US','UK') ), -data_filtered as +data_filtered as ( -select +select gsod.*, stations.country -from +from `bigquery-public-data.noaa_gsod.gsod2023` gsod join stations_selected stations on gsod.stn = stations.usaf - and + and gsod.wban = stations.wban where date(date) = date_add(current_date(), interval -4 day) @@ -34,11 +34,11 @@ where us_temp_avg as ( -select +select timestamp(date) as metric_timestamp, 'gsod_us_temp_avg' as metric_name, avg(temp) as metric_value -from +from data_filtered where country = 'US' @@ -47,11 +47,11 @@ group by 1,2 us_temp_min as ( -select +select timestamp(date) as metric_timestamp, 'gsod_us_temp_min' as metric_name, min(temp) as metric_value -from +from data_filtered where country = 'US' @@ -60,11 +60,11 @@ group by 1,2 us_temp_max as ( -select +select timestamp(date) as metric_timestamp, 'gsod_us_temp_max' as metric_name, max(temp) as metric_value -from +from data_filtered where country = 'US' @@ -75,7 +75,7 @@ group by 1,2 uk_temp_avg as ( -select +select timestamp(date) as metric_timestamp, 'gsod_uk_temp_avg' as metric_name, avg(temp) as metric_value @@ -88,7 +88,7 @@ group by 1,2 uk_temp_min as ( -select +select timestamp(date) as metric_timestamp, 'gsod_uk_temp_min' as metric_name, min(temp) as metric_value @@ -101,7 +101,7 @@ group by 1,2 uk_temp_max as ( -select +select timestamp(date) as metric_timestamp, 'gsod_uk_temp_max' as metric_name, max(temp) as metric_value @@ -124,4 +124,4 @@ union all select * from uk_temp_min union all select * from uk_temp_max -; \ No newline at end of file +; diff --git a/metrics/examples/gsod/gsod.yaml b/metrics/examples/gsod/gsod.yaml index 2bda86f..c9e2bd8 100644 --- a/metrics/examples/gsod/gsod.yaml +++ b/metrics/examples/gsod/gsod.yaml @@ -13,4 +13,4 @@ plot_cron_schedule: "45 10 * * *" alert_methods: "email" alert_always: False ingest_sql: > - {% include "./examples/gsod/gsod.sql" %} \ No newline at end of file + {% include "./examples/gsod/gsod.sql" %} diff --git a/metrics/examples/gtrends/gtrends.sql b/metrics/examples/gtrends/gtrends.sql index a195b5c..e174d2a 100644 --- a/metrics/examples/gtrends/gtrends.sql +++ b/metrics/examples/gtrends/gtrends.sql @@ -1,30 +1,30 @@ with -- get the latest data -latest_data as +latest_data as ( -select +select * -FROM - `bigquery-public-data.google_trends.top_terms` +FROM + `bigquery-public-data.google_trends.top_terms` WHERE -- usually the data is updated every 3 days - refresh_date = date_sub(current_date(), INTERVAL 3 DAY) + refresh_date = date_sub(current_date(), INTERVAL 3 DAY) ), -- get the latest week max_week as ( -SELECT - max(week) as week_max -FROM +SELECT + max(week) as week_max +FROM latest_data ), -- get the 
latest data for the latest week most_recent_week as ( -select +select latest_data.* from latest_data diff --git a/metrics/examples/hackernews/hn_top_stories_scores.yaml b/metrics/examples/hackernews/hn_top_stories_scores.yaml index 0c13752..e961e6d 100644 --- a/metrics/examples/hackernews/hn_top_stories_scores.yaml +++ b/metrics/examples/hackernews/hn_top_stories_scores.yaml @@ -12,4 +12,4 @@ alert_always: False alert_methods: "email" ingest_fn: > {% include "./examples/hackernews/hn_top_stories_scores.py" %} - \ No newline at end of file + diff --git a/metrics/examples/netdata/netdata.py b/metrics/examples/netdata/netdata.py index 6a612fd..87c2241 100644 --- a/metrics/examples/netdata/netdata.py +++ b/metrics/examples/netdata/netdata.py @@ -6,8 +6,8 @@ def ingest() -> pd.DataFrame: Ingest data from Netdata API. """ - import requests import pandas as pd + import requests from dagster import get_dagster_logger logger = get_dagster_logger() @@ -54,7 +54,7 @@ def ingest() -> pd.DataFrame: df = df[["metric_timestamp", "metric_name", "metric_value"]] - logger.debug(f'df:\n{df}') + logger.debug(f"df:\n{df}") df["metric_value"] = df["metric_value"].astype(float) diff --git a/metrics/examples/netdata/netdata.yaml b/metrics/examples/netdata/netdata.yaml index 78d42f1..d010e2d 100644 --- a/metrics/examples/netdata/netdata.yaml +++ b/metrics/examples/netdata/netdata.yaml @@ -12,4 +12,4 @@ alert_always: False alert_methods: "email" ingest_fn: > {% include "./examples/netdata/netdata.py" %} - \ No newline at end of file + diff --git a/metrics/examples/python/python_ingest_simple/README.md b/metrics/examples/python/python_ingest_simple/README.md index 4287ca8..792d600 100644 --- a/metrics/examples/python/python_ingest_simple/README.md +++ b/metrics/examples/python/python_ingest_simple/README.md @@ -1 +1 @@ -# \ No newline at end of file +# diff --git a/metrics/examples/python/python_ingest_simple/ingest.py b/metrics/examples/python/python_ingest_simple/ingest.py index d4113fd..920bc90 100644 --- a/metrics/examples/python/python_ingest_simple/ingest.py +++ b/metrics/examples/python/python_ingest_simple/ingest.py @@ -1,9 +1,9 @@ def ingest(): - - import pandas as pd import random import time + import pandas as pd + # generate random metrics metric1_value = random.uniform(0, 10) metric2_value = random.uniform(0, 10) @@ -13,10 +13,10 @@ def ingest(): # make df data = { - 'metric_name': ['metric1', 'metric2'], - 'metric_value': [metric1_value, metric2_value], - 'metric_timestamp': [current_timestamp, current_timestamp] - } + "metric_name": ["metric1", "metric2"], + "metric_value": [metric1_value, metric2_value], + "metric_timestamp": [current_timestamp, current_timestamp], + } df = pd.DataFrame(data) return df diff --git a/metrics/examples/python/python_ingest_simple/python_ingest_simple.yaml b/metrics/examples/python/python_ingest_simple/python_ingest_simple.yaml index 447abd0..e54c9a6 100644 --- a/metrics/examples/python/python_ingest_simple/python_ingest_simple.yaml +++ b/metrics/examples/python/python_ingest_simple/python_ingest_simple.yaml @@ -5,4 +5,4 @@ train_cron_schedule: "*/4 * * * *" score_cron_schedule: "*/3 * * * *" ingest_fn: > {% include "./examples/python/python_ingest_simple/ingest.py" %} - \ No newline at end of file + diff --git a/metrics/examples/s3/s3_example_simple/s3_example_simple.yaml b/metrics/examples/s3/s3_example_simple/s3_example_simple.yaml index a0028cd..15d2947 100644 --- a/metrics/examples/s3/s3_example_simple/s3_example_simple.yaml +++ 
b/metrics/examples/s3/s3_example_simple/s3_example_simple.yaml @@ -12,7 +12,7 @@ ingest_sql: > * from ( - + -- metric_1 select @@ -20,20 +20,20 @@ ingest_sql: > 'metric_1' as metric_name, random() as metric_value union all - + -- metric_2 - + select get_current_timestamp() as metric_timestamp, 'metric_2' as metric_name, random() as metric_value union all - + -- metric_3 - + select get_current_timestamp() as metric_timestamp, 'metric_3' as metric_name, random() as metric_value - ); \ No newline at end of file + ); diff --git a/metrics/examples/sales/sales.sql b/metrics/examples/sales/sales.sql index 057dd4a..18714dd 100644 --- a/metrics/examples/sales/sales.sql +++ b/metrics/examples/sales/sales.sql @@ -18,17 +18,17 @@ select sales_pipeline_success_rate as ( -select +select get_current_timestamp() as metric_timestamp, 'sales_pipeline_success_rate' as metric_name, sales_pipeline_deals_sold.metric_value / sales_pipeline_deals_created.metric_value as metric_value from sales_pipeline_deals_created -join +join sales_pipeline_deals_sold -on +on date_trunc('day', sales_pipeline_deals_created.metric_timestamp) = date_trunc('day', sales_pipeline_deals_sold.metric_timestamp) -) +) select metric_timestamp, diff --git a/metrics/examples/snowflake/snowflake_example_simple/snowflake_example_simple.yaml b/metrics/examples/snowflake/snowflake_example_simple/snowflake_example_simple.yaml index cd3176b..b348098 100644 --- a/metrics/examples/snowflake/snowflake_example_simple/snowflake_example_simple.yaml +++ b/metrics/examples/snowflake/snowflake_example_simple/snowflake_example_simple.yaml @@ -14,7 +14,7 @@ ingest_sql: > * from ( - + -- metric_1 select @@ -22,20 +22,20 @@ ingest_sql: > 'metric_1' as metric_name, uniform(0::float, 1::float, random()) as metric_value union all - + -- metric_2 - + select current_timestamp() as metric_timestamp, 'metric_2' as metric_name, uniform(0::float, 1::float, random()) as metric_value union all - + -- metric_3 - + select current_timestamp() as metric_timestamp, 'metric_3' as metric_name, uniform(0::float, 1::float, random()) as metric_value - ); \ No newline at end of file + ); diff --git a/metrics/examples/users/users.sql b/metrics/examples/users/users.sql index 283bcba..f322aa6 100644 --- a/metrics/examples/users/users.sql +++ b/metrics/examples/users/users.sql @@ -22,15 +22,15 @@ select users_active_rate_{{ geo }} as ( -select +select get_current_timestamp() as metric_timestamp, 'users_active_rate_{{ geo }}' as metric_name, users_active_{{ geo }}.metric_value / users_total_{{ geo }}.metric_value as metric_value from users_total_{{ geo }} -join +join users_active_{{ geo }} -on +on date_trunc('day', users_total_{{ geo }}.metric_timestamp) = date_trunc('day', users_active_{{ geo }}.metric_timestamp) ) {% if not loop.last %},{% endif %} diff --git a/metrics/examples/weather/ingest_weather.py b/metrics/examples/weather/ingest_weather.py index da232a7..5b2bf30 100644 --- a/metrics/examples/weather/ingest_weather.py +++ b/metrics/examples/weather/ingest_weather.py @@ -10,8 +10,8 @@ def ingest() -> pd.DataFrame: data = { city: requests.get( - url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lng}&current=temperature_2m", - timeout = 10 + url=f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lng}&current=temperature_2m", + timeout=10, ).json()["current"]["temperature_2m"] for (city, lat, lng) in [ ("dublin", 53.3441, -6.2675), diff --git a/metrics/examples/weather/weather.yaml b/metrics/examples/weather/weather.yaml index 8c164fc..1bf7ba7 100644 ---
a/metrics/examples/weather/weather.yaml +++ b/metrics/examples/weather/weather.yaml @@ -12,4 +12,4 @@ alert_always: False alert_methods: "email" ingest_fn: > {% include "./examples/weather/ingest_weather.py" %} - \ No newline at end of file + diff --git a/metrics/examples/weather_forecast/weather_forecast.sql b/metrics/examples/weather_forecast/weather_forecast.sql index 29de0ee..6b591fd 100644 --- a/metrics/examples/weather_forecast/weather_forecast.sql +++ b/metrics/examples/weather_forecast/weather_forecast.sql @@ -1,6 +1,6 @@ with -forecast_data as +forecast_data as ( SELECT CASE @@ -24,9 +24,9 @@ SELECT avg(avg_wind_speed_10m_mph) as avg_wind_speed, avg(avg_humidity_relative_2m_pct) as avg_humidity, avg(avg_cloud_cover_tot_pct) as avg_cloud_cover -FROM +FROM global_weather__climate_data_for_bi.standard_tile.forecast_day -WHERE +WHERE date_valid_std >= DATEADD(DAY, 0, CURRENT_DATE()) AND date_valid_std <= DATEADD(DAY, 3, CURRENT_DATE()) @@ -58,5 +58,3 @@ select from forecast_data_clean ; - - diff --git a/metrics/examples/weather_forecast/weather_forecast.yaml b/metrics/examples/weather_forecast/weather_forecast.yaml index 34f4f4a..60c65f8 100644 --- a/metrics/examples/weather_forecast/weather_forecast.yaml +++ b/metrics/examples/weather_forecast/weather_forecast.yaml @@ -11,4 +11,4 @@ alert_always: False alert_methods: "email" ingest_sql: > {% include "./examples/weather_forecast/weather_forecast.sql" %} - \ No newline at end of file + diff --git a/metrics/examples/yfinance/yfinance.py b/metrics/examples/yfinance/yfinance.py index 94b26a2..15add55 100644 --- a/metrics/examples/yfinance/yfinance.py +++ b/metrics/examples/yfinance/yfinance.py @@ -3,17 +3,15 @@ def ingest() -> pd.DataFrame: """ - Credit to: https://stackoverflow.com/a/76580610/1919374 + Credit to: https://stackoverflow.com/a/76580610/1919374 """ - import requests import pandas as pd + import requests - apiBase = "https://query2.finance.yahoo.com" headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64)"} - - + def getCredentials( cookieUrl="https://fc.yahoo.com", crumbUrl=apiBase + "/v1/test/getcrumb" ): @@ -36,7 +34,6 @@ def quote(symbols, credentials): quotes = response.json()["quoteResponse"]["result"] return quotes - symbols = ["GOOG", "TSLA", "AAPL", "MSFT"] credentials = getCredentials() diff --git a/metrics/examples/yfinance/yfinance.yaml b/metrics/examples/yfinance/yfinance.yaml index 32bc3d1..061e9c0 100644 --- a/metrics/examples/yfinance/yfinance.yaml +++ b/metrics/examples/yfinance/yfinance.yaml @@ -12,4 +12,4 @@ alert_always: False alert_methods: "email" ingest_fn: > {% include "./examples/yfinance/yfinance.py" %} - \ No newline at end of file + diff --git a/requirements.txt b/requirements.txt index dc299bc..e3d6242 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,4 @@ pyyaml snowflake-connector-python[pandas] seaborn streamlit -tabulate \ No newline at end of file +tabulate diff --git a/workspace.yaml b/workspace.yaml index f567c8f..23f9127 100644 --- a/workspace.yaml +++ b/workspace.yaml @@ -3,4 +3,4 @@ load_from: - grpc_server: host: anomstack_code port: 4000 - location_name: "anomstack_code" \ No newline at end of file + location_name: "anomstack_code" From dfac954a1f00f7007f7016d31c62f9c90e2ecd4a Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Tue, 14 Nov 2023 12:15:18 +0000 Subject: [PATCH 2/2] add pre-commit gh action --- .github/workflows/pre-commit.yaml | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 
.github/workflows/pre-commit.yaml diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml new file mode 100644 index 0000000..e627fc3 --- /dev/null +++ b/.github/workflows/pre-commit.yaml @@ -0,0 +1,35 @@ +name: pre-commit + +on: + workflow_call: + pull_request_target: + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + # in case of PR, check out the PR's head branch + - uses: actions/checkout@v3 + if: github.event_name == 'pull_request_target' + with: + ref: ${{ github.event.pull_request.head.sha }} + + # in case of push, check out the main branch + - uses: actions/checkout@v3 + if: github.event_name != 'pull_request_target' + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + cache-dependency-path: "**/requirements*.txt" + - uses: pre-commit/action@v3.0.0 + - name: Post PR comment on failure + if: failure() && github.event_name == 'pull_request_target' + uses: peter-evans/create-or-update-comment@v2 + with: + issue-number: ${{ github.event.pull_request.number }} + body: | + :x: **pre-commit** failed. + Please run `pre-commit run --all-files` locally and commit the changes. + Find more information in the repository's CONTRIBUTING.md \ No newline at end of file
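
Note: the pre-commit/action@v3.0.0 step above runs whatever hooks are declared in a .pre-commit-config.yaml at the repository root; that config file is not part of this patch. A minimal sketch of the kind of config that would produce the formatting changes in patch 1/2 (missing final newlines added, trailing whitespace trimmed, strings normalized to double quotes, imports sorted) could look like the following. The hook repos and ids are real, but the rev pins are assumptions rather than values taken from this repository:

    # hypothetical .pre-commit-config.yaml -- rev values are assumptions
    repos:
      - repo: https://github.com/pre-commit/pre-commit-hooks
        rev: v4.5.0
        hooks:
          - id: end-of-file-fixer     # adds the final newlines fixed throughout patch 1/2
          - id: trailing-whitespace   # trims the trailing spaces removed from the SQL examples
      - repo: https://github.com/psf/black
        rev: 23.11.0
        hooks:
          - id: black                 # normalizes quoting and call formatting, as in netdata.py
      - repo: https://github.com/PyCQA/isort
        rev: 5.12.0
        hooks:
          - id: isort                 # sorts imports, as in ingest.py and yfinance.py
            args: ["--profile", "black"]   # assumed setting to keep isort compatible with black

With a config like this in place, `pre-commit run --all-files` locally reproduces the same checks the workflow enforces in CI, which is what the failure comment above asks contributors to do.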