From 8407feed47415659afbf20e7eb877218248f00f2 Mon Sep 17 00:00:00 2001 From: Sean Reifschneider Date: Sun, 4 Feb 2024 11:51:07 -0700 Subject: [PATCH] Adding spotify-backup license and reference, reformatting with black --- README.md | 3 + spotify2ytmusic/backend.py | 68 +++-- spotify2ytmusic/cli.py | 1 - spotify2ytmusic/gui.py | 223 +++++++++++----- spotify2ytmusic/reverse_playlist.py | 15 +- spotify2ytmusic/spotify_backup.py | 401 +++++++++++++++------------- 6 files changed, 423 insertions(+), 288 deletions(-) diff --git a/README.md b/README.md index 386a1c5..3bd6638 100644 --- a/README.md +++ b/README.md @@ -150,4 +150,7 @@ This is mostly for debugging, but there is a command to search for tracks in YTM Creative Commons Zero v1.0 Universal +spotify-backup.py licensed under MIT License +See https://github.com/caseychu/spotify-backup for more information + [//]: # ( vim: set tw=90 ts=4 sw=4 ai: ) diff --git a/spotify2ytmusic/backend.py b/spotify2ytmusic/backend.py index 7b7d9f8..afcace3 100644 --- a/spotify2ytmusic/backend.py +++ b/spotify2ytmusic/backend.py @@ -147,7 +147,9 @@ def get_playlist_id_by_name(yt: YTMusic, title: str) -> Optional[str]: return None -def lookup_song(yt: YTMusic, track_name: str, artist_name: str, album_name, yt_search_algo: int) -> dict: +def lookup_song( + yt: YTMusic, track_name: str, artist_name: str, album_name, yt_search_algo: int +) -> dict: """Look up a song on YTMusic Given the Spotify track information, it does a lookup for the album by the same @@ -188,12 +190,11 @@ def lookup_song(yt: YTMusic, track_name: str, artist_name: str, album_name, yt_s print(f"Unable to lookup album ({e}), continuing...") songs = yt.search(query=f"{track_name} by {artist_name}", filter="songs") - + match yt_search_algo: - case 0: return songs[0] - + case 1: for song in songs: if ( @@ -204,24 +205,28 @@ def lookup_song(yt: YTMusic, track_name: str, artist_name: str, album_name, yt_s return song # print(f"SONG: {song['videoId']} - {song['title']} - {song['artists'][0]['name']} - {song['album']['name']}") - raise ValueError(f"Did not find {track_name} by {artist_name} from {album_name}") - + raise ValueError( + f"Did not find {track_name} by {artist_name} from {album_name}" + ) + case 2: # This would need to do fuzzy matching for song in songs: # Remove everything in brackets in the song title - song_title_without_brackets = re.sub(r'[\[\(].*?[\]\)]', '', song["title"]) - if (( - song_title_without_brackets == track_name - and song["album"]["name"] == album_name - ) or ( - song_title_without_brackets == track_name - ) or ( - song_title_without_brackets in track_name - ) or ( - track_name in song_title_without_brackets - )) and ( - song["artists"][0]["name"] == artist_name or artist_name in song["artists"][0]["name"] + song_title_without_brackets = re.sub( + r"[\[\(].*?[\]\)]", "", song["title"] + ) + if ( + ( + song_title_without_brackets == track_name + and song["album"]["name"] == album_name + ) + or (song_title_without_brackets == track_name) + or (song_title_without_brackets in track_name) + or (track_name in song_title_without_brackets) + ) and ( + song["artists"][0]["name"] == artist_name + or artist_name in song["artists"][0]["name"] ): return song @@ -230,22 +235,35 @@ def lookup_song(yt: YTMusic, track_name: str, artist_name: str, album_name, yt_s else: track_name = track_name.lower() first_song_title = songs[0]["title"].lower() - if track_name not in first_song_title or songs[0]["artists"][0]["name"] != artist_name: # If the first song is not the one we are looking for + if ( + track_name not in first_song_title + or songs[0]["artists"][0]["name"] != artist_name + ): # If the first song is not the one we are looking for print("Not found in songs, searching videos") - new_songs = yt.search(query=f"{track_name} by {artist_name}", filter="videos") # Search videos - + new_songs = yt.search( + query=f"{track_name} by {artist_name}", filter="videos" + ) # Search videos + # From here, we search for videos reposting the song. They often contain the name of it and the artist. Like with 'Nekfeu - Ecrire'. for new_song in new_songs: - new_song_title = new_song["title"].lower() # People sometimes mess up the capitalization in the title - if (track_name in new_song_title and artist_name in new_song_title) or (track_name in new_song_title): + new_song_title = new_song[ + "title" + ].lower() # People sometimes mess up the capitalization in the title + if ( + track_name in new_song_title + and artist_name in new_song_title + ) or (track_name in new_song_title): print("Found a video") return new_song - else: + else: # Basically we only get here if the song isnt present anywhere on youtube - raise ValueError(f"Did not find {track_name} by {artist_name} from {album_name}") + raise ValueError( + f"Did not find {track_name} by {artist_name} from {album_name}" + ) else: return songs[0] + def copier( src_tracks: Iterator[SongInfo], dst_pl_id: Optional[str] = None, diff --git a/spotify2ytmusic/cli.py b/spotify2ytmusic/cli.py index aa3c646..93d8ab6 100644 --- a/spotify2ytmusic/cli.py +++ b/spotify2ytmusic/cli.py @@ -172,7 +172,6 @@ def parse_arguments(): None, args.dry_run, args.track_sleep, - ) diff --git a/spotify2ytmusic/gui.py b/spotify2ytmusic/gui.py index ff48c93..3bb6952 100644 --- a/spotify2ytmusic/gui.py +++ b/spotify2ytmusic/gui.py @@ -15,11 +15,26 @@ def create_label(parent, text, **kwargs) -> tk.Label: - return tk.Label(parent, text=text, font=("Helvetica", 14), background="#26242f", foreground="white", **kwargs) + return tk.Label( + parent, + text=text, + font=("Helvetica", 14), + background="#26242f", + foreground="white", + **kwargs, + ) def create_button(parent, text, **kwargs): - return tk.Button(parent, text=text, font=("Helvetica", 14), background="#696969", foreground="white", border=1, **kwargs) + return tk.Button( + parent, + text=text, + font=("Helvetica", 14), + background="#696969", + foreground="white", + border=1, + **kwargs, + ) class Window: @@ -30,11 +45,17 @@ def __init__(self) -> None: self.root.config(background="#26242f") style = ttk.Style() - style.theme_use('default') - style.configure("TNotebook.Tab", background="#121212", foreground="white") # Set the background color to #121212 when not selected - style.map("TNotebook.Tab", background=[("selected", "#26242f")], foreground=[("selected", "#ffffff")]) # Set the background color to #26242f and text color to white when selected - style.configure('TFrame', background='#26242f') - style.configure('TNotebook', background='#121212') + style.theme_use("default") + style.configure( + "TNotebook.Tab", background="#121212", foreground="white" + ) # Set the background color to #121212 when not selected + style.map( + "TNotebook.Tab", + background=[("selected", "#26242f")], + foreground=[("selected", "#ffffff")], + ) # Set the background color to #26242f and text color to white when selected + style.configure("TFrame", background="#26242f") + style.configure("TNotebook", background="#121212") # Redirect stdout to GUI sys.stdout.write = self.redirector @@ -63,16 +84,16 @@ def __init__(self) -> None: self.tab5 = ttk.Frame(self.tabControl) self.tab6 = ttk.Frame(self.tabControl) self.tab7 = ttk.Frame(self.tabControl) - - self.tabControl.add(self.tab0, text='Login to YT Music') - self.tabControl.add(self.tab1, text='Spotify backup') - self.tabControl.add(self.tab2, text='Reverse playlist') - self.tabControl.add(self.tab3, text='Load liked songs') - self.tabControl.add(self.tab4, text='List playlists') - self.tabControl.add(self.tab5, text='Copy all playlists') - self.tabControl.add(self.tab6, text='Copy a specific playlist') - self.tabControl.add(self.tab7, text='Settings') - + + self.tabControl.add(self.tab0, text="Login to YT Music") + self.tabControl.add(self.tab1, text="Spotify backup") + self.tabControl.add(self.tab2, text="Reverse playlist") + self.tabControl.add(self.tab3, text="Load liked songs") + self.tabControl.add(self.tab4, text="List playlists") + self.tabControl.add(self.tab5, text="Copy all playlists") + self.tabControl.add(self.tab6, text="Copy a specific playlist") + self.tabControl.add(self.tab7, text="Settings") + # Create a Frame for the logs self.log_frame = ttk.Frame(self.paned_window) self.paned_window.add(self.log_frame, weight=1) @@ -83,77 +104,125 @@ def __init__(self) -> None: self.logs.config(background="#26242f", foreground="white") # tab 0 - create_label(self.tab0, text="Welcome to Spotify to YT Music!\nTo start, you need to login to YT Music.").pack( - anchor=tk.CENTER, expand=True) - create_button(self.tab0, text="Login", command=self.yt_login).pack(anchor=tk.CENTER, expand=True) + create_label( + self.tab0, + text="Welcome to Spotify to YT Music!\nTo start, you need to login to YT Music.", + ).pack(anchor=tk.CENTER, expand=True) + create_button(self.tab0, text="Login", command=self.yt_login).pack( + anchor=tk.CENTER, expand=True + ) # tab1 - create_label(self.tab1, text="First, you need to backup your spotify playlists").pack(anchor=tk.CENTER, - expand=True) - create_button(self.tab1, text="Backup", command=lambda: self.call_func(spotify_backup.main, self.tab2)).pack( - anchor=tk.CENTER, expand=True) + create_label( + self.tab1, text="First, you need to backup your spotify playlists" + ).pack(anchor=tk.CENTER, expand=True) + create_button( + self.tab1, + text="Backup", + command=lambda: self.call_func(spotify_backup.main, self.tab2), + ).pack(anchor=tk.CENTER, expand=True) # tab2 - create_label(self.tab2, - text="Since this program likes the last added song first, you need to reverse the playlist if " - "you want to keep the exact same playlists.\nBut this step is not mandatory, you can skip " - "it if you don't mind by clicking here.").pack( - anchor=tk.CENTER, expand=True) - create_button(self.tab2, text="Skip", command=lambda: self.tabControl.select(self.tab3)).pack(anchor=tk.CENTER, expand=True) - create_button(self.tab2, text="Reverse", command=self.call_reverse).pack(anchor=tk.CENTER, expand=True) + create_label( + self.tab2, + text="Since this program likes the last added song first, you need to reverse the playlist if " + "you want to keep the exact same playlists.\nBut this step is not mandatory, you can skip " + "it if you don't mind by clicking here.", + ).pack(anchor=tk.CENTER, expand=True) + create_button( + self.tab2, text="Skip", command=lambda: self.tabControl.select(self.tab3) + ).pack(anchor=tk.CENTER, expand=True) + create_button(self.tab2, text="Reverse", command=self.call_reverse).pack( + anchor=tk.CENTER, expand=True + ) # tab3 - create_label(self.tab3, text="Now, you can load your liked songs.").pack(anchor=tk.CENTER, expand=True) - create_button(self.tab3, text="Load", command=lambda: self.call_func(cli.load_liked, self.tab4)).pack( - anchor=tk.CENTER, expand=True) + create_label(self.tab3, text="Now, you can load your liked songs.").pack( + anchor=tk.CENTER, expand=True + ) + create_button( + self.tab3, + text="Load", + command=lambda: self.call_func(cli.load_liked, self.tab4), + ).pack(anchor=tk.CENTER, expand=True) # tab4 - create_label(self.tab4, text="Here, you can get a list of your playlists, with their ID.").pack( - anchor=tk.CENTER, expand=True) - create_button(self.tab4, text="List", command=lambda: self.call_func(cli.list_playlists, self.tab5)).pack( - anchor=tk.CENTER, expand=True) + create_label( + self.tab4, text="Here, you can get a list of your playlists, with their ID." + ).pack(anchor=tk.CENTER, expand=True) + create_button( + self.tab4, + text="List", + command=lambda: self.call_func(cli.list_playlists, self.tab5), + ).pack(anchor=tk.CENTER, expand=True) # tab5 - create_label(self.tab5, - text="Here, you can copy all your playlists from Spotify to YT Music. Please note that this step " - "can take a long time since songs are added one by one.").pack( - anchor=tk.CENTER, expand=True) - create_button(self.tab5, text="Copy", command=lambda: self.call_func(backend.copy_all_playlists, self.tab6)).pack( - anchor=tk.CENTER, expand=True) + create_label( + self.tab5, + text="Here, you can copy all your playlists from Spotify to YT Music. Please note that this step " + "can take a long time since songs are added one by one.", + ).pack(anchor=tk.CENTER, expand=True) + create_button( + self.tab5, + text="Copy", + command=lambda: self.call_func(backend.copy_all_playlists, self.tab6), + ).pack(anchor=tk.CENTER, expand=True) # tab6 - create_label(self.tab6, text="Here, you can copy a specific playlist from Spotify to YT Music.").pack( - anchor=tk.CENTER, expand=True) - create_label(self.tab6, text="Spotify playlist ID:").pack(anchor=tk.CENTER, expand=True) + create_label( + self.tab6, + text="Here, you can copy a specific playlist from Spotify to YT Music.", + ).pack(anchor=tk.CENTER, expand=True) + create_label(self.tab6, text="Spotify playlist ID:").pack( + anchor=tk.CENTER, expand=True + ) self.spotify_playlist_id = tk.Entry(self.tab6) self.spotify_playlist_id.pack(anchor=tk.CENTER, expand=True) - create_label(self.tab6, text="YT Music playlist ID:").pack(anchor=tk.CENTER, expand=True) + create_label(self.tab6, text="YT Music playlist ID:").pack( + anchor=tk.CENTER, expand=True + ) self.yt_playlist_id = tk.Entry(self.tab6) self.yt_playlist_id.pack(anchor=tk.CENTER, expand=True) - create_button(self.tab6, text="Copy", command=self.call_copy_playlist).pack(anchor=tk.CENTER, expand=True) - + create_button(self.tab6, text="Copy", command=self.call_copy_playlist).pack( + anchor=tk.CENTER, expand=True + ) + # tab7 self.var_scroll = tk.BooleanVar() - - auto_scroll = tk.Checkbutton(self.tab7, text="Auto scroll", variable=self.var_scroll, command=lambda: self.load_write_settings(1), background="#696969", foreground="#ffffff", selectcolor="#26242f", border=1) + + auto_scroll = tk.Checkbutton( + self.tab7, + text="Auto scroll", + variable=self.var_scroll, + command=lambda: self.load_write_settings(1), + background="#696969", + foreground="#ffffff", + selectcolor="#26242f", + border=1, + ) auto_scroll.pack(anchor=tk.CENTER, expand=True) auto_scroll.select() - + self.var_algo = tk.IntVar() self.var_algo.set(0) - + self.algo_label = create_label(self.tab7, text=f"Algorithm: ") self.algo_label.pack(anchor=tk.CENTER, expand=True) - - menu_algo = tk.OptionMenu(self.tab7, self.var_algo, 0, *[1, 2], command=lambda x: self.load_write_settings(1)) + + menu_algo = tk.OptionMenu( + self.tab7, + self.var_algo, + 0, + *[1, 2], + command=lambda x: self.load_write_settings(1), + ) menu_algo.pack(anchor=tk.CENTER, expand=True) menu_algo.config(background="#696969", foreground="#ffffff", border=1) - def redirector(self, input_str="") -> None: """ Inserts the input string into the logs widget and disables editing. - + Args: self: The instance of the class. input_str (str): The string to be inserted into the logs' widget. @@ -177,14 +246,18 @@ def call_copy_playlist(self): yt_playlist_id = self.yt_playlist_id.get() print() - + if spotify_playlist_id == "": print("Please enter the Spotify playlist ID") return if yt_playlist_id == "": - print("No Youtube playlist ID, creating one and naming it by the source playlist ID.") + print( + "No Youtube playlist ID, creating one and naming it by the source playlist ID." + ) backend.create_playlist(spotify_playlist_id) - th = threading.Thread(target=backend.copy_playlist, args=(spotify_playlist_id, yt_playlist_id)) + th = threading.Thread( + target=backend.copy_playlist, args=(spotify_playlist_id, yt_playlist_id) + ) th.start() while th.is_alive(): self.root.update() @@ -196,7 +269,9 @@ def call_reverse(self): result = [0] # Shared data structure def target(): - result[0] = reverse_playlist(replace=True) # Call the function with specific arguments + result[0] = reverse_playlist( + replace=True + ) # Call the function with specific arguments th = threading.Thread(target=target) th.start() @@ -219,14 +294,24 @@ def run_in_thread(): command = ["ytmusicapi", "oauth"] # Open a new console window to run the command - if os.name == 'nt': # If the OS is Windows - process = subprocess.Popen(command, creationflags=subprocess.CREATE_NEW_CONSOLE) + if os.name == "nt": # If the OS is Windows + process = subprocess.Popen( + command, creationflags=subprocess.CREATE_NEW_CONSOLE + ) process.communicate() else: # For Unix and Linux try: - subprocess.call('x-terminal-emulator -e ytmusicapi oauth', shell=True, stdout=subprocess.PIPE) + subprocess.call( + "x-terminal-emulator -e ytmusicapi oauth", + shell=True, + stdout=subprocess.PIPE, + ) except: - subprocess.call('xterm -e ytmusicapi oauth', shell=True, stdout=subprocess.PIPE) + subprocess.call( + "xterm -e ytmusicapi oauth", + shell=True, + stdout=subprocess.PIPE, + ) self.tabControl.select(self.tab1) print() @@ -234,10 +319,10 @@ def run_in_thread(): # Run the function in a separate thread th = threading.Thread(target=run_in_thread) th.start() - + def load_write_settings(self, action: int) -> None: texts = {0: "Exact match", 1: "Fuzzy match", 2: "Fuzzy match with videos"} - + exist = True if action == 0: with open("settings.json", "a+") as f: @@ -257,7 +342,7 @@ def load_write_settings(self, action: int) -> None: settings["auto_scroll"] = self.var_scroll.get() settings["algo_number"] = self.var_algo.get() json.dump(settings, f) - + self.algo_label.config(text=f"Algorithm: {texts[self.var_algo.get()]}") self.root.update() diff --git a/spotify2ytmusic/reverse_playlist.py b/spotify2ytmusic/reverse_playlist.py index 0e5f4de..035452d 100644 --- a/spotify2ytmusic/reverse_playlist.py +++ b/spotify2ytmusic/reverse_playlist.py @@ -9,7 +9,9 @@ def reverse_playlist(input_file="playlists.json", verbose=True, replace=False) -> int: if os.path.exists(input_file) and not replace: if verbose: - print("Output file already exists and no replace argument detected, exiting...") + print( + "Output file already exists and no replace argument detected, exiting..." + ) return 1 print("Backing up file...") @@ -46,8 +48,15 @@ def reverse_playlist(input_file="playlists.json", verbose=True, replace=False) - if __name__ == "__main__": parser = ArgumentParser() parser.add_argument("input_file", type=str, help="Path to the input file") - parser.add_argument("-v", "--verbose", action="store_false", help="Enable verbose mode") - parser.add_argument("-r", "--replace", action="store_true", help="Replace the output file if already existing") + parser.add_argument( + "-v", "--verbose", action="store_false", help="Enable verbose mode" + ) + parser.add_argument( + "-r", + "--replace", + action="store_true", + help="Replace the output file if already existing", + ) args = parser.parse_args() diff --git a/spotify2ytmusic/spotify_backup.py b/spotify2ytmusic/spotify_backup.py index d9e9e20..1933ac9 100644 --- a/spotify2ytmusic/spotify_backup.py +++ b/spotify2ytmusic/spotify_backup.py @@ -6,193 +6,214 @@ class SpotifyAPI: - - # Requires an OAuth token. - def __init__(self, auth): - self._auth = auth - - # Gets a resource from the Spotify API and returns the object. - def get(self, url, params={}, tries=3): - # Construct the correct URL. - if not url.startswith('https://api.spotify.com/v1/'): - url = 'https://api.spotify.com/v1/' + url - if params: - url += ('&' if '?' in url else '?') + urllib.parse.urlencode(params) - - # Try the sending off the request a specified number of times before giving up. - for _ in range(tries): - try: - req = urllib.request.Request(url) - req.add_header('Authorization', 'Bearer ' + self._auth) - res = urllib.request.urlopen(req) - reader = codecs.getreader('utf-8') - return json.load(reader(res)) - except Exception as err: - print('Couldn\'t load URL: {} ({})'.format(url, err)) - time.sleep(2) - print('Trying again...') - sys.exit(1) - - # The Spotify API breaks long lists into multiple pages. This method automatically - # fetches all pages and joins them, returning in a single list of objects. - def list(self, url, params={}): - last_log_time = time.time() - response = self.get(url, params) - items = response['items'] - - while response['next']: - if time.time() > last_log_time + 15: - last_log_time = time.time() - print(f"Loaded {len(items)}/{response['total']} items") - - response = self.get(response['next']) - items += response['items'] - return items - - # Pops open a browser window for a user to log in and authorize API access. - @staticmethod - def authorize(client_id, scope): - url = 'https://accounts.spotify.com/authorize?' + urllib.parse.urlencode({ - 'response_type': 'token', - 'client_id': client_id, - 'scope': scope, - 'redirect_uri': 'http://127.0.0.1:{}/redirect'.format(SpotifyAPI._SERVER_PORT) - }) - print(f'Logging in (click if it doesn\'t open automatically): {url}') - webbrowser.open(url) - - # Start a simple, local HTTP server to listen for the authorization token... (i.e. a hack). - server = SpotifyAPI._AuthorizationServer('127.0.0.1', SpotifyAPI._SERVER_PORT) - try: - while True: - server.handle_request() - except SpotifyAPI._Authorization as auth: - return SpotifyAPI(auth.access_token) - - # The port that the local server listens on. Don't change this, - # as Spotify only will redirect to certain predefined URLs. - _SERVER_PORT = 43019 - - class _AuthorizationServer(http.server.HTTPServer): - def __init__(self, host, port): - http.server.HTTPServer.__init__(self, (host, port), SpotifyAPI._AuthorizationHandler) - - # Disable the default error handling. - def handle_error(self, request, client_address): - raise - - class _AuthorizationHandler(http.server.BaseHTTPRequestHandler): - def do_GET(self): - # The Spotify API has redirected here, but access_token is hidden in the URL fragment. - # Read it using JavaScript and send it to /token as an actual query string... - if self.path.startswith('/redirect'): - self.send_response(200) - self.send_header('Content-Type', 'text/html') - self.end_headers() - self.wfile.write(b'') - - # Read access_token and use an exception to kill the server listening... - elif self.path.startswith('/token?'): - self.send_response(200) - self.send_header('Content-Type', 'text/html') - self.end_headers() - self.wfile.write(b'Thanks! You may now close this window.') - - access_token = re.search('access_token=([^&]*)', self.path).group(1) - print(f'Received access token from Spotify: {access_token}') - raise SpotifyAPI._Authorization(access_token) - - else: - self.send_error(404) - - # Disable the default logging. - def log_message(self, format, *args): - pass - - class _Authorization(Exception): - def __init__(self, access_token): - self.access_token = access_token - - -def main(dump = "playlists,liked", format = "json", file = "playlists.json", token=""): - print("Starting backup...") - # If they didn't give a filename, then just prompt them. (They probably just double-clicked.) - while file == "": - file = input('Enter a file name (e.g. playlists.txt): ') - format = file.split('.')[-1] - - # Log into the Spotify API. - if token != "": - spotify = SpotifyAPI(token) - else: - spotify = SpotifyAPI.authorize(client_id='5c098bcc800e45d49e476265bc9b6934', - scope='playlist-read-private playlist-read-collaborative user-library-read') - - # Get the ID of the logged in user. - print('Loading user info...') - me = spotify.get('me') - print('Logged in as {display_name} ({id})'.format(**me)) - - playlists = [] - liked_albums = [] - - # List liked albums and songs - if 'liked' in dump: - print('Loading liked albums and songs...') - liked_tracks = spotify.list('users/{user_id}/tracks'.format(user_id=me['id']), {'limit': 50}) - liked_albums = spotify.list('me/albums', {'limit': 50}) - playlists += [{'name': 'Liked Songs', 'tracks': liked_tracks}] - - # List all playlists and the tracks in each playlist - if 'playlists' in dump: - print('Loading playlists...') - playlist_data = spotify.list('users/{user_id}/playlists'.format(user_id=me['id']), {'limit': 50}) - print(f'Found {len(playlist_data)} playlists') - - # List all tracks in each playlist - for playlist in playlist_data: - print('Loading playlist: {name} ({tracks[total]} songs)'.format(**playlist)) - playlist['tracks'] = spotify.list(playlist['tracks']['href'], {'limit': 100}) - playlists += playlist_data - - # Write the file. - print('Writing files...') - with open(file, 'w', encoding='utf-8') as f: - # JSON file. - if format == 'json': - json.dump({ - 'playlists': playlists, - 'albums': liked_albums - }, f) - - # Tab-separated file. - else: - f.write('Playlists: \r\n\r\n') - for playlist in playlists: - f.write(playlist['name'] + '\r\n') - for track in playlist['tracks']: - if track['track'] is None: - continue - f.write('{name}\t{artists}\t{album}\t{uri}\t{release_date}\r\n'.format( - uri=track['track']['uri'], - name=track['track']['name'], - artists=', '.join([artist['name'] for artist in track['track']['artists']]), - album=track['track']['album']['name'], - release_date=track['track']['album']['release_date'] - )) - f.write('\r\n') - if len(liked_albums) > 0: - f.write('Liked Albums: \r\n\r\n') - for album in liked_albums: - uri = album['album']['uri'] - name = album['album']['name'] - artists = ', '.join([artist['name'] for artist in album['album']['artists']]) - release_date = album['album']['release_date'] - album = f'{artists} - {name}' - - f.write(f'{name}\t{artists}\t-\t{uri}\t{release_date}\r\n') - - print('Wrote file: ' + file) - -if __name__ == '__main__': - main() + # Requires an OAuth token. + def __init__(self, auth): + self._auth = auth + + # Gets a resource from the Spotify API and returns the object. + def get(self, url, params={}, tries=3): + # Construct the correct URL. + if not url.startswith("https://api.spotify.com/v1/"): + url = "https://api.spotify.com/v1/" + url + if params: + url += ("&" if "?" in url else "?") + urllib.parse.urlencode(params) + + # Try the sending off the request a specified number of times before giving up. + for _ in range(tries): + try: + req = urllib.request.Request(url) + req.add_header("Authorization", "Bearer " + self._auth) + res = urllib.request.urlopen(req) + reader = codecs.getreader("utf-8") + return json.load(reader(res)) + except Exception as err: + print("Couldn't load URL: {} ({})".format(url, err)) + time.sleep(2) + print("Trying again...") + sys.exit(1) + + # The Spotify API breaks long lists into multiple pages. This method automatically + # fetches all pages and joins them, returning in a single list of objects. + def list(self, url, params={}): + last_log_time = time.time() + response = self.get(url, params) + items = response["items"] + + while response["next"]: + if time.time() > last_log_time + 15: + last_log_time = time.time() + print(f"Loaded {len(items)}/{response['total']} items") + + response = self.get(response["next"]) + items += response["items"] + return items + + # Pops open a browser window for a user to log in and authorize API access. + @staticmethod + def authorize(client_id, scope): + url = "https://accounts.spotify.com/authorize?" + urllib.parse.urlencode( + { + "response_type": "token", + "client_id": client_id, + "scope": scope, + "redirect_uri": "http://127.0.0.1:{}/redirect".format( + SpotifyAPI._SERVER_PORT + ), + } + ) + print(f"Logging in (click if it doesn't open automatically): {url}") + webbrowser.open(url) + + # Start a simple, local HTTP server to listen for the authorization token... (i.e. a hack). + server = SpotifyAPI._AuthorizationServer("127.0.0.1", SpotifyAPI._SERVER_PORT) + try: + while True: + server.handle_request() + except SpotifyAPI._Authorization as auth: + return SpotifyAPI(auth.access_token) + + # The port that the local server listens on. Don't change this, + # as Spotify only will redirect to certain predefined URLs. + _SERVER_PORT = 43019 + + class _AuthorizationServer(http.server.HTTPServer): + def __init__(self, host, port): + http.server.HTTPServer.__init__( + self, (host, port), SpotifyAPI._AuthorizationHandler + ) + + # Disable the default error handling. + def handle_error(self, request, client_address): + raise + + class _AuthorizationHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + # The Spotify API has redirected here, but access_token is hidden in the URL fragment. + # Read it using JavaScript and send it to /token as an actual query string... + if self.path.startswith("/redirect"): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.end_headers() + self.wfile.write( + b'' + ) + + # Read access_token and use an exception to kill the server listening... + elif self.path.startswith("/token?"): + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.end_headers() + self.wfile.write( + b"Thanks! You may now close this window." + ) + + access_token = re.search("access_token=([^&]*)", self.path).group(1) + print(f"Received access token from Spotify: {access_token}") + raise SpotifyAPI._Authorization(access_token) + + else: + self.send_error(404) + + # Disable the default logging. + def log_message(self, format, *args): + pass + + class _Authorization(Exception): + def __init__(self, access_token): + self.access_token = access_token + + +def main(dump="playlists,liked", format="json", file="playlists.json", token=""): + print("Starting backup...") + # If they didn't give a filename, then just prompt them. (They probably just double-clicked.) + while file == "": + file = input("Enter a file name (e.g. playlists.txt): ") + format = file.split(".")[-1] + + # Log into the Spotify API. + if token != "": + spotify = SpotifyAPI(token) + else: + spotify = SpotifyAPI.authorize( + client_id="5c098bcc800e45d49e476265bc9b6934", + scope="playlist-read-private playlist-read-collaborative user-library-read", + ) + + # Get the ID of the logged in user. + print("Loading user info...") + me = spotify.get("me") + print("Logged in as {display_name} ({id})".format(**me)) + + playlists = [] + liked_albums = [] + + # List liked albums and songs + if "liked" in dump: + print("Loading liked albums and songs...") + liked_tracks = spotify.list( + "users/{user_id}/tracks".format(user_id=me["id"]), {"limit": 50} + ) + liked_albums = spotify.list("me/albums", {"limit": 50}) + playlists += [{"name": "Liked Songs", "tracks": liked_tracks}] + + # List all playlists and the tracks in each playlist + if "playlists" in dump: + print("Loading playlists...") + playlist_data = spotify.list( + "users/{user_id}/playlists".format(user_id=me["id"]), {"limit": 50} + ) + print(f"Found {len(playlist_data)} playlists") + + # List all tracks in each playlist + for playlist in playlist_data: + print("Loading playlist: {name} ({tracks[total]} songs)".format(**playlist)) + playlist["tracks"] = spotify.list( + playlist["tracks"]["href"], {"limit": 100} + ) + playlists += playlist_data + + # Write the file. + print("Writing files...") + with open(file, "w", encoding="utf-8") as f: + # JSON file. + if format == "json": + json.dump({"playlists": playlists, "albums": liked_albums}, f) + + # Tab-separated file. + else: + f.write("Playlists: \r\n\r\n") + for playlist in playlists: + f.write(playlist["name"] + "\r\n") + for track in playlist["tracks"]: + if track["track"] is None: + continue + f.write( + "{name}\t{artists}\t{album}\t{uri}\t{release_date}\r\n".format( + uri=track["track"]["uri"], + name=track["track"]["name"], + artists=", ".join( + [artist["name"] for artist in track["track"]["artists"]] + ), + album=track["track"]["album"]["name"], + release_date=track["track"]["album"]["release_date"], + ) + ) + f.write("\r\n") + if len(liked_albums) > 0: + f.write("Liked Albums: \r\n\r\n") + for album in liked_albums: + uri = album["album"]["uri"] + name = album["album"]["name"] + artists = ", ".join( + [artist["name"] for artist in album["album"]["artists"]] + ) + release_date = album["album"]["release_date"] + album = f"{artists} - {name}" + + f.write(f"{name}\t{artists}\t-\t{uri}\t{release_date}\r\n") + + print("Wrote file: " + file) + + +if __name__ == "__main__": + main()