diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml new file mode 100644 index 0000000..0805af7 --- /dev/null +++ b/.github/workflows/pylint.yml @@ -0,0 +1,22 @@ +name: Pylint + +on: [push] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v2 + with: + python-version: 3.9 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pylint + - name: Analysing the code with pylint + run: | + pylint `ls -R|grep .py$|xargs` diff --git a/.gitignore b/.gitignore index 6f59376..ea24040 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # Byte-compiled / optimized / DLL files __pycache__/ +src/__pycache__/ *.py[cod] *$py.class @@ -142,10 +143,11 @@ cython_debug/ # Spotify Credentials credentials.json +src/credentials.json #Download Folder ZSpotify\ Music/ ZSpotify\ Podcasts/ # Intellij -.idea \ No newline at end of file +.idea diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..1ee5eef --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,40 @@ +# Introduction + +### Thank you for contributing + +Without people like you this project wouldn't be anywhere near as polished and feature-rich as it is now. + +### Guidelines + +Following these guidelines helps show that you respect the the time and effort spent by the developers and your fellow contributors making this project. + +### What we are looking for + +ZSpotify is a community-driven project. There are many different ways to contribute. From providing tutorials and examples to help new users, reporting bugs, requesting new features, writing new code that can be added to the project, or even writing documentation. + +### What we aren't looking for + +Please don't use the issues section to request help installing or setting up the project. It should be reserved for bugs when running the code, and feature requqests. Instead use the support channel in either our Discord or Matrix server. + +# Ground rules + +### Expectations +* Ensure all code is linted with pylint before pushing. +* Ensure all code passes the [testing criteria](#testing-criteria). +* If you're planning on contributing a new feature, join the Discord or Matrix and discuss it with the Dev Team. +* Please don't commit multiple new features at once. +* Follow the [Python Community Code of Conduct](https://www.python.org/psf/codeofconduct/) + +# Your first contribution + +Unsure where to start? Have a look for any issues tagged "good first issue". They should be minor bugs that only require a few lines to fix. +Here are a couple of friendly tutorials on making pull requests: http://makeapullrequest.com/ and http://www.firsttimersonly.com/ + +# Code review process + +The dev team looks at Pull Requests around once per day. After feedback has been given we expect responses within one week. After a week we may close the pull request if it isn't showing any activity. +> ZSpotify updates very frequently, often multiple times per day. If a maintainer asks you to "rebase" your PR, they're saying that a lot of code has changed, and that you need to update your branch so it's easier to merge. + +# Community + +Come and chat with us on Discord or Matrix. Devs try to respond to mentions at least once per day. \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ceb88cf --- /dev/null +++ b/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.9-alpine as base + +RUN apk --update add git ffmpeg + +FROM base as builder +RUN mkdir /install +WORKDIR /install +COPY requirements.txt /requirements.txt +RUN apk add gcc libc-dev zlib zlib-dev jpeg-dev \ + && pip install --prefix="/install" -r /requirements.txt + + +FROM base + +COPY --from=builder /install /usr/local +COPY src /app +COPY zs_config.json / +WORKDIR /app +ENTRYPOINT ["/usr/local/bin/python", "app.py"] diff --git a/README.md b/README.md index 0363279..7b8f1b0 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Requirements: Binaries -- Python 3.8 or greater +- Python 3.9 or greater - ffmpeg* - Git** @@ -29,9 +29,9 @@ Python packages: \*\*Git can be installed via apt for Debian-based distros or by downloading the binaries from [git-scm.com](https://git-scm.com/download/win) for Windows. ``` Command line usage: - python zspotify.py Loads search prompt to find then download a specific track, album or playlist - python zspotify.py Downloads the track, album, playlist or podcast episode specified as a command line argument - python zspotify.py Downloads all albums by specified artist + python app.py Loads search prompt to find then download a specific track, album or playlist + python app.py Downloads the track, album, playlist or podcast episode specified as a command line argument + python app.py Downloads all albums by specified artist Extra command line options: -p, --playlist Downloads a saved playlist from your account @@ -43,8 +43,7 @@ Options that can be configured in zs_config.json: SKIP_EXISTING_FILES Set this to false if you want ZSpotify to overwrite files with the same name rather than skipping the song - MUSIC_FORMAT Set this to "ogg" if you would rather that format audio over "mp3" - RAW_AUDIO_AS_IS Set this to true to only stream the audio to a file and do no re-encoding or post processing + MUSIC_FORMAT Can be "mp3" or "ogg", mp3 is required for track metadata however ogg is slightly higer quality as it is not trsnacoded. FORCE_PREMIUM Set this to true if ZSpotify isn't automatically detecting that you are using a premium account @@ -56,10 +55,24 @@ Currently no user has reported their account getting banned after using ZSpotify This isn't to say _you_ won't get banned as it is technically against Spotify's TOS. **Use ZSpotify at your own risk**, the developers of ZSpotify are not responsible if your account gets banned. +### What do I do if I see "Your session has been terminated"? +If you see this, don't worry! Just try logging back in. If you see the incorrect username or password error, reset your password and you should be able to log back in and continue using Spotify. + ### Contributing -Please be sure to lint your code with pylint before issuing a pull-request, thanks! +Please refer to CONTRIBUTING.md ## **Changelog:** +**v2.2 (24 Oct 2021):** +- Added basic support for downloading an entire podcast series. +- Split code into multiple files for easier maintenance. +- Changed initial launch script to app.py +- Simplified audio formats. +- Added prebuild exe for Windows users. +- Added Docker file. +- Added CONTRIBUTING.md. +- Fixed artist names getting cutoff in metadata. +- Removed data sanitization of metadata tags. + **v2.1 (23 Oct 2021):** - Moved configuration from hard-coded values to separate zs_config.json file. - Add subfolders for each disc. diff --git a/requirements.txt b/requirements.txt index 5afa986..4f4b877 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ music_tag pydub Pillow tqdm +tabulate \ No newline at end of file diff --git a/src/album.py b/src/album.py new file mode 100644 index 0000000..12f86eb --- /dev/null +++ b/src/album.py @@ -0,0 +1,54 @@ +from tqdm import tqdm + +from const import ITEMS, ARTISTS, NAME, ID +from track import download_track +from utils import sanitize_data +from zspotify import ZSpotify + +ALBUM_URL = 'https://api.spotify.com/v1/albums' +ARTIST_URL = 'https://api.spotify.com/v1/artists' + + +def get_album_tracks(album_id): + """ Returns album tracklist """ + songs = [] + offset = 0 + limit = 50 + + while True: + resp = ZSpotify.invoke_url_with_params(f'{ALBUM_URL}/{album_id}/tracks', limit=limit, offset=offset) + offset += limit + songs.extend(resp[ITEMS]) + if len(resp[ITEMS]) < limit: + break + + return songs + + +def get_album_name(album_id): + """ Returns album name """ + resp = ZSpotify.invoke_url(f'{ALBUM_URL}/{album_id}') + return resp[ARTISTS][0][NAME], sanitize_data(resp[NAME]) + + +def get_artist_albums(artist_id): + """ Returns artist's albums """ + resp = ZSpotify.invoke_url(f'{ARTIST_URL}/{artist_id}/albums') + # Return a list each album's id + return [resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))] + + +def download_album(album): + """ Downloads songs from an album """ + artist, album_name = get_album_name(album) + tracks = get_album_tracks(album) + for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)): + download_track(track[ID], f'{artist}/{album_name}', + prefix=True, prefix_value=str(n), disable_progressbar=True) + + +def download_artist_albums(artist): + """ Downloads albums of an artist """ + albums = get_artist_albums(artist) + for album_id in albums: + download_album(album_id) diff --git a/src/app.py b/src/app.py new file mode 100644 index 0000000..abd8b87 --- /dev/null +++ b/src/app.py @@ -0,0 +1,172 @@ +import sys + +from librespot.audio.decoders import AudioQuality +from tabulate import tabulate + +from album import download_album, download_artist_albums +from const import TRACK, NAME, ID, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUMS, OWNER, \ + PLAYLISTS, DISPLAY_NAME +from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist +from podcast import download_episode, get_show_episodes +from track import download_track, get_saved_tracks +from utils import sanitize_data, splash, split_input, regex_input_for_urls +from zspotify import ZSpotify + +SEARCH_URL = 'https://api.spotify.com/v1/search' + + +def client() -> None: + """ Connects to spotify to perform query's and get songs to download """ + ZSpotify() + splash() + + if ZSpotify.check_premium(): + print('[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n') + ZSpotify.DOWNLOAD_QUALITY = AudioQuality.VERY_HIGH + else: + print('[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n') + ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH + + while True: + if len(sys.argv) > 1: + if sys.argv[1] == '-p' or sys.argv[1] == '--playlist': + download_from_user_playlist() + elif sys.argv[1] == '-ls' or sys.argv[1] == '--liked-songs': + for song in get_saved_tracks(): + if not song[TRACK][NAME]: + print('### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###') + else: + download_track(song[TRACK][ID], 'Liked Songs/') + print('\n') + else: + track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(sys.argv[1]) + + if track_id is not None: + download_track(track_id) + elif artist_id is not None: + download_artist_albums(artist_id) + elif album_id is not None: + download_album(album_id) + elif playlist_id is not None: + playlist_songs = get_playlist_songs(playlist_id) + name, _ = get_playlist_info(playlist_id) + for song in playlist_songs: + download_track(song[TRACK][ID], + sanitize_data(name) + '/') + print('\n') + elif episode_id is not None: + download_episode(episode_id) + elif show_id is not None: + for episode in get_show_episodes(show_id): + download_episode(episode) + + else: + search_text = '' + while len(search_text) == 0: + search_text = input('Enter search or URL: ') + + track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(search_text) + + if track_id is not None: + download_track(track_id) + elif artist_id is not None: + download_artist_albums(artist_id) + elif album_id is not None: + download_album(album_id) + elif playlist_id is not None: + playlist_songs = get_playlist_songs(playlist_id) + name, _ = get_playlist_info(playlist_id) + for song in playlist_songs: + download_track(song[TRACK][ID], sanitize_data(name) + '/') + print('\n') + elif episode_id is not None: + download_episode(episode_id) + elif show_id is not None: + for episode in get_show_episodes(show_id): + download_episode(episode) + else: + search(search_text) + # wait() + + +def search(search_term): + """ Searches Spotify's API for relevant data """ + params = {'limit': '10', 'offset': '0', 'q': search_term, 'type': 'track,album,artist,playlist'} + resp = ZSpotify.invoke_url_with_params(SEARCH_URL, **params) + + counter = 1 + tracks = resp[TRACKS][ITEMS] + if len(tracks) > 0: + print('### TRACKS ###') + track_data = [] + for track in tracks: + if track[EXPLICIT]: + explicit = '[E]' + else: + explicit = '' + track_data.append([counter, f'{track[NAME]} {explicit}', + ','.join([artist[NAME] for artist in track[ARTISTS]])]) + counter += 1 + total_tracks = counter - 1 + print(tabulate(track_data, headers=['S.NO', 'Name', 'Artists'], tablefmt='pretty')) + print('\n') + else: + total_tracks = 0 + + albums = resp[ALBUMS][ITEMS] + if len(albums) > 0: + print('### ALBUMS ###') + album_data = [] + for album in albums: + album_data.append([counter, album[NAME], ','.join([artist[NAME] for artist in album[ARTISTS]])]) + counter += 1 + total_albums = counter - total_tracks - 1 + print(tabulate(album_data, headers=['S.NO', 'Album', 'Artists'], tablefmt='pretty')) + print('\n') + else: + total_albums = 0 + + artists = resp[ARTISTS][ITEMS] + if len(artists) > 0: + print('### ARTISTS ###') + artist_data = [] + for artist in artists: + artist_data.append([counter, artist[NAME]]) + counter += 1 + total_artists = counter - total_tracks - total_albums - 1 + print(tabulate(artist_data, headers=['S.NO', 'Name'], tablefmt='pretty')) + print('\n') + else: + total_artists = 0 + + playlists = resp[PLAYLISTS][ITEMS] + print('### PLAYLISTS ###') + playlist_data = [] + for playlist in playlists: + playlist_data.append([counter, playlist[NAME], playlist[OWNER][DISPLAY_NAME]]) + counter += 1 + print(tabulate(playlist_data, headers=['S.NO', 'Name', 'Owner'], tablefmt='pretty')) + print('\n') + + if len(tracks) + len(albums) + len(playlists) == 0: + print('NO RESULTS FOUND - EXITING...') + else: + selection = '' + while len(selection) == 0: + selection = str(input('SELECT ITEM(S) BY S.NO: ')) + inputs = split_input(selection) + for pos in inputs: + position = int(pos) + if position <= total_tracks: + track_id = tracks[position - 1][ID] + download_track(track_id) + elif position <= total_albums + total_tracks: + download_album(albums[position - total_tracks - 1][ID]) + elif position <= total_artists + total_tracks + total_albums: + download_artist_albums(artists[position - total_tracks - total_albums - 1][ID]) + else: + download_playlist(playlists, position - total_tracks - total_albums - total_artists) + + +if __name__ == '__main__': + client() diff --git a/src/const.py b/src/const.py new file mode 100644 index 0000000..0c9a854 --- /dev/null +++ b/src/const.py @@ -0,0 +1,95 @@ +SANITIZE = ('\\', '/', ':', '*', '?', '\'', '<', '>', '"') + +SAVED_TRACKS_URL = 'https://api.spotify.com/v1/me/tracks' + +TRACKS_URL = 'https://api.spotify.com/v1/tracks' + +TRACKNUMBER = 'tracknumber' + +DISCNUMBER = 'discnumber' + +YEAR = 'year' + +ALBUM = 'album' + +TRACKTITLE = 'tracktitle' + +ARTIST = 'artist' + +ARTISTS = 'artists' + +ARTWORK = 'artwork' + +TRACKS = 'tracks' + +TRACK = 'track' + +ITEMS = 'items' + +NAME = 'name' + +ID = 'id' + +URL = 'url' + +RELEASE_DATE = 'release_date' + +IMAGES = 'images' + +LIMIT = 'limit' + +OFFSET = 'offset' + +AUTHORIZATION = 'Authorization' + +IS_PLAYABLE = 'is_playable' + +TRACK_NUMBER = 'track_number' + +DISC_NUMBER = 'disc_number' + +SHOW = 'show' + +ERROR = 'error' + +EXPLICIT = 'explicit' + +PLAYLISTS = 'playlists' + +OWNER = 'owner' + +DISPLAY_NAME = 'display_name' + +ALBUMS = 'albums' + +TYPE = 'type' + +PREMIUM = 'premium' + +USER_READ_EMAIL = 'user-read-email' + +PLAYLIST_READ_PRIVATE = 'playlist-read-private' + +WINDOWS_SYSTEM = 'Windows' + +CREDENTIALS_JSON = 'credentials.json' + +CONFIG_FILE_PATH = '../zs_config.json' + +ROOT_PATH = 'ROOT_PATH' + +ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH' + +SKIP_EXISTING_FILES = 'SKIP_EXISTING_FILES' + +DOWNLOAD_FORMAT = 'DOWNLOAD_FORMAT' + +FORCE_PREMIUM = 'FORCE_PREMIUM' + +ANTI_BAN_WAIT_TIME = 'ANTI_BAN_WAIT_TIME' + +OVERRIDE_AUTO_WAIT = 'OVERRIDE_AUTO_WAIT' + +CHUNK_SIZE = 'CHUNK_SIZE' + +SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS' diff --git a/src/playlist.py b/src/playlist.py new file mode 100644 index 0000000..8a4357a --- /dev/null +++ b/src/playlist.py @@ -0,0 +1,87 @@ +from tqdm import tqdm + +from const import ITEMS, ID, TRACK, NAME +from track import download_track +from utils import sanitize_data +from zspotify import ZSpotify + +MY_PLAYLISTS_URL = 'https://api.spotify.com/v1/me/playlists' +PLAYLISTS_URL = 'https://api.spotify.com/v1/playlists' + + +def get_all_playlists(): + """ Returns list of users playlists """ + playlists = [] + limit = 50 + offset = 0 + + while True: + resp = ZSpotify.invoke_url_with_params(MY_PLAYLISTS_URL, limit=limit, offset=offset) + offset += limit + playlists.extend(resp[ITEMS]) + if len(resp[ITEMS]) < limit: + break + + return playlists + + +def get_playlist_songs(playlist_id): + """ returns list of songs in a playlist """ + songs = [] + offset = 0 + limit = 100 + + while True: + resp = ZSpotify.invoke_url_with_params(f'{PLAYLISTS_URL}/{playlist_id}/tracks', limit=limit, offset=offset) + offset += limit + songs.extend(resp[ITEMS]) + if len(resp[ITEMS]) < limit: + break + + return songs + + +def get_playlist_info(playlist_id): + """ Returns information scraped from playlist """ + resp = ZSpotify.invoke_url(f'{PLAYLISTS_URL}/{playlist_id}?fields=name,owner(display_name)&market=from_token') + return resp['name'].strip(), resp['owner']['display_name'].strip() + + +def download_playlist(playlists, playlist_number): + """Downloads all the songs from a playlist""" + + playlist_songs = [song for song in get_playlist_songs(playlists[int(playlist_number) - 1][ID]) if song[TRACK][ID]] + p_bar = tqdm(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True) + for song in p_bar: + download_track(song[TRACK][ID], sanitize_data(playlists[int(playlist_number) - 1][NAME].strip()) + '/', + disable_progressbar=True) + p_bar.set_description(song[TRACK][NAME]) + + +def download_from_user_playlist(): + """ Select which playlist(s) to download """ + playlists = get_all_playlists() + + count = 1 + for playlist in playlists: + print(str(count) + ': ' + playlist[NAME].strip()) + count += 1 + + print('\n> SELECT A PLAYLIST BY ID') + print('> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID\'s') + print('> For example, typing 10 to get one playlist or 10-20 to get\nevery playlist from 10-20 (inclusive)\n') + + playlist_choices = input('ID(s): ').split('-') + + if len(playlist_choices) == 1: + download_playlist(playlists, playlist_choices[0]) + else: + start = int(playlist_choices[0]) + end = int(playlist_choices[1]) + 1 + + print(f'Downloading from {start} to {end}...') + + for playlist_number in range(start, end): + download_playlist(playlists, playlist_number) + + print('\n**All playlists have been downloaded**\n') diff --git a/src/podcast.py b/src/podcast.py new file mode 100644 index 0000000..43dae31 --- /dev/null +++ b/src/podcast.py @@ -0,0 +1,70 @@ +import os +from typing import Optional + +from librespot.audio.decoders import VorbisOnlyAudioQuality +from librespot.metadata import EpisodeId +from tqdm import tqdm + +from const import NAME, ERROR, SHOW, ITEMS, ID, ROOT_PODCAST_PATH, CHUNK_SIZE +from utils import sanitize_data, create_download_directory, MusicFormat +from zspotify import ZSpotify + + +EPISODE_INFO_URL = 'https://api.spotify.com/v1/episodes' +SHOWS_URL = 'https://api.spotify.com/v1/shows' + + +def get_episode_info(episode_id_str) -> tuple[Optional[str], Optional[str]]: + info = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}') + if ERROR in info: + return None, None + return sanitize_data(info[SHOW][NAME]), sanitize_data(info[NAME]) + + +def get_show_episodes(show_id_str) -> list: + episodes = [] + offset = 0 + limit = 50 + + while True: + resp = ZSpotify.invoke_url_with_params(f'{SHOWS_URL}/{show_id_str}/episodes', limit=limit, offset=offset) + offset += limit + for episode in resp[ITEMS]: + episodes.append(episode[ID]) + if len(resp[ITEMS]) < limit: + break + + return episodes + + +def download_episode(episode_id) -> None: + podcast_name, episode_name = get_episode_info(episode_id) + + extra_paths = podcast_name + '/' + + if podcast_name is None: + print('### SKIPPING: (EPISODE NOT FOUND) ###') + else: + filename = podcast_name + ' - ' + episode_name + + episode_id = EpisodeId.from_base62(episode_id) + stream = ZSpotify.get_content_stream(episode_id, ZSpotify.DOWNLOAD_QUALITY) + + download_directory = os.path.dirname(__file__) + ZSpotify.get_config(ROOT_PODCAST_PATH) + extra_paths + create_download_directory(download_directory) + + total_size = stream.input_stream.size + with open(download_directory + filename + MusicFormat.OGG.value, + 'wb') as file, tqdm( + desc=filename, + total=total_size, + unit='B', + unit_scale=True, + unit_divisor=1024 + ) as bar: + for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): + bar.update(file.write( + stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE)))) + + # convert_audio_format(ROOT_PODCAST_PATH + + # extra_paths + filename + '.ogg') \ No newline at end of file diff --git a/src/track.py b/src/track.py new file mode 100644 index 0000000..3fb088f --- /dev/null +++ b/src/track.py @@ -0,0 +1,128 @@ +import os +import time +from typing import Any + +from librespot.audio.decoders import AudioQuality +from librespot.metadata import TrackId +from pydub import AudioSegment +from tqdm import tqdm + +from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ + RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, CHUNK_SIZE, \ + SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT +from utils import sanitize_data, set_audio_tags, set_music_thumbnail, create_download_directory, \ + MusicFormat +from zspotify import ZSpotify + + +def get_saved_tracks() -> list: + """ Returns user's saved tracks """ + songs = [] + offset = 0 + limit = 50 + + while True: + resp = ZSpotify.invoke_url_with_params(SAVED_TRACKS_URL, limit=limit, offset=offset) + offset += limit + songs.extend(resp[ITEMS]) + if len(resp[ITEMS]) < limit: + break + + return songs + + +def get_song_info(song_id) -> tuple[list[str], str, str, Any, Any, Any, Any, Any, Any]: + """ Retrieves metadata for downloaded songs """ + info = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token') + + artists = [] + for data in info[TRACKS][0][ARTISTS]: + artists.append(sanitize_data(data[NAME])) + album_name = sanitize_data(info[TRACKS][0][ALBUM][NAME]) + name = sanitize_data(info[TRACKS][0][NAME]) + image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL] + release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0] + disc_number = info[TRACKS][0][DISC_NUMBER] + track_number = info[TRACKS][0][TRACK_NUMBER] + scraped_song_id = info[TRACKS][0][ID] + is_playable = info[TRACKS][0][IS_PLAYABLE] + + return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable + + +# noinspection PyBroadException +def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='', disable_progressbar=False) -> None: + """ Downloads raw song audio from Spotify """ + download_directory = os.path.join(os.path.dirname(__file__), ZSpotify.get_config(ROOT_PATH), extra_paths) + try: + (artists, album_name, name, image_url, release_year, disc_number, + track_number, scraped_song_id, is_playable) = get_song_info(track_id) + + song_name = artists[0] + ' - ' + name + if prefix: + song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit( + ) else f'{prefix_value} - {song_name}' + + if ZSpotify.get_config(SPLIT_ALBUM_DISCS): + filename = os.path.join(download_directory, f'Disc {disc_number}', + f'{song_name}.{ZSpotify.get_config(DOWNLOAD_FORMAT)}') + else: + filename = os.path.join(download_directory, + f'{song_name}.{ZSpotify.get_config(DOWNLOAD_FORMAT)}') + except Exception: + print('### SKIPPING SONG - FAILED TO QUERY METADATA ###') + else: + try: + if not is_playable: + print('\n### SKIPPING:', song_name, + '(SONG IS UNAVAILABLE) ###') + else: + if os.path.isfile(filename) and os.path.getsize(filename) and ZSpotify.get_config(SKIP_EXISTING_FILES): + print('\n### SKIPPING:', song_name, + '(SONG ALREADY EXISTS) ###') + else: + if track_id != scraped_song_id: + track_id = scraped_song_id + track_id = TrackId.from_base62(track_id) + stream = ZSpotify.get_content_stream(track_id, ZSpotify.DOWNLOAD_QUALITY) + create_download_directory(download_directory) + total_size = stream.input_stream.size + + with open(filename, 'wb') as file, tqdm( + desc=song_name, + total=total_size, + unit='B', + unit_scale=True, + unit_divisor=1024, + disable=disable_progressbar + ) as p_bar: + for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): + p_bar.update(file.write( + stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE)))) + + if ZSpotify.get_config(DOWNLOAD_FORMAT) == 'mp3': + convert_audio_format(filename) + set_audio_tags(filename, artists, name, album_name, + release_year, disc_number, track_number) + set_music_thumbnail(filename, image_url) + + if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT): + time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME)) + except Exception as e: + print('### SKIPPING:', song_name, + '(GENERAL DOWNLOAD ERROR) ###') + print(e) + if os.path.exists(filename): + os.remove(filename) + + +def convert_audio_format(filename) -> None: + """ Converts raw audio into playable mp3 """ + # print('### CONVERTING TO ' + MUSIC_FORMAT.upper() + ' ###') + raw_audio = AudioSegment.from_file(filename, format=MusicFormat.OGG.value, + frame_rate=44100, channels=2, sample_width=2) + if ZSpotify.DOWNLOAD_QUALITY == AudioQuality.VERY_HIGH: + bitrate = '320k' + else: + bitrate = '160k' + raw_audio.export(filename, format=ZSpotify.get_config(DOWNLOAD_FORMAT), bitrate=bitrate) diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..3fd9161 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,180 @@ +import os +import platform +import re +import time +from enum import Enum + +import music_tag +import requests + +from const import SANITIZE, ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \ + WINDOWS_SYSTEM + + +class MusicFormat(str, Enum): + MP3 = 'mp3', + OGG = 'ogg', + + +def create_download_directory(download_path: str) -> None: + os.makedirs(download_path, exist_ok=True) + + +def wait(seconds: int = 3) -> None: + """ Pause for a set number of seconds """ + for second in range(seconds)[::-1]: + print(f'\rWait for {second + 1} second(s)...', end='') + time.sleep(1) + + +def split_input(selection) -> list[str]: + """ Returns a list of inputted strings """ + inputs = [] + if '-' in selection: + for number in range(int(selection.split('-')[0]), int(selection.split('-')[1]) + 1): + inputs.append(number) + else: + selections = selection.split(',') + for i in selections: + inputs.append(i.strip()) + return inputs + + +def splash() -> None: + """ Displays splash screen """ + print(""" +███████ ███████ ██████ ██████ ████████ ██ ███████ ██ ██ + ███ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ + ███ ███████ ██████ ██ ██ ██ ██ █████ ████ + ███ ██ ██ ██ ██ ██ ██ ██ ██ +███████ ███████ ██ ██████ ██ ██ ██ ██ + """) + + +def clear() -> None: + """ Clear the console window """ + if platform.system() == WINDOWS_SYSTEM: + os.system('cls') + else: + os.system('clear') + + +def sanitize_data(value) -> str: + """ Returns given string with problematic removed """ + for pattern in SANITIZE: + value = value.replace(pattern, '') + return value.replace('|', '-') + + +def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number) -> None: + """ sets music_tag metadata """ + tags = music_tag.load_file(filename) + tags[ARTIST] = conv_artist_format(artists) + tags[TRACKTITLE] = name + tags[ALBUM] = album_name + tags[YEAR] = release_year + tags[DISCNUMBER] = disc_number + tags[TRACKNUMBER] = track_number + tags.save() + + +def conv_artist_format(artists) -> str: + """ Returns converted artist format """ + return ', '.join(artists) + + +def set_music_thumbnail(filename, image_url) -> None: + """ Downloads cover artwork """ + img = requests.get(image_url).content + tags = music_tag.load_file(filename) + tags[ARTWORK] = img + tags.save() + + +def regex_input_for_urls(search_input) -> tuple[str, str, str, str, str, str]: + """ Since many kinds of search may be passed at the command line, process them all here. """ + track_uri_search = re.search( + r'^spotify:track:(?P[0-9a-zA-Z]{22})$', search_input) + track_url_search = re.search( + r'^(https?://)?open\.spotify\.com/track/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + album_uri_search = re.search( + r'^spotify:album:(?P[0-9a-zA-Z]{22})$', search_input) + album_url_search = re.search( + r'^(https?://)?open\.spotify\.com/album/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + playlist_uri_search = re.search( + r'^spotify:playlist:(?P[0-9a-zA-Z]{22})$', search_input) + playlist_url_search = re.search( + r'^(https?://)?open\.spotify\.com/playlist/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + episode_uri_search = re.search( + r'^spotify:episode:(?P[0-9a-zA-Z]{22})$', search_input) + episode_url_search = re.search( + r'^(https?://)?open\.spotify\.com/episode/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + show_uri_search = re.search( + r'^spotify:show:(?P[0-9a-zA-Z]{22})$', search_input) + show_url_search = re.search( + r'^(https?://)?open\.spotify\.com/show/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + artist_uri_search = re.search( + r'^spotify:artist:(?P[0-9a-zA-Z]{22})$', search_input) + artist_url_search = re.search( + r'^(https?://)?open\.spotify\.com/artist/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$', + search_input, + ) + + if track_uri_search is not None or track_url_search is not None: + track_id_str = (track_uri_search + if track_uri_search is not None else + track_url_search).group('TrackID') + else: + track_id_str = None + + if album_uri_search is not None or album_url_search is not None: + album_id_str = (album_uri_search + if album_uri_search is not None else + album_url_search).group('AlbumID') + else: + album_id_str = None + + if playlist_uri_search is not None or playlist_url_search is not None: + playlist_id_str = (playlist_uri_search + if playlist_uri_search is not None else + playlist_url_search).group('PlaylistID') + else: + playlist_id_str = None + + if episode_uri_search is not None or episode_url_search is not None: + episode_id_str = (episode_uri_search + if episode_uri_search is not None else + episode_url_search).group('EpisodeID') + else: + episode_id_str = None + + if show_uri_search is not None or show_url_search is not None: + show_id_str = (show_uri_search + if show_uri_search is not None else + show_url_search).group('ShowID') + else: + show_id_str = None + + if artist_uri_search is not None or artist_url_search is not None: + artist_id_str = (artist_uri_search + if artist_uri_search is not None else + artist_url_search).group('ArtistID') + else: + artist_id_str = None + + return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str diff --git a/src/zspotify.py b/src/zspotify.py new file mode 100644 index 0000000..f206a80 --- /dev/null +++ b/src/zspotify.py @@ -0,0 +1,96 @@ +#! /usr/bin/env python3 + +""" +ZSpotify +It's like youtube-dl, but for Spotify. + +(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org) +""" +import json +import os +import os.path +from getpass import getpass +from typing import Any + +import requests +from librespot.audio.decoders import VorbisOnlyAudioQuality +from librespot.core import Session + +from const import CREDENTIALS_JSON, TYPE, \ + PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, CONFIG_FILE_PATH, FORCE_PREMIUM, \ + PLAYLIST_READ_PRIVATE +from utils import MusicFormat + + +class ZSpotify: + SESSION: Session = None + DOWNLOAD_QUALITY = None + CONFIG = {} + + def __init__(self): + ZSpotify.load_config() + ZSpotify.login() + + @classmethod + def login(cls): + """ Authenticates with Spotify and saves credentials to a file """ + + if os.path.isfile(CREDENTIALS_JSON): + try: + cls.SESSION = Session.Builder().stored_file().create() + return + except RuntimeError: + pass + while True: + user_name = '' + while len(user_name) == 0: + user_name = input('Username: ') + password = getpass() + try: + cls.SESSION = Session.Builder().user_pass(user_name, password).create() + return + except RuntimeError: + pass + + @classmethod + def load_config(cls) -> None: + app_dir = os.path.dirname(__file__) + with open(os.path.join(app_dir, CONFIG_FILE_PATH), encoding='utf-8') as config_file: + cls.CONFIG = json.load(config_file) + + @classmethod + def get_config(cls, key) -> Any: + return cls.CONFIG.get(key) + + @classmethod + def get_content_stream(cls, content_id, quality): + return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None) + + @classmethod + def __get_auth_token(cls): + return cls.SESSION.tokens().get_token(USER_READ_EMAIL, PLAYLIST_READ_PRIVATE).access_token + + @classmethod + def get_auth_header(cls): + return { + AUTHORIZATION: f'Bearer {cls.__get_auth_token()}'} + + @classmethod + def get_auth_header_and_params(cls, limit, offset): + return {AUTHORIZATION: f'Bearer {cls.__get_auth_token()}'}, {LIMIT: limit, OFFSET: offset} + + @classmethod + def invoke_url_with_params(cls, url, limit, offset, **kwargs): + headers, params = cls.get_auth_header_and_params(limit=limit, offset=offset) + params.update(kwargs) + return requests.get(url, headers=headers, params=params).json() + + @classmethod + def invoke_url(cls, url): + headers = cls.get_auth_header() + return requests.get(url, headers=headers).json() + + @classmethod + def check_premium(cls) -> bool: + """ If user has spotify premium return true """ + return (cls.SESSION.get_user_attribute(TYPE) == PREMIUM) or cls.get_config(FORCE_PREMIUM) diff --git a/zs_config.json b/zs_config.json index 8c5bd05..b5dbb91 100644 --- a/zs_config.json +++ b/zs_config.json @@ -1,12 +1,11 @@ { - "ROOT_PATH": "ZSpotify Music/", - "ROOT_PODCAST_PATH": "ZSpotify Podcasts/", + "ROOT_PATH": "../ZSpotify Music/", + "ROOT_PODCAST_PATH": "../ZSpotify Podcasts/", "SKIP_EXISTING_FILES": true, - "MUSIC_FORMAT": "mp3", - "RAW_AUDIO_AS_IS": false, + "DOWNLOAD_FORMAT": "mp3", "FORCE_PREMIUM": false, "ANTI_BAN_WAIT_TIME": 1, "OVERRIDE_AUTO_WAIT": false, "CHUNK_SIZE": 50000, "SPLIT_ALBUM_DISCS": false -} +} \ No newline at end of file diff --git a/zspotify.py b/zspotify.py deleted file mode 100755 index 3c04c67..0000000 --- a/zspotify.py +++ /dev/null @@ -1,885 +0,0 @@ -#! /usr/bin/env python3 - -""" -ZSpotify -It's like youtube-dl, but for Spotify. - -(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org) -""" - -from getpass import getpass -import json -import os -import os.path -import platform -import re -import sys -import time - -from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality -from librespot.core import Session -from librespot.metadata import TrackId, EpisodeId -import music_tag -from pydub import AudioSegment -import requests -from tqdm import tqdm - -QUALITY = None -SESSION: Session = None -SANITIZE = ["\\", "/", ":", "*", "?", "'", "<", ">", "\""] - -# user-customizable variables that adjust the core functionality of ZSpotify -with open("zs_config.json", encoding="utf-8") as config_file: - ZS_CONFIG = json.load(config_file) - - -# miscellaneous functions for general use - - -def clear(): - """ Clear the console window """ - if platform.system() == "Windows": - os.system("cls") - else: - os.system("clear") - - -def wait(seconds: int = 3): - """ Pause for a set number of seconds """ - for i in range(seconds)[::-1]: - print(f"\rWait for {i + 1} second(s)...", end="") - time.sleep(1) - - -def sanitize_data(value): - """ Returns given string with problematic removed """ - for i in SANITIZE: - value = value.replace(i, "") - return value.replace("|", "-") - - -def split_input(selection): - """ Returns a list of inputted strings """ - inputs = [] - if "-" in selection: - for number in range(int(selection.split("-")[0]), int(selection.split("-")[1]) + 1): - inputs.append(number) - else: - selections = selection.split(",") - for i in selections: - inputs.append(i.strip()) - return inputs - - -def splash(): - """ Displays splash screen """ - print(""" -███████ ███████ ██████ ██████ ████████ ██ ███████ ██ ██ - ███ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ - ███ ███████ ██████ ██ ██ ██ ██ █████ ████ - ███ ██ ██ ██ ██ ██ ██ ██ ██ -███████ ███████ ██ ██████ ██ ██ ██ ██ - """) - - -# two mains functions for logging in and doing client stuff -def login(): - """ Authenticates with Spotify and saves credentials to a file """ - global SESSION # pylint: disable=global-statement - - if os.path.isfile("credentials.json"): - try: - SESSION = Session.Builder().stored_file().create() - return - except RuntimeError: - pass - while True: - user_name = input("Username: ") - password = getpass() - try: - SESSION = Session.Builder().user_pass(user_name, password).create() - return - except RuntimeError: - pass - - -def client(): # pylint: disable=too-many-branches,too-many-statements - """ Connects to spotify to perform query's and get songs to download """ - global QUALITY # pylint: disable=global-statement - splash() - - token = SESSION.tokens().get("user-read-email") - - if check_premium(): - print("[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n") - QUALITY = AudioQuality.VERY_HIGH - else: - print("[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n") - QUALITY = AudioQuality.HIGH - - while True: - if len(sys.argv) > 1: - if sys.argv[1] == "-p" or sys.argv[1] == "--playlist": - download_from_user_playlist() - elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs": - for song in get_saved_tracks(token): - if not song["track"]["name"]: - print( - "### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###") - else: - download_track(song["track"]["id"], "Liked Songs/") - print("\n") - else: - track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls( - sys.argv[1]) - - if track_id_str is not None: - download_track(track_id_str) - elif artist_id_str is not None: - download_artist_albums(artist_id_str) - elif album_id_str is not None: - download_album(album_id_str) - elif playlist_id_str is not None: - playlist_songs = get_playlist_songs(token, playlist_id_str) - name, _ = get_playlist_info(token, playlist_id_str) - for song in playlist_songs: - download_track(song["track"]["id"], - sanitize_data(name) + "/") - print("\n") - elif episode_id_str is not None: - download_episode(episode_id_str) - elif show_id_str is not None: - for episode in get_show_episodes(token, show_id_str): - download_episode(episode) - else: - search_text = input("Enter search or URL: ") - - track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls( - search_text) - - if track_id_str is not None: - download_track(track_id_str) - elif artist_id_str is not None: - download_artist_albums(artist_id_str) - elif album_id_str is not None: - download_album(album_id_str) - elif playlist_id_str is not None: - playlist_songs = get_playlist_songs(token, playlist_id_str) - name, _ = get_playlist_info(token, playlist_id_str) - for song in playlist_songs: - download_track(song["track"]["id"], - sanitize_data(name) + "/") - print("\n") - elif episode_id_str is not None: - download_episode(episode_id_str) - elif show_id_str is not None: - for episode in get_show_episodes(token, show_id_str): - download_episode(episode) - else: - search(search_text) - # wait() - - -def regex_input_for_urls(search_input): # pylint: disable=too-many-locals - """ Since many kinds of search may be passed at the command line, process them all here. """ - track_uri_search = re.search( - r"^spotify:track:(?P[0-9a-zA-Z]{22})$", search_input) - track_url_search = re.search( - r"^(https?://)?open\.spotify\.com/track/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - album_uri_search = re.search( - r"^spotify:album:(?P[0-9a-zA-Z]{22})$", search_input) - album_url_search = re.search( - r"^(https?://)?open\.spotify\.com/album/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - playlist_uri_search = re.search( - r"^spotify:playlist:(?P[0-9a-zA-Z]{22})$", search_input) - playlist_url_search = re.search( - r"^(https?://)?open\.spotify\.com/playlist/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - episode_uri_search = re.search( - r"^spotify:episode:(?P[0-9a-zA-Z]{22})$", search_input) - episode_url_search = re.search( - r"^(https?://)?open\.spotify\.com/episode/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - show_uri_search = re.search( - r"^spotify:show:(?P[0-9a-zA-Z]{22})$", search_input) - show_url_search = re.search( - r"^(https?://)?open\.spotify\.com/show/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - artist_uri_search = re.search( - r"^spotify:artist:(?P[0-9a-zA-Z]{22})$", search_input) - artist_url_search = re.search( - r"^(https?://)?open\.spotify\.com/artist/(?P[0-9a-zA-Z]{22})(\?si=.+?)?$", - search_input, - ) - - if track_uri_search is not None or track_url_search is not None: - track_id_str = (track_uri_search - if track_uri_search is not None else - track_url_search).group("TrackID") - else: - track_id_str = None - - if album_uri_search is not None or album_url_search is not None: - album_id_str = (album_uri_search - if album_uri_search is not None else - album_url_search).group("AlbumID") - else: - album_id_str = None - - if playlist_uri_search is not None or playlist_url_search is not None: - playlist_id_str = (playlist_uri_search - if playlist_uri_search is not None else - playlist_url_search).group("PlaylistID") - else: - playlist_id_str = None - - if episode_uri_search is not None or episode_url_search is not None: - episode_id_str = (episode_uri_search - if episode_uri_search is not None else - episode_url_search).group("EpisodeID") - else: - episode_id_str = None - - if show_uri_search is not None or show_url_search is not None: - show_id_str = (show_uri_search - if show_uri_search is not None else - show_url_search).group("ShowID") - else: - show_id_str = None - - if artist_uri_search is not None or artist_url_search is not None: - artist_id_str = (artist_uri_search - if artist_uri_search is not None else - artist_url_search).group("ArtistID") - else: - artist_id_str = None - - return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str - - -def get_episode_info(episode_id_str): # pylint: disable=missing-function-docstring - token = SESSION.tokens().get("user-read-email") - info = json.loads(requests.get(f"https://api.spotify.com/v1/episodes/{episode_id_str}", - headers={"Authorization": f"Bearer {token}"}).text) - - if "error" in info: - return None, None - # print(info["images"][0]["url"]) - return sanitize_data(info["show"]["name"]), sanitize_data(info["name"]) - - -def get_show_episodes(access_token, show_id_str): # pylint: disable=missing-function-docstring - episodes = [] - - headers = {"Authorization": f"Bearer {access_token}"} - resp = requests.get( - f"https://api.spotify.com/v1/shows/{show_id_str}/episodes", headers=headers).json() - - for episode in resp["items"]: - episodes.append(episode["id"]) - - return episodes - - -def download_episode(episode_id_str): # pylint: disable=missing-function-docstring - podcast_name, episode_name = get_episode_info(episode_id_str) - - extra_paths = podcast_name + "/" - - if podcast_name is None: - print("### SKIPPING: (EPISODE NOT FOUND) ###") - else: - filename = podcast_name + " - " + episode_name - - episode_id = EpisodeId.from_base62(episode_id_str) - stream = SESSION.content_feeder().load( - episode_id, VorbisOnlyAudioQuality(QUALITY), False, None) - # print("### DOWNLOADING '" + podcast_name + " - " + - # episode_name + "' - THIS MAY TAKE A WHILE ###") - - os.makedirs(ZS_CONFIG["ROOT_PODCAST_PATH"] + - extra_paths, exist_ok=True) - - total_size = stream.input_stream.size - with open(ZS_CONFIG["ROOT_PODCAST_PATH"] + extra_paths + filename + ".wav", "wb") as file, tqdm( - desc=filename, - total=total_size, - unit="B", - unit_scale=True, - unit_divisor=1024 - ) as p_bar: - for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1): - p_bar.update(file.write( - stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"]))) - - # convert_audio_format(ZS_CONFIG["ROOT_PODCAST_PATH"] + - # extra_paths + filename + ".wav") - - # related functions that do stuff with the spotify API - - -def search(search_term): # pylint: disable=too-many-locals,too-many-branches - """ Searches Spotify's API for relevant data """ - token = SESSION.tokens().get("user-read-email") - - params = { - "limit" : "10", - "q" : search_term, - "type" : set(), - } - - # Block for parsing passed arguments - splits = search_term.split() - for split in splits: - index = splits.index(split) - - if split[0] == "-" and len(split) > 1: - if len(splits)-1 == index: - raise IndexError("No parameters passed after option: {}\n". - format(split)) - - if split == "-l" or split == "-limit": - try: - int(splits[index+1]) - except ValueError: - raise ValueError("Paramater passed after {} option must be an integer.\n". - format(split)) - if int(splits[index+1]) > 50: - raise ValueError("Invalid limit passed. Max is 50.\n") - params["limit"] = splits[index+1] - - - if split == "-t" or split == "-type": - - allowed_types = ["track", "playlist", "album", "artist"] - for i in range(index+1, len(splits)): - if splits[i][0] == "-": - break - - if splits[i] not in allowed_types: - raise ValueError("Parameters passed after {} option must be from this list:\n{}". - format(split, '\n'.join(allowed_types))) - - params["type"].add(splits[i]) - - if len(params["type"]) == 0: - params["type"] = {"track", "album", "playlist", "artist"} - - # Clean search term - search_term_list = [] - for split in splits: - if split[0] == "-": - break - search_term_list.append(split) - if not search_term_list: - raise ValueError("Invalid query.") - params["q"] = ' '.join(search_term_list) - - resp = requests.get( - "https://api.spotify.com/v1/search", - { - "limit": params["limit"], - "offset": "0", - "q": params["q"], - "type": ",".join(params["type"]) - }, - headers={"Authorization": f"Bearer {token}"}, - ) - - # print(resp.json()) - - enum = 1 - dics = [] - - # add all returned tracks to dics - if "track" in params["type"]: - tracks = resp.json()["tracks"]["items"] - if len(tracks) > 0: - print("### TRACKS ###") - for track in tracks: - if track["explicit"]: - explicit = "[E]" - else: - explicit = "" - # collect needed data - dic = { - "id" : track["id"], - "name" : track["name"], - "artists/owner" : [artist["name"] for artist in track["artists"]], - "type" : "track", - } - dics.append(dic) - - print("{}, {} {} | {}".format( - enum, - dic["name"], - explicit, - ",".join(dic["artists/owner"]), - )) - enum += 1 - total_tracks = enum - 1 - print("\n") - # free up memory - del tracks - else: - total_tracks = 0 - - if "album" in params["type"]: - albums = resp.json()["albums"]["items"] - if len(albums) > 0: - print("### ALBUMS ###") - for album in albums: - # collect needed data - dic = { - "id" : album["id"], - "name" : album["name"], - "artists/owner" : [artist["name"] for artist in album["artists"]], - "type" : "album", - } - dics.append(dic) - - print("{}, {} | {}".format( - enum, - dic["name"], - ",".join(dic["artists/owner"]), - )) - enum += 1 - total_albums = enum - total_tracks - 1 - print("\n") - # free up memory - del albums - else: - total_albums = 0 - - if "playlist" in params["type"]: - playlists = resp.json()["playlists"]["items"] - if len(playlists) > 0: - print("### PLAYLISTS ###") - for playlist in playlists: - # collect needed data - dic = { - "id" : playlist["id"], - "name" : playlist["name"], - "artists/owner" : [playlist["owner"]["display_name"]], - "type" : "playlist", - } - dics.append(dic) - - print("{}, {} | {}".format( - enum, - dic["name"], - ",".join(dic['artists/owner']), - )) - - enum += 1 - total_playlists = enum - total_tracks - total_albums - 1 - print("\n") - # free up memory - del playlists - else: - total_playlists = 0 - - if "artist" in params["type"]: - artists = resp.json()["artists"]["items"] - if len(artists) > 0: - print("### ARTISTS ###") - for artist in artists: - # collect needed data - dic = { - "id" : artist["id"], - "name" : artist["name"], - "type" : "artist", - } - dics.append(dic) - - print("{}, {}".format( - enum, - dic["name"], - )) - - enum += 1 - total_artists = enum - total_tracks - total_albums - total_playlists - 1 - print("\n") - else: - total_artists = 0 - - if total_tracks + total_albums + total_playlists + total_artists == 0: - print("NO RESULTS FOUND - EXITING...") - else: - selection = str(input("SELECT ITEM(S) BY ID: ")) - inputs = split_input(selection) - for pos in inputs: - position = int(pos) - for dic in dics: - # find dictionary - print_pos = dics.index(dic) + 1 - if print_pos == position: - # if request is for track - if dic["type"] == "track": - download_track(dic["id"]) - # if request is for album - if dic["type"] == "album": - download_album(dic["id"]) - # if request is for artist - if dic["type"] == "artist": - download_artist_albums(dic["id"]) - # if request is for playlist - if dic["type"] == "playlist": - playlist_songs = get_playlist_songs(token, dic["id"]) - for song in playlist_songs: - if song["track"]["id"] is not None: - download_track(song["track"]["id"], - sanitize_data(dic["name"].strip()) + "/") - print("\n") - - -def get_song_info(song_id): - """ Retrieves metadata for downloaded songs """ - token = SESSION.tokens().get("user-read-email") - - info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id + - "&market=from_token", headers={"Authorization": f"Bearer {token}"}).text) - - artists = [] - for data in info["tracks"][0]["artists"]: - artists.append(sanitize_data(data["name"])) - album_name = sanitize_data(info["tracks"][0]["album"]["name"]) - name = sanitize_data(info["tracks"][0]["name"]) - image_url = info["tracks"][0]["album"]["images"][0]["url"] - release_year = info["tracks"][0]["album"]["release_date"].split("-")[0] - disc_number = info["tracks"][0]["disc_number"] - track_number = info["tracks"][0]["track_number"] - scraped_song_id = info["tracks"][0]["id"] - is_playable = info["tracks"][0]["is_playable"] - - return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable - - -def check_premium(): - """ If user has spotify premium return true """ - return bool((SESSION.get_user_attribute("type") == "premium") or ZS_CONFIG["FORCE_PREMIUM"]) - - -# Functions directly related to modifying the downloaded audio and its metadata -def convert_audio_format(filename): - """ Converts raw audio into playable mp3 or ogg vorbis """ - # print("### CONVERTING TO " + ZS_CONFIG["MUSIC_FORMAT"].upper() + " ###") - raw_audio = AudioSegment.from_file(filename, format="ogg", - frame_rate=44100, channels=2, sample_width=2) - if QUALITY == AudioQuality.VERY_HIGH: - bitrate = "320k" - else: - bitrate = "160k" - raw_audio.export( - filename, format=ZS_CONFIG["MUSIC_FORMAT"], bitrate=bitrate) - - -def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): # pylint: disable=too-many-arguments - """ sets music_tag metadata """ - # print("### SETTING MUSIC TAGS ###") - tags = music_tag.load_file(filename) - tags["artist"] = conv_artist_format(artists) - tags["tracktitle"] = name - tags["album"] = album_name - tags["year"] = release_year - tags["discnumber"] = disc_number - tags["tracknumber"] = track_number - tags.save() - - -def set_music_thumbnail(filename, image_url): - """ Downloads cover artwork """ - # print("### SETTING THUMBNAIL ###") - img = requests.get(image_url).content - tags = music_tag.load_file(filename) - tags["artwork"] = img - tags.save() - - -def conv_artist_format(artists): - """ Returns converted artist format """ - formatted = "" - for artist in artists: - formatted += artist + ", " - return formatted[:-2] - - -# Extra functions directly related to spotify playlists -def get_all_playlists(access_token): - """ Returns list of users playlists """ - playlists = [] - limit = 50 - offset = 0 - - while True: - headers = {"Authorization": f"Bearer {access_token}"} - params = {"limit": limit, "offset": offset} - resp = requests.get("https://api.spotify.com/v1/me/playlists", - headers=headers, params=params).json() - offset += limit - playlists.extend(resp["items"]) - - if len(resp["items"]) < limit: - break - - return playlists - - -def get_playlist_songs(access_token, playlist_id): - """ returns list of songs in a playlist """ - songs = [] - offset = 0 - limit = 100 - - while True: - headers = {"Authorization": f"Bearer {access_token}"} - params = {"limit": limit, "offset": offset} - resp = requests.get( - f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", headers=headers, params=params).json() - offset += limit - songs.extend(resp["items"]) - - if len(resp["items"]) < limit: - break - - return songs - - -def get_playlist_info(access_token, playlist_id): - """ Returns information scraped from playlist """ - headers = {"Authorization": f"Bearer {access_token}"} - resp = requests.get( - f"https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token", - headers=headers).json() - return resp["name"].strip(), resp["owner"]["display_name"].strip() - - -# Extra functions directly related to spotify albums -def get_album_tracks(access_token, album_id): - """ Returns album tracklist """ - songs = [] - offset = 0 - limit = 50 - - while True: - headers = {"Authorization": f"Bearer {access_token}"} - params = {"limit": limit, "offset": offset} - resp = requests.get( - f"https://api.spotify.com/v1/albums/{album_id}/tracks", headers=headers, params=params).json() - offset += limit - songs.extend(resp["items"]) - - if len(resp["items"]) < limit: - break - - return songs - - -def get_album_name(access_token, album_id): - """ Returns album name """ - headers = {"Authorization": f"Bearer {access_token}"} - resp = requests.get( - f"https://api.spotify.com/v1/albums/{album_id}", headers=headers).json() - return resp["artists"][0]["name"], sanitize_data(resp["name"]) - -# Extra functions directly related to spotify artists - - -def get_artist_albums(access_token, artist_id): - """ Returns artist's albums """ - headers = {"Authorization": f"Bearer {access_token}"} - resp = requests.get( - f"https://api.spotify.com/v1/artists/{artist_id}/albums", headers=headers).json() - # Return a list each album's id - return [resp["items"][i]["id"] for i in range(len(resp["items"]))] - -# Extra functions directly related to our saved tracks - - -def get_saved_tracks(access_token): - """ Returns user's saved tracks """ - songs = [] - offset = 0 - limit = 50 - - while True: - headers = {"Authorization": f"Bearer {access_token}"} - params = {"limit": limit, "offset": offset} - resp = requests.get("https://api.spotify.com/v1/me/tracks", - headers=headers, params=params).json() - offset += limit - songs.extend(resp["items"]) - - if len(resp["items"]) < limit: - break - - return songs - - -# Functions directly related to downloading stuff -def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value="", disable_progressbar=False): # pylint: disable=too-many-locals,too-many-branches - """ Downloads raw song audio from Spotify """ - try: - artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info( - track_id_str) - - song_name = artists[0] + " - " + name - if prefix: - song_name = f"{prefix_value.zfill(2)} - {song_name}" if prefix_value.isdigit( - ) else f"{prefix_value} - {song_name}" - - if ZS_CONFIG["SPLIT_ALBUM_DISCS"]: - filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, "Disc " + str( - disc_number) + "/" + song_name + "." + ZS_CONFIG["MUSIC_FORMAT"]) - else: - filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, - song_name + "." + ZS_CONFIG["MUSIC_FORMAT"]) - except Exception as err: # pylint: disable=broad-except,unused-variable - print("### SKIPPING SONG - FAILED TO QUERY METADATA ###") - # print(err) - else: - try: - if not is_playable: - print("### SKIPPING:", song_name, - "(SONG IS UNAVAILABLE) ###") - else: - if os.path.isfile(filename) and os.path.getsize(filename) and ZS_CONFIG["SKIP_EXISTING_FILES"]: - print("### SKIPPING:", song_name, - "(SONG ALREADY EXISTS) ###") - else: - if track_id_str != scraped_song_id: - track_id_str = scraped_song_id - - track_id = TrackId.from_base62(track_id_str) - # print("### FOUND SONG:", song_name, " ###") - - stream = SESSION.content_feeder().load( - track_id, VorbisOnlyAudioQuality(QUALITY), False, None) - # print("### DOWNLOADING RAW AUDIO ###") - - if ZS_CONFIG["SPLIT_ALBUM_DISCS"]: - os.makedirs( - ZS_CONFIG["ROOT_PATH"] + extra_paths + "/Disc " + str(disc_number) + "/", exist_ok=True) - else: - os.makedirs(ZS_CONFIG["ROOT_PATH"] + - extra_paths, exist_ok=True) - - total_size = stream.input_stream.size - with open(filename, "wb") as file, tqdm( - desc=song_name, - total=total_size, - unit="B", - unit_scale=True, - unit_divisor=1024, - disable=disable_progressbar - ) as p_bar: - for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1): - p_bar.update(file.write( - stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"]))) - - if not ZS_CONFIG["RAW_AUDIO_AS_IS"]: - convert_audio_format(filename) - set_audio_tags(filename, artists, name, album_name, - release_year, disc_number, track_number) - set_music_thumbnail(filename, image_url) - - if not ZS_CONFIG["OVERRIDE_AUTO_WAIT"]: - time.sleep(ZS_CONFIG["ANTI_BAN_WAIT_TIME"]) - except Exception: # pylint: disable=broad-except - print("### SKIPPING:", song_name, - "(GENERAL DOWNLOAD ERROR) ###") - if os.path.exists(filename): - os.remove(filename) - - -def download_album(album): - """ Downloads songs from an album """ - token = SESSION.tokens().get("user-read-email") - artist, album_name = get_album_name(token, album) - tracks = get_album_tracks(token, album) - for num, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit="Song", total=len(tracks)): - download_track(track["id"], f"{artist}/{album_name}", - prefix=True, prefix_value=str(num), disable_progressbar=True) - - -def download_artist_albums(artist): - """ Downloads albums of an artist """ - token = SESSION.tokens().get("user-read-email") - albums = get_artist_albums(token, artist) - for album_id in albums: - download_album(album_id) - - -def download_playlist(playlists, playlist_choice): - """Downloads all the songs from a playlist""" - token = SESSION.tokens().get("user-read-email") - - playlist_songs = get_playlist_songs( - token, playlists[int(playlist_choice) - 1]["id"]) - - for song in playlist_songs: - if song["track"]["id"] is not None: - download_track(song["track"]["id"], sanitize_data( - playlists[int(playlist_choice) - 1]["name"].strip()) + "/") - print("\n") - - -def download_from_user_playlist(): - """ Select which playlist(s) to download """ - token = SESSION.tokens().get("user-read-email") - playlists = get_all_playlists(token) - - count = 1 - for playlist in playlists: - print(str(count) + ": " + playlist["name"].strip()) - count += 1 - - print("\n> SELECT A PLAYLIST BY ID") - print("> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID's") - print("> For example, typing 10 to get one playlist or 10-20 to get\nevery playlist from 10-20 (inclusive)\n") - - playlist_choices = input("ID(s): ").split("-") - - if len(playlist_choices) == 1: - download_playlist(playlists, playlist_choices[0]) - else: - start = int(playlist_choices[0]) - end = int(playlist_choices[1]) + 1 - - print(f"Downloading from {start} to {end}...") - - for playlist in range(start, end): - download_playlist(playlists, playlist) - - print("\n**All playlists have been downloaded**\n") - - -# Core functions here - - -def check_raw(): # pylint: disable=missing-function-docstring - if ZS_CONFIG["RAW_AUDIO_AS_IS"]: - ZS_CONFIG["MUSIC_FORMAT"] = "wav" - - -def main(): - """ Main function """ - check_raw() - login() - client() - - -if __name__ == "__main__": - main()