-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
185 lines (158 loc) · 6.93 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import asyncio
from contextlib import asynccontextmanager
import logging
import os
import polars as pl
from hypermanager.manager import HyperManager
from hypermanager.protocols.mev_commit import mev_commit_config, mev_commit_validator_config
from dotenv import load_dotenv
import psycopg
from pipeline.db import write_events_to_timescale, get_max_block_number, DatabaseConnection
from pipeline.queries import fetch_event_for_config
from pipeline.materialized_views import MaterializedViewManager
# Configure logging: timestamped, level-tagged messages at INFO level.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file, if present, before
# DB_PARAMS reads them below.
load_dotenv()

# Database connection parameters for psycopg, sourced from the environment.
# Host and port fall back to a local default PostgreSQL instance; dbname,
# user and password have no defaults and are None when unset.
DB_PARAMS = {
    'dbname': os.getenv('DB_NAME'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'host': os.getenv('DB_HOST', 'localhost'),
    'port': os.getenv('DB_PORT', '5432')
}
@asynccontextmanager
async def get_manager(endpoint: str = "https://mev-commit.hypersync.xyz"):
    """Yield a HyperManager bound to *endpoint*.

    HyperManager needs no explicit teardown here, so nothing is done on
    exit; the context-manager shape keeps call sites uniform and leaves
    room for cleanup later.
    """
    yield HyperManager(endpoint)
async def process_event_config(conn, manager, config, start_block: int = 0):
    """Fetch new events for one event config and append them to TimescaleDB.

    Resumes from the block after the greatest block number already stored
    for ``config.name`` (never below ``start_block``).

    Args:
        conn: Open psycopg connection used for reads and writes.
        manager: HyperManager bound to the chain's hypersync endpoint.
        config: Event configuration; ``config.name`` is also the table name.
        start_block: Lower bound for the resume point. Defaults to 0.

    Raises:
        Exception: Re-raised after logging so the caller decides retry policy.
    """
    try:
        # Resume after whatever is already persisted; never go below start_block.
        max_block = get_max_block_number(conn, config.name)
        current_block = max(max_block, start_block)
        # Lazy %-style args: no string formatting unless the record is emitted.
        logger.info("Processing %s starting from block %s",
                    config.name, current_block)

        df: pl.DataFrame = await fetch_event_for_config(
            manager=manager,
            base_event_config=config,
            block_number=current_block + 1
        )

        if df is not None and not df.is_empty():
            logger.info("Fetched %d rows for %s", len(df), config.name)
            write_events_to_timescale(conn, df, config.name)
            logger.info("Successfully wrote data to table %s", config.name)
        else:
            logger.info("No new data to write for %s", config.name)
    except Exception as e:
        logger.error("Error processing %s: %s", config.name, e, exc_info=True)
        raise  # Re-raise to handle in main loop
async def process_batch(conn, manager, configs):
    """Run ``process_event_config`` concurrently for every config in *configs*.

    ``return_exceptions=True`` keeps one failing event stream from cancelling
    its siblings. The original code collected those exceptions but never
    inspected them, so per-config failures (which process_event_config
    deliberately re-raises) were silently dropped; each captured exception is
    now logged. As before, this function itself never raises.

    Args:
        conn: Open psycopg connection, shared by all tasks.
        manager: HyperManager for the endpoint these configs belong to.
        configs: Iterable of event configurations to process.
    """
    tasks = [
        process_event_config(conn, manager, config)
        for config in configs
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for config, result in zip(configs, results):
        if isinstance(result, BaseException):
            logger.error("Batch processing error for %s: %s",
                         getattr(config, "name", config), result,
                         exc_info=result)
async def main():
    """Main function to continuously fetch and store event data.

    Outer loop: (re)establishes the database connection, performs view
    setup/maintenance, then enters the inner fetch loop; up to
    ``max_retries`` consecutive outer failures are tolerated before exit.
    Inner loop: every 30 seconds, fetches events for both endpoints inside
    a transaction, then refreshes materialized views in autocommit mode.
    """
    logger.info("Starting TimescaleDB pipeline")
    retry_count = 0
    max_retries = 3
    retry_delay = 5  # seconds
    while retry_count < max_retries:
        db = None
        try:
            db = DatabaseConnection(DB_PARAMS)
            conn = db.get_connection()
            view_manager = MaterializedViewManager(conn)
            # Startup maintenance runs in autocommit mode so DDL is not
            # left inside an open transaction.
            with db.autocommit():
                view_manager.merge_staked_columns()
                # Create the consolidated view with explicit creation
                try:
                    if not view_manager.create_openedcommitments_consolidated_view():
                        logger.warning(
                            "Failed to create openedcommitments consolidated view - will retry in next iteration")
                    else:
                        logger.info(
                            "Successfully created or verified openedcommitments consolidated view")
                    # Create the materialized view with explicit creation
                    if not view_manager.create_preconf_txs_view():
                        logger.warning(
                            "Failed to create preconf_txs materialized view - will retry in next iteration")
                    else:
                        logger.info(
                            "Successfully created or verified preconf_txs materialized view")
                except Exception as e:
                    # Best-effort: view-creation failures are logged and the
                    # pipeline continues; creation is retried on reconnect.
                    logger.error(f"Error creating views: {str(e)}")
            # One HyperManager per chain endpoint for the life of the loop.
            async with get_manager("https://mev-commit.hypersync.xyz") as mev_commit_manager, \
                    get_manager("https://holesky.hypersync.xyz") as holesky_manager:
                while True:
                    logger.info("Starting new fetch cycle")
                    try:
                        # Process events in transaction mode
                        with db.transaction():
                            await process_batch(
                                conn,
                                mev_commit_manager,
                                list(mev_commit_config.values())
                            )
                            # Process validator events
                            await process_batch(
                                conn,
                                holesky_manager,
                                list(mev_commit_validator_config.values())
                            )
                        # Refresh views in autocommit mode - ensure clean state first
                        if conn.info.transaction_status == psycopg.pq.TransactionStatus.INTRANS:
                            conn.commit()
                        with db.autocommit():
                            view_manager.refresh_all_views()
                        # Reset retry count on successful iteration
                        retry_count = 0
                    except Exception as e:
                        logger.error(f"Cycle error: {str(e)}", exc_info=True)
                        # Don't exit the main loop for individual cycle errors
                        if conn.info.transaction_status == psycopg.pq.TransactionStatus.INTRANS:
                            conn.rollback()
                    logger.info(
                        "Completed fetch cycle, waiting for next iteration")
                    await asyncio.sleep(30)
        except Exception as e:
            retry_count += 1
            logger.error(f"Main loop error (attempt {retry_count}/{max_retries}): {str(e)}",
                         exc_info=True)
            if retry_count < max_retries:
                logger.info(f"Retrying in {retry_delay} seconds...")
                await asyncio.sleep(retry_delay)
            else:
                logger.error("Max retries reached, exiting...")
                break
        finally:
            # Always release the connection so every outer attempt starts
            # from a fresh one; closing errors are logged, not raised.
            if db:
                try:
                    db.close()
                    logger.info("Database connection closed")
                except Exception as e:
                    logger.error(
                        f"Error closing database connection: {str(e)}")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C / SIGINT: asyncio.run has already cancelled pending tasks.
        logger.info("Received shutdown signal, closing gracefully...")
    except Exception as e:
        logger.critical(f"Fatal error: {str(e)}", exc_info=True)
        # SystemExit instead of the site-injected exit() builtin, which is
        # unavailable under `python -S` or in frozen/embedded interpreters.
        raise SystemExit(1)