diff --git a/requirements.txt b/requirements.txt index 5afa986..4f4b877 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ music_tag pydub Pillow tqdm +tabulate \ No newline at end of file diff --git a/src/album.py b/src/album.py new file mode 100644 index 0000000..12f86eb --- /dev/null +++ b/src/album.py @@ -0,0 +1,54 @@ +from tqdm import tqdm + +from const import ITEMS, ARTISTS, NAME, ID +from track import download_track +from utils import sanitize_data +from zspotify import ZSpotify + +ALBUM_URL = 'https://api.spotify.com/v1/albums' +ARTIST_URL = 'https://api.spotify.com/v1/artists' + + +def get_album_tracks(album_id): + """ Returns album tracklist """ + songs = [] + offset = 0 + limit = 50 + + while True: + resp = ZSpotify.invoke_url_with_params(f'{ALBUM_URL}/{album_id}/tracks', limit=limit, offset=offset) + offset += limit + songs.extend(resp[ITEMS]) + if len(resp[ITEMS]) < limit: + break + + return songs + + +def get_album_name(album_id): + """ Returns album name """ + resp = ZSpotify.invoke_url(f'{ALBUM_URL}/{album_id}') + return resp[ARTISTS][0][NAME], sanitize_data(resp[NAME]) + + +def get_artist_albums(artist_id): + """ Returns artist's albums """ + resp = ZSpotify.invoke_url(f'{ARTIST_URL}/{artist_id}/albums') + # Return a list each album's id + return [resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))] + + +def download_album(album): + """ Downloads songs from an album """ + artist, album_name = get_album_name(album) + tracks = get_album_tracks(album) + for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): + download_track(track[ID], f'{artist}/{album_name}', + prefix=True, prefix_value=str(n), disable_progressbar=True) + + +def download_artist_albums(artist): + """ Downloads albums of an artist """ + albums = get_artist_albums(artist) + for album_id in albums: + download_album(album_id) diff --git a/src/app.py b/src/app.py new file mode 100644 index 0000000..a3822aa --- /dev/null +++ b/src/app.py @@ -0,0 +1,175 @@ +import sys + +from librespot.audio.decoders import AudioQuality +from tabulate import tabulate + +from album import download_album, download_artist_albums +from const import TRACK, NAME, ID, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUMS, OWNER, \ + PLAYLISTS, DISPLAY_NAME +from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist +from podcast import download_episode, get_show_episodes +from track import download_track, get_saved_tracks +from utils import sanitize_data, splash, split_input, regex_input_for_urls +from zspotify import ZSpotify + +SEARCH_URL = 'https://api.spotify.com/v1/search' + + +def client() -> None: + """ Connects to spotify to perform query's and get songs to download """ + ZSpotify() + splash() + + if ZSpotify.check_premium(): + print('[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n') + ZSpotify.DOWNLOAD_QUALITY = AudioQuality.VERY_HIGH + else: + print('[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n') + ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH + + while True: + if len(sys.argv) > 1: + if sys.argv[1] == '-p' or sys.argv[1] == '--playlist': + download_from_user_playlist() + elif sys.argv[1] == '-ls' or sys.argv[1] == '--liked-songs': + for song in get_saved_tracks(): + if not song[TRACK][NAME]: + print('### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###') + else: + download_track(song[TRACK][ID], 'Liked Songs/') + print('\n') + else: + track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(sys.argv[1]) + + if track_id is not None: + download_track(track_id) + elif artist_id is not None: + download_artist_albums(artist_id) + elif album_id is not None: + download_album(album_id) + elif playlist_id is not None: + playlist_songs = get_playlist_songs(playlist_id) + name, _ = get_playlist_info(playlist_id) + for song in playlist_songs: + download_track(song[TRACK][ID], + sanitize_data(name) + '/') + print('\n') + elif episode_id is not None: + download_episode(episode_id) + elif show_id is not None: + for episode in get_show_episodes(show_id): + download_episode(episode) + + else: + search_text = input('Enter search or URL: ') + + track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(search_text) + + if track_id is not None: + download_track(track_id) + elif artist_id is not None: + download_artist_albums(artist_id) + elif album_id is not None: + download_album(album_id) + elif playlist_id is not None: + playlist_songs = get_playlist_songs(playlist_id) + name, _ = get_playlist_info(playlist_id) + for song in playlist_songs: + download_track(song[TRACK][ID], sanitize_data(name) + '/') + print('\n') + elif episode_id is not None: + download_episode(episode_id) + elif show_id is not None: + for episode in get_show_episodes(show_id): + download_episode(episode) + else: + search(search_text) + # wait() + + +def search(search_term): + """ Searches Spotify's API for relevant data """ + params = {'limit': '10', 'offset': '0', 'q': search_term, 'type': 'track,album,artist,playlist'} + resp = ZSpotify.invoke_url_with_params(SEARCH_URL, **params) + + counter = 1 + tracks = resp[TRACKS][ITEMS] + if len(tracks) > 0: + print('### TRACKS ###') + track_data = [] + for track in tracks: + if track[EXPLICIT]: + explicit = '[E]' + else: + explicit = '' + track_data.append([counter, f'{track[NAME]} {explicit}', + ','.join([artist[NAME] for artist in track[ARTISTS]])]) + counter += 1 + total_tracks = counter - 1 + print(tabulate(track_data, headers=['S.NO', 'Name', 'Artists'], tablefmt='pretty')) + print('\n') + else: + total_tracks = 0 + + albums = resp[ALBUMS][ITEMS] + if len(albums) > 0: + print('### ALBUMS ###') + album_data = [] + for album in albums: + album_data.append([counter, album[NAME], ','.join([artist[NAME] for artist in album[ARTISTS]])]) + counter += 1 + total_albums = counter - total_tracks - 1 + print(tabulate(album_data, headers=['S.NO', 'Album', 'Artists'], tablefmt='pretty')) + print('\n') + else: + total_albums = 0 + + artists = resp[ARTISTS][ITEMS] + if len(artists) > 0: + print("### ARTISTS ###") + artist_data = [] + for artist in artists: + artist_data.append([counter, artist[NAME]]) + counter += 1 + total_artists = counter - total_tracks - total_albums - 1 + print(tabulate(artist_data, headers=['S.NO', 'Name'], tablefmt='pretty')) + print("\n") + else: + total_artists = 0 + + playlists = resp[PLAYLISTS][ITEMS] + print('### PLAYLISTS ###') + playlist_data = [] + for playlist in playlists: + playlist_data.append([counter, playlist[NAME], playlist[OWNER][DISPLAY_NAME]]) + counter += 1 + print(tabulate(playlist_data, headers=['S.NO', 'Name', 'Owner'], tablefmt='pretty')) + print('\n') + + if len(tracks) + len(albums) + len(playlists) == 0: + print('NO RESULTS FOUND - EXITING...') + else: + selection = str(input('SELECT ITEM(S) BY S.NO: ')) + inputs = split_input(selection) + for pos in inputs: + position = int(pos) + if position <= total_tracks: + track_id = tracks[position - 1][ID] + download_track(track_id) + elif position <= total_albums + total_tracks: + download_album(albums[position - total_tracks - 1][ID]) + elif position <= total_artists + total_tracks + total_albums: + download_artist_albums(artists[position - total_tracks - total_albums - 1][ID]) + else: + playlist_choice = playlists[position - + total_tracks - total_albums - total_artists - 1] + playlist_songs = get_playlist_songs(playlist_choice[ID]) + for song in playlist_songs: + if song[TRACK][ID] is not None: + download_track(song[TRACK][ID], sanitize_data(playlist_choice[NAME].strip()) + "/", + disable_progressbar=True) + print("\n") + + +if __name__ == '__main__': + client() diff --git a/src/const.py b/src/const.py new file mode 100644 index 0000000..8548bd5 --- /dev/null +++ b/src/const.py @@ -0,0 +1,95 @@ +SANITIZE = ('\\', '/', ':', '*', '?', '\'', '<', '>', '"') + +SAVED_TRACKS_URL = 'https://api.spotify.com/v1/me/tracks' + +TRACKS_URL = 'https://api.spotify.com/v1/tracks' + +TRACKNUMBER = 'tracknumber' + +DISCNUMBER = 'discnumber' + +YEAR = 'year' + +ALBUM = 'album' + +TRACKTITLE = 'tracktitle' + +ARTIST = 'artist' + +ARTISTS = 'artists' + +ARTWORK = 'artwork' + +TRACKS = 'tracks' + +TRACK = 'track' + +ITEMS = 'items' + +NAME = 'name' + +ID = 'id' + +URL = 'url' + +RELEASE_DATE = 'release_date' + +IMAGES = 'images' + +LIMIT = 'limit' + +OFFSET = 'offset' + +AUTHORIZATION = 'Authorization' + +IS_PLAYABLE = 'is_playable' + +TRACK_NUMBER = 'track_number' + +DISC_NUMBER = 'disc_number' + +SHOW = 'show' + +ERROR = 'error' + +EXPLICIT = 'explicit' + +PLAYLISTS = 'playlists' + +OWNER = 'owner' + +DISPLAY_NAME = 'display_name' + +ALBUMS = 'albums' + +TYPE = 'type' + +PREMIUM = 'premium' + +USER_READ_EMAIL = 'user-read-email' + +WINDOWS_SYSTEM = 'Windows' + +CREDENTIALS_JSON = '../credentials.json' + +CONFIG_FILE_PATH = '../zs_config.json' + +ROOT_PATH = 'ROOT_PATH' + +ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH' + +SKIP_EXISTING_FILES = 'SKIP_EXISTING_FILES' + +DOWNLOAD_FORMAT = 'DOWNLOAD_FORMAT' + +RAW_AUDIO_AS_IS = 'RAW_AUDIO_AS_IS' + +FORCE_PREMIUM = 'FORCE_PREMIUM' + +ANTI_BAN_WAIT_TIME = 'ANTI_BAN_WAIT_TIME' + +OVERRIDE_AUTO_WAIT = 'OVERRIDE_AUTO_WAIT' + +CHUNK_SIZE = 'CHUNK_SIZE' + +SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS' diff --git a/src/playlist.py b/src/playlist.py new file mode 100644 index 0000000..c09433e --- /dev/null +++ b/src/playlist.py @@ -0,0 +1,85 @@ +from const import ITEMS, ID, TRACK, NAME +from track import download_track +from utils import sanitize_data +from zspotify import ZSpotify + +MY_PLAYLISTS_URL = 'https://api.spotify.com/v1/me/playlists' +PLAYLISTS_URL = 'https://api.spotify.com/v1/playlists' + + +def get_all_playlists(): + """ Returns list of users playlists """ + playlists = [] + limit = 50 + offset = 0 + + while True: + resp = ZSpotify.invoke_url_with_params(MY_PLAYLISTS_URL, limit=limit, offset=offset) + offset += limit + playlists.extend(resp[ITEMS]) + if len(resp[ITEMS]) < limit: + break + + return playlists + + +def get_playlist_songs(playlist_id): + """ returns list of songs in a playlist """ + songs = [] + offset = 0 + limit = 100 + + while True: + resp = ZSpotify.invoke_url_with_params(f'{PLAYLISTS_URL}/{playlist_id}/tracks', limit=limit, offset=offset) + offset += limit + songs.extend(resp[ITEMS]) + if len(resp[ITEMS]) < limit: + break + + return songs + + +def get_playlist_info(playlist_id): + """ Returns information scraped from playlist """ + resp = ZSpotify.invoke_url(f'{PLAYLISTS_URL}/{playlist_id}?fields=name,owner(display_name)&market=from_token') + return resp['name'].strip(), resp['owner']['display_name'].strip() + + +def download_playlist(playlists, playlist_choice): + """Downloads all the songs from a playlist""" + playlist_songs = get_playlist_songs(playlists[int(playlist_choice) - 1][ID]) + + for song in playlist_songs: + if song[TRACK][ID] is not None: + download_track(song[TRACK][ID], sanitize_data( + playlists[int(playlist_choice) - 1][NAME].strip()) + '/') + print('\n') + + +def download_from_user_playlist(): + """ Select which playlist(s) to download """ + playlists = get_all_playlists() + + count = 1 + for playlist in playlists: + print(str(count) + ': ' + playlist[NAME].strip()) + count += 1 + + print('\n> SELECT A PLAYLIST BY ID') + print('> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID\'s') + print('> For example, typing 10 to get one playlist or 10-20 to get\nevery playlist from 10-20 (inclusive)\n') + + playlist_choices = input('ID(s): ').split('-') + + if len(playlist_choices) == 1: + download_playlist(playlists, playlist_choices[0]) + else: + start = int(playlist_choices[0]) + end = int(playlist_choices[1]) + 1 + + print(f'Downloading from {start} to {end}...') + + for playlist in range(start, end): + download_playlist(playlists, playlist) + + print('\n**All playlists have been downloaded**\n') diff --git a/src/podcast.py b/src/podcast.py new file mode 100644 index 0000000..008f060 --- /dev/null +++ b/src/podcast.py @@ -0,0 +1,64 @@ +from typing import Optional + +from librespot.audio.decoders import VorbisOnlyAudioQuality +from librespot.metadata import EpisodeId +from tqdm import tqdm + +from const import NAME, ERROR, SHOW, ITEMS, ID, ROOT_PODCAST_PATH, CHUNK_SIZE +from utils import sanitize_data, create_download_directory, MusicFormat +from zspotify import ZSpotify + +EPISODE_INFO_URL = 'https://api.spotify.com/v1/episodes' +SHOWS_URL = 'https://api.spotify.com/v1/shows' + + +def get_episode_info(episode_id_str) -> tuple[Optional[str], Optional[str]]: + info = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}') + if ERROR in info: + return None, None + return sanitize_data(info[SHOW][NAME]), sanitize_data(info[NAME]) + + +def get_show_episodes(show_id_str) -> list: + episodes = [] + + resp = ZSpotify.invoke_url(f'{SHOWS_URL}/{show_id_str}/episodes') + + for episode in resp[ITEMS]: + episodes.append(episode[ID]) + + return episodes + + +def download_episode(episode_id) -> None: + podcast_name, episode_name = get_episode_info(episode_id) + + extra_paths = podcast_name + '/' + + if podcast_name is None: + print('### SKIPPING: (EPISODE NOT FOUND) ###') + else: + filename = podcast_name + ' - ' + episode_name + + episode_id = EpisodeId.from_base62(episode_id) + stream = ZSpotify.get_content_stream(episode_id, ZSpotify.DOWNLOAD_QUALITY) + + create_download_directory(ZSpotify.get_config(ROOT_PODCAST_PATH) + extra_paths) + + total_size = stream.input_stream.size + with open(ZSpotify.get_config(ROOT_PODCAST_PATH) + extra_paths + filename + MusicFormat.WAV.value, + 'wb') as file, tqdm( + desc=filename, + total=total_size, + unit='B', + unit_scale=True, + unit_divisor=1024 + ) as bar: + for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): + bar.update(file.write( + stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE)))) + + # convert_audio_format(ROOT_PODCAST_PATH + + # extra_paths + filename + '.wav') + + # related functions that do stuff with the spotify API diff --git a/src/track.py b/src/track.py new file mode 100644 index 0000000..12a5e39 --- /dev/null +++ b/src/track.py @@ -0,0 +1,126 @@ +import os +import time +from typing import Any + +from librespot.audio.decoders import AudioQuality +from librespot.metadata import TrackId +from pydub import AudioSegment +from tqdm import tqdm + +from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ + RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, CHUNK_SIZE, \ + SKIP_EXISTING_FILES, RAW_AUDIO_AS_IS, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT +from utils import sanitize_data, set_audio_tags, set_music_thumbnail, create_download_directory, \ + MusicFormat +from zspotify import ZSpotify + + +def get_saved_tracks() -> list: + """ Returns user's saved tracks """ + songs = [] + offset = 0 + limit = 50 + + while True: + resp = ZSpotify.invoke_url_with_params(SAVED_TRACKS_URL, limit=limit, offset=offset) + offset += limit + songs.extend(resp[ITEMS]) + if len(resp[ITEMS]) < limit: + break + + return songs + + +def get_song_info(song_id) -> tuple[list[str], str, str, Any, Any, Any, Any, Any, Any]: + """ Retrieves metadata for downloaded songs """ + info = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token') + + artists = [] + for data in info[TRACKS][0][ARTISTS]: + artists.append(sanitize_data(data[NAME])) + album_name = sanitize_data(info[TRACKS][0][ALBUM][NAME]) + name = sanitize_data(info[TRACKS][0][NAME]) + image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL] + release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0] + disc_number = info[TRACKS][0][DISC_NUMBER] + track_number = info[TRACKS][0][TRACK_NUMBER] + scraped_song_id = info[TRACKS][0][ID] + is_playable = info[TRACKS][0][IS_PLAYABLE] + + return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable + + +# noinspection PyBroadException +def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='', disable_progressbar=False) -> None: + """ Downloads raw song audio from Spotify """ + try: + (artists, album_name, name, image_url, release_year, disc_number, + track_number, scraped_song_id, is_playable) = get_song_info(track_id) + + song_name = artists[0] + ' - ' + name + if prefix: + song_name = f'{prefix_value.zfill(2)}-{song_name}' if prefix_value.isdigit( + ) else f'{prefix_value}-{song_name}' + + if ZSpotify.get_config(SPLIT_ALBUM_DISCS): + filename = os.path.join(ZSpotify.get_config(ROOT_PATH), extra_paths, 'Disc ' + str( + disc_number) + '/' + song_name + '.' + ZSpotify.get_config(DOWNLOAD_FORMAT)) + else: + filename = os.path.join(ZSpotify.get_config(ROOT_PATH), extra_paths, + song_name + '.' + ZSpotify.get_config(DOWNLOAD_FORMAT)) + except Exception: + print('### SKIPPING SONG - FAILED TO QUERY METADATA ###') + else: + try: + if not is_playable: + print('### SKIPPING:', song_name, + '(SONG IS UNAVAILABLE) ###') + else: + if os.path.isfile(filename) and os.path.getsize(filename) and ZSpotify.get_config(SKIP_EXISTING_FILES): + print('### SKIPPING:', song_name, + '(SONG ALREADY EXISTS) ###') + else: + if track_id != scraped_song_id: + track_id = scraped_song_id + track_id = TrackId.from_base62(track_id) + stream = ZSpotify.get_content_stream(track_id, ZSpotify.DOWNLOAD_QUALITY) + create_download_directory(ZSpotify.get_config(ROOT_PATH) + extra_paths) + total_size = stream.input_stream.size + + with open(filename, 'wb') as file, tqdm( + desc=song_name, + total=total_size, + unit='B', + unit_scale=True, + unit_divisor=1024, + disable=disable_progressbar + ) as p_bar: + for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): + p_bar.update(file.write( + stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE)))) + + if not ZSpotify.get_config(RAW_AUDIO_AS_IS): + convert_audio_format(filename) + set_audio_tags(filename, artists, name, album_name, + release_year, disc_number, track_number) + set_music_thumbnail(filename, image_url) + + if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT): + time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME)) + except Exception as e: + print('### SKIPPING:', song_name, + '(GENERAL DOWNLOAD ERROR) ###') + if os.path.exists(filename): + os.remove(filename) + + +def convert_audio_format(filename) -> None: + """ Converts raw audio into playable mp3 or ogg vorbis """ + # print('### CONVERTING TO ' + MUSIC_FORMAT.upper() + ' ###') + raw_audio = AudioSegment.from_file(filename, format=MusicFormat.OGG.value, + frame_rate=44100, channels=2, sample_width=2) + if ZSpotify.DOWNLOAD_QUALITY == AudioQuality.VERY_HIGH: + bitrate = '320k' + else: + bitrate = '160k' + raw_audio.export(filename, format=ZSpotify.get_config(DOWNLOAD_FORMAT), bitrate=bitrate) diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..c31647a --- /dev/null +++ b/src/utils.py @@ -0,0 +1,181 @@ +import os +import platform +import re +import time +from enum import Enum + +import music_tag +import requests + +from const import SANITIZE, ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \ + WINDOWS_SYSTEM + + +class MusicFormat(str, Enum): + MP3 = 'mp3', + OGG = 'ogg', + WAV = 'wav' + + +def create_download_directory(download_path: str) -> None: + os.makedirs(download_path, exist_ok=True) + + +def wait(seconds: int = 3) -> None: + """ Pause for a set number of seconds """ + for second in range(seconds)[::-1]: + print(f'\rWait for {second + 1} second(s)...', end='') + time.sleep(1) + + +def split_input(selection) -> list[str]: + """ Returns a list of inputted strings """ + inputs = [] + if '-' in selection: + for number in range(int(selection.split('-')[0]), int(selection.split('-')[1]) + 1): + inputs.append(number) + else: + selections = selection.split(',') + for i in selections: + inputs.append(i.strip()) + return inputs + + +def splash() -> None: + """ Displays splash screen """ + print(""" +███████ ███████ ██████ ██████ ████████ ██ ███████ ██ ██ + ███ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ + ███ ███████ ██████ ██ ██ ██ ██ █████ ████ + ███ ██ ██ ██ ██ ██ ██ ██ ██ +███████ ███████ ██ ██████ ██ ██ ██ ██ + """) + + +def clear() -> None: + """ Clear the console window """ + if platform.system() == WINDOWS_SYSTEM: + os.system('cls') + else: + os.system('clear') + + +def sanitize_data(value) -> str: + """ Returns given string with problematic removed """ + for pattern in SANITIZE: + value = value.replace(pattern, '') + return value.replace('|', '-') + + +def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number) -> None: + """ sets music_tag metadata """ + tags = music_tag.load_file(filename) + tags[ARTIST] = conv_artist_format(artists) + tags[TRACKTITLE] = name + tags[ALBUM] = album_name + tags[YEAR] = release_year + tags[DISCNUMBER] = disc_number + tags[TRACKNUMBER] = track_number + tags.save() + + +def conv_artist_format(artists) -> str: + """ Returns converted artist format """ + return ', '.join(artists)[:-2] + + +def set_music_thumbnail(filename, image_url) -> None: + """ Downloads cover artwork """ + img = requests.get(image_url).content + tags = music_tag.load_file(filename) + tags[ARTWORK] = img + tags.save() + + +def regex_input_for_urls(search_input) -> tuple[str, str, str, str, str, str]: + """ Since many kinds of search may be passed at the command line, process them all here. """ + track_uri_search = re.search( + r'^spotify:track:(?P[0-9a-zA-Z]{22})$', search_input) + track_url_search = re.search( + r'^(https?://)?open\.spotify\.com/track/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + album_uri_search = re.search( + r'^spotify:album:(?P[0-9a-zA-Z]{22})$', search_input) + album_url_search = re.search( + r'^(https?://)?open\.spotify\.com/album/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + playlist_uri_search = re.search( + r'^spotify:playlist:(?P[0-9a-zA-Z]{22})$', search_input) + playlist_url_search = re.search( + r'^(https?://)?open\.spotify\.com/playlist/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + episode_uri_search = re.search( + r'^spotify:episode:(?P[0-9a-zA-Z]{22})$', search_input) + episode_url_search = re.search( + r'^(https?://)?open\.spotify\.com/episode/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + show_uri_search = re.search( + r'^spotify:show:(?P[0-9a-zA-Z]{22})$', search_input) + show_url_search = re.search( + r'^(https?://)?open\.spotify\.com/show/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + artist_uri_search = re.search( + r'^spotify:artist:(?P[0-9a-zA-Z]{22})$', search_input) + artist_url_search = re.search( + r'^(https?://)?open\.spotify\.com/artist/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + if track_uri_search is not None or track_url_search is not None: + track_id_str = (track_uri_search + if track_uri_search is not None else + track_url_search).group('TrackID') + else: + track_id_str = None + + if album_uri_search is not None or album_url_search is not None: + album_id_str = (album_uri_search + if album_uri_search is not None else + album_url_search).group('AlbumID') + else: + album_id_str = None + + if playlist_uri_search is not None or playlist_url_search is not None: + playlist_id_str = (playlist_uri_search + if playlist_uri_search is not None else + playlist_url_search).group('PlaylistID') + else: + playlist_id_str = None + + if episode_uri_search is not None or episode_url_search is not None: + episode_id_str = (episode_uri_search + if episode_uri_search is not None else + episode_url_search).group('EpisodeID') + else: + episode_id_str = None + + if show_uri_search is not None or show_url_search is not None: + show_id_str = (show_uri_search + if show_uri_search is not None else + show_url_search).group('ShowID') + else: + show_id_str = None + + if artist_uri_search is not None or artist_url_search is not None: + artist_id_str = (artist_uri_search + if artist_uri_search is not None else + artist_url_search).group('ArtistID') + else: + artist_id_str = None + + return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str diff --git a/src/zspotify.py b/src/zspotify.py new file mode 100644 index 0000000..98a4575 --- /dev/null +++ b/src/zspotify.py @@ -0,0 +1,93 @@ +#! /usr/bin/env python3 + +""" +ZSpotify +It's like youtube-dl, but for Spotify. + +(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org) +""" +import json +import os +import os.path +from getpass import getpass +from typing import Any + +import requests +from librespot.audio.decoders import VorbisOnlyAudioQuality +from librespot.core import Session + +from const import CREDENTIALS_JSON, TYPE, \ + PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, CONFIG_FILE_PATH, FORCE_PREMIUM, RAW_AUDIO_AS_IS +from utils import MusicFormat + + +class ZSpotify: + SESSION: Session = None + DOWNLOAD_QUALITY = None + CONFIG = {} + + def __init__(self): + ZSpotify.load_config() + ZSpotify.check_raw() + ZSpotify.login() + + @classmethod + def login(cls): + """ Authenticates with Spotify and saves credentials to a file """ + + if os.path.isfile(CREDENTIALS_JSON): + try: + cls.SESSION = Session.Builder().stored_file().create() + return + except RuntimeError: + pass + while True: + user_name = input('Username: ') + password = getpass() + try: + cls.SESSION = Session.Builder().user_pass(user_name, password).create() + return + except RuntimeError: + pass + + @classmethod + def load_config(cls) -> None: + with open(CONFIG_FILE_PATH, encoding='utf-8') as config_file: + cls.CONFIG = json.load(config_file) + + @classmethod + def get_config(cls, key) -> Any: + return cls.CONFIG.get(key) + + @classmethod + def check_raw(cls) -> None: + if cls.get_config(RAW_AUDIO_AS_IS): + cls.DOWNLOAD_FORMAT = MusicFormat.WAV + + @classmethod + def get_content_stream(cls, content_id, quality): + return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None) + + @classmethod + def get_auth_header(cls): + return {AUTHORIZATION: f'Bearer {cls.SESSION.tokens().get(USER_READ_EMAIL)}'} + + @classmethod + def get_auth_header_and_params(cls, limit, offset): + return {AUTHORIZATION: f'Bearer {cls.SESSION.tokens().get(USER_READ_EMAIL)}'}, {LIMIT: limit, OFFSET: offset} + + @classmethod + def invoke_url_with_params(cls, url, limit, offset, **kwargs): + headers, params = cls.get_auth_header_and_params(limit=limit, offset=offset) + params.update(kwargs) + return requests.get(url, headers=headers, params=params).json() + + @classmethod + def invoke_url(cls, url): + headers = cls.get_auth_header() + return requests.get(url, headers=headers).json() + + @classmethod + def check_premium(cls) -> bool: + """ If user has spotify premium return true """ + return (cls.SESSION.get_user_attribute(TYPE) == PREMIUM) or cls.get_config(FORCE_PREMIUM) diff --git a/zs_config.json b/zs_config.json index 8c5bd05..7daf8b5 100644 --- a/zs_config.json +++ b/zs_config.json @@ -1,12 +1,12 @@ { - "ROOT_PATH": "ZSpotify Music/", - "ROOT_PODCAST_PATH": "ZSpotify Podcasts/", + "ROOT_PATH": "../ZSpotify Music/", + "ROOT_PODCAST_PATH": "../ZSpotify Podcasts/", "SKIP_EXISTING_FILES": true, - "MUSIC_FORMAT": "mp3", + "DOWNLOAD_FORMAT": "mp3", "RAW_AUDIO_AS_IS": false, "FORCE_PREMIUM": false, "ANTI_BAN_WAIT_TIME": 1, "OVERRIDE_AUTO_WAIT": false, "CHUNK_SIZE": 50000, "SPLIT_ALBUM_DISCS": false -} +} \ No newline at end of file diff --git a/zspotify.py b/zspotify.py deleted file mode 100755 index b2281a7..0000000 --- a/zspotify.py +++ /dev/null @@ -1,761 +0,0 @@ -#! /usr/bin/env python3 - -""" -ZSpotify -It's like youtube-dl, but for Spotify. - -(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org) -""" - -from getpass import getpass -import json -import os -import os.path -import platform -import re -import sys -import time - -from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality -from librespot.core import Session -from librespot.metadata import TrackId, EpisodeId -import music_tag -from pydub import AudioSegment -import requests -from tqdm import tqdm - -QUALITY = None -SESSION: Session = None -SANITIZE = ["\\", "/", ":", "*", "?", "'", "<", ">", "\""] - -# user-customizable variables that adjust the core functionality of ZSpotify -with open("zs_config.json", encoding="utf-8") as config_file: - ZS_CONFIG = json.load(config_file) - - -# miscellaneous functions for general use - - -def clear(): - """ Clear the console window """ - if platform.system() == "Windows": - os.system("cls") - else: - os.system("clear") - - -def wait(seconds: int = 3): - """ Pause for a set number of seconds """ - for i in range(seconds)[::-1]: - print(f"\rWait for {i + 1} second(s)...", end="") - time.sleep(1) - - -def sanitize_data(value): - """ Returns given string with problematic removed """ - for i in SANITIZE: - value = value.replace(i, "") - return value.replace("|", "-") - - -def split_input(selection): - """ Returns a list of inputted strings """ - inputs = [] - if "-" in selection: - for number in range(int(selection.split("-")[0]), int(selection.split("-")[1]) + 1): - inputs.append(number) - else: - selections = selection.split(",") - for i in selections: - inputs.append(i.strip()) - return inputs - - -def splash(): - """ Displays splash screen """ - print(""" -███████ ███████ ██████ ██████ ████████ ██ ███████ ██ ██ - ███ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ - ███ ███████ ██████ ██ ██ ██ ██ █████ ████ - ███ ██ ██ ██ ██ ██ ██ ██ ██ -███████ ███████ ██ ██████ ██ ██ ██ ██ - """) - - -# two mains functions for logging in and doing client stuff -def login(): - """ Authenticates with Spotify and saves credentials to a file """ - global SESSION # pylint: disable=global-statement - - if os.path.isfile("credentials.json"): - try: - SESSION = Session.Builder().stored_file().create() - return - except RuntimeError: - pass - while True: - user_name = input("Username: ") - password = getpass() - try: - SESSION = Session.Builder().user_pass(user_name, password).create() - return - except RuntimeError: - pass - - -def client(): # pylint: disable=too-many-branches,too-many-statements - """ Connects to spotify to perform query's and get songs to download """ - global QUALITY # pylint: disable=global-statement - splash() - - token = SESSION.tokens().get("user-read-email") - - if check_premium(): - print("[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n") - QUALITY = AudioQuality.VERY_HIGH - else: - print("[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n") - QUALITY = AudioQuality.HIGH - - while True: - if len(sys.argv) > 1: - if sys.argv[1] == "-p" or sys.argv[1] == "--playlist": - download_from_user_playlist() - elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs": - for song in get_saved_tracks(token): - if not song["track"]["name"]: - print( - "### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###") - else: - download_track(song["track"]["id"], "Liked Songs/") - print("\n") - else: - track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls( - sys.argv[1]) - - if track_id_str is not None: - download_track(track_id_str) - elif artist_id_str is not None: - download_artist_albums(artist_id_str) - elif album_id_str is not None: - download_album(album_id_str) - elif playlist_id_str is not None: - playlist_songs = get_playlist_songs(token, playlist_id_str) - name, _ = get_playlist_info(token, playlist_id_str) - for song in playlist_songs: - download_track(song["track"]["id"], - sanitize_data(name) + "/") - print("\n") - elif episode_id_str is not None: - download_episode(episode_id_str) - elif show_id_str is not None: - for episode in get_show_episodes(token, show_id_str): - download_episode(episode) - else: - search_text = input("Enter search or URL: ") - - track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls( - search_text) - - if track_id_str is not None: - download_track(track_id_str) - elif artist_id_str is not None: - download_artist_albums(artist_id_str) - elif album_id_str is not None: - download_album(album_id_str) - elif playlist_id_str is not None: - playlist_songs = get_playlist_songs(token, playlist_id_str) - name, _ = get_playlist_info(token, playlist_id_str) - for song in playlist_songs: - download_track(song["track"]["id"], - sanitize_data(name) + "/") - print("\n") - elif episode_id_str is not None: - download_episode(episode_id_str) - elif show_id_str is not None: - for episode in get_show_episodes(token, show_id_str): - download_episode(episode) - else: - search(search_text) - # wait() - - -def regex_input_for_urls(search_input): # pylint: disable=too-many-locals - """ Since many kinds of search may be passed at the command line, process them all here. """ - track_uri_search = re.search( - r"^spotify:track:(?P[0-9a-zA-Z]{22})$", search_input) - track_url_search = re.search( - r"^(https?://)?open\.spotify\.com/track/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - album_uri_search = re.search( - r"^spotify:album:(?P[0-9a-zA-Z]{22})$", search_input) - album_url_search = re.search( - r"^(https?://)?open\.spotify\.com/album/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - playlist_uri_search = re.search( - r"^spotify:playlist:(?P[0-9a-zA-Z]{22})$", search_input) - playlist_url_search = re.search( - r"^(https?://)?open\.spotify\.com/playlist/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - episode_uri_search = re.search( - r"^spotify:episode:(?P[0-9a-zA-Z]{22})$", search_input) - episode_url_search = re.search( - r"^(https?://)?open\.spotify\.com/episode/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - show_uri_search = re.search( - r"^spotify:show:(?P[0-9a-zA-Z]{22})$", search_input) - show_url_search = re.search( - r"^(https?://)?open\.spotify\.com/show/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - artist_uri_search = re.search( - r"^spotify:artist:(?P[0-9a-zA-Z]{22})$", search_input) - artist_url_search = re.search( - r"^(https?://)?open\.spotify\.com/artist/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - if track_uri_search is not None or track_url_search is not None: - track_id_str = (track_uri_search - if track_uri_search is not None else - track_url_search).group("TrackID") - else: - track_id_str = None - - if album_uri_search is not None or album_url_search is not None: - album_id_str = (album_uri_search - if album_uri_search is not None else - album_url_search).group("AlbumID") - else: - album_id_str = None - - if playlist_uri_search is not None or playlist_url_search is not None: - playlist_id_str = (playlist_uri_search - if playlist_uri_search is not None else - playlist_url_search).group("PlaylistID") - else: - playlist_id_str = None - - if episode_uri_search is not None or episode_url_search is not None: - episode_id_str = (episode_uri_search - if episode_uri_search is not None else - episode_url_search).group("EpisodeID") - else: - episode_id_str = None - - if show_uri_search is not None or show_url_search is not None: - show_id_str = (show_uri_search - if show_uri_search is not None else - show_url_search).group("ShowID") - else: - show_id_str = None - - if artist_uri_search is not None or artist_url_search is not None: - artist_id_str = (artist_uri_search - if artist_uri_search is not None else - artist_url_search).group("ArtistID") - else: - artist_id_str = None - - return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str - - -def get_episode_info(episode_id_str): # pylint: disable=missing-function-docstring - token = SESSION.tokens().get("user-read-email") - info = json.loads(requests.get(f"https://api.spotify.com/v1/episodes/{episode_id_str}", - headers={"Authorization": f"Bearer {token}"}).text) - - if "error" in info: - return None, None - # print(info["images"][0]["url"]) - return sanitize_data(info["show"]["name"]), sanitize_data(info["name"]) - - -def get_show_episodes(access_token, show_id_str): # pylint: disable=missing-function-docstring - episodes = [] - - headers = {"Authorization": f"Bearer {access_token}"} - resp = requests.get( - f"https://api.spotify.com/v1/shows/{show_id_str}/episodes", headers=headers).json() - - for episode in resp["items"]: - episodes.append(episode["id"]) - - return episodes - - -def download_episode(episode_id_str): # pylint: disable=missing-function-docstring - podcast_name, episode_name = get_episode_info(episode_id_str) - - extra_paths = podcast_name + "/" - - if podcast_name is None: - print("### SKIPPING: (EPISODE NOT FOUND) ###") - else: - filename = podcast_name + " - " + episode_name - - episode_id = EpisodeId.from_base62(episode_id_str) - stream = SESSION.content_feeder().load( - episode_id, VorbisOnlyAudioQuality(QUALITY), False, None) - # print("### DOWNLOADING '" + podcast_name + " - " + - # episode_name + "' - THIS MAY TAKE A WHILE ###") - - os.makedirs(ZS_CONFIG["ROOT_PODCAST_PATH"] + - extra_paths, exist_ok=True) - - total_size = stream.input_stream.size - with open(ZS_CONFIG["ROOT_PODCAST_PATH"] + extra_paths + filename + ".wav", "wb") as file, tqdm( - desc=filename, - total=total_size, - unit="B", - unit_scale=True, - unit_divisor=1024 - ) as p_bar: - for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1): - p_bar.update(file.write( - stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"]))) - - # convert_audio_format(ZS_CONFIG["ROOT_PODCAST_PATH"] + - # extra_paths + filename + ".wav") - - # related functions that do stuff with the spotify API - - -def search(search_term): # pylint: disable=too-many-locals,too-many-branches - """ Searches Spotify's API for relevant data """ - token = SESSION.tokens().get("user-read-email") - - resp = requests.get( - "https://api.spotify.com/v1/search", - { - "limit": "10", - "offset": "0", - "q": search_term, - "type": "track,album,artist,playlist" - }, - headers={"Authorization": f"Bearer {token}"}, - ) - - # print(resp.json()) - - i = 1 - tracks = resp.json()["tracks"]["items"] - if len(tracks) > 0: - print("### TRACKS ###") - for track in tracks: - if track["explicit"]: - explicit = "[E]" - else: - explicit = "" - print(f"{i}, {track['name']} {explicit} | {','.join([artist['name'] for artist in track['artists']])}") - i += 1 - total_tracks = i - 1 - print("\n") - else: - total_tracks = 0 - - albums = resp.json()["albums"]["items"] - if len(albums) > 0: - print("### ALBUMS ###") - for album in albums: - print(f"{i}, {album['name']} | {','.join([artist['name'] for artist in album['artists']])}") - i += 1 - total_albums = i - total_tracks - 1 - print("\n") - else: - total_albums = 0 - - artists = resp.json()["artists"]["items"] - if len(artists) > 0: - print("### ARTISTS ###") - for artist in artists: - print("%d, %s" % ( - i, - artist["name"], - )) - i += 1 - total_artists = i - total_tracks - total_albums - 1 - print("\n") - else: - total_artists = 0 - - playlists = resp.json()["playlists"]["items"] - print("### PLAYLISTS ###") - for playlist in playlists: - print(f"{i}, {playlist['name']} | {playlist['owner']['display_name']}") - i += 1 - print("\n") - - if len(tracks) + len(albums) + len(playlists) == 0: - print("NO RESULTS FOUND - EXITING...") - else: - selection = str(input("SELECT ITEM(S) BY ID: ")) - inputs = split_input(selection) - for pos in inputs: - position = int(pos) - if position <= total_tracks: - track_id = tracks[position - 1]["id"] - download_track(track_id) - elif position <= total_albums + total_tracks: - download_album(albums[position - total_tracks - 1]["id"]) - elif position <= total_artists + total_tracks + total_albums: - download_artist_albums(artists[position - total_tracks - total_albums -1]["id"]) - else: - playlist_choice = playlists[position - - total_tracks - total_albums - total_artists - 1] - playlist_songs = get_playlist_songs( - token, playlist_choice["id"]) - for song in playlist_songs: - if song["track"]["id"] is not None: - download_track(song["track"]["id"], sanitize_data( - playlist_choice["name"].strip()) + "/") - print("\n") - - -def get_song_info(song_id): - """ Retrieves metadata for downloaded songs """ - token = SESSION.tokens().get("user-read-email") - - info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id + - "&market=from_token", headers={"Authorization": f"Bearer {token}"}).text) - - artists = [] - for data in info["tracks"][0]["artists"]: - artists.append(sanitize_data(data["name"])) - album_name = sanitize_data(info["tracks"][0]["album"]["name"]) - name = sanitize_data(info["tracks"][0]["name"]) - image_url = info["tracks"][0]["album"]["images"][0]["url"] - release_year = info["tracks"][0]["album"]["release_date"].split("-")[0] - disc_number = info["tracks"][0]["disc_number"] - track_number = info["tracks"][0]["track_number"] - scraped_song_id = info["tracks"][0]["id"] - is_playable = info["tracks"][0]["is_playable"] - - return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable - - -def check_premium(): - """ If user has spotify premium return true """ - return bool((SESSION.get_user_attribute("type") == "premium") or ZS_CONFIG["FORCE_PREMIUM"]) - - -# Functions directly related to modifying the downloaded audio and its metadata -def convert_audio_format(filename): - """ Converts raw audio into playable mp3 or ogg vorbis """ - # print("### CONVERTING TO " + ZS_CONFIG["MUSIC_FORMAT"].upper() + " ###") - raw_audio = AudioSegment.from_file(filename, format="ogg", - frame_rate=44100, channels=2, sample_width=2) - if QUALITY == AudioQuality.VERY_HIGH: - bitrate = "320k" - else: - bitrate = "160k" - raw_audio.export( - filename, format=ZS_CONFIG["MUSIC_FORMAT"], bitrate=bitrate) - - -def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): # pylint: disable=too-many-arguments - """ sets music_tag metadata """ - # print("### SETTING MUSIC TAGS ###") - tags = music_tag.load_file(filename) - tags["artist"] = conv_artist_format(artists) - tags["tracktitle"] = name - tags["album"] = album_name - tags["year"] = release_year - tags["discnumber"] = disc_number - tags["tracknumber"] = track_number - tags.save() - - -def set_music_thumbnail(filename, image_url): - """ Downloads cover artwork """ - # print("### SETTING THUMBNAIL ###") - img = requests.get(image_url).content - tags = music_tag.load_file(filename) - tags["artwork"] = img - tags.save() - - -def conv_artist_format(artists): - """ Returns converted artist format """ - formatted = "" - for artist in artists: - formatted += artist + ", " - return formatted[:-2] - - -# Extra functions directly related to spotify playlists -def get_all_playlists(access_token): - """ Returns list of users playlists """ - playlists = [] - limit = 50 - offset = 0 - - while True: - headers = {"Authorization": f"Bearer {access_token}"} - params = {"limit": limit, "offset": offset} - resp = requests.get("https://api.spotify.com/v1/me/playlists", - headers=headers, params=params).json() - offset += limit - playlists.extend(resp["items"]) - - if len(resp["items"]) < limit: - break - - return playlists - - -def get_playlist_songs(access_token, playlist_id): - """ returns list of songs in a playlist """ - songs = [] - offset = 0 - limit = 100 - - while True: - headers = {"Authorization": f"Bearer {access_token}"} - params = {"limit": limit, "offset": offset} - resp = requests.get( - f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", headers=headers, params=params).json() - offset += limit - songs.extend(resp["items"]) - - if len(resp["items"]) < limit: - break - - return songs - - -def get_playlist_info(access_token, playlist_id): - """ Returns information scraped from playlist """ - headers = {"Authorization": f"Bearer {access_token}"} - resp = requests.get( - f"https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token", - headers=headers).json() - return resp["name"].strip(), resp["owner"]["display_name"].strip() - - -# Extra functions directly related to spotify albums -def get_album_tracks(access_token, album_id): - """ Returns album tracklist """ - songs = [] - offset = 0 - limit = 50 - - while True: - headers = {"Authorization": f"Bearer {access_token}"} - params = {"limit": limit, "offset": offset} - resp = requests.get( - f"https://api.spotify.com/v1/albums/{album_id}/tracks", headers=headers, params=params).json() - offset += limit - songs.extend(resp["items"]) - - if len(resp["items"]) < limit: - break - - return songs - - -def get_album_name(access_token, album_id): - """ Returns album name """ - headers = {"Authorization": f"Bearer {access_token}"} - resp = requests.get( - f"https://api.spotify.com/v1/albums/{album_id}", headers=headers).json() - return resp["artists"][0]["name"], sanitize_data(resp["name"]) - -# Extra functions directly related to spotify artists - - -def get_artist_albums(access_token, artist_id): - """ Returns artist's albums """ - headers = {"Authorization": f"Bearer {access_token}"} - resp = requests.get( - f"https://api.spotify.com/v1/artists/{artist_id}/albums", headers=headers).json() - # Return a list each album's id - return [resp["items"][i]["id"] for i in range(len(resp["items"]))] - -# Extra functions directly related to our saved tracks - - -def get_saved_tracks(access_token): - """ Returns user's saved tracks """ - songs = [] - offset = 0 - limit = 50 - - while True: - headers = {"Authorization": f"Bearer {access_token}"} - params = {"limit": limit, "offset": offset} - resp = requests.get("https://api.spotify.com/v1/me/tracks", - headers=headers, params=params).json() - offset += limit - songs.extend(resp["items"]) - - if len(resp["items"]) < limit: - break - - return songs - - -# Functions directly related to downloading stuff -def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value="", disable_progressbar=False): # pylint: disable=too-many-locals,too-many-branches - """ Downloads raw song audio from Spotify """ - try: - artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info( - track_id_str) - - song_name = artists[0] + " - " + name - if prefix: - song_name = f"{prefix_value.zfill(2)} - {song_name}" if prefix_value.isdigit( - ) else f"{prefix_value} - {song_name}" - - if ZS_CONFIG["SPLIT_ALBUM_DISCS"]: - filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, "Disc " + str( - disc_number) + "/" + song_name + "." + ZS_CONFIG["MUSIC_FORMAT"]) - else: - filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, - song_name + "." + ZS_CONFIG["MUSIC_FORMAT"]) - except Exception as err: # pylint: disable=broad-except,unused-variable - print("### SKIPPING SONG - FAILED TO QUERY METADATA ###") - # print(err) - else: - try: - if not is_playable: - print("### SKIPPING:", song_name, - "(SONG IS UNAVAILABLE) ###") - else: - if os.path.isfile(filename) and os.path.getsize(filename) and ZS_CONFIG["SKIP_EXISTING_FILES"]: - print("### SKIPPING:", song_name, - "(SONG ALREADY EXISTS) ###") - else: - if track_id_str != scraped_song_id: - track_id_str = scraped_song_id - - track_id = TrackId.from_base62(track_id_str) - # print("### FOUND SONG:", song_name, " ###") - - stream = SESSION.content_feeder().load( - track_id, VorbisOnlyAudioQuality(QUALITY), False, None) - # print("### DOWNLOADING RAW AUDIO ###") - - if ZS_CONFIG["SPLIT_ALBUM_DISCS"]: - os.makedirs( - ZS_CONFIG["ROOT_PATH"] + extra_paths + "/Disc " + str(disc_number) + "/", exist_ok=True) - else: - os.makedirs(ZS_CONFIG["ROOT_PATH"] + - extra_paths, exist_ok=True) - - total_size = stream.input_stream.size - with open(filename, "wb") as file, tqdm( - desc=song_name, - total=total_size, - unit="B", - unit_scale=True, - unit_divisor=1024, - disable=disable_progressbar - ) as p_bar: - for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1): - p_bar.update(file.write( - stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"]))) - - if not ZS_CONFIG["RAW_AUDIO_AS_IS"]: - convert_audio_format(filename) - set_audio_tags(filename, artists, name, album_name, - release_year, disc_number, track_number) - set_music_thumbnail(filename, image_url) - - if not ZS_CONFIG["OVERRIDE_AUTO_WAIT"]: - time.sleep(ZS_CONFIG["ANTI_BAN_WAIT_TIME"]) - except Exception: # pylint: disable=broad-except - print("### SKIPPING:", song_name, - "(GENERAL DOWNLOAD ERROR) ###") - if os.path.exists(filename): - os.remove(filename) - - -def download_album(album): - """ Downloads songs from an album """ - token = SESSION.tokens().get("user-read-email") - artist, album_name = get_album_name(token, album) - tracks = get_album_tracks(token, album) - for num, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit="Song", total=len(tracks)): - download_track(track["id"], f"{artist}/{album_name}", - prefix=True, prefix_value=str(num), disable_progressbar=True) - - -def download_artist_albums(artist): - """ Downloads albums of an artist """ - token = SESSION.tokens().get("user-read-email") - albums = get_artist_albums(token, artist) - for album_id in albums: - download_album(album_id) - - -def download_playlist(playlists, playlist_choice): - """Downloads all the songs from a playlist""" - token = SESSION.tokens().get("user-read-email") - - playlist_songs = get_playlist_songs( - token, playlists[int(playlist_choice) - 1]["id"]) - - for song in playlist_songs: - if song["track"]["id"] is not None: - download_track(song["track"]["id"], sanitize_data( - playlists[int(playlist_choice) - 1]["name"].strip()) + "/") - print("\n") - - -def download_from_user_playlist(): - """ Select which playlist(s) to download """ - token = SESSION.tokens().get("user-read-email") - playlists = get_all_playlists(token) - - count = 1 - for playlist in playlists: - print(str(count) + ": " + playlist["name"].strip()) - count += 1 - - print("\n> SELECT A PLAYLIST BY ID") - print("> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID's") - print("> For example, typing 10 to get one playlist or 10-20 to get\nevery playlist from 10-20 (inclusive)\n") - - playlist_choices = input("ID(s): ").split("-") - - if len(playlist_choices) == 1: - download_playlist(playlists, playlist_choices[0]) - else: - start = int(playlist_choices[0]) - end = int(playlist_choices[1]) + 1 - - print(f"Downloading from {start} to {end}...") - - for playlist in range(start, end): - download_playlist(playlists, playlist) - - print("\n**All playlists have been downloaded**\n") - - -# Core functions here - - -def check_raw(): # pylint: disable=missing-function-docstring - if ZS_CONFIG["RAW_AUDIO_AS_IS"]: - ZS_CONFIG["MUSIC_FORMAT"] = "wav" - - -def main(): - """ Main function """ - check_raw() - login() - client() - - -if __name__ == "__main__": - main()