Skip to content

Commit

Permalink
[lofter]: add initial support
Browse files Browse the repository at this point in the history
  • Loading branch information
hdk5 committed Dec 7, 2024
1 parent d9bbe3b commit 32907e0
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 0 deletions.
1 change: 1 addition & 0 deletions gallery_dl/extractor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"lexica",
"lightroom",
"livedoor",
"lofter",
"luscious",
"lynxchan",
"mangadex",
Expand Down
150 changes: 150 additions & 0 deletions gallery_dl/extractor/lofter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Extractors for https://www.lofter.com/"""

from .common import Extractor, Message
from .. import text, util, exception


class LofterExtractor(Extractor):
"""Base class for lofter extractors"""
category = "lofter"
root = "https://www.lofter.com"
directory_fmt = ("{category}", "{blog_name}")
filename_fmt = "{id}_{num}.{extension}"
archive_fmt = "{id}_{num}"

def _init(self):
self.api = LofterAPI(self)

def items(self):
for post in self.posts():
if "post" in post:
post = post["post"]

post["blog_name"] = post["blogInfo"]["blogName"]

post_type = post["type"]
image_urls = []

# Article
if post_type == 1:
content = post["content"]
image_urls = text.extract_iter(content, '<img src="', '"')
image_urls = [text.unescape(x) for x in image_urls]
image_urls = [x.partition("?")[0] for x in image_urls]

# Photo
elif post_type == 2:
photo_links = util.json_loads(post["photoLinks"])
image_urls = [x["orign"] for x in photo_links]
image_urls = [x.partition("?")[0] for x in image_urls]

# Video
elif post_type == 4:
embed = util.json_loads(post["embed"])
image_urls = [embed["originUrl"]]

# Answer
elif post_type == 5:
images = util.json_loads(post["images"])
image_urls = [x["orign"] for x in images]
image_urls = [x.partition("?")[0] for x in image_urls]

else:
self.log.warning(
"%s: Unsupported post type '%s'.",
post["id"], post_type)

post["count"] = len(image_urls)
yield Message.Directory, post
for post["num"], url in enumerate(image_urls, 1):
yield Message.Url, url, text.nameext_from_url(url, post)

def posts(self):
return ()


class LofterPostExtractor(LofterExtractor):
"""Extractor for a lofter post"""
subcategory = "post"
pattern = r"(?:https?://)?[\w-]+\.lofter\.com/post/([0-9a-f]+)_([0-9a-f]+)"
example = "https://blog_name.lofter.com/post/12345678_90abcdef"

def posts(self):
blog_id, post_id = self.groups
post = self.api.post(int(blog_id, 16), int(post_id, 16))
return (post,)


class LofterBlogPostsExtractor(LofterExtractor):
"""Extractor for a lofter blog's posts"""
subcategory = "blog-posts"
pattern = (r"(?:https?://)?(?:"
# https://www.lofter.com/front/blog/home-page/<blog_name>
r"www\.lofter\.com/front/blog/home-page/([\w-]+)|"
# https://<blog_name>.lofter.com/
r"([\w-]+)\.lofter\.com"
r")")
example = "https://blog_name.lofter.com/"

def posts(self):
blog_name = self.groups[0] or self.groups[1]
posts = self.api.blog_posts(blog_name)
return posts


class LofterAPI():
def __init__(self, extractor):
self.extractor = extractor

def _call(self, endpoint, data):
url = "https://api.lofter.com{}".format(endpoint)
params = {
'product': 'lofter-android-7.9.10'
}
response = self.extractor.request(
url, method="POST", params=params, data=data)
info = response.json()

if info["meta"]["status"] != 200:
self.extractor.log.debug("Server response: %s", data)
raise exception.StopExtraction("API request failed")

return data

def blog_posts(self, blog_name):
endpoint = "/v2.0/blogHomePage.api"
params = {
"method": "getPostLists",
"offset": 0,
"limit": 200,
"blogdomain": "{}.lofter.com".format(blog_name),
}

while True:
data = self._call(endpoint, params)
posts = data["response"]["posts"]

if not posts:
break

for post in posts:
yield post

params["offset"] = data["response"]["offset"]

def post(self, blog_id, post_id):
endpoint = "/oldapi/post/detail.api"
params = {
"targetblogid": blog_id,
"postid": post_id,
}
data = self._call(endpoint, params)
posts = data["response"]["posts"]
post = posts[0]
return post

0 comments on commit 32907e0

Please sign in to comment.