Skip to content

Commit

Permalink
Feature/store subreddit data (#382)
Browse files Browse the repository at this point in the history
* ingest save time tracking by post type

* Store subreddit data

* Store subreddit data
  • Loading branch information
barrycarey authored Dec 10, 2024
1 parent 58e284c commit 0fc5a11
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 9 deletions.
5 changes: 1 addition & 4 deletions alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@
target_metadata = Base.metadata

def get_conn_string():
conn_str = r'mysql+pymysql://barry:[email protected]/reddit_dev'
return conn_str
#return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}'

return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}'

# other values from the config, defined by the needs of env.py,
# can be acquired:
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ services:
- CELERY_IMPORTS=redditrepostsleuth.core.celery.admin_tasks,redditrepostsleuth.core.celery.tasks.scheduled_tasks
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q scheduled_tasks -n scheduled_task_worker --autoscale=15,2

subreddit_update_worker:
container_name: subreddit_update_worker
restart: unless-stopped
user: "1001"
build:
context: .
dockerfile: docker/WorkerDockerFile
env_file:
- .env
environment:
- RUN_ENV=production
- db_user=maintenance_task
- LOG_LEVEL=DEBUG
- CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.maintenance_tasks
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q update_subreddit_data -n subreddit_update_worker --autoscale=1,4

scheduler:
container_name: beat_scheduler
restart: unless-stopped
Expand Down
3 changes: 3 additions & 0 deletions redditrepostsleuth/core/celery/celeryconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
task_routes = {
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_post': {'queue': 'post_ingest'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_posts': {'queue': 'post_ingest'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_subreddit': {'queue': 'save_subreddit'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.ingest_repost_check': {'queue': 'repost'},
'redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save': {'queue': 'repost_image'},
'redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check': {'queue': 'repost_link'},
Expand All @@ -31,6 +32,8 @@
'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'},
#'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
'redditrepostsleuth.core.celery.tasks.reddit_action_tasks.*': {'queue': 'reddit_actions'},
'redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data': {'queue': 'update_subreddit_data'},
'redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit': {'queue': 'update_subreddit_data'}


}
Expand Down
30 changes: 28 additions & 2 deletions redditrepostsleuth/core/celery/tasks/ingest_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timedelta
from time import perf_counter
from typing import Optional
from urllib.parse import urlparse

import requests
from celery import Task
Expand All @@ -14,6 +15,7 @@
from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask
from redditrepostsleuth.core.celery.task_logic.ingest_task_logic import pre_process_post, get_redgif_image_url
from redditrepostsleuth.core.config import Config
from redditrepostsleuth.core.db.databasemodels import Subreddit
from redditrepostsleuth.core.db.db_utils import get_db_engine
from redditrepostsleuth.core.db.uow.unitofworkmanager import UnitOfWorkManager
from redditrepostsleuth.core.exception import InvalidImageUrlException, GalleryNotProcessed, ImageConversionException, \
Expand Down Expand Up @@ -42,6 +44,22 @@ def __init__(self):
self._proxy_manager = ProxyManager(self.uowm, 1000)
self.domains_to_proxy = []

# Fixed 'ignore_reseults' typo: Celery's actual task option is 'ignore_result';
# the misspelling was silently ignored, so results were being stored.
@celery.task(bind=True, base=IngestTask, ignore_result=True, serializer='pickle')
def save_subreddit(self, subreddit_name: str):
    """Persist a subreddit by name if it is not already stored.

    After a successful insert, queues update_subreddit_data so the new
    subreddit's metadata gets fetched from the util API.

    :param subreddit_name: Name of the subreddit to save
    """
    try:
        with self.uowm.start() as uow:
            existing = uow.subreddit.get_by_name(subreddit_name)
            if existing:
                log.debug('Subreddit %s already exists', subreddit_name)
                return
            subreddit = Subreddit(name=subreddit_name)
            uow.subreddit.add(subreddit)
            uow.commit()
            log.debug('Saved Subreddit %s', subreddit_name)
            # Only newly seen subreddits need an immediate data fetch
            celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data',
                             args=[subreddit_name])
    except Exception:
        # log.exception() with no arguments raises TypeError; a message is required
        log.exception('Failed to save subreddit %s', subreddit_name)

@celery.task(bind=True, base=IngestTask, ignore_reseults=True, serializer='pickle', autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed, HTTPException), retry_kwargs={'max_retries': 10, 'countdown': 300})
def save_new_post(self, submission: dict, repost_check: bool = True):

Expand All @@ -50,13 +68,20 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
'measurement': 'Post_Ingest',
#'time': datetime.utcnow().timestamp(),
'fields': {
'run_time': None
'run_time': None,
'post_id': submission.get('id', None)
},
'tags': {
'post_type': None,
'domain': None
}
}

# Adding for timing in Grafana
url = submission.get('url', None)
if url:
save_event['tags']['domain'] = urlparse(url).netloc

# TODO: temp fix until I can fix imgur gifs
if 'imgur' in submission['url'] and 'gifv' in submission['url']:
return
Expand Down Expand Up @@ -110,7 +135,7 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
elif post.post_type_id == 3:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post])

#celery.send_task('redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans', args=[post.author])
celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit', args=[post.subreddit])



Expand All @@ -119,6 +144,7 @@ def save_new_posts(posts: list[dict], repost_check: bool = True) -> None:
for post in posts:
save_new_post.apply_async((post, repost_check))


@celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True)
def save_pushshift_results(self, data):
with self.uowm.start() as uow:
Expand Down
52 changes: 52 additions & 0 deletions redditrepostsleuth/core/celery/tasks/maintenance_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import datetime

import requests

from redditrepostsleuth.core.celery import celery
from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask
from redditrepostsleuth.core.db.databasemodels import Subreddit
from redditrepostsleuth.core.exception import UtilApiException
from redditrepostsleuth.core.logging import configure_logger

log = configure_logger(
name='redditrepostsleuth',
)


# Fixed 'retry_kwards' typo: Celery silently ignored the misspelled option,
# so the intended 50-retry/600s-countdown policy never applied.
@celery.task(bind=True, base=SqlAlchemyTask, autoretry_for=(UtilApiException,),
             retry_kwargs={'max_retries': 50, 'countdown': 600})
def update_subreddit_data(self, subreddit_name) -> None:
    """Refresh subscriber count and NSFW flag for a subreddit from the util API.

    :param subreddit_name: Name of the subreddit to refresh
    :raises UtilApiException: When the util API returns a non-200 status
        (triggers Celery autoretry)
    """
    try:
        with self.uowm.start() as uow:
            subreddit = uow.subreddit.get_by_name(subreddit_name)
            if not subreddit:
                # get_by_name returns None for unknown names; previously this
                # crashed with AttributeError (swallowed by the broad except)
                log.error('Subreddit %s not found in database, skipping update', subreddit_name)
                return
            url_to_fetch = f'{self.config.util_api}/reddit/subreddit?name={subreddit.name}'
            # Timeout so a stalled util API cannot hang the worker indefinitely
            res = requests.get(url_to_fetch, timeout=30)
            if res.status_code != 200:
                log.error('Bad status %s from util API when checking subreddit %s', res.status_code, subreddit.name)
                raise UtilApiException(f'Bad status {res.status_code} checking {subreddit_name}')

            subreddit_data = res.json()['data']
            subreddit.subscribers = subreddit_data['subscribers'] or 0
            subreddit.nsfw = subreddit_data['over18'] or False
            subreddit.last_checked = datetime.datetime.now(datetime.UTC)
            uow.commit()
            log.debug('Update subreddit data for %s. NSFW: %s - Subscribers: %s', subreddit.name, subreddit.nsfw,
                      subreddit.subscribers)
    except UtilApiException:
        # Re-raise so autoretry_for can schedule the retry
        raise
    except Exception:
        log.exception('Failed to update subreddit data for %s', subreddit_name)

# Fixed 'ignore_reseults' typo: Celery's actual task option is 'ignore_result';
# the misspelling was silently ignored, so results were being stored.
@celery.task(bind=True, base=SqlAlchemyTask, ignore_result=True, serializer='pickle')
def save_subreddit(self, subreddit_name: str):
    """Persist a subreddit by name if it is not already stored.

    After a successful insert, queues update_subreddit_data to fetch the
    subreddit's metadata.

    :param subreddit_name: Name of the subreddit to save
    """
    try:
        with self.uowm.start() as uow:
            existing = uow.subreddit.get_by_name(subreddit_name)
            if existing:
                log.debug('Subreddit %s already exists', subreddit_name)
                return
            subreddit = Subreddit(name=subreddit_name)
            uow.subreddit.add(subreddit)
            uow.commit()
            log.debug('Saved Subreddit %s', subreddit_name)
            update_subreddit_data.apply_async((subreddit_name,))
    except Exception:
        # Give the log record a useful message instead of the previous ''
        log.exception('Failed to save subreddit %s', subreddit_name)
13 changes: 12 additions & 1 deletion redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from functools import wraps
from time import perf_counter

import requests
from prawcore import TooManyRequests, Redirect, ServerError, NotFound

from redditrepostsleuth.adminsvc.bot_comment_monitor import BotCommentMonitor
Expand All @@ -14,6 +15,7 @@
from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, token_checker, \
run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
from redditrepostsleuth.core.db.databasemodels import StatsDailyCount
from redditrepostsleuth.core.exception import UtilApiException
from redditrepostsleuth.core.logging import configure_logger
from redditrepostsleuth.core.util.helpers import chunk_list

Expand Down Expand Up @@ -321,4 +323,13 @@ def queue_search_history_cleanup(self):
log.info('Queuing Search History Cleanup. Range: ID Range: %s:%s', searches[0].id, searches[-1].id)
ids = [x[0] for x in searches]
for chunk in chunk_list(ids, 5000):
delete_search_batch.apply_async((chunk,))
delete_search_batch.apply_async((chunk,))

# Fixed 'retry_kwards' typo: Celery silently ignored the misspelled option,
# so the intended max_retries=5 never applied.
@celery.task(bind=True, base=RedditTask, autoretry_for=(UtilApiException,), retry_kwargs={'max_retries': 5})
@record_task_status
def queue_subreddit_data_updates(self) -> None:
    """Queue a save/update task for every subreddit due for a data refresh."""
    with self.uowm.start() as uow:
        subreddits_to_update = uow.subreddit.get_subreddits_to_update()
        log.info('Queuing %s subreddits for data update', len(subreddits_to_update))
        for subreddit in subreddits_to_update:
            celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit',
                             args=[subreddit.name])
18 changes: 17 additions & 1 deletion redditrepostsleuth/core/db/databasemodels.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class Repost(Base):
Index('idx_repost_by_type', 'post_type_id', 'detected_at', unique=False),
Index('idx_repost_of_date', 'author', 'detected_at',unique=False),
Index('idx_repost_by_subreddit', 'subreddit', 'post_type_id', 'detected_at', unique=False),
Index('idx_repost_by_author', 'author', unique=False),
)
id = Column(Integer, primary_key=True)
post_id = Column(Integer, ForeignKey('post.id'))
Expand Down Expand Up @@ -758,4 +759,19 @@ class UserReview(Base):
content_links_found = Column(Boolean, default=False)
added_at = Column(DateTime, default=func.utc_timestamp(), nullable=False)
notes = Column(String(150))
last_checked = Column(DateTime, default=func.utc_timestamp())
last_checked = Column(DateTime, default=func.utc_timestamp())

class Subreddit(Base):
    """ORM model for a subreddit tracked by the bot (subscriber count, NSFW flag, ban state)."""
    __tablename__ = 'subreddit'
    __table_args__ = (
        # NOTE(review): name is declared unique=True below, which already creates
        # a unique index — this extra non-unique index may be redundant; confirm.
        Index('idx_subreddit_name', 'name'),
    )
    id = Column(Integer, primary_key=True)
    # Subreddit name; lookups go through SubredditRepo.get_by_name
    name = Column(String(25), nullable=False, unique=True)
    # Populated by update_subreddit_data from the util API ('subscribers')
    subscribers = Column(Integer, nullable=False, default=0)
    # Populated by update_subreddit_data from the util API ('over18')
    nsfw = Column(Boolean, nullable=False, default=False)
    added_at = Column(DateTime, default=func.utc_timestamp(), nullable=False)
    bot_banned = Column(Boolean, nullable=False, default=False)
    bot_banned_at = Column(DateTime)
    last_ban_check = Column(DateTime)
    # Set by update_subreddit_data after a successful refresh; NULL until first check
    last_checked = Column(DateTime)
3 changes: 3 additions & 0 deletions redditrepostsleuth/core/db/repository/repost_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def get_all(self, limit: int = None, offset: int = None) -> List[Repost]:
def get_all_by_type(self, post_type_id: int, limit: None, offset: None) -> list[Repost]:
return self.db_session.query(Repost).filter(Repost.post_type_id == post_type_id).order_by(Repost.id.desc()).offset(offset).limit(limit).all()

def get_by_author(self, author: str) -> List[Repost]:
    """Return all reposts attributed to the given author."""
    query = self.db_session.query(Repost).filter_by(author=author)
    return query.all()

def get_all_without_author(self, limit: int = None, offset: int = None) -> List[Repost]:
    """Return reposts with no author recorded, newest first.

    :param limit: Max rows to return; None for no limit
    :param offset: Rows to skip; None for none
    """
    # .is_(None) is the SQLAlchemy-idiomatic IS NULL test (== None trips E711 linting)
    return self.db_session.query(Repost).filter(Repost.author.is_(None)).order_by(Repost.id.desc()).offset(offset).limit(limit).all()

Expand Down
20 changes: 20 additions & 0 deletions redditrepostsleuth/core/db/repository/subreddit_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import datetime

from sqlalchemy import or_

from redditrepostsleuth.core.db.databasemodels import Subreddit


class SubredditRepo:
    """Data-access layer for Subreddit rows."""

    def __init__(self, db_session):
        self.db_session = db_session

    def add(self, item):
        """Stage a Subreddit for insert; the caller commits via the unit of work."""
        self.db_session.add(item)

    def get_by_name(self, name: str):
        """Return the Subreddit with this name, or None if it does not exist."""
        return self.db_session.query(Subreddit).filter(Subreddit.name == name).first()

    def get_subreddits_to_update(self, limit: int = None, offset: int = None) -> list[Subreddit]:
        """Return subreddits due for a metadata refresh.

        Selects rows that have never been checked, or that were added more
        than three days ago.

        NOTE(review): filtering on added_at means every subreddit older than
        three days is returned on every run regardless of last_checked —
        confirm whether ``last_checked < cutoff`` was the intent.

        :param limit: Max rows to return; None for no limit
        :param offset: Rows to skip; None for none
        """
        cutoff = datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=3)
        # .is_(None) is the SQLAlchemy-idiomatic IS NULL test (== None trips E711 linting)
        return self.db_session.query(Subreddit).filter(
            or_(Subreddit.added_at < cutoff, Subreddit.last_checked.is_(None))
        ).limit(limit).offset(offset).all()
8 changes: 7 additions & 1 deletion redditrepostsleuth/core/db/uow/unitofwork.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from sqlalchemy.orm import scoped_session

from redditrepostsleuth.core.db.databasemodels import Subreddit
from redditrepostsleuth.core.db.repository.banned_subreddit_repo import BannedSubredditRepo
from redditrepostsleuth.core.db.repository.banned_user_repo import BannedUserRepo
from redditrepostsleuth.core.db.repository.bot_private_message_repo import BotPrivateMessageRepo
Expand Down Expand Up @@ -27,6 +28,7 @@
from redditrepostsleuth.core.db.repository.stat_daily_count_repo import StatDailyCountRepo
from redditrepostsleuth.core.db.repository.stat_top_repost_repo import StatTopRepostRepo
from redditrepostsleuth.core.db.repository.stats_top_reposter_repo import StatTopReposterRepo
from redditrepostsleuth.core.db.repository.subreddit_repo import SubredditRepo
from redditrepostsleuth.core.db.repository.summonsrepository import SummonsRepository
from redditrepostsleuth.core.db.repository.user_report_repo import UserReportRepo
from redditrepostsleuth.core.db.repository.user_review_repo import UserReviewRepo
Expand Down Expand Up @@ -175,4 +177,8 @@ def post_type(self) -> PostTypeRepo:

@property
def user_whitelist(self) -> UserWhitelistRepo:
return UserWhitelistRepo(self.session)
return UserWhitelistRepo(self.session)

@property
def subreddit(self) -> SubredditRepo:
    """Repository for Subreddit rows, bound to this unit of work's session."""
    return SubredditRepo(self.session)

0 comments on commit 0fc5a11

Please sign in to comment.