diff --git a/docker-compose.yml b/docker-compose.yml
index cc7b924..422e872 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -134,9 +134,9 @@ services:
     environment:
       - RUN_ENV=production
       - db_user=ingest
-      - LOG_LEVEL=ERROR
+      - LOG_LEVEL=INFO
       - CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.ingest_tasks
-    entrypoint: celery -A redditrepostsleuth.core.celery worker -Q post_ingest -n ingest_worker --autoscale=3,16
+    entrypoint: celery -A redditrepostsleuth.core.celery worker -Q post_ingest -n ingest_worker --autoscale=16,1
 
   link_repost_worker:
     container_name: link-repost-worker
diff --git a/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py b/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py
index c686307..3308dd5 100644
--- a/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py
+++ b/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py
@@ -57,35 +57,42 @@ def update_proxies(uowm: UnitOfWorkManager) -> None:
             )
         uow.commit()
 
-def update_top_reposts(uowm: UnitOfWorkManager):
+def update_top_reposts(uow: UnitOfWork, post_type_id: int, day_range: int = None):
     # reddit.info(reddit_ids_to_lookup):
-    post_types = [2, 3]
-    day_ranges = [1, 7, 14, 30, 365, None]
+    log.info('Getting top repostors for post type %s with range %s', post_type_id, day_range)
     range_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC"
     all_time_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC"
-    with uowm.start() as uow:
-        for post_type in post_types:
-            for days in day_ranges:
-                log.info('Getting top reposts for post type %s with range %s', post_type, days)
-                if days:
-                    query = range_query
-                else:
-                    query = all_time_query
-                uow.session.execute(
-                    text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range=:days'),
-                    {'posttype': post_type, 'days': days})
-                uow.commit()
-                result = uow.session.execute(text(query), {'posttype': post_type, 'days': days})
-                for row in result:
-                    stat = StatsTopRepost()
-                    stat.post_id = row[0]
-                    stat.post_type_id = post_type
-                    stat.day_range = days
-                    stat.repost_count = row[1]
-                    stat.updated_at = func.utc_timestamp()
-                    stat.nsfw = False
-                    uow.stat_top_repost.add(stat)
-                uow.commit()
+    if day_range:
+        query = range_query
+        uow.session.execute(text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range=:days'),
+                            {'posttype': post_type_id, 'days': day_range})
+    else:
+        query = all_time_query
+        uow.session.execute(text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range IS NULL'),
+                            {'posttype': post_type_id})
+
+    uow.commit()
+
+
+
+    result = uow.session.execute(text(query), {'posttype': post_type_id, 'days': day_range})
+    for row in result:
+        stat = StatsTopRepost()
+        stat.post_id = row[0]
+        stat.post_type_id = post_type_id
+        stat.day_range = day_range
+        stat.repost_count = row[1]
+        stat.updated_at = func.utc_timestamp()
+        stat.nsfw = False
+        uow.stat_top_repost.add(stat)
+    uow.commit()
+
+def run_update_top_reposts(uow: UnitOfWork) -> None:
+    post_types = [1, 2, 3]
+    day_ranges = [1, 7, 14, 30, None]
+    for post_type_id in post_types:
+        for days in day_ranges:
+            update_top_reposts(uow, post_type_id, days)
 
 def update_top_reposters(uow: UnitOfWork, post_type_id: int, day_range: int = None) -> None:
     log.info('Getting top repostors for post type %s with range %s', post_type_id, day_range)
diff --git a/redditrepostsleuth/core/celery/tasks/ingest_tasks.py b/redditrepostsleuth/core/celery/tasks/ingest_tasks.py
index 37ce7c8..a2b86bb 100644
--- a/redditrepostsleuth/core/celery/tasks/ingest_tasks.py
+++ b/redditrepostsleuth/core/celery/tasks/ingest_tasks.py
@@ -12,7 +12,7 @@
 @celery.task(bind=True, base=SqlAlchemyTask, ignore_reseults=True, serializer='pickle',
              autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed),
              retry_kwargs={'max_retries': 10, 'countdown': 300})
-def save_new_post(self, submission: dict):
+def save_new_post(self, submission: dict, repost_check: bool = True):
 
     # TODO: temp fix until I can fix imgur gifs
     if 'imgur' in submission['url'] and 'gifv' in submission['url']:
@@ -48,21 +48,22 @@ def save_new_post(self, submission: dict):
         log.exception('Database save failed: %s', str(e), exc_info=False)
         return
 
-    if post.post_type_id == 1:
-        celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.check_for_text_repost_task', args=[post])
-    elif post.post_type_id == 2:
-        celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save', args=[post])
-    elif post.post_type_id == 3:
-        celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post])
+    if repost_check:
+        if post.post_type_id == 1:
+            celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.check_for_text_repost_task', args=[post])
+        elif post.post_type_id == 2:
+            celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save', args=[post])
+        elif post.post_type_id == 3:
+            celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post])
 
     celery.send_task('redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans', args=[post.author])
 
 
 @celery.task
-def save_new_posts(posts: list[dict]) -> None:
+def save_new_posts(posts: list[dict], repost_check: bool = True) -> None:
     for post in posts:
-        save_new_post.apply_async((post,))
+        save_new_post.apply_async((post, repost_check))
 
 
 @celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True)
 def save_pushshift_results(self, data):
diff --git a/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py b/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
index 61c5af3..7f2b117 100644
--- a/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
+++ b/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
@@ -9,7 +9,7 @@
 from redditrepostsleuth.core.celery import celery
 from redditrepostsleuth.core.celery.basetasks import RedditTask, SqlAlchemyTask, AdminTask
 from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, update_top_reposts, \
-    token_checker, run_update_top_reposters, update_top_reposters, update_monitored_sub_data
+    token_checker, run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
 from redditrepostsleuth.core.db.databasemodels import MonitoredSub, StatsDailyCount
 from redditrepostsleuth.core.logging import configure_logger
 from redditrepostsleuth.core.util.reddithelpers import is_sub_mod_praw, get_bot_permissions
@@ -178,6 +178,13 @@ def update_daily_stats(self):
         log.exception('Problem updating stats')
 
 
+@celery.task(bind=True, base=SqlAlchemyTask)
+def update_all_top_reposts_task(self):
+    try:
+        with self.uowm.start() as uow:
+            run_update_top_reposts(uow)
+    except Exception as e:
+        log.exception('Unknown task error')
 
 @celery.task(bind=True, base=SqlAlchemyTask)
 def update_all_top_reposters_task(self):
diff --git a/redditrepostsleuth/core/db/databasemodels.py b/redditrepostsleuth/core/db/databasemodels.py
index 1de399e..56ee65e 100644
--- a/redditrepostsleuth/core/db/databasemodels.py
+++ b/redditrepostsleuth/core/db/databasemodels.py
@@ -47,6 +47,7 @@ def __repr__(self) -> str:
     reports = relationship('UserReport', back_populates='post')
     hashes = relationship('PostHash', back_populates='post')
     post_type = relationship('PostType') # lazy has to be set to JSON encoders don't fail for unbound session
+    #post_type = relationship('PostType', lazy='joined')
 
     def to_dict(self):
         return {
diff --git a/redditrepostsleuth/core/model/misc_models.py b/redditrepostsleuth/core/model/misc_models.py
index c883bf9..89fde2e 100644
--- a/redditrepostsleuth/core/model/misc_models.py
+++ b/redditrepostsleuth/core/model/misc_models.py
@@ -11,6 +11,7 @@ class JobStatus(Enum):
     TIMEOUT = auto()
     PROXYERROR = auto()
     ERROR = auto()
+    RATELIMIT = auto()
 
 @dataclass
 class BatchedPostRequestJob:
diff --git a/redditrepostsleuth/core/services/responsebuilder.py b/redditrepostsleuth/core/services/responsebuilder.py
index 5cc12b7..17d3fef 100644
--- a/redditrepostsleuth/core/services/responsebuilder.py
+++ b/redditrepostsleuth/core/services/responsebuilder.py
@@ -130,7 +130,7 @@ def build_sub_comment(
 
         try:
             return self.build_default_comment(search_results, message, **kwargs)
-        except KeyError:
+        except KeyError as e:
             log.warning('Custom repost template for %s has a bad slug: %s', monitored_sub.name,
                         monitored_sub.repost_response_template)
             return self.build_default_comment(search_results, **kwargs)
diff --git a/redditrepostsleuth/ingestsvc/ingestsvc.py b/redditrepostsleuth/ingestsvc/ingestsvc.py
index 1ff762e..46c65c3 100644
--- a/redditrepostsleuth/ingestsvc/ingestsvc.py
+++ b/redditrepostsleuth/ingestsvc/ingestsvc.py
@@ -3,7 +3,7 @@
 import json
 import os
 import time
-from asyncio import ensure_future, gather, run, TimeoutError
+from asyncio import ensure_future, gather, run, TimeoutError, CancelledError
 from datetime import datetime
 from typing import List, Optional
 
@@ -15,6 +15,7 @@
 from redditrepostsleuth.core.db.databasemodels import Post
 from redditrepostsleuth.core.db.db_utils import get_db_engine
 from redditrepostsleuth.core.db.uow.unitofworkmanager import UnitOfWorkManager
+from redditrepostsleuth.core.exception import RateLimitException, UtilApiException
 from redditrepostsleuth.core.logging import configure_logger
 from redditrepostsleuth.core.model.misc_models import BatchedPostRequestJob, JobStatus
 from redditrepostsleuth.core.util.helpers import get_reddit_instance, get_newest_praw_post_id, get_next_ids, \
@@ -36,6 +37,7 @@
 config = Config()
 
 REMOVAL_REASONS_TO_SKIP = ['deleted', 'author', 'reddit', 'copyright_takedown']
+HEADERS = {'User-Agent': 'u/RepostSleuthBot - Submission Ingest (by u/BarryCarey)'}
 
 
 async def fetch_page(url: str, session: ClientSession) -> Optional[str]:
@@ -45,12 +47,20 @@ async def fetch_page(url: str, session: ClientSession) -> Optional[str]:
     :param session: AIOHttp session to use
     :return: raw response from request
     """
-    async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
+    log.debug('Page fetch')
+
+    async with session.get(url, timeout=ClientTimeout(total=10), headers=HEADERS) as resp:
         try:
             if resp.status == 200:
                 log.debug('Successful fetch')
-                return await resp.text()
+                try:
+                    return await resp.text()
+                except CancelledError:
+                    log.error('Canceled on getting text')
+                    raise UtilApiException('Canceled')
             else:
+                if resp.status == 429:
+                    raise RateLimitException('Data API rate limit')
                 log.info('Unexpected request status %s - %s', resp.status, url)
                 return
         except (ClientOSError, TimeoutError):
@@ -68,11 +78,15 @@ async def fetch_page_as_job(job: BatchedPostRequestJob, session: ClientSession)
     :rtype: BatchedPostRequestJob
     """
     try:
-        async with session.get(job.url, timeout=ClientTimeout(total=10)) as resp:
+        async with session.get(job.url, timeout=ClientTimeout(total=10), headers=HEADERS) as resp:
             if resp.status == 200:
                 log.debug('Successful fetch')
                 job.status = JobStatus.SUCCESS
+                log.debug('Fetching response text')
                 job.resp_data = await resp.text()
+            elif resp.status == 429:
+                log.warning('Data API Rate Limit')
+                job.status = JobStatus.RATELIMIT
             else:
                 log.warning('Unexpected request status %s - %s', resp.status, job.url)
                 job.status = JobStatus.ERROR
@@ -106,7 +120,7 @@ async def ingest_range(newest_post_id: str, oldest_post_id: str) -> None:
 
     tasks = []
     conn = TCPConnector(limit=0)
-    async with ClientSession(connector=conn) as session:
+    async with ClientSession(connector=conn, headers=HEADERS) as session:
         while True:
             try:
                 chunk = list(itertools.islice(missing_ids, 100))
@@ -114,6 +128,7 @@ async def ingest_range(newest_post_id: str, oldest_post_id: str) -> None:
                 break
 
             url = f'{config.util_api}/reddit/info?submission_ids={build_reddit_query_string(chunk)}'
+            #url = f'https://api.reddit.com/api/info?id={build_reddit_query_string(chunk)}'
             job = BatchedPostRequestJob(url, chunk, JobStatus.STARTED)
             tasks.append(ensure_future(fetch_page_as_job(job, session)))
             if len(tasks) >= 50 or len(chunk) == 0:
@@ -139,10 +154,15 @@ async def ingest_range(newest_post_id: str, oldest_post_id: str) -> None:
                     else:
                         tasks.append(ensure_future(fetch_page_as_job(j, session)))
 
+                any_rate_limit = next((x for x in results if x.status == JobStatus.RATELIMIT), None)
+                if any_rate_limit:
+                    log.info('Some jobs hit data rate limit, waiting')
+                    await asyncio.sleep(10)
+
                 log.info('Sending %s posts to save queue', len(posts_to_save))
                 # save_new_posts.apply_async(([reddit_submission_to_post(submission) for submission in posts_to_save],))
-                save_new_posts.apply_async((posts_to_save,))
+                save_new_posts.apply_async((posts_to_save, True))
 
                 if len(chunk) == 0:
                     break
 
@@ -170,52 +190,60 @@ async def main() -> None:
         oldest_id = oldest_post.post_id
         await ingest_range(newest_id, oldest_id)
 
 
-    async with ClientSession() as session:
-        delay = 0
-        while True:
-            ids_to_get = get_next_ids(newest_id, 100)
-            url = f'{config.util_api}/reddit/info?submission_ids={build_reddit_query_string(ids_to_get)}'
+
+    delay = 0
+    while True:
+        ids_to_get = get_next_ids(newest_id, 100)
+        url = f'{config.util_api}/reddit/info?submission_ids={build_reddit_query_string(ids_to_get)}'
+        #url = f'https://api.reddit.com/api/info?id={build_reddit_query_string(ids_to_get)}'
+        async with ClientSession(headers=HEADERS) as session:
             try:
+                log.debug('Sending fetch request')
                 results = await fetch_page(url, session)
-            except (ServerDisconnectedError, ClientConnectorError, ClientOSError, TimeoutError):
+            except (ServerDisconnectedError, ClientConnectorError, ClientOSError, TimeoutError, CancelledError, UtilApiException):
                 log.warning('Error during fetch')
                 await asyncio.sleep(2)
                 continue
-
-            if not results:
+            except RateLimitException:
+                log.warning('Hit Data API Rate Limit')
+                await asyncio.sleep(10)
                 continue
 
-            res_data = json.loads(results)
-            if not res_data or not len(res_data['data']['children']):
-                log.info('No results')
+            if not results:
+                log.debug('No results')
+                continue
+
+            res_data = json.loads(results)
+            if not res_data or not len(res_data['data']['children']):
+                log.info('No results')
+                continue
+
+            log.info('%s results returned from API', len(res_data['data']['children']))
+            if len(res_data['data']['children']) < 91:
+                delay += 1
+                log.debug('Delay increased by 1. Current delay: %s', delay)
+            else:
+                if delay > 0:
+                    delay -= 1
+                    log.debug('Delay decreased by 1. Current delay: %s', delay)
+
+            posts_to_save = []
+            for post in res_data['data']['children']:
+                if post['data']['removed_by_category'] in REMOVAL_REASONS_TO_SKIP: continue
+                posts_to_save.append(post['data'])
 
-            log.info('%s results returned from API', len(res_data['data']['children']))
-            if len(res_data['data']['children']) < 90:
-                delay += 1
-                log.debug('Delay increased by 1. Current delay: %s', delay)
-            else:
-                if delay > 0:
-                    delay -= 1
-                    log.debug('Delay decreased by 1. Current delay: %s', delay)
-
-            posts_to_save = []
-            for post in res_data['data']['children']:
-                if post['data']['removed_by_category'] in REMOVAL_REASONS_TO_SKIP:
-                    continue
-                posts_to_save.append(post['data'])
-
-            log.info('Sending %s posts to save queue', len(posts_to_save))
-            # queue_posts_for_ingest([reddit_submission_to_post(submission) for submission in posts_to_save])
-            queue_posts_for_ingest(posts_to_save)
+            log.info('Sending %s posts to save queue', len(posts_to_save))
+            # queue_posts_for_ingest([reddit_submission_to_post(submission) for submission in posts_to_save])
+            queue_posts_for_ingest(posts_to_save)
 
-            ingest_delay = datetime.utcnow() - datetime.utcfromtimestamp(
-                res_data['data']['children'][0]['data']['created_utc'])
-            log.info('Current Delay: %s', ingest_delay)
+            ingest_delay = datetime.utcnow() - datetime.utcfromtimestamp(
+                res_data['data']['children'][0]['data']['created_utc'])
+            log.info('Current Delay: %s', ingest_delay)
 
-            newest_id = res_data['data']['children'][-1]['data']['id']
+            newest_id = res_data['data']['children'][-1]['data']['id']
 
-            time.sleep(delay)
+        time.sleep(delay)
 
 
 if __name__ == '__main__':