Skip to content
This repository has been archived by the owner on Aug 27, 2023. It is now read-only.

Commit

Permalink
Add local downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
joeywang4 committed Aug 7, 2023
1 parent 78dcc02 commit 9ba010e
Show file tree
Hide file tree
Showing 14 changed files with 388 additions and 72 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,4 @@ cython_debug/
# SQLite database
*.db
*.bin
output/
2 changes: 2 additions & 0 deletions PubuPoC/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@
from .fetcher import Fetcher
from .page_crawler import PageCrawler
from .book_crawler import BookCrawler
from .downloader import Downloader
from .main import Main
5 changes: 2 additions & 3 deletions PubuPoC/book_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ def parse_book(res1: Response, res2: Response, book_id: int) -> Book or None:
class BookCrawler(Fetcher):
"""Crawl books"""

def __init__(self, db_path: str = "") -> None:
self.database = DB(db_path) if db_path != "" else DB()
def __init__(self, database: DB) -> None:
super().__init__()
self.database = database
self.url1 = (
"https://www.pubu.com.tw/api/flex/3.0/book/{}?referralType=ORDER_ITEM"
)
Expand All @@ -83,7 +83,6 @@ def get_last_id(self) -> int:
def checkpoint(self, result: list[Book], thread_id: int) -> None:
"""Save books into DB"""
for book in result:
self.count += 1
if book.error == 0 and book.book_id > self.workload.last_success_id:
self.workload.last_success_id = book.book_id
self.database.update_books(result)
Expand Down
56 changes: 56 additions & 0 deletions PubuPoC/downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Book downloader"""
import os
from shutil import rmtree
from requests import Session
from .fetcher import Fetcher
from .util import Mode


class Downloader(Fetcher):
    """Download page images into a temporary output directory.

    Reuses the inherited ``Fetcher`` thread pool: ``download()`` queues the
    URL list as fixed-range jobs and each worker thread streams its share of
    images to disk as ``0001.jpg``, ``0002.jpg``, ...
    """

    def __init__(self, path: str = "output/tmp/") -> None:
        """
        :param path: directory where downloaded images are written
        """
        super().__init__()
        self.path = path
        # error codes that should not abort a download (none by default)
        self.ignored_errors = []
        # URLs to fetch; populated by `download()` before starting threads
        self.urls = []
        self.num_threads = 25
        self.workload.job_size = 10

    def clean_up(self) -> None:
        """Delete tmp files if terminated, then run Fetcher clean-up."""
        if self.terminated:
            self.rmdir()
        return super().clean_up()

    def job_worker(self, job: tuple[int, int], thread_id: int) -> None:
        """Fetch the images for ``self.urls[job[0]:job[1]]``.

        :param job: half-open index range into ``self.urls``
        :param thread_id: slot in ``self.progress`` to report position in
        """
        with Session() as session:
            for i in range(job[0], job[1]):
                if self.terminated:
                    return
                self.progress[thread_id] = i
                got = self.get(self.urls[i], session)

                # Stream the body to "<page number>.jpg"; the number is
                # 1-based and zero-padded to 4 digits so files sort in
                # page order. os.path.join avoids a doubled separator when
                # `self.path` already ends with "/".
                out_path = os.path.join(self.path, f"{i + 1:04d}.jpg")
                with open(out_path, "wb") as ofile:
                    for chunk in got.iter_content(chunk_size=8192):
                        ofile.write(chunk)

    def rmdir(self):
        """Remove the output dir; a missing dir is not an error."""
        try:
            rmtree(self.path)
        except FileNotFoundError:
            pass

    def mkdir(self):
        """Create the output dir (and parents) if absent."""
        os.makedirs(self.path, exist_ok=True)

    def download(self, urls: list):
        """Reset the output dir, then download every URL in ``urls``.

        :param urls: image URLs, one per page, in page order
        """
        self.rmdir()
        self.mkdir()
        self.urls = urls
        self.start(Mode.FIXED, len(urls))
88 changes: 29 additions & 59 deletions PubuPoC/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import sys
import threading
import time
from enum import Enum
import requests
from .util import Counter, Workload, Mode

SPACES = 150


class Worker:
"""
Use python threads to work on jobs.
Expand Down Expand Up @@ -70,7 +71,9 @@ def terminate_threads(self) -> None:
if self.terminated:
return
self.terminated = True
print("[!] Received keyboard interrupt, terminating threads...".ljust(SPACES, " "))
print(
"[!] Received keyboard interrupt, terminating threads...".ljust(SPACES, " ")
)

def spawn_threads(self) -> None:
"""Create threads"""
Expand Down Expand Up @@ -112,51 +115,6 @@ def join_threads(self) -> None:
self.clean_up()


class Mode(Enum):
"""Crawler execution modes"""

FIXED = 1
UPDATE = 2
SEARCH = 3


class Workload:
"""Generate jobs for threads"""

def __init__(self) -> None:
self.job_size = 50
self.mode = Mode.UPDATE
# next id to fetch
self.next_id = -1
self.end_id = -1
# state in update mode
self.last_success_id = -1
self.max_error = 100000

def clean_up(self) -> None:
"""Reset states"""
self.next_id = self.end_id = self.last_success_id = -1

def get_job(self) -> tuple[int, int] or None:
"""Get next job for a worker"""
if self.mode == Mode.FIXED:
assert self.end_id != -1
# get next job until `end_id`
if self.next_id >= self.end_id:
return None
next_job = (self.next_id, min(self.next_id + self.job_size, self.end_id))
self.next_id = next_job[1]
return next_job

if self.mode == Mode.UPDATE:
if self.next_id - self.last_success_id > self.max_error:
return None
self.next_id += self.job_size
return (self.next_id - self.job_size, self.next_id)

raise NotImplementedError(f"[!] Mode {self.mode} is not supported")


class Fetcher(Worker):
"""Use worker to send GET requests"""

Expand All @@ -171,8 +129,8 @@ def __init__(self) -> None:

# stats
self.workload = Workload()
self.progress = [0 for _ in range(self.num_threads)]
self.count = 0
self.counter = Counter()
self.progress = []

def get(
self, url: str, session=requests, headers: dict = {}
Expand Down Expand Up @@ -207,8 +165,13 @@ def get(
# stop retry loop for 200 and 4XX responses
break

self.counter.inc()
return res

def checkpoint(self, result, thread_id: int):
"""Handle output from a job worker"""
return

def get_job(self) -> tuple[int, int] or None:
"""Get a job from workload"""
return self.workload.get_job()
Expand All @@ -219,27 +182,33 @@ def get_last_id(self) -> int:

def clean_up(self) -> None:
"""Print stats and reset states"""
msg = f"[*] Fetched {self.count} requests."
msg += f" Last id is now {self.get_last_id()}."
msg = f"[*] Fetched {self.counter.count} requests."
if self.workload.mode == Mode.UPDATE:
msg += f" Last id is now {self.get_last_id()}."
print(msg.ljust(SPACES, " "))

self.progress = [0 for _ in range(self.num_threads)]
self.count = 0
self.workload.clean_up()

return super().clean_up()

def status(self):
"""Report the current crawling status"""
progress = ", ".join(
[f"T{i + 1}-{self.progress[i]}" for i in range(self.num_threads)]
)
if self.workload.mode == Mode.FIXED:
progress = f"{self.counter.count}/{self.workload.end_id}"
else:
# for Mode.UPDATE
if self.num_threads <= 8:
progress = ", ".join(
[f"T{i + 1}-{self.progress[i]}" for i in range(self.num_threads)]
)
else:
progress = f"ID - {min(self.progress)}"
progress += " (" + f"{self.counter.report():.2f}" + " req/s)"
print("[*] Crawling: " + progress, end="\r", file=sys.stdout, flush=True)

def start(self, mode: Mode = Mode.UPDATE, end_id: int = -1) -> None:
"""
Start crawling pages
Init states and start crawling
"""
self.progress = [0 for _ in range(self.num_threads)]
self.workload.next_id = self.get_last_id() + 1
self.workload.last_success_id = self.workload.next_id - 1
self.workload.mode = mode
Expand All @@ -248,5 +217,6 @@ def start(self, mode: Mode = Mode.UPDATE, end_id: int = -1) -> None:
self.workload.end_id = end_id

print(f"[*] Start fetching from ID: {self.workload.next_id}")
self.counter.start()
self.spawn_threads()
self.join_threads()
72 changes: 72 additions & 0 deletions PubuPoC/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""The main module"""
from . import DB, RawPages, PageCrawler, BookCrawler, Downloader
from .util import Converter, gen_pdf


class Main:
    """Top-level orchestrator.

    Wires the database, crawlers, downloader and converter together and
    drives a full book download from book ID to finished PDF.
    """

    def __init__(
        self,
        database: str = "data/pubu.db",
        raw_pages: str = "data/",
        output: str = "output/",
        verbose: bool = False,
        change_decode: bool = False,
    ) -> None:
        """
        :param database: path of the SQLite database file
        :param raw_pages: directory holding the raw page records
        :param output: directory for the final PDF (tmp images go in a subdir)
        :param verbose: print progress messages when True
        :param change_decode: forwarded to the image ``Converter``
        """
        self.database = DB(database)
        self.raw_pages = RawPages(raw_pages)
        self.output = output
        self.verbose = verbose

        # sub-modules; downloader and converter share the same tmp dir
        self.page_crawler = PageCrawler(self.database, self.raw_pages)
        self.book_crawler = BookCrawler(self.database)
        self.downloader = Downloader(output + "tmp/")
        self.converter = Converter(output + "tmp/", change_decode)

    def download(self, book_id: int) -> None:
        """Download a book and render it as a PDF.

        Looks the book up in the local database first, falls back to
        fetching its info online, downloads every page image, converts
        them, and writes the PDF into ``self.output``.

        :param book_id: ID of the book to download
        :raises NotImplementedError: when pages are missing locally
            (search mode is not implemented yet)
        """
        if self.verbose:
            print(f"[*] Downloading book with book_id {book_id}")
        book = self.database.search_book(book_id)
        if book is None or book.error > 0:
            if self.verbose:
                cause = "not found in database" if book is None else "invalid"
                print(f"[!] Book is {cause}. Fetching online information...")

            # single-book job (half-open range) on thread 0; pass a tuple to
            # match Fetcher.job_worker's `job: tuple[int, int]` signature
            book = self.book_crawler.job_worker((book_id, book_id + 1), 0)[0]
            if book.error > 0 or book.doc_id == 0 or book.pages == 0:
                print(f"[!] Online info is invalid - book info: {book.to_tuple()}")
                return

        if self.verbose:
            print(f"[*] Found book: {book.to_tuple()}")
            print("[*] Getting pages...")

        pages = self.database.get_pages(book.doc_id)
        if len(pages) < book.pages:
            lack = book.pages - len(pages)
            if self.verbose:
                print(f"[!] Missing {lack} pages, continue in search mode")
            raise NotImplementedError("Search mode is not implemented")
        elif len(pages) > book.pages:
            # more local pages than the book reports: keep the first N
            if self.verbose:
                print(f"[!] Extra {len(pages) - book.pages} pages in local files.")
            pages = pages[: book.pages]

        # download pages
        if self.verbose:
            print(f"[*] Downloading {len(pages)} pages...")
        self.downloader.download([page.to_url(self.database) for page in pages])

        # convert images
        if self.verbose:
            print("[*] Converting images...")
        self.converter.convert(book.doc_id)

        # generate PDF from the same tmp dir the downloader/converter used
        # (was `self.output + "/tmp"`, which produced a doubled slash)
        if self.verbose:
            print("[*] Generating PDF...")
        gen_pdf(self.output + "tmp/", self.output, book.title)
        self.downloader.rmdir()
11 changes: 6 additions & 5 deletions PubuPoC/page_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ def to_input(page_id: int) -> str:
class PageCrawler(Fetcher):
"""Crawl pages"""

def __init__(self, db_path: str = "", raw_path: str = "") -> None:
def __init__(self, database: DB, raw_pages: RawPages) -> None:
super().__init__()
self.url = (
"https://www.pubu.com.tw/api/flex/3.0/page/{}/1/reader/jpg?productId=1"
)
self.database = DB(db_path) if db_path != "" else DB()
self.raw_pages = RawPages(raw_path) if raw_path != "" else RawPages()
super().__init__()
self.database = database
self.raw_pages = raw_pages
self.num_threads = 20
self.workload.job_size = 200

def get_last_id(self) -> int:
"""Get last avail page id in DB"""
Expand All @@ -63,7 +65,6 @@ def checkpoint(self, result: list[Page], thread_id: int) -> None:
"""Save pages into raw page records"""
for page in result:
self.raw_pages.write_page(page)
self.count += 1
if page.error == 0 and page.page_id > self.workload.last_success_id:
self.workload.last_success_id = page.page_id
self.raw_pages.sync_db(self.database)
Expand Down
5 changes: 5 additions & 0 deletions PubuPoC/util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Utilities"""
from .counter import Counter
from .workload import Workload, Mode
from .converter import Converter
from .pdf import gen_pdf
Loading

0 comments on commit 9ba010e

Please sign in to comment.