Remove database_session from tasks
moisses89 committed Jan 15, 2025
1 parent f560653 commit e763c2e
Showing 3 changed files with 64 additions and 64 deletions.
2 changes: 1 addition & 1 deletion app/datasources/db/models.py
@@ -311,5 +311,5 @@ async def get_proxy_contracts(cls, session: AsyncSession):
         """
         query = select(cls).where(cls.implementation != None)  # noqa: E711
         result = await session.stream(query)
-        async for contract in result:
+        async for (contract,) in result:
             yield contract
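
The unpacking works because AsyncSession.stream() yields SQLAlchemy Row objects, which here are one-element tuples wrapping the mapped instance; unpacking them inside the generator lets callers receive Contract objects directly instead of indexing [0]. A minimal sketch of the old and new consumption styles (illustrative function name, not part of this commit):

# Illustrative sketch only: how Row objects from session.stream() are consumed.
from sqlalchemy import select
from sqlmodel.ext.asyncio.session import AsyncSession

from app.datasources.db.models import Contract


async def show_row_unpacking(session: AsyncSession) -> None:
    result = await session.stream(select(Contract))
    async for row in result:
        print(row[0].address)  # old style: index into the Row at every call site

    result = await session.stream(select(Contract))
    async for (contract,) in result:  # new style: unpack the Row in the loop target
        print(contract.address)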
4 changes: 2 additions & 2 deletions app/tests/datasources/db/test_model.py
@@ -191,9 +191,9 @@ async def test_get_proxy_contracts(self, session: AsyncSession):
         self.assertTrue(result)
         async for proxy_contract in Contract.get_proxy_contracts(session):
             self.assertEqual(
-                fast_to_checksum_address(proxy_contract[0].address), random_address
+                fast_to_checksum_address(proxy_contract.address), random_address
             )
             self.assertEqual(
-                fast_to_checksum_address(proxy_contract[0].implementation),
+                fast_to_checksum_address(proxy_contract.implementation),
                 "0x43506849D7C04F9138D1A2050bbF3A0c054402dd",
             )
122 changes: 61 additions & 61 deletions app/workers/tasks.py
@@ -9,7 +9,7 @@
 from sqlmodel.ext.asyncio.session import AsyncSession
 
 from app.config import settings
-from app.datasources.db.database import database_session
+from app.datasources.db.database import get_engine
 from app.datasources.db.models import Contract
 from app.services.contract_metadata_service import get_contract_metadata_service

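Judging from the previous task signatures, where session: AsyncSession was the first parameter alongside the decorator, database_session presumably opened a session and injected it into the wrapped coroutine; this commit replaces that indirection with get_engine() and a session opened explicitly inside each actor. A hypothetical sketch of such a decorator (not the repository's actual implementation):

# Hypothetical sketch only; the real database_session implementation is not
# shown in this commit. It illustrates the decorator-injection pattern removed here.
import functools

from sqlmodel.ext.asyncio.session import AsyncSession

from app.datasources.db.database import get_engine


def database_session(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        async with AsyncSession(get_engine(), expire_on_commit=False) as session:
            return await func(session, *args, **kwargs)

    return wrapper
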
@@ -38,77 +38,77 @@ async def test_task(message: str) -> None:
 
 
 @dramatiq.actor
-@database_session
 async def get_contract_metadata_task(
-    session: AsyncSession,
-    address: str,
-    chain_id: int,
-    skip_attemp_download: bool = False,
-) -> None:
-    contract_metadata_service = get_contract_metadata_service()
-    # Just try the first time, following retries should be scheduled
-    if skip_attemp_download or await contract_metadata_service.should_attempt_download(
-        session, address, chain_id, 0
-    ):
-        logger.info(
-            "Downloading contract metadata for contract=%s and chain=%s",
-            address,
-            chain_id,
-        )
-        contract_metadata = await contract_metadata_service.get_contract_metadata(
-            fast_to_checksum_address(address), chain_id
-        )
-        result = await contract_metadata_service.process_contract_metadata(
-            session, contract_metadata
-        )
-        if result:
-            logger.info(
-                "Success download contract metadata for contract=%s and chain=%s",
-                address,
-                chain_id,
-            )
-        else:
-            logger.info(
-                "Failed to download contract metadata for contract=%s and chain=%s",
-                address,
-                chain_id,
-            )
-
-        if proxy_implementation_address := contract_metadata_service.get_proxy_implementation_address(
-            contract_metadata
-        ):
-            logger.info(
-                "Adding task to download proxy implementation metadata from address=%s for contract=%s and chain=%s",
-                proxy_implementation_address,
-                address,
-                chain_id,
-            )
-            get_contract_metadata_task.send(
-                address=proxy_implementation_address, chain_id=chain_id
-            )
-    else:
-        logger.debug("Skipping contract=%s and chain=%s", address, chain_id)
+    address: str, chain_id: int, skip_attemp_download: bool = False
+):
+    async with AsyncSession(get_engine(), expire_on_commit=False) as session:
+        contract_metadata_service = get_contract_metadata_service()
+        # Just try the first time, following retries should be scheduled
+        if (
+            skip_attemp_download
+            or await contract_metadata_service.should_attempt_download(
+                session, address, chain_id, 0
+            )
+        ):
+            logger.info(
+                "Downloading contract metadata for contract=%s and chain=%s",
+                address,
+                chain_id,
+            )
+            contract_metadata = await contract_metadata_service.get_contract_metadata(
+                fast_to_checksum_address(address), chain_id
+            )
+            result = await contract_metadata_service.process_contract_metadata(
+                session, contract_metadata
+            )
+            if result:
+                logger.info(
+                    "Success download contract metadata for contract=%s and chain=%s",
+                    address,
+                    chain_id,
+                )
+            else:
+                logger.info(
+                    "Failed to download contract metadata for contract=%s and chain=%s",
+                    address,
+                    chain_id,
+                )
+
+            if proxy_implementation_address := contract_metadata_service.get_proxy_implementation_address(
+                contract_metadata
+            ):
+                logger.info(
+                    "Adding task to download proxy implementation metadata from address=%s for contract=%s and chain=%s",
+                    proxy_implementation_address,
+                    address,
+                    chain_id,
+                )
+                get_contract_metadata_task.send(
+                    address=proxy_implementation_address, chain_id=chain_id
+                )
+        else:
+            logger.debug("Skipping contract=%s and chain=%s", address, chain_id)
 
 
 @dramatiq.actor(periodic=cron("0 0 * * *"))  # Every midnight
-@database_session
-async def get_missing_contract_metadata_task(session: AsyncSession) -> None:
-    async for contract in Contract.get_contracts_without_abi(
-        session, settings.CONTRACT_MAX_DOWNLOAD_RETRIES
-    ):
-        get_contract_metadata_task.send(
-            address=HexBytes(contract[0].address).hex(),
-            chain_id=contract[0].chain_id,
-            skip_attemp_download=True,
-        )
+async def get_missing_contract_metadata_task():
+    async with AsyncSession(get_engine(), expire_on_commit=False) as session:
+        async for contract in Contract.get_contracts_without_abi(
+            session, settings.CONTRACT_MAX_DOWNLOAD_RETRIES
+        ):
+            get_contract_metadata_task.send(
+                address=HexBytes(contract[0].address).hex(),
+                chain_id=contract[0].chain_id,
+                skip_attemp_download=True,
+            )
 
 
 @dramatiq.actor(periodic=cron("0 5 * * *"))  # Every day at 5 am
-@database_session
-async def update_proxies_task(session: AsyncSession) -> None:
-    async for proxy_contract in Contract.get_proxy_contracts(session):
-        get_contract_metadata_task.send(
-            address=HexBytes(proxy_contract[0].address).hex(),
-            chain_id=proxy_contract[0].chain_id,
-            skip_attemp_download=True,
-        )
+async def update_proxies_task():
+    async with AsyncSession(get_engine(), expire_on_commit=False) as session:
+        async for proxy_contract in Contract.get_proxy_contracts(session):
+            get_contract_metadata_task.send(
+                address=HexBytes(proxy_contract.address).hex(),
+                chain_id=proxy_contract.chain_id,
+                skip_attemp_download=True,
+            )
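
Two details of the new pattern are easy to miss. First, expire_on_commit=False keeps attribute values on loaded objects readable after a commit instead of expiring them, so ORM instances can still be used once the service has committed. Second, because get_proxy_contracts now unpacks the Row itself, update_proxies_task drops the [0] indexing, while get_contracts_without_abi still yields rows and keeps contract[0]. A minimal sketch of the session-per-run shape the tasks now share (illustrative actor, not part of this commit):

# Illustrative sketch only, following the pattern this commit introduces:
# each actor run opens and closes its own AsyncSession instead of receiving
# a decorator-injected one.
import dramatiq
from sqlmodel.ext.asyncio.session import AsyncSession

from app.datasources.db.database import get_engine
from app.datasources.db.models import Contract


@dramatiq.actor
async def example_session_per_run_task() -> None:
    async with AsyncSession(get_engine(), expire_on_commit=False) as session:
        # The session lives only for this run; expire_on_commit=False keeps
        # loaded attributes readable even after a commit inside the block.
        async for contract in Contract.get_proxy_contracts(session):
            print(contract.address, contract.chain_id)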
