From c99e173c0ec6f0649b99f65d9c77bf7c3c505e9f Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sat, 25 Jan 2025 23:34:11 +0000 Subject: [PATCH 01/15] Add environment variable examples for Turso integration --- .example.env | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.example.env b/.example.env index a762a11..5c01e30 100644 --- a/.example.env +++ b/.example.env @@ -26,6 +26,9 @@ ANOMSTACK_DUCKDB_PATH=tmpdata/anomstack-duckdb.db # local sqlite path for testing/dev quickstart ANOMSTACK_SQLITE_PATH=tmpdata/anomstack-sqlite.db +# example using turso +# https://docs.turso.tech/sdk/python/quickstart +# ANOMSTACK_SQLITE_PATH=libsql://.turso.io # table id to store metrics in ANOMSTACK_TABLE_KEY=tmp.metrics @@ -97,3 +100,7 @@ ANOMSTACK_POSTGRES_FORWARD_PORT=5432 # motherduck related env vars ANOMSTACK_MOTHERDUCK_TOKEN= + +# turso related env vars +ANOMSTACK_TURSO_DATABASE_URL= +ANOMSTACK_TURSO_AUTH_TOKEN= From f3d863fcda86fa26032d4982663ebb007c872218 Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sat, 25 Jan 2025 23:34:20 +0000 Subject: [PATCH 02/15] Add Turso support to the README documentation --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index dc18ded..9e7e7df 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,7 @@ Supported sources and databases for your metrics to live in and be queried from: DuckDB SQLite MotherDuck + Turso Redshift @@ -61,6 +62,7 @@ Supported sources and databases for your metrics to live in and be queried from: ✅ ✅ ✅ + ✅ 🚧 From e71dad5f8da1fb42df98ee79e6c080fdeaf97f61 Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sat, 25 Jan 2025 23:34:43 +0000 Subject: [PATCH 03/15] Add libsql-experimental to requirements files --- requirements.compile | 1 + requirements.txt | 2 ++ 2 files changed, 3 insertions(+) diff --git a/requirements.compile b/requirements.compile index 398e2c1..c972d2c 100644 --- a/requirements.compile +++ b/requirements.compile @@ -13,6 +13,7 @@ fh-plotly google-auth google-cloud-bigquery Jinja2 +libsql-experimental matplotlib numpy oscrypto diff --git a/requirements.txt b/requirements.txt index 28e0318..e5e54c4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -285,6 +285,8 @@ jsonschema-specifications==2023.12.1 # via jsonschema kiwisolver==1.4.7 # via matplotlib +libsql-experimental==0.0.41 + # via -r requirements.compile llvmlite==0.43.0 # via numba mako==1.3.5 From 35becd8c11a75d5a41b77d213cf23caa4eb8d83b Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sat, 25 Jan 2025 23:34:58 +0000 Subject: [PATCH 04/15] Change default database type from DuckDB to SQLite in defaults.yaml --- metrics/defaults/defaults.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/defaults/defaults.yaml b/metrics/defaults/defaults.yaml index c43597f..faf7e04 100644 --- a/metrics/defaults/defaults.yaml +++ b/metrics/defaults/defaults.yaml @@ -1,5 +1,5 @@ # default values to be applied to all batches unless overridden in metric batch specific yaml files. -db: "duckdb" # database type to use. +db: "sqlite" # database type to use. table_key: "metrics" # table to store metrics in. 
 ############################################

From 6e2233bc2861336ed9a6b05e9823601ab4d9e105 Mon Sep 17 00:00:00 2001
From: andrewm4894
Date: Sat, 25 Jan 2025 23:35:08 +0000
Subject: [PATCH 05/15] Add function to generate batched SQL insert statements
 from DataFrame

---
 anomstack/df/utils.py | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/anomstack/df/utils.py b/anomstack/df/utils.py
index a5a1356..26bd26b 100644
--- a/anomstack/df/utils.py
+++ b/anomstack/df/utils.py
@@ -17,3 +17,25 @@ def log_df_info(df: pd.DataFrame, logger=None):
     df.info(buf=buffer)
     info_str = buffer.getvalue()
     logger.info("df.info():\n%s", info_str)
+
+
+def generate_insert_sql(df, table_name, batch_size=100) -> list[str]:
+    """Generate batched INSERT statements from a DataFrame."""
+    columns = ', '.join(df.columns)
+    insert_sqls = []
+    for i in range(0, len(df), batch_size):
+        batch = df.iloc[i:i+batch_size]
+        values_list = []
+        for _, row in batch.iterrows():
+            row_values = []
+            for val in row:
+                if isinstance(val, (str, pd.Timestamp)):
+                    row_values.append("'" + str(val).replace("'", "''") + "'")  # escape embedded single quotes
+                else:
+                    row_values.append('NULL' if pd.isna(val) else str(val))
+            values_list.append(f"({', '.join(row_values)})")
+        values = ', '.join(values_list)
+        insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES {values};"
+        insert_sqls.append(insert_sql)
+
+    return insert_sqls

From 680b727fd269f06fd5798d67d33789ee77f232d2 Mon Sep 17 00:00:00 2001
From: andrewm4894
Date: Sat, 25 Jan 2025 23:35:20 +0000
Subject: [PATCH 06/15] Refactor SQLite integration to use libsql and improve
 connection handling

---
 anomstack/external/sqlite/sqlite.py | 90 +++++++++++++++++++++++------
 1 file changed, 71 insertions(+), 19 deletions(-)

diff --git a/anomstack/external/sqlite/sqlite.py b/anomstack/external/sqlite/sqlite.py
index 3101a64..0b630da 100644
--- a/anomstack/external/sqlite/sqlite.py
+++ b/anomstack/external/sqlite/sqlite.py
@@ -3,16 +3,62 @@
 """

 import os
-import sqlite3
+import libsql_experimental as libsql
 import time

 import pandas as pd
 from dagster import get_dagster_logger
+from anomstack.df.utils import generate_insert_sql

 MAX_RETRIES = 5
 RETRY_DELAY = 1


+def get_conn(sqlite_path: str) -> libsql.Connection:
+    """
+    Get a connection to the SQLite database.
+
+    Args:
+        sqlite_path (str): The path to the SQLite database.
+
+    Returns:
+        libsql.Connection: The connection object.
+    """
+    if sqlite_path.endswith('turso.io'):
+        url = os.environ.get("ANOMSTACK_TURSO_DATABASE_URL", None)
+        auth_token = os.environ.get("ANOMSTACK_TURSO_AUTH_TOKEN", None)
+        conn = libsql.connect(sqlite_path, sync_url=url, auth_token=auth_token)
+    else:
+        os.makedirs(os.path.dirname(sqlite_path), exist_ok=True)
+        conn = libsql.connect(sqlite_path)
+    return conn
+
+
+def infer_sqlite_type(dtype):
+    """Map pandas dtypes to SQLite types."""
+    if pd.api.types.is_integer_dtype(dtype):
+        return "INTEGER"
+    elif pd.api.types.is_float_dtype(dtype):
+        return "REAL"
+    elif pd.api.types.is_string_dtype(dtype):
+        return "TEXT"
+    elif pd.api.types.is_datetime64_any_dtype(dtype):
+        return "TEXT"
+    else:
+        return "TEXT"
+
+
+def generate_create_table_sql(df, table_name) -> str:
+    """Generate a CREATE TABLE statement from a DataFrame."""
+    # Infer column types for CREATE TABLE
+    column_defs = [
+        f"{col} {infer_sqlite_type(dtype)}"
+        for col, dtype in zip(df.columns, df.dtypes)
+    ]
+    create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(column_defs)});"
+    return create_table_sql
+
+
 def read_sql_sqlite(sql: str) -> pd.DataFrame:
     """
     Read data from SQLite with retry logic.
@@ -26,16 +72,17 @@ def read_sql_sqlite(sql: str) -> pd.DataFrame: logger = get_dagster_logger() sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db") logger.info(f"sqlite_path: {sqlite_path}") - os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) + + if not sqlite_path.endswith('turso.io'): + os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) attempt = 0 while attempt < MAX_RETRIES: try: - conn = sqlite3.connect(sqlite_path) + conn = get_conn(sqlite_path) df = pd.read_sql_query(sql, conn) - conn.close() return df - except sqlite3.OperationalError as e: + except Exception as e: if "database is locked" in str(e): attempt += 1 logger.warning( @@ -46,12 +93,9 @@ def read_sql_sqlite(sql: str) -> pd.DataFrame: else: logger.error(f"Error reading from SQLite: {e}") raise - finally: - if 'conn' in locals(): - conn.close() # If all retries fail, raise an error - raise sqlite3.OperationalError("Database is locked after multiple attempts.") + raise Exception("Database is locked after multiple attempts.") def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame: @@ -68,16 +112,22 @@ def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame: logger = get_dagster_logger() sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db") logger.info(f"sqlite_path: {sqlite_path}") - os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) + + if not sqlite_path.endswith('turso.io'): + os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) attempt = 0 while attempt < MAX_RETRIES: try: - conn = sqlite3.connect(sqlite_path) - df.to_sql(table_key, conn, if_exists='append', index=False) - conn.close() + conn = get_conn(sqlite_path) + create_table_sql = generate_create_table_sql(df, table_key) + insert_sqls = generate_insert_sql(df, table_key) + conn.execute(create_table_sql) + for sql in insert_sqls: + conn.execute(sql) + conn.commit() return df - except sqlite3.OperationalError as e: + except Exception as e: if "database is locked" in str(e): attempt += 1 logger.warning( @@ -89,7 +139,7 @@ def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame: logger.error(f"Error saving DataFrame to SQLite: {e}") raise # If all retries fail, raise an error - raise sqlite3.OperationalError("Database is locked after multiple attempts.") + raise Exception("Database is locked after multiple attempts.") def run_sql_sqlite(sql: str) -> None: @@ -105,19 +155,21 @@ def run_sql_sqlite(sql: str) -> None: logger = get_dagster_logger() sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db") logger.info(f"sqlite_path: {sqlite_path}") - os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) + + if not sqlite_path.endswith('turso.io'): + os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) attempt = 0 while attempt < MAX_RETRIES: try: - conn = sqlite3.connect(sqlite_path) + conn = get_conn(sqlite_path) cursor = conn.cursor() cursor.execute(sql) conn.commit() cursor.close() conn.close() return - except sqlite3.OperationalError as e: + except Exception as e: if "database is locked" in str(e): attempt += 1 logger.warning( @@ -133,4 +185,4 @@ def run_sql_sqlite(sql: str) -> None: conn.close() # If all retries fail, raise an error - raise sqlite3.OperationalError("Database is locked after multiple attempts.") + raise Exception("Database is locked after multiple attempts.") From b21d60aa89886b195bc69b82b0d473dde06dc86c Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sun, 26 Jan 2025 00:24:56 +0000 Subject: 
[PATCH 07/15] Add utility function to extract columns from SQL queries --- anomstack/sql/utils.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 anomstack/sql/utils.py diff --git a/anomstack/sql/utils.py b/anomstack/sql/utils.py new file mode 100644 index 0000000..95d4e92 --- /dev/null +++ b/anomstack/sql/utils.py @@ -0,0 +1,22 @@ +from typing import List +import sqlglot +import sqlglot.expressions as exp + + +def get_columns_from_sql(sql: str) -> List[str]: + """ + Get the columns from a SQL query. + + Args: + sql (str): The SQL query to extract columns from. + + Returns: + List[str]: The columns in the SQL query. + """ + columns = [] + for expression in sqlglot.parse_one(sql).find(exp.Select).args["expressions"]: + if isinstance(expression, exp.Alias): + columns.append(expression.text("alias")) + elif isinstance(expression, exp.Column): + columns.append(expression.text("this")) + return columns From 9b19a0d9b22ad87d49bab41b1a1f3a2ba9deb2ca Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sun, 26 Jan 2025 00:25:03 +0000 Subject: [PATCH 08/15] Remove unnecessary blank line in dev.py --- dev.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dev.py b/dev.py index f56bebd..3be2e2f 100644 --- a/dev.py +++ b/dev.py @@ -1,5 +1,4 @@ #%% - #%% From 6de34538e5418ec5ae629be07f48ea8d90813b9a Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sun, 26 Jan 2025 00:25:12 +0000 Subject: [PATCH 09/15] Refactor read_sql_sqlite to use cursor for fetching rows and handle column extraction --- anomstack/external/sqlite/sqlite.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/anomstack/external/sqlite/sqlite.py b/anomstack/external/sqlite/sqlite.py index 0b630da..2d4d383 100644 --- a/anomstack/external/sqlite/sqlite.py +++ b/anomstack/external/sqlite/sqlite.py @@ -6,6 +6,7 @@ import libsql_experimental as libsql import time +from anomstack.sql.utils import get_columns_from_sql import pandas as pd from dagster import get_dagster_logger from anomstack.df.utils import generate_insert_sql @@ -80,7 +81,11 @@ def read_sql_sqlite(sql: str) -> pd.DataFrame: while attempt < MAX_RETRIES: try: conn = get_conn(sqlite_path) - df = pd.read_sql_query(sql, conn) + cursor = conn.execute(sql) + rows = cursor.fetchall() + columns = [desc[0] for desc in cursor.description] if cursor.description else get_columns_from_sql(sql) + df = pd.DataFrame(rows, columns=columns) + cursor.close() return df except Exception as e: if "database is locked" in str(e): From 87033fffcf08b80b68dd294849ca1a5baa73b6bb Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sun, 26 Jan 2025 00:26:04 +0000 Subject: [PATCH 10/15] Remove redundant connection close in run_sql_sqlite function --- anomstack/external/sqlite/sqlite.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/anomstack/external/sqlite/sqlite.py b/anomstack/external/sqlite/sqlite.py index 2d4d383..2944c37 100644 --- a/anomstack/external/sqlite/sqlite.py +++ b/anomstack/external/sqlite/sqlite.py @@ -172,7 +172,6 @@ def run_sql_sqlite(sql: str) -> None: cursor.execute(sql) conn.commit() cursor.close() - conn.close() return except Exception as e: if "database is locked" in str(e): @@ -185,9 +184,6 @@ def run_sql_sqlite(sql: str) -> None: else: logger.error(f"Error executing SQL statement: {e}") raise - finally: - if 'conn' in locals(): - conn.close() # If all retries fail, raise an error raise Exception("Database is locked after multiple attempts.") From 937ba3814b7f08f0ef7e55db88e7c00c2290487d Mon Sep 17 00:00:00 2001 From: andrewm4894 
Date: Sun, 26 Jan 2025 00:42:22 +0000 Subject: [PATCH 11/15] Refactor SQL query in train.sql to select specific columns instead of using wildcard --- metrics/defaults/sql/train.sql | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/metrics/defaults/sql/train.sql b/metrics/defaults/sql/train.sql index 3afd3c2..fdd65c1 100644 --- a/metrics/defaults/sql/train.sql +++ b/metrics/defaults/sql/train.sql @@ -27,14 +27,19 @@ group by metric_timestamp, metric_name data_ranked as ( select - *, + metric_timestamp, + metric_name, + metric_value, row_number() over (partition by metric_name order by metric_timestamp desc) as metric_recency_rank from data ) select - * + metric_timestamp, + metric_name, + metric_value, + metric_recency_rank from data_ranked where From 7935d08fff4f22d0467c5a3c5051dd9ecddfdca1 Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sun, 26 Jan 2025 00:43:17 +0000 Subject: [PATCH 12/15] Refactor SQLite helper functions to include retry logic and improve connection management --- anomstack/external/sqlite/sqlite.py | 240 +++++++++++++++------------- 1 file changed, 131 insertions(+), 109 deletions(-) diff --git a/anomstack/external/sqlite/sqlite.py b/anomstack/external/sqlite/sqlite.py index 2944c37..5613fc9 100644 --- a/anomstack/external/sqlite/sqlite.py +++ b/anomstack/external/sqlite/sqlite.py @@ -1,189 +1,211 @@ """ -Some helper functions for sqlite. +Refactored helper functions for SQLite (or Turso) with retry logic. """ import os -import libsql_experimental as libsql import time - -from anomstack.sql.utils import get_columns_from_sql import pandas as pd + +# Use the contextlib library for the connection context manager +from contextlib import contextmanager + from dagster import get_dagster_logger +import libsql_experimental as libsql + +from anomstack.sql.utils import get_columns_from_sql from anomstack.df.utils import generate_insert_sql MAX_RETRIES = 5 RETRY_DELAY = 1 -def get_conn(sqlite_path: str) -> libsql.Connection: +def get_sqlite_path() -> str: + """ + Returns the path to the SQLite (or Turso) database, + creating directories if needed. + + By default, uses the env var ANOMSTACK_SQLITE_PATH, + or falls back to "tmpdata/anomstack-sqlite.db". """ - Get a connection to the SQLite database. + default_path = "tmpdata/anomstack-sqlite.db" + path = os.environ.get("ANOMSTACK_SQLITE_PATH", default_path) + # If not a Turso URI, create directories for local DB path + if not path.endswith("turso.io"): + os.makedirs(os.path.dirname(path), exist_ok=True) + return path - Args: - sqlite_path (str): The path to the SQLite database. +def get_conn(sqlite_path: str) -> libsql.Connection: + """ + Get a connection to the SQLite or Turso database. + + If the path ends with 'turso.io', it uses the + ANOMSTACK_TURSO_DATABASE_URL and ANOMSTACK_TURSO_AUTH_TOKEN + environment variables for authentication. + Otherwise, it connects to a local SQLite database. + + Args: + sqlite_path (str): The path or URL of the database. + Returns: libsql.Connection: The connection object. 
""" - if sqlite_path.endswith('turso.io'): + if sqlite_path.endswith("turso.io"): url = os.environ.get("ANOMSTACK_TURSO_DATABASE_URL", None) auth_token = os.environ.get("ANOMSTACK_TURSO_AUTH_TOKEN", None) - conn = libsql.connect(sqlite_path, sync_url=url, auth_token=auth_token) + return libsql.connect(sqlite_path, sync_url=url, auth_token=auth_token) else: - os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) - conn = libsql.connect(sqlite_path) - return conn + return libsql.connect(sqlite_path) -def infer_sqlite_type(dtype): - """Map pandas dtypes to SQLite types.""" +def with_sqlite_retry(action, logger=None, max_retries=MAX_RETRIES, retry_delay=RETRY_DELAY): + """ + Executes a callable with retry logic if the database is locked. + + Args: + action (callable): A zero-argument function that performs the DB action and returns a value. + logger (Logger, optional): Logger for logging warnings/errors. Defaults to None. + max_retries (int, optional): Maximum number of retries. Defaults to MAX_RETRIES. + retry_delay (float, optional): Delay in seconds between retries. Defaults to RETRY_DELAY. + + Returns: + The result of 'action' if successful. + + Raises: + Exception: If the database remains locked after all retries or another error occurs. + """ + for attempt in range(max_retries): + try: + return action() + except Exception as e: + if "database is locked" in str(e): + if logger: + logger.warning( + f"Database is locked; attempt {attempt + 1} of {max_retries}. " + f"Retrying in {retry_delay} seconds..." + ) + time.sleep(retry_delay) + else: + if logger: + logger.error(f"Error during DB action: {e}") + raise + raise Exception("Database is locked after multiple attempts.") + + +@contextmanager +def sqlite_connection(): + """ + Context manager that yields a DB connection, ensuring it is closed on exit. + """ + path = get_sqlite_path() + conn = get_conn(path) + yield conn + + +def infer_sqlite_type(dtype) -> str: + """ + Map pandas dtypes to SQLite types. + + Args: + dtype: A pandas dtype (e.g. df.dtypes[col]). + + Returns: + str: The corresponding SQLite type name. + """ if pd.api.types.is_integer_dtype(dtype): return "INTEGER" elif pd.api.types.is_float_dtype(dtype): return "REAL" - elif pd.api.types.is_string_dtype(dtype): - return "TEXT" elif pd.api.types.is_datetime64_any_dtype(dtype): return "TEXT" else: return "TEXT" -def generate_create_table_sql(df, table_name) -> str: - """Generate SQL DDL and batched DML from DataFrame.""" - # Infer column types for CREATE TABLE +def generate_create_table_sql(df: pd.DataFrame, table_name: str) -> str: + """ + Generate the CREATE TABLE statement for a given DataFrame. + + Args: + df (pd.DataFrame): The DataFrame whose columns are used to infer table schema. + table_name (str): The name of the table. + + Returns: + str: The CREATE TABLE SQL statement. + """ column_defs = [ f"{col} {infer_sqlite_type(dtype)}" for col, dtype in zip(df.columns, df.dtypes) ] - create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(column_defs)});" - return create_table_sql + return f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(column_defs)});" def read_sql_sqlite(sql: str) -> pd.DataFrame: """ - Read data from SQLite with retry logic. - + Read data from SQLite (or Turso) with retry logic. + Args: sql (str): The SQL query to execute. - + Returns: pd.DataFrame: The result of the SQL query as a pandas DataFrame. 
""" logger = get_dagster_logger() - sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db") - logger.info(f"sqlite_path: {sqlite_path}") - - if not sqlite_path.endswith('turso.io'): - os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) + logger.info(f"Reading from DB path: {get_sqlite_path()}") - attempt = 0 - while attempt < MAX_RETRIES: - try: - conn = get_conn(sqlite_path) + def _action(): + with sqlite_connection() as conn: cursor = conn.execute(sql) rows = cursor.fetchall() columns = [desc[0] for desc in cursor.description] if cursor.description else get_columns_from_sql(sql) - df = pd.DataFrame(rows, columns=columns) - cursor.close() - return df - except Exception as e: - if "database is locked" in str(e): - attempt += 1 - logger.warning( - f"Database is locked; attempt {attempt} of {MAX_RETRIES}. " - f"Retrying in {RETRY_DELAY} seconds..." - ) - time.sleep(RETRY_DELAY) - else: - logger.error(f"Error reading from SQLite: {e}") - raise + return pd.DataFrame(rows, columns=columns) - # If all retries fail, raise an error - raise Exception("Database is locked after multiple attempts.") + return with_sqlite_retry(_action, logger=logger) def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame: """ - Save df to db with retry logic. - + Save a DataFrame to the database (SQLite or Turso) with retry logic. + Args: df (pd.DataFrame): The DataFrame to save. table_key (str): The table name to save the DataFrame as. - + Returns: - pd.DataFrame: The input DataFrame. + pd.DataFrame: The input DataFrame (unchanged). """ logger = get_dagster_logger() - sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db") - logger.info(f"sqlite_path: {sqlite_path}") - - if not sqlite_path.endswith('turso.io'): - os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) + logger.info(f"Saving DataFrame to DB path: {get_sqlite_path()}") - attempt = 0 - while attempt < MAX_RETRIES: - try: - conn = get_conn(sqlite_path) + def _action(): + with sqlite_connection() as conn: create_table_sql = generate_create_table_sql(df, table_key) - insert_sqls = generate_insert_sql(df, table_key) conn.execute(create_table_sql) - for sql in insert_sqls: - conn.execute(sql) + insert_sqls = generate_insert_sql(df, table_key) + for ins_sql in insert_sqls: + conn.execute(ins_sql) conn.commit() - return df - except Exception as e: - if "database is locked" in str(e): - attempt += 1 - logger.warning( - f"Database is locked; attempt {attempt} of {MAX_RETRIES}. " - f"Retrying in {RETRY_DELAY} seconds..." - ) - time.sleep(RETRY_DELAY) - else: - logger.error(f"Error saving DataFrame to SQLite: {e}") - raise - # If all retries fail, raise an error - raise Exception("Database is locked after multiple attempts.") + return df + + return with_sqlite_retry(_action, logger=logger) def run_sql_sqlite(sql: str) -> None: """ - Execute a non-returning SQL statement in SQLite with retry logic. - + Execute a non-returning SQL statement (e.g. CREATE, INSERT, UPDATE, DELETE) with retry logic. + Args: sql (str): The SQL statement to execute. 
- + Returns: None """ logger = get_dagster_logger() - sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db") - logger.info(f"sqlite_path: {sqlite_path}") - - if not sqlite_path.endswith('turso.io'): - os.makedirs(os.path.dirname(sqlite_path), exist_ok=True) + logger.info(f"Executing SQL against DB path: {get_sqlite_path()}") - attempt = 0 - while attempt < MAX_RETRIES: - try: - conn = get_conn(sqlite_path) - cursor = conn.cursor() - cursor.execute(sql) + def _action(): + with sqlite_connection() as conn: + conn.execute(sql) conn.commit() - cursor.close() - return - except Exception as e: - if "database is locked" in str(e): - attempt += 1 - logger.warning( - f"Database is locked; attempt {attempt} of {MAX_RETRIES}. " - f"Retrying in {RETRY_DELAY} seconds..." - ) - time.sleep(RETRY_DELAY) - else: - logger.error(f"Error executing SQL statement: {e}") - raise - # If all retries fail, raise an error - raise Exception("Database is locked after multiple attempts.") + with_sqlite_retry(_action, logger=logger) From ee4f1d95f04588fc3c67912fc0d93da5bd13ea34 Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sun, 26 Jan 2025 00:45:21 +0000 Subject: [PATCH 13/15] Update docstring for SQLite helper functions to clarify retry logic implementation --- anomstack/external/sqlite/sqlite.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/anomstack/external/sqlite/sqlite.py b/anomstack/external/sqlite/sqlite.py index 5613fc9..9f1e40f 100644 --- a/anomstack/external/sqlite/sqlite.py +++ b/anomstack/external/sqlite/sqlite.py @@ -1,12 +1,10 @@ """ -Refactored helper functions for SQLite (or Turso) with retry logic. +Helper functions for SQLite (or Turso) with retry logic. """ import os import time import pandas as pd - -# Use the contextlib library for the connection context manager from contextlib import contextmanager from dagster import get_dagster_logger From 834505237b4201cd69cd90ef778043a91ff1998c Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sun, 26 Jan 2025 00:53:05 +0000 Subject: [PATCH 14/15] fix precommit --- anomstack/df/utils.py | 2 +- anomstack/external/sqlite/sqlite.py | 40 ++++++++++++++--------------- anomstack/sql/utils.py | 1 + 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/anomstack/df/utils.py b/anomstack/df/utils.py index 26bd26b..00fe1c1 100644 --- a/anomstack/df/utils.py +++ b/anomstack/df/utils.py @@ -37,5 +37,5 @@ def generate_insert_sql(df, table_name, batch_size=100) -> str: values = ', '.join(values_list) insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES {values};" insert_sqls.append(insert_sql) - + return insert_sqls diff --git a/anomstack/external/sqlite/sqlite.py b/anomstack/external/sqlite/sqlite.py index 9f1e40f..1207e88 100644 --- a/anomstack/external/sqlite/sqlite.py +++ b/anomstack/external/sqlite/sqlite.py @@ -4,14 +4,14 @@ import os import time -import pandas as pd from contextlib import contextmanager -from dagster import get_dagster_logger import libsql_experimental as libsql +import pandas as pd +from dagster import get_dagster_logger -from anomstack.sql.utils import get_columns_from_sql from anomstack.df.utils import generate_insert_sql +from anomstack.sql.utils import get_columns_from_sql MAX_RETRIES = 5 RETRY_DELAY = 1 @@ -21,7 +21,7 @@ def get_sqlite_path() -> str: """ Returns the path to the SQLite (or Turso) database, creating directories if needed. - + By default, uses the env var ANOMSTACK_SQLITE_PATH, or falls back to "tmpdata/anomstack-sqlite.db". 
""" @@ -36,15 +36,15 @@ def get_sqlite_path() -> str: def get_conn(sqlite_path: str) -> libsql.Connection: """ Get a connection to the SQLite or Turso database. - + If the path ends with 'turso.io', it uses the ANOMSTACK_TURSO_DATABASE_URL and ANOMSTACK_TURSO_AUTH_TOKEN environment variables for authentication. Otherwise, it connects to a local SQLite database. - + Args: sqlite_path (str): The path or URL of the database. - + Returns: libsql.Connection: The connection object. """ @@ -59,16 +59,16 @@ def get_conn(sqlite_path: str) -> libsql.Connection: def with_sqlite_retry(action, logger=None, max_retries=MAX_RETRIES, retry_delay=RETRY_DELAY): """ Executes a callable with retry logic if the database is locked. - + Args: action (callable): A zero-argument function that performs the DB action and returns a value. logger (Logger, optional): Logger for logging warnings/errors. Defaults to None. max_retries (int, optional): Maximum number of retries. Defaults to MAX_RETRIES. retry_delay (float, optional): Delay in seconds between retries. Defaults to RETRY_DELAY. - + Returns: The result of 'action' if successful. - + Raises: Exception: If the database remains locked after all retries or another error occurs. """ @@ -103,10 +103,10 @@ def sqlite_connection(): def infer_sqlite_type(dtype) -> str: """ Map pandas dtypes to SQLite types. - + Args: dtype: A pandas dtype (e.g. df.dtypes[col]). - + Returns: str: The corresponding SQLite type name. """ @@ -123,11 +123,11 @@ def infer_sqlite_type(dtype) -> str: def generate_create_table_sql(df: pd.DataFrame, table_name: str) -> str: """ Generate the CREATE TABLE statement for a given DataFrame. - + Args: df (pd.DataFrame): The DataFrame whose columns are used to infer table schema. table_name (str): The name of the table. - + Returns: str: The CREATE TABLE SQL statement. """ @@ -141,10 +141,10 @@ def generate_create_table_sql(df: pd.DataFrame, table_name: str) -> str: def read_sql_sqlite(sql: str) -> pd.DataFrame: """ Read data from SQLite (or Turso) with retry logic. - + Args: sql (str): The SQL query to execute. - + Returns: pd.DataFrame: The result of the SQL query as a pandas DataFrame. """ @@ -164,11 +164,11 @@ def _action(): def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame: """ Save a DataFrame to the database (SQLite or Turso) with retry logic. - + Args: df (pd.DataFrame): The DataFrame to save. table_key (str): The table name to save the DataFrame as. - + Returns: pd.DataFrame: The input DataFrame (unchanged). """ @@ -191,10 +191,10 @@ def _action(): def run_sql_sqlite(sql: str) -> None: """ Execute a non-returning SQL statement (e.g. CREATE, INSERT, UPDATE, DELETE) with retry logic. - + Args: sql (str): The SQL statement to execute. 
- + Returns: None """ diff --git a/anomstack/sql/utils.py b/anomstack/sql/utils.py index 95d4e92..353d574 100644 --- a/anomstack/sql/utils.py +++ b/anomstack/sql/utils.py @@ -1,4 +1,5 @@ from typing import List + import sqlglot import sqlglot.expressions as exp From b26b79e0bd6fd71e27da5db1f1c23255c185d307 Mon Sep 17 00:00:00 2001 From: andrewm4894 Date: Sun, 26 Jan 2025 00:56:42 +0000 Subject: [PATCH 15/15] clean up --- anomstack/external/sqlite/sqlite.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anomstack/external/sqlite/sqlite.py b/anomstack/external/sqlite/sqlite.py index 1207e88..f78e145 100644 --- a/anomstack/external/sqlite/sqlite.py +++ b/anomstack/external/sqlite/sqlite.py @@ -170,7 +170,7 @@ def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame: table_key (str): The table name to save the DataFrame as. Returns: - pd.DataFrame: The input DataFrame (unchanged). + pd.DataFrame: The input DataFrame. """ logger = get_dagster_logger() logger.info(f"Saving DataFrame to DB path: {get_sqlite_path()}")
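
For reference, a minimal usage sketch of the helpers introduced in this series, assuming it is run from the repo root with the requirements installed. It is illustrative only: the sample metric values are made up, the column names mirror the metric_timestamp/metric_name/metric_value shape used in train.sql, and the table name "metrics" matches the table_key default in defaults.yaml. Pointing ANOMSTACK_SQLITE_PATH at a libsql://...turso.io URL (with ANOMSTACK_TURSO_DATABASE_URL and ANOMSTACK_TURSO_AUTH_TOKEN set, per .example.env) would exercise the Turso branch instead of the local file:

    import os

    import pandas as pd

    from anomstack.external.sqlite.sqlite import read_sql_sqlite, save_df_sqlite

    # Local-file default; a libsql://...turso.io value would route through Turso.
    os.environ["ANOMSTACK_SQLITE_PATH"] = "tmpdata/anomstack-sqlite.db"

    df = pd.DataFrame(
        {
            "metric_timestamp": pd.to_datetime(["2025-01-25 00:00:00"]),
            "metric_name": ["dummy_metric"],
            "metric_value": [0.42],
        }
    )

    # generate_create_table_sql() and generate_insert_sql() run inside
    # with_sqlite_retry(), so "database is locked" errors are retried.
    save_df_sqlite(df, "metrics")

    # Reads go through the same retry wrapper and come back as a DataFrame.
    print(read_sql_sqlite("SELECT metric_timestamp, metric_name, metric_value FROM metrics"))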