This commit is contained in:
yiannisha 2021-10-24 17:00:37 +03:00
commit 1c62ff6604
16 changed files with 991 additions and 898 deletions

22
.github/workflows/pylint.yml vendored Normal file
View File

@ -0,0 +1,22 @@
name: Pylint
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pylint
- name: Analysing the code with pylint
run: |
pylint `ls -R|grep .py$|xargs`

4
.gitignore vendored
View File

@ -1,5 +1,6 @@
# Byte-compiled / optimized / DLL files
__pycache__/
src/__pycache__/
*.py[cod]
*$py.class
@ -142,10 +143,11 @@ cython_debug/
# Spotify Credentials
credentials.json
src/credentials.json
#Download Folder
ZSpotify\ Music/
ZSpotify\ Podcasts/
# Intellij
.idea
.idea

40
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,40 @@
# Introduction
### Thank you for contributing
Without people like you this project wouldn't be anywhere near as polished and feature-rich as it is now.
### Guidelines
Following these guidelines helps show that you respect the the time and effort spent by the developers and your fellow contributors making this project.
### What we are looking for
ZSpotify is a community-driven project. There are many different ways to contribute. From providing tutorials and examples to help new users, reporting bugs, requesting new features, writing new code that can be added to the project, or even writing documentation.
### What we aren't looking for
Please don't use the issues section to request help installing or setting up the project. It should be reserved for bugs when running the code, and feature requqests. Instead use the support channel in either our Discord or Matrix server.
# Ground rules
### Expectations
* Ensure all code is linted with pylint before pushing.
* Ensure all code passes the [testing criteria](#testing-criteria).
* If you're planning on contributing a new feature, join the Discord or Matrix and discuss it with the Dev Team.
* Please don't commit multiple new features at once.
* Follow the [Python Community Code of Conduct](https://www.python.org/psf/codeofconduct/)
# Your first contribution
Unsure where to start? Have a look for any issues tagged "good first issue". They should be minor bugs that only require a few lines to fix.
Here are a couple of friendly tutorials on making pull requests: http://makeapullrequest.com/ and http://www.firsttimersonly.com/
# Code review process
The dev team looks at Pull Requests around once per day. After feedback has been given we expect responses within one week. After a week we may close the pull request if it isn't showing any activity.
> ZSpotify updates very frequently, often multiple times per day. If a maintainer asks you to "rebase" your PR, they're saying that a lot of code has changed, and that you need to update your branch so it's easier to merge.
# Community
Come and chat with us on Discord or Matrix. Devs try to respond to mentions at least once per day.

19
Dockerfile Normal file
View File

@ -0,0 +1,19 @@
FROM python:3.9-alpine as base
RUN apk --update add git ffmpeg
FROM base as builder
RUN mkdir /install
WORKDIR /install
COPY requirements.txt /requirements.txt
RUN apk add gcc libc-dev zlib zlib-dev jpeg-dev \
&& pip install --prefix="/install" -r /requirements.txt
FROM base
COPY --from=builder /install /usr/local
COPY src /app
COPY zs_config.json /
WORKDIR /app
ENTRYPOINT ["/usr/local/bin/python", "app.py"]

View File

@ -14,7 +14,7 @@ Requirements:
Binaries
- Python 3.8 or greater
- Python 3.9 or greater
- ffmpeg*
- Git**
@ -29,9 +29,9 @@ Python packages:
\*\*Git can be installed via apt for Debian-based distros or by downloading the binaries from [git-scm.com](https://git-scm.com/download/win) for Windows.
```
Command line usage:
python zspotify.py Loads search prompt to find then download a specific track, album or playlist
python zspotify.py <track/album/playlist/episode url> Downloads the track, album, playlist or podcast episode specified as a command line argument
python zspotify.py <artist url> Downloads all albums by specified artist
python app.py Loads search prompt to find then download a specific track, album or playlist
python app.py <track/album/playlist/episode url> Downloads the track, album, playlist or podcast episode specified as a command line argument
python app.py <artist url> Downloads all albums by specified artist
Extra command line options:
-p, --playlist Downloads a saved playlist from your account
@ -43,8 +43,7 @@ Options that can be configured in zs_config.json:
SKIP_EXISTING_FILES Set this to false if you want ZSpotify to overwrite files with the same name rather than skipping the song
MUSIC_FORMAT Set this to "ogg" if you would rather that format audio over "mp3"
RAW_AUDIO_AS_IS Set this to true to only stream the audio to a file and do no re-encoding or post processing
MUSIC_FORMAT Can be "mp3" or "ogg", mp3 is required for track metadata however ogg is slightly higer quality as it is not trsnacoded.
FORCE_PREMIUM Set this to true if ZSpotify isn't automatically detecting that you are using a premium account
@ -56,10 +55,24 @@ Currently no user has reported their account getting banned after using ZSpotify
This isn't to say _you_ won't get banned as it is technically against Spotify's TOS.
**Use ZSpotify at your own risk**, the developers of ZSpotify are not responsible if your account gets banned.
### What do I do if I see "Your session has been terminated"?
If you see this, don't worry! Just try logging back in. If you see the incorrect username or password error, reset your password and you should be able to log back in and continue using Spotify.
### Contributing
Please be sure to lint your code with pylint before issuing a pull-request, thanks!
Please refer to CONTRIBUTING.md
## **Changelog:**
**v2.2 (24 Oct 2021):**
- Added basic support for downloading an entire podcast series.
- Split code into multiple files for easier maintenance.
- Changed initial launch script to app.py
- Simplified audio formats.
- Added prebuild exe for Windows users.
- Added Docker file.
- Added CONTRIBUTING.md.
- Fixed artist names getting cutoff in metadata.
- Removed data sanitization of metadata tags.
**v2.1 (23 Oct 2021):**
- Moved configuration from hard-coded values to separate zs_config.json file.
- Add subfolders for each disc.

View File

@ -3,3 +3,4 @@ music_tag
pydub
Pillow
tqdm
tabulate

54
src/album.py Normal file
View File

@ -0,0 +1,54 @@
from tqdm import tqdm
from const import ITEMS, ARTISTS, NAME, ID
from track import download_track
from utils import sanitize_data
from zspotify import ZSpotify
ALBUM_URL = 'https://api.spotify.com/v1/albums'
ARTIST_URL = 'https://api.spotify.com/v1/artists'
def get_album_tracks(album_id):
""" Returns album tracklist """
songs = []
offset = 0
limit = 50
while True:
resp = ZSpotify.invoke_url_with_params(f'{ALBUM_URL}/{album_id}/tracks', limit=limit, offset=offset)
offset += limit
songs.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit:
break
return songs
def get_album_name(album_id):
""" Returns album name """
resp = ZSpotify.invoke_url(f'{ALBUM_URL}/{album_id}')
return resp[ARTISTS][0][NAME], sanitize_data(resp[NAME])
def get_artist_albums(artist_id):
""" Returns artist's albums """
resp = ZSpotify.invoke_url(f'{ARTIST_URL}/{artist_id}/albums')
# Return a list each album's id
return [resp[ITEMS][i][ID] for i in range(len(resp[ITEMS]))]
def download_album(album):
""" Downloads songs from an album """
artist, album_name = get_album_name(album)
tracks = get_album_tracks(album)
for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)):
download_track(track[ID], f'{artist}/{album_name}',
prefix=True, prefix_value=str(n), disable_progressbar=True)
def download_artist_albums(artist):
""" Downloads albums of an artist """
albums = get_artist_albums(artist)
for album_id in albums:
download_album(album_id)

172
src/app.py Normal file
View File

@ -0,0 +1,172 @@
import sys
from librespot.audio.decoders import AudioQuality
from tabulate import tabulate
from album import download_album, download_artist_albums
from const import TRACK, NAME, ID, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUMS, OWNER, \
PLAYLISTS, DISPLAY_NAME
from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
from podcast import download_episode, get_show_episodes
from track import download_track, get_saved_tracks
from utils import sanitize_data, splash, split_input, regex_input_for_urls
from zspotify import ZSpotify
SEARCH_URL = 'https://api.spotify.com/v1/search'
def client() -> None:
""" Connects to spotify to perform query's and get songs to download """
ZSpotify()
splash()
if ZSpotify.check_premium():
print('[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n')
ZSpotify.DOWNLOAD_QUALITY = AudioQuality.VERY_HIGH
else:
print('[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n')
ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH
while True:
if len(sys.argv) > 1:
if sys.argv[1] == '-p' or sys.argv[1] == '--playlist':
download_from_user_playlist()
elif sys.argv[1] == '-ls' or sys.argv[1] == '--liked-songs':
for song in get_saved_tracks():
if not song[TRACK][NAME]:
print('### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###')
else:
download_track(song[TRACK][ID], 'Liked Songs/')
print('\n')
else:
track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(sys.argv[1])
if track_id is not None:
download_track(track_id)
elif artist_id is not None:
download_artist_albums(artist_id)
elif album_id is not None:
download_album(album_id)
elif playlist_id is not None:
playlist_songs = get_playlist_songs(playlist_id)
name, _ = get_playlist_info(playlist_id)
for song in playlist_songs:
download_track(song[TRACK][ID],
sanitize_data(name) + '/')
print('\n')
elif episode_id is not None:
download_episode(episode_id)
elif show_id is not None:
for episode in get_show_episodes(show_id):
download_episode(episode)
else:
search_text = ''
while len(search_text) == 0:
search_text = input('Enter search or URL: ')
track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(search_text)
if track_id is not None:
download_track(track_id)
elif artist_id is not None:
download_artist_albums(artist_id)
elif album_id is not None:
download_album(album_id)
elif playlist_id is not None:
playlist_songs = get_playlist_songs(playlist_id)
name, _ = get_playlist_info(playlist_id)
for song in playlist_songs:
download_track(song[TRACK][ID], sanitize_data(name) + '/')
print('\n')
elif episode_id is not None:
download_episode(episode_id)
elif show_id is not None:
for episode in get_show_episodes(show_id):
download_episode(episode)
else:
search(search_text)
# wait()
def search(search_term):
""" Searches Spotify's API for relevant data """
params = {'limit': '10', 'offset': '0', 'q': search_term, 'type': 'track,album,artist,playlist'}
resp = ZSpotify.invoke_url_with_params(SEARCH_URL, **params)
counter = 1
tracks = resp[TRACKS][ITEMS]
if len(tracks) > 0:
print('### TRACKS ###')
track_data = []
for track in tracks:
if track[EXPLICIT]:
explicit = '[E]'
else:
explicit = ''
track_data.append([counter, f'{track[NAME]} {explicit}',
','.join([artist[NAME] for artist in track[ARTISTS]])])
counter += 1
total_tracks = counter - 1
print(tabulate(track_data, headers=['S.NO', 'Name', 'Artists'], tablefmt='pretty'))
print('\n')
else:
total_tracks = 0
albums = resp[ALBUMS][ITEMS]
if len(albums) > 0:
print('### ALBUMS ###')
album_data = []
for album in albums:
album_data.append([counter, album[NAME], ','.join([artist[NAME] for artist in album[ARTISTS]])])
counter += 1
total_albums = counter - total_tracks - 1
print(tabulate(album_data, headers=['S.NO', 'Album', 'Artists'], tablefmt='pretty'))
print('\n')
else:
total_albums = 0
artists = resp[ARTISTS][ITEMS]
if len(artists) > 0:
print('### ARTISTS ###')
artist_data = []
for artist in artists:
artist_data.append([counter, artist[NAME]])
counter += 1
total_artists = counter - total_tracks - total_albums - 1
print(tabulate(artist_data, headers=['S.NO', 'Name'], tablefmt='pretty'))
print('\n')
else:
total_artists = 0
playlists = resp[PLAYLISTS][ITEMS]
print('### PLAYLISTS ###')
playlist_data = []
for playlist in playlists:
playlist_data.append([counter, playlist[NAME], playlist[OWNER][DISPLAY_NAME]])
counter += 1
print(tabulate(playlist_data, headers=['S.NO', 'Name', 'Owner'], tablefmt='pretty'))
print('\n')
if len(tracks) + len(albums) + len(playlists) == 0:
print('NO RESULTS FOUND - EXITING...')
else:
selection = ''
while len(selection) == 0:
selection = str(input('SELECT ITEM(S) BY S.NO: '))
inputs = split_input(selection)
for pos in inputs:
position = int(pos)
if position <= total_tracks:
track_id = tracks[position - 1][ID]
download_track(track_id)
elif position <= total_albums + total_tracks:
download_album(albums[position - total_tracks - 1][ID])
elif position <= total_artists + total_tracks + total_albums:
download_artist_albums(artists[position - total_tracks - total_albums - 1][ID])
else:
download_playlist(playlists, position - total_tracks - total_albums - total_artists)
if __name__ == '__main__':
client()

95
src/const.py Normal file
View File

@ -0,0 +1,95 @@
SANITIZE = ('\\', '/', ':', '*', '?', '\'', '<', '>', '"')
SAVED_TRACKS_URL = 'https://api.spotify.com/v1/me/tracks'
TRACKS_URL = 'https://api.spotify.com/v1/tracks'
TRACKNUMBER = 'tracknumber'
DISCNUMBER = 'discnumber'
YEAR = 'year'
ALBUM = 'album'
TRACKTITLE = 'tracktitle'
ARTIST = 'artist'
ARTISTS = 'artists'
ARTWORK = 'artwork'
TRACKS = 'tracks'
TRACK = 'track'
ITEMS = 'items'
NAME = 'name'
ID = 'id'
URL = 'url'
RELEASE_DATE = 'release_date'
IMAGES = 'images'
LIMIT = 'limit'
OFFSET = 'offset'
AUTHORIZATION = 'Authorization'
IS_PLAYABLE = 'is_playable'
TRACK_NUMBER = 'track_number'
DISC_NUMBER = 'disc_number'
SHOW = 'show'
ERROR = 'error'
EXPLICIT = 'explicit'
PLAYLISTS = 'playlists'
OWNER = 'owner'
DISPLAY_NAME = 'display_name'
ALBUMS = 'albums'
TYPE = 'type'
PREMIUM = 'premium'
USER_READ_EMAIL = 'user-read-email'
PLAYLIST_READ_PRIVATE = 'playlist-read-private'
WINDOWS_SYSTEM = 'Windows'
CREDENTIALS_JSON = 'credentials.json'
CONFIG_FILE_PATH = '../zs_config.json'
ROOT_PATH = 'ROOT_PATH'
ROOT_PODCAST_PATH = 'ROOT_PODCAST_PATH'
SKIP_EXISTING_FILES = 'SKIP_EXISTING_FILES'
DOWNLOAD_FORMAT = 'DOWNLOAD_FORMAT'
FORCE_PREMIUM = 'FORCE_PREMIUM'
ANTI_BAN_WAIT_TIME = 'ANTI_BAN_WAIT_TIME'
OVERRIDE_AUTO_WAIT = 'OVERRIDE_AUTO_WAIT'
CHUNK_SIZE = 'CHUNK_SIZE'
SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS'

87
src/playlist.py Normal file
View File

@ -0,0 +1,87 @@
from tqdm import tqdm
from const import ITEMS, ID, TRACK, NAME
from track import download_track
from utils import sanitize_data
from zspotify import ZSpotify
MY_PLAYLISTS_URL = 'https://api.spotify.com/v1/me/playlists'
PLAYLISTS_URL = 'https://api.spotify.com/v1/playlists'
def get_all_playlists():
""" Returns list of users playlists """
playlists = []
limit = 50
offset = 0
while True:
resp = ZSpotify.invoke_url_with_params(MY_PLAYLISTS_URL, limit=limit, offset=offset)
offset += limit
playlists.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit:
break
return playlists
def get_playlist_songs(playlist_id):
""" returns list of songs in a playlist """
songs = []
offset = 0
limit = 100
while True:
resp = ZSpotify.invoke_url_with_params(f'{PLAYLISTS_URL}/{playlist_id}/tracks', limit=limit, offset=offset)
offset += limit
songs.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit:
break
return songs
def get_playlist_info(playlist_id):
""" Returns information scraped from playlist """
resp = ZSpotify.invoke_url(f'{PLAYLISTS_URL}/{playlist_id}?fields=name,owner(display_name)&market=from_token')
return resp['name'].strip(), resp['owner']['display_name'].strip()
def download_playlist(playlists, playlist_number):
"""Downloads all the songs from a playlist"""
playlist_songs = [song for song in get_playlist_songs(playlists[int(playlist_number) - 1][ID]) if song[TRACK][ID]]
p_bar = tqdm(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
for song in p_bar:
download_track(song[TRACK][ID], sanitize_data(playlists[int(playlist_number) - 1][NAME].strip()) + '/',
disable_progressbar=True)
p_bar.set_description(song[TRACK][NAME])
def download_from_user_playlist():
""" Select which playlist(s) to download """
playlists = get_all_playlists()
count = 1
for playlist in playlists:
print(str(count) + ': ' + playlist[NAME].strip())
count += 1
print('\n> SELECT A PLAYLIST BY ID')
print('> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID\'s')
print('> For example, typing 10 to get one playlist or 10-20 to get\nevery playlist from 10-20 (inclusive)\n')
playlist_choices = input('ID(s): ').split('-')
if len(playlist_choices) == 1:
download_playlist(playlists, playlist_choices[0])
else:
start = int(playlist_choices[0])
end = int(playlist_choices[1]) + 1
print(f'Downloading from {start} to {end}...')
for playlist_number in range(start, end):
download_playlist(playlists, playlist_number)
print('\n**All playlists have been downloaded**\n')

70
src/podcast.py Normal file
View File

@ -0,0 +1,70 @@
import os
from typing import Optional
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.metadata import EpisodeId
from tqdm import tqdm
from const import NAME, ERROR, SHOW, ITEMS, ID, ROOT_PODCAST_PATH, CHUNK_SIZE
from utils import sanitize_data, create_download_directory, MusicFormat
from zspotify import ZSpotify
EPISODE_INFO_URL = 'https://api.spotify.com/v1/episodes'
SHOWS_URL = 'https://api.spotify.com/v1/shows'
def get_episode_info(episode_id_str) -> tuple[Optional[str], Optional[str]]:
info = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}')
if ERROR in info:
return None, None
return sanitize_data(info[SHOW][NAME]), sanitize_data(info[NAME])
def get_show_episodes(show_id_str) -> list:
episodes = []
offset = 0
limit = 50
while True:
resp = ZSpotify.invoke_url_with_params(f'{SHOWS_URL}/{show_id_str}/episodes', limit=limit, offset=offset)
offset += limit
for episode in resp[ITEMS]:
episodes.append(episode[ID])
if len(resp[ITEMS]) < limit:
break
return episodes
def download_episode(episode_id) -> None:
podcast_name, episode_name = get_episode_info(episode_id)
extra_paths = podcast_name + '/'
if podcast_name is None:
print('### SKIPPING: (EPISODE NOT FOUND) ###')
else:
filename = podcast_name + ' - ' + episode_name
episode_id = EpisodeId.from_base62(episode_id)
stream = ZSpotify.get_content_stream(episode_id, ZSpotify.DOWNLOAD_QUALITY)
download_directory = os.path.dirname(__file__) + ZSpotify.get_config(ROOT_PODCAST_PATH) + extra_paths
create_download_directory(download_directory)
total_size = stream.input_stream.size
with open(download_directory + filename + MusicFormat.OGG.value,
'wb') as file, tqdm(
desc=filename,
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024
) as bar:
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1):
bar.update(file.write(
stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE))))
# convert_audio_format(ROOT_PODCAST_PATH +
# extra_paths + filename + '.ogg')

128
src/track.py Normal file
View File

@ -0,0 +1,128 @@
import os
import time
from typing import Any
from librespot.audio.decoders import AudioQuality
from librespot.metadata import TrackId
from pydub import AudioSegment
from tqdm import tqdm
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, CHUNK_SIZE, \
SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT
from utils import sanitize_data, set_audio_tags, set_music_thumbnail, create_download_directory, \
MusicFormat
from zspotify import ZSpotify
def get_saved_tracks() -> list:
""" Returns user's saved tracks """
songs = []
offset = 0
limit = 50
while True:
resp = ZSpotify.invoke_url_with_params(SAVED_TRACKS_URL, limit=limit, offset=offset)
offset += limit
songs.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit:
break
return songs
def get_song_info(song_id) -> tuple[list[str], str, str, Any, Any, Any, Any, Any, Any]:
""" Retrieves metadata for downloaded songs """
info = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
artists = []
for data in info[TRACKS][0][ARTISTS]:
artists.append(sanitize_data(data[NAME]))
album_name = sanitize_data(info[TRACKS][0][ALBUM][NAME])
name = sanitize_data(info[TRACKS][0][NAME])
image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0]
disc_number = info[TRACKS][0][DISC_NUMBER]
track_number = info[TRACKS][0][TRACK_NUMBER]
scraped_song_id = info[TRACKS][0][ID]
is_playable = info[TRACKS][0][IS_PLAYABLE]
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable
# noinspection PyBroadException
def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='', disable_progressbar=False) -> None:
""" Downloads raw song audio from Spotify """
download_directory = os.path.join(os.path.dirname(__file__), ZSpotify.get_config(ROOT_PATH), extra_paths)
try:
(artists, album_name, name, image_url, release_year, disc_number,
track_number, scraped_song_id, is_playable) = get_song_info(track_id)
song_name = artists[0] + ' - ' + name
if prefix:
song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit(
) else f'{prefix_value} - {song_name}'
if ZSpotify.get_config(SPLIT_ALBUM_DISCS):
filename = os.path.join(download_directory, f'Disc {disc_number}',
f'{song_name}.{ZSpotify.get_config(DOWNLOAD_FORMAT)}')
else:
filename = os.path.join(download_directory,
f'{song_name}.{ZSpotify.get_config(DOWNLOAD_FORMAT)}')
except Exception:
print('### SKIPPING SONG - FAILED TO QUERY METADATA ###')
else:
try:
if not is_playable:
print('\n### SKIPPING:', song_name,
'(SONG IS UNAVAILABLE) ###')
else:
if os.path.isfile(filename) and os.path.getsize(filename) and ZSpotify.get_config(SKIP_EXISTING_FILES):
print('\n### SKIPPING:', song_name,
'(SONG ALREADY EXISTS) ###')
else:
if track_id != scraped_song_id:
track_id = scraped_song_id
track_id = TrackId.from_base62(track_id)
stream = ZSpotify.get_content_stream(track_id, ZSpotify.DOWNLOAD_QUALITY)
create_download_directory(download_directory)
total_size = stream.input_stream.size
with open(filename, 'wb') as file, tqdm(
desc=song_name,
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
disable=disable_progressbar
) as p_bar:
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1):
p_bar.update(file.write(
stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE))))
if ZSpotify.get_config(DOWNLOAD_FORMAT) == 'mp3':
convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name,
release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url)
if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT):
time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME))
except Exception as e:
print('### SKIPPING:', song_name,
'(GENERAL DOWNLOAD ERROR) ###')
print(e)
if os.path.exists(filename):
os.remove(filename)
def convert_audio_format(filename) -> None:
""" Converts raw audio into playable mp3 """
# print('### CONVERTING TO ' + MUSIC_FORMAT.upper() + ' ###')
raw_audio = AudioSegment.from_file(filename, format=MusicFormat.OGG.value,
frame_rate=44100, channels=2, sample_width=2)
if ZSpotify.DOWNLOAD_QUALITY == AudioQuality.VERY_HIGH:
bitrate = '320k'
else:
bitrate = '160k'
raw_audio.export(filename, format=ZSpotify.get_config(DOWNLOAD_FORMAT), bitrate=bitrate)

180
src/utils.py Normal file
View File

@ -0,0 +1,180 @@
import os
import platform
import re
import time
from enum import Enum
import music_tag
import requests
from const import SANITIZE, ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \
WINDOWS_SYSTEM
class MusicFormat(str, Enum):
MP3 = 'mp3',
OGG = 'ogg',
def create_download_directory(download_path: str) -> None:
os.makedirs(download_path, exist_ok=True)
def wait(seconds: int = 3) -> None:
""" Pause for a set number of seconds """
for second in range(seconds)[::-1]:
print(f'\rWait for {second + 1} second(s)...', end='')
time.sleep(1)
def split_input(selection) -> list[str]:
""" Returns a list of inputted strings """
inputs = []
if '-' in selection:
for number in range(int(selection.split('-')[0]), int(selection.split('-')[1]) + 1):
inputs.append(number)
else:
selections = selection.split(',')
for i in selections:
inputs.append(i.strip())
return inputs
def splash() -> None:
""" Displays splash screen """
print("""
""")
def clear() -> None:
""" Clear the console window """
if platform.system() == WINDOWS_SYSTEM:
os.system('cls')
else:
os.system('clear')
def sanitize_data(value) -> str:
""" Returns given string with problematic removed """
for pattern in SANITIZE:
value = value.replace(pattern, '')
return value.replace('|', '-')
def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number) -> None:
""" sets music_tag metadata """
tags = music_tag.load_file(filename)
tags[ARTIST] = conv_artist_format(artists)
tags[TRACKTITLE] = name
tags[ALBUM] = album_name
tags[YEAR] = release_year
tags[DISCNUMBER] = disc_number
tags[TRACKNUMBER] = track_number
tags.save()
def conv_artist_format(artists) -> str:
""" Returns converted artist format """
return ', '.join(artists)
def set_music_thumbnail(filename, image_url) -> None:
""" Downloads cover artwork """
img = requests.get(image_url).content
tags = music_tag.load_file(filename)
tags[ARTWORK] = img
tags.save()
def regex_input_for_urls(search_input) -> tuple[str, str, str, str, str, str]:
""" Since many kinds of search may be passed at the command line, process them all here. """
track_uri_search = re.search(
r'^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$', search_input)
track_url_search = re.search(
r'^(https?://)?open\.spotify\.com/track/(?P<TrackID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
album_uri_search = re.search(
r'^spotify:album:(?P<AlbumID>[0-9a-zA-Z]{22})$', search_input)
album_url_search = re.search(
r'^(https?://)?open\.spotify\.com/album/(?P<AlbumID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
playlist_uri_search = re.search(
r'^spotify:playlist:(?P<PlaylistID>[0-9a-zA-Z]{22})$', search_input)
playlist_url_search = re.search(
r'^(https?://)?open\.spotify\.com/playlist/(?P<PlaylistID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
episode_uri_search = re.search(
r'^spotify:episode:(?P<EpisodeID>[0-9a-zA-Z]{22})$', search_input)
episode_url_search = re.search(
r'^(https?://)?open\.spotify\.com/episode/(?P<EpisodeID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
show_uri_search = re.search(
r'^spotify:show:(?P<ShowID>[0-9a-zA-Z]{22})$', search_input)
show_url_search = re.search(
r'^(https?://)?open\.spotify\.com/show/(?P<ShowID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
artist_uri_search = re.search(
r'^spotify:artist:(?P<ArtistID>[0-9a-zA-Z]{22})$', search_input)
artist_url_search = re.search(
r'^(https?://)?open\.spotify\.com/artist/(?P<ArtistID>[0-9a-zA-Z]{22})(\?si=.+?)?$',
search_input,
)
if track_uri_search is not None or track_url_search is not None:
track_id_str = (track_uri_search
if track_uri_search is not None else
track_url_search).group('TrackID')
else:
track_id_str = None
if album_uri_search is not None or album_url_search is not None:
album_id_str = (album_uri_search
if album_uri_search is not None else
album_url_search).group('AlbumID')
else:
album_id_str = None
if playlist_uri_search is not None or playlist_url_search is not None:
playlist_id_str = (playlist_uri_search
if playlist_uri_search is not None else
playlist_url_search).group('PlaylistID')
else:
playlist_id_str = None
if episode_uri_search is not None or episode_url_search is not None:
episode_id_str = (episode_uri_search
if episode_uri_search is not None else
episode_url_search).group('EpisodeID')
else:
episode_id_str = None
if show_uri_search is not None or show_url_search is not None:
show_id_str = (show_uri_search
if show_uri_search is not None else
show_url_search).group('ShowID')
else:
show_id_str = None
if artist_uri_search is not None or artist_url_search is not None:
artist_id_str = (artist_uri_search
if artist_uri_search is not None else
artist_url_search).group('ArtistID')
else:
artist_id_str = None
return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str

96
src/zspotify.py Normal file
View File

@ -0,0 +1,96 @@
#! /usr/bin/env python3
"""
ZSpotify
It's like youtube-dl, but for Spotify.
(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org)
"""
import json
import os
import os.path
from getpass import getpass
from typing import Any
import requests
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.core import Session
from const import CREDENTIALS_JSON, TYPE, \
PREMIUM, USER_READ_EMAIL, AUTHORIZATION, OFFSET, LIMIT, CONFIG_FILE_PATH, FORCE_PREMIUM, \
PLAYLIST_READ_PRIVATE
from utils import MusicFormat
class ZSpotify:
SESSION: Session = None
DOWNLOAD_QUALITY = None
CONFIG = {}
def __init__(self):
ZSpotify.load_config()
ZSpotify.login()
@classmethod
def login(cls):
""" Authenticates with Spotify and saves credentials to a file """
if os.path.isfile(CREDENTIALS_JSON):
try:
cls.SESSION = Session.Builder().stored_file().create()
return
except RuntimeError:
pass
while True:
user_name = ''
while len(user_name) == 0:
user_name = input('Username: ')
password = getpass()
try:
cls.SESSION = Session.Builder().user_pass(user_name, password).create()
return
except RuntimeError:
pass
@classmethod
def load_config(cls) -> None:
app_dir = os.path.dirname(__file__)
with open(os.path.join(app_dir, CONFIG_FILE_PATH), encoding='utf-8') as config_file:
cls.CONFIG = json.load(config_file)
@classmethod
def get_config(cls, key) -> Any:
return cls.CONFIG.get(key)
@classmethod
def get_content_stream(cls, content_id, quality):
return cls.SESSION.content_feeder().load(content_id, VorbisOnlyAudioQuality(quality), False, None)
@classmethod
def __get_auth_token(cls):
return cls.SESSION.tokens().get_token(USER_READ_EMAIL, PLAYLIST_READ_PRIVATE).access_token
@classmethod
def get_auth_header(cls):
return {
AUTHORIZATION: f'Bearer {cls.__get_auth_token()}'}
@classmethod
def get_auth_header_and_params(cls, limit, offset):
return {AUTHORIZATION: f'Bearer {cls.__get_auth_token()}'}, {LIMIT: limit, OFFSET: offset}
@classmethod
def invoke_url_with_params(cls, url, limit, offset, **kwargs):
headers, params = cls.get_auth_header_and_params(limit=limit, offset=offset)
params.update(kwargs)
return requests.get(url, headers=headers, params=params).json()
@classmethod
def invoke_url(cls, url):
headers = cls.get_auth_header()
return requests.get(url, headers=headers).json()
@classmethod
def check_premium(cls) -> bool:
""" If user has spotify premium return true """
return (cls.SESSION.get_user_attribute(TYPE) == PREMIUM) or cls.get_config(FORCE_PREMIUM)

View File

@ -1,12 +1,11 @@
{
"ROOT_PATH": "ZSpotify Music/",
"ROOT_PODCAST_PATH": "ZSpotify Podcasts/",
"ROOT_PATH": "../ZSpotify Music/",
"ROOT_PODCAST_PATH": "../ZSpotify Podcasts/",
"SKIP_EXISTING_FILES": true,
"MUSIC_FORMAT": "mp3",
"RAW_AUDIO_AS_IS": false,
"DOWNLOAD_FORMAT": "mp3",
"FORCE_PREMIUM": false,
"ANTI_BAN_WAIT_TIME": 1,
"OVERRIDE_AUTO_WAIT": false,
"CHUNK_SIZE": 50000,
"SPLIT_ALBUM_DISCS": false
}
}

View File

@ -1,885 +0,0 @@
#! /usr/bin/env python3
"""
ZSpotify
It's like youtube-dl, but for Spotify.
(Made by Deathmonger/Footsiefat - @doomslayer117:matrix.org)
"""
from getpass import getpass
import json
import os
import os.path
import platform
import re
import sys
import time
from librespot.audio.decoders import AudioQuality, VorbisOnlyAudioQuality
from librespot.core import Session
from librespot.metadata import TrackId, EpisodeId
import music_tag
from pydub import AudioSegment
import requests
from tqdm import tqdm
QUALITY = None
SESSION: Session = None
SANITIZE = ["\\", "/", ":", "*", "?", "'", "<", ">", "\""]
# user-customizable variables that adjust the core functionality of ZSpotify
with open("zs_config.json", encoding="utf-8") as config_file:
ZS_CONFIG = json.load(config_file)
# miscellaneous functions for general use
def clear():
""" Clear the console window """
if platform.system() == "Windows":
os.system("cls")
else:
os.system("clear")
def wait(seconds: int = 3):
""" Pause for a set number of seconds """
for i in range(seconds)[::-1]:
print(f"\rWait for {i + 1} second(s)...", end="")
time.sleep(1)
def sanitize_data(value):
""" Returns given string with problematic removed """
for i in SANITIZE:
value = value.replace(i, "")
return value.replace("|", "-")
def split_input(selection):
""" Returns a list of inputted strings """
inputs = []
if "-" in selection:
for number in range(int(selection.split("-")[0]), int(selection.split("-")[1]) + 1):
inputs.append(number)
else:
selections = selection.split(",")
for i in selections:
inputs.append(i.strip())
return inputs
def splash():
""" Displays splash screen """
print("""
""")
# two mains functions for logging in and doing client stuff
def login():
""" Authenticates with Spotify and saves credentials to a file """
global SESSION # pylint: disable=global-statement
if os.path.isfile("credentials.json"):
try:
SESSION = Session.Builder().stored_file().create()
return
except RuntimeError:
pass
while True:
user_name = input("Username: ")
password = getpass()
try:
SESSION = Session.Builder().user_pass(user_name, password).create()
return
except RuntimeError:
pass
def client(): # pylint: disable=too-many-branches,too-many-statements
""" Connects to spotify to perform query's and get songs to download """
global QUALITY # pylint: disable=global-statement
splash()
token = SESSION.tokens().get("user-read-email")
if check_premium():
print("[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n")
QUALITY = AudioQuality.VERY_HIGH
else:
print("[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n")
QUALITY = AudioQuality.HIGH
while True:
if len(sys.argv) > 1:
if sys.argv[1] == "-p" or sys.argv[1] == "--playlist":
download_from_user_playlist()
elif sys.argv[1] == "-ls" or sys.argv[1] == "--liked-songs":
for song in get_saved_tracks(token):
if not song["track"]["name"]:
print(
"### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###")
else:
download_track(song["track"]["id"], "Liked Songs/")
print("\n")
else:
track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls(
sys.argv[1])
if track_id_str is not None:
download_track(track_id_str)
elif artist_id_str is not None:
download_artist_albums(artist_id_str)
elif album_id_str is not None:
download_album(album_id_str)
elif playlist_id_str is not None:
playlist_songs = get_playlist_songs(token, playlist_id_str)
name, _ = get_playlist_info(token, playlist_id_str)
for song in playlist_songs:
download_track(song["track"]["id"],
sanitize_data(name) + "/")
print("\n")
elif episode_id_str is not None:
download_episode(episode_id_str)
elif show_id_str is not None:
for episode in get_show_episodes(token, show_id_str):
download_episode(episode)
else:
search_text = input("Enter search or URL: ")
track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str = regex_input_for_urls(
search_text)
if track_id_str is not None:
download_track(track_id_str)
elif artist_id_str is not None:
download_artist_albums(artist_id_str)
elif album_id_str is not None:
download_album(album_id_str)
elif playlist_id_str is not None:
playlist_songs = get_playlist_songs(token, playlist_id_str)
name, _ = get_playlist_info(token, playlist_id_str)
for song in playlist_songs:
download_track(song["track"]["id"],
sanitize_data(name) + "/")
print("\n")
elif episode_id_str is not None:
download_episode(episode_id_str)
elif show_id_str is not None:
for episode in get_show_episodes(token, show_id_str):
download_episode(episode)
else:
search(search_text)
# wait()
def regex_input_for_urls(search_input): # pylint: disable=too-many-locals
""" Since many kinds of search may be passed at the command line, process them all here. """
track_uri_search = re.search(
r"^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$", search_input)
track_url_search = re.search(
r"^(https?://)?open\.spotify\.com/track/(?P<TrackID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
search_input,
)
album_uri_search = re.search(
r"^spotify:album:(?P<AlbumID>[0-9a-zA-Z]{22})$", search_input)
album_url_search = re.search(
r"^(https?://)?open\.spotify\.com/album/(?P<AlbumID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
search_input,
)
playlist_uri_search = re.search(
r"^spotify:playlist:(?P<PlaylistID>[0-9a-zA-Z]{22})$", search_input)
playlist_url_search = re.search(
r"^(https?://)?open\.spotify\.com/playlist/(?P<PlaylistID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
search_input,
)
episode_uri_search = re.search(
r"^spotify:episode:(?P<EpisodeID>[0-9a-zA-Z]{22})$", search_input)
episode_url_search = re.search(
r"^(https?://)?open\.spotify\.com/episode/(?P<EpisodeID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
search_input,
)
show_uri_search = re.search(
r"^spotify:show:(?P<ShowID>[0-9a-zA-Z]{22})$", search_input)
show_url_search = re.search(
r"^(https?://)?open\.spotify\.com/show/(?P<ShowID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
search_input,
)
artist_uri_search = re.search(
r"^spotify:artist:(?P<ArtistID>[0-9a-zA-Z]{22})$", search_input)
artist_url_search = re.search(
r"^(https?://)?open\.spotify\.com/artist/(?P<ArtistID>[0-9a-zA-Z]{22})(\?si=.+?)?$",
search_input,
)
if track_uri_search is not None or track_url_search is not None:
track_id_str = (track_uri_search
if track_uri_search is not None else
track_url_search).group("TrackID")
else:
track_id_str = None
if album_uri_search is not None or album_url_search is not None:
album_id_str = (album_uri_search
if album_uri_search is not None else
album_url_search).group("AlbumID")
else:
album_id_str = None
if playlist_uri_search is not None or playlist_url_search is not None:
playlist_id_str = (playlist_uri_search
if playlist_uri_search is not None else
playlist_url_search).group("PlaylistID")
else:
playlist_id_str = None
if episode_uri_search is not None or episode_url_search is not None:
episode_id_str = (episode_uri_search
if episode_uri_search is not None else
episode_url_search).group("EpisodeID")
else:
episode_id_str = None
if show_uri_search is not None or show_url_search is not None:
show_id_str = (show_uri_search
if show_uri_search is not None else
show_url_search).group("ShowID")
else:
show_id_str = None
if artist_uri_search is not None or artist_url_search is not None:
artist_id_str = (artist_uri_search
if artist_uri_search is not None else
artist_url_search).group("ArtistID")
else:
artist_id_str = None
return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str
def get_episode_info(episode_id_str): # pylint: disable=missing-function-docstring
token = SESSION.tokens().get("user-read-email")
info = json.loads(requests.get(f"https://api.spotify.com/v1/episodes/{episode_id_str}",
headers={"Authorization": f"Bearer {token}"}).text)
if "error" in info:
return None, None
# print(info["images"][0]["url"])
return sanitize_data(info["show"]["name"]), sanitize_data(info["name"])
def get_show_episodes(access_token, show_id_str): # pylint: disable=missing-function-docstring
episodes = []
headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get(
f"https://api.spotify.com/v1/shows/{show_id_str}/episodes", headers=headers).json()
for episode in resp["items"]:
episodes.append(episode["id"])
return episodes
def download_episode(episode_id_str): # pylint: disable=missing-function-docstring
podcast_name, episode_name = get_episode_info(episode_id_str)
extra_paths = podcast_name + "/"
if podcast_name is None:
print("### SKIPPING: (EPISODE NOT FOUND) ###")
else:
filename = podcast_name + " - " + episode_name
episode_id = EpisodeId.from_base62(episode_id_str)
stream = SESSION.content_feeder().load(
episode_id, VorbisOnlyAudioQuality(QUALITY), False, None)
# print("### DOWNLOADING '" + podcast_name + " - " +
# episode_name + "' - THIS MAY TAKE A WHILE ###")
os.makedirs(ZS_CONFIG["ROOT_PODCAST_PATH"] +
extra_paths, exist_ok=True)
total_size = stream.input_stream.size
with open(ZS_CONFIG["ROOT_PODCAST_PATH"] + extra_paths + filename + ".wav", "wb") as file, tqdm(
desc=filename,
total=total_size,
unit="B",
unit_scale=True,
unit_divisor=1024
) as p_bar:
for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1):
p_bar.update(file.write(
stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"])))
# convert_audio_format(ZS_CONFIG["ROOT_PODCAST_PATH"] +
# extra_paths + filename + ".wav")
# related functions that do stuff with the spotify API
def search(search_term): # pylint: disable=too-many-locals,too-many-branches
""" Searches Spotify's API for relevant data """
token = SESSION.tokens().get("user-read-email")
params = {
"limit" : "10",
"q" : search_term,
"type" : set(),
}
# Block for parsing passed arguments
splits = search_term.split()
for split in splits:
index = splits.index(split)
if split[0] == "-" and len(split) > 1:
if len(splits)-1 == index:
raise IndexError("No parameters passed after option: {}\n".
format(split))
if split == "-l" or split == "-limit":
try:
int(splits[index+1])
except ValueError:
raise ValueError("Paramater passed after {} option must be an integer.\n".
format(split))
if int(splits[index+1]) > 50:
raise ValueError("Invalid limit passed. Max is 50.\n")
params["limit"] = splits[index+1]
if split == "-t" or split == "-type":
allowed_types = ["track", "playlist", "album", "artist"]
for i in range(index+1, len(splits)):
if splits[i][0] == "-":
break
if splits[i] not in allowed_types:
raise ValueError("Parameters passed after {} option must be from this list:\n{}".
format(split, '\n'.join(allowed_types)))
params["type"].add(splits[i])
if len(params["type"]) == 0:
params["type"] = {"track", "album", "playlist", "artist"}
# Clean search term
search_term_list = []
for split in splits:
if split[0] == "-":
break
search_term_list.append(split)
if not search_term_list:
raise ValueError("Invalid query.")
params["q"] = ' '.join(search_term_list)
resp = requests.get(
"https://api.spotify.com/v1/search",
{
"limit": params["limit"],
"offset": "0",
"q": params["q"],
"type": ",".join(params["type"])
},
headers={"Authorization": f"Bearer {token}"},
)
# print(resp.json())
enum = 1
dics = []
# add all returned tracks to dics
if "track" in params["type"]:
tracks = resp.json()["tracks"]["items"]
if len(tracks) > 0:
print("### TRACKS ###")
for track in tracks:
if track["explicit"]:
explicit = "[E]"
else:
explicit = ""
# collect needed data
dic = {
"id" : track["id"],
"name" : track["name"],
"artists/owner" : [artist["name"] for artist in track["artists"]],
"type" : "track",
}
dics.append(dic)
print("{}, {} {} | {}".format(
enum,
dic["name"],
explicit,
",".join(dic["artists/owner"]),
))
enum += 1
total_tracks = enum - 1
print("\n")
# free up memory
del tracks
else:
total_tracks = 0
if "album" in params["type"]:
albums = resp.json()["albums"]["items"]
if len(albums) > 0:
print("### ALBUMS ###")
for album in albums:
# collect needed data
dic = {
"id" : album["id"],
"name" : album["name"],
"artists/owner" : [artist["name"] for artist in album["artists"]],
"type" : "album",
}
dics.append(dic)
print("{}, {} | {}".format(
enum,
dic["name"],
",".join(dic["artists/owner"]),
))
enum += 1
total_albums = enum - total_tracks - 1
print("\n")
# free up memory
del albums
else:
total_albums = 0
if "playlist" in params["type"]:
playlists = resp.json()["playlists"]["items"]
if len(playlists) > 0:
print("### PLAYLISTS ###")
for playlist in playlists:
# collect needed data
dic = {
"id" : playlist["id"],
"name" : playlist["name"],
"artists/owner" : [playlist["owner"]["display_name"]],
"type" : "playlist",
}
dics.append(dic)
print("{}, {} | {}".format(
enum,
dic["name"],
",".join(dic['artists/owner']),
))
enum += 1
total_playlists = enum - total_tracks - total_albums - 1
print("\n")
# free up memory
del playlists
else:
total_playlists = 0
if "artist" in params["type"]:
artists = resp.json()["artists"]["items"]
if len(artists) > 0:
print("### ARTISTS ###")
for artist in artists:
# collect needed data
dic = {
"id" : artist["id"],
"name" : artist["name"],
"type" : "artist",
}
dics.append(dic)
print("{}, {}".format(
enum,
dic["name"],
))
enum += 1
total_artists = enum - total_tracks - total_albums - total_playlists - 1
print("\n")
else:
total_artists = 0
if total_tracks + total_albums + total_playlists + total_artists == 0:
print("NO RESULTS FOUND - EXITING...")
else:
selection = str(input("SELECT ITEM(S) BY ID: "))
inputs = split_input(selection)
for pos in inputs:
position = int(pos)
for dic in dics:
# find dictionary
print_pos = dics.index(dic) + 1
if print_pos == position:
# if request is for track
if dic["type"] == "track":
download_track(dic["id"])
# if request is for album
if dic["type"] == "album":
download_album(dic["id"])
# if request is for artist
if dic["type"] == "artist":
download_artist_albums(dic["id"])
# if request is for playlist
if dic["type"] == "playlist":
playlist_songs = get_playlist_songs(token, dic["id"])
for song in playlist_songs:
if song["track"]["id"] is not None:
download_track(song["track"]["id"],
sanitize_data(dic["name"].strip()) + "/")
print("\n")
def get_song_info(song_id):
""" Retrieves metadata for downloaded songs """
token = SESSION.tokens().get("user-read-email")
info = json.loads(requests.get("https://api.spotify.com/v1/tracks?ids=" + song_id +
"&market=from_token", headers={"Authorization": f"Bearer {token}"}).text)
artists = []
for data in info["tracks"][0]["artists"]:
artists.append(sanitize_data(data["name"]))
album_name = sanitize_data(info["tracks"][0]["album"]["name"])
name = sanitize_data(info["tracks"][0]["name"])
image_url = info["tracks"][0]["album"]["images"][0]["url"]
release_year = info["tracks"][0]["album"]["release_date"].split("-")[0]
disc_number = info["tracks"][0]["disc_number"]
track_number = info["tracks"][0]["track_number"]
scraped_song_id = info["tracks"][0]["id"]
is_playable = info["tracks"][0]["is_playable"]
return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable
def check_premium():
""" If user has spotify premium return true """
return bool((SESSION.get_user_attribute("type") == "premium") or ZS_CONFIG["FORCE_PREMIUM"])
# Functions directly related to modifying the downloaded audio and its metadata
def convert_audio_format(filename):
""" Converts raw audio into playable mp3 or ogg vorbis """
# print("### CONVERTING TO " + ZS_CONFIG["MUSIC_FORMAT"].upper() + " ###")
raw_audio = AudioSegment.from_file(filename, format="ogg",
frame_rate=44100, channels=2, sample_width=2)
if QUALITY == AudioQuality.VERY_HIGH:
bitrate = "320k"
else:
bitrate = "160k"
raw_audio.export(
filename, format=ZS_CONFIG["MUSIC_FORMAT"], bitrate=bitrate)
def set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number): # pylint: disable=too-many-arguments
""" sets music_tag metadata """
# print("### SETTING MUSIC TAGS ###")
tags = music_tag.load_file(filename)
tags["artist"] = conv_artist_format(artists)
tags["tracktitle"] = name
tags["album"] = album_name
tags["year"] = release_year
tags["discnumber"] = disc_number
tags["tracknumber"] = track_number
tags.save()
def set_music_thumbnail(filename, image_url):
""" Downloads cover artwork """
# print("### SETTING THUMBNAIL ###")
img = requests.get(image_url).content
tags = music_tag.load_file(filename)
tags["artwork"] = img
tags.save()
def conv_artist_format(artists):
""" Returns converted artist format """
formatted = ""
for artist in artists:
formatted += artist + ", "
return formatted[:-2]
# Extra functions directly related to spotify playlists
def get_all_playlists(access_token):
""" Returns list of users playlists """
playlists = []
limit = 50
offset = 0
while True:
headers = {"Authorization": f"Bearer {access_token}"}
params = {"limit": limit, "offset": offset}
resp = requests.get("https://api.spotify.com/v1/me/playlists",
headers=headers, params=params).json()
offset += limit
playlists.extend(resp["items"])
if len(resp["items"]) < limit:
break
return playlists
def get_playlist_songs(access_token, playlist_id):
""" returns list of songs in a playlist """
songs = []
offset = 0
limit = 100
while True:
headers = {"Authorization": f"Bearer {access_token}"}
params = {"limit": limit, "offset": offset}
resp = requests.get(
f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", headers=headers, params=params).json()
offset += limit
songs.extend(resp["items"])
if len(resp["items"]) < limit:
break
return songs
def get_playlist_info(access_token, playlist_id):
""" Returns information scraped from playlist """
headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get(
f"https://api.spotify.com/v1/playlists/{playlist_id}?fields=name,owner(display_name)&market=from_token",
headers=headers).json()
return resp["name"].strip(), resp["owner"]["display_name"].strip()
# Extra functions directly related to spotify albums
def get_album_tracks(access_token, album_id):
""" Returns album tracklist """
songs = []
offset = 0
limit = 50
while True:
headers = {"Authorization": f"Bearer {access_token}"}
params = {"limit": limit, "offset": offset}
resp = requests.get(
f"https://api.spotify.com/v1/albums/{album_id}/tracks", headers=headers, params=params).json()
offset += limit
songs.extend(resp["items"])
if len(resp["items"]) < limit:
break
return songs
def get_album_name(access_token, album_id):
""" Returns album name """
headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get(
f"https://api.spotify.com/v1/albums/{album_id}", headers=headers).json()
return resp["artists"][0]["name"], sanitize_data(resp["name"])
# Extra functions directly related to spotify artists
def get_artist_albums(access_token, artist_id):
""" Returns artist's albums """
headers = {"Authorization": f"Bearer {access_token}"}
resp = requests.get(
f"https://api.spotify.com/v1/artists/{artist_id}/albums", headers=headers).json()
# Return a list each album's id
return [resp["items"][i]["id"] for i in range(len(resp["items"]))]
# Extra functions directly related to our saved tracks
def get_saved_tracks(access_token):
""" Returns user's saved tracks """
songs = []
offset = 0
limit = 50
while True:
headers = {"Authorization": f"Bearer {access_token}"}
params = {"limit": limit, "offset": offset}
resp = requests.get("https://api.spotify.com/v1/me/tracks",
headers=headers, params=params).json()
offset += limit
songs.extend(resp["items"])
if len(resp["items"]) < limit:
break
return songs
# Functions directly related to downloading stuff
def download_track(track_id_str: str, extra_paths="", prefix=False, prefix_value="", disable_progressbar=False): # pylint: disable=too-many-locals,too-many-branches
""" Downloads raw song audio from Spotify """
try:
artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable = get_song_info(
track_id_str)
song_name = artists[0] + " - " + name
if prefix:
song_name = f"{prefix_value.zfill(2)} - {song_name}" if prefix_value.isdigit(
) else f"{prefix_value} - {song_name}"
if ZS_CONFIG["SPLIT_ALBUM_DISCS"]:
filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths, "Disc " + str(
disc_number) + "/" + song_name + "." + ZS_CONFIG["MUSIC_FORMAT"])
else:
filename = os.path.join(ZS_CONFIG["ROOT_PATH"], extra_paths,
song_name + "." + ZS_CONFIG["MUSIC_FORMAT"])
except Exception as err: # pylint: disable=broad-except,unused-variable
print("### SKIPPING SONG - FAILED TO QUERY METADATA ###")
# print(err)
else:
try:
if not is_playable:
print("### SKIPPING:", song_name,
"(SONG IS UNAVAILABLE) ###")
else:
if os.path.isfile(filename) and os.path.getsize(filename) and ZS_CONFIG["SKIP_EXISTING_FILES"]:
print("### SKIPPING:", song_name,
"(SONG ALREADY EXISTS) ###")
else:
if track_id_str != scraped_song_id:
track_id_str = scraped_song_id
track_id = TrackId.from_base62(track_id_str)
# print("### FOUND SONG:", song_name, " ###")
stream = SESSION.content_feeder().load(
track_id, VorbisOnlyAudioQuality(QUALITY), False, None)
# print("### DOWNLOADING RAW AUDIO ###")
if ZS_CONFIG["SPLIT_ALBUM_DISCS"]:
os.makedirs(
ZS_CONFIG["ROOT_PATH"] + extra_paths + "/Disc " + str(disc_number) + "/", exist_ok=True)
else:
os.makedirs(ZS_CONFIG["ROOT_PATH"] +
extra_paths, exist_ok=True)
total_size = stream.input_stream.size
with open(filename, "wb") as file, tqdm(
desc=song_name,
total=total_size,
unit="B",
unit_scale=True,
unit_divisor=1024,
disable=disable_progressbar
) as p_bar:
for _ in range(int(total_size / ZS_CONFIG["CHUNK_SIZE"]) + 1):
p_bar.update(file.write(
stream.input_stream.stream().read(ZS_CONFIG["CHUNK_SIZE"])))
if not ZS_CONFIG["RAW_AUDIO_AS_IS"]:
convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name,
release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url)
if not ZS_CONFIG["OVERRIDE_AUTO_WAIT"]:
time.sleep(ZS_CONFIG["ANTI_BAN_WAIT_TIME"])
except Exception: # pylint: disable=broad-except
print("### SKIPPING:", song_name,
"(GENERAL DOWNLOAD ERROR) ###")
if os.path.exists(filename):
os.remove(filename)
def download_album(album):
""" Downloads songs from an album """
token = SESSION.tokens().get("user-read-email")
artist, album_name = get_album_name(token, album)
tracks = get_album_tracks(token, album)
for num, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit="Song", total=len(tracks)):
download_track(track["id"], f"{artist}/{album_name}",
prefix=True, prefix_value=str(num), disable_progressbar=True)
def download_artist_albums(artist):
""" Downloads albums of an artist """
token = SESSION.tokens().get("user-read-email")
albums = get_artist_albums(token, artist)
for album_id in albums:
download_album(album_id)
def download_playlist(playlists, playlist_choice):
"""Downloads all the songs from a playlist"""
token = SESSION.tokens().get("user-read-email")
playlist_songs = get_playlist_songs(
token, playlists[int(playlist_choice) - 1]["id"])
for song in playlist_songs:
if song["track"]["id"] is not None:
download_track(song["track"]["id"], sanitize_data(
playlists[int(playlist_choice) - 1]["name"].strip()) + "/")
print("\n")
def download_from_user_playlist():
""" Select which playlist(s) to download """
token = SESSION.tokens().get("user-read-email")
playlists = get_all_playlists(token)
count = 1
for playlist in playlists:
print(str(count) + ": " + playlist["name"].strip())
count += 1
print("\n> SELECT A PLAYLIST BY ID")
print("> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID's")
print("> For example, typing 10 to get one playlist or 10-20 to get\nevery playlist from 10-20 (inclusive)\n")
playlist_choices = input("ID(s): ").split("-")
if len(playlist_choices) == 1:
download_playlist(playlists, playlist_choices[0])
else:
start = int(playlist_choices[0])
end = int(playlist_choices[1]) + 1
print(f"Downloading from {start} to {end}...")
for playlist in range(start, end):
download_playlist(playlists, playlist)
print("\n**All playlists have been downloaded**\n")
# Core functions here
def check_raw(): # pylint: disable=missing-function-docstring
if ZS_CONFIG["RAW_AUDIO_AS_IS"]:
ZS_CONFIG["MUSIC_FORMAT"] = "wav"
def main():
""" Main function """
check_raw()
login()
client()
if __name__ == "__main__":
main()