Skip to content

Commit

Permalink
fix: db state read/write management
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan-Kim2028 committed Nov 22, 2024
1 parent 319769e commit d205231
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
37 changes: 24 additions & 13 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from hypermanager.manager import HyperManager
from hypermanager.protocols.mev_commit import mev_commit_config, mev_commit_validator_config
from dotenv import load_dotenv
import psycopg

from pipeline.db import write_events_to_timescale, get_max_block_number, DatabaseConnection
from pipeline.queries import fetch_event_for_config
Expand Down Expand Up @@ -83,7 +84,6 @@ async def process_batch(conn, manager, configs):
async def main():
"""Main function to continuously fetch and store event data"""
logger.info("Starting TimescaleDB pipeline")

retry_count = 0
max_retries = 3
retry_delay = 5 # seconds
Expand All @@ -95,18 +95,27 @@ async def main():
conn = db.get_connection()
view_manager = MaterializedViewManager(conn)

# # Merge staked tables
with db.autocommit():
view_manager.merge_staked_columns()

# First create the consolidated view
with db.autocommit():
if not view_manager.create_openedcommitments_consolidated_view():
logger.warning("Failed to create openedcommitments consolidated view - will retry in next iteration")

# Then try to create the materialized view
if not view_manager.create_preconf_txs_view():
logger.warning("Failed to create preconf_txs materialized view - will retry in next iteration")
# Create the consolidated view with explicit creation
try:
if not view_manager.create_openedcommitments_consolidated_view():
logger.warning(
"Failed to create openedcommitments consolidated view - will retry in next iteration")
else:
logger.info(
"Successfully created or verified openedcommitments consolidated view")

# Create the materialized view with explicit creation
if not view_manager.create_preconf_txs_view():
logger.warning(
"Failed to create preconf_txs materialized view - will retry in next iteration")
else:
logger.info(
"Successfully created or verified preconf_txs materialized view")
except Exception as e:
logger.error(f"Error creating views: {str(e)}")

async with get_manager("https://mev-commit.hypersync.xyz") as mev_commit_manager, \
get_manager("https://holesky.hypersync.xyz") as holesky_manager:
Expand All @@ -128,7 +137,9 @@ async def main():
list(mev_commit_validator_config.values())
)

# Refresh views in autocommit mode
# Refresh views in autocommit mode - ensure clean state first
if conn.info.transaction_status == psycopg.pq.TransactionStatus.INTRANS:
conn.commit()
with db.autocommit():
view_manager.refresh_all_views()

Expand All @@ -138,6 +149,8 @@ async def main():
except Exception as e:
logger.error(f"Cycle error: {str(e)}", exc_info=True)
# Don't exit the main loop for individual cycle errors
if conn.info.transaction_status == psycopg.pq.TransactionStatus.INTRANS:
conn.rollback()

logger.info(
"Completed fetch cycle, waiting for next iteration")
Expand All @@ -147,14 +160,12 @@ async def main():
retry_count += 1
logger.error(f"Main loop error (attempt {retry_count}/{max_retries}): {str(e)}",
exc_info=True)

if retry_count < max_retries:
logger.info(f"Retrying in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
else:
logger.error("Max retries reached, exiting...")
break

finally:
if db:
try:
Expand Down
10 changes: 8 additions & 2 deletions pipeline/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class DatabaseConnection:
def __init__(self, db_params):
self.db_params = db_params
self.conn = None
self.connect()

def connect(self):
if not self.conn or self.conn.closed:
Expand All @@ -57,13 +58,18 @@ def transaction(self):
@contextmanager
def autocommit(self):
"""Context manager for autocommit operations"""
if self.conn.info.transaction_status == psycopg.pq.TransactionStatus.INTRANS:
self.conn.commit() # Commit any pending transaction

original_autocommit = self.conn.autocommit
try:
self.conn.commit() # Commit any existing transaction
self.conn.autocommit = True
yield self.conn
finally:
self.conn.autocommit = original_autocommit
if not self.conn.closed:
if self.conn.info.transaction_status == psycopg.pq.TransactionStatus.INTRANS:
self.conn.commit()
self.conn.autocommit = original_autocommit


def escape_column_name(name: str) -> str:
Expand Down
22 changes: 20 additions & 2 deletions pipeline/materialized_views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import psycopg
import logging
from contextlib import contextmanager

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -338,14 +339,16 @@ def create_openedcommitments_consolidated_view(self) -> bool:
""")

# Initial refresh of the materialized view
cur.execute("REFRESH MATERIALIZED VIEW openedcommitmentstoredall;")
cur.execute(
"REFRESH MATERIALIZED VIEW openedcommitmentstoredall;")

logger.info(
"Successfully created openedcommitmentstoredall materialized view")
return True

except Exception as e:
logger.error(f"Error creating openedcommitmentstoredall materialized view: {e}")
logger.error(
f"Error creating openedcommitmentstoredall materialized view: {e}")
return False
finally:
if self.conn.autocommit:
Expand Down Expand Up @@ -407,3 +410,18 @@ def merge_staked_columns(self) -> None:
except Exception as e:
logger.error(f"Error merging staked tables: {e}")
self.conn.rollback()

@contextmanager
def autocommit(self):
"""Context manager for autocommit operations"""
if self.conn.info.transaction_status == psycopg.pq.TransactionStatus.INTRANS:
self.conn.commit() # Commit any pending transaction
original_autocommit = self.conn.autocommit
try:
self.conn.autocommit = True
yield self.conn
finally:
if not self.conn.closed:
if self.conn.info.transaction_status == psycopg.pq.TransactionStatus.INTRANS:
self.conn.commit()
self.conn.autocommit = original_autocommit

0 comments on commit d205231

Please sign in to comment.