diff --git a/alembic/env.py b/alembic/env.py index ba064ef..1b8714f 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -40,10 +40,7 @@ target_metadata = Base.metadata def get_conn_string(): - conn_str = r'mysql+pymysql://barry:Tovalu88!@192.168.1.194/reddit_dev' - return conn_str - #return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}' - + return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}' # other values from the config, defined by the needs of env.py, # can be acquired: diff --git a/docker-compose.yml b/docker-compose.yml index fb50975..11a78dd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,22 @@ services: - CELERY_IMPORTS=redditrepostsleuth.core.celery.admin_tasks,redditrepostsleuth.core.celery.tasks.scheduled_tasks entrypoint: celery -A redditrepostsleuth.core.celery worker -Q scheduled_tasks -n scheduled_task_worker --autoscale=15,2 + subreddit_update_worker: + container_name: subreddit_update_worker + restart: unless-stopped + user: "1001" + build: + context: . 
+ dockerfile: docker/WorkerDockerFile + env_file: + - .env + environment: + - RUN_ENV=production + - db_user=maintenance_task + - LOG_LEVEL=DEBUG + - CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.maintenance_tasks + entrypoint: celery -A redditrepostsleuth.core.celery worker -Q update_subreddit_data -n subreddit_update_worker --autoscale=1,4 + scheduler: container_name: beat_scheduler restart: unless-stopped diff --git a/redditrepostsleuth/core/celery/celeryconfig.py b/redditrepostsleuth/core/celery/celeryconfig.py index 675e0b7..843c198 100644 --- a/redditrepostsleuth/core/celery/celeryconfig.py +++ b/redditrepostsleuth/core/celery/celeryconfig.py @@ -16,6 +16,7 @@ task_routes = { 'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_post': {'queue': 'post_ingest'}, 'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_posts': {'queue': 'post_ingest'}, + 'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_subreddit': {'queue': 'save_subreddit'}, 'redditrepostsleuth.core.celery.tasks.ingest_tasks.ingest_repost_check': {'queue': 'repost'}, 'redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save': {'queue': 'repost_image'}, 'redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check': {'queue': 'repost_link'}, @@ -31,6 +32,8 @@ 'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'}, #'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'}, 'redditrepostsleuth.core.celery.tasks.reddit_action_tasks.*': {'queue': 'reddit_actions'}, + 'redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data': {'queue': 'update_subreddit_data'}, + 'redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit': {'queue': 'update_subreddit_data'} } diff --git a/redditrepostsleuth/core/celery/tasks/ingest_tasks.py b/redditrepostsleuth/core/celery/tasks/ingest_tasks.py index 176e8dc..e2cf736 100644 --- 
a/redditrepostsleuth/core/celery/tasks/ingest_tasks.py +++ b/redditrepostsleuth/core/celery/tasks/ingest_tasks.py @@ -4,6 +4,7 @@ from datetime import datetime, timedelta from time import perf_counter from typing import Optional +from urllib.parse import urlparse import requests from celery import Task @@ -14,6 +15,7 @@ from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask from redditrepostsleuth.core.celery.task_logic.ingest_task_logic import pre_process_post, get_redgif_image_url from redditrepostsleuth.core.config import Config +from redditrepostsleuth.core.db.databasemodels import Subreddit from redditrepostsleuth.core.db.db_utils import get_db_engine from redditrepostsleuth.core.db.uow.unitofworkmanager import UnitOfWorkManager from redditrepostsleuth.core.exception import InvalidImageUrlException, GalleryNotProcessed, ImageConversionException, \ @@ -42,6 +44,22 @@ def __init__(self): self._proxy_manager = ProxyManager(self.uowm, 1000) self.domains_to_proxy = [] +@celery.task(bind=True, base=IngestTask, ignore_results=True, serializer='pickle') +def save_subreddit(self, subreddit_name: str): + try: + with self.uowm.start() as uow: + existing = uow.subreddit.get_by_name(subreddit_name) + if existing: + log.debug('Subreddit %s already exists', subreddit_name) + return + subreddit = Subreddit(name=subreddit_name) + uow.subreddit.add(subreddit) + uow.commit() + log.debug('Saved Subreddit %s', subreddit_name) + celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data', args=[subreddit_name]) + except Exception as e: + log.exception('') + @celery.task(bind=True, base=IngestTask, ignore_reseults=True, serializer='pickle', autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed, HTTPException), retry_kwargs={'max_retries': 10, 'countdown': 300}) def save_new_post(self, submission: dict, repost_check: bool = True): @@ -50,13 +68,20 @@ def save_new_post(self, submission: dict, repost_check: bool = 
True): 'measurement': 'Post_Ingest', #'time': datetime.utcnow().timestamp(), 'fields': { - 'run_time': None + 'run_time': None, + 'post_id': submission.get('id', None) }, 'tags': { 'post_type': None, + 'domain': None } } + # Adding for timing in Grafana + url = submission.get('url', None) + if url: + save_event['tags']['domain'] = urlparse(url).netloc + # TODO: temp fix until I can fix imgur gifs if 'imgur' in submission['url'] and 'gifv' in submission['url']: return @@ -110,7 +135,7 @@ def save_new_post(self, submission: dict, repost_check: bool = True): elif post.post_type_id == 3: celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post]) - #celery.send_task('redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans', args=[post.author]) + celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit', args=[post.subreddit]) @@ -119,6 +144,7 @@ def save_new_posts(posts: list[dict], repost_check: bool = True) -> None: for post in posts: save_new_post.apply_async((post, repost_check)) + @celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True) def save_pushshift_results(self, data): with self.uowm.start() as uow: diff --git a/redditrepostsleuth/core/celery/tasks/maintenance_tasks.py b/redditrepostsleuth/core/celery/tasks/maintenance_tasks.py new file mode 100644 index 0000000..2ffb501 --- /dev/null +++ b/redditrepostsleuth/core/celery/tasks/maintenance_tasks.py @@ -0,0 +1,52 @@ +import datetime + +import requests + +from redditrepostsleuth.core.celery import celery +from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask +from redditrepostsleuth.core.db.databasemodels import Subreddit +from redditrepostsleuth.core.exception import UtilApiException +from redditrepostsleuth.core.logging import configure_logger + +log = configure_logger( + name='redditrepostsleuth', +) + + +@celery.task(bind=True, base=SqlAlchemyTask, autoretry_for=(UtilApiException,), 
retry_kwargs={'max_retries': 50, 'countdown': 600}) +def update_subreddit_data(self, subreddit_name) -> None: + try: + with self.uowm.start() as uow: + subreddit = uow.subreddit.get_by_name(subreddit_name) + url_to_fetch = f'{self.config.util_api}/reddit/subreddit?name={subreddit.name}' + res = requests.get(url_to_fetch) + if res.status_code != 200: + log.error('Bad status %s from util API when checking subreddit %s', res.status_code, subreddit.name) + raise UtilApiException(f'Bad status {res.status_code} checking {subreddit_name}') + + subreddit_data = res.json()['data'] + subreddit.subscribers = subreddit_data['subscribers'] or 0 + subreddit.nsfw = subreddit_data['over18'] or False + subreddit.last_checked = datetime.datetime.now(datetime.UTC) + uow.commit() + log.debug('Update subreddit data for %s. NSFW: %s - Subscribers: %s', subreddit.name, subreddit.nsfw, subreddit.subscribers) + except UtilApiException as e: + raise e + except Exception as e: + log.exception('') + +@celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True, serializer='pickle') +def save_subreddit(self, subreddit_name: str): + try: + with self.uowm.start() as uow: + existing = uow.subreddit.get_by_name(subreddit_name) + if existing: + log.debug('Subreddit %s already exists', subreddit_name) + return + subreddit = Subreddit(name=subreddit_name) + uow.subreddit.add(subreddit) + uow.commit() + log.debug('Saved Subreddit %s', subreddit_name) + update_subreddit_data.apply_async((subreddit_name,)) + except Exception as e: + log.exception('') \ No newline at end of file diff --git a/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py b/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py index 81a0bd9..bf77ed8 100644 --- a/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py +++ b/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py @@ -2,6 +2,7 @@ from functools import wraps from time import perf_counter +import requests from prawcore import TooManyRequests, Redirect, 
ServerError, NotFound from redditrepostsleuth.adminsvc.bot_comment_monitor import BotCommentMonitor @@ -14,6 +15,7 @@ from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, token_checker, \ run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts from redditrepostsleuth.core.db.databasemodels import StatsDailyCount +from redditrepostsleuth.core.exception import UtilApiException from redditrepostsleuth.core.logging import configure_logger from redditrepostsleuth.core.util.helpers import chunk_list @@ -321,4 +323,13 @@ def queue_search_history_cleanup(self): log.info('Queuing Search History Cleanup. Range: ID Range: %s:%s', searches[0].id, searches[-1].id) ids = [x[0] for x in searches] for chunk in chunk_list(ids, 5000): - delete_search_batch.apply_async((chunk,)) \ No newline at end of file + delete_search_batch.apply_async((chunk,)) + +@celery.task(bind=True, base=RedditTask, autoretry_for=(UtilApiException,), retry_kwargs={'max_retries': 5}) +@record_task_status +def queue_subreddit_data_updates(self) -> None: + with self.uowm.start() as uow: + subreddits_to_update = uow.subreddit.get_subreddits_to_update() + for subreddit in subreddits_to_update: + celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data', + args=[subreddit.name]) \ No newline at end of file diff --git a/redditrepostsleuth/core/db/databasemodels.py b/redditrepostsleuth/core/db/databasemodels.py index 6adb105..72d55cd 100644 --- a/redditrepostsleuth/core/db/databasemodels.py +++ b/redditrepostsleuth/core/db/databasemodels.py @@ -262,6 +262,7 @@ class Repost(Base): Index('idx_repost_by_type', 'post_type_id', 'detected_at', unique=False), Index('idx_repost_of_date', 'author', 'detected_at',unique=False), Index('idx_repost_by_subreddit', 'subreddit', 'post_type_id', 'detected_at', unique=False), + Index('idx_repost_by_author', 'author', unique=False), ) id = Column(Integer, primary_key=True) 
post_id = Column(Integer, ForeignKey('post.id')) @@ -758,4 +759,19 @@ class UserReview(Base): content_links_found = Column(Boolean, default=False) added_at = Column(DateTime, default=func.utc_timestamp(), nullable=False) notes = Column(String(150)) - last_checked = Column(DateTime, default=func.utc_timestamp()) \ No newline at end of file + last_checked = Column(DateTime, default=func.utc_timestamp()) + +class Subreddit(Base): + __tablename__ = 'subreddit' + __table_args__ = ( + Index('idx_subreddit_name', 'name'), + ) + id = Column(Integer, primary_key=True) + name = Column(String(25), nullable=False, unique=True) + subscribers = Column(Integer, nullable=False, default=0) + nsfw = Column(Boolean, nullable=False, default=False) + added_at = Column(DateTime, default=func.utc_timestamp(), nullable=False) + bot_banned = Column(Boolean, nullable=False, default=False) + bot_banned_at = Column(DateTime) + last_ban_check = Column(DateTime) + last_checked = Column(DateTime) \ No newline at end of file diff --git a/redditrepostsleuth/core/db/repository/repost_repo.py b/redditrepostsleuth/core/db/repository/repost_repo.py index cf465f1..f691be2 100644 --- a/redditrepostsleuth/core/db/repository/repost_repo.py +++ b/redditrepostsleuth/core/db/repository/repost_repo.py @@ -19,6 +19,9 @@ def get_all(self, limit: int = None, offset: int = None) -> List[Repost]: def get_all_by_type(self, post_type_id: int, limit: None, offset: None) -> list[Repost]: return self.db_session.query(Repost).filter(Repost.post_type_id == post_type_id).order_by(Repost.id.desc()).offset(offset).limit(limit).all() + def get_by_author(self, author: str) -> List[Repost]: + return self.db_session.query(Repost).filter(Repost.author == author).all() + def get_all_without_author(self, limit: int = None, offset: int = None): return self.db_session.query(Repost).filter(Repost.author == None).order_by(Repost.id.desc()).offset(offset).limit(limit).all() diff --git 
a/redditrepostsleuth/core/db/repository/subreddit_repo.py b/redditrepostsleuth/core/db/repository/subreddit_repo.py new file mode 100644 index 0000000..fe885fa --- /dev/null +++ b/redditrepostsleuth/core/db/repository/subreddit_repo.py @@ -0,0 +1,20 @@ +import datetime + +from sqlalchemy import or_ + +from redditrepostsleuth.core.db.databasemodels import Subreddit + + +class SubredditRepo: + def __init__(self, db_session): + self.db_session = db_session + + def add(self, item): + self.db_session.add(item) + + def get_by_name(self, name: str): + return self.db_session.query(Subreddit).filter(Subreddit.name == name).first() + + def get_subreddits_to_update(self, limit: int = None, offset: int = None) -> list[Subreddit]: + delta = datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=3) + return self.db_session.query(Subreddit).filter(or_(Subreddit.last_checked < delta, Subreddit.last_checked == None)).limit(limit).offset(offset).all() \ No newline at end of file diff --git a/redditrepostsleuth/core/db/uow/unitofwork.py b/redditrepostsleuth/core/db/uow/unitofwork.py index 0aac0ed..dbdb571 100644 --- a/redditrepostsleuth/core/db/uow/unitofwork.py +++ b/redditrepostsleuth/core/db/uow/unitofwork.py @@ -1,5 +1,6 @@ from sqlalchemy.orm import scoped_session +from redditrepostsleuth.core.db.databasemodels import Subreddit from redditrepostsleuth.core.db.repository.banned_subreddit_repo import BannedSubredditRepo from redditrepostsleuth.core.db.repository.banned_user_repo import BannedUserRepo from redditrepostsleuth.core.db.repository.bot_private_message_repo import BotPrivateMessageRepo @@ -27,6 +28,7 @@ from redditrepostsleuth.core.db.repository.stat_daily_count_repo import StatDailyCountRepo from redditrepostsleuth.core.db.repository.stat_top_repost_repo import StatTopRepostRepo from redditrepostsleuth.core.db.repository.stats_top_reposter_repo import StatTopReposterRepo +from redditrepostsleuth.core.db.repository.subreddit_repo import SubredditRepo from 
redditrepostsleuth.core.db.repository.summonsrepository import SummonsRepository from redditrepostsleuth.core.db.repository.user_report_repo import UserReportRepo from redditrepostsleuth.core.db.repository.user_review_repo import UserReviewRepo @@ -175,4 +177,8 @@ def post_type(self) -> PostTypeRepo: @property def user_whitelist(self) -> UserWhitelistRepo: - return UserWhitelistRepo(self.session) \ No newline at end of file + return UserWhitelistRepo(self.session) + + @property + def subreddit(self) -> SubredditRepo: + return SubredditRepo(self.session) \ No newline at end of file