Skip to content

Commit

Permalink
Merge pull request #130 from andrewm4894/turso
Browse files Browse the repository at this point in the history
Add Turso
  • Loading branch information
andrewm4894 authored Jan 26, 2025
2 parents 4656e4f + b26b79e commit 6633c7b
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 86 deletions.
7 changes: 7 additions & 0 deletions .example.env
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ ANOMSTACK_DUCKDB_PATH=tmpdata/anomstack-duckdb.db

# local sqlite path for testing/dev quickstart
ANOMSTACK_SQLITE_PATH=tmpdata/anomstack-sqlite.db
# example using turso
# https://docs.turso.tech/sdk/python/quickstart
# ANOMSTACK_SQLITE_PATH=libsql://<your-database-url>.turso.io

# table id to store metrics in
ANOMSTACK_TABLE_KEY=tmp.metrics
Expand Down Expand Up @@ -97,3 +100,7 @@ ANOMSTACK_POSTGRES_FORWARD_PORT=5432

# motherduck related env vars
ANOMSTACK_MOTHERDUCK_TOKEN=

# turso related env vars
ANOMSTACK_TURSO_DATABASE_URL=
ANOMSTACK_TURSO_AUTH_TOKEN=
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Supported sources and databases for your metrics to live in and be queried from:
<th align="center"><a href="./anomstack/external/duckdb/duckdb.py" target="_blank">DuckDB</a></th>
<th align="center"><a href="./anomstack/external/sqlite/sqlite.py" target="_blank">SQLite</a></th>
<th align="center"><a href="./anomstack/external/duckdb/duckdb.py" target="_blank">MotherDuck</a></th>
<th align="center"><a href="./anomstack/external/sqlite/sqlite.py" target="_blank">Turso</a></th>
<th align="center">Redshift</th>
</tr>
</thead>
Expand All @@ -61,6 +62,7 @@ Supported sources and databases for your metrics to live in and be queried from:
<td align="center">✅</td>
<td align="center">✅</td>
<td align="center">✅</td>
<td align="center">✅</td>
<td align="center">🚧</td>
</tr>
</tbody>
Expand Down
22 changes: 22 additions & 0 deletions anomstack/df/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,25 @@ def log_df_info(df: pd.DataFrame, logger=None):
df.info(buf=buffer)
info_str = buffer.getvalue()
logger.info("df.info():\n%s", info_str)


def generate_insert_sql(df, table_name, batch_size=100) -> "list[str]":
    """
    Generate batched INSERT statements (DML) for a DataFrame.

    Args:
        df (pd.DataFrame): The data to insert.
        table_name (str): Name of the target table.
        batch_size (int, optional): Maximum number of rows per INSERT
            statement. Defaults to 100.

    Returns:
        list[str]: One INSERT statement per batch of rows.
    """
    columns = ', '.join(df.columns)
    insert_sqls = []
    for start in range(0, len(df), batch_size):
        batch = df.iloc[start:start + batch_size]
        values_list = []
        for _, row in batch.iterrows():
            row_values = []
            for val in row:
                if val is None or (not isinstance(val, (str, pd.Timestamp)) and pd.isna(val)):
                    # Emit SQL NULL for missing values instead of the
                    # literal text 'None'/'nan', which would either break
                    # the statement or corrupt the stored data.
                    row_values.append("NULL")
                elif isinstance(val, (str, pd.Timestamp)):
                    # Escape embedded single quotes so values like "o'clock"
                    # don't terminate the string literal early.
                    escaped = str(val).replace("'", "''")
                    row_values.append(f"'{escaped}'")
                else:
                    row_values.append(str(val))
            values_list.append(f"({', '.join(row_values)})")
        values = ', '.join(values_list)
        insert_sqls.append(f"INSERT INTO {table_name} ({columns}) VALUES {values};")

    return insert_sqls
237 changes: 155 additions & 82 deletions anomstack/external/sqlite/sqlite.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,169 @@
"""
Some helper functions for sqlite.
Helper functions for SQLite (or Turso) with retry logic.
"""

import os
import sqlite3
import time
from contextlib import contextmanager

import libsql_experimental as libsql
import pandas as pd
from dagster import get_dagster_logger

from anomstack.df.utils import generate_insert_sql
from anomstack.sql.utils import get_columns_from_sql

MAX_RETRIES = 5
RETRY_DELAY = 1


def read_sql_sqlite(sql: str) -> pd.DataFrame:
def get_sqlite_path() -> str:
"""
Returns the path to the SQLite (or Turso) database,
creating directories if needed.
By default, uses the env var ANOMSTACK_SQLITE_PATH,
or falls back to "tmpdata/anomstack-sqlite.db".
"""
Read data from SQLite with retry logic.
default_path = "tmpdata/anomstack-sqlite.db"
path = os.environ.get("ANOMSTACK_SQLITE_PATH", default_path)
# If not a Turso URI, create directories for local DB path
if not path.endswith("turso.io"):
os.makedirs(os.path.dirname(path), exist_ok=True)
return path


def get_conn(sqlite_path: str) -> libsql.Connection:
"""
Get a connection to the SQLite or Turso database.
If the path ends with 'turso.io', it uses the
ANOMSTACK_TURSO_DATABASE_URL and ANOMSTACK_TURSO_AUTH_TOKEN
environment variables for authentication.
Otherwise, it connects to a local SQLite database.
Args:
sql (str): The SQL query to execute.
sqlite_path (str): The path or URL of the database.
Returns:
pd.DataFrame: The result of the SQL query as a pandas DataFrame.
libsql.Connection: The connection object.
"""
logger = get_dagster_logger()
sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db")
logger.info(f"sqlite_path: {sqlite_path}")
os.makedirs(os.path.dirname(sqlite_path), exist_ok=True)
if sqlite_path.endswith("turso.io"):
url = os.environ.get("ANOMSTACK_TURSO_DATABASE_URL", None)
auth_token = os.environ.get("ANOMSTACK_TURSO_AUTH_TOKEN", None)
return libsql.connect(sqlite_path, sync_url=url, auth_token=auth_token)
else:
return libsql.connect(sqlite_path)


def with_sqlite_retry(action, logger=None, max_retries=MAX_RETRIES, retry_delay=RETRY_DELAY):
"""
Executes a callable with retry logic if the database is locked.
Args:
action (callable): A zero-argument function that performs the DB action and returns a value.
logger (Logger, optional): Logger for logging warnings/errors. Defaults to None.
max_retries (int, optional): Maximum number of retries. Defaults to MAX_RETRIES.
retry_delay (float, optional): Delay in seconds between retries. Defaults to RETRY_DELAY.
attempt = 0
while attempt < MAX_RETRIES:
Returns:
The result of 'action' if successful.
Raises:
Exception: If the database remains locked after all retries or another error occurs.
"""
for attempt in range(max_retries):
try:
conn = sqlite3.connect(sqlite_path)
df = pd.read_sql_query(sql, conn)
conn.close()
return df
except sqlite3.OperationalError as e:
return action()
except Exception as e:
if "database is locked" in str(e):
attempt += 1
logger.warning(
f"Database is locked; attempt {attempt} of {MAX_RETRIES}. "
f"Retrying in {RETRY_DELAY} seconds..."
)
time.sleep(RETRY_DELAY)
if logger:
logger.warning(
f"Database is locked; attempt {attempt + 1} of {max_retries}. "
f"Retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
else:
logger.error(f"Error reading from SQLite: {e}")
if logger:
logger.error(f"Error during DB action: {e}")
raise
finally:
if 'conn' in locals():
conn.close()
raise Exception("Database is locked after multiple attempts.")


# If all retries fail, raise an error
raise sqlite3.OperationalError("Database is locked after multiple attempts.")
@contextmanager
def sqlite_connection():
    """
    Context manager that yields a DB connection, ensuring it is closed on exit.

    Yields:
        The connection returned by get_conn() for the configured DB path.
    """
    path = get_sqlite_path()
    conn = get_conn(path)
    try:
        yield conn
    finally:
        # Close even when the body raises, so retried operations don't
        # leak connections or hold the database locked.
        # NOTE(review): guarded with getattr because the libsql connection
        # object may not expose close() — confirm against the driver version.
        close = getattr(conn, "close", None)
        if close is not None:
            close()


def infer_sqlite_type(dtype) -> str:
    """
    Map a pandas dtype to the corresponding SQLite column type.

    Args:
        dtype: A pandas dtype (e.g. df.dtypes[col]).

    Returns:
        str: "INTEGER", "REAL", or "TEXT".
    """
    # Checked in order; anything unrecognised (datetimes, strings,
    # generic objects, ...) is stored as TEXT.
    type_checks = (
        (pd.api.types.is_integer_dtype, "INTEGER"),
        (pd.api.types.is_float_dtype, "REAL"),
    )
    for predicate, sqlite_type in type_checks:
        if predicate(dtype):
            return sqlite_type
    return "TEXT"


def generate_create_table_sql(df: pd.DataFrame, table_name: str) -> str:
    """
    Build a CREATE TABLE IF NOT EXISTS statement from a DataFrame's schema.

    Args:
        df (pd.DataFrame): The DataFrame whose columns are used to infer table schema.
        table_name (str): The name of the table.

    Returns:
        str: The CREATE TABLE SQL statement.
    """
    # One "<name> <type>" definition per column, types inferred from dtypes.
    col_defs = ", ".join(
        f"{name} {infer_sqlite_type(kind)}" for name, kind in df.dtypes.items()
    )
    return f"CREATE TABLE IF NOT EXISTS {table_name} ({col_defs});"


def read_sql_sqlite(sql: str) -> pd.DataFrame:
    """
    Read data from SQLite (or Turso) with retry logic.

    Args:
        sql (str): The SQL query to execute.

    Returns:
        pd.DataFrame: The result of the SQL query as a pandas DataFrame.
    """
    logger = get_dagster_logger()
    logger.info(f"Reading from DB path: {get_sqlite_path()}")

    def _query():
        with sqlite_connection() as conn:
            cursor = conn.execute(sql)
            rows = cursor.fetchall()
            if cursor.description:
                column_names = [col[0] for col in cursor.description]
            else:
                # Fall back to parsing the SQL text when the driver
                # reports no result metadata.
                column_names = get_columns_from_sql(sql)
            return pd.DataFrame(rows, columns=column_names)

    return with_sqlite_retry(_query, logger=logger)


def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame:
"""
Save df to db with retry logic.
Save a DataFrame to the database (SQLite or Turso) with retry logic.
Args:
df (pd.DataFrame): The DataFrame to save.
Expand All @@ -66,35 +173,24 @@ def save_df_sqlite(df: pd.DataFrame, table_key: str) -> pd.DataFrame:
pd.DataFrame: The input DataFrame.
"""
logger = get_dagster_logger()
sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db")
logger.info(f"sqlite_path: {sqlite_path}")
os.makedirs(os.path.dirname(sqlite_path), exist_ok=True)
logger.info(f"Saving DataFrame to DB path: {get_sqlite_path()}")

def _action():
with sqlite_connection() as conn:
create_table_sql = generate_create_table_sql(df, table_key)
conn.execute(create_table_sql)
insert_sqls = generate_insert_sql(df, table_key)
for ins_sql in insert_sqls:
conn.execute(ins_sql)
conn.commit()
return df

attempt = 0
while attempt < MAX_RETRIES:
try:
conn = sqlite3.connect(sqlite_path)
df.to_sql(table_key, conn, if_exists='append', index=False)
conn.close()
return df
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
attempt += 1
logger.warning(
f"Database is locked; attempt {attempt} of {MAX_RETRIES}. "
f"Retrying in {RETRY_DELAY} seconds..."
)
time.sleep(RETRY_DELAY)
else:
logger.error(f"Error saving DataFrame to SQLite: {e}")
raise
# If all retries fail, raise an error
raise sqlite3.OperationalError("Database is locked after multiple attempts.")
return with_sqlite_retry(_action, logger=logger)


def run_sql_sqlite(sql: str) -> None:
"""
Execute a non-returning SQL statement in SQLite with retry logic.
Execute a non-returning SQL statement (e.g. CREATE, INSERT, UPDATE, DELETE) with retry logic.
Args:
sql (str): The SQL statement to execute.
Expand All @@ -103,34 +199,11 @@ def run_sql_sqlite(sql: str) -> None:
None
"""
logger = get_dagster_logger()
sqlite_path = os.environ.get("ANOMSTACK_SQLITE_PATH", "tmpdata/anomstack-sqlite.db")
logger.info(f"sqlite_path: {sqlite_path}")
os.makedirs(os.path.dirname(sqlite_path), exist_ok=True)
logger.info(f"Executing SQL against DB path: {get_sqlite_path()}")

attempt = 0
while attempt < MAX_RETRIES:
try:
conn = sqlite3.connect(sqlite_path)
cursor = conn.cursor()
cursor.execute(sql)
def _action():
with sqlite_connection() as conn:
conn.execute(sql)
conn.commit()
cursor.close()
conn.close()
return
except sqlite3.OperationalError as e:
if "database is locked" in str(e):
attempt += 1
logger.warning(
f"Database is locked; attempt {attempt} of {MAX_RETRIES}. "
f"Retrying in {RETRY_DELAY} seconds..."
)
time.sleep(RETRY_DELAY)
else:
logger.error(f"Error executing SQL statement: {e}")
raise
finally:
if 'conn' in locals():
conn.close()

# If all retries fail, raise an error
raise sqlite3.OperationalError("Database is locked after multiple attempts.")
with_sqlite_retry(_action, logger=logger)
23 changes: 23 additions & 0 deletions anomstack/sql/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import List

import sqlglot
import sqlglot.expressions as exp


def get_columns_from_sql(sql: str) -> List[str]:
    """
    Extract the output column names from a SQL query's SELECT list.

    Args:
        sql (str): The SQL query to extract columns from.

    Returns:
        List[str]: The columns in the SQL query (aliases win over raw
        column names; other projection expressions are skipped).
    """
    select = sqlglot.parse_one(sql).find(exp.Select)
    names = []
    for projection in select.args["expressions"]:
        if isinstance(projection, exp.Alias):
            names.append(projection.text("alias"))
        elif isinstance(projection, exp.Column):
            names.append(projection.text("this"))
    return names
1 change: 0 additions & 1 deletion dev.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#%%



#%%
2 changes: 1 addition & 1 deletion metrics/defaults/defaults.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# default values to be applied to all batches unless overridden in metric batch specific yaml files.
db: "duckdb" # database type to use.
db: "sqlite" # database type to use.
table_key: "metrics" # table to store metrics in.

############################################
Expand Down
Loading

0 comments on commit 6633c7b

Please sign in to comment.