diff --git a/README.md b/README.md index 1074719..bea8b48 100644 --- a/README.md +++ b/README.md @@ -52,9 +52,12 @@ Options that can be configured in zs_config.json: ``` ### Will my account get banned if I use this tool? Currently no user has reported their account getting banned after using ZSpotify. -This isn't to say _you_ won't get banned as it is technically againt Spotify's TOS. +This isn't to say _you_ won't get banned as it is technically against Spotify's TOS. **Use ZSpotify at your own risk**, the developers of ZSpotify are not responsible if your account gets banned. +### Contributing +Please be sure to lint your code with pylint before issuing a pull-request, thanks! + ## **Changelog:** **v2.1 (Oct 2021):** - Moved configuration from hard-coded values to separate zs_config.json file diff --git a/zspotify.py b/zspotify.py index 8e25e72..16c51a7 100755 --- a/zspotify.py +++ b/zspotify.py @@ -7,6 +7,7 @@ It's like youtube-dl, but for Spotify. (Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org) """ +from getpass import getpass import json import os import os.path @@ -14,21 +15,21 @@ import platform import re import sys import time -from getpass import getpass -import music_tag -import requests from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality from librespot.core import Session from librespot.metadata import TrackId, EpisodeId +import music_tag from pydub import AudioSegment +import requests from tqdm import tqdm +QUALITY = None SESSION: Session = None -sanitize = ["\\", "/", ":", "*", "?", "'", "<", ">", '"'] +SANITIZE = ["\\", "/", ":", "*", "?", "'", "<", ">", "\""] # user-customizable variables that adjust the core functionality of ZSpotify -with open('zs_config.json') as config_file: +with open("zs_config.json", encoding="utf-8") as config_file: ZS_CONFIG = json.load(config_file) @@ -46,14 +47,13 @@ def clear(): def wait(seconds: int = 3): """ Pause for a set number of seconds """ for i in range(seconds)[::-1]: - print("\rWait for %d second(s)..." % (i + 1), end="") + print(f"\rWait for {i + 1} second(s)...", end="") time.sleep(1) def sanitize_data(value): """ Returns given string with problematic removed """ - global sanitize - for i in sanitize: + for i in SANITIZE: value = value.replace(i, "") return value.replace("|", "-") @@ -64,12 +64,11 @@ def split_input(selection): if "-" in selection: for number in range(int(selection.split("-")[0]), int(selection.split("-")[1]) + 1): inputs.append(number) - return inputs else: selections = selection.split(",") for i in selections: inputs.append(i.strip()) - return inputs + return inputs def splash(): @@ -86,7 +85,7 @@ def splash(): # two mains functions for logging in and doing client stuff def login(): """ Authenticates with Spotify and saves credentials to a file """ - global SESSION + global SESSION # pylint: disable=global-statement if os.path.isfile("credentials.json"): try: @@ -104,9 +103,9 @@ def login(): pass -def client(): +def client(): # pylint: disable=too-many-branches,too-many-statements """ Connects to spotify to perform query's and get songs to download """ - global QUALITY, SESSION + global QUALITY # pylint: disable=global-statement splash() token = SESSION.tokens().get("user-read-email") @@ -124,11 +123,11 @@ def client(): download_from_user_playlist() elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs": for song in get_saved_tracks(token): - if not song['track']['name']: + if not song["track"]["name"]: print( "### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###") else: - download_track(song['track']['id'], "Liked Songs/") + download_track(song["track"]["id"], "Liked Songs/") print("\n") else: track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls( @@ -142,9 +141,9 @@ def client(): download_album(album_id_str) elif playlist_id_str is not None: playlist_songs = get_playlist_songs(token, playlist_id_str) - name, creator = get_playlist_info(token, playlist_id_str) + name, _ = get_playlist_info(token, playlist_id_str) for song in playlist_songs: - download_track(song['track']['id'], + download_track(song["track"]["id"], sanitize_data(name) + "/") print("\n") elif episode_id_str is not None: @@ -152,7 +151,6 @@ def client(): elif show_id_str is not None: for episode in get_show_episodes(token, show_id_str): download_episode(episode) - else: search_text = input("Enter search or URL: ") @@ -167,9 +165,9 @@ def client(): download_album(album_id_str) elif playlist_id_str is not None: playlist_songs = get_playlist_songs(token, playlist_id_str) - name, creator = get_playlist_info(token, playlist_id_str) + name, _ = get_playlist_info(token, playlist_id_str) for song in playlist_songs: - download_track(song['track']['id'], + download_track(song["track"]["id"], sanitize_data(name) + "/") print("\n") elif episode_id_str is not None: @@ -182,7 +180,8 @@ def client(): # wait() -def regex_input_for_urls(search_input): +def regex_input_for_urls(search_input): # pylint: disable=too-many-locals + """ Since many kinds of search may be passed at the command line, process them all here. """ track_uri_search = re.search( r"^spotify:track:(?P[0-9a-zA-Z]{22})$", search_input) track_url_search = re.search( @@ -270,24 +269,23 @@ def regex_input_for_urls(search_input): return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str -def get_episode_info(episode_id_str): +def get_episode_info(episode_id_str): # pylint: disable=missing-function-docstring token = SESSION.tokens().get("user-read-email") - info = json.loads(requests.get("https://api.spotify.com/v1/episodes/" + - episode_id_str, headers={"Authorization": "Bearer %s" % token}).text) + info = json.loads(requests.get(f"https://api.spotify.com/v1/episodes/{episode_id_str}", + headers={"Authorization": f"Bearer {token}"}).text) if "error" in info: return None, None - else: - # print(info['images'][0]['url']) - return sanitize_data(info["show"]["name"]), sanitize_data(info["name"]) + # print(info["images"][0]["url"]) + return sanitize_data(info["show"]["name"]), sanitize_data(info["name"]) -def get_show_episodes(access_token, show_id_str): +def get_show_episodes(access_token, show_id_str): # pylint: disable=missing-function-docstring episodes = [] - headers = {'Authorization': f'Bearer {access_token}'} + headers = {"Authorization": f"Bearer {access_token}"} resp = requests.get( - f'https://api.spotify.com/v1/shows/{show_id_str}/episodes', headers=headers).json() + f"https://api.spotify.com/v1/shows/{show_id_str}/episodes", headers=headers).json() for episode in resp["items"]: episodes.append(episode["id"]) @@ -295,7 +293,7 @@ def get_show_episodes(access_token, show_id_str): return episodes -def download_episode(episode_id_str): +def download_episode(episode_id_str): # pylint: disable=missing-function-docstring podcast_name, episode_name = get_episode_info(episode_id_str) extra_paths = podcast_name + "/" @@ -315,15 +313,15 @@ def download_episode(episode_id_str): extra_paths, exist_ok=True) total_size = stream.input_stream.size - with open(ZS_CONFIG["ROOT_PODCAST_PATH"] + extra_paths + filename + ".wav", 'wb') as file, tqdm( + with open(ZS_CONFIG["ROOT_PODCAST_PATH"] + extra_paths + filename + ".wav", "wb") as file, tqdm( desc=filename, total=total_size, - unit='B', + unit="B", unit_scale=True, unit_divisor=1024 - ) as bar: + ) as p_bar: for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1): - bar.update(file.write( + p_bar.update(file.write( stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"]))) # convert_audio_format(ZS_CONFIG["ROOT_PODCAST_PATH"] + @@ -332,7 +330,7 @@ def download_episode(episode_id_str): # related functions that do stuff with the spotify API -def search(search_term): +def search(search_term): # pylint: disable=too-many-locals,too-many-branches """ Searches Spotify's API for relevant data """ token = SESSION.tokens().get("user-read-email") @@ -344,7 +342,7 @@ def search(search_term): "q": search_term, "type": "track,album,playlist" }, - headers={"Authorization": "Bearer %s" % token}, + headers={"Authorization": f"Bearer {token}"}, ) # print(resp.json()) @@ -358,12 +356,7 @@ def search(search_term): explicit = "[E]" else: explicit = "" - print("%d, %s %s | %s" % ( - i, - track["name"], - explicit, - ",".join([artist["name"] for artist in track["artists"]]), - )) + print(f"{i}, {track['name']} {explicit} | {','.join([artist['name'] for artist in track['artists']])}") i += 1 total_tracks = i - 1 print("\n") @@ -374,11 +367,7 @@ def search(search_term): if len(albums) > 0: print("### ALBUMS ###") for album in albums: - print("%d, %s | %s" % ( - i, - album["name"], - ",".join([artist["name"] for artist in album["artists"]]), - )) + print(f"{i}, {album['name']} | {','.join([artist['name'] for artist in album['artists']])}") i += 1 total_albums = i - total_tracks - 1 print("\n") @@ -388,11 +377,7 @@ def search(search_term): playlists = resp.json()["playlists"]["items"] print("### PLAYLISTS ###") for playlist in playlists: - print("%d, %s | %s" % ( - i, - playlist["name"], - playlist['owner']['display_name'], - )) + print(f"{i}, {playlist['name']} | {playlist['owner']['display_name']}") i += 1 print("\n") @@ -412,11 +397,11 @@ def search(search_term): playlist_choice = playlists[position - total_tracks - total_albums - 1] playlist_songs = get_playlist_songs( - token, playlist_choice['id']) + token, playlist_choice["id"]) for song in playlist_songs: - if song['track']['id'] is not None: - download_track(song['track']['id'], sanitize_data( - playlist_choice['name'].strip()) + "/") + if song["track"]["id"] is not None: + download_track(song["track"]["id"], sanitize_data( + playlist_choice["name"].strip()) + "/") print("\n") @@ -425,19 +410,19 @@ def get_song_info(song_id): token = SESSION.tokens().get("user-read-email") info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id + - '&market=from_token', headers={"Authorization": "Bearer %s" % token}).text) + "&market=from_token", headers={"Authorization": f"Bearer {token}"}).text) artists = [] - for data in info['tracks'][0]['artists']: - artists.append(sanitize_data(data['name'])) - album_name = sanitize_data(info['tracks'][0]['album']["name"]) - name = sanitize_data(info['tracks'][0]['name']) - image_url = info['tracks'][0]['album']['images'][0]['url'] - release_year = info['tracks'][0]['album']['release_date'].split("-")[0] - disc_number = info['tracks'][0]['disc_number'] - track_number = info['tracks'][0]['track_number'] - scraped_song_id = info['tracks'][0]['id'] - is_playable = info['tracks'][0]['is_playable'] + for data in info["tracks"][0]["artists"]: + artists.append(sanitize_data(data["name"])) + album_name = sanitize_data(info["tracks"][0]["album"]["name"]) + name = sanitize_data(info["tracks"][0]["name"]) + image_url = info["tracks"][0]["album"]["images"][0]["url"] + release_year = info["tracks"][0]["album"]["release_date"].split("-")[0] + disc_number = info["tracks"][0]["disc_number"] + track_number = info["tracks"][0]["track_number"] + scraped_song_id = info["tracks"][0]["id"] + is_playable = info["tracks"][0]["is_playable"] return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable @@ -461,16 +446,16 @@ def convert_audio_format(filename): filename, format=ZS_CONFIG["MUSIC_FORMAT"], bitrate=bitrate) -def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): +def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): # pylint: disable=too-many-arguments """ sets music_tag metadata """ # print("### SETTING MUSIC TAGS ###") tags = music_tag.load_file(filename) - tags['artist'] = conv_artist_format(artists) - tags['tracktitle'] = name - tags['album'] = album_name - tags['year'] = release_year - tags['discnumber'] = disc_number - tags['tracknumber'] = track_number + tags["artist"] = conv_artist_format(artists) + tags["tracktitle"] = name + tags["album"] = album_name + tags["year"] = release_year + tags["discnumber"] = disc_number + tags["tracknumber"] = track_number tags.save() @@ -479,7 +464,7 @@ def set_music_thumbnail(filename, image_url): # print("### SETTING THUMBNAIL ###") img = requests.get(image_url).content tags = music_tag.load_file(filename) - tags['artwork'] = img + tags["artwork"] = img tags.save() @@ -499,14 +484,14 @@ def get_all_playlists(access_token): offset = 0 while True: - headers = {'Authorization': f'Bearer {access_token}'} - params = {'limit': limit, 'offset': offset} + headers = {"Authorization": f"Bearer {access_token}"} + params = {"limit": limit, "offset": offset} resp = requests.get("https://api.spotify.com/v1/me/playlists", headers=headers, params=params).json() offset += limit - playlists.extend(resp['items']) + playlists.extend(resp["items"]) - if len(resp['items']) < limit: + if len(resp["items"]) < limit: break return playlists @@ -519,14 +504,14 @@ def get_playlist_songs(access_token, playlist_id): limit = 100 while True: - headers = {'Authorization': f'Bearer {access_token}'} - params = {'limit': limit, 'offset': offset} + headers = {"Authorization": f"Bearer {access_token}"} + params = {"limit": limit, "offset": offset} resp = requests.get( - f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks', headers=headers, params=params).json() + f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", headers=headers, params=params).json() offset += limit - songs.extend(resp['items']) + songs.extend(resp["items"]) - if len(resp['items']) < limit: + if len(resp["items"]) < limit: break return songs @@ -534,10 +519,11 @@ def get_playlist_songs(access_token, playlist_id): def get_playlist_info(access_token, playlist_id): """ Returns information scraped from playlist """ - headers = {'Authorization': f'Bearer {access_token}'} + headers = {"Authorization": f"Bearer {access_token}"} resp = requests.get( - f'https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token', headers=headers).json() - return resp['name'].strip(), resp['owner']['display_name'].strip() + f"https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token", + headers=headers).json() + return resp["name"].strip(), resp["owner"]["display_name"].strip() # Extra functions directly related to spotify albums @@ -548,14 +534,14 @@ def get_album_tracks(access_token, album_id): limit = 50 while True: - headers = {'Authorization': f'Bearer {access_token}'} - params = {'limit': limit, 'offset': offset} + headers = {"Authorization": f"Bearer {access_token}"} + params = {"limit": limit, "offset": offset} resp = requests.get( - f'https://api.spotify.com/v1/albums/{album_id}/tracks', headers=headers, params=params).json() + f"https://api.spotify.com/v1/albums/{album_id}/tracks", headers=headers, params=params).json() offset += limit - songs.extend(resp['items']) + songs.extend(resp["items"]) - if len(resp['items']) < limit: + if len(resp["items"]) < limit: break return songs @@ -563,21 +549,21 @@ def get_album_tracks(access_token, album_id): def get_album_name(access_token, album_id): """ Returns album name """ - headers = {'Authorization': f'Bearer {access_token}'} + headers = {"Authorization": f"Bearer {access_token}"} resp = requests.get( - f'https://api.spotify.com/v1/albums/{album_id}', headers=headers).json() - return resp['artists'][0]['name'], sanitize_data(resp['name']) + f"https://api.spotify.com/v1/albums/{album_id}", headers=headers).json() + return resp["artists"][0]["name"], sanitize_data(resp["name"]) # Extra functions directly related to spotify artists def get_artist_albums(access_token, artist_id): """ Returns artist's albums """ - headers = {'Authorization': f'Bearer {access_token}'} + headers = {"Authorization": f"Bearer {access_token}"} resp = requests.get( - f'https://api.spotify.com/v1/artists/{artist_id}/albums', headers=headers).json() + f"https://api.spotify.com/v1/artists/{artist_id}/albums", headers=headers).json() # Return a list each album's id - return [resp['items'][i]['id'] for i in range(len(resp['items']))] + return [resp["items"][i]["id"] for i in range(len(resp["items"]))] # Extra functions directly related to our saved tracks @@ -589,21 +575,21 @@ def get_saved_tracks(access_token): limit = 50 while True: - headers = {'Authorization': f'Bearer {access_token}'} - params = {'limit': limit, 'offset': offset} - resp = requests.get('https://api.spotify.com/v1/me/tracks', + headers = {"Authorization": f"Bearer {access_token}"} + params = {"limit": limit, "offset": offset} + resp = requests.get("https://api.spotify.com/v1/me/tracks", headers=headers, params=params).json() offset += limit - songs.extend(resp['items']) + songs.extend(resp["items"]) - if len(resp['items']) < limit: + if len(resp["items"]) < limit: break return songs # Functions directly related to downloading stuff -def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value='', disable_progressbar=False): +def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value="", disable_progressbar=False): # pylint: disable=too-many-locals,too-many-branches """ Downloads raw song audio from Spotify """ try: artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info( @@ -611,18 +597,18 @@ def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value song_name = artists[0] + " - " + name if prefix: - song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit( - ) else f'{prefix_value} - {song_name}' + song_name = f"{prefix_value.zfill(2)} - {song_name}" if prefix_value.isdigit( + ) else f"{prefix_value} - {song_name}" if ZS_CONFIG["SPLIT_ALBUM_DISCS"]: filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, "Disc " + str( - disc_number) + '/' + song_name + '.' + ZS_CONFIG["MUSIC_FORMAT"]) + disc_number) + "/" + song_name + "." + ZS_CONFIG["MUSIC_FORMAT"]) else: filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, - song_name + '.' + ZS_CONFIG["MUSIC_FORMAT"]) - except Exception as e: + song_name + "." + ZS_CONFIG["MUSIC_FORMAT"]) + except Exception as err: # pylint: disable=broad-except,unused-variable print("### SKIPPING SONG - FAILED TO QUERY METADATA ###") - # print(e) + # print(err) else: try: if not is_playable: @@ -645,22 +631,22 @@ def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value if ZS_CONFIG["SPLIT_ALBUM_DISCS"]: os.makedirs( - ZS_CONFIG["ROOT_PATH"] + extra_paths + "/Disc " + str(disc_number) + '/', exist_ok=True) + ZS_CONFIG["ROOT_PATH"] + extra_paths + "/Disc " + str(disc_number) + "/", exist_ok=True) else: os.makedirs(ZS_CONFIG["ROOT_PATH"] + extra_paths, exist_ok=True) total_size = stream.input_stream.size - with open(filename, 'wb') as file, tqdm( + with open(filename, "wb") as file, tqdm( desc=song_name, total=total_size, - unit='B', + unit="B", unit_scale=True, unit_divisor=1024, disable=disable_progressbar - ) as bar: + ) as p_bar: for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1): - bar.update(file.write( + p_bar.update(file.write( stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"]))) if not ZS_CONFIG["RAW_AUDIO_AS_IS"]: @@ -671,7 +657,7 @@ def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value if not ZS_CONFIG["OVERRIDE_AUTO_WAIT"]: time.sleep(ZS_CONFIG["ANTI_BAN_WAIT_TIME"]) - except: + except Exception: # pylint: disable=broad-except print("### SKIPPING:", song_name, "(GENERAL DOWNLOAD ERROR) ###") if os.path.exists(filename): @@ -683,9 +669,9 @@ def download_album(album): token = SESSION.tokens().get("user-read-email") artist, album_name = get_album_name(token, album) tracks = get_album_tracks(token, album) - for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): - download_track(track['id'], f'{artist}/{album_name}', - prefix=True, prefix_value=str(n), disable_progressbar=True) + for num, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit="Song", total=len(tracks)): + download_track(track["id"], f"{artist}/{album_name}", + prefix=True, prefix_value=str(num), disable_progressbar=True) def download_artist_albums(artist): @@ -701,12 +687,12 @@ def download_playlist(playlists, playlist_choice): token = SESSION.tokens().get("user-read-email") playlist_songs = get_playlist_songs( - token, playlists[int(playlist_choice) - 1]['id']) + token, playlists[int(playlist_choice) - 1]["id"]) for song in playlist_songs: - if song['track']['id'] is not None: - download_track(song['track']['id'], sanitize_data( - playlists[int(playlist_choice) - 1]['name'].strip()) + "/") + if song["track"]["id"] is not None: + download_track(song["track"]["id"], sanitize_data( + playlists[int(playlist_choice) - 1]["name"].strip()) + "/") print("\n") @@ -717,7 +703,7 @@ def download_from_user_playlist(): count = 1 for playlist in playlists: - print(str(count) + ": " + playlist['name'].strip()) + print(str(count) + ": " + playlist["name"].strip()) count += 1 print("\n> SELECT A PLAYLIST BY ID") @@ -743,7 +729,7 @@ def download_from_user_playlist(): # Core functions here -def check_raw(): +def check_raw(): # pylint: disable=missing-function-docstring if ZS_CONFIG["RAW_AUDIO_AS_IS"]: ZS_CONFIG["MUSIC_FORMAT"] = "wav"