From ffcce62e0930ee0338095f0737e3f446428c58ab Mon Sep 17 00:00:00 2001 From: Jonas Winkler <17569239+jonaswinkler@users.noreply.github.com> Date: Wed, 12 Jun 2024 00:33:04 +0200 Subject: [PATCH] implement mbsubmit create release on musicbrainz --- beetsplug/mbsubmit.py | 450 +++++++++++++++++++++++++++++++++- docs/changelog.rst | 5 + docs/plugins/mbsubmit.rst | 47 +++- poetry.lock | 22 +- pyproject.toml | 3 + test/plugins/test_mbsubmit.py | 176 ++++++++++++- 6 files changed, 684 insertions(+), 19 deletions(-) diff --git a/beetsplug/mbsubmit.py b/beetsplug/mbsubmit.py index d215e616c1..b0eb29f14d 100644 --- a/beetsplug/mbsubmit.py +++ b/beetsplug/mbsubmit.py @@ -18,31 +18,323 @@ parseable by the MusicBrainz track parser [1]. Programmatic submitting is not implemented by MusicBrainz yet. +The plugin also allows the user to open the tracks in MusicBrainz Picard [2]. + +Another option this plugin provides is to help with creating a new release +on MusicBrainz by seeding the MusicBrainz release editor [3]. This works in +the following way: + +- Host a small web server that serves a web page. When loaded by the user, + this page will automatically POST data to MusicBrainz as described in [3]. +- The same web server also listens for a callback from MusicBrainz, see + redirect_uri [3] and will try to import an album using the newly created + release. +- jwt tokens with random keys are used to prevent using this web server in + unintended ways. + +This feature is loosely based on how browser integration is implemented in +Picard [4]. + [1] https://wiki.musicbrainz.org/History:How_To_Parse_Track_Listings +[2] https://picard.musicbrainz.org/ +[3] https://musicbrainz.org/doc/Development/Seeding/Release_Editor +[4] https://github.com/metabrainz/picard/blob/master/picard/browser/browser.py """ - import subprocess +import threading +import time +import uuid +import webbrowser +from collections import defaultdict +from dataclasses import dataclass +from html import escape +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from secrets import token_bytes +from typing import Optional +from urllib.parse import parse_qs, urlparse -from beets import ui +from beets import autotag, ui from beets.autotag import Recommendation from beets.plugins import BeetsPlugin +from beets.ui import print_ from beets.ui.commands import PromptChoice from beets.util import displayable_path +from beets.util.pipeline import PipelineThread from beetsplug.info import print_data +try: + import jwt +except ImportError: + jwt = None + +_form_template = """ + + + +
+ {form_data} + +
+ + +""" + +_form_input_template = '' + + +@dataclass +class CreateReleaseTask: + """ + Represents a task for creating a single release on MusicBrainz and its current status. + """ + + formdata: dict + """ + Form data to be submitted to MB. + """ + + result_release_mbid: Optional[str] = None + """ + Contains the release ID returned by MusicBrainz after the release was created. + """ + + browser_opened: bool = False + """ + True when the user has opened the link for this task in the browser. + """ + + +class RequestHandler(BaseHTTPRequestHandler): + """ + Request handler for built-in web server + """ + + def __init__(self, plugin: "MBSubmitPlugin", *args): + self._plugin = plugin + super(RequestHandler, self).__init__(*args) + + def do_GET(self): + try: + self._handle_get() + except Exception as e: + self._plugin._log.error("Unexpected server error", exc_info=e) + self._response(500, "Unexpected server error") + + def log_message(self, format, *args): + self._plugin._log.debug(format % args) + + def _handle_get(self): + parsed = urlparse(self.path) + args = parse_qs(parsed.query) + action = parsed.path + + if action == "/add": + # Generates a web page using _form_template that POSTs data to the MusicBrainz release editor. + self._add(args) + elif action == "/complete_add": + # MusicBrainz redirects to this endpoint after successfully adding a release. The ID of the new release + # is provided, completing the flow. + self._complete_add(args) + else: + self._response(404, "Unknown action.") + + def _get_task_from_args(self, args) -> Optional[CreateReleaseTask]: + # Try to get the token from query args, try to decode it, and try to find the associated CreateReleaseTask. + if "token" in args: + token = args["token"][0] + try: + payload = jwt.decode( + token, + self._plugin.jwt_key, + algorithms=self._plugin.jwt_algorithm, + ) + if ( + "task_key" in payload + and payload["task_key"] in self._plugin.create_release_tasks + ): + return self._plugin.create_release_tasks[ + payload["task_key"] + ] + else: + self._response(404, "task_key not found.") + except jwt.InvalidTokenError: + self._response(400, "Invalid token.") + else: + self._response(400, "Token missing.") + return None + + def _add(self, args): + if (task := self._get_task_from_args(args)) is None: + return + + template = _form_template.format( + action=escape("https://musicbrainz.org/release/add"), + form_data="".join( + [ + _form_input_template.format( + name=escape(str(k)), value=escape(str(v)) + ) + for k, v in task.formdata.items() + ] + ), + ) + task.browser_opened = True + self._response(200, template, content_type="text/html") + + def _complete_add(self, args): + if (task := self._get_task_from_args(args)) is None: + return + + if "release_mbid" not in args: + self._response(400, "release_mbid missing.") + return + + release_mbid = args["release_mbid"][0] + + task.result_release_mbid = release_mbid + self._response( + 200, + f"Release {release_mbid} added. You can close this browser window now and return to beets.", + ) + + def _response( + self, code: int, content: str = "", content_type: str = "text/plain" + ): + self.send_response(code) + self.send_header("Content-Type", content_type) + self.send_header("Cache-Control", "max-age=0") + self.end_headers() + self.wfile.write(content.encode()) + + +def join_phrase(i, total): + if i < total - 2: + return ", " + elif i < total - 1: + return " & " + else: + return "" + + +def build_formdata(items: list, redirect_uri: Optional[str]): + formdata = dict() + + labels = set() + album_artists = set() + + track_counter = defaultdict(int) + + for track in items: + if "name" not in formdata and track.album: + formdata["name"] = track.album + if "type" not in formdata and track.albumtype: + formdata["type"] = track.albumtype + if "barcode" not in formdata and track.barcode: + formdata["barcode"] = track.barcode + if "events.0.date.year" not in formdata and track.year: + formdata["events.0.date.year"] = track.year + if "events.0.date.month" not in formdata and track.month: + formdata["events.0.date.month"] = track.month + if "events.0.date.day" not in formdata and track.day: + formdata["events.0.date.day"] = track.day + + if track.label: + labels.add(track.label) + + if track.albumartists: + for artist in track.albumartists: + album_artists.add(artist) + elif track.albumartist: + album_artists.add(track.albumartist) + + if track.disc: + medium_index = track.disc - 1 + else: + medium_index = 0 + + track_index = track_counter[medium_index] + + if f"mediums.{medium_index}.format" not in formdata and track.media: + formdata[f"mediums.{medium_index}.format"] = track.media + + formdata[f"mediums.{medium_index}.track.{track_index}.name"] = ( + track.title + ) + formdata[f"mediums.{medium_index}.track.{track_index}.number"] = ( + track.track + ) + formdata[f"mediums.{medium_index}.track.{track_index}.length"] = int( + track.length * 1000 + ) # in milliseconds + + if track.artists: + track_artists = track.artists + elif track.artist: + track_artists = [track.artist] + else: + track_artists = [] + + for i, artist in enumerate(track_artists): + formdata[ + f"mediums.{medium_index}.track.{track_index}.artist_credit.names.{i}.artist.name" + ] = artist + if join_phrase(i, len(track_artists)): + formdata[ + f"mediums.{medium_index}.track.{track_index}.artist_credit.names.{i}.join_phrase" + ] = join_phrase(i, len(track_artists)) + + track_counter[medium_index] += 1 + + for i, label in enumerate(labels): + formdata[f"labels.{i}.name"] = label + + for i, artist in enumerate(album_artists): + formdata[f"artist_credit.names.{i}.artist.name"] = artist + if join_phrase(i, len(album_artists)): + formdata[f"artist_credit.names.{i}.join_phrase"] = join_phrase( + i, len(album_artists) + ) + + if redirect_uri: + formdata["redirect_uri"] = redirect_uri + + return formdata + class MBSubmitPlugin(BeetsPlugin): def __init__(self): super().__init__() + if jwt is None: + self._log.warn( + "Cannot import PyJWT, disabling 'Create release on musicbrainz' functionality" + ) + self.config.add( { "format": "$track. $title - $artist ($length)", "threshold": "medium", "picard_path": "picard", + "create_release_server_hostname": "127.0.0.1", + "create_release_server_port": 29661, + "create_release_method": "show_link", + "create_release_await_mbid": True, } ) + self.create_release_server_hostname = self.config[ + "create_release_server_hostname" + ].as_str() + self.create_release_server_port = self.config[ + "create_release_server_port" + ].as_number() + self.create_release_method = self.config[ + "create_release_method" + ].as_choice(["open_browser", "show_link"]) + self.create_release_await_mbid = self.config[ + "create_release_await_mbid" + ].as_choice([True, False]) + # Validate and store threshold. self.threshold = self.config["threshold"].as_choice( { @@ -57,12 +349,78 @@ def __init__(self): "before_choose_candidate", self.before_choose_candidate_event ) + self._server = None + + self.jwt_key = token_bytes() + self.jwt_algorithm = "HS256" + + # When the user selects "Create release on musicbrainz", the data that is going to get POSTed to MusicBrainz is + # stored in this dict using a randomly generated key. The token in the URL opened by the user contains this + # key. The web server looks up the data in this dictionary using the key, and generates the page to be + # displayed. + self.create_release_tasks = dict() + + def _start_server(self): + if not self._server: + + def handler(*args): + return RequestHandler(self, *args) + + for port in range( + self.create_release_server_port, + min(self.create_release_server_port + 100, 65535), + ): + try: + self._server = ThreadingHTTPServer( + (self.create_release_server_hostname, port), handler + ) + except OSError: + continue + threading.Thread( + target=self._server.serve_forever, daemon=True + ).start() + self._log.debug( + f"Starting web server on {self.create_release_server_hostname}:{port}" + ) + break + else: + self._log.error( + "Could not find a port to start the web server on" + ) + self._stop_server() + return False + + return True + + def _stop_server(self): + if self._server: + self._log.debug("Stopping web server") + self._server.shutdown() + self._server.server_close() + self._server = None + + def _wait_for_condition(self, condition): + t = threading.current_thread() + while not condition(): + time.sleep(0.5) + if isinstance(t, PipelineThread) and t.abort_flag: + raise KeyboardInterrupt() + def before_choose_candidate_event(self, session, task): if task.rec <= self.threshold: - return [ + choices = [ PromptChoice("p", "Print tracks", self.print_tracks), PromptChoice("o", "Open files with Picard", self.picard), ] + if jwt is not None and task.is_album: + choices += [ + PromptChoice( + "c", + "Create release on musicbrainz", + self.create_release_on_musicbrainz, + ), + ] + return choices def picard(self, session, task): paths = [] @@ -76,24 +434,90 @@ def picard(self, session, task): self._log.error(f"Could not open picard, got error:\n{exc}") def print_tracks(self, session, task): - for i in sorted(task.items, key=lambda i: i.track): + self._print_tracks(task.items) + + def _print_tracks(self, items): + for i in sorted(items, key=lambda i: i.track): print_data(None, i, self.config["format"].as_str()) + def create_release_on_musicbrainz(self, session, task): + return self._create_release_on_musicbrainz(task.items) + + def _create_release_on_musicbrainz(self, items): + if not self._start_server(): + return + task_key = str(uuid.uuid4()) + token = jwt.encode( + {"task_key": task_key}, self.jwt_key, algorithm=self.jwt_algorithm + ) + + url = f"http://{self.create_release_server_hostname}:{self._server.server_port}/add?token={token}" + redirect_uri = f"http://{self.create_release_server_hostname}:{self._server.server_port}/complete_add?token={token}" + + self._log.debug( + f"New create release task with task_key {task_key}, serving at {url}" + ) + + self.create_release_tasks[task_key] = CreateReleaseTask( + formdata=build_formdata( + items=items, + redirect_uri=( + redirect_uri if self.create_release_await_mbid else None + ), + ), + ) + + if self.create_release_method == "open_browser": + webbrowser.open(url) + elif self.create_release_method == "show_link": + print_(f"Open the following URL in your browser: {url}") + else: + return + + self._wait_for_condition( + lambda: self.create_release_tasks[task_key].browser_opened + ) + + if not self.create_release_await_mbid: + return + + print_("Waiting for MusicBrainz release ID...") + + self._wait_for_condition( + lambda: self.create_release_tasks[task_key].result_release_mbid + ) + mbid = self.create_release_tasks[task_key].result_release_mbid + + self._log.debug(f"Got release_mbid {mbid} for task_key {task_key}") + + _, _, prop = autotag.tag_album(items, search_ids=[mbid]) + return prop + def commands(self): """Add beet UI commands for mbsubmit.""" mbsubmit_cmd = ui.Subcommand( - "mbsubmit", help="Submit Tracks to MusicBrainz" + "mbsubmit", help="submit tracks to MusicBrainz" ) - def func(lib, opts, args): + def mbsubmit_cmd_func(lib, opts, args): items = lib.items(ui.decargs(args)) - self._mbsubmit(items) + self._print_tracks(items) - mbsubmit_cmd.func = func + mbsubmit_cmd.func = mbsubmit_cmd_func - return [mbsubmit_cmd] + mbcreate_cmd = ui.Subcommand( + "mbsubmit-create", help="create release on MusicBrainz" + ) - def _mbsubmit(self, items): - """Print track information to be submitted to MusicBrainz.""" - for i in sorted(items, key=lambda i: i.track): - print_data(None, i, self.config["format"].as_str()) + def mbcreate_cmd_func(lib, ops, args): + items = lib.items(ui.decargs(args)) + print_(f"{len(items)} matching item(s) found.") + if len(items) == 0: + return + self._print_tracks(items) + self.create_release_await_mbid = False + self._create_release_on_musicbrainz(items) + + mbcreate_cmd.func = mbcreate_cmd_func + + return [mbsubmit_cmd, mbcreate_cmd] diff --git a/docs/changelog.rst b/docs/changelog.rst index 3725e4993e..48786e7fe7 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -6,6 +6,11 @@ Unreleased Changelog goes here! Please add your entry to the bottom of one of the lists below! +New features: + +* :doc:`plugins/mbsubmit`: Add new prompt choice "Create release on musicbrainz", automating + the process as much as possible. + Bug fixes: * Improved naming of temporary files by separating the random part with the file extension. diff --git a/docs/plugins/mbsubmit.rst b/docs/plugins/mbsubmit.rst index 0e86ddc698..3532559e66 100644 --- a/docs/plugins/mbsubmit.rst +++ b/docs/plugins/mbsubmit.rst @@ -9,11 +9,24 @@ that is parseable by MusicBrainz's `track parser`_. The prompt choices are: - Print the tracks to stdout in a format suitable for MusicBrainz's `track parser`_. +- Create a new release on MusicBrainz, opens + https://musicbrainz.org/release/add in a new browser window with + fields pre-populated using existing metadata. + - Open the program `Picard`_ with the unmatched folder as an input, allowing you to start submitting the unmatched release to MusicBrainz with many input fields already filled in, thanks to Picard reading the preexisting tags of the files. +To create new releases on MusicBrainz with this plugin you need to install the +`PyJWT`_ library with: + +.. code-block:: console + + $ pip install beets[mbsubmit] + +.. _PyJWT: https://pyjwt.readthedocs.io/en/stable/ + For the last option, `Picard`_ is assumed to be installed and available on the machine including a ``picard`` executable. Picard developers list `download options`_. `other GNU/Linux distributions`_ may distribute Picard via their @@ -34,7 +47,7 @@ choice is demonstrated:: No matching release found for 3 tracks. For help, see: https://beets.readthedocs.org/en/latest/faq.html#nomatch [U]se as-is, as Tracks, Group albums, Skip, Enter search, enter Id, aBort, - Print tracks, Open files with Picard? p + Print tracks, Open files with Picard, Create release on musicbrainz? p 01. An Obscure Track - An Obscure Artist (3:37) 02. Another Obscure Track - An Obscure Artist (2:05) 03. The Third Track - Another Obscure Artist (3:02) @@ -56,6 +69,24 @@ the recommended workflow is to copy the output of the ``Print tracks`` choice and paste it into the parser that can be found by clicking on the "Track Parser" button on MusicBrainz "Tracklist" tab. +Create release on MusicBrainz +----------------------------- + +The https://musicbrainz.org/release/add page can be seeded with existing +metadata, as described here: https://musicbrainz.org/doc/Development/Seeding/Release_Editor. +This works in the following way: + +1. When you select the option to create a release, a local web server is started. +2. You point your web browser to that web server, either by clicking a link + displayed in the console, or by having beets open the link automatically. +3. The opened web page will redirect you to MusicBrainz, and the form fields + will be prepopulated with metadata found in the files. MusicBrainz may + ask you to confirm the action. +4. You edit the release on MusicBrainz and click "Enter edit" to finish. +5. MusicBrainz will redirect you to the local web server, submitting the ID + of the newly created release. +6. beets will add the release using the release ID returned by MusicBrainz. + Configuration ------------- @@ -70,6 +101,20 @@ file. The following options are available: Default: ``medium`` (causing the choice to be displayed for all albums that have a recommendation of medium strength or lower). Valid values: ``none``, ``low``, ``medium``, ``strong``. +- **create_release_server_hostname**: The host name of the local web server used for the + 'Create release on musicbrainz' functionality. The default is '127.0.0.1'. + Adjust this if beets is running on a different host in your local network. + Be aware that this web server is not secured in any way. +- **create_release_server_port**: The port for the local web server. Default is 29661. If + unavailable, beets will search for other ports until an available one is + found. +- **create_release_method**: Either 'open_browser' to automatically open a new + window/tab in your local browser or 'show_link' to simply show the link on + the console. +- **create_release_await_mbid**: Whether or not to wait for you to create the + release on MusicBrainz. If true, waits for a callback from MusicBrainz with + the new release ID and proceeds to add the unmatched album using that Id. + If false, simply shows the select action prompt again. Default: true. - **picard_path**: The path to the ``picard`` executable. Could be an absolute path, and if not, ``$PATH`` is consulted. The default value is simply ``picard``. Windows users will have to find and specify the absolute path to diff --git a/poetry.lock b/poetry.lock index 5050302ff3..6112ff0aa5 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "accessible-pygments" @@ -1874,6 +1874,23 @@ pycairo = ">=1.16" dev = ["flake8", "pytest", "pytest-cov"] docs = ["sphinx (>=4.0,<5.0)", "sphinx-rtd-theme (>=0.5,<2.0)"] +[[package]] +name = "pyjwt" +version = "2.8.0" +description = "JSON Web Token implementation in Python" +optional = false +python-versions = ">=3.7" +files = [ + {file = "PyJWT-2.8.0-py3-none-any.whl", hash = "sha256:59127c392cc44c2da5bb3192169a91f429924e17aff6534d70fdc02ab3e04320"}, + {file = "PyJWT-2.8.0.tar.gz", hash = "sha256:57e28d156e3d5c10088e0c68abb90bfac3df82b40a71bd0daa20c65ccd5c23de"}, +] + +[package.extras] +crypto = ["cryptography (>=3.4.0)"] +dev = ["coverage[toml] (==5.0.4)", "cryptography (>=3.4.0)", "pre-commit", "pytest (>=6.0.0,<7.0.0)", "sphinx (>=4.5.0,<5.0.0)", "sphinx-rtd-theme", "zope.interface"] +docs = ["sphinx (>=4.5.0,<5.0.0)", "sphinx-rtd-theme", "zope.interface"] +tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"] + [[package]] name = "pylast" version = "5.3.0" @@ -2739,6 +2756,7 @@ kodiupdate = ["requests"] lastgenre = ["pylast"] lastimport = ["pylast"] lyrics = ["beautifulsoup4", "langdetect", "requests"] +mbsubmit = ["pyjwt"] metasync = ["dbus-python"] mpdstats = ["python-mpd2"] plexupdate = ["requests"] @@ -2752,4 +2770,4 @@ web = ["flask", "flask-cors"] [metadata] lock-version = "2.0" python-versions = ">=3.8,<4" -content-hash = "4ae4e4157dd4a0c7951ba1f642c8dc36ae8ca264789b2a224ecaaf5e3a2f289a" +content-hash = "92fb4a6421dabcaef979812019cff9544b6902f6f000e4307b5f5f96dd8b6b2e" diff --git a/pyproject.toml b/pyproject.toml index 950818ed22..9aca214dd0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,7 @@ reflink = { version = "*", optional = true } requests = { version = "*", optional = true } requests-oauthlib = { version = ">=0.6.1", optional = true } soco = { version = "*", optional = true } +pyjwt = { version = "*", optional = true } [tool.poetry.group.test.dependencies] beautifulsoup4 = "*" @@ -80,6 +81,7 @@ rarfile = "*" reflink = "*" requests_oauthlib = "*" responses = ">=0.3.0" +pyjwt = "*" [tool.poetry.group.format.dependencies] isort = { version = "<5.14", extras = ["colors"] } @@ -128,6 +130,7 @@ kodiupdate = ["requests"] lastgenre = ["pylast"] lastimport = ["pylast"] lyrics = ["beautifulsoup4", "langdetect", "requests"] +mbsubmit = ["pyjwt"] metasync = ["dbus-python"] mpdstats = ["python-mpd2"] plexupdate = ["requests"] diff --git a/test/plugins/test_mbsubmit.py b/test/plugins/test_mbsubmit.py index 40024bc714..1f1dea39e0 100644 --- a/test/plugins/test_mbsubmit.py +++ b/test/plugins/test_mbsubmit.py @@ -11,10 +11,15 @@ # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. - - import unittest +from unittest.mock import patch +from urllib.parse import urljoin + +import jwt +import requests +import beets.plugins +from beets.test._common import item from beets.test.helper import ( AutotagStub, ImportHelper, @@ -23,6 +28,8 @@ capture_stdout, control_stdin, ) +from beetsplug import mbsubmit +from beetsplug.mbsubmit import CreateReleaseTask, MBSubmitPlugin class MBSubmitPluginTest( @@ -31,11 +38,16 @@ class MBSubmitPluginTest( def setUp(self): self.setup_beets() self.load_plugins("mbsubmit") + self.plugin: MBSubmitPlugin = beets.plugins.find_plugins()[0] + self.plugin._start_server() + self.server_url = f"http://localhost:{self.plugin._server.server_port}" self._create_import_dir(2) self._setup_import_session() self.matcher = AutotagStub().install() def tearDown(self): + self.plugin._stop_server() + self.unload_plugins() self.teardown_beets() self.matcher.restore() @@ -51,7 +63,7 @@ def test_print_tracks_output(self): # Manually build the string for comparing the output. tracklist = ( - "Open files with Picard? " + "Create release on musicbrainz? " "01. Tag Title 1 - Tag Artist (0:01)\n" "02. Tag Title 2 - Tag Artist (0:01)" ) @@ -72,6 +84,164 @@ def test_print_tracks_output_as_tracks(self): ) self.assertIn(tracklist, output.getvalue()) + @patch.object(MBSubmitPlugin, "_wait_for_condition", autospec=True) + def test_create_release(self, wait_for_condition_mock): + self.matcher.matching = AutotagStub.BAD + + def _wait_for_condition(plugin: MBSubmitPlugin, condition): + self.assertEqual(1, len(plugin.create_release_tasks)) + task_id = list(plugin.create_release_tasks.keys())[0] + if wait_for_condition_mock.call_count == 1: + plugin.create_release_tasks[task_id].browser_opened = True + if wait_for_condition_mock.call_count == 2: + plugin.create_release_tasks[task_id].result_release_mbid = ( + "new_id" + ) + + wait_for_condition_mock.side_effect = _wait_for_condition + + with control_stdin("\n".join(["c", "s"])): + # Create release on MusicBrainz, Skip + self.importer.run() + + self.assertEqual(2, wait_for_condition_mock.call_count) + + def test_create_release_server_add(self): + r = requests.get(self.server_url) + self.assertEqual(404, r.status_code) + + r = requests.get(urljoin(self.server_url, "/add")) + self.assertEqual(400, r.status_code) + self.assertEqual("Token missing.", r.text) + + r = requests.get(urljoin(self.server_url, "/add?token=12356")) + self.assertEqual(400, r.status_code) + self.assertEqual("Invalid token.", r.text) + + token = jwt.encode( + {"task_key": "unique_key"}, + self.plugin.jwt_key, + algorithm=self.plugin.jwt_algorithm, + ) + + r = requests.get(urljoin(self.server_url, f"/add?token={token}")) + self.assertEqual(404, r.status_code) + self.assertEqual("task_key not found.", r.text) + + task = CreateReleaseTask( + {"a": 1, "b": "Something'test\"", "c": 6767.74} + ) + self.plugin.create_release_tasks["unique_key"] = task + + self.assertFalse(task.browser_opened) + + r = requests.get(urljoin(self.server_url, f"/add?token={token}")) + self.assertEqual(200, r.status_code) + self.assertIn('', r.text) + self.assertIn( + '', + r.text, + ) + self.assertIn('', r.text) + + self.assertTrue(task.browser_opened) + + r = requests.get(urljoin(self.server_url, f"/complete_add")) + self.assertEqual(400, r.status_code) + self.assertEqual("Token missing.", r.text) + + r = requests.get(urljoin(self.server_url, "/complete_add?token=12356")) + self.assertEqual(400, r.status_code) + self.assertEqual("Invalid token.", r.text) + + r = requests.get( + urljoin(self.server_url, f"/complete_add?token={token}") + ) + self.assertEqual(400, r.status_code) + self.assertEqual("release_mbid missing.", r.text) + + self.assertIsNone(task.result_release_mbid) + + r = requests.get( + urljoin( + self.server_url, + f"/complete_add?token={token}&release_mbid=the_new_id", + ) + ) + self.assertEqual(200, r.status_code) + self.assertEqual( + "Release the_new_id added. You can close this browser window now and return to beets.", + r.text, + ) + + self.assertEqual("the_new_id", task.result_release_mbid) + + def test_build_formdata(self): + self.assertDictEqual({}, mbsubmit.build_formdata([], None)) + self.assertDictEqual( + {"redirect_uri": "redirect_to_somewhere"}, + mbsubmit.build_formdata([], "redirect_to_somewhere"), + ) + + item1 = item(self.lib) + item1.track = 1 + item1.title = "Track 1" + item1.albumtype = "Album" + item1.barcode = 1234567890 + item1.media = "CD" + + item2 = item(self.lib) + item2.track = 2 + item2.artists = ["a", "b"] + item2.title = "Track 2" + item2.albumtype = "Album" + item2.barcode = 1234567890 + item2.media = "CD" + + item3 = item(self.lib) + item3.track = 3 + item3.disc = None + item3.artists = ["a", "b", "c"] + item3.title = "Track 3" + item3.albumtype = "Album" + item3.barcode = 1234567890 + item3.media = "Digital Media" + + self.maxDiff = None + + self.assertDictEqual( + { + "name": "the album", + "barcode": "1234567890", + "type": "Album", + "events.0.date.year": 1, + "events.0.date.month": 2, + "events.0.date.day": 3, + "artist_credit.names.0.artist.name": "the album artist", + "mediums.5.format": "CD", + "mediums.5.track.0.artist_credit.names.0.artist.name": "the artist", + "mediums.5.track.0.length": 60000, + "mediums.5.track.0.name": "Track 1", + "mediums.5.track.0.number": 1, + "mediums.5.track.1.artist_credit.names.0.artist.name": "a", + "mediums.5.track.1.artist_credit.names.0.join_phrase": " & ", + "mediums.5.track.1.artist_credit.names.1.artist.name": "b", + "mediums.5.track.1.length": 60000, + "mediums.5.track.1.name": "Track 2", + "mediums.5.track.1.number": 2, + "mediums.0.format": "Digital Media", + "mediums.0.track.0.artist_credit.names.0.artist.name": "a", + "mediums.0.track.0.artist_credit.names.0.join_phrase": ", ", + "mediums.0.track.0.artist_credit.names.1.artist.name": "b", + "mediums.0.track.0.artist_credit.names.1.join_phrase": " & ", + "mediums.0.track.0.artist_credit.names.2.artist.name": "c", + "mediums.0.track.0.length": 60000, + "mediums.0.track.0.name": "Track 3", + "mediums.0.track.0.number": 3, + }, + mbsubmit.build_formdata([item1, item2, item3], None), + ) + def suite(): return unittest.TestLoader().loadTestsFromName(__name__)