Skip to content

Commit

Permalink
added delete task option & added phase parameter for list task
Browse files Browse the repository at this point in the history
  • Loading branch information
mhdzumair committed Jan 2, 2024
1 parent 78a5c76 commit 18260b6
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions pikpakapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ async def _request_patch(
):
return await self._make_request("patch", url, data=data)

async def _request_delete(
self,
url: str,
params: dict = None,
data: dict = None,
):
return await self._make_request("delete", url, params=params, data=data)

def decode_token(self):
"""Decodes the encoded token to update access and refresh tokens."""
try:
Expand Down Expand Up @@ -269,21 +277,28 @@ async def offline_download(
return result

async def offline_list(
self, size: int = 10000, next_page_token: Optional[str] = None
self,
size: int = 10000,
next_page_token: Optional[str] = None,
phase: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
size: int - 每次请求的数量
next_page_token: str - 下一页的page token
phase: List[str] - Offline download task status, default is ["PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR"]
supported values: PHASE_TYPE_RUNNING, PHASE_TYPE_ERROR, PHASE_TYPE_COMPLETE, PHASE_TYPE_PENDING
获取离线下载列表
"""
if phase is None:
phase = ["PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR"]
list_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/tasks"
list_data = {
"type": "offline",
"thumbnail_size": "SIZE_SMALL",
"limit": size,
"next_page_token": next_page_token,
"filters": """{"phase": {"in": "PHASE_TYPE_RUNNING,PHASE_TYPE_ERROR"}}""",
"filters": json.dumps({"phase": {"in": ",".join(phase)}}),
}
result = await self._request_get(list_url, list_data)
return result
Expand Down Expand Up @@ -367,6 +382,23 @@ async def offline_task_retry(self, task_id: str) -> Dict[str, Any]:
except Exception as e:
raise PikpakException(f"重试离线下载任务失败: {task_id}. {e}")

async def delete_tasks(
self, task_ids: List[str], delete_files: bool = False
) -> None:
"""
delete tasks by task ids
task_ids: List[str] - task ids to delete
"""
delete_url = f"https://{self.PIKPAK_API_HOST}/drive/v1/tasks"
params = {
"task_ids": task_ids,
"delete_files": delete_files,
}
try:
await self._request_delete(delete_url, params=params)
except Exception as e:
raise PikpakException(f"Failing to delete tasks: {task_ids}. {e}")

async def get_task_status(self, task_id: str, file_id: str) -> DownloadStatus:
"""
task_id: str - 离线下载任务id
Expand Down

0 comments on commit 18260b6

Please sign in to comment.