Commit

Ingest and scheduled task fixes unrelated to branch name.
barrycarey committed Jan 22, 2024
1 parent d6a4a42 commit 5b9efd3
Showing 8 changed files with 124 additions and 79 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -134,9 +134,9 @@ services:
environment:
- RUN_ENV=production
- db_user=ingest
- LOG_LEVEL=ERROR
- LOG_LEVEL=INFO
- CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.ingest_tasks
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q post_ingest -n ingest_worker --autoscale=3,16
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q post_ingest -n ingest_worker --autoscale=16,1

link_repost_worker:
container_name: link-repost-worker
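
Note on the worker change above: Celery's --autoscale option takes MAX,MIN, so the previous value of 3,16 capped the pool at 3 processes while asking for a floor of 16; 16,1 grows to 16 processes and shrinks back to 1 when idle. A minimal sketch of an equivalent standalone launcher, assuming an illustrative app and argv rather than the actual compose entrypoint:

    from celery import Celery

    # Illustrative stand-in for the compose entrypoint above; --autoscale is MAX,MIN,
    # so 16,1 means "grow to at most 16 worker processes, shrink to 1 when idle".
    app = Celery('redditrepostsleuth.core.celery')

    if __name__ == '__main__':
        app.worker_main([
            'worker',
            '-Q', 'post_ingest',
            '-n', 'ingest_worker',
            '--autoscale=16,1',
        ])
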
59 changes: 33 additions & 26 deletions redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py
@@ -57,35 +57,42 @@ def update_proxies(uowm: UnitOfWorkManager) -> None:
)
uow.commit()

def update_top_reposts(uowm: UnitOfWorkManager):
def update_top_reposts(uow: UnitOfWork, post_type_id: int, day_range: int = None):
# reddit.info(reddit_ids_to_lookup):
post_types = [2, 3]
day_ranges = [1, 7, 14, 30, 365, None]
log.info('Getting top repostors for post type %s with range %s', post_type_id, day_range)
range_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC"
all_time_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC"
with uowm.start() as uow:
for post_type in post_types:
for days in day_ranges:
log.info('Getting top reposts for post type %s with range %s', post_type, days)
if days:
query = range_query
else:
query = all_time_query
uow.session.execute(
text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range=:days'),
{'posttype': post_type, 'days': days})
uow.commit()
result = uow.session.execute(text(query), {'posttype': post_type, 'days': days})
for row in result:
stat = StatsTopRepost()
stat.post_id = row[0]
stat.post_type_id = post_type
stat.day_range = days
stat.repost_count = row[1]
stat.updated_at = func.utc_timestamp()
stat.nsfw = False
uow.stat_top_repost.add(stat)
uow.commit()
if day_range:
query = range_query
uow.session.execute(text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range=:days'),
{'posttype': post_type_id, 'days': day_range})
else:
query = all_time_query
uow.session.execute(text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range IS NULL'),
{'posttype': post_type_id})

uow.commit()



result = uow.session.execute(text(query), {'posttype': post_type_id, 'days': day_range})
for row in result:
stat = StatsTopRepost()
stat.post_id = row[0]
stat.post_type_id = post_type_id
stat.day_range = day_range
stat.repost_count = row[1]
stat.updated_at = func.utc_timestamp()
stat.nsfw = False
uow.stat_top_repost.add(stat)
uow.commit()

def run_update_top_reposts(uow: UnitOfWork) -> None:
post_types = [1, 2, 3]
day_ranges = [1, 7, 14, 30, None]
for post_type_id in post_types:
for days in day_ranges:
update_top_reposts(uow, post_type_id, days)

def update_top_reposters(uow: UnitOfWork, post_type_id: int, day_range: int = None) -> None:
log.info('Getting top repostors for post type %s with range %s', post_type_id, day_range)
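
The refactor above replaces the old nested loops inside update_top_reposts with one (post_type_id, day_range) pair per call, and it splits the DELETE into two statements because binding None to day_range=:days never matches the all-time rows (a NULL comparison is never true in SQL). A minimal sketch of that branching, using a hypothetical helper name:

    from typing import Optional

    from sqlalchemy import text

    def clear_top_repost_stats(session, post_type_id: int, day_range: Optional[int]) -> None:
        # Hypothetical helper mirroring the two branches above: ranged rows are
        # matched with day_range = :days, while all-time rows (stored with a NULL
        # day_range) need an explicit IS NULL predicate, since binding None to
        # :days would never match them.
        if day_range is not None:
            session.execute(
                text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range=:days'),
                {'posttype': post_type_id, 'days': day_range})
        else:
            session.execute(
                text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range IS NULL'),
                {'posttype': post_type_id})
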
19 changes: 10 additions & 9 deletions redditrepostsleuth/core/celery/tasks/ingest_tasks.py
@@ -12,7 +12,7 @@


@celery.task(bind=True, base=SqlAlchemyTask, ignore_reseults=True, serializer='pickle', autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed), retry_kwargs={'max_retries': 10, 'countdown': 300})
def save_new_post(self, submission: dict):
def save_new_post(self, submission: dict, repost_check: bool = True):

# TODO: temp fix until I can fix imgur gifs
if 'imgur' in submission['url'] and 'gifv' in submission['url']:
@@ -48,21 +48,22 @@ def save_new_post(self, submission: dict):
log.exception('Database save failed: %s', str(e), exc_info=False)
return

if post.post_type_id == 1:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.check_for_text_repost_task', args=[post])
elif post.post_type_id == 2:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save', args=[post])
elif post.post_type_id == 3:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post])
if repost_check:
if post.post_type_id == 1:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.check_for_text_repost_task', args=[post])
elif post.post_type_id == 2:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save', args=[post])
elif post.post_type_id == 3:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post])

celery.send_task('redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans', args=[post.author])



@celery.task
def save_new_posts(posts: list[dict]) -> None:
def save_new_posts(posts: list[dict], repost_check: bool = True) -> None:
for post in posts:
save_new_post.apply_async((post,))
save_new_post.apply_async((post, repost_check))

@celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True)
def save_pushshift_results(self, data):
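
The new repost_check flag lets a caller ingest submissions without fanning out to the repost queues. A usage sketch, assuming a batch of submission dicts gathered elsewhere (for example, a historical backfill where repost checks are not wanted):

    from redditrepostsleuth.core.celery.tasks.ingest_tasks import save_new_posts

    # Submissions fetched elsewhere (e.g. by the ingest service); the second
    # tuple element is the new repost_check flag.
    submissions: list[dict] = []
    save_new_posts.apply_async((submissions, False))
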
9 changes: 8 additions & 1 deletion redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
@@ -9,7 +9,7 @@
from redditrepostsleuth.core.celery import celery
from redditrepostsleuth.core.celery.basetasks import RedditTask, SqlAlchemyTask, AdminTask
from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, update_top_reposts, \
token_checker, run_update_top_reposters, update_top_reposters, update_monitored_sub_data
token_checker, run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
from redditrepostsleuth.core.db.databasemodels import MonitoredSub, StatsDailyCount
from redditrepostsleuth.core.logging import configure_logger
from redditrepostsleuth.core.util.reddithelpers import is_sub_mod_praw, get_bot_permissions
@@ -178,6 +178,13 @@ def update_daily_stats(self):
log.exception('Problem updating stats')


@celery.task(bind=True, base=SqlAlchemyTask)
def update_all_top_reposts_task(self):
try:
with self.uowm.start() as uow:
run_update_top_reposts(uow)
except Exception as e:
log.exception('Unknown task error')

@celery.task(bind=True, base=SqlAlchemyTask)
def update_all_top_reposters_task(self):
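
The new update_all_top_reposts_task wraps run_update_top_reposts in a Celery task, but this commit does not show how it is scheduled. The beat entry below is only an assumption of how it might be wired up, with the task name following Celery's default module-path naming:

    from celery.schedules import crontab

    from redditrepostsleuth.core.celery import celery

    # Hypothetical beat entry; the real schedule is not part of this diff.
    celery.conf.beat_schedule = {
        'update-all-top-reposts': {
            'task': 'redditrepostsleuth.core.celery.tasks.scheduled_tasks.update_all_top_reposts_task',
            'schedule': crontab(hour=2, minute=0),
        },
    }
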
1 change: 1 addition & 0 deletions redditrepostsleuth/core/db/databasemodels.py
@@ -47,6 +47,7 @@ def __repr__(self) -> str:
reports = relationship('UserReport', back_populates='post')
hashes = relationship('PostHash', back_populates='post')
post_type = relationship('PostType') # lazy has to be set to JSON encoders don't fail for unbound session
#post_type = relationship('PostType', lazy='joined')

def to_dict(self):
return {
1 change: 1 addition & 0 deletions redditrepostsleuth/core/model/misc_models.py
@@ -11,6 +11,7 @@ class JobStatus(Enum):
TIMEOUT = auto()
PROXYERROR = auto()
ERROR = auto()
RATELIMIT = auto()

@dataclass
class BatchedPostRequestJob:
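
RATELIMIT is a new JobStatus member used by the ingest service changes below to mark batched requests that came back with HTTP 429. A minimal sketch of how a batch might be checked, using a hypothetical helper:

    from redditrepostsleuth.core.model.misc_models import BatchedPostRequestJob, JobStatus

    def batch_hit_rate_limit(jobs: list[BatchedPostRequestJob]) -> bool:
        # Hypothetical helper: mirrors the check in ingestsvc, where any
        # rate-limited job in a batch triggers a pause before retrying.
        return any(job.status == JobStatus.RATELIMIT for job in jobs)
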
2 changes: 1 addition & 1 deletion redditrepostsleuth/core/services/responsebuilder.py
@@ -130,7 +130,7 @@ def build_sub_comment(

try:
return self.build_default_comment(search_results, message, **kwargs)
except KeyError:
except KeyError as e:
log.warning('Custom repost template for %s has a bad slug: %s', monitored_sub.name, monitored_sub.repost_response_template)
return self.build_default_comment(search_results, **kwargs)

108 changes: 68 additions & 40 deletions redditrepostsleuth/ingestsvc/ingestsvc.py
@@ -3,7 +3,7 @@
import json
import os
import time
from asyncio import ensure_future, gather, run, TimeoutError
from asyncio import ensure_future, gather, run, TimeoutError, CancelledError
from datetime import datetime
from typing import List, Optional

@@ -15,6 +15,7 @@
from redditrepostsleuth.core.db.databasemodels import Post
from redditrepostsleuth.core.db.db_utils import get_db_engine
from redditrepostsleuth.core.db.uow.unitofworkmanager import UnitOfWorkManager
from redditrepostsleuth.core.exception import RateLimitException, UtilApiException
from redditrepostsleuth.core.logging import configure_logger
from redditrepostsleuth.core.model.misc_models import BatchedPostRequestJob, JobStatus
from redditrepostsleuth.core.util.helpers import get_reddit_instance, get_newest_praw_post_id, get_next_ids, \
@@ -36,6 +37,7 @@

config = Config()
REMOVAL_REASONS_TO_SKIP = ['deleted', 'author', 'reddit', 'copyright_takedown']
HEADERS = {'User-Agent': 'u/RepostSleuthBot - Submission Ingest (by u/BarryCarey)'}


async def fetch_page(url: str, session: ClientSession) -> Optional[str]:
@@ -45,12 +47,20 @@ async def fetch_page(url: str, session: ClientSession) -> Optional[str]:
:param session: AIOHttp session to use
:return: raw response from request
"""
async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
log.debug('Page fetch')

async with session.get(url, timeout=ClientTimeout(total=10), headers=HEADERS) as resp:
try:
if resp.status == 200:
log.debug('Successful fetch')
return await resp.text()
try:
return await resp.text()
except CancelledError:
log.error('Canceled on getting text')
raise UtilApiException('Canceled')
else:
if resp.status == 429:
raise RateLimitException('Data API rate limit')
log.info('Unexpected request status %s - %s', resp.status, url)
return
except (ClientOSError, TimeoutError):
@@ -68,11 +78,15 @@ async def fetch_page_as_job(job: BatchedPostRequestJob, session: ClientSession)
:rtype: BatchedPostRequestJob
"""
try:
async with session.get(job.url, timeout=ClientTimeout(total=10)) as resp:
async with session.get(job.url, timeout=ClientTimeout(total=10), headers=HEADERS) as resp:
if resp.status == 200:
log.debug('Successful fetch')
job.status = JobStatus.SUCCESS
log.debug('Fetching response text')
job.resp_data = await resp.text()
elif resp.status == 429:
log.warning('Data API Rate Limit')
job.status = JobStatus.RATELIMIT
else:
log.warning('Unexpected request status %s - %s', resp.status, job.url)
job.status = JobStatus.ERROR
@@ -106,14 +120,15 @@ async def ingest_range(newest_post_id: str, oldest_post_id: str) -> None:

tasks = []
conn = TCPConnector(limit=0)
async with ClientSession(connector=conn) as session:
async with ClientSession(connector=conn, headers=HEADERS) as session:
while True:
try:
chunk = list(itertools.islice(missing_ids, 100))
except StopIteration:
break

url = f'{config.util_api}/reddit/info?submission_ids={build_reddit_query_string(chunk)}'
#url = f'https://api.reddit.com/api/info?id={build_reddit_query_string(chunk)}'
job = BatchedPostRequestJob(url, chunk, JobStatus.STARTED)
tasks.append(ensure_future(fetch_page_as_job(job, session)))
if len(tasks) >= 50 or len(chunk) == 0:
@@ -139,10 +154,15 @@ async def ingest_range(newest_post_id: str, oldest_post_id: str) -> None:
else:
tasks.append(ensure_future(fetch_page_as_job(j, session)))

any_rate_limit = next((x for x in results if x.status == JobStatus.RATELIMIT), None)
if any_rate_limit:
log.info('Some jobs hit data rate limit, waiting')
await asyncio.sleep(10)

log.info('Sending %s posts to save queue', len(posts_to_save))

# save_new_posts.apply_async(([reddit_submission_to_post(submission) for submission in posts_to_save],))
save_new_posts.apply_async((posts_to_save,))
save_new_posts.apply_async((posts_to_save, True))
if len(chunk) == 0:
break

@@ -170,52 +190,60 @@ async def main() -> None:
oldest_id = oldest_post.post_id

await ingest_range(newest_id, oldest_id)
async with ClientSession() as session:
delay = 0
while True:
ids_to_get = get_next_ids(newest_id, 100)
url = f'{config.util_api}/reddit/info?submission_ids={build_reddit_query_string(ids_to_get)}'

delay = 0
while True:
ids_to_get = get_next_ids(newest_id, 100)
url = f'{config.util_api}/reddit/info?submission_ids={build_reddit_query_string(ids_to_get)}'
#url = f'https://api.reddit.com/api/info?id={build_reddit_query_string(ids_to_get)}'
async with ClientSession(headers=HEADERS) as session:
try:
log.debug('Sending fetch request')
results = await fetch_page(url, session)
except (ServerDisconnectedError, ClientConnectorError, ClientOSError, TimeoutError):
except (ServerDisconnectedError, ClientConnectorError, ClientOSError, TimeoutError, CancelledError, UtilApiException):
log.warning('Error during fetch')
await asyncio.sleep(2)
continue

if not results:
except RateLimitException:
log.warning('Hit Data API Rate Limit')
await asyncio.sleep(10)
continue

res_data = json.loads(results)
if not res_data or not len(res_data['data']['children']):
log.info('No results')
if not results:
log.debug('No results')
continue

res_data = json.loads(results)
if not res_data or not len(res_data['data']['children']):
log.info('No results')
continue

log.info('%s results returned from API', len(res_data['data']['children']))
if len(res_data['data']['children']) < 91:
delay += 1
log.debug('Delay increased by 1. Current delay: %s', delay)
else:
if delay > 0:
delay -= 1
log.debug('Delay decreased by 1. Current delay: %s', delay)

posts_to_save = []
for post in res_data['data']['children']:
if post['data']['removed_by_category'] in REMOVAL_REASONS_TO_SKIP:
continue
posts_to_save.append(post['data'])

log.info('%s results returned from API', len(res_data['data']['children']))
if len(res_data['data']['children']) < 90:
delay += 1
log.debug('Delay increased by 1. Current delay: %s', delay)
else:
if delay > 0:
delay -= 1
log.debug('Delay decreased by 1. Current delay: %s', delay)

posts_to_save = []
for post in res_data['data']['children']:
if post['data']['removed_by_category'] in REMOVAL_REASONS_TO_SKIP:
continue
posts_to_save.append(post['data'])

log.info('Sending %s posts to save queue', len(posts_to_save))
# queue_posts_for_ingest([reddit_submission_to_post(submission) for submission in posts_to_save])
queue_posts_for_ingest(posts_to_save)
log.info('Sending %s posts to save queue', len(posts_to_save))
# queue_posts_for_ingest([reddit_submission_to_post(submission) for submission in posts_to_save])
queue_posts_for_ingest(posts_to_save)

ingest_delay = datetime.utcnow() - datetime.utcfromtimestamp(
res_data['data']['children'][0]['data']['created_utc'])
log.info('Current Delay: %s', ingest_delay)
ingest_delay = datetime.utcnow() - datetime.utcfromtimestamp(
res_data['data']['children'][0]['data']['created_utc'])
log.info('Current Delay: %s', ingest_delay)

newest_id = res_data['data']['children'][-1]['data']['id']
newest_id = res_data['data']['children'][-1]['data']['id']

time.sleep(delay)
time.sleep(delay)


if __name__ == '__main__':
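
Beyond the new rate-limit handling, main() keeps its adaptive polling delay, with the threshold for a "short" batch moving from 91 to 90 results: presumably a short batch means the loop is requesting ids ahead of what Reddit has published, so it backs off by a second, while full batches let the delay decay again. A minimal sketch of that adjustment as a standalone helper (the function name is hypothetical):

    def adjust_poll_delay(current_delay: int, results_returned: int) -> int:
        # Hypothetical helper mirroring the loop above: fewer than 90 of the 100
        # requested ids suggests the ingest is ahead of new submissions, so wait
        # a little longer; otherwise let the delay decay back toward zero.
        if results_returned < 90:
            return current_delay + 1
        return max(0, current_delay - 1)
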
