feat: add materialized views to merge staking and opened commitments events
Evan-Kim2028 committed Nov 21, 2024
1 parent ac62929 commit e75d900
Showing 6 changed files with 307 additions and 40 deletions.
14 changes: 10 additions & 4 deletions fetch_l1_txs.py
@@ -8,6 +8,7 @@
from dotenv import load_dotenv
from pipeline.queries import fetch_txs
from pipeline.db import DatabaseConnection, write_events_to_timescale, get_max_block_number
import psycopg # Import psycopg to access specific exceptions

# Configure logging
logging.basicConfig(
@@ -47,17 +48,21 @@ def get_transaction_hashes(db: DatabaseConnection) -> list[str]:
conn = db.get_connection()
max_block = get_max_block_number(conn, "l1transactions")
print(f'max block number: {max_block}')

with conn.cursor() as cursor:
query = """
SELECT txnhash
FROM openedcommitmentstored
FROM openedcommitmentstoredall
WHERE blocknumber > %s
"""
cursor.execute(query, (max_block,))
results = cursor.fetchall()

return [row[0] for row in results] if results else []
except psycopg.errors.UndefinedTable as e:
logger.warning(
f"Table 'openedcommitmentstoredall' does not exist yet: {e}")
return []
except Exception as e:
logger.error(f"Error fetching transaction hashes: {e}", exc_info=True)
return []
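The query above resumes from the highest block already stored in l1transactions via get_max_block_number, imported from pipeline.db. That helper is not part of this diff; a minimal sketch of what it plausibly reduces to (a single MAX() query with a safe default) is shown here purely as an illustration, not the repository's actual code:

import psycopg

def get_max_block_number(conn: psycopg.Connection, table_name: str) -> int:
    """Hypothetical sketch: highest block_number stored in table_name, or 0."""
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COALESCE(MAX(block_number), 0) FROM {table_name}")
            row = cur.fetchone()
            return row[0] if row else 0
    except psycopg.errors.UndefinedTable:
        # First run: the table does not exist yet, so start from block 0.
        conn.rollback()
        return 0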
@@ -101,7 +106,7 @@ async def main():
try:
# Create database connection
db = DatabaseConnection(DB_PARAMS)

async with get_manager():
while True:
logger.info("Starting new fetch cycle")
@@ -112,7 +117,8 @@
# 2. Process and store L1 transactions
await process_l1_transactions(db, tx_hashes)

logger.info("Completed fetch cycle, waiting for next iteration")
logger.info(
"Completed fetch cycle, waiting for next iteration")
await asyncio.sleep(30)

except KeyboardInterrupt:
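The second step of the loop, process_l1_transactions, is collapsed in this view. Judging from the imports at the top of the file (fetch_txs from pipeline.queries, write_events_to_timescale from pipeline.db), it plausibly looks like the sketch below; the signature of fetch_txs and the logging are assumptions, and logger, DatabaseConnection and the other names are already in scope in fetch_l1_txs.py:

async def process_l1_transactions(db: DatabaseConnection, tx_hashes: list[str]) -> None:
    """Hypothetical sketch: fetch L1 transactions for the given hashes and store them."""
    if not tx_hashes:
        logger.info("No new commitment transactions to fetch")
        return

    # Assumed: fetch_txs accepts a list of hashes and returns a Polars DataFrame.
    txs_df = await fetch_txs(tx_hashes)
    if txs_df is not None and not txs_df.is_empty():
        write_events_to_timescale(db.get_connection(), txs_df, "l1transactions")
        logger.info(f"Stored {len(txs_df)} L1 transactions")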
40 changes: 29 additions & 11 deletions main.py
@@ -4,7 +4,7 @@
import os
import polars as pl
from hypermanager.manager import HyperManager
from hypermanager.protocols.mev_commit import mev_commit_config
from hypermanager.protocols.mev_commit import mev_commit_config, mev_commit_validator_config
from dotenv import load_dotenv

from pipeline.db import write_events_to_timescale, get_max_block_number, DatabaseConnection
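Both entry points load environment variables through load_dotenv and hand a DB_PARAMS mapping to DatabaseConnection. The variable names are not visible in this commit, so the following is only a representative sketch with assumed keys and defaults:

import os
from dotenv import load_dotenv

load_dotenv()

# Assumed key names; the repository may use different ones.
DB_PARAMS = {
    "host": os.getenv("DB_HOST", "localhost"),
    "port": os.getenv("DB_PORT", "5432"),
    "dbname": os.getenv("DB_NAME", "mev_commit"),
    "user": os.getenv("DB_USER", "postgres"),
    "password": os.getenv("DB_PASSWORD", ""),
}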
@@ -32,13 +32,13 @@


@asynccontextmanager
async def get_manager():
async def get_manager(endpoint: str = "https://mev-commit.hypersync.xyz"):
"""Context manager for HyperManager"""
manager = HyperManager("https://mev-commit.hypersync.xyz")
manager = HyperManager(endpoint)
try:
yield manager
finally:
await manager.close() # Add close method if available
pass
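Parameterizing get_manager keeps existing call sites working (the default endpoint is unchanged) while letting main() open a second manager against the Holesky Hypersync endpoint for validator events; replacing the close() call with pass also avoids depending on a method HyperManager may not expose. A minimal usage sketch, with this module's get_manager in scope:

import asyncio

async def demo():
    # One manager per Hypersync endpoint.
    async with get_manager() as mev_commit_manager, \
            get_manager("https://holesky.hypersync.xyz") as holesky_manager:
        print(mev_commit_manager, holesky_manager)

asyncio.run(demo())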


async def process_event_config(conn, manager, config, start_block: int = 0):
@@ -95,24 +95,42 @@ async def main():
conn = db.get_connection()
view_manager = MaterializedViewManager(conn)

# Initialize materialized views
# Merge staked tables
with db.autocommit():
view_manager.merge_staked_columns()

# First create the consolidated view
with db.autocommit():
if not view_manager.create_openedcommitments_consolidated_view():
logger.warning("Failed to create openedcommitments consolidated view - will retry in next iteration")

# Then try to create the materialized view
if not view_manager.create_preconf_txs_view():
logger.error(
"Failed to create preconf_txs materialized view")
return
logger.warning("Failed to create preconf_txs materialized view - will retry in next iteration")

async with get_manager() as manager:
async with get_manager("https://mev-commit.hypersync.xyz") as mev_commit_manager, \
get_manager("https://holesky.hypersync.xyz") as holesky_manager:
while True:
logger.info("Starting new fetch cycle")
try:
# Process events in transaction mode
with db.transaction():
await process_batch(conn, manager, mev_commit_config.values())
await process_batch(
conn,
mev_commit_manager,
list(mev_commit_config.values())
)

# Process validator events
await process_batch(
conn,
holesky_manager,
list(mev_commit_validator_config.values())
)

# Refresh views in autocommit mode
with db.autocommit():
view_manager.refresh_materialized_views()
view_manager.refresh_all_views()

# Reset retry count on successful iteration
retry_count = 0
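The MaterializedViewManager methods called above (merge_staked_columns, create_openedcommitments_consolidated_view, create_preconf_txs_view, refresh_all_views) are defined outside the files shown here. As a rough illustration of the commit's intent, the consolidated relation that fetch_l1_txs.py now queries (openedcommitmentstoredall) could be built along these lines; the name of the second source table and the column list are assumptions, not the actual implementation:

CONSOLIDATED_VIEW_SQL = """
CREATE MATERIALIZED VIEW IF NOT EXISTS openedcommitmentstoredall AS
SELECT block_number, hash, txnhash, blocknumber
FROM openedcommitmentstored
UNION ALL
SELECT block_number, hash, txnhash, blocknumber
FROM openedcommitmentstored_v2  -- assumed name for the newer event table
"""

def create_openedcommitments_consolidated_view(conn) -> bool:
    """Return True on success so main() can log a warning and retry next cycle."""
    try:
        with conn.cursor() as cur:
            cur.execute(CONSOLIDATED_VIEW_SQL)
        return True
    except Exception:
        return False

def refresh_all_views(conn) -> None:
    with conn.cursor() as cur:
        cur.execute("REFRESH MATERIALIZED VIEW openedcommitmentstoredall")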
66 changes: 52 additions & 14 deletions pipeline/db.py
@@ -1,9 +1,8 @@
import polars as pl
import psycopg
from typing import Dict, Tuple
from typing import Dict, Tuple, List
from contextlib import contextmanager


RESERVED_KEYWORDS = {
'window', 'user', 'order', 'group', 'default', 'check', 'index',
'primary', 'foreign', 'references', 'constraint', 'select', 'where',
@@ -76,15 +75,25 @@ def escape_column_name(name: str) -> str:
return name


def get_primary_key_columns(table_name: str) -> List[str]:
"""
Return primary key columns based on the table name.
"""
if table_name in ['staked', 'staked_old']:
return ['block_number', 'valblspubkey']
else:
return ['block_number', 'hash']
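Composite keys are needed presumably because a single block can hold several events: staking events are keyed per validator BLS public key, everything else per transaction hash. For example:

from pipeline.db import get_primary_key_columns

get_primary_key_columns("staked")                  # ['block_number', 'valblspubkey']
get_primary_key_columns("openedcommitmentstored")  # ['block_number', 'hash']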


def convert_schema_to_timescale(schema: Dict[str, pl.DataType]) -> list[Tuple[str, str]]:
"""
Convert Polars schema to TimescaleDB column definitions.
"""
def get_timescale_type(name: str, pl_dtype: pl.DataType) -> str:
# Special handling for block_number
if name.lower() == 'block_number':
return "BIGINT"

elif name.lower() in ['hash', 'valblspubkey']:
return "TEXT"
if isinstance(pl_dtype, pl.Datetime):
return "TIMESTAMPTZ"
elif isinstance(pl_dtype, pl.Int64):
@@ -109,6 +118,16 @@ def create_or_update_event_table(conn: psycopg.Connection, table_name: str, df:
table_name = normalize_table_name(table_name)
schema_columns = convert_schema_to_timescale(df.schema)

primary_key_columns = get_primary_key_columns(table_name)
missing_columns = [
col for col in primary_key_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Missing required primary key columns {
missing_columns} in table {table_name}")

primary_key = ', '.join(escape_column_name(col)
for col in primary_key_columns)

with conn.cursor() as cur:
try:
# Check if table exists
Expand All @@ -126,7 +145,7 @@ def create_or_update_event_table(conn: psycopg.Connection, table_name: str, df:
create_query = f"""
CREATE TABLE {table_name} (
{', '.join(columns)},
PRIMARY KEY (block_number)
PRIMARY KEY ({primary_key})
)
"""
cur.execute(create_query)
@@ -156,10 +175,7 @@ def create_or_update_event_table(conn: psycopg.Connection, table_name: str, df:
if clean_name not in existing_columns:
cur.execute(f"ALTER TABLE {
table_name} ADD COLUMN {name} {dtype}")
# Optionally, you could check if the data type matches and alter if needed
# else:
# if existing_columns[clean_name].upper() != dtype.upper():
# cur.execute(f"ALTER TABLE {table_name} ALTER COLUMN {name} TYPE {dtype}")
# Optionally, check if the data type matches and alter if needed

conn.commit()
except Exception as e:
@@ -216,16 +232,35 @@ def write_events_to_timescale(conn: psycopg.Connection, df: pl.DataFrame, table_
# Normalize column names in the DataFrame
df = df.rename({col: normalize_column_name(col) for col in df.columns})

# Ensure block_number is the correct type before creating/updating table
if 'block_number' in df.columns:
df = df.with_columns(pl.col('block_number').cast(pl.Int64))
# Ensure required columns are correctly typed
# First, ensure block_number is Int64
df = df.with_columns(pl.col('block_number').cast(pl.Int64))

# Get primary key columns
primary_key_columns = get_primary_key_columns(table_name)

# Ensure primary key columns are present
missing_columns = [
col for col in primary_key_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Missing required primary key columns {
missing_columns} in table {table_name}")

# Cast primary key columns to appropriate types
for col in primary_key_columns:
if col == 'block_number':
df = df.with_columns(pl.col('block_number').cast(pl.Int64))
elif col == 'hash':
df = df.with_columns(pl.col('hash').cast(pl.Utf8))
elif col == 'valblspubkey':
df = df.with_columns(pl.col('valblspubkey').cast(pl.Utf8))

create_or_update_event_table(conn, table_name, df)

# Convert UInt64 columns to strings to handle large numbers
schema = df.schema
uint64_cols = [name for name, dtype in schema.items()
if isinstance(dtype, pl.UInt64) and name.lower() != 'block_number']
if isinstance(dtype, pl.UInt64) and name.lower() not in primary_key_columns]

if uint64_cols:
df = df.with_columns([
Expand All @@ -239,12 +274,15 @@ def write_events_to_timescale(conn: psycopg.Connection, df: pl.DataFrame, table_
# Convert DataFrame to records
records = [tuple(row) for row in df.rows()]

conflict_columns = ', '.join(escape_column_name(col)
for col in primary_key_columns)

with conn.cursor() as cur:
cur.executemany(f"""
INSERT INTO {table_name}
({', '.join(columns)})
VALUES ({placeholders})
ON CONFLICT (block_number) DO NOTHING
ON CONFLICT ({conflict_columns}) DO NOTHING
""", records)
conn.commit()
except Exception as e:
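A short end-to-end sketch of the new upsert behaviour, assuming the truncated third parameter of write_events_to_timescale is the target table name and using illustrative connection details: rows sharing an existing (block_number, hash) pair are skipped by ON CONFLICT DO NOTHING, so re-running a fetch cycle over the same blocks is idempotent.

import polars as pl
import psycopg

from pipeline.db import write_events_to_timescale

# Illustrative connection string; real parameters come from DB_PARAMS / .env.
conn = psycopg.connect("dbname=mev_commit user=postgres")

df = pl.DataFrame({
    "block_number": [100, 100],   # same block ...
    "hash": ["0xabc", "0xdef"],   # ... but distinct event hashes, so both rows insert
    "txnhash": ["0x111", "0x222"],
})

write_events_to_timescale(conn, df, "openedcommitmentstored")
# Inserting the same frame again is a no-op under the composite primary key.
write_events_to_timescale(conn, df, "openedcommitmentstored")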