diff --git a/.env b/.env index 20f7176..bd538df 100644 --- a/.env +++ b/.env @@ -19,8 +19,6 @@ QWEATHER_APIKEY CRAZY_AUTO_UPDATE=true # 网易云ncm配置 -# 上传歌单时是否压缩 -ncm_playlist_zip=True # 手机号 ncm_phone # 密码 diff --git a/src/plugins/ncm/__init__.py b/src/plugins/ncm/__init__.py index b884e9c..164a893 100644 --- a/src/plugins/ncm/__init__.py +++ b/src/plugins/ncm/__init__.py @@ -1,33 +1,33 @@ -from pathlib import Path +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- from typing import Tuple, Any, Union + from nonebot import on_regex, on_command, on_message -from nonebot.adapters.onebot.v11 import (Message, Bot, - MessageSegment, - GroupMessageEvent, - PrivateMessageEvent) +from nonebot.adapters.onebot.v11 import ( + Message, + Bot, + MessageSegment, + GroupMessageEvent, + PrivateMessageEvent, +) from nonebot.log import logger from nonebot.matcher import Matcher from nonebot.params import CommandArg, RegexGroup, Arg -from nonebot.rule import Rule from nonebot.plugin import PluginMetadata +from nonebot.rule import Rule +from .config import Config from .data_source import nncm, ncm_config, setting, Q, cmd -# =======nonebot-plugin-help======= __plugin_meta__ = PluginMetadata( - name='网易云点歌', - description='网易云无损音乐下载/点歌', - usage=( - '将网易云歌曲/歌单分享到群聊即可自动解析\n' - '回复分享消息 + 文字`下载` 即可开始下载歌曲并上传到群文件(需要稍等一会)\n' - '指令:\n' - f'开启下载:{cmd}ncm t\n' - f'关闭下载:{cmd}ncm f\n' - f'点歌:{cmd}点歌 歌名' - ), - extra={'version': '1.5.0'} + name="网易云无损音乐下载", + description="基于go-cqhttp与nonebot2的 网易云无损音乐下载", + usage=("将网易云歌曲/歌单分享到群聊即可自动解析\n" "回复分享消息 + 文字`下载` 即可开始下载歌曲并上传到群文件(需要稍等一会)"), + config=Config, + type="application", + homepage="https://github.com/kitUIN/nonebot-plugin-ncm", + supported_adapters={"~onebot.v11"}, ) - # ========nonebot-plugin-ncm====== # ===========Constant============= TRUE = ["True", "T", "true", "t"] @@ -40,64 +40,67 @@ async def song_is_open(event: Union[GroupMessageEvent, PrivateMessageEvent]) -> if isinstance(event, GroupMessageEvent): if info := setting.search(Q["group_id"] == event.group_id): return info[0]["song"] - setting.insert({"group_id": event.group_id, - "song": False, "list": False}) + setting.insert({"group_id": event.group_id, "song": False, "list": False}) return False - if isinstance(event, PrivateMessageEvent): + elif isinstance(event, PrivateMessageEvent): if info := setting.search(Q["user_id"] == event.user_id): return info[0]["song"] setting.insert({"user_id": event.user_id, "song": True, "list": True}) return True -async def playlist_is_open(event: Union[GroupMessageEvent, PrivateMessageEvent]) -> bool: +async def playlist_is_open( + event: Union[GroupMessageEvent, PrivateMessageEvent] +) -> bool: if isinstance(event, GroupMessageEvent): - if info := setting.search(Q["group_id"] == event.group_id): + info = setting.search(Q["group_id"] == event.group_id) + if info: return info[0]["list"] - setting.insert({"group_id": event.group_id, - "song": False, "list": False}) - return False - if isinstance(event, PrivateMessageEvent): - if info := setting.search(Q["user_id"] == event.user_id): + else: + setting.insert({"group_id": event.group_id, "song": False, "list": False}) + return False + elif isinstance(event, PrivateMessageEvent): + info = setting.search(Q["user_id"] == event.user_id) + if info: return info[0]["list"] - setting.insert({"user_id": event.user_id, "song": True, "list": True}) - return True + else: + setting.insert({"user_id": event.user_id, "song": True, "list": True}) + return True async def check_search() -> bool: - if info := setting.search(Q["global"] == "search"): + info = setting.search(Q["global"] == "search") + if info: return info[0]["value"] - setting.insert({"global": "search", "value": True}) - return True + else: + setting.insert({"global": "search", "value": True}) + return True async def music_set_rule(event: Union[GroupMessageEvent, PrivateMessageEvent]) -> bool: # 权限设置 - return event.sender.role in ADMIN[:ncm_config.ncm_admin_level] or event.get_user_id() in ncm_config.superusers + return ( + event.sender.role in ADMIN[: ncm_config.ncm_admin_level] + or event.get_user_id() in ncm_config.superusers + ) async def music_reply_rule(event: Union[GroupMessageEvent, PrivateMessageEvent]): + # logger.info(event.get_plaintext()) return event.reply and event.get_plaintext().strip() == "下载" # ============Matcher============= -ncm_set = on_command("ncm", - rule=Rule(music_set_rule), - block=False) -'''功能设置''' -music_regex = on_regex(r"(song|url)\?id=([0-9]+)(|&)", - block=False) -'''歌曲id识别''' -playlist_regex = on_regex(r"playlist\?id=([0-9]+)&", - block=False) -'''歌单识别''' -music_reply = on_message( - rule=Rule(music_reply_rule), - block=False) -'''回复下载''' -search = on_command("点歌", - rule=Rule(check_search), block=False) -'''点歌''' +ncm_set = on_command("ncm", rule=Rule(music_set_rule), priority=1, block=False) +"""功能设置""" +music_regex = on_regex(r"(song|url)\?id=([0-9]+)(|&)", priority=2, block=False) +"""歌曲id识别""" +playlist_regex = on_regex(r"playlist\?id=([0-9]+)(|&)", priority=2, block=False) +"""歌单识别""" +music_reply = on_message(priority=2, rule=Rule(music_reply_rule), block=False) +"""回复下载""" +search = on_command("点歌", rule=Rule(check_search), priority=2, block=False) +"""点歌""" @search.handle() @@ -107,14 +110,17 @@ async def search_receive(matcher: Matcher, args: Message = CommandArg()): @search.got("song", prompt="要点什么歌捏?") -async def receive_song(bot: Bot, - event: Union[GroupMessageEvent, PrivateMessageEvent], - song: Message = Arg(), - ): - nncm.get_session(bot, event) - _id = await nncm.search_song(keyword=str(song), limit=1) - message_id = await bot.send(event=event, message=Message(MessageSegment.music(type_="163", id_=_id))) +async def receive_song( + bot: Bot, + event: Union[GroupMessageEvent, PrivateMessageEvent], + song: Message = Arg(), +): + _id = await nncm.search_song(keyword=song.extract_plain_text(), limit=1) + message_id = await bot.send( + event=event, message=Message(MessageSegment.music(type_="163", id_=_id)) + ) nncm.get_song(message_id=message_id["message_id"], nid=_id) + # try: # except ActionFailed as e: # logger.error(e.info) @@ -122,155 +128,175 @@ async def receive_song(bot: Bot, @music_regex.handle() -async def music_receive(bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent], - regroup: Tuple[Any, ...] = RegexGroup()): +async def music_receive( + bot: Bot, + event: Union[GroupMessageEvent, PrivateMessageEvent], + regroup: Tuple[Any, ...] = RegexGroup(), +): nid = regroup[1] logger.info(f"已识别NID:{nid}的歌曲") - nncm.get_session(bot, event) - nncm.get_song(nid) + + nncm.get_song(nid=nid, message_id=event.message_id) @playlist_regex.handle() -async def music_list_receive(bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent], - regroup: Tuple[Any, ...] = RegexGroup()): +async def music_list_receive( + bot: Bot, + event: Union[GroupMessageEvent, PrivateMessageEvent], + regroup: Tuple[Any, ...] = RegexGroup(), +): lid = regroup[0] logger.info(f"已识别LID:{lid}的歌单") - nncm.get_session(bot, event) - nncm.get_playlist(lid=lid) + nncm.get_playlist(lid=lid, message_id=event.message_id) @music_reply.handle() -async def music_reply_receive(bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent]): - nncm.get_session(bot, event) - info = nncm.check_message() +async def music_reply_receive( + bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent] +): + info = nncm.check_message(int(event.dict()["reply"]["message_id"])) if info is None: return if info["type"] == "song" and await song_is_open(event): - await bot.send(event=event, message="好吧好吧,你等等我!") - await nncm.download(ids=[int(info["nid"])]) - data = await nncm.music_check(info["nid"]) - if data: - if isinstance(event, GroupMessageEvent): - await nncm.upload_group_data_file(data) - elif isinstance(event, PrivateMessageEvent): - await nncm.upload_private_data_file(data) - else: - logger.error("数据库中未有该音乐地址数据") - + await bot.send(event=event, message="少女祈祷中🙏...上传时间较久,请勿重复发送命令") + await nncm.music_check(info["nid"], event) elif info["type"] == "playlist" and await playlist_is_open(event): - await bot.send(event=event, message=info["lmsg"]+"\n下载中,上传时间较久,请勿重复发送命令") - not_zips = await nncm.download(ids=info["ids"], lid=info["lid"], is_zip=ncm_config.ncm_playlist_zip) - filename = f"{info['lid']}.zip" - data = Path.cwd().joinpath("music").joinpath(filename) - if ncm_config.ncm_playlist_zip: - logger.debug(f"Upload:{filename}") - if isinstance(event, GroupMessageEvent): - await nncm.upload_group_file(file=str(data), name=filename) - elif isinstance(event, PrivateMessageEvent): - await nncm.upload_private_file(file=str(data), name=filename) - else: - for i in not_zips: - file = i["file"] - filename = i["filename"] - logger.debug(f"Upload:{filename}") - if isinstance(event, GroupMessageEvent): - await nncm.upload_group_file(file=file, name=filename) - elif isinstance(event, PrivateMessageEvent): - await nncm.upload_private_file(file=file, name=filename) + await bot.send(event=event, message=info["lmsg"] + "\n下载中,上传时间较久,请勿重复发送命令") + await nncm.music_check(info["ids"], event, info["lid"]) @ncm_set.handle() -async def set_receive(bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent], - args: Message = CommandArg()): # 功能设置接收 +async def set_receive( + bot: Bot, + event: Union[GroupMessageEvent, PrivateMessageEvent], + args: Message = CommandArg(), +): # 功能设置接收 logger.debug( - f"权限为{event.sender.role}的用户<{event.sender.nickname}>尝试使用命令{cmd}ncm {args}") + f"权限为{event.sender.role}的用户<{event.sender.nickname}>尝试使用命令{cmd}ncm {args}" + ) if args: args = str(args).split() if len(args) == 1: mold = args[0] if isinstance(event, GroupMessageEvent): - if info := setting.search(Q["group_id"] == event.group_id): + info = setting.search(Q["group_id"] == event.group_id) + # logger.info(info) + if info: if mold in TRUE: info[0]["song"] = True info[0]["list"] = True - setting.update( - info[0], Q["group_id"] == event.group_id) - await bot.send(event=event, message=Message(MessageSegment.text("已开启自动下载功能"))) + setting.update(info[0], Q["group_id"] == event.group_id) + msg = "已开启自动下载功能" + await bot.send( + event=event, message=Message(MessageSegment.text(msg)) + ) elif mold in FALSE: info[0]["song"] = False info[0]["list"] = False - setting.update( - info[0], Q["group_id"] == event.group_id) + setting.update(info[0], Q["group_id"] == event.group_id) msg = "已关闭自动下载功能" - await bot.send(event=event, message=Message(MessageSegment.text(msg))) + await bot.send( + event=event, message=Message(MessageSegment.text(msg)) + ) logger.debug(f"用户<{event.sender.nickname}>执行操作成功") - elif mold in TRUE: - setting.insert({"group_id": event.group_id, - "song": True, "list": True}) - elif mold in FALSE: - setting.insert({"group_id": event.group_id, - "song": False, "list": False}) + else: + if mold in TRUE: + setting.insert( + {"group_id": event.group_id, "song": True, "list": True} + ) + elif mold in FALSE: + setting.insert( + {"group_id": event.group_id, "song": False, "list": False} + ) elif isinstance(event, PrivateMessageEvent): - if info := setting.search(Q["user_id"] == event.user_id): + info = setting.search(Q["user_id"] == event.user_id) + # logger.info(info) + if info: if mold in TRUE: info[0]["song"] = True info[0]["list"] = True setting.update(info[0], Q["user_id"] == event.user_id) - await bot.send(event=event, message=Message(MessageSegment.text("已开启下载功能"))) + msg = "已开启下载功能" + await bot.send( + event=event, message=Message(MessageSegment.text(msg)) + ) elif mold in FALSE: info[0]["song"] = False info[0]["list"] = False setting.update(info[0], Q["user_id"] == event.user_id) msg = "已关闭下载功能" - await bot.send(event=event, message=Message(MessageSegment.text(msg))) + await bot.send( + event=event, message=Message(MessageSegment.text(msg)) + ) logger.debug(f"用户<{event.sender.nickname}>执行操作成功") - elif mold in TRUE: - setting.insert({"user_id": event.user_id, - "song": True, "list": True}) - elif mold in FALSE: - setting.insert({"user_id": event.user_id, - "song": False, "list": False}) + else: + if mold in TRUE: + setting.insert( + {"user_id": event.user_id, "song": True, "list": True} + ) + elif mold in FALSE: + setting.insert( + {"user_id": event.user_id, "song": False, "list": False} + ) elif len(args) == 2 and args[0] == "search": mold = args[1] - if info := setting.search(Q["global"] == "search"): + info = setting.search(Q["global"] == "search") + if info: if mold in TRUE: info[0]["value"] = True setting.update(info[0], Q["global"] == "search") - await bot.send(event=event, message=Message(MessageSegment.text("已开启点歌功能"))) + msg = "已开启点歌功能" + await bot.send( + event=event, message=Message(MessageSegment.text(msg)) + ) elif mold in FALSE: info[0]["value"] = False setting.update(info[0], Q["global"] == "search") msg = "已关闭点歌功能" - await bot.send(event=event, message=Message(MessageSegment.text(msg))) + await bot.send( + event=event, message=Message(MessageSegment.text(msg)) + ) logger.debug(f"用户<{event.sender.nickname}>执行操作成功") - elif mold in TRUE: - setting.insert({"global": "search", "value": True}) - elif mold in FALSE: - setting.insert({"global": "search", "value": False}) + else: + if mold in TRUE: + setting.insert({"global": "search", "value": True}) + elif mold in FALSE: + setting.insert({"global": "search", "value": False}) elif len(args) == 3 and args[0] == "private": qq = args[1] mold = args[2] - if info := setting.search(Q["user_id"] == qq): + info = setting.search(Q["user_id"] == qq) + # logger.info(info) + if info: if mold in TRUE: info[0]["song"] = True info[0]["list"] = True setting.update(info[0], Q["user_id"] == qq) msg = f"已开启用户{qq}的下载功能" - await bot.send(event=event, message=Message(MessageSegment.text(msg))) + await bot.send( + event=event, message=Message(MessageSegment.text(msg)) + ) elif mold in FALSE: info[0]["song"] = False info[0]["list"] = False setting.update(info[0], Q["user_id"] == qq) msg = f"已关闭用户{qq}的下载功能" - await bot.send(event=event, message=Message(MessageSegment.text(msg))) + await bot.send( + event=event, message=Message(MessageSegment.text(msg)) + ) logger.debug(f"用户<{event.sender.nickname}>执行操作成功") - elif mold in TRUE: - setting.insert({"user_id": event.user_id, - "song": True, "list": True}) - elif mold in FALSE: - setting.insert({"user_id": event.user_id, - "song": False, "list": False}) + else: + if mold in TRUE: + setting.insert( + {"user_id": event.user_id, "song": True, "list": True} + ) + elif mold in FALSE: + setting.insert( + {"user_id": event.user_id, "song": False, "list": False} + ) else: - msg = f"{cmd}ncm:获取命令菜单\r\n说明:网易云歌曲分享到群内后回复机器人即可下载\r\n" \ - f"{cmd}ncm t:开启解析\r\n{cmd}ncm f:关闭解析\n{cmd}点歌 歌名:点歌" + msg = ( + f"{cmd}ncm:获取命令菜单\r\n说明:网易云歌曲分享到群内后回复机器人即可下载\r\n" + f"{cmd}ncm t:开启解析\r\n{cmd}ncm f:关闭解析\n{cmd}点歌 歌名:点歌" + ) return await ncm_set.finish(message=MessageSegment.text(msg)) diff --git a/src/plugins/ncm/config.py b/src/plugins/ncm/config.py index 0f15b8e..db6723c 100644 --- a/src/plugins/ncm/config.py +++ b/src/plugins/ncm/config.py @@ -17,9 +17,8 @@ class Config(BaseModel, extra=Extra.ignore): ncm_password: str = "" '''密码''' - - ncm_playlist_zip: bool = False - '''上传歌单时是否压缩''' + ncm_bitrate: int = 320 + '''下载码率(单位K) 96及以下为m4a,320及以上为flac,中间mp3''' global_config = nonebot.get_driver().config diff --git a/src/plugins/ncm/data_source.py b/src/plugins/ncm/data_source.py index ed53919..8790902 100644 --- a/src/plugins/ncm/data_source.py +++ b/src/plugins/ncm/data_source.py @@ -1,41 +1,24 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import zipfile -from pathlib import Path -from datetime import datetime -from typing import Union +import asyncio import re - -import qrcode import time -from aiofile import async_open +from datetime import datetime +from pathlib import Path +from typing import Union, List, Dict, cast -import httpx import nonebot -from nonebot.utils import run_sync - -from pyncm import ( - apis, - GetCurrentSession, - DumpSessionAsString, - LoadSessionFromString, - SetCurrentSession, -) -from pyncm.apis.cloudsearch import SONG, USER, PLAYLIST - +import qrcode +from nonebot.adapters.onebot.v11 import (MessageSegment, Message, + ActionFailed, NetworkError, Bot, + GroupMessageEvent, PrivateMessageEvent) from nonebot.log import logger -from nonebot.adapters.onebot.v11 import ( - MessageSegment, - Message, - ActionFailed, - NetworkError, - Bot, - GroupMessageEvent, - PrivateMessageEvent, -) +from nonebot.matcher import current_bot +from pyncm import apis, GetCurrentSession, DumpSessionAsString, LoadSessionFromString, SetCurrentSession +from pyncm.apis.cloudsearch import SONG, USER, PLAYLIST +from tinydb import TinyDB, Query from .config import ncm_config -from tinydb import TinyDB, Query # ============数据库导入============= dbPath = Path("db") @@ -55,74 +38,61 @@ cmd = list(nonebot.get_driver().config.command_start)[0] -class NcmLoginFailedException(Exception): - pass +class NcmLoginFailedException(Exception): pass # ============主类============= class Ncm: def __init__(self): self.api = apis - self.bot = None - self.event = None - - def get_session( - self, bot: Bot, event: Union[GroupMessageEvent, PrivateMessageEvent] - ): - self.bot = bot - self.event = event @staticmethod def save_user(session: str): - if info := ncm_user_cache.search(Q["uid"] == "user"): - info[0]["session"] = session + info = ncm_user_cache.search(Q["uid"] == "user") + if info: + info[0]['session'] = session ncm_user_cache.update(info[0], Q["uid"] == "user") else: ncm_user_cache.insert({"uid": "user", "session": session}) @staticmethod - def load_user(info): - SetCurrentSession(LoadSessionFromString(info[0]["session"])) + def load_user(session: str): + SetCurrentSession(LoadSessionFromString(session)) - def login(self): + def login(self) -> bool: try: - self.api.login.LoginViaCellphone( - phone=ncm_config.ncm_phone, password=ncm_config.ncm_password - ) + self.api.login.LoginViaCellphone(phone=ncm_config.ncm_phone, password=ncm_config.ncm_password) self.get_user_info() - + return True except Exception as e: - if str(e) != str({"code": 400, "message": "登陆失败,请进行安全验证"}): + if str(e) == str({'code': 400, 'message': '登陆失败,请进行安全验证'}): + logger.error("缺少安全验证,请将账号留空进行二维码登录") + logger.info("自动切换为二维码登录↓") + self.get_qrcode() + else: raise e - logger.error("缺少安全验证,请将账号留空进行二维码登录") - logger.info("自动切换为二维码登录↓") - self.get_qrcode() + return False - def get_user_info(self): - logger.success( - f"欢迎您网易云用户:{GetCurrentSession().nickname} [{GetCurrentSession().uid}]" - ) + def get_user_info(self) -> str: + message: str = f"欢迎您网易云用户:{GetCurrentSession().nickname} [{GetCurrentSession().uid}]"; + logger.success(message) self.save_user(DumpSessionAsString(GetCurrentSession())) + return message def get_phone_login(self): phone = ncm_config.ncm_phone ctcode = int(ncm_config.ncm_ctcode) - result = self.api.login.SetSendRegisterVerifcationCodeViaCellphone( - cell=phone, ctcode=ctcode - ) - if result.get("code", 0) != 200: + result = self.api.login.SetSendRegisterVerifcationCodeViaCellphone(cell=phone, ctcode=ctcode) + if not result.get('code', 0) == 200: logger.error(result) else: - logger.success("已发送验证码,输入验证码:") + logger.success('已发送验证码,输入验证码:') while True: captcha = int(input()) - verified = self.api.login.GetRegisterVerifcationStatusViaCellphone( - phone, captcha, ctcode - ) - if verified.get("code", 0) == 200: + verified = self.api.login.GetRegisterVerifcationStatusViaCellphone(phone, captcha, ctcode) + if verified.get('code', 0) == 200: break - result = self.api.login.LoginViaCellphone( - phone, captcha=captcha, ctcode=ctcode) + result = self.api.login.LoginViaCellphone(phone, captcha=captcha, ctcode=ctcode) self.get_user_info() def get_qrcode(self): @@ -138,11 +108,11 @@ def get_qrcode(self): uuid = self.api.login.LoginQrcodeUnikey()["unikey"] url = f"https://music.163.com/login?codekey={uuid}" img = qrcode.make(url) - img.save("ncm.png") + img.save('ncm.png') logger.info("二维码已经保存在当前目录下的ncm.png,请使用手机网易云客户端扫码登录。") while True: rsp = self.api.login.LoginQrcodeCheck(uuid) # 检测扫描状态 - if rsp["code"] in [803, 800]: + if rsp["code"] == 803 or rsp["code"] == 800: st = self.api.login.GetCurrentLoginStatus() logger.debug(st) self.api.login.WriteLoginInfo(st) @@ -150,196 +120,195 @@ def get_qrcode(self): return True time.sleep(1) - def detail(self, ids: list) -> list: + def detail_names(self, ids: List[int]) -> List[str]: songs: list = self.api.track.GetTrackDetail(song_ids=ids)["songs"] - return [ - (data["name"] + "-" + ",".join([names["name"] - for names in data["ar"]])) - for data in songs - ] + detail = [(data["name"] + "-" + ",".join([names["name"] for names in data["ar"]])) for data in songs] + return detail - async def music_check(self, nid): - nid = int(nid) - if not (info := music.search(Q["id"] == nid)): - return None - path = Path(info[0]["file"]) - return info[0] if path.is_file() else await self.download(ids=[nid], check=True) + @logger.catch() + def get_detail(self, ids: List[int]): + data: list = self.api.track.GetTrackAudio(song_ids=ids, bitrate=ncm_config.ncm_bitrate * 1000)["data"] + names: list = self.detail_names(ids) + for i in range(len(ids)): + data[i]['ncm_name'] = names[i] + return data + + async def music_check(self, nid: Union[int, List[int]], event: Union[GroupMessageEvent, PrivateMessageEvent], + lid: int = None): + """判断数据库中是否有缓存,有则使用缓存,没有则新下载""" + tasks = [] + if lid: + del_nid = [] + for i in nid: + info = music.search(Q["id"] == i) + if info: + try: + tasks.append(asyncio.create_task(self.upload_data_file(event=event, data=info[0]))) + del_nid.append(i) + except Exception: + continue + for j in del_nid: + nid.remove(j) + else: + nid = int(nid) + info = music.search(Q["id"] == nid) + if info: + try: + tasks.append(asyncio.create_task(self.upload_data_file(event=event, data=info[0]))) + return + except Exception as e: + if isinstance(e, ActionFailed) and e.info.get("retcode") != 10003: + logger.error(e) + return + if tasks: + await asyncio.gather(*tasks) + if nid: + if isinstance(nid, int): + nid = [nid] + await self.start_upload(ids=nid, event=event) async def search_song(self, keyword: str, limit: int = 1) -> int: # 搜索歌曲 - res = self.api.cloudsearch.GetSearchResult( - keyword=keyword, stype=SONG, limit=limit - ) + res = self.api.cloudsearch.GetSearchResult(keyword=keyword, stype=SONG, limit=limit) logger.debug(f"搜索歌曲{keyword},返回结果:{res}") - data = res["result"]["songs"] if "result" in res.keys() else res["songs"] + if "result" in res.keys(): + data = res["result"]["songs"] + else: + data = res["songs"] if data: return data[0]["id"] async def search_user(self, keyword: str, limit: int = 1): # 搜索用户 - self.api.cloudsearch.GetSearchResult( - keyword=keyword, stype=USER, limit=limit) + self.api.cloudsearch.GetSearchResult(keyword=keyword, stype=USER, limit=limit) async def search_playlist(self, keyword: str, limit: int = 1): # 搜索歌单 - self.api.cloudsearch.GetSearchResult( - keyword=keyword, stype=PLAYLIST, limit=limit - ) + self.api.cloudsearch.GetSearchResult(keyword=keyword, stype=PLAYLIST, limit=limit) - def check_message(self): + @staticmethod + def check_message(message_id: int): """检查缓存中是否存在解析 :return: """ - info = ncm_check_cache.search( - Q.message_id == self.event.dict()["reply"]["message_id"] - ) - return info[0] if info else None + flag = ncm_check_cache.search(Q.message_id == message_id) + return flag[0] if flag else None - def get_song(self, nid: Union[int, str], message_id=None): + @staticmethod + def get_song(nid: int, message_id: int): """解析歌曲id,并且加入缓存 - :param message_id: :param nid: :return: """ - mid = message_id or self.event.message_id - ncm_check_cache.insert( - { - "message_id": mid, - "type": "song", - "nid": nid, - "lid": 0, - "ids": [], - "lmsg": "", - "time": int(time.time()), - } - ) - - def get_playlist(self, lid: Union[int, str]): + ncm_check_cache.insert({"message_id": message_id, + "type": "song", + "nid": int(nid), + "lid": 0, + "ids": [], + "lmsg": "", + "bot_id": "", + "time": int(time.time())}) + + def get_playlist(self, lid: int, message_id: int): + lid = int(lid) data = self.api.playlist.GetPlaylistInfo(lid) # logger.info(data) if data["code"] == 200: raw = data["playlist"] - tags = ",".join(raw["tags"]) - songs = [i["id"] for i in raw["trackIds"]] - ncm_check_cache.insert( - { - "message_id": self.event.message_id, - "type": "playlist", - "nid": 0, - "lid": lid, - "ids": songs, - "lmsg": f"歌单:{raw['name']}\r\n创建者:{raw['creator']['nickname']}\r\n歌曲总数:{raw['trackCount']}\r\n" - f"标签:{tags}\r\n播放次数:{raw['playCount']}\r\n收藏:{raw['subscribedCount']}\r\n" - f"评论:{raw['commentCount']}\r\n分享:{raw['shareCount']}\r\nListID:{lid}", - "time": int(time.time()), - } - ) - - async def upload_group_data_file(self, data): - await self.upload_group_file(file=data["file"], name=data["filename"]) - - async def upload_private_data_file(self, data): - await self.upload_private_file(file=data["file"], name=data["filename"]) + tags = ",".join(raw['tags']) + songs = [int(i['id']) for i in raw['trackIds']] + ncm_check_cache.insert({"message_id": message_id, + "type": "playlist", + "nid": 0, + "lid": lid, + "ids": songs, + "lmsg": f"歌单:{raw['name']}\r\n创建者:{raw['creator']['nickname']}\r\n歌曲总数:{raw['trackCount']}\r\n" + f"标签:{tags}\r\n播放次数:{raw['playCount']}\r\n收藏:{raw['subscribedCount']}\r\n" + f"评论:{raw['commentCount']}\r\n分享:{raw['shareCount']}\r\nListID:{lid}", + "bot_id": "", + "time": int(time.time())}) + + async def upload_data_file(self, event: Union[GroupMessageEvent, PrivateMessageEvent], + data: Dict[str, Union[str, int]]): + if isinstance(event, GroupMessageEvent): + await self.upload_group_file(group_id=event.group_id, file=data["file"], name=data["filename"]) + elif isinstance(event, PrivateMessageEvent): + await self.upload_private_file(user_id=event.user_id, file=data["file"], name=data["filename"]) - async def upload_group_file(self, file, name): + @staticmethod + async def upload_group_file(group_id: int, file: str, name: str): + bot: Bot = cast(Bot, current_bot.get()) try: - await self.bot.upload_group_file( - group_id=self.event.group_id, file=file, name=name - ) + await bot.upload_group_file(group_id=group_id, file=file, name=name) except (ActionFailed, NetworkError) as e: logger.error(e) - if ( - isinstance(e, ActionFailed) - and e.info["wording"] == "server" " requires unsupported ftn upload" - ): - await self.bot.send( - event=self.event, - message=Message( - MessageSegment.text( - "[ERROR] 文件上传失败\r\n[原因] 机器人缺少上传文件的权限\r\n[解决办法] " - "请将机器人设置为管理员或者允许群员上传文件" - ) - ), - ) + if isinstance(e, ActionFailed) and e.info["wording"] == "server" \ + " requires unsupported ftn upload": + await bot.send_group_msg(group_id=group_id, message=Message(MessageSegment.text( + "[ERROR] 文件上传失败\r\n[原因] 机器人缺少上传文件的权限\r\n[解决办法] " + "请将机器人设置为管理员或者允许群员上传文件"))) elif isinstance(e, NetworkError): - await self.bot.send( - event=self.event, - message=Message( - MessageSegment.text( - "[ERROR]文件上传失败\r\n[原因] " "上传超时(一般来说还在传,建议等待五分钟)" - ) - ), - ) + await bot.send_group_msg(group_id=group_id, + message=Message(MessageSegment.text("[ERROR]文件上传失败\r\n[原因] " + "上传超时(一般来说还在传,建议等待五分钟)"))) - async def upload_private_file(self, file, name): + @staticmethod + async def upload_private_file(user_id: int, file: str, name: str): + bot: Bot = cast(Bot, current_bot.get()) try: - await self.bot.upload_private_file( - user_id=self.event.user_id, file=file, name=name - ) + await bot.upload_private_file(user_id=user_id, file=file, name=name) except (ActionFailed, NetworkError) as e: logger.error(e) if isinstance(e, NetworkError): - await self.bot.send( - event=self.event, - message=Message( - MessageSegment.text( - "[ERROR] 文件上传失败\r\n[原因] 上传超时(一般来说还在传,建议等待五分钟)" - ) - ), - ) - - @run_sync - def get_zip(self, lid, filenames: list): - zip_file_new = f"{lid}.zip" - with zipfile.ZipFile( - str(Path.cwd().joinpath("music").joinpath(zip_file_new)), - "w", - zipfile.ZIP_DEFLATED, - ) as z: - for f in filenames: - z.write(str(f), f.name) - return zip_file_new + await bot.send_private_msg(user_id=user_id, message=Message(MessageSegment.text( + "[ERROR] 文件上传失败\r\n[原因] 上传超时(一般来说还在传,建议等待五分钟)"))) + + # @run_sync + # def get_zip(self, lid: int, filenames: list): + # zip_file_new = f'{lid}.zip' + # with zipfile.ZipFile(str(Path.cwd().joinpath("music").joinpath(zip_file_new)), 'w', zipfile.ZIP_DEFLATED) as z: + # for f in filenames: + # z.write(str(f), f.name) + # return zip_file_new + async def upload(self, data: dict, fr: str, event: Union[GroupMessageEvent, PrivateMessageEvent]): + if data["code"] == 404: + logger.error("未从网易云读取到下载地址") + return None + url = data["url"] + nid = data["id"] + filename = f"{data['ncm_name']}.{data['type']}" + filename = re.sub(r'[/:*?"<>|]', '-', filename) + bot = cast(Bot, current_bot.get()) + download_ret: Dict[str, str] = await bot.download_file(url=url) + file = download_ret["file"] + cf = { + "id": int(nid), + "file": str(file), # 获取文件位置 + "filename": filename, # 获取文件名 + "from": fr, # 判断来自单曲还是歌单 + "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 获取时间 + } + if info := music.search(Q["id"] == nid): + music.update(cf, Q["id"] == nid) + else: + music.insert(cf) + logger.debug(f"Download:{filename}") + await self.upload_data_file(event=event, data=cf) - async def download(self, ids: list, check=False, lid=0, is_zip=False): # 下载音乐 - data: list = self.api.track.GetTrackAudio(song_ids=ids, bitrate=3200 * 1000)[ - "data" - ] - name: list = self.detail(ids) - filenames = [] - not_zips = [] + async def start_upload(self, ids: List[int], event: Union[GroupMessageEvent, PrivateMessageEvent]): + """一般地 320k及以上即 flac, 320k及以下即 mp3,96k及以下即 m4a + """ + data: list = self.get_detail(ids) for i in range(len(ids)): - if data[i]["code"] == 404: - logger.error("未从网易云读取到下载地址") - return - url = data[i]["url"] - nid = data[i]["id"] - filename = f"{name[i]}.{data[i]['type']}" - filename = re.sub(r'[\/:*?"<>|]', "-", filename) - file = Path.cwd().joinpath("music").joinpath(filename) - config = { - "id": int(nid), - "file": str(file), # 获取文件位置 - "filename": filename, # 获取文件名 - "from": "song" if len(ids) == 1 else "list", # 判断来自单曲还是歌单 - "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 获取时间 - } - filenames.append(file) - not_zips.append(config) - if info := music.search(Q["id"] == nid): - music.update(config, Q["id"] == nid) - else: - music.insert(config) - async with httpx.AsyncClient() as client, client.stream("GET", url=url) as r, async_open(file, "wb") as out_file: - async for chunk in r.aiter_bytes(): - await out_file.write(chunk) - logger.debug(f"Download:{filename}") - if is_zip: - await self.get_zip(lid=lid, filenames=filenames) - return not_zips + await self.upload(data[i], "song" if len(ids) == 1 else "list", event) + + # if is_zip: + # await self.get_zip(lid=lid, filenames=filenames) + # return not_zips nncm = Ncm() if info := ncm_user_cache.search(Q.uid == "user"): logger.info("检测到缓存,自动加载用户") - nncm.load_user(info) + nncm.load_user(info[0]['session']) nncm.get_user_info() elif ncm_config.ncm_phone == "": logger.warning("您未填写账号,自动进入二维码登录模式")