diff --git a/.vscode/settings.json b/.vscode/settings.json
index e0009a2..ba9a9b5 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,3 +1,9 @@
{
- "python.analysis.typeCheckingMode": "basic"
-}
\ No newline at end of file
+ "python.analysis.typeCheckingMode": "basic",
+ "[python]": {
+ "editor.codeActionsOnSave": {
+ "source.organizeImports.ruff": "explicit",
+ }
+ },
+ "ruff.lineLength": 80,
+}
diff --git a/README.md b/README.md
index 3e3948e..7b9b5ef 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,6 @@
# Tidal Downloader
-TIDDL is the Python CLI application that allows downloading Tidal tracks.
-Fully typed, only 2 requirements.
+TIDDL is Python CLI application that allows downloading Tidal tracks.
![GitHub top language](https://img.shields.io/github/languages/top/oskvr37/tiddl?style=for-the-badge)
![PyPI - Version](https://img.shields.io/pypi/v/tiddl?style=for-the-badge)
@@ -11,6 +10,9 @@ Fully typed, only 2 requirements.
It's inspired by [Tidal-Media-Downloader](https://github.com/yaronzz/Tidal-Media-Downloader) - currently not mantained project.
This repository will contain features requests from that project and will be the enhanced version.
+> [!WARNING]
+> This app is for personal use only and is not affiliated with Tidal. Users must ensure their use complies with Tidal's terms of service and local copyright laws. Downloaded tracks are for personal use and may not be shared or redistributed. The developer assumes no responsibility for misuse of this app.
+
# Installation
Install package using `pip`
@@ -19,84 +21,74 @@ Install package using `pip`
pip install tiddl
```
-After installation you can use `tiddl` to set up auth token
+Run the package cli with `tiddl`
```bash
$ tiddl
-> go to https://link.tidal.com/xxxxx and add device!
-authenticated!
-token expires in 7 days
-```
-Use `tiddl -h` to show help message
+Usage: tiddl [OPTIONS] COMMAND [ARGS]...
-# CLI
+ TIDDL - Download Tidal tracks ✨
-After authentication - when your token is ready - you can start downloading!
+Options:
+ -v, --verbose Show debug logs
+ --help Show this message and exit.
-You can download `tracks` `albums` `playlists` `artists albums`
+Commands:
+ ...
+```
-- `tiddl -s -q high` sets high quality as default quality
-- `tiddl ` downloads with high quality
-- `tiddl -q master` downloads with best possible quality
-- `tiddl 284165609 -p my_folder -o "{artist} - {title}"` downloads track to `my_folder/{artist} - {title}.flac`
-- `tiddl track/284165609 -p my_folder -o "{artist} - {title}" -s` same as above, but saves `my_folder` as default download path and `{artist} - {title}` as default file format
+# Basic usage
-### Valid input
+Login with Tidal account
-- 284165609 (will treat this as track id)
-- https://tidal.com/browse/track/284165609
-- track/284165609
-- https://listen.tidal.com/album/284165608/track/284165609
-- https://listen.tidal.com/album/284165608
-- album/284165608
-- https://listen.tidal.com/artist/7695548
-- artist/7695548
-- https://listen.tidal.com/playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7
-- playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7
+```bash
+tiddl auth login
+```
-### File formatting
+Download track / album / artist / playlist
-| Key | Example | Comment |
-| --------------- | ------------------------- | ------------------------------------------------------------- |
-| title | Money Trees | |
-| artist | Kendrick Lamar | |
-| artists | Kendrick Lamar, Jay Rock | |
-| album | good kid, m.A.A.d city | |
-| number | 5 | number on album |
-| disc_number | 1 | number of album volume |
-| released | 10/22/2012 | release date |
-| year | 2012 | year of release date |
-| playlist | Kendrick Lamar Essentials | title of playlist will only appear when you download playlist |
-| playlist_number | 15 | index of track on the playlist |
-| id | 20556797 | id on Tidal |
+```bash
+tiddl url https://listen.tidal.com/track/103805726 download
+tiddl url https://listen.tidal.com/album/103805723 download
+tiddl url https://listen.tidal.com/artist/25022 download
+tiddl url https://listen.tidal.com/playlist/84974059-76af-406a-aede-ece2b78fa372 download
+```
-# Modules
+> [!TIP]
+> You don't have to paste full urls, track/103805726, album/103805723 etc. will also work
-You can also use TIDDL as module, it's fully typed so you will get type hints
+Set download quality and output format
-```python
-from tiddl import TidalApi, Config
+```bash
+tiddl ... download -q master -o "{artist}/{title} ({album})"
+```
-config = Config()
+This command will:
+- download with highest quality
+- save track with title and album name in artist folder
-api = TidalApi(
- config["token"],
- config["user"]["user_id"],
- config["user"]["country_code"]
-)
+> [!NOTE]
+> More about file templating [on wiki](https://github.com/oskvr37/tiddl/wiki/Template-formatting).
-album_id = 284165608
+# Development
-album = api.getAlbum(album_id)
+Clone the repository
-print(f"{album["title"]} has {album["numberOfTracks"]} tracks!")
+```bash
+git clone https://github.com/oskvr37/tiddl
```
-# Testing
+Install package with `--editable` flag
+```bash
+pip install -e .
```
-python -m unittest tiddl/tests.py
+
+Run tests
+
+```bash
+python -m unittest
```
# Resources
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..ca570fb
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,29 @@
+[build-system]
+requires = ["setuptools>=42", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "tiddl"
+version = "2.0.0"
+description = "Download Tidal tracks with CLI downloader."
+readme = "README.md"
+requires-python = ">=3.11"
+authors = [{ name = "oskvr37" }]
+classifiers = [
+ "Environment :: Console",
+ "Programming Language :: Python :: 3",
+ "Operating System :: OS Independent",
+]
+dependencies = [
+ "pydantic>=2.9.2",
+ "requests>=2.20.0",
+ "click>=8.1.7",
+ "mutagen>=1.47.0",
+ "ffmpeg-python>=0.2.0",
+]
+
+[project.urls]
+homepage = "https://github.com/oskvr37/tiddl"
+
+[project.scripts]
+tiddl = "tiddl.cli:cli"
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index a9a4c67..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-requests>=2.20.0
-mutagen>=1.47.0
-ffmpeg-python>=0.2.0
\ No newline at end of file
diff --git a/setup.py b/setup.py
deleted file mode 100644
index efdc632..0000000
--- a/setup.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from setuptools import setup, find_packages
-
-setup(
- name="tiddl",
- version="1.9.4",
- description="TIDDL (Tidal Downloader) is a Python CLI application that allows downloading Tidal tracks.",
- long_description=open("README.md", encoding="utf-8").read(),
- long_description_content_type="text/markdown",
- readme="README.md",
- author="oskvr37",
- packages=find_packages(),
- entry_points={
- "console_scripts": ["tiddl=tiddl:main"],
- },
-)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_api.py b/tests/test_api.py
new file mode 100644
index 0000000..fe448d8
--- /dev/null
+++ b/tests/test_api.py
@@ -0,0 +1,78 @@
+import unittest
+
+from tiddl.config import Config
+from tiddl.api import TidalApi
+
+
+class TestApi(unittest.TestCase):
+ api: TidalApi
+
+ def setUp(self):
+ config = Config.fromFile()
+ auth = config.auth
+
+ token, user_id, country_code = (
+ auth.token,
+ auth.user_id,
+ auth.country_code
+ )
+
+ assert token, "No token found in config file"
+ assert user_id, "No user_id found in config file"
+ assert country_code, "No country_code found in config file"
+
+ self.api = TidalApi(token, user_id, country_code)
+
+ def test_ready(self):
+ session = self.api.getSession()
+
+ self.assertEqual(session.userId, int(self.api.user_id))
+ self.assertEqual(session.countryCode, self.api.country_code)
+
+ def test_track(self):
+ track = self.api.getTrack(103805726)
+ self.assertEqual(track.title, "Stronger")
+
+ def test_artist(self):
+ artist = self.api.getArtist(25022)
+ self.assertEqual(artist.name, "Kanye West")
+
+ def test_artist_albums(self):
+ self.api.getArtistAlbums(25022, filter="ALBUMS")
+ self.api.getArtistAlbums(25022, filter="EPSANDSINGLES")
+
+ def test_album(self):
+ album = self.api.getAlbum(103805723)
+ self.assertEqual(album.title, "Graduation")
+
+ def test_album_items(self):
+ album_items = self.api.getAlbumItems(103805723, limit=10)
+ self.assertEqual(len(album_items.items), 10)
+
+ album_items = self.api.getAlbumItems(103805723, limit=10, offset=10)
+ self.assertEqual(len(album_items.items), 4)
+
+ def test_playlist(self):
+ playlist = self.api.getPlaylist("84974059-76af-406a-aede-ece2b78fa372")
+ self.assertEqual(playlist.title, "Kanye West Essentials")
+
+ def test_playlist_items(self):
+ playlist_items = self.api.getPlaylistItems(
+ "84974059-76af-406a-aede-ece2b78fa372"
+ )
+ self.assertEqual(len(playlist_items.items), 25)
+
+ def test_favorites(self):
+ favorites = self.api.getFavorites()
+ self.assertGreaterEqual(len(favorites.PLAYLIST), 0)
+ self.assertGreaterEqual(len(favorites.ALBUM), 0)
+ self.assertGreaterEqual(len(favorites.VIDEO), 0)
+ self.assertGreaterEqual(len(favorites.TRACK), 0)
+ self.assertGreaterEqual(len(favorites.ARTIST), 0)
+
+ def test_search(self):
+ self.api.getSearch("Kanye West")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_utils.py b/tests/test_utils.py
new file mode 100644
index 0000000..a255c21
--- /dev/null
+++ b/tests/test_utils.py
@@ -0,0 +1,137 @@
+import unittest
+
+from tiddl.models.resource import Track
+from tiddl.utils import TidalResource, formatTrack
+
+
+class TestTidalResource(unittest.TestCase):
+ def test_resource_parsing(self):
+ positive_cases = [
+ ("https://tidal.com/browse/track/12345678", "track", "12345678"),
+ ("track/12345678", "track", "12345678"),
+ ("https://tidal.com/browse/album/12345678", "album", "12345678"),
+ ("album/12345678", "album", "12345678"),
+ ("https://tidal.com/browse/playlist/12345678", "playlist", "12345678"),
+ ("playlist/12345678", "playlist", "12345678"),
+ ("https://tidal.com/browse/artist/12345678", "artist", "12345678"),
+ ("artist/12345678", "artist", "12345678"),
+ ]
+
+ for resource, expected_type, expected_id in positive_cases:
+ with self.subTest(resource=resource):
+ tidal_resource = TidalResource.fromString(resource)
+ self.assertEqual(tidal_resource.type, expected_type)
+ self.assertEqual(tidal_resource.id, expected_id)
+
+ def test_failing_cases(self):
+ failing_cases = [
+ "https://tidal.com/browse/invalid/12345678",
+ "invalid/12345678",
+ "https://tidal.com/browse/track/invalid",
+ "track/invalid",
+ "",
+ "invalid",
+ "https://tidal.com/browse/track/",
+ "track/",
+ "/12345678",
+ ]
+
+ for resource in failing_cases:
+ with self.subTest(resource=resource):
+ with self.assertRaises(ValueError):
+ TidalResource.fromString(resource)
+
+
+class TestFormatTrack(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.track = Track(
+ **{
+ "id": 66421438,
+ "title": "Shutdown",
+ "duration": 189,
+ "replayGain": -9.95,
+ "peak": 0.966051,
+ "allowStreaming": True,
+ "streamReady": True,
+ "adSupportedStreamReady": True,
+ "djReady": True,
+ "stemReady": False,
+ "streamStartDate": "2016-11-15T00:00:00.000+0000",
+ "premiumStreamingOnly": False,
+ "trackNumber": 9,
+ "volumeNumber": 1,
+ "version": None,
+ "popularity": 24,
+ "copyright": "(P) 2016 Boy Better Know",
+ "bpm": 69,
+ "url": "http://www.tidal.com/track/66421438",
+ "isrc": "GB7QY1500024",
+ "editable": False,
+ "explicit": True,
+ "audioQuality": "LOSSLESS",
+ "audioModes": ["STEREO"],
+ "mediaMetadata": {"tags": ["LOSSLESS", "HIRES_LOSSLESS"]},
+ "artist": {
+ "id": 3566984,
+ "name": "Skepta",
+ "type": "MAIN",
+ "picture": "747af850-fa9c-4178-a3e6-49259b67df86",
+ },
+ "artists": [
+ {
+ "id": 3566984,
+ "name": "Skepta",
+ "type": "MAIN",
+ "picture": "747af850-fa9c-4178-a3e6-49259b67df86",
+ }
+ ],
+ "album": {
+ "id": 66421429,
+ "title": "Konnichiwa",
+ "cover": "e0c2f05e-e21f-47c5-9c37-2993437df27d",
+ "vibrantColor": "#ae3b31",
+ "videoCover": None,
+ },
+ "mixes": {"TRACK_MIX": "001aa4abeb471e8f55f5784772b478"},
+ "playlistNumber": None,
+ }
+ )
+
+ def test_templating(self):
+ test_cases = [
+ ("{id}", "66421438"),
+ ("{title}", "Shutdown"),
+ ("{version}", ""),
+ ("{artist}", "Skepta"),
+ ("{artists}", "Skepta"),
+ ("{album}", "Konnichiwa"),
+ ("{number}", "9"),
+ ("{disc}", "1"),
+ ("{date:%m-%d-%y}", "11-15-16"),
+ ("{date:%Y}", "2016"),
+ ("{year}", "2016"),
+ ("{playlist_number}", "0"),
+ ("{playlist_number:02d}", "00"),
+ ("{bpm}", "69"),
+ ("{quality}", "high"),
+ ("{artist}/{album}/{title}", "Skepta/Konnichiwa/Shutdown"),
+ ("{number:02d}. {title}", "09. Shutdown"),
+ ]
+
+ for template, expected_result in test_cases:
+ with self.subTest(template=template, expected_result=expected_result):
+ result = formatTrack(template, self.track)
+ self.assertEqual(result, expected_result)
+
+ def test_invalid_characters(self):
+ test_cases = ["\\", ":", '"', "?", "<", ">", "|", "{number}:{title}", "{date}"]
+
+ for template in test_cases:
+ with self.subTest(template=template):
+ with self.assertRaises(ValueError):
+ formatTrack(template, self.track)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tiddl/__init__.py b/tiddl/__init__.py
index 46f02d7..e69de29 100644
--- a/tiddl/__init__.py
+++ b/tiddl/__init__.py
@@ -1,357 +0,0 @@
-import os
-import time
-import logging
-from random import randint
-
-from .api import TidalApi, ApiError
-from .auth import getDeviceAuth, getToken, refreshToken
-from .config import Config
-from .download import downloadTrackStream, Cover
-from .parser import QUALITY_ARGS, parser
-from .types import TRACK_QUALITY, TrackQuality, Track
-from .types.api import _PlaylistItem
-from .utils import (
- RESOURCE,
- parseURL,
- formatFilename,
- loadingSymbol,
- setMetadata,
- convertFileExtension,
- initLogging,
- parseFileInput,
-)
-
-SAVE_COVER = True
-
-
-def main():
- args = parser.parse_args()
- initLogging(
- silent=args.silent, verbose=args.verbose, colored_logging=not args.no_color
- )
-
- logger = logging.getLogger("TIDDL")
- logger.debug(args)
-
- config = Config()
-
- include_singles = args.include_singles
- download_path = args.download_path or config["settings"]["download_path"]
- track_template = args.file_template or config["settings"]["track_template"]
- track_quality = (
- QUALITY_ARGS[args.quality]
- if args.quality
- else config["settings"]["track_quality"]
- )
- file_extension = args.file_extension or config["settings"]["file_extension"]
-
- if args.save_options:
- logger.info("saving new settings...")
- settings = config.update(
- {
- "settings": {
- "download_path": download_path,
- "track_quality": track_quality,
- "track_template": track_template,
- "file_extension": file_extension,
- }
- }
- ).get("settings")
-
- if settings:
- print("Current Settings:")
- for k, v in settings.items():
- print(f'> {k.upper()} "{v}"')
-
- logger.info(f"saved settings to {config.config_path}")
-
- if not config["token"]:
- auth = getDeviceAuth()
- text = f"> go to https://{auth['verificationUriComplete']} and add device!"
- expires_at = time.time() + auth["expiresIn"]
- i = 0
- while time.time() < expires_at:
- for _ in range(50):
- loadingSymbol(i, text)
- i += 1
- time.sleep(0.1)
- token = getToken(auth["deviceCode"])
-
- if token.get("error"):
- continue
-
- print()
-
- config.update(
- {
- "token": token["access_token"],
- "refresh_token": token["refresh_token"],
- "token_expires_at": int(time.time()) + token["expires_in"],
- "user": {
- "user_id": str(token["user"]["userId"]),
- "country_code": token["user"]["countryCode"],
- },
- }
- )
- logger.info(f"authenticated!")
- break
- else:
- logger.info("time for authentication has expired")
- return
-
- t_now = int(time.time())
- token_expired = t_now > config["token_expires_at"]
-
- if token_expired:
- token = refreshToken(config["refresh_token"])
- config.update(
- {
- "token": token["access_token"],
- "token_expires_at": int(time.time()) + token["expires_in"],
- }
- )
- logger.info(f"refreshed token!")
-
- time_to_expire = config["token_expires_at"] - t_now
- days, hours = time_to_expire // (24 * 3600), time_to_expire % (24 * 3600) // 3600
- days_text = f" {days} {'day' if days == 1 else 'days'}" if days else ""
- hours_text = f" {hours} {'hour' if hours == 1 else 'hours'}" if hours else ""
- logger.debug(f"token expires in{days_text}{hours_text}")
-
- user_inputs: list[str] = args.input
-
- if args.input_file:
- file_inputs = parseFileInput(args.input_file)
- user_inputs.extend(file_inputs)
-
- if len(user_inputs) == 0:
- logger.warning("no ID nor URL provided")
- return
-
- api = TidalApi(
- config["token"], config["user"]["user_id"], config["user"]["country_code"]
- )
-
- def downloadTrack(
- track: Track,
- file_template: str,
- skip_existing=True,
- sleep=False,
- playlist="",
- cover_data=b"",
- ) -> tuple[str, str]:
- if track.get("status", 200) != 200 or not track["allowStreaming"]:
- raise ValueError(
- f"The track is not streamable: {track["title"]} ({track["id"]})"
- )
-
- file_dir, file_name = formatFilename(file_template, track, playlist)
-
- file_path = f"{download_path}/{file_dir}/{file_name}"
- if skip_existing and (
- os.path.isfile(file_path + ".m4a") or os.path.isfile(file_path + ".flac")
- ):
- logger.info(f"already exists: {file_path}")
- return file_dir, file_name
-
- if sleep:
- sleep_time = randint(5, 15) / 10 + 1
- logger.info(f"sleeping for {sleep_time}s")
- try:
- time.sleep(sleep_time)
- except KeyboardInterrupt:
- logger.info("stopping...")
- exit()
-
- stream = api.getTrackStream(track["id"], track_quality)
- logger.debug({"stream": stream})
-
- quality = TRACK_QUALITY[stream["audioQuality"]]
-
- MASTER_QUALITIES: list[TrackQuality] = ["HI_RES_LOSSLESS", "LOSSLESS"]
- if stream["audioQuality"] in MASTER_QUALITIES:
- bit_depth, sample_rate = stream.get("bitDepth"), stream.get("sampleRate")
- if bit_depth is None or sample_rate is None:
- raise ValueError(
- "bitDepth and sampleRate must be provided for master qualities"
- )
- details = f"{bit_depth} bit {sample_rate/1000:.1f} kHz"
- else:
- details = quality["details"]
-
- logger.info(f"{file_name} :: {quality['name']} Quality - {details}")
-
- track_data, extension = downloadTrackStream(
- stream["manifest"],
- stream["manifestMimeType"],
- )
-
- os.makedirs(f"{download_path}/{file_dir}", exist_ok=True)
-
- file_path = f"{download_path}/{file_dir}/{file_name}.{extension}"
-
- with open(file_path, "wb+") as f:
- f.write(track_data)
-
- if not cover_data and track["album"]["cover"]:
- cover = Cover(track["album"]["cover"])
- cover_data = cover.content
-
- setMetadata(file_path, extension, track, cover_data)
-
- if file_extension:
- file_path = convertFileExtension(
- source_path=file_path, file_extension=file_extension
- )
-
- logger.info(f"track saved as {file_path}")
-
- return file_dir, file_name
-
- def downloadAlbum(album_id: str | int, skip_existing: bool):
- album = api.getAlbum(album_id)
- logger.info(f"album: {album['title']}")
-
- # i dont know if limit 100 is suspicious
- # but i will leave it here
- album_items = api.getAlbumItems(album_id, limit=100)
- album_cover = Cover(album["cover"])
-
- for item in album_items["items"]:
- track = item["item"]
-
- if item["type"] != "track":
- logger.warning(f"item is not a track: {track["title"]} ({track["id"]})")
- continue
-
- try:
- file_dir, file_name = downloadTrack(
- track,
- file_template=args.file_template
- or config["settings"]["album_template"],
- skip_existing=skip_existing,
- sleep=True,
- cover_data=album_cover.content,
- )
-
- if SAVE_COVER:
- album_cover.save(f"{download_path}/{file_dir}")
-
- except ValueError as e:
- logger.error(e)
-
- skip_existing = not args.no_skip
- failed_input = []
-
- for user_input in user_inputs:
- input_type: RESOURCE
- input_id: str
-
- if user_input.isdigit():
- input_type = "track"
- input_id = user_input
- else:
- try:
- input_type, input_id = parseURL(user_input)
- except ValueError as e:
- logger.error(e)
- failed_input.append(user_input)
- continue
-
- match input_type:
- case "track":
- try:
- track = api.getTrack(input_id)
- except ApiError as e:
- logger.warning(f"{e.error['userMessage']} ({e.error['status']})")
- continue
-
- try:
- downloadTrack(
- track,
- file_template=track_template,
- skip_existing=skip_existing,
- )
- except ValueError as e:
- logger.error(e)
-
- continue
-
- case "album":
- downloadAlbum(input_id, skip_existing)
- continue
-
- case "artist":
- all_albums = []
- artist_albums = api.getArtistAlbums(input_id)
- all_albums.extend(artist_albums["items"])
-
- if include_singles:
- artist_singles = api.getArtistAlbums(input_id, onlyNonAlbum=True)
- all_albums.extend(artist_singles["items"])
-
- for album in all_albums:
- downloadAlbum(album["id"], skip_existing)
-
- continue
-
- case "playlist":
- # TODO: add option to limit and set offset of playlist ✨
- # or just make a feature in GUI that lets user choose
- # which tracks from playlist to download
-
- playlist = api.getPlaylist(input_id)
- logger.info(f"playlist: {playlist['title']} ({playlist['url']})")
-
- playlist_cover = Cover(
- playlist["squareImage"], 1080
- ) # playlists have 1080x1080 size
-
- items: list[_PlaylistItem] = []
- offset = 0
-
- while True:
- playlist_items = api.getPlaylistItems(input_id, offset=offset)
- items.extend(playlist_items["items"])
-
- if (
- playlist_items["limit"] + playlist_items["offset"]
- > playlist_items["totalNumberOfItems"]
- ):
- break
-
- offset += playlist_items["limit"]
-
- for index, item in enumerate(items, 1):
- track = item["item"]
-
- track["playlistNumber"] = index
- try:
- file_dir, file_name = downloadTrack(
- track,
- file_template=args.file_template
- or config["settings"]["playlist_template"],
- skip_existing=skip_existing,
- sleep=True,
- playlist=playlist["title"],
- )
-
- if SAVE_COVER:
- playlist_cover.save(f"{download_path}/{file_dir}")
-
- except ValueError as e:
- logger.warning(f"track unavailable")
-
- continue
-
- case _:
- logger.warning(f"invalid input: `{input_type}`")
-
- failed_input.append(input_id)
-
- if len(failed_input) > 0:
- logger.info(f"failed: {failed_input}")
-
-
-if __name__ == "__main__":
- main()
diff --git a/tiddl/api.py b/tiddl/api.py
index b0324b7..6006e9e 100644
--- a/tiddl/api.py
+++ b/tiddl/api.py
@@ -1,116 +1,187 @@
+import json
import logging
+from pathlib import Path
+from typing import Any, Literal, Type, TypeVar
+
+from pydantic import BaseModel
from requests import Session
-from typing import TypedDict
-from .types import (
- ErrorResponse,
- SessionResponse,
- TrackQuality,
- Track,
- TrackStream,
- AristAlbumsItems,
+from tiddl.models.api import (
Album,
AlbumItems,
+ Artist,
+ ArtistAlbumsItems,
+ Favorites,
Playlist,
PlaylistItems,
- Favorites,
+ Search,
+ SessionResponse,
+ Track,
+ TrackStream,
+ Video,
)
-API_URL = "https://api.tidal.com/v1"
+from tiddl.models.constants import TrackQuality
+from tiddl.exceptions import ApiError
+
+DEBUG = False
+T = TypeVar("T", bound=BaseModel)
+
+logger = logging.getLogger(__name__)
+
-# Tidal default limits
-ARTIST_ALBUMS_LIMIT = 50
-ALBUM_ITEMS_LIMIT = 10
-PLAYLIST_LIMIT = 50
+def ensureLimit(limit: int, max_limit: int) -> int:
+ if limit > max_limit:
+ logger.warning(f"Max limit is {max_limit}")
+ return max_limit
+ return limit
-class ApiError(Exception):
- def __init__(self, message: str, error: ErrorResponse):
- super().__init__(message)
- self.error = error
+
+class Limits:
+ ARTIST_ALBUMS = 50
+ ALBUM_ITEMS = 10
+ ALBUM_ITEMS_MAX = 100
+ PLAYLIST = 50
class TidalApi:
+ URL = "https://api.tidal.com/v1"
+ LIMITS = Limits
+
def __init__(self, token: str, user_id: str, country_code: str) -> None:
- self.token = token
self.user_id = user_id
self.country_code = country_code
- self._session = Session()
- self._session.headers = {"authorization": f"Bearer {token}"}
- self._logger = logging.getLogger("TidalApi")
+ self.session = Session()
+ self.session.headers = {
+ "Authorization": f"Bearer {token}",
+ "Accept": "application/json",
+ }
- def _request(self, endpoint: str, params={}):
- self._logger.debug(f"{endpoint} {params}")
- req = self._session.request(
- method="GET", url=f"{API_URL}/{endpoint}", params=params
- )
+ def fetch(
+ self, model: Type[T], endpoint: str, params: dict[str, Any] = {}
+ ) -> T:
+ """Fetch data from the API and parse it into the given Pydantic model."""
+
+ req = self.session.get(f"{self.URL}/{endpoint}", params=params)
+
+ logger.debug((endpoint, params, req.status_code))
data = req.json()
+ if DEBUG:
+ debug_data = {
+ "status_code": req.status_code,
+ "endpoint": endpoint,
+ "params": params,
+ "data": data,
+ }
+
+ path = Path(f"debug_data/{endpoint}.json")
+ path.parent.mkdir(parents=True, exist_ok=True)
+
+ with path.open("w", encoding="utf-8") as f:
+ json.dump(debug_data, f, indent=2)
+
if req.status_code != 200:
- raise ApiError(req.text, data)
+ raise ApiError(**data)
- return data
+ return model.model_validate(data)
- def getSession(self) -> SessionResponse:
- return self._request(
- f"sessions",
+ def getAlbum(self, album_id: str | int):
+ return self.fetch(
+ Album, f"albums/{album_id}", {"countryCode": self.country_code}
)
- def getTrackStream(self, id: str | int, quality: TrackQuality) -> TrackStream:
- return self._request(
- f"tracks/{id}/playbackinfo",
+ def getAlbumItems(
+ self, album_id: str | int, limit=LIMITS.ALBUM_ITEMS, offset=0
+ ):
+ return self.fetch(
+ AlbumItems,
+ f"albums/{album_id}/items",
{
- "audioquality": quality,
- "playbackmode": "STREAM",
- "assetpresentation": "FULL",
+ "countryCode": self.country_code,
+ "limit": ensureLimit(limit, self.LIMITS.ALBUM_ITEMS_MAX),
+ "offset": offset,
},
)
- def getTrack(self, id: str | int) -> Track:
- return self._request(f"tracks/{id}", {"countryCode": self.country_code})
+ def getArtist(self, artist_id: str | int):
+ return self.fetch(
+ Artist, f"artists/{artist_id}", {"countryCode": self.country_code}
+ )
def getArtistAlbums(
- self, id: str | int, limit=ARTIST_ALBUMS_LIMIT, offset=0, onlyNonAlbum=False
- ) -> AristAlbumsItems:
- params = {"countryCode": self.country_code, "limit": limit, "offset": offset}
+ self,
+ artist_id: str | int,
+ limit=LIMITS.ARTIST_ALBUMS,
+ offset=0,
+ filter: Literal["ALBUMS", "EPSANDSINGLES"] = "ALBUMS",
+ ):
+ return self.fetch(
+ ArtistAlbumsItems,
+ f"artists/{artist_id}/albums",
+ {
+ "countryCode": self.country_code,
+ "limit": limit, # tested limit 10,000
+ "offset": offset,
+ "filter": filter,
+ },
+ )
- if onlyNonAlbum:
- params.update({"filter": "EPSANDSINGLES"})
+ def getFavorites(self):
+ return self.fetch(
+ Favorites,
+ f"users/{self.user_id}/favorites/ids",
+ {"countryCode": self.country_code},
+ )
- return self._request(
- f"artists/{id}/albums",
- params,
+ def getPlaylist(self, playlist_uuid: str):
+ return self.fetch(
+ Playlist,
+ f"playlists/{playlist_uuid}",
+ {"countryCode": self.country_code},
)
- def getAlbum(self, id: str | int) -> Album:
- return self._request(f"albums/{id}", {"countryCode": self.country_code})
+ def getPlaylistItems(
+ self, playlist_uuid: str, limit=LIMITS.PLAYLIST, offset=0
+ ):
+ return self.fetch(
+ PlaylistItems,
+ f"playlists/{playlist_uuid}/items",
+ {
+ "countryCode": self.country_code,
+ "limit": limit,
+ "offset": offset,
+ },
+ )
- def getAlbumItems(
- self, id: str | int, limit=ALBUM_ITEMS_LIMIT, offset=0
- ) -> AlbumItems:
- return self._request(
- f"albums/{id}/items",
- {"countryCode": self.country_code, "limit": limit, "offset": offset},
+ def getSearch(self, query: str):
+ return self.fetch(
+ Search, "search", {"countryCode": self.country_code, "query": query}
)
- def getPlaylist(self, uuid: str) -> Playlist:
- return self._request(
- f"playlists/{uuid}",
- {"countryCode": self.country_code},
+ def getSession(self):
+ return self.fetch(SessionResponse, "sessions")
+
+ def getTrack(self, track_id: str | int):
+ return self.fetch(
+ Track, f"tracks/{track_id}", {"countryCode": self.country_code}
)
- def getPlaylistItems(
- self, uuid: str, limit=PLAYLIST_LIMIT, offset=0
- ) -> PlaylistItems:
- return self._request(
- f"playlists/{uuid}/items",
- {"countryCode": self.country_code, "limit": limit, "offset": offset},
+ def getTrackStream(self, track_id: str | int, quality: TrackQuality):
+ return self.fetch(
+ TrackStream,
+ f"tracks/{track_id}/playbackinfo",
+ {
+ "audioquality": quality,
+ "playbackmode": "STREAM",
+ "assetpresentation": "FULL",
+ },
)
- def getFavorites(self) -> Favorites:
- return self._request(
- f"users/{self.user_id}/favorites/ids",
- {"countryCode": self.country_code},
+ def getVideo(self, video_id: str | int):
+ return self.fetch(
+ Video, f"videos/{video_id}", {"countryCode": self.country_code}
)
diff --git a/tiddl/auth.py b/tiddl/auth.py
index 0de887f..560b9dc 100644
--- a/tiddl/auth.py
+++ b/tiddl/auth.py
@@ -1,21 +1,35 @@
+import logging
+
from requests import request
-from .types.auth import AuthDeviceResponse, AuthResponse, AuthResponseWithRefresh
+
+from .exceptions import AuthError
+from .models import auth
AUTH_URL = "https://auth.tidal.com/v1/oauth2"
CLIENT_ID = "zU4XHVVkc2tDPo4t"
CLIENT_SECRET = "VJKhDFqJPqvsPVNBV6ukXTJmwlvbttP7wlMlrc72se4="
-def getDeviceAuth() -> AuthDeviceResponse:
- return request(
+logger = logging.getLogger(__name__)
+
+
+def getDeviceAuth():
+ req = request(
"POST",
f"{AUTH_URL}/device_authorization",
data={"client_id": CLIENT_ID, "scope": "r_usr+w_usr+w_sub"},
- ).json()
+ )
+
+ data = req.json()
+ if req.status_code == 200:
+ return auth.AuthDeviceResponse(**data)
-def getToken(device_code: str) -> AuthResponseWithRefresh:
- return request(
+ raise AuthError(**data)
+
+
+def getToken(device_code: str):
+ req = request(
"POST",
f"{AUTH_URL}/token",
data={
@@ -25,11 +39,18 @@ def getToken(device_code: str) -> AuthResponseWithRefresh:
"scope": "r_usr+w_usr+w_sub",
},
auth=(CLIENT_ID, CLIENT_SECRET),
- ).json()
+ )
+
+ data = req.json()
+
+ if req.status_code == 200:
+ return auth.AuthResponseWithRefresh(**data)
+ raise AuthError(**data)
-def refreshToken(refresh_token: str) -> AuthResponse:
- return request(
+
+def refreshToken(refresh_token: str):
+ req = request(
"POST",
f"{AUTH_URL}/token",
data={
@@ -39,4 +60,21 @@ def refreshToken(refresh_token: str) -> AuthResponse:
"scope": "r_usr+w_usr+w_sub",
},
auth=(CLIENT_ID, CLIENT_SECRET),
- ).json()
+ )
+
+ data = req.json()
+
+ if req.status_code == 200:
+ return auth.AuthResponse(**data)
+
+ raise AuthError(**data)
+
+
+def removeToken(access_token: str):
+ req = request(
+ "POST",
+ "https://api.tidal.com/v1/logout",
+ headers={"authorization": f"Bearer {access_token}"},
+ )
+
+ logger.debug((req.status_code, req.text))
diff --git a/tiddl/cli/__init__.py b/tiddl/cli/__init__.py
new file mode 100644
index 0000000..2583182
--- /dev/null
+++ b/tiddl/cli/__init__.py
@@ -0,0 +1,34 @@
+import click
+import logging
+
+from .ctx import ContextObj, passContext, Context
+from .auth import AuthGroup
+from .download import UrlGroup, FavGroup, SearchGroup, FileGroup
+from .config import ConfigCommand
+
+
+@click.group()
+@passContext
+@click.option("--verbose", "-v", is_flag=True, help="Show debug logs")
+def cli(ctx: Context, verbose: bool):
+ """TIDDL - Download Tidal tracks \u266b"""
+ ctx.obj = ContextObj()
+
+ logging.basicConfig(
+ level=logging.DEBUG if verbose else logging.INFO,
+ handlers=[logging.StreamHandler()],
+ format="%(levelname)s [%(name)s.%(funcName)s] %(message)s",
+ )
+
+ logging.getLogger("urllib3").setLevel(logging.ERROR)
+
+
+cli.add_command(ConfigCommand)
+cli.add_command(AuthGroup)
+cli.add_command(UrlGroup)
+cli.add_command(FavGroup)
+cli.add_command(SearchGroup)
+cli.add_command(FileGroup)
+
+if __name__ == "__main__":
+ cli()
diff --git a/tiddl/cli/auth.py b/tiddl/cli/auth.py
new file mode 100644
index 0000000..51e374e
--- /dev/null
+++ b/tiddl/cli/auth.py
@@ -0,0 +1,98 @@
+import click
+import logging
+
+from click import style
+from time import sleep, time
+
+from tiddl.auth import getDeviceAuth, getToken, refreshToken, removeToken, AuthError
+from tiddl.config import AuthConfig
+from .ctx import passContext, Context
+
+
+logger = logging.getLogger(__name__)
+
+
+@click.group("auth")
+def AuthGroup():
+ """Manage Tidal token"""
+
+
+@AuthGroup.command("login")
+@passContext
+def login(ctx: Context):
+ """Add token to the config"""
+
+ auth = ctx.obj.config.auth
+
+ if auth.token:
+ if auth.refresh_token and time() > auth.expires:
+ click.echo(style("Refreshing token...", fg="yellow"))
+ token = refreshToken(auth.refresh_token)
+
+ ctx.obj.config.auth.expires = token.expires_in + int(time())
+ ctx.obj.config.auth.token = token.access_token
+
+ ctx.obj.config.save()
+
+ click.echo(style("Authenticated!", fg="green"))
+ return
+
+ auth = getDeviceAuth()
+
+ uri = f"https://{auth.verificationUriComplete}"
+ click.launch(uri)
+ click.echo(f"Go to {style(uri, fg='cyan')} and complete authentication!")
+
+ time_left = time() + auth.expiresIn
+
+ while True:
+ sleep(auth.interval)
+
+ try:
+ token = getToken(auth.deviceCode)
+ except AuthError as e:
+ if e.error == "authorization_pending":
+ # FIX: `Time left: 0 secondsss` 🐍
+
+ click.echo(f"\rTime left: {time_left - time():.0f} seconds", nl=False)
+ continue
+
+ if e.error == "expired_token":
+ click.echo(
+ f"\nTime for authentication {style('has expired', fg='red')}."
+ )
+ break
+
+ new_auth = AuthConfig(
+ token=token.access_token,
+ refresh_token=token.refresh_token,
+ expires=token.expires_in + int(time()),
+ user_id=str(token.user.userId),
+ country_code=token.user.countryCode,
+ )
+
+ ctx.obj.config.auth = new_auth
+ ctx.obj.config.save()
+
+ click.echo(style("\nAuthenticated!", fg="green"))
+
+ break
+
+
+@AuthGroup.command("logout")
+@passContext
+def logout(ctx: Context):
+ """Remove token from config"""
+
+ access_token = ctx.obj.config.auth.token
+
+ if not access_token:
+ click.echo(style("Not logged in", fg="yellow"))
+ return
+
+ removeToken(access_token)
+
+ ctx.obj.config.auth = AuthConfig()
+ ctx.obj.config.save()
+
+ click.echo(style("Logged out!", fg="green"))
diff --git a/tiddl/cli/config.py b/tiddl/cli/config.py
new file mode 100644
index 0000000..c0587d0
--- /dev/null
+++ b/tiddl/cli/config.py
@@ -0,0 +1,19 @@
+import click
+
+from tiddl.config import CONFIG_PATH
+
+
+@click.command("config")
+@click.option(
+ "--open",
+ "-o",
+ is_flag=True,
+ help="Open the configuration file with the default editor",
+)
+def ConfigCommand(open: bool):
+ """Print path to the configuration file"""
+
+ click.echo(str(CONFIG_PATH))
+
+ if open:
+ click.launch(str(CONFIG_PATH))
diff --git a/tiddl/cli/ctx.py b/tiddl/cli/ctx.py
new file mode 100644
index 0000000..d641a0e
--- /dev/null
+++ b/tiddl/cli/ctx.py
@@ -0,0 +1,49 @@
+import functools
+import click
+
+from typing import Callable, TypeVar, cast
+
+from tiddl.api import TidalApi
+from tiddl.config import Config
+from tiddl.utils import TidalResource
+
+
+class ContextObj:
+ api: TidalApi | None
+ config: Config
+ resources: list[TidalResource]
+
+ def __init__(self) -> None:
+ self.config = Config.fromFile()
+ self.resources = []
+ self.api = None
+
+ auth = self.config.auth
+
+ if auth.token and auth.user_id and auth.country_code:
+ self.api = TidalApi(auth.token, auth.user_id, auth.country_code)
+
+ def getApi(self) -> TidalApi:
+ if self.api is None:
+ raise click.UsageError("You must login first")
+
+ return self.api
+
+
+class Context(click.Context):
+ obj: ContextObj
+
+
+F = TypeVar("F", bound=Callable[..., None])
+
+
+def passContext(func: F) -> F:
+ """Wrapper for @click.pass_context to use custom Context"""
+
+ @click.pass_context
+ @functools.wraps(func)
+ def wrapper(ctx: click.Context, *args, **kwargs):
+ custom_ctx = cast(Context, ctx)
+ return func(custom_ctx, *args, **kwargs)
+
+ return cast(F, wrapper)
diff --git a/tiddl/cli/download/__init__.py b/tiddl/cli/download/__init__.py
new file mode 100644
index 0000000..34ba127
--- /dev/null
+++ b/tiddl/cli/download/__init__.py
@@ -0,0 +1,162 @@
+import click
+
+from .fav import FavGroup
+from .file import FileGroup
+from .search import SearchGroup
+from .url import UrlGroup
+
+from ..ctx import Context, passContext
+
+from tiddl.download import downloadTrackStream
+from tiddl.utils import formatTrack, trackExists, TidalResource
+from tiddl.metadata import addMetadata, Cover
+from tiddl.exceptions import ApiError, AuthError
+from tiddl.models.constants import TrackArg, ARG_TO_QUALITY
+from tiddl.models.resource import Track, Album
+from tiddl.models.api import PlaylistItems
+
+
+@click.command("download")
+@click.option("--quality", "-q", "quality", type=click.Choice(TrackArg.__args__))
+@click.option(
+ "--output", "-o", "template", type=str, help="Format track file template."
+)
+@click.option(
+ "--noskip",
+ "-ns",
+ "noskip",
+ is_flag=True,
+ default=False,
+ help="Dont skip downloaded tracks.",
+)
+@passContext
+def DownloadCommand(
+ ctx: Context, quality: TrackArg | None, template: str | None, noskip: bool
+):
+ """Download the tracks"""
+
+ api = ctx.obj.getApi()
+
+ def downloadTrack(track: Track, file_name: str, cover_data=b""):
+ if not track.allowStreaming:
+ click.echo(
+ f"{click.style('✖', 'yellow')} Track {click.style(file_name, 'yellow')} does not allow streaming"
+ )
+ return
+
+ download_quality = ARG_TO_QUALITY[quality or ctx.obj.config.download.quality]
+
+ # .suffix is needed because the Path.with_suffix method will replace any content after dot
+ # for example: 'album/01. title' becomes 'album/01.m4a'
+ path = ctx.obj.config.download.path / f"{file_name}.suffix"
+
+ if not noskip and trackExists(track.audioQuality, download_quality, path):
+ click.echo(
+ f"{click.style('✔', 'cyan')} Skipping track {click.style(file_name, 'cyan')}"
+ )
+ return
+
+ click.echo(
+ f"{click.style('✔', 'green')} Downloading track {click.style(file_name, 'green')}"
+ )
+
+ track_stream = api.getTrackStream(track.id, download_quality)
+
+ stream_data, file_extension = downloadTrackStream(track_stream)
+
+ full_path = path.with_suffix(file_extension)
+ full_path.parent.mkdir(parents=True, exist_ok=True)
+
+ with full_path.open("wb") as f:
+ f.write(stream_data)
+
+ # TODO: add track credits fetching to fill more metadata
+
+ if not cover_data and track.album.cover:
+ cover_data = Cover(track.album.cover).content
+
+ addMetadata(full_path, track, cover_data)
+
+ def downloadAlbum(album: Album):
+ click.echo(f"★ Album {album.title}")
+
+ # TODO: fetch all items
+ album_items = api.getAlbumItems(album.id, limit=100)
+
+ cover_data = Cover(album.cover).content if album.cover else b""
+
+ for item in album_items.items:
+ if isinstance(item.item, Track):
+ track = item.item
+
+ file_name = formatTrack(
+ template=template or ctx.obj.config.template.album,
+ track=track,
+ album_artist=album.artist.name,
+ )
+
+ downloadTrack(track=track, file_name=file_name, cover_data=cover_data)
+
+ def handleResource(resource: TidalResource):
+ match resource.type:
+ case "track":
+ track = api.getTrack(resource.id)
+ file_name = formatTrack(
+ template=template or ctx.obj.config.template.track, track=track
+ )
+
+ downloadTrack(
+ track=track,
+ file_name=file_name,
+ )
+
+ case "album":
+ album = api.getAlbum(resource.id)
+
+ downloadAlbum(album)
+
+ case "artist":
+ # TODO: add `include_singles`
+ # TODO: fetch all items
+ artist_albums = api.getArtistAlbums(resource.id)
+
+ for album in artist_albums.items:
+ downloadAlbum(album)
+
+ case "playlist":
+ playlist = api.getPlaylist(resource.id)
+ click.echo(f"★ Playlist {playlist.title}")
+
+ # TODO: fetch all items
+ playlist_items = api.getPlaylistItems(resource.id)
+
+ for item in playlist_items.items:
+ if isinstance(
+ item.item, PlaylistItems.PlaylistTrackItem.PlaylistTrack
+ ):
+ track = item.item
+
+ file_name = formatTrack(
+ template=template or ctx.obj.config.template.playlist,
+ track=track,
+ playlist_title=playlist.title,
+ playlist_index=track.index // 100000,
+ )
+
+ downloadTrack(track=item.item, file_name=file_name)
+
+ for resource in ctx.obj.resources:
+ try:
+ handleResource(resource)
+
+ except ApiError as e:
+ click.echo(click.style(f"✖ {e}", "red"))
+
+ except AuthError as e:
+ click.echo(click.style(f"✖ {e}", "red"))
+
+
+UrlGroup.add_command(DownloadCommand)
+SearchGroup.add_command(DownloadCommand)
+FavGroup.add_command(DownloadCommand)
+FileGroup.add_command(DownloadCommand)
diff --git a/tiddl/cli/download/fav.py b/tiddl/cli/download/fav.py
new file mode 100644
index 0000000..5a38259
--- /dev/null
+++ b/tiddl/cli/download/fav.py
@@ -0,0 +1,46 @@
+import click
+
+from tiddl.utils import TidalResource, ResourceTypeLiteral
+from ..ctx import Context, passContext
+
+ResourceTypeList: list[ResourceTypeLiteral] = ["track", "album", "artist", "playlist"]
+
+
+@click.group("fav")
+@click.option(
+ "--resource",
+ "-r",
+ "resource_types",
+ multiple=True,
+ type=click.Choice(ResourceTypeList),
+)
+@passContext
+def FavGroup(ctx: Context, resource_types: list[ResourceTypeLiteral]):
+ """Get your Tidal favorites"""
+
+ api = ctx.obj.getApi()
+
+ favorites = api.getFavorites()
+ favorites_dict = favorites.model_dump()
+
+ click.echo(type(resource_types))
+
+ if not resource_types:
+ resource_types = ResourceTypeList
+
+ stats: dict[ResourceTypeLiteral, int] = dict()
+
+ for resource_type in resource_types:
+ resources = favorites_dict[resource_type.upper()]
+
+ stats[resource_type] = len(resources)
+
+ for resource_id in resources:
+ ctx.obj.resources.append(TidalResource(id=resource_id, type=resource_type))
+
+ # TODO: show pretty message
+
+ click.echo(click.style(f"Loaded {len(ctx.obj.resources)} resources", "green"))
+
+ for resource_type, count in stats.items():
+ click.echo(f"{resource_type} - {count}")
diff --git a/tiddl/cli/download/file.py b/tiddl/cli/download/file.py
new file mode 100644
index 0000000..9204101
--- /dev/null
+++ b/tiddl/cli/download/file.py
@@ -0,0 +1,40 @@
+import click
+import json
+
+from io import TextIOWrapper
+from os.path import splitext
+
+from ..ctx import Context, passContext
+from tiddl.utils import TidalResource
+
+
+@click.group("file")
+@click.argument("filename", type=click.File(mode="r"))
+@passContext
+def FileGroup(ctx: Context, filename: TextIOWrapper):
+ """Parse txt or JSON file with urls"""
+
+ _, extension = splitext(filename.name)
+
+ resource_strings: list[str]
+
+ match extension:
+ case ".json":
+ try:
+ resource_strings = json.load(filename)
+ except json.JSONDecodeError as e:
+ raise click.UsageError(f"Cant decode JSON file - {e.msg}")
+
+ case ".txt":
+ resource_strings = [line.strip() for line in filename.readlines()]
+
+ case _:
+ raise click.UsageError(f"Unsupported file extension - {extension}")
+
+ for string in resource_strings:
+ try:
+ ctx.obj.resources.append(TidalResource.fromString(string))
+ except ValueError as e:
+ click.echo(click.style(e, "red"))
+
+ click.echo(click.style(f"Loaded {len(ctx.obj.resources)} resources", "green"))
diff --git a/tiddl/cli/download/search.py b/tiddl/cli/download/search.py
new file mode 100644
index 0000000..db8c82a
--- /dev/null
+++ b/tiddl/cli/download/search.py
@@ -0,0 +1,44 @@
+import click
+
+from tiddl.utils import TidalResource
+from tiddl.models.resource import Artist, Album, Playlist, Track, Video
+
+from ..ctx import Context, passContext
+
+
+@click.group("search")
+@click.argument("query")
+@passContext
+def SearchGroup(ctx: Context, query: str):
+ """Search on Tidal"""
+
+ # TODO: give user interactive choice what to select
+
+ api = ctx.obj.getApi()
+
+ search = api.getSearch(query)
+
+ # issue is that we get resource data in search api call,
+ # in download we refetch that data.
+ # it's not that big deal as we refetch one resource at most,
+ # but it should be redesigned
+
+ value = search.topHit.value
+ icon = click.style("\u2bcc", "magenta")
+
+ if isinstance(value, Album):
+ resource = TidalResource(type="album", id=str(value.id))
+ click.echo(f"{icon} Album {value.title}")
+ elif isinstance(value, Artist):
+ resource = TidalResource(type="artist", id=str(value.id))
+ click.echo(f"{icon} Artist {value.name}")
+ elif isinstance(value, Track):
+ resource = TidalResource(type="track", id=str(value.id))
+ click.echo(f"{icon} Track {value.title}")
+ elif isinstance(value, Playlist):
+ resource = TidalResource(type="playlist", id=str(value.uuid))
+ click.echo(f"{icon} Playlist {value.title}")
+ elif isinstance(value, Video):
+ click.echo(f"{icon} Video {value.title} (currently not supported)")
+
+ ctx.obj.resources.append(resource)
diff --git a/tiddl/cli/download/url.py b/tiddl/cli/download/url.py
new file mode 100644
index 0000000..265987d
--- /dev/null
+++ b/tiddl/cli/download/url.py
@@ -0,0 +1,27 @@
+import click
+
+from ..ctx import Context, passContext
+
+from tiddl.utils import TidalResource
+
+
+class TidalURL(click.ParamType):
+ def convert(self, value: str, param, ctx) -> TidalResource:
+ try:
+ return TidalResource.fromString(value)
+ except ValueError as e:
+ self.fail(message=str(e), param=param, ctx=ctx)
+
+
+@click.group("url")
+@click.argument("url", type=TidalURL())
+@passContext
+def UrlGroup(ctx: Context, url: TidalResource):
+ """
+ Get Tidal URL.
+
+ It can be Tidal link or `resource_type/resource_id` format.
+ The resource can be a track, album, playlist or artist.
+ """
+
+ ctx.obj.resources.append(url)
diff --git a/tiddl/config.py b/tiddl/config.py
index 60c8a6d..cde1a84 100644
--- a/tiddl/config.py
+++ b/tiddl/config.py
@@ -1,105 +1,48 @@
-import json
-import logging
-
+from pydantic import BaseModel
from pathlib import Path
-from typing import TypedDict, Any
-from .types import TrackQuality
+from tiddl.models.constants import TrackArg
-class Settings(TypedDict, total=False):
- download_path: str
- track_quality: TrackQuality
- track_template: str
- album_template: str
- playlist_template: str
- file_extension: str
+CONFIG_PATH = Path.home() / "tiddl.json"
+CONFIG_INDENT = 2
-class User(TypedDict, total=False):
- user_id: str
- country_code: str
+class TemplateConfig(BaseModel):
+ track: str = "{artist} - {title}"
+ album: str = "{album_artist}/{album}/{number:02d}. {title}"
+ playlist: str = "{playlist}/{playlist_number:02d}. {artist} - {title}"
-class ConfigData(TypedDict, total=False):
- token: str
- refresh_token: str
- token_expires_at: int
- settings: Settings
- user: User
+class DownloadConfig(BaseModel):
+ quality: TrackArg = "high"
+ path: Path = Path.home() / "Music" / "Tiddl"
-HOME_DIRECTORY = str(Path.home())
-CONFIG_FILENAME = ".tiddl_config.json"
-DEFAULT_CONFIG: ConfigData = {
- "token": "",
- "refresh_token": "",
- "token_expires_at": 0,
- "settings": {
- "download_path": f"{HOME_DIRECTORY}/tidal_download",
- "track_quality": "HIGH",
- "track_template": "{artist}/{title}",
- "album_template": "{artist}/{album}/{title}",
- "playlist_template": "{playlist}/{title}",
- "file_extension": ""
- },
- "user": {"user_id": "", "country_code": ""},
-}
+class AuthConfig(BaseModel):
+ token: str = ""
+ refresh_token: str = ""
+ expires: int = 0
+ user_id: str = ""
+ country_code: str = ""
-class Config:
- def __init__(self, config_path="") -> None:
- if config_path == "":
- self.config_directory = HOME_DIRECTORY
- else:
- self.config_directory = config_path
- self.config_path = f"{self.config_directory}/{CONFIG_FILENAME}"
- self._config: ConfigData = DEFAULT_CONFIG
- self._logger = logging.getLogger("Config")
+class Config(BaseModel):
+ template: TemplateConfig = TemplateConfig()
+ download: DownloadConfig = DownloadConfig()
+ auth: AuthConfig = AuthConfig()
- try:
- with open(self.config_path, "r") as f:
- loaded_config: ConfigData = json.load(f)
- loaded_settings = loaded_config.get("settings")
- self._logger.debug(f"loaded {loaded_settings}")
- self.update(loaded_config)
+ def save(self):
+ with open(CONFIG_PATH, "w") as f:
+ f.write(self.model_dump_json(indent=CONFIG_INDENT))
+ @classmethod
+ def fromFile(cls):
+ try:
+ with CONFIG_PATH.open() as f:
+ config = cls.model_validate_json(f.read())
except FileNotFoundError:
- self._logger.debug("creating new file")
- self._save() # save default config if file does not exist
- self._logger.debug("created new file")
-
- def _save(self) -> None:
- with open(self.config_path, "w") as f:
- self._logger.debug(self._config.get("settings"))
- json.dump(self._config, f, indent=2)
-
- def __getitem__(self, key: str) -> Any:
- return self._config[key]
-
- def __iter__(self):
- return iter(self._config)
-
- def __str__(self) -> str:
- return json.dumps(self._config, indent=2)
-
- def update(self, data: ConfigData) -> ConfigData:
- self._logger.debug("updating")
- merged_config: ConfigData = merge(data, self._config)
- self._config.update(merged_config)
- self._save()
- self._logger.debug("updated")
- return self._config.copy()
-
-
-def merge(source, destination):
- # https://stackoverflow.com/a/20666342
- for key, value in source.items():
- if isinstance(value, dict):
- # get node or create one
- node = destination.setdefault(key, {})
- merge(value, node)
- else:
- destination[key] = value
+ config = cls()
- return destination
+ config.save()
+ return config
diff --git a/tiddl/download.py b/tiddl/download.py
index 2a01bf5..0018a36 100644
--- a/tiddl/download.py
+++ b/tiddl/download.py
@@ -1,92 +1,14 @@
import logging
-import requests
-import json
-import os
-import ffmpeg
-from queue import Queue
-from threading import Thread
-from time import time
-from xml.etree.ElementTree import fromstring
+from requests import Session
+from pydantic import BaseModel
from base64 import b64decode
-from typing import TypedDict, List
-
-from .types.track import ManifestMimeType
-
-THREADS_COUNT = 4
-
-logger = logging.getLogger("download")
-
-
-class Worker(Thread):
- def __init__(self, queue: Queue, function):
- Thread.__init__(self)
- self.queue = queue
- self.function = function
- self.daemon = True
- self.start()
-
- def run(self):
- while True:
- arg = self.queue.get()
- self.function(arg)
- self.queue.task_done()
-
-
-class Threader:
- def __init__(self, workers_num: int, target, args: list) -> None:
- self.queue = Queue()
-
- for arg in args:
- self.queue.put(arg)
-
- self.workers: list[Worker] = [
- Worker(self.queue, target) for _ in range(workers_num)
- ]
-
- def run(self):
- ts = time()
- self.queue.join()
- return round(time() - ts, 2)
-
-
-class Downloader:
- def __init__(self) -> None:
- self.indexed_content: list[tuple[int, bytes]] = []
- self.session = requests.Session()
- self.total = 0
-
- def download(self, urls: list[str]) -> bytes:
- self.total = len(urls)
- indexed_urls = [(i, url) for (i, url) in enumerate(urls)]
- threader = Threader(THREADS_COUNT, self._downloadFragment, indexed_urls)
- threader.run()
- sorted_content = sorted(self.indexed_content, key=lambda x: x[0])
- data = b"".join(content for _, content in sorted_content)
- return data
-
- def _downloadFragment(self, arg: tuple[int, str]):
- index, url = arg
- req = self.session.get(url)
- self.indexed_content.append((index, req.content))
- showProgressBar(
- len(self.indexed_content), self.total, "threaded download", show_size=False
- )
-
-
-def decodeManifest(manifest: str):
- return b64decode(manifest).decode()
+from xml.etree.ElementTree import fromstring
+from tiddl.models.api import TrackStream
-def parseManifest(manifest: str):
- class AudioFileInfo(TypedDict):
- mimeType: str
- codecs: str
- encryptionType: str
- urls: List[str]
- data: AudioFileInfo = json.loads(manifest)
- return data
+logger = logging.getLogger(__name__)
def parseManifestXML(xml_content: str):
@@ -104,7 +26,7 @@ def parseManifestXML(xml_content: str):
if representationElement is None:
raise ValueError("Representation element not found")
- codecs = representationElement.get("codecs")
+ codecs = representationElement.get("codecs", "")
segmentElement = representationElement.find(f"{NS}SegmentTemplate")
if segmentElement is None:
@@ -130,172 +52,40 @@ def parseManifestXML(xml_content: str):
return urls, codecs
-def showProgressBar(iteration: int, total: int, text: str, length=30, show_size=True):
- SQUARE, SQUARE_FILL = "□", "■"
- iteration_mb = iteration / 1024 / 1024
- total_mb = total / 1024 / 1024
- percent = 100 * (iteration / total)
- progress = int(length * iteration // total)
- bar = f"{SQUARE_FILL * progress}{SQUARE * (length - progress)}"
- size = f" {iteration_mb:.2f} / {total_mb:.2f} MB" if show_size else ""
- print(
- f"\r{text} {bar} {percent:.0f}%{size}",
- end="\r",
- )
- if iteration >= total:
- print()
-
-
-def download(url: str) -> bytes:
- logger.debug(url)
- # use session for performance
- session = requests.Session()
- req = session.get(url, stream=True)
- total_size = int(req.headers.get("content-length", 0))
- block_size = 1024 * 1024
- data = b""
+class TrackManifest(BaseModel):
+ mimeType: str
+ codecs: str
+ encryptionType: str
+ urls: list[str]
- for block in req.iter_content(block_size):
- data += block
- showProgressBar(len(data), total_size, "Single URL")
-
- return data
-
-
-def threadDownload(urls: list[str]) -> bytes:
- dl = Downloader()
- data = dl.download(urls)
-
- return data
-
-
-def toFlac(track_data: bytes) -> bytes:
- process = (
- ffmpeg.input("pipe:0")
- .output("pipe:1", format="flac", codec="copy")
- .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
- )
- flac_data, stderr = process.communicate(input=track_data)
+def downloadTrackStream(stream: TrackStream) -> tuple[bytes, str]:
+ """Download data from track stream and return it with file extension."""
- if process.returncode != 0:
- raise RuntimeError(f"FFmpeg failed: {stderr.decode()}")
+ decoded_manifest = b64decode(stream.manifest).decode()
- return flac_data
-
-
-def downloadTrackStream(
- encoded_manifest: str,
- mime_type: ManifestMimeType,
-) -> tuple[bytes, str]:
- logger.debug(f"mime_type: {mime_type}")
- manifest = decodeManifest(encoded_manifest)
+ match stream.manifestMimeType:
+ case "application/vnd.tidal.bts":
+ track_manifest = TrackManifest.model_validate_json(decoded_manifest)
+ urls, codecs = track_manifest.urls, track_manifest.codecs
- match mime_type:
case "application/dash+xml":
- track_urls, codecs = parseManifestXML(manifest)
- case "application/vnd.tidal.bts":
- data = parseManifest(manifest)
- track_urls, codecs = data["urls"], data["codecs"]
- case _:
- raise ValueError(f"Unknown `mime_type`: {mime_type}")
+ urls, codecs = parseManifestXML(decoded_manifest)
- logger.debug(f"codecs: {codecs}")
+ logger.debug((stream.trackId, stream.audioQuality, codecs, len(urls)))
- if len(track_urls) == 1:
- track_data = download(track_urls[0])
+ if codecs == "flac":
+ file_extension = ".flac"
+ elif codecs.startswith("mp4"):
+ file_extension = ".m4a"
else:
- track_data = threadDownload(track_urls)
- track_data = toFlac(track_data)
-
- """
- known codecs
- flac (master)
- mp4a.40.2 (high)
- mp4a.40.5 (low)
- """
-
- if codecs is None:
- raise Exception("Missing codecs")
-
- extension = "flac"
-
- if codecs.startswith("mp4a"):
- extension = "m4a"
- elif codecs != "flac":
- logger.warning(
- f'unknown file codecs: "{codecs}", please submit this as issue on GitHub'
- )
-
- return track_data, extension
-
-
-def downloadCover(uid: str, path: str, size=1280):
- file = f"{path}/cover.jpg"
-
- if os.path.isfile(file):
- logger.debug(f"cover already exists ({file})")
- return
-
- formatted_uid = uid.replace("-", "/")
- url = f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg"
-
- req = requests.get(url)
-
- if req.status_code != 200:
- logger.error(f"could not download cover. ({req.status_code}) {url}")
- return
-
- try:
- with open(file, "wb") as f:
- f.write(req.content)
- except FileNotFoundError as e:
- logger.error(f"could not save cover. {file} -> {e}")
-
-
-class Cover:
- def __init__(self, uid: str, size=1280) -> None:
- if size > 1280:
- logger.warning(
- f"can not set cover size higher than 1280 (user set: {size})"
- )
- size = 1280
-
- self.uid = uid
-
- formatted_uid = uid.replace("-", "/")
- self.url = (
- f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg"
- )
-
- logger.debug((self.uid, self.url))
- self.content = self.get()
-
- def get(self) -> bytes:
- req = requests.get(self.url)
-
- if req.status_code != 200:
- logger.error(f"could not download cover. ({req.status_code}) {self.url}")
- return b""
-
- logger.debug("got cover")
-
- return req.content
-
- def save(self, path: str):
- if not self.content:
- logger.error("cover file content is empty")
- return
+ raise ValueError(f"Unknown codecs: {codecs}")
- file = f"{path}/cover.jpg"
+ with Session() as s:
+ stream_data = b""
- if os.path.isfile(file):
- logger.debug(f"cover already exists ({file})")
- return
+ for url in urls:
+ req = s.get(url)
+ stream_data += req.content
- try:
- with open(file, "wb") as f:
- logger.debug(file)
- f.write(self.content)
- except FileNotFoundError as e:
- logger.error(f"could not save cover. {file} -> {e}")
+ return stream_data, file_extension
diff --git a/tiddl/exceptions.py b/tiddl/exceptions.py
new file mode 100644
index 0000000..7cb9f6f
--- /dev/null
+++ b/tiddl/exceptions.py
@@ -0,0 +1,21 @@
+class AuthError(Exception):
+ def __init__(
+ self, status: int, error: str, sub_status: str, error_description: str
+ ):
+ self.status = status
+ self.error = error
+ self.sub_status = sub_status
+ self.error_description = error_description
+
+ def __str__(self):
+ return f"{self.status}: {self.error} - {self.error_description}"
+
+
+class ApiError(Exception):
+ def __init__(self, status: int, subStatus: str, userMessage: str):
+ self.status = status
+ self.sub_status = subStatus
+ self.user_message = userMessage
+
+ def __str__(self):
+ return f"{self.user_message} ({self.status} - {self.sub_status})"
diff --git a/tiddl/metadata.py b/tiddl/metadata.py
new file mode 100644
index 0000000..8a2673e
--- /dev/null
+++ b/tiddl/metadata.py
@@ -0,0 +1,100 @@
+import logging
+import requests
+
+from pathlib import Path
+
+from mutagen.flac import FLAC as MutagenFLAC, Picture
+from mutagen.easymp4 import EasyMP4 as MutagenEasyMP4
+from mutagen.mp4 import MP4Cover, MP4 as MutagenMP4
+
+from tiddl.models.resource import Track
+
+
+logger = logging.getLogger(__name__)
+
+
+def addMetadata(track_path: Path, track: Track, cover_data=b""):
+ extension = track_path.suffix
+
+ if extension == ".flac":
+ metadata = MutagenFLAC(track_path)
+ if cover_data:
+ picture = Picture()
+ picture.data = cover_data
+ picture.mime = "image/jpeg"
+ metadata.add_picture(picture)
+ elif extension == ".m4a":
+ if cover_data:
+ metadata = MutagenMP4(track_path)
+ metadata["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)]
+ metadata.save(track_path)
+ metadata = MutagenEasyMP4(track_path)
+ else:
+ raise ValueError(f"Unknown file extension: {extension}")
+
+ new_metadata: dict[str, str] = {
+ "title": track.title,
+ "trackNumber": str(track.trackNumber),
+ "discnumber": str(track.volumeNumber),
+ "copyright": track.copyright,
+ "albumartist": track.artist.name if track.artist else "",
+ "artist": ";".join([artist.name.strip() for artist in track.artists]),
+ "album": track.album.title,
+ "date": str(track.streamStartDate) if track.streamStartDate else "",
+ }
+
+ metadata.update(new_metadata)
+
+ try:
+ metadata.save(track_path)
+ except Exception as e:
+ logger.error(f"Failed to set metadata for {extension}: {e}")
+
+
+class Cover:
+ def __init__(self, uid: str, size=1280) -> None:
+ if size > 1280:
+ logger.warning(
+ f"can not set cover size higher than 1280 (user set: {size})"
+ )
+ size = 1280
+
+ self.uid = uid
+
+ formatted_uid = uid.replace("-", "/")
+ self.url = (
+ f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg"
+ )
+
+ logger.debug((self.uid, self.url))
+
+ self.content = self._get()
+
+ def _get(self) -> bytes:
+ req = requests.get(self.url)
+
+ if req.status_code != 200:
+ logger.error(f"could not download cover. ({req.status_code}) {self.url}")
+ return b""
+
+ logger.debug(f"got cover: {self.uid}")
+
+ return req.content
+
+ def save(self, directory_path: Path):
+ if not self.content:
+ logger.error("cover file content is empty")
+ return
+
+ file = directory_path / "cover.jpg"
+
+ if file.exists():
+ logger.debug(f"cover already exists ({file})")
+ return
+
+ try:
+ with file.open("wb") as f:
+ f.write(self.content)
+
+ except FileNotFoundError as e:
+ logger.error(f"could not save cover. {file} -> {e}")
diff --git a/tiddl/models/__init__.py b/tiddl/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tiddl/models/api.py b/tiddl/models/api.py
new file mode 100644
index 0000000..4326d1b
--- /dev/null
+++ b/tiddl/models/api.py
@@ -0,0 +1,137 @@
+from pydantic import BaseModel
+from typing import Optional, List, Literal, Union
+
+from .resource import Album, Artist, Playlist, Track, TrackQuality, Video
+
+__all__ = [
+ "SessionResponse",
+ "ArtistAlbumsItems",
+ "AlbumItems",
+ "PlaylistItems",
+ "Favorites",
+ "TrackStream",
+ "Search",
+]
+
+
+class SessionResponse(BaseModel):
+ class Client(BaseModel):
+ id: int
+ name: str
+ authorizedForOffline: bool
+ authorizedForOfflineDate: Optional[str]
+
+ sessionId: str
+ userId: int
+ countryCode: str
+ channelId: int
+ partnerId: int
+ client: Client
+
+
+class Items(BaseModel):
+ limit: int
+ offset: int
+ totalNumberOfItems: int
+
+
+class ArtistAlbumsItems(Items):
+ items: List[Album]
+
+
+ItemType = Literal["track", "video"]
+
+
+class AlbumItems(Items):
+ class VideoItem(BaseModel):
+ item: Video
+ type: ItemType = "video"
+
+ class TrackItem(BaseModel):
+ item: Track
+ type: ItemType = "track"
+
+ items: List[Union[TrackItem, VideoItem]]
+
+
+class PlaylistItems(Items):
+ class PlaylistVideoItem(BaseModel):
+ class PlaylistVideo(Video):
+ dateAdded: str
+ index: int
+ itemUuid: str
+
+ item: PlaylistVideo
+ type: ItemType = "video"
+ cut: None
+
+ class PlaylistTrackItem(BaseModel):
+ class PlaylistTrack(Track):
+ dateAdded: str
+ index: int
+ itemUuid: str
+
+ item: PlaylistTrack
+ type: ItemType = "track"
+ cut: None
+
+ items: List[Union[PlaylistTrackItem, PlaylistVideoItem]]
+
+
+class Favorites(BaseModel):
+ PLAYLIST: List[str]
+ ALBUM: List[str]
+ VIDEO: List[str]
+ TRACK: List[str]
+ ARTIST: List[str]
+
+
+class TrackStream(BaseModel):
+ trackId: int
+ assetPresentation: Literal["FULL"]
+ audioMode: Literal["STEREO"]
+ audioQuality: TrackQuality
+ manifestMimeType: Literal[
+ "application/dash+xml", "application/vnd.tidal.bts"
+ ]
+ manifestHash: str
+ manifest: str
+ albumReplayGain: float
+ albumPeakAmplitude: float
+ trackReplayGain: float
+ trackPeakAmplitude: float
+ bitDepth: Optional[int] = None
+ sampleRate: Optional[int] = None
+
+
+class SearchAlbum(Album):
+ # TODO: remove the artist field instead of making it None
+ artist: None = None
+
+
+class Search(BaseModel):
+ class Artists(Items):
+ items: List[Artist]
+
+ class Albums(Items):
+ items: List[SearchAlbum]
+
+ class Playlists(Items):
+ items: List[Playlist]
+
+ class Tracks(Items):
+ items: List[Track]
+
+ class Videos(Items):
+ items: List[Video]
+
+ class TopHit(BaseModel):
+ value: Union[Artist, Track, Playlist, SearchAlbum]
+ type: Literal["ARTISTS", "TRACKS", "PLAYLISTS", "ALBUMS"]
+
+ artists: Artists
+ albums: Albums
+ playlists: Playlists
+ tracks: Tracks
+ videos: Videos
+ topHit: TopHit
diff --git a/tiddl/types/auth.py b/tiddl/models/auth.py
similarity index 84%
rename from tiddl/types/auth.py
rename to tiddl/models/auth.py
index 11b22a5..c91d5a7 100644
--- a/tiddl/types/auth.py
+++ b/tiddl/models/auth.py
@@ -1,7 +1,8 @@
-from typing import TypedDict, Optional
+from typing import Optional
+from pydantic import BaseModel
-class _User(TypedDict):
+class AuthUser(BaseModel):
userId: int
email: str
countryCode: str
@@ -29,8 +30,8 @@ class _User(TypedDict):
newUser: bool
-class AuthResponse(TypedDict):
- user: _User
+class AuthResponse(BaseModel):
+ user: AuthUser
scope: str
clientName: str
token_type: str
@@ -43,7 +44,7 @@ class AuthResponseWithRefresh(AuthResponse):
refresh_token: str
-class AuthDeviceResponse(TypedDict):
+class AuthDeviceResponse(BaseModel):
deviceCode: str
userCode: str
verificationUri: str
diff --git a/tiddl/models/constants.py b/tiddl/models/constants.py
new file mode 100644
index 0000000..94b65b9
--- /dev/null
+++ b/tiddl/models/constants.py
@@ -0,0 +1,13 @@
+from typing import Literal
+
+TrackQuality = Literal["LOW", "HIGH", "LOSSLESS", "HI_RES_LOSSLESS"]
+TrackArg = Literal["low", "normal", "high", "master"]
+
+ARG_TO_QUALITY: dict[TrackArg, TrackQuality] = {
+ "low": "LOW",
+ "normal": "HIGH",
+ "high": "LOSSLESS",
+ "master": "HI_RES_LOSSLESS",
+}
+
+QUALITY_TO_ARG = {v: k for k, v in ARG_TO_QUALITY.items()}
diff --git a/tiddl/models/resource.py b/tiddl/models/resource.py
new file mode 100644
index 0000000..a84f6a2
--- /dev/null
+++ b/tiddl/models/resource.py
@@ -0,0 +1,191 @@
+from pydantic import BaseModel
+from datetime import datetime
+from typing import Optional, List, Literal, Dict
+from .constants import TrackQuality
+
+
+__all__ = ["Track", "Video", "Album", "Playlist", "Artist"]
+
+
+class Track(BaseModel):
+
+ class Artist(BaseModel):
+ id: int
+ name: str
+ type: str
+ picture: Optional[str] = None
+
+ class Album(BaseModel):
+ id: int
+ title: str
+ cover: Optional[str] = None
+ vibrantColor: Optional[str] = None
+ videoCover: Optional[str] = None
+
+ id: int
+ title: str
+ duration: int
+ replayGain: float
+ peak: float
+ allowStreaming: bool
+ streamReady: bool
+ adSupportedStreamReady: bool
+ djReady: bool
+ stemReady: bool
+ streamStartDate: Optional[datetime] = None
+ premiumStreamingOnly: bool
+ trackNumber: int
+ volumeNumber: int
+ version: Optional[str] = None
+ popularity: int
+ copyright: str
+ bpm: Optional[int] = None
+ url: str
+ isrc: str
+ editable: bool
+ explicit: bool
+ audioQuality: TrackQuality
+ audioModes: List[str]
+ mediaMetadata: Dict[str, List[str]]
+ # for real, artist can be None?
+ artist: Optional[Artist] = None
+ artists: List[Artist]
+ album: Album
+ mixes: Dict[str, str]
+
+
+class Video(BaseModel):
+
+ class Arist(BaseModel):
+ id: int
+ name: str
+ type: str
+ picture: Optional[str] = None
+
+ class Album(BaseModel):
+ id: int
+ title: str
+ cover: str
+ vibrantColor: str
+ videoCover: Optional[str] = None
+
+ id: int
+ title: str
+ volumeNumber: int
+ trackNumber: int
+ releaseDate: str
+ imagePath: Optional[str] = None
+ imageId: str
+ vibrantColor: str
+ duration: int
+ quality: str
+ streamReady: bool
+ adSupportedStreamReady: bool
+ djReady: bool
+ stemReady: bool
+ streamStartDate: str
+ allowStreaming: bool
+ explicit: bool
+ popularity: int
+ type: str
+ adsUrl: Optional[str] = None
+ adsPrePaywallOnly: bool
+ artist: Optional[Arist] = None
+ artists: List[Arist]
+ album: Optional[Album] = None
+
+
+class Album(BaseModel):
+
+ class Artist(BaseModel):
+ id: int
+ name: str
+ type: Literal["MAIN", "FEATURED"]
+ picture: Optional[str] = None
+
+ class MediaMetadata(BaseModel):
+ tags: List[Literal["LOSSLESS", "HIRES_LOSSLESS", "DOLBY_ATMOS"]]
+
+ id: int
+ title: str
+ duration: int
+ streamReady: bool
+ adSupportedStreamReady: bool
+ djReady: bool
+ stemReady: bool
+ streamStartDate: Optional[datetime] = None
+ allowStreaming: bool
+ premiumStreamingOnly: bool
+ numberOfTracks: int
+ numberOfVideos: int
+ numberOfVolumes: int
+ releaseDate: str
+ copyright: str
+ type: str
+ version: Optional[str] = None
+ url: str
+ cover: Optional[str] = None
+ vibrantColor: Optional[str] = None
+ videoCover: Optional[str] = None
+ explicit: bool
+ upc: str
+ popularity: int
+ audioQuality: str
+ audioModes: List[str]
+ mediaMetadata: MediaMetadata
+ artist: Artist
+ artists: List[Artist]
+
+
+class Playlist(BaseModel):
+
+ class Creator(BaseModel):
+ id: int
+
+ uuid: str
+ title: str
+ numberOfTracks: int
+ numberOfVideos: int
+ creator: Creator | Dict
+ description: Optional[str] = None
+ duration: int
+ lastUpdated: str
+ created: str
+ type: str
+ publicPlaylist: bool
+ url: str
+ image: Optional[str] = None
+ popularity: int
+ squareImage: str
+ promotedArtists: List[Album.Artist]
+ lastItemAddedAt: Optional[str] = None
+
+
+class Artist(BaseModel):
+
+ class Role(BaseModel):
+ categoryId: int
+ category: Literal[
+ "Artist",
+ "Songwriter",
+ "Performer",
+ "Producer",
+ "Engineer",
+ "Production team",
+ "Misc",
+ ]
+
+ class Mix(BaseModel):
+ ARTIST_MIX: str
+ MASTER_ARTIST_MIX: Optional[str] = None
+
+ id: int
+ name: str
+ artistTypes: Optional[List[Literal["ARTIST", "CONTRIBUTOR"]]] = None
+ url: Optional[str] = None
+ picture: Optional[str] = None
+ # only in search i guess
+ selectedAlbumCoverFallback: Optional[str] = None
+ popularity: Optional[int] = None
+ artistRoles: Optional[List[Role]] = None
+ mixes: Optional[Mix | Dict] = None
diff --git a/tiddl/parser.py b/tiddl/parser.py
deleted file mode 100644
index b5b48c9..0000000
--- a/tiddl/parser.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import os
-import argparse
-
-from .types import TRACK_QUALITY
-from .types.track import TrackQuality
-
-
-def shouldNotColor() -> bool:
- # TODO: add more checks ✨
- checks = ["NO_COLOR" in os.environ]
- return any(checks)
-
-
-parser = argparse.ArgumentParser(
- description="\033[4mTIDDL\033[0m - Tidal Downloader",
- epilog="options defaults will be fetched from your config file.",
-)
-
-parser.add_argument(
- "input",
- type=str,
- nargs="*",
- help="track, album, playlist or artist - must be url, single id will be treated as track",
-)
-
-parser.add_argument(
- "-o",
- type=str,
- nargs="?",
- const=True,
- help="output file template, more info https://github.com/oskvr37/tiddl?tab=readme-ov-file#file-formatting",
- dest="file_template",
-)
-
-parser.add_argument(
- "-p",
- type=str,
- nargs="?",
- const=True,
- help="download destination path",
- dest="download_path",
-)
-
-parser.add_argument(
- "-e",
- type=str,
- nargs="?",
- const=True,
- help="choose file extension",
- dest="file_extension",
-)
-
-QUALITY_ARGS: dict[str, TrackQuality] = {
- details["arg"]: quality for quality, details in TRACK_QUALITY.items()
-}
-
-parser.add_argument(
- "-q",
- nargs="?",
- help="track quality",
- dest="quality",
- choices=QUALITY_ARGS.keys(),
-)
-
-parser.add_argument(
- "-is",
- help="include artist EPs and singles when downloading artist",
- dest="include_singles",
- action="store_true",
-)
-
-parser.add_argument(
- "-s",
- help="save options to config // show config file",
- dest="save_options",
- action="store_true",
-)
-
-parser.add_argument(
- "-i",
- type=str,
- nargs="?",
- const=True,
- help="choose a file with urls (.txt file separated with newlines or .json list)",
- dest="input_file",
- default="",
-)
-
-parser.add_argument(
- "--no-skip",
- help="dont skip already downloaded tracks",
- action="store_true",
-)
-
-parser.add_argument(
- "--silent",
- help="silent mode",
- action="store_true",
-)
-
-parser.add_argument(
- "--verbose",
- help="show debug logs",
- action="store_true",
-)
-
-parser.add_argument(
- "--no-color",
- help="suppress output colors",
- action="store_true",
- default=shouldNotColor(),
-)
diff --git a/tiddl/tests.py b/tiddl/tests.py
deleted file mode 100644
index e67ab97..0000000
--- a/tiddl/tests.py
+++ /dev/null
@@ -1,200 +0,0 @@
-import unittest
-import subprocess
-import shutil
-
-from .utils import parseURL, formatFilename
-from .types.track import Track
-
-
-class TestUtils(unittest.TestCase):
-
- def test_parseURL(self):
- self.assertEqual(
- parseURL("https://tidal.com/browse/track/284165609"), ("track", "284165609")
- )
- self.assertEqual(
- parseURL("https://tidal.com/browse/track/284165609/"),
- ("track", "284165609"),
- )
- self.assertEqual(
- parseURL("https://tidal.com/browse/track/284165609?u"),
- ("track", "284165609"),
- )
- self.assertEqual(
- parseURL(
- "https://listen.tidal.com/album/284165608/track/284165609",
- ),
- ("track", "284165609"),
- )
-
- self.assertEqual(
- parseURL("https://listen.tidal.com/album/284165608"), ("album", "284165608")
- )
- self.assertEqual(
- parseURL("https://tidal.com/browse/album/284165608"), ("album", "284165608")
- )
- self.assertEqual(
- parseURL("https://tidal.com/browse/album/284165608?u"),
- ("album", "284165608"),
- )
-
- self.assertEqual(
- parseURL("https://listen.tidal.com/artist/7695548"), ("artist", "7695548")
- )
- self.assertEqual(
- parseURL("https://tidal.com/browse/artist/7695548"), ("artist", "7695548")
- )
-
- self.assertEqual(
- parseURL(
- "https://tidal.com/browse/playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"
- ),
- ("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
- )
- self.assertEqual(
- parseURL(
- "https://listen.tidal.com/playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"
- ),
- ("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
- )
-
- self.assertEqual(
- parseURL(
- "https://listen.tidal.com/playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"
- ),
- ("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
- )
-
- self.assertEqual(parseURL("track/284165609"), ("track", "284165609"))
- self.assertEqual(
- parseURL("playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
- ("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
- )
-
- # we can also omit domain
- self.assertEqual(
- parseURL("playlist/803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
- ("playlist", "803be625-97e4-4cbb-88dd-43f0b1c61ed7"),
- )
-
- self.assertRaises(ValueError, parseURL, "")
-
- def test_formatFilename(self):
- track: Track = {
- "id": 133017101,
- "title": "HAUTE COUTURE",
- "duration": 243,
- "replayGain": -7.7,
- "peak": 0.944031,
- "allowStreaming": True,
- "streamReady": True,
- "adSupportedStreamReady": True,
- "djReady": True,
- "stemReady": False,
- "streamStartDate": "2020-03-05T00:00:00.000+0000",
- "premiumStreamingOnly": False,
- "trackNumber": 1,
- "volumeNumber": 1,
- "version": None,
- "popularity": 29,
- "copyright": "2020 TUZZA Globale",
- "bpm": None,
- "url": "http://www.tidal.com/track/133017101",
- "isrc": "PL70D1900060",
- "editable": False,
- "explicit": False,
- "audioQuality": "LOSSLESS",
- "audioModes": ["STEREO"],
- "mediaMetadata": {"tags": ["LOSSLESS"]},
- "artist": {
- "id": 9550100,
- "name": "Tuzza Globale",
- "type": "MAIN",
- "picture": "125c9343-3257-407a-8285-5e9f1d283a2e",
- },
- "artists": [
- {
- "id": 9550100,
- "name": "Tuzza Globale",
- "type": "MAIN",
- "picture": "125c9343-3257-407a-8285-5e9f1d283a2e",
- },
- {
- "id": 6847736,
- "name": "Taco Hemingway",
- "type": "FEATURED",
- "picture": "7a1f5193-5d96-452c-b8dd-5ff0f81d5335",
- },
- ],
- "album": {
- "id": 133017100,
- "title": "HAUTE COUTURE",
- "cover": "efd381c2-a982-4d09-bb15-da872006cadf",
- "vibrantColor": "#f6a285",
- "videoCover": None,
- },
- "mixes": {"TRACK_MIX": "001ec78dae0d4a470999adefffd570"},
- "playlistNumber": None
- }
-
- self.assertEqual(formatFilename("{title}", track), ("", "HAUTE COUTURE"))
- self.assertEqual(
- formatFilename("{artist} - {title}", track),
- ("", "Tuzza Globale - HAUTE COUTURE"),
- )
- self.assertEqual(
- formatFilename("{album} - {title}", track),
- ("", "HAUTE COUTURE - HAUTE COUTURE"),
- )
- self.assertEqual(
- formatFilename("{number}. {title}", track), ("", "1. HAUTE COUTURE")
- )
- self.assertEqual(
- formatFilename("{artists} - {title}", track),
- ("", "Tuzza Globale, Taco Hemingway - HAUTE COUTURE"),
- )
- self.assertEqual(
- formatFilename("{id}", track),
- ("", "133017101"),
- )
- self.assertEqual(
- formatFilename("{album}/{title}", track),
- ("HAUTE COUTURE", "HAUTE COUTURE"),
- )
-
-
-TRACK_ID = "284165609"
-DOWNLOAD_DIR = "download_test"
-
-
-class TestTiddl(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- try:
- shutil.rmtree(DOWNLOAD_DIR)
- except FileNotFoundError:
- pass
-
- @classmethod
- def tearDownClass(cls):
- try:
- shutil.rmtree(DOWNLOAD_DIR)
- except FileNotFoundError:
- pass
-
- def test_noInput(self):
- result = subprocess.run(["tiddl"])
- self.assertEqual(result.returncode, 0)
-
- def test_downloadTrack(self):
- result = subprocess.run(["tiddl", TRACK_ID, "-p", DOWNLOAD_DIR])
- self.assertEqual(result.returncode, 0)
-
- def test_downloadTrackExists(self):
- result = subprocess.run(["tiddl", TRACK_ID, "-p", DOWNLOAD_DIR])
- self.assertEqual(result.returncode, 0)
-
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/tiddl/types/__init__.py b/tiddl/types/__init__.py
deleted file mode 100644
index 5117da3..0000000
--- a/tiddl/types/__init__.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from typing import TypedDict, Literal
-
-from .api import *
-from .track import *
-
-TrackArg = Literal["low", "normal", "high", "master"]
-
-
-class QualityDetails(TypedDict):
- name: str
- details: str
- arg: TrackArg
-
-
-TRACK_QUALITY: dict[TrackQuality, QualityDetails] = {
- "LOW": {"name": "Low", "details": "96 kbps", "arg": "low"},
- "HIGH": {"name": "Low", "details": "320 kbps", "arg": "normal"},
- "LOSSLESS": {"name": "High", "details": "16-bit, 44.1 kHz", "arg": "high"},
- "HI_RES_LOSSLESS": {
- "name": "Max",
- "details": "Up to 24-bit, 192 kHz",
- "arg": "master",
- },
-}
diff --git a/tiddl/types/api.py b/tiddl/types/api.py
deleted file mode 100644
index d676dee..0000000
--- a/tiddl/types/api.py
+++ /dev/null
@@ -1,119 +0,0 @@
-from typing import TypedDict, Optional, List, Literal
-
-from .track import Track
-
-
-class ErrorResponse(TypedDict):
- status: int
- subStatus: int
- userMessage: str
-
-
-class Client(TypedDict):
- id: int
- name: str
- authorizedForOffline: bool
- authorizedForOfflineDate: Optional[str]
-
-
-class SessionResponse(TypedDict):
- sessionId: str
- userId: int
- countryCode: str
- channelId: int
- partnerId: int
- client: Client
-
-
-class Items(TypedDict):
- limit: int
- offset: int
- totalNumberOfItems: int
-
-
-class ArtistAlbum(TypedDict):
- id: int
- name: str
- type: Literal["MAIN"]
-
-
-class Album(TypedDict):
- id: int
- title: str
- duration: int
- streamReady: bool
- streamStartDate: str
- allowStreaming: bool
- premiumStreamingOnly: bool
- numberOfTracks: int
- numberOfVideos: int
- numberOfVolumes: int
- releaseDate: str
- copyright: str
- type: str
- version: Optional[str]
- url: str
- cover: str
- videoCover: Optional[str]
- explicit: bool
- upc: str
- popularity: int
- audioQuality: str
- audioModes: List[str]
- artist: ArtistAlbum
- artists: List[ArtistAlbum]
-
-
-class AristAlbumsItems(Items):
- items: List[Album]
-
-
-class _AlbumTrack(TypedDict):
- item: Track
- type: Literal["track"]
-
-
-class AlbumItems(Items):
- items: List[_AlbumTrack]
-
-
-class _Creator(TypedDict):
- id: int
-
-
-class Playlist(TypedDict):
- uuid: str
- title: str
- numberOfTracks: int
- numberOfVideos: int
- creator: _Creator
- description: str
- duration: int
- lastUpdated: str
- created: str
- type: str
- publicPlaylist: bool
- url: str
- image: str
- popularity: int
- squareImage: str
- promotedArtists: List[ArtistAlbum]
- lastItemAddedAt: str
-
-
-class _PlaylistItem(TypedDict):
- item: Track
- type: Literal["track"]
- cut: Literal[None]
-
-
-class PlaylistItems(Items):
- items: List[_PlaylistItem]
-
-
-class Favorites(TypedDict):
- PLAYLIST: List[str]
- ALBUM: List[str]
- VIDEO: List[str]
- TRACK: List[str]
- ARTIST: List[str]
diff --git a/tiddl/types/track.py b/tiddl/types/track.py
deleted file mode 100644
index 29cc5f4..0000000
--- a/tiddl/types/track.py
+++ /dev/null
@@ -1,71 +0,0 @@
-from typing import TypedDict, Optional, List, Dict, Literal, Optional
-
-
-TrackQuality = Literal["LOW", "HIGH", "LOSSLESS", "HI_RES_LOSSLESS"]
-ManifestMimeType = Literal["application/dash+xml", "application/vnd.tidal.bts"]
-
-
-class TrackStream(TypedDict):
- trackId: int
- assetPresentation: Literal["FULL"]
- audioMode: Literal["STEREO"]
- audioQuality: TrackQuality
- manifestMimeType: ManifestMimeType
- manifestHash: str
- manifest: str
- albumReplayGain: float
- albumPeakAmplitude: float
- trackReplayGain: float
- trackPeakAmplitude: float
- bitDepth: Optional[int]
- sampleRate: Optional[int]
-
-
-class _Artist(TypedDict):
- id: int
- name: str
- type: str
- picture: Optional[str]
-
-
-class _Album(TypedDict):
- id: int
- title: str
- cover: str
- vibrantColor: str
- videoCover: Optional[str]
-
-
-class Track(TypedDict):
- id: int
- title: str
- duration: int
- replayGain: float
- peak: float
- allowStreaming: bool
- streamReady: bool
- adSupportedStreamReady: bool
- djReady: bool
- stemReady: bool
- streamStartDate: str
- premiumStreamingOnly: bool
- trackNumber: int
- volumeNumber: int
- version: Optional[str]
- popularity: int
- copyright: str
- bpm: Optional[int]
- url: str
- isrc: str
- editable: bool
- explicit: bool
- audioQuality: str
- audioModes: List[str]
- mediaMetadata: Dict[str, List[str]]
- artist: _Artist
- artists: List[_Artist]
- album: _Album
- mixes: Dict[str, str]
-
- # this is used only when downloading playlist
- playlistNumber: Optional[int]
diff --git a/tiddl/utils.py b/tiddl/utils.py
index dd53155..b58f886 100644
--- a/tiddl/utils.py
+++ b/tiddl/utils.py
@@ -1,315 +1,112 @@
import re
-import os
-import json
-import logging
-import subprocess
-from datetime import datetime
-from typing import TypedDict, Literal, List, get_args
-from mutagen.flac import FLAC as MutagenFLAC, Picture
-from mutagen.easymp4 import EasyMP4 as MutagenEasyMP4
-from mutagen.mp4 import MP4Cover, MP4 as MutagenMP4
+from pydantic import BaseModel
+from urllib.parse import urlparse
+from pathlib import Path
-from .types.track import Track
-from .config import HOME_DIRECTORY
+from typing import Literal, get_args
-RESOURCE = Literal["track", "album", "artist", "playlist"]
-RESOURCE_LIST: List[RESOURCE] = list(get_args(RESOURCE))
+from tiddl.models.constants import TrackQuality, QUALITY_TO_ARG
+from tiddl.models.resource import Track
+ResourceTypeLiteral = Literal["track", "album", "playlist", "artist"]
-logger = logging.getLogger("utils")
-
-def parseFileInput(file: str) -> list[str]:
- _, file_extension = os.path.splitext(file)
- logger.debug(file, file_extension)
- urls_set: set[str] = set()
-
- if file_extension == ".txt":
- with open(file) as f:
- data = f.read()
- urls_set.update(data.splitlines())
- elif file_extension == ".json":
- with open(file) as f:
- data = json.load(f)
- urls_set.update(data)
- else:
- logger.warning(f"a file with '{file_extension}' extension is not supported!")
-
- filtered_urls = [url for url in urls_set if type(url) == str]
-
- return filtered_urls
-
-
-def parseURL(url: str) -> tuple[RESOURCE, str]:
- # remove trailing slash
- url = url.rstrip("/")
- # remove params
- url = url.split("?")[0]
-
- fragments = url.split("/")
-
- if len(fragments) < 2:
- raise ValueError(f"Invalid input: {url}")
-
- parsed_type, parsed_id = fragments[-2], fragments[-1]
-
- if parsed_type not in RESOURCE_LIST:
- raise ValueError(f"Invalid resource type: {parsed_type} ({url})")
-
- return parsed_type, parsed_id
-
-
-class FormattedTrack(TypedDict):
+class TidalResource(BaseModel):
+ type: ResourceTypeLiteral
id: str
- title: str
- number: str
- disc_number: str
- artist: str
- album: str
- artists: str
- playlist: str
- released: str
- year: str
- playlist_number: str
- version: str
-
-
-def formatFilename(template: str, track: Track, playlist=""):
- artists = [artist["name"].strip() for artist in track["artists"]]
-
- release_date = datetime.strptime(
- track["streamStartDate"] or "1970-01-01T00:00:00.000+0000",
- "%Y-%m-%dT%H:%M:%S.000+0000",
- )
- version = track.get("version", "")
-
- formatted_track: FormattedTrack = {
- "album": re.sub(r'[<>:"|?*/\\]', "_", track["album"]["title"].strip()),
- "artist": track["artist"]["name"].strip(),
- "artists": ", ".join(artists),
- "id": str(track["id"]),
- "title": track["title"].strip(),
- "number": str(track["trackNumber"]),
- "disc_number": str(track["volumeNumber"]),
- "playlist": playlist.strip(),
- "released": release_date.strftime("%m-%d-%Y"),
- "year": release_date.strftime("%Y"),
- "playlist_number": str(track.get("playlistNumber", "")),
- "version": f"({version})" if version else "",
+ @property
+ def url(self) -> str:
+ return f"https://listen.tidal.com/{self.type}/{self.id}"
+
+ @classmethod
+ def fromString(cls, string: str):
+ """
+ Extracts the resource type (e.g., "track", "album")
+ and resource ID from a given input string.
+
+ The input string can either be a full URL or a shorthand string
+ in the format `resource_type/resource_id` (e.g., `track/12345678`).
+ """
+
+ path = urlparse(string).path
+ resource_type, resource_id = path.split("/")[-2:]
+
+ if resource_type not in get_args(ResourceTypeLiteral):
+ raise ValueError(f"Invalid resource type: {resource_type}")
+
+ if not resource_id.isdigit() and resource_type != "playlist":
+ raise ValueError(f"Invalid resource id: {resource_id}")
+
+ return cls(type=resource_type, id=resource_id) # type: ignore
+
+ def __str__(self) -> str:
+ return f"{self.type}/{self.id}"
+
+
+def sanitizeString(string: str) -> str:
+ pattern = r'[\\/:"*?<>|]+'
+ return re.sub(pattern, "", string)
+
+
+def formatTrack(
+ template: str, track: Track, album_artist="", playlist_title="", playlist_index=0
+) -> str:
+ artist = sanitizeString(track.artist.name) if track.artist else ""
+ features = [
+ sanitizeString(track_artist.name)
+ for track_artist in track.artists
+ if track_artist.name != artist
+ ]
+
+ track_dict = {
+ "id": str(track.id),
+ "title": sanitizeString(track.title),
+ "version": sanitizeString(track.version or ""),
+ "artist": artist,
+ "artists": ", ".join(features + [artist]),
+ "features": ", ".join(features),
+ "album": sanitizeString(track.album.title),
+ "number": track.trackNumber,
+ "disc": track.volumeNumber,
+ "date": (track.streamStartDate if track.streamStartDate else ""),
+ # i think we can remove year as we are able to format date
+ "year": track.streamStartDate.strftime("%Y") if track.streamStartDate else "",
+ "playlist": sanitizeString(playlist_title),
+ "bpm": track.bpm or "",
+ "quality": QUALITY_TO_ARG[track.audioQuality],
+ "album_artist": sanitizeString(album_artist),
+ "playlist_number": playlist_index or 0,
}
- dirs = template.split("/")
- filename = dirs.pop()
-
- formatted_filename = filename.format(**formatted_track)
- formatted_dir = "/".join(dirs).format(**formatted_track)
-
- return sanitizeDirName(formatted_dir), sanitizeFileName(formatted_filename)
-
-
-def sanitizeDirName(dir_name: str):
- # replace invalid characters with an underscore
- sanitized = re.sub(r'[<>:"|?*]', "_", dir_name)
- # strip whitespace
- sanitized = sanitized.strip()
-
- return sanitized
-
-
-def sanitizeFileName(file_name: str):
- # replace invalid characters with an underscore
- sanitized = re.sub(r'[<>:"|?*/\\]', "_", file_name)
- # strip whitespace
- sanitized = sanitized.strip()
-
- return sanitized
-
-
-def loadingSymbol(i: int, text: str):
- symbols = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
- symbol = symbols[i % len(symbols)]
- print(f"\r{text} {symbol}", end="\r")
-
-
-def setMetadata(file: str, extension: str, track: Track, cover_data=b""):
- if extension == "flac":
- metadata = MutagenFLAC(file)
- if cover_data:
- picture = Picture()
- picture.data = cover_data
- picture.mime = "image/jpeg"
- metadata.add_picture(picture)
- elif extension == "m4a":
- if cover_data:
- metadata = MutagenMP4(file)
- metadata["covr"] = [MP4Cover(cover_data, imageformat=MP4Cover.FORMAT_JPEG)]
- metadata.save(file)
- metadata = MutagenEasyMP4(file)
- else:
- raise ValueError(f"Unknown file extension: {extension}")
-
- new_metadata: dict[str, str] = {
- "title": track["title"],
- "trackNumber": str(track["trackNumber"]),
- "discnumber": str(track["volumeNumber"]),
- "copyright": track["copyright"],
- "albumartist": track["artist"]["name"],
- "artist": ";".join([artist["name"].strip() for artist in track["artists"]]),
- "album": track["album"]["title"],
- "date": track["streamStartDate"][:10],
- }
+ formatted_track = template.format(**track_dict)
- metadata.update(new_metadata)
+ disallowed_chars = r'[\\:"*?<>|]+'
+ invalid_chars = re.findall(disallowed_chars, formatted_track)
- try:
- metadata.save(file)
- except Exception as e:
- logger.error(f"Failed to set metadata for {extension}: {e}")
-
-
-def convertFileExtension(source_path: str, file_extension: str, remove_source=True):
- source_dir, source_extension = os.path.splitext(source_path)
- dest_path = f"{source_dir}.{file_extension}"
-
- logger.debug((source_path, source_dir, source_extension, dest_path))
-
- if source_extension == f".{file_extension}":
- return source_path
-
- logger.debug(f"converting `{source_path}` to `{file_extension}`")
- command = ["ffmpeg", "-i", source_path, dest_path]
- result = subprocess.run(
- command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
- )
-
- if result.returncode != 0:
- logger.error(result.stderr)
- return source_path
-
- if remove_source:
- os.remove(source_path)
-
- return dest_path
-
-
-class Colors:
- def __init__(self, colored=True) -> None:
- if colored:
- self.BLACK = "\033[0;30m"
- self.GRAY = "\033[1;30m"
-
- self.RED = "\033[0;31m"
- self.LIGHT_RED = "\033[1;31m"
-
- self.GREEN = "\033[0;32m"
- self.LIGHT_GREEN = "\033[1;32m"
-
- self.YELLOW = "\033[0;33m"
- self.LIGHT_YELLOW = "\033[1;33m"
-
- self.BLUE = "\033[0;34m"
- self.LIGHT_BLUE = "\033[1;34m"
-
- self.PURPLE = "\033[0;35m"
- self.LIGHT_PURPLE = "\033[1;35m"
-
- self.CYAN = "\033[0;36m"
- self.LIGHT_CYAN = "\033[1;36m"
-
- self.LIGHT_GRAY = "\033[0;37m"
- self.LIGHT_WHITE = "\033[1;37m"
-
- self.RESET = "\033[0m"
- self.BOLD = "\033[1m"
- self.FAINT = "\033[2m"
- self.ITALIC = "\033[3m"
- self.UNDERLINE = "\033[4m"
- self.BLINK = "\033[5m"
- self.NEGATIVE = "\033[7m"
- self.CROSSED = "\033[9m"
- else:
- self.BLACK = ""
- self.GRAY = ""
-
- self.RED = ""
- self.LIGHT_RED = ""
-
- self.GREEN = ""
- self.LIGHT_GREEN = ""
-
- self.YELLOW = ""
- self.LIGHT_YELLOW = ""
-
- self.BLUE = ""
- self.LIGHT_BLUE = ""
-
- self.PURPLE = ""
- self.LIGHT_PURPLE = ""
-
- self.CYAN = ""
- self.LIGHT_CYAN = ""
-
- self.LIGHT_GRAY = ""
- self.LIGHT_WHITE = ""
+ if invalid_chars:
+ raise ValueError(
+ f"Template '{template}' and formatted track '{formatted_track}' contains disallowed characters: {' '.join(sorted(set(invalid_chars)))}"
+ )
- self.RESET = ""
- self.BOLD = ""
- self.FAINT = ""
- self.ITALIC = ""
- self.UNDERLINE = ""
- self.BLINK = ""
- self.NEGATIVE = ""
- self.CROSSED = ""
+ return formatted_track
-def initLogging(
- silent: bool, verbose: bool, directory=HOME_DIRECTORY, colored_logging=True
+def trackExists(
+ track_quality: TrackQuality, download_quality: TrackQuality, file_name: Path
):
- c = Colors(colored_logging)
-
- class StreamFormatter(logging.Formatter):
- FORMATS = {
- logging.DEBUG: f"{c.BLUE}[ %(name)s ] {c.CYAN}%(funcName)s {c.RESET}%(message)s",
- logging.INFO: f"{c.GREEN}[ %(name)s ] {c.RESET}%(message)s",
- logging.WARNING: f"{c.YELLOW}[ %(name)s ] {c.RESET}%(message)s",
- logging.ERROR: f"{c.RED}[ %(name)s ] %(message)s",
- logging.CRITICAL: f"{c.RED}[ %(name)s ] %(message)s",
- }
-
- def format(self, record):
- log_fmt = self.FORMATS.get(record.levelno)
- formatter = logging.Formatter(log_fmt)
- return formatter.format(record) + c.RESET
+ """
+ Predict track extension and check if track file exists.
+ """
- stream_handler = logging.StreamHandler()
+ FLAC_QUALITIES: list[TrackQuality] = ["LOSSLESS", "HI_RES_LOSSLESS"]
- file_handler = logging.FileHandler(f"{directory}/tiddl.log", "a", "utf-8")
-
- if silent:
- log_level = logging.WARNING
- elif verbose:
- log_level = logging.DEBUG
+ if download_quality in FLAC_QUALITIES and track_quality in FLAC_QUALITIES:
+ extension = ".flac"
else:
- log_level = logging.INFO
-
- stream_handler.setLevel(log_level)
- stream_handler.setFormatter(StreamFormatter())
-
- file_handler.setLevel(logging.DEBUG)
- file_handler.setFormatter(
- logging.Formatter(
- "%(asctime)s %(levelname)s\t%(name)s.%(funcName)s %(message)s",
- datefmt="%x %X",
- )
- )
+ extension = ".m4a"
- # suppress logs from third-party libraries
- logging.getLogger("requests").setLevel(logging.WARNING)
- logging.getLogger("urllib3").setLevel(logging.WARNING)
+ full_file_name = file_name.with_suffix(extension)
- logging.basicConfig(
- level=logging.DEBUG,
- handlers=[file_handler, stream_handler],
- )
+ return full_file_name.exists()