From 34f1e14cf915f003c32b3d42a191235e044767bb Mon Sep 17 00:00:00 2001 From: XDGFX Date: Sat, 6 Nov 2021 18:41:57 +0000 Subject: [PATCH] Improve spotr structure --- .gitignore | 9 + README.md | 5 +- requirements.txt | 3 + spotipy/__init__.py | 2 - spotipy/client.py | 1056 ------------------------------------------- spotipy/oauth2.py | 264 ----------- spotipy/util.py | 93 ---- spotr.py | 221 ++++++--- 8 files changed, 183 insertions(+), 1470 deletions(-) create mode 100644 .gitignore create mode 100644 requirements.txt delete mode 100644 spotipy/__init__.py delete mode 100644 spotipy/client.py delete mode 100644 spotipy/oauth2.py delete mode 100644 spotipy/util.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6639307 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +* + +!README.md +!spotr.py +!example-cred.py +!LICENSE +!requirements.txt +!.github/ +!.gitignore \ No newline at end of file diff --git a/README.md b/README.md index efc5c2f..7e03da6 100644 --- a/README.md +++ b/README.md @@ -17,4 +17,7 @@ spotr will **delete** all tracks from the Spotify playlist you provide it. This Run with `python3` and follow instructions to authenticate your account once ## Further runs -Just run with `python3` \ No newline at end of file +Just run with `python3` + +## Errors +Spotr should print any errors. If a track is not being removed from your Spotify playlist, check the logs. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..af8de09 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +spotipy==2.19.0 +requests==2.26.0 +tqdm==4.62.3 \ No newline at end of file diff --git a/spotipy/__init__.py b/spotipy/__init__.py deleted file mode 100644 index 9be1dce..0000000 --- a/spotipy/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -VERSION='2.0.1' -from .client import Spotify, SpotifyException diff --git a/spotipy/client.py b/spotipy/client.py deleted file mode 100644 index 3e33a21..0000000 --- a/spotipy/client.py +++ /dev/null @@ -1,1056 +0,0 @@ -# coding: utf-8 - - -from __future__ import print_function -import sys -import requests -import json -import time - -import six - -""" A simple and thin Python library for the Spotify Web API -""" - - -class SpotifyException(Exception): - def __init__(self, http_status, code, msg, headers=None): - self.http_status = http_status - self.code = code - self.msg = msg - # `headers` is used to support `Retry-After` in the event of a - # 429 status code. - if headers is None: - headers = {} - self.headers = headers - - def __str__(self): - return 'http status: {0}, code:{1} - {2}'.format( - self.http_status, self.code, self.msg) - - -class Spotify(object): - """ - Example usage:: - - import spotipy - - urn = 'spotify:artist:3jOstUTkEu2JkjvRdBA5Gu' - sp = spotipy.Spotify() - - sp.trace = True # turn on tracing - sp.trace_out = True # turn on trace out - - artist = sp.artist(urn) - print(artist) - - user = sp.user('plamere') - print(user) - """ - - trace = False # Enable tracing? - trace_out = False - max_get_retries = 10 - - def __init__(self, auth=None, requests_session=True, - client_credentials_manager=None, proxies=None, requests_timeout=None): - """ - Create a Spotify API object. - - :param auth: An authorization token (optional) - :param requests_session: - A Requests session object or a truthy value to create one. - A falsy value disables sessions. - It should generally be a good idea to keep sessions enabled - for performance reasons (connection pooling). - :param client_credentials_manager: - SpotifyClientCredentials object - :param proxies: - Definition of proxies (optional) - :param requests_timeout: - Tell Requests to stop waiting for a response after a given number of seconds - """ - self.prefix = 'https://api.spotify.com/v1/' - self._auth = auth - self.client_credentials_manager = client_credentials_manager - self.proxies = proxies - self.requests_timeout = requests_timeout - - if isinstance(requests_session, requests.Session): - self._session = requests_session - else: - if requests_session: # Build a new session. - self._session = requests.Session() - else: # Use the Requests API module as a "session". - from requests import api - self._session = api - - def _auth_headers(self): - if self._auth: - return {'Authorization': 'Bearer {0}'.format(self._auth)} - elif self.client_credentials_manager: - token = self.client_credentials_manager.get_access_token() - return {'Authorization': 'Bearer {0}'.format(token)} - else: - return {} - - def _internal_call(self, method, url, payload, params): - args = dict(params=params) - args["timeout"] = self.requests_timeout - if not url.startswith('http'): - url = self.prefix + url - headers = self._auth_headers() - headers['Content-Type'] = 'application/json' - - if payload: - args["data"] = json.dumps(payload) - - if self.trace_out: - print(url) - r = self._session.request(method, url, headers=headers, proxies=self.proxies, **args) - - if self.trace: # pragma: no cover - print() - print ('headers', headers) - print ('http status', r.status_code) - print(method, r.url) - if payload: - print("DATA", json.dumps(payload)) - - try: - r.raise_for_status() - except: - if r.text and len(r.text) > 0 and r.text != 'null': - raise SpotifyException(r.status_code, - -1, '%s:\n %s' % (r.url, r.json()['error']['message']), - headers=r.headers) - else: - raise SpotifyException(r.status_code, - -1, '%s:\n %s' % (r.url, 'error'), headers=r.headers) - finally: - r.connection.close() - if r.text and len(r.text) > 0 and r.text != 'null': - results = r.json() - if self.trace: # pragma: no cover - print('RESP', results) - print() - return results - else: - return None - - def _get(self, url, args=None, payload=None, **kwargs): - if args: - kwargs.update(args) - retries = self.max_get_retries - delay = 1 - while retries > 0: - try: - return self._internal_call('GET', url, payload, kwargs) - except SpotifyException as e: - retries -= 1 - status = e.http_status - # 429 means we hit a rate limit, backoff - if status == 429 or (status >= 500 and status < 600): - if retries < 0: - raise - else: - sleep_seconds = int(e.headers.get('Retry-After', delay)) - print ('retrying ...' + str(sleep_seconds) + 'secs') - time.sleep(sleep_seconds + 1) - delay += 1 - else: - raise - except Exception as e: - raise - print ('exception', str(e)) - # some other exception. Requests have - # been know to throw a BadStatusLine exception - retries -= 1 - if retries >= 0: - sleep_seconds = int(e.headers.get('Retry-After', delay)) - print ('retrying ...' + str(delay) + 'secs') - time.sleep(sleep_seconds + 1) - delay += 1 - else: - raise - - def _post(self, url, args=None, payload=None, **kwargs): - if args: - kwargs.update(args) - return self._internal_call('POST', url, payload, kwargs) - - def _delete(self, url, args=None, payload=None, **kwargs): - if args: - kwargs.update(args) - return self._internal_call('DELETE', url, payload, kwargs) - - def _put(self, url, args=None, payload=None, **kwargs): - if args: - kwargs.update(args) - return self._internal_call('PUT', url, payload, kwargs) - - def next(self, result): - """ returns the next result given a paged result - - Parameters: - - result - a previously returned paged result - """ - if result['next']: - return self._get(result['next']) - else: - return None - - def previous(self, result): - """ returns the previous result given a paged result - - Parameters: - - result - a previously returned paged result - """ - if result['previous']: - return self._get(result['previous']) - else: - return None - - def _warn_old(self, msg): - print('warning:' + msg, file=sys.stderr) - - def _warn(self, msg, *args): - print('warning:' + msg.format(*args), file=sys.stderr) - - def track(self, track_id): - """ returns a single track given the track's ID, URI or URL - - Parameters: - - track_id - a spotify URI, URL or ID - """ - - trid = self._get_id('track', track_id) - return self._get('tracks/' + trid) - - def tracks(self, tracks, market = None): - """ returns a list of tracks given a list of track IDs, URIs, or URLs - - Parameters: - - tracks - a list of spotify URIs, URLs or IDs - - market - an ISO 3166-1 alpha-2 country code. - """ - - tlist = [self._get_id('track', t) for t in tracks] - return self._get('tracks/?ids=' + ','.join(tlist), market = market) - - def artist(self, artist_id): - """ returns a single artist given the artist's ID, URI or URL - - Parameters: - - artist_id - an artist ID, URI or URL - """ - - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid) - - def artists(self, artists): - """ returns a list of artists given the artist IDs, URIs, or URLs - - Parameters: - - artists - a list of artist IDs, URIs or URLs - """ - - tlist = [self._get_id('artist', a) for a in artists] - return self._get('artists/?ids=' + ','.join(tlist)) - - def artist_albums(self, artist_id, album_type=None, country=None, limit=20, - offset=0): - """ Get Spotify catalog information about an artist's albums - - Parameters: - - artist_id - the artist ID, URI or URL - - album_type - 'album', 'single', 'appears_on', 'compilation' - - country - limit the response to one particular country. - - limit - the number of albums to return - - offset - the index of the first album to return - """ - - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/albums', album_type=album_type, - country=country, limit=limit, offset=offset) - - def artist_top_tracks(self, artist_id, country='US'): - """ Get Spotify catalog information about an artist's top 10 tracks - by country. - - Parameters: - - artist_id - the artist ID, URI or URL - - country - limit the response to one particular country. - """ - - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/top-tracks', country=country) - - def artist_related_artists(self, artist_id): - """ Get Spotify catalog information about artists similar to an - identified artist. Similarity is based on analysis of the - Spotify community's listening history. - - Parameters: - - artist_id - the artist ID, URI or URL - """ - trid = self._get_id('artist', artist_id) - return self._get('artists/' + trid + '/related-artists') - - def album(self, album_id): - """ returns a single album given the album's ID, URIs or URL - - Parameters: - - album_id - the album ID, URI or URL - """ - - trid = self._get_id('album', album_id) - return self._get('albums/' + trid) - - def album_tracks(self, album_id, limit=50, offset=0): - """ Get Spotify catalog information about an album's tracks - - Parameters: - - album_id - the album ID, URI or URL - - limit - the number of items to return - - offset - the index of the first item to return - """ - - trid = self._get_id('album', album_id) - return self._get('albums/' + trid + '/tracks/', limit=limit, - offset=offset) - - def albums(self, albums): - """ returns a list of albums given the album IDs, URIs, or URLs - - Parameters: - - albums - a list of album IDs, URIs or URLs - """ - - tlist = [self._get_id('album', a) for a in albums] - return self._get('albums/?ids=' + ','.join(tlist)) - - def search(self, q, limit=10, offset=0, type='track', market=None): - """ searches for an item - - Parameters: - - q - the search query - - limit - the number of items to return - - offset - the index of the first item to return - - type - the type of item to return. One of 'artist', 'album', - 'track' or 'playlist' - - market - An ISO 3166-1 alpha-2 country code or the string from_token. - """ - return self._get('search', q=q, limit=limit, offset=offset, type=type, market=market) - - def user(self, user): - """ Gets basic profile information about a Spotify User - - Parameters: - - user - the id of the usr - """ - return self._get('users/' + user) - - def current_user_playlists(self, limit=50, offset=0): - """ Get current user playlists without required getting his profile - Parameters: - - limit - the number of items to return - - offset - the index of the first item to return - """ - return self._get("me/playlists", limit=limit, offset=offset) - - def user_playlists(self, user, limit=50, offset=0): - """ Gets playlists of a user - - Parameters: - - user - the id of the usr - - limit - the number of items to return - - offset - the index of the first item to return - """ - return self._get("users/%s/playlists" % user, limit=limit, - offset=offset) - - def user_playlist(self, user, playlist_id=None, fields=None): - """ Gets playlist of a user - Parameters: - - user - the id of the user - - playlist_id - the id of the playlist - - fields - which fields to return - """ - if playlist_id is None: - return self._get("users/%s/starred" % (user), fields=fields) - plid = self._get_id('playlist', playlist_id) - return self._get("users/%s/playlists/%s" % (user, plid), fields=fields) - - def user_playlist_tracks(self, user, playlist_id=None, fields=None, - limit=100, offset=0, market=None): - """ Get full details of the tracks of a playlist owned by a user. - - Parameters: - - user - the id of the user - - playlist_id - the id of the playlist - - fields - which fields to return - - limit - the maximum number of tracks to return - - offset - the index of the first track to return - - market - an ISO 3166-1 alpha-2 country code. - """ - plid = self._get_id('playlist', playlist_id) - return self._get("users/%s/playlists/%s/tracks" % (user, plid), - limit=limit, offset=offset, fields=fields, - market=market) - - - def user_playlist_create(self, user, name, public=True, description=''): - """ Creates a playlist for a user - - Parameters: - - user - the id of the user - - name - the name of the playlist - - public - is the created playlist public - - description - the description of the playlist - """ - data = {'name': name, 'public': public, 'description': description} - - - return self._post("users/%s/playlists" % (user,), payload=data) - - def user_playlist_change_details( - self, user, playlist_id, name=None, public=None, - collaborative=None, description=None): - """ Changes a playlist's name and/or public/private state - - Parameters: - - user - the id of the user - - playlist_id - the id of the playlist - - name - optional name of the playlist - - public - optional is the playlist public - - collaborative - optional is the playlist collaborative - - description - optional description of the playlist - """ - - data = {} - if isinstance(name, six.string_types): - data['name'] = name - if isinstance(public, bool): - data['public'] = public - if isinstance(collaborative, bool): - data['collaborative'] = collaborative - if isinstance(description, six.string_types): - data['description'] = description - return self._put("users/%s/playlists/%s" % (user, playlist_id), - payload=data) - - def user_playlist_unfollow(self, user, playlist_id): - """ Unfollows (deletes) a playlist for a user - - Parameters: - - user - the id of the user - - name - the name of the playlist - """ - return self._delete("users/%s/playlists/%s/followers" % (user, playlist_id)) - - def user_playlist_add_tracks(self, user, playlist_id, tracks, - position=None): - """ Adds tracks to a playlist - - Parameters: - - user - the id of the user - - playlist_id - the id of the playlist - - tracks - a list of track URIs, URLs or IDs - - position - the position to add the tracks - """ - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] - return self._post("users/%s/playlists/%s/tracks" % (user, plid), - payload=ftracks, position=position) - - def user_playlist_replace_tracks(self, user, playlist_id, tracks): - """ Replace all tracks in a playlist - - Parameters: - - user - the id of the user - - playlist_id - the id of the playlist - - tracks - the list of track ids to add to the playlist - """ - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] - payload = {"uris": ftracks} - return self._put("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) - - def user_playlist_reorder_tracks( - self, user, playlist_id, range_start, insert_before, - range_length=1, snapshot_id=None): - """ Reorder tracks in a playlist - - Parameters: - - user - the id of the user - - playlist_id - the id of the playlist - - range_start - the position of the first track to be reordered - - range_length - optional the number of tracks to be reordered (default: 1) - - insert_before - the position where the tracks should be inserted - - snapshot_id - optional playlist's snapshot ID - """ - plid = self._get_id('playlist', playlist_id) - payload = {"range_start": range_start, - "range_length": range_length, - "insert_before": insert_before} - if snapshot_id: - payload["snapshot_id"] = snapshot_id - return self._put("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) - - def user_playlist_remove_all_occurrences_of_tracks( - self, user, playlist_id, tracks, snapshot_id=None): - """ Removes all occurrences of the given tracks from the given playlist - - Parameters: - - user - the id of the user - - playlist_id - the id of the playlist - - tracks - the list of track ids to add to the playlist - - snapshot_id - optional id of the playlist snapshot - - """ - - plid = self._get_id('playlist', playlist_id) - ftracks = [self._get_uri('track', tid) for tid in tracks] - payload = {"tracks": [{"uri": track} for track in ftracks]} - if snapshot_id: - payload["snapshot_id"] = snapshot_id - return self._delete("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) - - def user_playlist_remove_specific_occurrences_of_tracks( - self, user, playlist_id, tracks, snapshot_id=None): - """ Removes all occurrences of the given tracks from the given playlist - - Parameters: - - user - the id of the user - - playlist_id - the id of the playlist - - tracks - an array of objects containing Spotify URIs of the tracks to remove with their current positions in the playlist. For example: - [ { "uri":"4iV5W9uYEdYUVa79Axb7Rh", "positions":[2] }, - { "uri":"1301WleyT98MSxVHPZCA6M", "positions":[7] } ] - - snapshot_id - optional id of the playlist snapshot - """ - - plid = self._get_id('playlist', playlist_id) - ftracks = [] - for tr in tracks: - ftracks.append({ - "uri": self._get_uri("track", tr["uri"]), - "positions": tr["positions"], - }) - payload = {"tracks": ftracks} - if snapshot_id: - payload["snapshot_id"] = snapshot_id - return self._delete("users/%s/playlists/%s/tracks" % (user, plid), - payload=payload) - - def user_playlist_follow_playlist(self, playlist_owner_id, playlist_id): - """ - Add the current authenticated user as a follower of a playlist. - - Parameters: - - playlist_owner_id - the user id of the playlist owner - - playlist_id - the id of the playlist - - """ - return self._put("users/{}/playlists/{}/followers".format(playlist_owner_id, playlist_id)) - - def user_playlist_is_following(self, playlist_owner_id, playlist_id, user_ids): - """ - Check to see if the given users are following the given playlist - - Parameters: - - playlist_owner_id - the user id of the playlist owner - - playlist_id - the id of the playlist - - user_ids - the ids of the users that you want to check to see if they follow the playlist. Maximum: 5 ids. - - """ - return self._get("users/{}/playlists/{}/followers/contains?ids={}".format(playlist_owner_id, playlist_id, ','.join(user_ids))) - - def me(self): - """ Get detailed profile information about the current user. - An alias for the 'current_user' method. - """ - return self._get('me/') - - def current_user(self): - """ Get detailed profile information about the current user. - An alias for the 'me' method. - """ - return self.me() - - def current_user_playing_track(self): - ''' Get information about the current users currently playing track. - ''' - return self._get('me/player/currently-playing') - - def current_user_saved_albums(self, limit=20, offset=0): - """ Gets a list of the albums saved in the current authorized user's - "Your Music" library - - Parameters: - - limit - the number of albums to return - - offset - the index of the first album to return - - """ - return self._get('me/albums', limit=limit, offset=offset) - - def current_user_saved_tracks(self, limit=20, offset=0): - """ Gets a list of the tracks saved in the current authorized user's - "Your Music" library - - Parameters: - - limit - the number of tracks to return - - offset - the index of the first track to return - - """ - return self._get('me/tracks', limit=limit, offset=offset) - - def current_user_followed_artists(self, limit=20, after=None): - """ Gets a list of the artists followed by the current authorized user - - Parameters: - - limit - the number of tracks to return - - after - ghe last artist ID retrieved from the previous request - - """ - return self._get('me/following', type='artist', limit=limit, - after=after) - - def current_user_saved_tracks_delete(self, tracks=None): - """ Remove one or more tracks from the current user's - "Your Music" library. - - Parameters: - - tracks - a list of track URIs, URLs or IDs - """ - tlist = [] - if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._delete('me/tracks/?ids=' + ','.join(tlist)) - - def current_user_saved_tracks_contains(self, tracks=None): - """ Check if one or more tracks is already saved in - the current Spotify user’s “Your Music” library. - - Parameters: - - tracks - a list of track URIs, URLs or IDs - """ - tlist = [] - if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._get('me/tracks/contains?ids=' + ','.join(tlist)) - - def current_user_saved_tracks_add(self, tracks=None): - """ Add one or more tracks to the current user's - "Your Music" library. - - Parameters: - - tracks - a list of track URIs, URLs or IDs - """ - tlist = [] - if tracks is not None: - tlist = [self._get_id('track', t) for t in tracks] - return self._put('me/tracks/?ids=' + ','.join(tlist)) - - def current_user_top_artists(self, limit=20, offset=0, - time_range='medium_term'): - """ Get the current user's top artists - - Parameters: - - limit - the number of entities to return - - offset - the index of the first entity to return - - time_range - Over what time frame are the affinities computed - Valid-values: short_term, medium_term, long_term - """ - return self._get('me/top/artists', time_range=time_range, limit=limit, - offset=offset) - - def current_user_top_tracks(self, limit=20, offset=0, - time_range='medium_term'): - """ Get the current user's top tracks - - Parameters: - - limit - the number of entities to return - - offset - the index of the first entity to return - - time_range - Over what time frame are the affinities computed - Valid-values: short_term, medium_term, long_term - """ - return self._get('me/top/tracks', time_range=time_range, limit=limit, - offset=offset) - - def current_user_recently_played(self, limit=50): - ''' Get the current user's recently played tracks - - Parameters: - - limit - the number of entities to return - ''' - return self._get('me/player/recently-played', limit=limit) - - def current_user_saved_albums_add(self, albums=[]): - """ Add one or more albums to the current user's - "Your Music" library. - Parameters: - - albums - a list of album URIs, URLs or IDs - """ - alist = [self._get_id('album', a) for a in albums] - r = self._put('me/albums?ids=' + ','.join(alist)) - return r - - def user_follow_artists(self, ids=[]): - ''' Follow one or more artists - Parameters: - - ids - a list of artist IDs - ''' - return self._put('me/following?type=artist&ids=' + ','.join(ids)) - - def user_follow_users(self, ids=[]): - ''' Follow one or more users - Parameters: - - ids - a list of user IDs - ''' - return self._put('me/following?type=user&ids=' + ','.join(ids)) - - def featured_playlists(self, locale=None, country=None, timestamp=None, - limit=20, offset=0): - """ Get a list of Spotify featured playlists - - Parameters: - - locale - The desired language, consisting of a lowercase ISO - 639 language code and an uppercase ISO 3166-1 alpha-2 country - code, joined by an underscore. - - - country - An ISO 3166-1 alpha-2 country code. - - - timestamp - A timestamp in ISO 8601 format: - yyyy-MM-ddTHH:mm:ss. Use this parameter to specify the user's - local time to get results tailored for that specific date and - time in the day - - - limit - The maximum number of items to return. Default: 20. - Minimum: 1. Maximum: 50 - - - offset - The index of the first item to return. Default: 0 - (the first object). Use with limit to get the next set of - items. - """ - return self._get('browse/featured-playlists', locale=locale, - country=country, timestamp=timestamp, limit=limit, - offset=offset) - - def new_releases(self, country=None, limit=20, offset=0): - """ Get a list of new album releases featured in Spotify - - Parameters: - - country - An ISO 3166-1 alpha-2 country code. - - - limit - The maximum number of items to return. Default: 20. - Minimum: 1. Maximum: 50 - - - offset - The index of the first item to return. Default: 0 - (the first object). Use with limit to get the next set of - items. - """ - return self._get('browse/new-releases', country=country, limit=limit, - offset=offset) - - def categories(self, country=None, locale=None, limit=20, offset=0): - """ Get a list of new album releases featured in Spotify - - Parameters: - - country - An ISO 3166-1 alpha-2 country code. - - locale - The desired language, consisting of an ISO 639 - language code and an ISO 3166-1 alpha-2 country code, joined - by an underscore. - - - limit - The maximum number of items to return. Default: 20. - Minimum: 1. Maximum: 50 - - - offset - The index of the first item to return. Default: 0 - (the first object). Use with limit to get the next set of - items. - """ - return self._get('browse/categories', country=country, locale=locale, - limit=limit, offset=offset) - - def category_playlists(self, category_id=None, country=None, limit=20, - offset=0): - """ Get a list of new album releases featured in Spotify - - Parameters: - - category_id - The Spotify category ID for the category. - - - country - An ISO 3166-1 alpha-2 country code. - - - limit - The maximum number of items to return. Default: 20. - Minimum: 1. Maximum: 50 - - - offset - The index of the first item to return. Default: 0 - (the first object). Use with limit to get the next set of - items. - """ - return self._get('browse/categories/' + category_id + '/playlists', - country=country, limit=limit, offset=offset) - - def recommendations(self, seed_artists=None, seed_genres=None, - seed_tracks=None, limit=20, country=None, **kwargs): - """ Get a list of recommended tracks for one to five seeds. - - Parameters: - - seed_artists - a list of artist IDs, URIs or URLs - - - seed_tracks - a list of artist IDs, URIs or URLs - - - seed_genres - a list of genre names. Available genres for - recommendations can be found by calling recommendation_genre_seeds - - - country - An ISO 3166-1 alpha-2 country code. If provided, all - results will be playable in this country. - - - limit - The maximum number of items to return. Default: 20. - Minimum: 1. Maximum: 100 - - - min/max/target_ - For the tuneable track attributes listed - in the documentation, these values provide filters and targeting on - results. - """ - params = dict(limit=limit) - if seed_artists: - params['seed_artists'] = ','.join( - [self._get_id('artist', a) for a in seed_artists]) - if seed_genres: - params['seed_genres'] = ','.join(seed_genres) - if seed_tracks: - params['seed_tracks'] = ','.join( - [self._get_id('track', t) for t in seed_tracks]) - if country: - params['market'] = country - - for attribute in ["acousticness", "danceability", "duration_ms", - "energy", "instrumentalness", "key", "liveness", - "loudness", "mode", "popularity", "speechiness", - "tempo", "time_signature", "valence"]: - for prefix in ["min_", "max_", "target_"]: - param = prefix + attribute - if param in kwargs: - params[param] = kwargs[param] - return self._get('recommendations', **params) - - def recommendation_genre_seeds(self): - """ Get a list of genres available for the recommendations function. - """ - return self._get('recommendations/available-genre-seeds') - - def audio_analysis(self, track_id): - """ Get audio analysis for a track based upon its Spotify ID - Parameters: - - track_id - a track URI, URL or ID - """ - trid = self._get_id('track', track_id) - return self._get('audio-analysis/' + trid) - - def audio_features(self, tracks=[]): - """ Get audio features for one or multiple tracks based upon their Spotify IDs - Parameters: - - tracks - a list of track URIs, URLs or IDs, maximum: 50 ids - """ - if isinstance(tracks, str): - trackid = self._get_id('track', tracks) - results = self._get('audio-features/?ids=' + trackid) - else: - tlist = [self._get_id('track', t) for t in tracks] - results = self._get('audio-features/?ids=' + ','.join(tlist)) - # the response has changed, look for the new style first, and if - # its not there, fallback on the old style - if 'audio_features' in results: - return results['audio_features'] - else: - return results - - def audio_analysis(self, id): - """ Get audio analysis for a track based upon its Spotify ID - Parameters: - - id - a track URIs, URLs or IDs - """ - id = self._get_id('track', id) - return self._get('audio-analysis/'+id) - - def devices(self): - ''' Get a list of user's available devices. - ''' - return self._get("me/player/devices") - - def current_playback(self, market = None): - ''' Get information about user's current playback. - - Parameters: - - market - an ISO 3166-1 alpha-2 country code. - ''' - return self._get("me/player", market = market) - - def currently_playing(self, market = None): - ''' Get user's currently playing track. - - Parameters: - - market - an ISO 3166-1 alpha-2 country code. - ''' - return self._get("me/player/currently-playing", market = market) - - def transfer_playback(self, device_id, force_play = True): - ''' Transfer playback to another device. - Note that the API accepts a list of device ids, but only - actually supports one. - - Parameters: - - device_id - transfer playback to this device - - force_play - true: after transfer, play. false: - keep current state. - ''' - data = { - 'device_ids': [device_id], - 'play': force_play - } - return self._put("me/player", payload=data) - - def start_playback(self, device_id = None, context_uri = None, uris = None, offset = None): - ''' Start or resume user's playback. - - Provide a `context_uri` to start playback or a album, - artist, or playlist. - - Provide a `uris` list to start playback of one or more - tracks. - - Provide `offset` as {"position": } or {"uri": ""} - to start playback at a particular offset. - - Parameters: - - device_id - device target for playback - - context_uri - spotify context uri to play - - uris - spotify track uris - - offset - offset into context by index or track - ''' - if context_uri is not None and uris is not None: - self._warn('specify either context uri or uris, not both') - return - if uris is not None and not isinstance(uris, list): - self._warn('uris must be a list') - return - data = {} - if context_uri is not None: - data['context_uri'] = context_uri - if uris is not None: - data['uris'] = uris - if offset is not None: - data['offset'] = offset - return self._put(self._append_device_id("me/player/play", device_id), payload=data) - - def pause_playback(self, device_id = None): - ''' Pause user's playback. - - Parameters: - - device_id - device target for playback - ''' - return self._put(self._append_device_id("me/player/pause", device_id)) - - def next_track(self, device_id = None): - ''' Skip user's playback to next track. - - Parameters: - - device_id - device target for playback - ''' - return self._post(self._append_device_id("me/player/next", device_id)) - - def previous_track(self, device_id = None): - ''' Skip user's playback to previous track. - - Parameters: - - device_id - device target for playback - ''' - return self._post(self._append_device_id("me/player/previous", device_id)) - - def seek_track(self, position_ms, device_id = None): - ''' Seek to position in current track. - - Parameters: - - position_ms - position in milliseconds to seek to - - device_id - device target for playback - ''' - if not isinstance(position_ms, int): - self._warn('position_ms must be an integer') - return - return self._put(self._append_device_id("me/player/seek?position_ms=%s" % position_ms, device_id)) - - def repeat(self, state, device_id = None): - ''' Set repeat mode for playback. - - Parameters: - - state - `track`, `context`, or `off` - - device_id - device target for playback - ''' - if state not in ['track', 'context', 'off']: - self._warn('invalid state') - return - self._put(self._append_device_id("me/player/repeat?state=%s" % state, device_id)) - - def volume(self, volume_percent, device_id = None): - ''' Set playback volume. - - Parameters: - - volume_percent - volume between 0 and 100 - - device_id - device target for playback - ''' - if not isinstance(volume_percent, int): - self._warn('volume must be an integer') - return - if volume_percent < 0 or volume_percent > 100: - self._warn('volume must be between 0 and 100, inclusive') - return - self._put(self._append_device_id("me/player/volume?volume_percent=%s" % volume_percent, device_id)) - - def shuffle(self, state, device_id = None): - ''' Toggle playback shuffling. - - Parameters: - - state - true or false - - device_id - device target for playback - ''' - if not isinstance(state, bool): - self._warn('state must be a boolean') - return - state = str(state).lower() - self._put(self._append_device_id("me/player/shuffle?state=%s" % state, device_id)) - - def _append_device_id(self, path, device_id): - ''' Append device ID to API path. - - Parameters: - - device_id - device id to append - ''' - if device_id: - if '?' in path: - path += "&device_id=%s" % device_id - else: - path += "?device_id=%s" % device_id - return path - - def _get_id(self, type, id): - fields = id.split(':') - if len(fields) >= 3: - if type != fields[-2]: - self._warn('expected id of type %s but found type %s %s', - type, fields[-2], id) - return fields[-1] - fields = id.split('/') - if len(fields) >= 3: - itype = fields[-2] - if type != itype: - self._warn('expected id of type %s but found type %s %s', - type, itype, id) - return fields[-1] - return id - - def _get_uri(self, type, id): - return 'spotify:' + type + ":" + self._get_id(type, id) diff --git a/spotipy/oauth2.py b/spotipy/oauth2.py deleted file mode 100644 index cef7908..0000000 --- a/spotipy/oauth2.py +++ /dev/null @@ -1,264 +0,0 @@ - -from __future__ import print_function -import base64 -import requests -import os -import json -import time -import sys - -# Workaround to support both python 2 & 3 -import six -import six.moves.urllib.parse as urllibparse - - -class SpotifyOauthError(Exception): - pass - - -def _make_authorization_headers(client_id, client_secret): - auth_header = base64.b64encode(six.text_type(client_id + ':' + client_secret).encode('ascii')) - return {'Authorization': 'Basic %s' % auth_header.decode('ascii')} - - -def is_token_expired(token_info): - now = int(time.time()) - return token_info['expires_at'] - now < 60 - - -class SpotifyClientCredentials(object): - OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token' - - def __init__(self, client_id=None, client_secret=None, proxies=None): - """ - You can either provid a client_id and client_secret to the - constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET - environment variables - """ - if not client_id: - client_id = os.getenv('SPOTIPY_CLIENT_ID') - - if not client_secret: - client_secret = os.getenv('SPOTIPY_CLIENT_SECRET') - - if not client_id: - raise SpotifyOauthError('No client id') - - if not client_secret: - raise SpotifyOauthError('No client secret') - - self.client_id = client_id - self.client_secret = client_secret - self.token_info = None - self.proxies = proxies - - def get_access_token(self): - """ - If a valid access token is in memory, returns it - Else feches a new token and returns it - """ - if self.token_info and not self.is_token_expired(self.token_info): - return self.token_info['access_token'] - - token_info = self._request_access_token() - token_info = self._add_custom_values_to_token_info(token_info) - self.token_info = token_info - return self.token_info['access_token'] - - def _request_access_token(self): - """Gets client credentials access token """ - payload = { 'grant_type': 'client_credentials'} - - headers = _make_authorization_headers(self.client_id, self.client_secret) - - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, verify=True, proxies=self.proxies) - if response.status_code != 200: - raise SpotifyOauthError(response.reason) - token_info = response.json() - return token_info - - def is_token_expired(self, token_info): - return is_token_expired(token_info) - - def _add_custom_values_to_token_info(self, token_info): - """ - Store some values that aren't directly provided by a Web API - response. - """ - token_info['expires_at'] = int(time.time()) + token_info['expires_in'] - return token_info - - -class SpotifyOAuth(object): - ''' - Implements Authorization Code Flow for Spotify's OAuth implementation. - ''' - - OAUTH_AUTHORIZE_URL = 'https://accounts.spotify.com/authorize' - OAUTH_TOKEN_URL = 'https://accounts.spotify.com/api/token' - - def __init__(self, client_id, client_secret, redirect_uri, - state=None, scope=None, cache_path=None, proxies=None): - ''' - Creates a SpotifyOAuth object - - Parameters: - - client_id - the client id of your app - - client_secret - the client secret of your app - - redirect_uri - the redirect URI of your app - - state - security state - - scope - the desired scope of the request - - cache_path - path to location to save tokens - ''' - - self.client_id = client_id - self.client_secret = client_secret - self.redirect_uri = redirect_uri - self.state=state - self.cache_path = cache_path - self.scope=self._normalize_scope(scope) - self.proxies = proxies - - def get_cached_token(self): - ''' Gets a cached auth token - ''' - token_info = None - if self.cache_path: - try: - f = open(self.cache_path) - token_info_string = f.read() - f.close() - token_info = json.loads(token_info_string) - - # if scopes don't match, then bail - if 'scope' not in token_info or not self._is_scope_subset(self.scope, token_info['scope']): - return None - - if self.is_token_expired(token_info): - token_info = self.refresh_access_token(token_info['refresh_token']) - - except IOError: - pass - return token_info - - def _save_token_info(self, token_info): - if self.cache_path: - try: - f = open(self.cache_path, 'w') - f.write(json.dumps(token_info)) - f.close() - except IOError: - self._warn("couldn't write token cache to " + self.cache_path) - pass - - def _is_scope_subset(self, needle_scope, haystack_scope): - needle_scope = set(needle_scope.split()) if needle_scope else set() - haystack_scope = set(haystack_scope.split()) if haystack_scope else set() - return needle_scope <= haystack_scope - - def is_token_expired(self, token_info): - return is_token_expired(token_info) - - def get_authorize_url(self, state=None, show_dialog=False): - """ Gets the URL to use to authorize this app - """ - payload = {'client_id': self.client_id, - 'response_type': 'code', - 'redirect_uri': self.redirect_uri} - if self.scope: - payload['scope'] = self.scope - if state is None: - state = self.state - if state is not None: - payload['state'] = state - if show_dialog: - payload['show_dialog'] = True - - urlparams = urllibparse.urlencode(payload) - - return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams) - - def parse_response_code(self, url): - """ Parse the response code in the given response url - - Parameters: - - url - the response url - """ - - try: - return url.split("?code=")[1].split("&")[0] - except IndexError: - return None - - def _make_authorization_headers(self): - return _make_authorization_headers(self.client_id, self.client_secret) - - def get_access_token(self, code): - """ Gets the access token for the app given the code - - Parameters: - - code - the response code - """ - - payload = {'redirect_uri': self.redirect_uri, - 'code': code, - 'grant_type': 'authorization_code'} - if self.scope: - payload['scope'] = self.scope - if self.state: - payload['state'] = self.state - - headers = self._make_authorization_headers() - - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, verify=True, proxies=self.proxies) - if response.status_code != 200: - raise SpotifyOauthError(response.reason) - token_info = response.json() - token_info = self._add_custom_values_to_token_info(token_info) - self._save_token_info(token_info) - return token_info - - def _normalize_scope(self, scope): - if scope: - scopes = scope.split() - scopes.sort() - return ' '.join(scopes) - else: - return None - - def refresh_access_token(self, refresh_token): - payload = { 'refresh_token': refresh_token, - 'grant_type': 'refresh_token'} - - headers = self._make_authorization_headers() - - response = requests.post(self.OAUTH_TOKEN_URL, data=payload, - headers=headers, proxies=self.proxies) - if response.status_code != 200: - if False: # debugging code - print('headers', headers) - print('request', response.url) - self._warn("couldn't refresh token: code:%d reason:%s" \ - % (response.status_code, response.reason)) - return None - token_info = response.json() - token_info = self._add_custom_values_to_token_info(token_info) - if not 'refresh_token' in token_info: - token_info['refresh_token'] = refresh_token - self._save_token_info(token_info) - return token_info - - def _add_custom_values_to_token_info(self, token_info): - ''' - Store some values that aren't directly provided by a Web API - response. - ''' - token_info['expires_at'] = int(time.time()) + token_info['expires_in'] - token_info['scope'] = self.scope - return token_info - - def _warn(self, msg): - print('warning:' + msg, file=sys.stderr) - diff --git a/spotipy/util.py b/spotipy/util.py deleted file mode 100644 index 91f9b36..0000000 --- a/spotipy/util.py +++ /dev/null @@ -1,93 +0,0 @@ - -# shows a user's playlists (need to be authenticated via oauth) - -from __future__ import print_function -import os -from . import oauth2 -import spotipy - -def prompt_for_user_token(username, scope=None, client_id = None, - client_secret = None, redirect_uri = None, cache_path = None): - ''' prompts the user to login if necessary and returns - the user token suitable for use with the spotipy.Spotify - constructor - - Parameters: - - - username - the Spotify username - - scope - the desired scope of the request - - client_id - the client id of your app - - client_secret - the client secret of your app - - redirect_uri - the redirect URI of your app - - cache_path - path to location to save tokens - - ''' - - if not client_id: - client_id = os.getenv('SPOTIPY_CLIENT_ID') - - if not client_secret: - client_secret = os.getenv('SPOTIPY_CLIENT_SECRET') - - if not redirect_uri: - redirect_uri = os.getenv('SPOTIPY_REDIRECT_URI') - - if not client_id: - print(''' - You need to set your Spotify API credentials. You can do this by - setting environment variables like so: - - export SPOTIPY_CLIENT_ID='your-spotify-client-id' - export SPOTIPY_CLIENT_SECRET='your-spotify-client-secret' - export SPOTIPY_REDIRECT_URI='your-app-redirect-url' - - Get your credentials at - https://developer.spotify.com/my-applications - ''') - raise spotipy.SpotifyException(550, -1, 'no credentials set') - - cache_path = cache_path or ".cache-" + username - sp_oauth = oauth2.SpotifyOAuth(client_id, client_secret, redirect_uri, - scope=scope, cache_path=cache_path) - - # try to get a valid token for this user, from the cache, - # if not in the cache, the create a new (this will send - # the user to a web page where they can authorize this app) - - token_info = sp_oauth.get_cached_token() - - if not token_info: - print(''' - - User authentication requires interaction with your - web browser. Once you enter your credentials and - give authorization, you will be redirected to - a url. Paste that url you were directed to to - complete the authorization. - - ''') - auth_url = sp_oauth.get_authorize_url() - # try: - # import webbrowser - # webbrowser.open(auth_url) - # print("Opened %s in your browser" % auth_url) - # except: - print("Please navigate here: %s" % auth_url) - - print() - print() - try: - response = raw_input("Enter the URL you were redirected to: ") - except NameError: - response = input("Enter the URL you were redirected to: ") - - print() - print() - - code = sp_oauth.parse_response_code(response) - token_info = sp_oauth.get_access_token(code) - # Auth'ed API request - if token_info: - return token_info['access_token'] - else: - return None diff --git a/spotr.py b/spotr.py index 4b41422..881a2cb 100644 --- a/spotr.py +++ b/spotr.py @@ -1,91 +1,204 @@ +import datetime +import logging +import typing +from os import path + +import requests import spotipy -import spotipy.util as util +from tqdm import tqdm + import cred -import requests -from os import path -username = cred.username +logging.basicConfig(level=logging.INFO) -scope = 'playlist-read-private playlist-modify-private' +# Retrieve info from cred.py +username = cred.username client_id = cred.client_id client_secret = cred.client_secret -# Modified lines in util.py: -# try: -# import webbrowser -# webbrowser.open(auth_url) -# print("Opened %s in your browser" % auth_url) -# except: -# Removed to prevent Python trying to open a webbrowser +scope = "playlist-read-private playlist-modify-private" + + +class Spotr: + def __init__(self): + self.sp = None + + def authenticate(self): + logging.info("Attempting to authenticate...") + + self.sp = spotipy.Spotify( + auth_manager=spotipy.oauth2.SpotifyOAuth( + scope=scope, + client_id=client_id, + client_secret=client_secret, + redirect_uri="http://localhost", + open_browser=False, + ) + ) + + return True if self.sp.me() else False + + def get_download_playlist(self) -> str: + """ + Find the ID of the playlist to download tracks from. + """ + playlists = self.sp.user_playlists(username) + + for playlist in playlists["items"]: + if playlist["name"] == cred.playlist: + + logging.info("Found playlist: " + cred.playlist) + logging.info("ID: " + playlist["id"]) -print('Attempting to fetch token...') + return playlist["id"] -token = util.prompt_for_user_token( - username, scope, client_id, client_secret, redirect_uri="http://localhost") + logging.error("Unable to find playlist: " + cred.playlist) + return False -if token: + def get_all_tracks(self, playlist_id: str) -> list: + """ + Get all tracks from the download playlist. + """ - print('Token fetch successful') + logging.info("Getting all tracks from playlist...") - sp = spotipy.Spotify(auth=token) + # Get playlist from id + playlist = self.sp.user_playlist(username, playlist_id) - print('Requesting playlists...') - playlists = sp.user_playlists(username) - - for playlist in playlists['items']: - if playlist['name'] == cred.playlist: + # Calculate how many requests will need to be made to get all tracks + playlist_length = playlist["tracks"]["total"] + requests_needed = playlist_length // 100 + 1 - print('Found playlist: ' + cred.playlist) + # Get all tracks in playlist + playlist_tracks = [] + for i in tqdm(range(requests_needed)): + playlist_tracks += self.sp.playlist_items(playlist["id"], offset=i * 100)[ + "items" + ] - playlist_id = playlist['id'] + assert len(playlist_tracks) == playlist_length, "Not all tracks were retrieved" - print('ID: ' + playlist_id) + return playlist_tracks - data = sp.user_playlist(username, playlist['id'], - fields="tracks") + def convert_to_deezer(self, spotify_tracks: list) -> typing.Tuple[list, list]: + """ + Convert a list of spotify tracks to a list of deezer tracks. - tracks = data['tracks']['items'] + Returns a tuple of two lists: + - The first list contains the deezer track ids + - The second list contains the corresponding spotify track ids + """ - links = [] - remove = [] - seen = [] + logging.info("Converting to deezer...") - for track in tracks: - isrc = track['track']['external_ids']['isrc'] + deezer_links = [] + spotify_successful = [] + + for track in tqdm(spotify_tracks): + try: + isrc = track["track"]["external_ids"]["isrc"] url = "https://api.deezer.com/2.0/track/isrc:" + isrc response = requests.get(url) if response.status_code != 200: - print('Unable to find isrc: ' + isrc) + logging.warning("Unable to find isrc: " + isrc) else: - links.append(response.json()['link']) - remove.append( - track['track']['uri'].lstrip('spotify:track:')) + deezer_links.append(response.json()["link"]) + spotify_successful.append( + track["track"]["uri"].lstrip("spotify:track:") + ) + except: + logging.warning( + "Track does not have a valid ISRC. Please download manually." + ) + logging.info( + "Track name: " + + track["track"]["name"] + + " by " + + track["track"]["artists"][0]["name"] + ) + + return deezer_links, spotify_successful + + def append_to_download_file(self, deezer_links: list): + """ + Append the deezer links to the download file. + """ + + seen = [] + if path.exists(cred.filename): + + logging.info(cred.filename + " already exists! Appending to file...") + + with open(cred.filename, "r") as f: + seen = f.readlines() + + seen = [line.strip("\n") for line in seen] + + with open(cred.filename, "a+") as f: + f.write("\n") + for link in deezer_links: + if link not in seen: + f.write(link + "\n") + + logging.info("Appended tracks to file") + + def remove_tracks_from_spotify(self, playlist_id: str, track_ids: list): + """ + Remove tracks from the download playlist. + """ + + logging.info("Removing tracks from Spotify...") + + # Split the tracks into groups of 100 to prevent 413 error + track_ids_split = [ + track_ids[i : i + 100] for i in range(0, len(track_ids), 100) + ] + + for tracks in track_ids_split: + self.sp.playlist_remove_all_occurrences_of_items(playlist_id, tracks) + + logging.info("Removed tracks from Spotify") + - if path.exists(cred.filename): +def main(): + # Print runtime info + logging.info("Running at " + str(datetime.datetime.now())) - print(cred.filename + ' already exists! Appending to file...') + spotr = Spotr() - with open(cred.filename, 'r') as f: - seen = f.readlines() + # Authenticate Spotify + if spotr.authenticate(): + logging.info("Authentication successful") + else: + logging.critical("Authentication failed") + exit(1) - seen = [line.strip('\n') for line in seen] + # Get playlist id as specified in cred.py + if playlist_id := spotr.get_download_playlist(): + logging.info("Download playlist found") + else: + logging.critical("Download playlist not found") + exit(1) - with open(cred.filename, 'a+') as f: - for link in links: - if link not in seen: - f.write("%s\n" % link) + # Get all tracks from playlist + if download_tracks := spotr.get_all_tracks(playlist_id): + logging.info("All tracks found") + else: + logging.critical("No tracks found") + exit(1) - print('Appended tracks to file') + # Convert to deezer links + deezer_links, spotify_successful = spotr.convert_to_deezer(download_tracks) - print('Removing tracks from Spotify...') + # Append to download file + spotr.append_to_download_file(deezer_links) - sp.user_playlist_remove_all_occurrences_of_tracks( - username, playlist_id, remove) + # Remove successful tracks from Spotify playlist + spotr.remove_tracks_from_spotify(playlist_id, spotify_successful) - print('Success!') -else: - print('No token supplied') \ No newline at end of file +if __name__ == "__main__": + main()