From e77fed1b9ad1b0c882e43aa265caf85b13a68a72 Mon Sep 17 00:00:00 2001 From: Christoph Reiter Date: Wed, 1 Nov 2023 19:20:17 +0100 Subject: [PATCH] Run pyupgrade upgrade everything to 3.10+ --- app/api.py | 50 +++++++------- app/appconfig.py | 7 +- app/appstate.py | 167 +++++++++++++++++++++++------------------------ app/fetch.py | 64 +++++++++--------- app/pkgextra.py | 22 +++---- app/utils.py | 18 ++--- app/web.py | 76 ++++++++++----------- run.py | 3 +- setup.cfg | 2 +- 9 files changed, 201 insertions(+), 208 deletions(-) diff --git a/app/api.py b/app/api.py index f47dae2..f3245d5 100644 --- a/app/api.py +++ b/app/api.py @@ -3,26 +3,26 @@ from fastapi_etag import Etag from pydantic import BaseModel -from typing import Tuple, Dict, List, Set, Iterable, Union, Optional +from collections.abc import Iterable from .appstate import state, SrcInfoPackage from .utils import extract_upstream_version, version_is_newer_than from .fetch import queue_update class QueueBuild(BaseModel): - packages: List[str] - depends: Dict[str, List[str]] + packages: list[str] + depends: dict[str, list[str]] new: bool class QueueEntry(BaseModel): name: str version: str - version_repo: Optional[str] + version_repo: str | None repo_url: str repo_path: str source: bool - builds: Dict[str, QueueBuild] + builds: dict[str, QueueBuild] async def get_etag(request: Request) -> str: @@ -32,7 +32,7 @@ async def get_etag(request: Request) -> str: router = APIRouter() -def get_srcinfos_to_build() -> Tuple[List[SrcInfoPackage], Set[str]]: +def get_srcinfos_to_build() -> tuple[list[SrcInfoPackage], set[str]]: srcinfos = [] # packages that should be updated @@ -45,8 +45,8 @@ def get_srcinfos_to_build() -> Tuple[List[SrcInfoPackage], Set[str]]: srcinfos.append(srcinfo) # packages that are new - not_in_repo: Dict[str, List[SrcInfoPackage]] = {} - replaces_not_in_repo: Set[str] = set() + not_in_repo: dict[str, list[SrcInfoPackage]] = {} + replaces_not_in_repo: set[str] = set() for srcinfo in state.sourceinfos.values(): not_in_repo.setdefault(srcinfo.pkgname, []).append(srcinfo) replaces_not_in_repo.update(srcinfo.replaces) @@ -54,7 +54,7 @@ def get_srcinfos_to_build() -> Tuple[List[SrcInfoPackage], Set[str]]: for p in s.packages.values(): not_in_repo.pop(p.name, None) replaces_not_in_repo.discard(p.name) - marked_new: Set[str] = set() + marked_new: set[str] = set() for sis in not_in_repo.values(): srcinfos.extend(sis) # packages that are considered new, that don't exist in the repo, or @@ -69,8 +69,8 @@ def get_srcinfos_to_build() -> Tuple[List[SrcInfoPackage], Set[str]]: return srcinfos, marked_new -@router.get('/buildqueue2', response_model=List[QueueEntry]) -async def buildqueue2(request: Request, response: Response) -> List[QueueEntry]: +@router.get('/buildqueue2', response_model=list[QueueEntry]) +async def buildqueue2(request: Request, response: Response) -> list[QueueEntry]: srcinfos, marked_new = get_srcinfos_to_build() srcinfo_provides = {} @@ -92,7 +92,7 @@ def resolve_package(pkgname: str) -> str: # if there is no real one, try to find a provider return srcinfo_provides.get(pkgname, pkgname) - def get_transitive_depends_and_resolve(packages: Iterable[str]) -> Set[str]: + def get_transitive_depends_and_resolve(packages: Iterable[str]) -> set[str]: todo = set(packages) done = set() while todo: @@ -105,8 +105,8 @@ def get_transitive_depends_and_resolve(packages: Iterable[str]) -> Set[str]: todo.update(si.depends.keys()) return done - def get_transitive_makedepends(packages: Iterable[str]) -> Set[str]: - todo: Set[str] = set() + def get_transitive_makedepends(packages: Iterable[str]) -> set[str]: + todo: set[str] = set() for name in packages: # don't resolve here, we want the real deps of the packages to build # even if it gets replaced @@ -115,7 +115,7 @@ def get_transitive_makedepends(packages: Iterable[str]) -> Set[str]: todo.update(si.makedepends.keys()) return get_transitive_depends_and_resolve(todo) - def srcinfo_get_repo_version(si: SrcInfoPackage) -> Optional[str]: + def srcinfo_get_repo_version(si: SrcInfoPackage) -> str | None: if si.pkgbase in state.sources: return state.sources[si.pkgbase].version return None @@ -131,21 +131,21 @@ def srcinfo_has_src(si: SrcInfoPackage) -> bool: def srcinfo_is_new(si: SrcInfoPackage) -> bool: return si.pkgname in marked_new - def build_key(srcinfo: SrcInfoPackage) -> Tuple[str, str]: + def build_key(srcinfo: SrcInfoPackage) -> tuple[str, str]: return (srcinfo.repo_url, srcinfo.repo_path) - to_build: Dict[Tuple, List[SrcInfoPackage]] = {} + to_build: dict[tuple, list[SrcInfoPackage]] = {} for srcinfo in srcinfos: key = build_key(srcinfo) to_build.setdefault(key, []).append(srcinfo) entries = [] repo_mapping = {} - all_packages: Set[str] = set() + all_packages: set[str] = set() for srcinfos in to_build.values(): packages = set() needs_src = False - new_all: Dict[str, List[bool]] = {} + new_all: dict[str, list[bool]] = {} version_repo = None for si in srcinfos: if not srcinfo_has_src(si): @@ -177,8 +177,8 @@ def build_key(srcinfo: SrcInfoPackage) -> Tuple[str, str]: e["makedepends"] &= all_packages e["makedepends"] -= e["packages"] - def group_by_repo(sequence: Iterable[str]) -> Dict[str, List]: - grouped: Dict[str, List] = {} + def group_by_repo(sequence: Iterable[str]) -> dict[str, list]: + grouped: dict[str, list] = {} for name in sequence: grouped.setdefault(repo_mapping[name], []).append(name) for key, values in grouped.items(): @@ -200,7 +200,7 @@ def group_by_repo(sequence: Iterable[str]) -> Dict[str, List]: makedepends = e["makedepends"] - builds: Dict[str, QueueBuild] = {} + builds: dict[str, QueueBuild] = {} deps_grouped = group_by_repo(makedepends) for repo, build_packages in group_by_repo(e["packages"]).items(): @@ -251,7 +251,7 @@ async def search(request: Request, response: Response, query: str = "", qtype: s qtype = "pkg" parts = query.split() - res_pkg: List[Dict[str, Union[str, List[str], int]]] = [] + res_pkg: list[dict[str, str | list[str] | int]] = [] exact = {} if not query: pass @@ -296,8 +296,8 @@ class OutOfDateEntry(BaseModel): version_upstream: str -@router.get('/outofdate', response_model=List[OutOfDateEntry], dependencies=[Depends(Etag(get_etag))]) -async def outofdate(request: Request, response: Response) -> List[OutOfDateEntry]: +@router.get('/outofdate', response_model=list[OutOfDateEntry], dependencies=[Depends(Etag(get_etag))]) +async def outofdate(request: Request, response: Response) -> list[OutOfDateEntry]: to_update = [] for s in state.sources.values(): diff --git a/app/appconfig.py b/app/appconfig.py index dc4d1cf..2057039 100644 --- a/app/appconfig.py +++ b/app/appconfig.py @@ -1,9 +1,6 @@ # Copyright 2016-2020 Christoph Reiter # SPDX-License-Identifier: MIT -from typing import Optional - - REPO_URL = "https://repo.msys2.org" DOWNLOAD_URL = "https://mirror.msys2.org" REPOSITORIES = [ @@ -21,7 +18,7 @@ ARCH_REPO_CONFIG = [] for repo in ["core", "core-testing", "extra", "extra-testing"]: ARCH_REPO_CONFIG.append( - (ARCH_REPO_URL + "/{0}/os/x86_64/{0}.db".format(repo), repo) + (ARCH_REPO_URL + f"/{repo}/os/x86_64/{repo}.db", repo) ) AUR_METADATA_URL = "https://aur.archlinux.org/packages-meta-ext-v1.json.gz" @@ -48,4 +45,4 @@ UPDATE_MIN_RATE = 1 REQUEST_TIMEOUT = 60 -CACHE_DIR: Optional[str] = None +CACHE_DIR: str | None = None diff --git a/app/appstate.py b/app/appstate.py index 9d98ba9..dc12caf 100644 --- a/app/appstate.py +++ b/app/appstate.py @@ -11,7 +11,8 @@ from enum import Enum from functools import cmp_to_key from urllib.parse import quote_plus, quote -from typing import List, Set, Dict, Tuple, Optional, Type, Sequence, NamedTuple, Any +from typing import NamedTuple, Any +from collections.abc import Sequence from pydantic import BaseModel from .appconfig import REPOSITORIES @@ -21,27 +22,26 @@ from .pkgextra import PkgExtra, PkgExtraEntry -PackageKey = Tuple[str, str, str, str, str] +PackageKey = tuple[str, str, str, str, str] -ExtId = NamedTuple('ExtId', [ - ('id', str), - ('name', str), - # If the versions should be considered only as a fallback - ('fallback', bool), -]) -ExtInfo = NamedTuple('ExtInfo', [ - ('name', str), - ('version', str), - ('date', int), - ('url', str), - ('other_urls', Dict[str, str]), -]) +class ExtId(NamedTuple): + id: str + name: str + fallback: bool + -PackagerInfo = NamedTuple('PackagerInfo', [ - ('name', str), - ('email', Optional[str]), -]) +class ExtInfo(NamedTuple): + name: str + version: str + date: int + url: str + other_urls: dict[str, str] + + +class PackagerInfo(NamedTuple): + name: str + email: str | None def parse_packager(text: str, _re: Any = re.compile("(.*?)<(.*?)>")) -> PackagerInfo: @@ -60,14 +60,14 @@ class DepType(Enum): CHECK = 3 -def get_repositories() -> List[Repository]: +def get_repositories() -> list[Repository]: l = [] for data in REPOSITORIES: l.append(Repository(*data)) return l -def get_realname_variants(s: Source) -> List[str]: +def get_realname_variants(s: Source) -> list[str]: """Returns a list of potential names used by external systems, highest priority first""" main = [s.realname, s.realname.lower()] @@ -75,14 +75,14 @@ def get_realname_variants(s: Source) -> List[str]: package_variants = [p.realname for p in s.packages.values()] # fallback to the provide names - provides_variants: List[str] = [] + provides_variants: list[str] = [] for p in s.packages.values(): provides_variants.extend(p.realprovides.keys()) return main + sorted(package_variants) + sorted(provides_variants) -def cleanup_files(files: List[str]) -> List[str]: +def cleanup_files(files: list[str]) -> list[str]: """Remove redundant directory paths and root them""" last = None @@ -124,7 +124,7 @@ def files_url(self) -> str: return self.url.rstrip("/") + "/" + self.name + ".files" @property - def packages(self) -> "List[Package]": + def packages(self) -> list[Package]: global state repo_packages = [] @@ -144,20 +144,20 @@ def isize(self) -> int: class BuildStatusBuild(BaseModel): - desc: Optional[str] + desc: str | None status: str - urls: Dict[str, str] + urls: dict[str, str] class BuildStatusPackage(BaseModel): name: str version: str - builds: Dict[str, BuildStatusBuild] + builds: dict[str, BuildStatusBuild] class BuildStatus(BaseModel): - packages: List[BuildStatusPackage] = [] - cycles: List[Tuple[str, str]] = [] + packages: list[BuildStatusPackage] = [] + cycles: list[tuple[str, str]] = [] class AppState: @@ -168,10 +168,10 @@ def __init__(self) -> None: self._etag = "" self.ready = False self._last_update = 0.0 - self._sources: Dict[str, Source] = {} - self._sourceinfos: Dict[str, SrcInfoPackage] = {} + self._sources: dict[str, Source] = {} + self._sourceinfos: dict[str, SrcInfoPackage] = {} self._pkgextra: PkgExtra = PkgExtra(packages={}) - self._ext_infos: Dict[ExtId, Dict[str, ExtInfo]] = {} + self._ext_infos: dict[ExtId, dict[str, ExtInfo]] = {} self._build_status: BuildStatus = BuildStatus() self._update_etag() @@ -188,20 +188,20 @@ def etag(self) -> str: return self._etag @property - def sources(self) -> Dict[str, Source]: + def sources(self) -> dict[str, Source]: return self._sources @sources.setter - def sources(self, sources: Dict[str, Source]) -> None: + def sources(self, sources: dict[str, Source]) -> None: self._sources = sources self._update_etag() @property - def sourceinfos(self) -> Dict[str, SrcInfoPackage]: + def sourceinfos(self) -> dict[str, SrcInfoPackage]: return self._sourceinfos @sourceinfos.setter - def sourceinfos(self, sourceinfos: Dict[str, SrcInfoPackage]) -> None: + def sourceinfos(self, sourceinfos: dict[str, SrcInfoPackage]) -> None: self._sourceinfos = sourceinfos self._update_etag() @@ -215,13 +215,13 @@ def pkgextra(self, pkgextra: PkgExtra) -> None: self._update_etag() @property - def ext_info_ids(self) -> List[ExtId]: + def ext_info_ids(self) -> list[ExtId]: return list(self._ext_infos.keys()) - def get_ext_infos(self, id: ExtId) -> Dict[str, ExtInfo]: + def get_ext_infos(self, id: ExtId) -> dict[str, ExtInfo]: return self._ext_infos.get(id, {}) - def set_ext_infos(self, id: ExtId, info: Dict[str, ExtInfo]) -> None: + def set_ext_infos(self, id: ExtId, info: dict[str, ExtInfo]) -> None: self._ext_infos[id] = info self._update_etag() @@ -237,12 +237,12 @@ def build_status(self, build_status: BuildStatus) -> None: class Package: - def __init__(self, builddate: str, csize: str, depends: List[str], filename: str, files: List[str], isize: str, - makedepends: List[str], md5sum: str, name: str, pgpsig: Optional[str], sha256sum: str, arch: str, + def __init__(self, builddate: str, csize: str, depends: list[str], filename: str, files: list[str], isize: str, + makedepends: list[str], md5sum: str, name: str, pgpsig: str | None, sha256sum: str, arch: str, base_url: str, repo: str, repo_variant: str, package_prefix: str, base_prefix: str, - provides: List[str], conflicts: List[str], replaces: List[str], - version: str, base: str, desc: str, groups: List[str], licenses: List[str], optdepends: List[str], - checkdepends: List[str], url: str, packager: str) -> None: + provides: list[str], conflicts: list[str], replaces: list[str], + version: str, base: str, desc: str, groups: list[str], licenses: list[str], optdepends: list[str], + checkdepends: list[str], url: str, packager: str) -> None: self.builddate = int(builddate) self.csize = csize self.url = url @@ -270,10 +270,10 @@ def __init__(self, builddate: str, csize: str, depends: List[str], filename: str self.desc = desc self.groups = groups self.licenses = licenses - self.rdepends: Dict[Package, Set[DepType]] = {} + self.rdepends: dict[Package, set[DepType]] = {} self.optdepends = split_optdepends(optdepends) self.packager = parse_packager(packager) - self.provided_by: Set[Package] = set() + self.provided_by: set[Package] = set() @property def files(self) -> Sequence[str]: @@ -289,7 +289,7 @@ def pkgextra(self) -> PkgExtraEntry: return state.pkgextra.packages.get(self.base, PkgExtraEntry()) @property - def urls(self) -> List[Tuple[str, str]]: + def urls(self) -> list[tuple[str, str]]: """Returns a list of (name, url) tuples for the various URLs of the package""" extra = self.pkgextra @@ -309,7 +309,7 @@ def urls(self) -> List[Tuple[str, str]]: return urls @property - def realprovides(self) -> Dict[str, Set[str]]: + def realprovides(self) -> dict[str, set[str]]: prov = {} for key, infos in self.provides.items(): if key.startswith(self.package_prefix): @@ -358,7 +358,7 @@ def key(self) -> PackageKey: self.name, self.arch, self.fileurl) @classmethod - def from_desc(cls: Type[Package], d: Dict[str, List[str]], base: str, repo: Repository) -> Package: + def from_desc(cls: type[Package], d: dict[str, list[str]], base: str, repo: Repository) -> Package: return cls(d["%BUILDDATE%"][0], d["%CSIZE%"][0], d.get("%DEPENDS%", []), d["%FILENAME%"][0], d.get("%FILES%", []), d["%ISIZE%"][0], @@ -379,7 +379,7 @@ class Source: def __init__(self, name: str): self.name = name - self.packages: Dict[PackageKey, Package] = {} + self.packages: dict[PackageKey, Package] = {} @property def desc(self) -> str: @@ -398,27 +398,27 @@ def _package(self) -> Package: return sorted(self.packages.items())[0][1] @property - def repos(self) -> List[str]: - return sorted(set([p.repo for p in self.packages.values()])) + def repos(self) -> list[str]: + return sorted({p.repo for p in self.packages.values()}) @property def url(self) -> str: return self._package.url @property - def arches(self) -> List[str]: - return sorted(set([p.arch for p in self.packages.values()])) + def arches(self) -> list[str]: + return sorted({p.arch for p in self.packages.values()}) @property - def groups(self) -> List[str]: - groups: Set[str] = set() + def groups(self) -> list[str]: + groups: set[str] = set() for p in self.packages.values(): groups.update(p.groups) return sorted(groups) @property - def basegroups(self) -> List[str]: - groups: Set[str] = set() + def basegroups(self) -> list[str]: + groups: set[str] = set() for p in self.packages.values(): groups.update(get_base_group_name(p, g) for g in p.groups) return sorted(groups) @@ -426,25 +426,25 @@ def basegroups(self) -> List[str]: @property def version(self) -> str: # get the newest version - versions: Set[str] = set([p.version for p in self.packages.values()]) + versions: set[str] = {p.version for p in self.packages.values()} return sorted(versions, key=cmp_to_key(vercmp), reverse=True)[0] @property def git_version(self) -> str: # get the newest version - versions: Set[str] = set([p.git_version for p in self.packages.values()]) + versions: set[str] = {p.git_version for p in self.packages.values()} return sorted(versions, key=cmp_to_key(vercmp), reverse=True)[0] @property - def licenses(self) -> List[List[str]]: - licenses: List[List[str]] = [] + def licenses(self) -> list[list[str]]: + licenses: list[list[str]] = [] for p in self.packages.values(): if p.licenses and p.licenses not in licenses: licenses.append(p.licenses) return sorted(licenses) @property - def upstream_info(self) -> Optional[ExtInfo]: + def upstream_info(self) -> ExtInfo | None: # Take the newest version of the external versions newest = None fallback = None @@ -469,11 +469,11 @@ def pkgextra(self) -> PkgExtraEntry: return state.pkgextra.packages.get(self.name, PkgExtraEntry()) @property - def urls(self) -> List[Tuple[str, str]]: + def urls(self) -> list[tuple[str, str]]: return self._package.urls @property - def external_infos(self) -> Sequence[Tuple[ExtId, ExtInfo]]: + def external_infos(self) -> Sequence[tuple[ExtId, ExtInfo]]: global state # internal package, don't try to link it @@ -542,7 +542,7 @@ def searchbug_url(self) -> str: "/issues?q=" + quote_plus("is:issue is:open %s" % self.realname)) @classmethod - def from_desc(cls, d: Dict[str, List[str]], repo: Repository) -> "Source": + def from_desc(cls, d: dict[str, list[str]], repo: Repository) -> Source: name = d["%NAME%"][0] if "%BASE%" not in d: @@ -555,12 +555,12 @@ def from_desc(cls, d: Dict[str, List[str]], repo: Repository) -> "Source": return cls(base) - def add_desc(self, d: Dict[str, List[str]], repo: Repository) -> None: + def add_desc(self, d: dict[str, list[str]], repo: Repository) -> None: p = Package.from_desc(d, self.name, repo) assert p.key not in self.packages self.packages[p.key] = p - def get_info(self) -> Dict[str, Any]: + def get_info(self) -> dict[str, Any]: return { 'name': self.name, 'realname': self.realname, @@ -576,10 +576,10 @@ def get_info(self) -> Dict[str, Any]: } -class SrcInfoPackage(object): +class SrcInfoPackage: def __init__(self, pkgbase: str, pkgname: str, pkgver: str, pkgrel: str, - repo: str, repo_url: str, repo_path: str, date: str, pkgbasedesc: Optional[str]): + repo: str, repo_url: str, repo_path: str, date: str, pkgbasedesc: str | None): self.pkgbase = pkgbase self.pkgname = pkgname self.pkgver = pkgver @@ -589,13 +589,13 @@ def __init__(self, pkgbase: str, pkgname: str, pkgver: str, pkgrel: str, self.repo_path = repo_path # iso 8601 to UTC without a timezone self.date = datetime.fromisoformat(date).astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - self.epoch: Optional[str] = None - self.depends: Dict[str, Set[str]] = {} - self.makedepends: Dict[str, Set[str]] = {} - self.provides: Dict[str, Set[str]] = {} - self.conflicts: Dict[str, Set[str]] = {} - self.replaces: Set[str] = set() - self.sources: List[str] = [] + self.epoch: str | None = None + self.depends: dict[str, set[str]] = {} + self.makedepends: dict[str, set[str]] = {} + self.provides: dict[str, set[str]] = {} + self.conflicts: dict[str, set[str]] = {} + self.replaces: set[str] = set() + self.sources: list[str] = [] self.pkgbasedesc = pkgbasedesc @property @@ -608,20 +608,19 @@ def source_url(self) -> str: @property def build_version(self) -> str: - version = "%s-%s" % (self.pkgver, self.pkgrel) + version = f"{self.pkgver}-{self.pkgrel}" if self.epoch: - version = "%s~%s" % (self.epoch, version) + version = f"{self.epoch}~{version}" return version def __repr__(self) -> str: - return "<%s %s %s>" % ( - type(self).__name__, self.pkgname, self.build_version) + return f"<{type(self).__name__} {self.pkgname} {self.build_version}>" @classmethod - def for_srcinfo(cls, srcinfo: str, repo: str, repo_url: str, repo_path: str, date: str) -> "Set[SrcInfoPackage]": + def for_srcinfo(cls, srcinfo: str, repo: str, repo_url: str, repo_path: str, date: str) -> set[SrcInfoPackage]: # parse pkgbase and then each pkgname - base: Dict[str, List[str]] = {} - sub: Dict[str, Dict[str, List[str]]] = {} + base: dict[str, list[str]] = {} + sub: dict[str, dict[str, list[str]]] = {} current = None for line in srcinfo.splitlines(): line = line.strip() diff --git a/app/fetch.py b/app/fetch.py index a58688a..da8d536 100644 --- a/app/fetch.py +++ b/app/fetch.py @@ -13,7 +13,7 @@ import datetime from asyncio import Event from urllib.parse import urlparse, quote_plus -from typing import Any, Dict, Tuple, List, Set, Optional +from typing import Any, Optional from email.utils import parsedate_to_datetime import httpx @@ -30,7 +30,7 @@ from .exttarfile import ExtTarFile -def get_mtime_for_response(response: httpx.Response) -> Optional[datetime.datetime]: +def get_mtime_for_response(response: httpx.Response) -> datetime.datetime | None: last_modified = response.headers.get("last-modified") if last_modified is not None: dt: datetime.datetime = parsedate_to_datetime(last_modified) @@ -38,7 +38,7 @@ def get_mtime_for_response(response: httpx.Response) -> Optional[datetime.dateti return None -async def get_content_cached_mtime(url: str, *args: Any, **kwargs: Any) -> Tuple[bytes, Optional[datetime.datetime]]: +async def get_content_cached_mtime(url: str, *args: Any, **kwargs: Any) -> tuple[bytes, datetime.datetime | None]: """Returns the content of the URL response, and a datetime object for when the content was last modified""" # cache the file locally, and store the "last-modified" date as the file mtime @@ -77,13 +77,13 @@ async def get_content_cached(url: str, *args: Any, **kwargs: Any) -> bytes: return (await get_content_cached_mtime(url, *args, **kwargs))[0] -def parse_cygwin_versions(base_url: str, data: bytes) -> Tuple[Dict[str, ExtInfo], Dict[str, ExtInfo]]: +def parse_cygwin_versions(base_url: str, data: bytes) -> tuple[dict[str, ExtInfo], dict[str, ExtInfo]]: # This is kinda hacky: extract the source name from the src tarball and take # last version line before it version = None source_package = None - versions: Dict[str, ExtInfo] = {} - versions_mingw64: Dict[str, ExtInfo] = {} + versions: dict[str, ExtInfo] = {} + versions_mingw64: dict[str, ExtInfo] = {} base_url = base_url.rsplit("/", 2)[0] in_main = True for line in data.decode("utf-8").splitlines(): @@ -145,19 +145,19 @@ async def update_build_status() -> None: for url in urls: logger.info("Loading %r" % url) data, mtime = await get_content_cached_mtime(url, timeout=REQUEST_TIMEOUT) - logger.info("Done: %r, %r" % (url, str(mtime))) + logger.info(f"Done: {url!r}, {str(mtime)!r}") responses.append((mtime, url, data)) # use the newest of all status summaries newest = sorted(responses)[-1] - logger.info("Selected: %r" % (newest[1],)) + logger.info(f"Selected: {newest[1]!r}") state.build_status = BuildStatus.parse_raw(newest[2]) -def parse_desc(t: str) -> Dict[str, List[str]]: - d: Dict[str, List[str]] = {} +def parse_desc(t: str) -> dict[str, list[str]]: + d: dict[str, list[str]] = {} cat = None - values: List[str] = [] + values: list[str] = [] for l in t.splitlines(): l = l.strip() if cat is None: @@ -173,8 +173,8 @@ def parse_desc(t: str) -> Dict[str, List[str]]: return d -async def parse_repo(repo: Repository, include_files: bool = True) -> Dict[str, Source]: - sources: Dict[str, Source] = {} +async def parse_repo(repo: Repository, include_files: bool = True) -> dict[str, Source]: + sources: dict[str, Source] = {} def add_desc(d: Any) -> None: source = Source.from_desc(d, repo) @@ -191,7 +191,7 @@ def add_desc(d: Any) -> None: with io.BytesIO(data) as f: with ExtTarFile.open(fileobj=f, mode="r") as tar: - packages: Dict[str, list] = {} + packages: dict[str, list] = {} for info in tar: package_name = info.name.split("/", 1)[0] infofile = tar.extractfile(info) @@ -222,7 +222,7 @@ async def update_arch_versions() -> None: return logger.info("update versions") - arch_versions: Dict[str, ExtInfo] = {} + arch_versions: dict[str, ExtInfo] = {} awaitables = [] for (url, repo) in ARCH_REPO_CONFIG: download_url = url.rsplit("/", 1)[0] @@ -239,8 +239,7 @@ async def update_arch_versions() -> None: for source in sources.values(): version = extract_upstream_version(arch_version_to_msys(source.version)) for p in source.packages.values(): - url = "https://archlinux.org/packages/%s/%s/%s/" % ( - p.repo, p.arch, p.name) + url = f"https://archlinux.org/packages/{p.repo}/{p.arch}/{p.name}/" if p.name in arch_versions: old_ver = arch_versions[p.name][0] @@ -249,7 +248,7 @@ async def update_arch_versions() -> None: else: arch_versions[p.name] = ExtInfo(p.name, version, p.builddate, url, {}) - url = "https://archlinux.org/packages/%s/%s/%s/" % ( + url = "https://archlinux.org/packages/{}/{}/{}/".format( source.repos[0], source.arches[0], source.name) if source.name in arch_versions: old_ver = arch_versions[source.name][0] @@ -260,8 +259,7 @@ async def update_arch_versions() -> None: # use provides as fallback for p in source.packages.values(): - url = "https://archlinux.org/packages/%s/%s/%s/" % ( - p.repo, p.arch, p.name) + url = f"https://archlinux.org/packages/{p.repo}/{p.arch}/{p.name}/" for provides in sorted(p.provides.keys()): if provides not in arch_versions: @@ -271,7 +269,7 @@ async def update_arch_versions() -> None: state.set_ext_infos(ExtId("archlinux", "Arch Linux", False), arch_versions) logger.info("update versions from AUR") - aur_versions: Dict[str, ExtInfo] = {} + aur_versions: dict[str, ExtInfo] = {} r = await get_content_cached(AUR_METADATA_URL, timeout=REQUEST_TIMEOUT) items = json.loads(r) @@ -300,16 +298,16 @@ async def update_arch_versions() -> None: state.set_ext_infos(ExtId("aur", "AUR", True), aur_versions) -CacheHeaders = Dict[str, Optional[str]] +CacheHeaders = dict[str, Optional[str]] -async def check_needs_update(urls: List[str], _cache: Dict[str, CacheHeaders] = {}) -> bool: +async def check_needs_update(urls: list[str], _cache: dict[str, CacheHeaders] = {}) -> bool: """Raises RequestException""" if appconfig.CACHE_DIR: return True - async def get_cache_headers(client: httpx.AsyncClient, url: str, timeout: float) -> Tuple[str, CacheHeaders]: + async def get_cache_headers(client: httpx.AsyncClient, url: str, timeout: float) -> tuple[str, CacheHeaders]: """This tries to return the cache response headers for a given URL as cheap as possible""" old_headers = _cache.get(url, {}) @@ -341,7 +339,7 @@ async def get_cache_headers(client: httpx.AsyncClient, url: str, timeout: float) needs_update = True _cache[url] = new_cache_headers - logger.info("check needs update: %r -> %r" % (urls, needs_update)) + logger.info(f"check needs update: {urls!r} -> {needs_update!r}") return needs_update @@ -355,7 +353,7 @@ async def update_source() -> None: logger.info("update source") - final: Dict[str, Source] = {} + final: dict[str, Source] = {} awaitables = [] for repo in get_repositories(): awaitables.append(parse_repo(repo)) @@ -377,7 +375,7 @@ async def update_sourceinfos() -> None: return logger.info("update sourceinfos") - result: Dict[str, SrcInfoPackage] = {} + result: dict[str, SrcInfoPackage] = {} pkgextra = PkgExtra(packages={}) for url in urls: @@ -439,8 +437,8 @@ async def update_pypi_versions(pkgextra: PkgExtra) -> None: state.set_ext_infos(ExtId("pypi", "PyPI", True), pypi_versions) -def fill_rdepends(sources: Dict[str, Source]) -> None: - deps: Dict[str, Dict[Package, Set[DepType]]] = {} +def fill_rdepends(sources: dict[str, Source]) -> None: + deps: dict[str, dict[Package, set[DepType]]] = {} for s in sources.values(): for p in s.packages.values(): for n, r in p.depends.items(): @@ -458,7 +456,7 @@ def fill_rdepends(sources: Dict[str, Source]) -> None: for prov in p.provides: rdeps.append(deps.get(prov, dict())) - merged: Dict[Package, Set[DepType]] = {} + merged: dict[Package, set[DepType]] = {} for rd in rdeps: for rp, rs in rd.items(): merged.setdefault(rp, set()).update(rs) @@ -466,9 +464,9 @@ def fill_rdepends(sources: Dict[str, Source]) -> None: p.rdepends = merged -def fill_provided_by(sources: Dict[str, Source]) -> None: +def fill_provided_by(sources: dict[str, Source]) -> None: - provided_by: Dict[str, Set[Package]] = {} + provided_by: dict[str, set[Package]] = {} for s in sources.values(): for p in s.packages.values(): for provides in p.provides.keys(): @@ -482,7 +480,7 @@ def fill_provided_by(sources: Dict[str, Source]) -> None: _rate_limit = AsyncLimiter(UPDATE_MIN_RATE, UPDATE_MIN_INTERVAL) -@functools.lru_cache(maxsize=None) +@functools.cache def _get_update_event() -> Event: return Event() diff --git a/app/pkgextra.py b/app/pkgextra.py index 4a622fc..ea50630 100644 --- a/app/pkgextra.py +++ b/app/pkgextra.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: MIT from pydantic import BaseModel, Field -from typing import Dict, Optional, Sequence, Union, Collection +from collections.abc import Sequence, Collection class PkgExtraEntry(BaseModel): @@ -11,36 +11,36 @@ class PkgExtraEntry(BaseModel): internal: bool = Field(default=False) """If the package is MSYS2 internal or just a meta package""" - references: Dict[str, Optional[str]] = Field(default_factory=dict) + references: dict[str, str | None] = Field(default_factory=dict) """References to third party repositories""" - changelog_url: Optional[str] = Field(default=None) + changelog_url: str | None = Field(default=None) """A NEWS file in git or the github releases page. In case there are multiple, the one that is more useful for packagers """ - documentation_url: Optional[str] = Field(default=None) + documentation_url: str | None = Field(default=None) """Documentation for the API, tools, etc provided, in case it's a different website""" - repository_url: Optional[str] = Field(default=None) + repository_url: str | None = Field(default=None) """Web view of the repository, e.g. on github or gitlab""" - issue_tracker_url: Optional[str] = Field(default=None) + issue_tracker_url: str | None = Field(default=None) """The bug tracker, mailing list, etc""" - pgp_keys_url: Optional[str] = Field(default=None) + pgp_keys_url: str | None = Field(default=None) """A website containing which keys are used to sign releases""" class PkgExtra(BaseModel): - packages: Dict[str, PkgExtraEntry] + packages: dict[str, PkgExtraEntry] """A mapping of pkgbase names to PkgExtraEntry""" -def convert_mapping(array: Sequence[str]) -> Dict[str, Optional[str]]: - converted: Dict[str, Optional[str]] = {} +def convert_mapping(array: Sequence[str]) -> dict[str, str | None]: + converted: dict[str, str | None] = {} for item in array: if ":" in item: key, value = item.split(":", 1) @@ -52,7 +52,7 @@ def convert_mapping(array: Sequence[str]) -> Dict[str, Optional[str]]: return converted -def extra_to_pkgextra_entry(data: Dict[str, Union[str, Collection[str]]]) -> PkgExtraEntry: +def extra_to_pkgextra_entry(data: dict[str, str | Collection[str]]) -> PkgExtraEntry: mappings = ["references"] data = dict(data) diff --git a/app/utils.py b/app/utils.py index e0dc78f..dc60208 100644 --- a/app/utils.py +++ b/app/utils.py @@ -5,7 +5,7 @@ import sys import logging from itertools import zip_longest -from typing import List, Tuple, Optional, Dict, Set, Any +from typing import Any logger = logging.getLogger('app') @@ -28,13 +28,13 @@ def cmp(a: Any, b: Any) -> int: assert isinstance(res, int) return res - def split(v: str) -> Tuple[str, str, Optional[str]]: + def split(v: str) -> tuple[str, str, str | None]: if "~" in v: e, v = v.split("~", 1) else: e, v = ("0", v) - r: Optional[str] = None + r: str | None = None if "-" in v: v, r = v.rsplit("-", 1) else: @@ -53,8 +53,8 @@ def get_type(c: str) -> int: else: return other - def parse(v: str) -> List[str]: - parts: List[str] = [] + def parse(v: str) -> list[str]: + parts: list[str] = [] current = "" for c in v: if not current: @@ -142,8 +142,8 @@ def version_is_newer_than(v1: str, v2: str) -> bool: return vercmp(v1, v2) == 1 -def split_depends(deps: List[str]) -> Dict[str, Set[str]]: - r: Dict[str, Set[str]] = {} +def split_depends(deps: list[str]) -> dict[str, set[str]]: + r: dict[str, set[str]] = {} for d in deps: parts = re.split("([<>=]+)", d, 1) first = parts[0].strip() @@ -152,8 +152,8 @@ def split_depends(deps: List[str]) -> Dict[str, Set[str]]: return r -def split_optdepends(deps: List[str]) -> Dict[str, Set[str]]: - r: Dict[str, Set[str]] = {} +def split_optdepends(deps: list[str]) -> dict[str, set[str]]: + r: dict[str, set[str]] = {} for d in deps: if ":" in d: a, b = d.split(":", 1) diff --git a/app/web.py b/app/web.py index 1b293c8..f637112 100644 --- a/app/web.py +++ b/app/web.py @@ -8,7 +8,8 @@ import datetime from enum import Enum import urllib.parse -from typing import Callable, Any, List, Union, Dict, Optional, Tuple, Set, NamedTuple +from typing import Any, Optional, NamedTuple +from collections.abc import Callable import jinja2 import markupsafe @@ -41,13 +42,12 @@ class PackageStatus(Enum): UNKNOWN = 'unknown' -PackageBuildStatus = NamedTuple('PackageBuildStatus', [ - ('type', str), - ('status', str), - ('details', str), - ('urls', Dict[str, str]), - ('category', str), -]) +class PackageBuildStatus(NamedTuple): + type: str + status: str + details: str + urls: dict[str, str] + category: str async def get_etag(request: Request) -> str: @@ -64,7 +64,7 @@ def wrap(f: Callable) -> Callable: def context_function(name: str) -> Callable: def wrap(f: Callable) -> Callable: @jinja2.pass_context - def ctxfunc(context: Dict, *args: Any, **kwargs: Any) -> Any: + def ctxfunc(context: dict, *args: Any, **kwargs: Any) -> Any: return f(context["request"], *args, **kwargs) templates.env.globals[name] = ctxfunc return f @@ -83,7 +83,7 @@ def update_timestamp(request: Request) -> float: @context_function("package_url") -def package_url(request: Request, package: Package, name: Optional[str] = None) -> str: +def package_url(request: Request, package: Package, name: str | None = None) -> str: res: str = "" if name is None: res = str(request.url_for("package", package_name=name or package.name)) @@ -134,7 +134,7 @@ def needs_quote(s: str) -> bool: @context_function("licenses_to_html") -def licenses_to_html(request: Request, licenses: List[str]) -> str: +def licenses_to_html(request: Request, licenses: list[str]) -> str: done = [] for license in licenses: needs_quote = (" " in license.strip()) and len(licenses) > 1 @@ -148,7 +148,7 @@ def licenses_to_html(request: Request, licenses: List[str]) -> str: @template_filter("rdepends_type") -def rdepends_type(types: Set[DepType]) -> List[str]: +def rdepends_type(types: set[DepType]) -> list[str]: if list(types) == [DepType.NORMAL]: return [] names = [] @@ -165,7 +165,7 @@ def rdepends_type(types: Set[DepType]) -> List[str]: @template_filter("rdepends_sort") -def rdepends_sort(rdepends: Dict[Package, Set[str]]) -> List[Tuple[Package, Set[str]]]: +def rdepends_sort(rdepends: dict[Package, set[str]]) -> list[tuple[Package, set[str]]]: return sorted(rdepends.items(), key=lambda x: (x[0].name.lower(), x[0].key)) @@ -218,7 +218,7 @@ async def index(request: Request, response: Response) -> Response: @router.get('/base', dependencies=[Depends(Etag(get_etag))]) @router.get('/base/{base_name}', dependencies=[Depends(Etag(get_etag))]) -async def base(request: Request, response: Response, base_name: Optional[str] = None) -> Response: +async def base(request: Request, response: Response, base_name: str | None = None) -> Response: global state if base_name is not None: @@ -239,7 +239,7 @@ async def base(request: Request, response: Response, base_name: Optional[str] = @router.get('/group/', dependencies=[Depends(Etag(get_etag))]) @router.get('/group/{group_name}', dependencies=[Depends(Etag(get_etag))]) -async def group(request: Request, response: Response, group_name: Optional[str] = None) -> Response: +async def group(request: Request, response: Response, group_name: str | None = None) -> Response: params = {} if group_name is not None: params['group_name'] = group_name @@ -248,7 +248,7 @@ async def group(request: Request, response: Response, group_name: Optional[str] @router.get('/groups/', dependencies=[Depends(Etag(get_etag))]) @router.get('/groups/{group_name}', dependencies=[Depends(Etag(get_etag))]) -async def groups(request: Request, response: Response, group_name: Optional[str] = None) -> Response: +async def groups(request: Request, response: Response, group_name: str | None = None) -> Response: global state if group_name is not None: @@ -264,7 +264,7 @@ async def groups(request: Request, response: Response, group_name: Optional[str] "packages": res, }, headers=dict(response.headers)) else: - groups: Dict[str, int] = {} + groups: dict[str, int] = {} for s in state.sources.values(): for k, p in sorted(s.packages.items()): for name in p.groups: @@ -277,11 +277,11 @@ async def groups(request: Request, response: Response, group_name: Optional[str] @router.get('/basegroups/', dependencies=[Depends(Etag(get_etag))]) @router.get('/basegroups/{group_name}', dependencies=[Depends(Etag(get_etag))]) -async def basegroups(request: Request, response: Response, group_name: Optional[str] = None) -> Response: +async def basegroups(request: Request, response: Response, group_name: str | None = None) -> Response: global state if group_name is not None: - groups: Dict[str, int] = {} + groups: dict[str, int] = {} for s in state.sources.values(): for k, p in sorted(s.packages.items()): for name in p.groups: @@ -295,7 +295,7 @@ async def basegroups(request: Request, response: Response, group_name: Optional[ "groups": groups, }, headers=dict(response.headers)) else: - base_groups: Dict[str, Set[str]] = {} + base_groups: dict[str, set[str]] = {} for s in state.sources.values(): for k, p in sorted(s.packages.items()): for name in p.groups: @@ -309,7 +309,7 @@ async def basegroups(request: Request, response: Response, group_name: Optional[ @router.get('/package/', dependencies=[Depends(Etag(get_etag))]) -async def packages(request: Request, response: Response, repo: Optional[str] = None, variant: Optional[str] = None) -> Response: +async def packages(request: Request, response: Response, repo: str | None = None, variant: str | None = None) -> Response: global state repo = repo or DEFAULT_REPO @@ -331,7 +331,7 @@ async def packages(request: Request, response: Response, repo: Optional[str] = N @router.get('/package/{package_name}', dependencies=[Depends(Etag(get_etag))]) -async def package(request: Request, response: Response, package_name: str, repo: Optional[str] = None, variant: Optional[str] = None) -> Response: +async def package(request: Request, response: Response, package_name: str, repo: str | None = None, variant: str | None = None) -> Response: global state packages = [] @@ -366,7 +366,7 @@ async def updates(request: Request, response: Response, repo: str = "") -> Respo repo_filter = repo or None repos = get_repositories() - packages: List[Package] = [] + packages: list[Package] = [] for s in state.sources.values(): for p in s.packages.values(): if repo_filter is not None and p.repo != repo_filter: @@ -382,11 +382,11 @@ async def updates(request: Request, response: Response, repo: str = "") -> Respo }, headers=dict(response.headers)) -def get_transitive_depends(related: List[str]) -> Set[str]: +def get_transitive_depends(related: list[str]) -> set[str]: if not related: return set() - db_depends: Dict[str, Set[str]] = {} + db_depends: dict[str, set[str]] = {} related_pkgs = set() for s in state.sources.values(): for p in s.packages.values(): @@ -408,7 +408,7 @@ def get_transitive_depends(related: List[str]) -> Set[str]: @router.get('/outofdate', dependencies=[Depends(Etag(get_etag))]) -async def outofdate(request: Request, response: Response, related: Optional[str] = None, repo: str = "") -> Response: +async def outofdate(request: Request, response: Response, related: str | None = None, repo: str = "") -> Response: repo_filter = repo or None repos = get_repositories() @@ -513,7 +513,7 @@ def get_status_category(key: str) -> str: return DANGER -def get_status_priority(key: str) -> Tuple[int, str]: +def get_status_priority(key: str) -> tuple[int, str]: """We want to show the most important status as the primary one""" try: @@ -538,14 +538,14 @@ def get_status_priority(key: str) -> Tuple[int, str]: return (-1, key) -def repo_to_builds(repo: str) -> List[str]: +def repo_to_builds(repo: str) -> list[str]: if repo == "msys": return [repo, "msys-src"] else: return [repo, "mingw-src"] -def get_build_status(srcinfo: SrcInfoPackage, build_types: Set[str] = set()) -> List[PackageBuildStatus]: +def get_build_status(srcinfo: SrcInfoPackage, build_types: set[str] = set()) -> list[PackageBuildStatus]: build_status = state.build_status entry = None @@ -580,12 +580,12 @@ def get_build_status(srcinfo: SrcInfoPackage, build_types: Set[str] = set()) -> async def queue(request: Request, response: Response, build_type: str = "") -> Response: # Create entries for all packages where the version doesn't match - UpdateEntry = Tuple[SrcInfoPackage, Optional[Source], Optional[Package], List[PackageBuildStatus]] + UpdateEntry = tuple[SrcInfoPackage, Optional[Source], Optional[Package], list[PackageBuildStatus]] build_filter = build_type or None - srcinfo_repos: Dict[str, Set[str]] = {} + srcinfo_repos: dict[str, set[str]] = {} - grouped: Dict[str, UpdateEntry] = {} + grouped: dict[str, UpdateEntry] = {} for s in state.sources.values(): for k, p in sorted(s.packages.items()): if p.name in state.sourceinfos: @@ -594,12 +594,12 @@ async def queue(request: Request, response: Response, build_type: str = "") -> R continue if version_is_newer_than(srcinfo.build_version, p.version): srcinfo_repos.setdefault(srcinfo.pkgbase, set()).update(repo_to_builds(srcinfo.repo)) - repo_list = srcinfo_repos[srcinfo.pkgbase] if not build_filter else set([build_filter]) + repo_list = srcinfo_repos[srcinfo.pkgbase] if not build_filter else {build_filter} new_src = state.sources.get(srcinfo.pkgbase) grouped[srcinfo.pkgbase] = (srcinfo, new_src, p, get_build_status(srcinfo, repo_list)) # new packages - available: Dict[str, List[SrcInfoPackage]] = {} + available: dict[str, list[SrcInfoPackage]] = {} for srcinfo in state.sourceinfos.values(): if build_filter is not None and build_filter not in repo_to_builds(srcinfo.repo): continue @@ -612,7 +612,7 @@ async def queue(request: Request, response: Response, build_type: str = "") -> R for srcinfos in available.values(): for srcinfo in srcinfos: srcinfo_repos.setdefault(srcinfo.pkgbase, set()).update(repo_to_builds(srcinfo.repo)) - repo_list = srcinfo_repos[srcinfo.pkgbase] if not build_filter else set([build_filter]) + repo_list = srcinfo_repos[srcinfo.pkgbase] if not build_filter else {build_filter} src, pkg = None, None if srcinfo.pkgbase in grouped: src, pkg = grouped[srcinfo.pkgbase][1:3] @@ -620,7 +620,7 @@ async def queue(request: Request, response: Response, build_type: str = "") -> R src = state.sources[srcinfo.pkgbase] grouped[srcinfo.pkgbase] = (srcinfo, src, pkg, get_build_status(srcinfo, repo_list)) - updates: List[UpdateEntry] = [] + updates: list[UpdateEntry] = [] updates = list(grouped.values()) updates.sort( key=lambda i: (i[0].date, i[0].pkgbase, i[0].pkgname), @@ -667,9 +667,9 @@ async def search(request: Request, response: Response, q: str = "", t: str = "") parts = query.split() parts_lower = [p.lower() for p in parts] - res_pkg: List[Tuple[float, Union[Package, Source]]] = [] + res_pkg: list[tuple[float, Package | Source]] = [] - def get_score(name: str, parts: List[str]) -> float: + def get_score(name: str, parts: list[str]) -> float: score = 0.0 for part in parts: if part not in name: diff --git a/run.py b/run.py index fd14285..f40cf91 100755 --- a/run.py +++ b/run.py @@ -4,7 +4,6 @@ import os import sys import argparse -from typing import List, Optional, Union import uvicorn from app import app @@ -12,7 +11,7 @@ from app import logger -def main(argv: List[str]) -> Optional[Union[int, str]]: +def main(argv: list[str]) -> int | str | None: parser = argparse.ArgumentParser() parser.add_argument("-c", "--cache", action="store_true", help="use local repo cache") diff --git a/setup.cfg b/setup.cfg index a9f2dff..6bdb96c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,7 +4,7 @@ max-line-length=100 exclude=frontend [mypy] -python_version=3.8 +python_version=3.10 ignore_missing_imports=True warn_no_return=True warn_return_any=True