Revert "added m3u playlist file creation"

This commit is contained in:
Footsiefat 2021-10-25 18:36:17 +13:00 committed by GitHub
parent 841df577a5
commit afeb84de39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 346 additions and 284 deletions

View File

@ -1,4 +1,3 @@
"""It's provides functions for downloading the albums"""
from tqdm import tqdm from tqdm import tqdm
from const import ITEMS, ARTISTS, NAME, ID from const import ITEMS, ARTISTS, NAME, ID
@ -17,8 +16,7 @@ def get_album_tracks(album_id):
limit = 50 limit = 50
while True: while True:
resp = ZSpotify.invoke_url_with_params(f'{ALBUM_URL}/{album_id}/tracks', resp = ZSpotify.invoke_url_with_params(f'{ALBUM_URL}/{album_id}/tracks', limit=limit, offset=offset)
limit=limit, offset=offset)
offset += limit offset += limit
songs.extend(resp[ITEMS]) songs.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit: if len(resp[ITEMS]) < limit:
@ -50,10 +48,9 @@ def download_album(album):
""" Downloads songs from an album """ """ Downloads songs from an album """
artist, album_name = get_album_name(album) artist, album_name = get_album_name(album)
tracks = get_album_tracks(album) tracks = get_album_tracks(album)
for album_number, track in tqdm(enumerate(tracks, start=1), unit_scale=True, for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)):
unit='Song', total=len(tracks)):
download_track(track[ID], f'{artist}/{album_name}', download_track(track[ID], f'{artist}/{album_name}',
prefix=True, prefix_value=str(album_number), disable_progressbar=True) prefix=True, prefix_value=str(n), disable_progressbar=True)
def download_artist_albums(artist): def download_artist_albums(artist):

View File

@ -1,17 +1,15 @@
"""Entrypoint of ZSpotify app. It provides functions for searching"""
import sys import sys
from librespot.audio.decoders import AudioQuality from librespot.audio.decoders import AudioQuality
from tabulate import tabulate from tabulate import tabulate
from album import download_album, download_artist_albums from album import download_album, download_artist_albums
from const import TRACK, NAME, ID, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUMS, OWNER, \ from const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUM, ALBUMS, \
PLAYLISTS, DISPLAY_NAME, LIMIT, OFFSET, TYPE, S_NO, ALBUM OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME
from playlist import download_from_user_playlist, download_playlist, \ from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
download_playlist_with_id
from podcast import download_episode, get_show_episodes from podcast import download_episode, get_show_episodes
from track import download_track, get_saved_tracks from track import download_track, get_saved_tracks
from utils import splash, split_input, regex_input_for_urls from utils import sanitize_data, splash, split_input, regex_input_for_urls
from zspotify import ZSpotify from zspotify import ZSpotify
SEARCH_URL = 'https://api.spotify.com/v1/search' SEARCH_URL = 'https://api.spotify.com/v1/search'
@ -29,121 +27,224 @@ def client() -> None:
print('[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n') print('[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n')
ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH
while True: if len(sys.argv) > 1:
if len(sys.argv) > 1: if sys.argv[1] == '-p' or sys.argv[1] == '--playlist':
process_args_input() download_from_user_playlist()
elif sys.argv[1] == '-ls' or sys.argv[1] == '--liked-songs':
for song in get_saved_tracks():
if not song[TRACK][NAME]:
print(
'### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###')
else:
download_track(song[TRACK][ID], 'Liked Songs/')
print('\n')
else: else:
search_text = '' track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(
while len(search_text) == 0: sys.argv[1])
search_text = input('Enter search or URL: ')
process_url_input(search_text, call_search=True)
# wait()
if track_id is not None:
download_track(track_id)
elif artist_id is not None:
download_artist_albums(artist_id)
elif album_id is not None:
download_album(album_id)
elif playlist_id is not None:
playlist_songs = get_playlist_songs(playlist_id)
name, _ = get_playlist_info(playlist_id)
for song in playlist_songs:
download_track(song[TRACK][ID],
sanitize_data(name) + '/')
print('\n')
elif episode_id is not None:
download_episode(episode_id)
elif show_id is not None:
for episode in get_show_episodes(show_id):
download_episode(episode)
def process_args_input():
"""
process the sys args
"""
if sys.argv[1] == '-p' or sys.argv[1] == '--playlist':
download_from_user_playlist()
elif sys.argv[1] == '-ls' or sys.argv[1] == '--liked-songs':
for song in get_saved_tracks():
if not song[TRACK][NAME]:
print('### SKIPPING: SONG DOES NOT EXISTS ON SPOTIFY ANYMORE ###')
else:
download_track(song[TRACK][ID], 'Liked Songs/')
print('\n')
else: else:
process_url_input(sys.argv[1]) search_text = ''
while len(search_text) == 0:
search_text = input('Enter search or URL: ')
track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(
search_text)
def process_url_input(url, call_search=False): if track_id is not None:
""" download_track(track_id)
process the input and calls appropriate download method elif artist_id is not None:
@param url: input url download_artist_albums(artist_id)
@param call_search: boolean variable to notify calling search method elif album_id is not None:
""" download_album(album_id)
track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(url) elif playlist_id is not None:
playlist_songs = get_playlist_songs(playlist_id)
if track_id: name, _ = get_playlist_info(playlist_id)
download_track(track_id) for song in playlist_songs:
elif artist_id: download_track(song[TRACK][ID], sanitize_data(name) + '/')
download_artist_albums(artist_id) print('\n')
elif album_id: elif episode_id is not None:
download_album(album_id) download_episode(episode_id)
elif playlist_id: elif show_id is not None:
download_playlist_with_id(playlist_id) for episode in get_show_episodes(show_id):
elif episode_id: download_episode(episode)
download_episode(episode_id) else:
elif show_id: search(search_text)
for episode in get_show_episodes(show_id):
download_episode(episode)
elif call_search:
search(url)
def search(search_term): def search(search_term):
""" Searches Spotify's API for relevant data """ """ Searches Spotify's API for relevant data """
params = {LIMIT: '10', OFFSET: '0', 'q': search_term, TYPE: 'track,album,artist,playlist'} params = {'limit': '10',
'offset': '0',
'q': search_term,
'type': 'track,album,artist,playlist'}
# Parse args
splits = search_term.split()
for split in splits:
index = splits.index(split)
if split[0] == '-' and len(split) > 1:
if len(splits)-1 == index:
raise IndexError('No parameters passed after option: {}\n'.
format(split))
if split == '-l' or split == '-limit':
try:
int(splits[index+1])
except ValueError:
raise ValueError('Paramater passed after {} option must be an integer.\n'.
format(split))
if int(splits[index+1]) > 50:
raise ValueError('Invalid limit passed. Max is 50.\n')
params['limit'] = splits[index+1]
if split == '-t' or split == '-type':
allowed_types = ['track', 'playlist', 'album', 'artist']
passed_types = []
for i in range(index+1, len(splits)):
if splits[i][0] == '-':
break
if splits[i] not in allowed_types:
raise ValueError('Parameters passed after {} option must be from this list:\n{}'.
format(split, '\n'.join(allowed_types)))
passed_types.append(splits[i])
params['type'] = ','.join(passed_types)
if len(params['type']) == 0:
params['type'] = 'track,album,artist,playlist'
# Clean search term
search_term_list = []
for split in splits:
if split[0] == "-":
break
search_term_list.append(split)
if not search_term_list:
raise ValueError("Invalid query.")
params["q"] = ' '.join(search_term_list)
resp = ZSpotify.invoke_url_with_params(SEARCH_URL, **params) resp = ZSpotify.invoke_url_with_params(SEARCH_URL, **params)
total_tracks = total_albums = total_artists = 0
counter = 1 counter = 1
tracks = resp[TRACKS][ITEMS] dics = []
if len(tracks) > 0:
print('### TRACKS ###')
track_data = []
for track in tracks:
explicit = '[E]' if track[EXPLICIT] else ''
track_data.append([counter, f'{track[NAME]} {explicit}',
','.join([artist[NAME] for artist in track[ARTISTS]])])
counter += 1
total_tracks = counter - 1
print(tabulate(track_data, headers=[S_NO, NAME.title(), ARTISTS.title()],
tablefmt='pretty'))
print('\n')
albums = resp[ALBUMS][ITEMS] total_tracks = 0
if len(albums) > 0: if TRACK in params['type'].split(','):
print('### ALBUMS ###') tracks = resp[TRACKS][ITEMS]
album_data = [] if len(tracks) > 0:
for album in albums: print('### TRACKS ###')
album_data.append([counter, album[NAME], ','.join([artist[NAME] track_data = []
for artist in album[ARTISTS]])]) for track in tracks:
counter += 1 if track[EXPLICIT]:
total_albums = counter - total_tracks - 1 explicit = '[E]'
print(tabulate(album_data, headers=[S_NO, ALBUM.title(), ARTISTS.title()], else:
tablefmt='pretty')) explicit = ''
print('\n')
artists = resp[ARTISTS][ITEMS] track_data.append([counter, f'{track[NAME]} {explicit}',
if len(artists) > 0: ','.join([artist[NAME] for artist in track[ARTISTS]])])
print('### ARTISTS ###') dics.append({
artist_data = [] ID: track[ID],
for artist in artists: NAME: track[NAME],
artist_data.append([counter, artist[NAME]]) 'type': TRACK,
counter += 1 })
total_artists = counter - total_tracks - total_albums - 1
print(tabulate(artist_data, headers=[S_NO, NAME.title()], tablefmt='pretty'))
print('\n')
playlists = resp[PLAYLISTS][ITEMS] counter += 1
print('### PLAYLISTS ###') total_tracks = counter - 1
playlist_data = [] print(tabulate(track_data, headers=[
for playlist in playlists: 'S.NO', 'Name', 'Artists'], tablefmt='pretty'))
playlist_data.append([counter, playlist[NAME], playlist[OWNER][DISPLAY_NAME]]) print('\n')
counter += 1 del tracks
print(tabulate(playlist_data, headers=[S_NO, NAME.title(), OWNER.title()], tablefmt='pretty')) del track_data
print('\n')
perform_action(tracks, albums, playlists, artists, total_tracks, total_albums, total_artists)
total_albums = 0
if ALBUM in params['type'].split(','):
albums = resp[ALBUMS][ITEMS]
if len(albums) > 0:
print('### ALBUMS ###')
album_data = []
for album in albums:
album_data.append([counter, album[NAME],
','.join([artist[NAME] for artist in album[ARTISTS]])])
dics.append({
ID: album[ID],
NAME: album[NAME],
'type': ALBUM,
})
def perform_action(tracks: list, albums: list, playlists: list, artists: list, counter += 1
total_tracks: int, total_albums: int, total_artists: int): total_albums = counter - total_tracks - 1
""" print(tabulate(album_data, headers=[
process and downloads the user selection 'S.NO', 'Album', 'Artists'], tablefmt='pretty'))
""" print('\n')
if len(tracks) + len(albums) + len(playlists) == 0: del albums
del album_data
total_artists = 0
if ARTIST in params['type'].split(','):
artists = resp[ARTISTS][ITEMS]
if len(artists) > 0:
print('### ARTISTS ###')
artist_data = []
for artist in artists:
artist_data.append([counter, artist[NAME]])
dics.append({
ID: artist[ID],
NAME: artist[NAME],
'type': ARTIST,
})
counter += 1
total_artists = counter - total_tracks - total_albums - 1
print(tabulate(artist_data, headers=[
'S.NO', 'Name'], tablefmt='pretty'))
print('\n')
del artists
del artist_data
total_playlists = 0
if PLAYLIST in params['type'].split(','):
playlists = resp[PLAYLISTS][ITEMS]
if len(playlists) > 0:
print('### PLAYLISTS ###')
playlist_data = []
for playlist in playlists:
playlist_data.append(
[counter, playlist[NAME], playlist[OWNER][DISPLAY_NAME]])
dics.append({
ID: playlist[ID],
NAME: playlist[NAME],
'type': PLAYLIST,
})
counter += 1
total_playlists = counter - total_artists - total_tracks - total_albums - 1
print(tabulate(playlist_data, headers=[
'S.NO', 'Name', 'Owner'], tablefmt='pretty'))
print('\n')
del playlists
del playlist_data
if total_tracks + total_albums + total_artists + total_playlists == 0:
print('NO RESULTS FOUND - EXITING...') print('NO RESULTS FOUND - EXITING...')
else: else:
selection = '' selection = ''
@ -152,13 +253,14 @@ def perform_action(tracks: list, albums: list, playlists: list, artists: list,
inputs = split_input(selection) inputs = split_input(selection)
for pos in inputs: for pos in inputs:
position = int(pos) position = int(pos)
if position <= total_tracks: for dic in dics:
track_id = tracks[position - 1][ID] print_pos = dics.index(dic) + 1
download_track(track_id) if print_pos == position:
elif position <= total_albums + total_tracks: if dic['type'] == TRACK:
download_album(albums[position - total_tracks - 1][ID]) download_track(dic[ID])
elif position <= total_artists + total_tracks + total_albums: elif dic['type'] == ALBUM:
download_artist_albums(artists[position - total_tracks - total_albums - 1][ID]) download_album(dic[ID])
else: elif dic['type'] == ARTIST:
download_playlist(playlists, position - total_tracks - total_albums - total_artists) download_artist_albums(dic[ID])
else:
download_playlist(dic)

View File

@ -1,4 +1,3 @@
""" provides commonly used string across different modules"""
SANITIZE = ('\\', '/', ':', '*', '?', '\'', '<', '>', '"') SANITIZE = ('\\', '/', ':', '*', '?', '\'', '<', '>', '"')
SAVED_TRACKS_URL = 'https://api.spotify.com/v1/me/tracks' SAVED_TRACKS_URL = 'https://api.spotify.com/v1/me/tracks'
@ -97,21 +96,6 @@ CHUNK_SIZE = 'CHUNK_SIZE'
SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS' SPLIT_ALBUM_DISCS = 'SPLIT_ALBUM_DISCS'
DURATION_MS = 'duration_ms'
ARTIST_ID = 'ArtistID'
SHOW_ID = 'ShowID'
EPISODE_ID = 'EpisodeID'
PLAYLIST_ID = 'PlaylistID'
ALBUM_ID = 'AlbumID'
TRACK_ID = 'TrackID'
S_NO = 'S.NO'
CONFIG_DEFAULT_SETTINGS = { CONFIG_DEFAULT_SETTINGS = {
'ROOT_PATH': '../ZSpotify Music/', 'ROOT_PATH': '../ZSpotify Music/',
'ROOT_PODCAST_PATH': '../ZSpotify Podcasts/', 'ROOT_PODCAST_PATH': '../ZSpotify Podcasts/',

View File

@ -32,8 +32,7 @@ def get_playlist_songs(playlist_id):
limit = 100 limit = 100
while True: while True:
resp = ZSpotify.invoke_url_with_params(f'{PLAYLISTS_URL}/{playlist_id}/tracks', resp = ZSpotify.invoke_url_with_params(f'{PLAYLISTS_URL}/{playlist_id}/tracks', limit=limit, offset=offset)
limit=limit, offset=offset)
offset += limit offset += limit
songs.extend(resp[ITEMS]) songs.extend(resp[ITEMS])
if len(resp[ITEMS]) < limit: if len(resp[ITEMS]) < limit:
@ -48,21 +47,17 @@ def get_playlist_info(playlist_id):
return resp['name'].strip(), resp['owner']['display_name'].strip() return resp['name'].strip(), resp['owner']['display_name'].strip()
def download_playlist_with_id(playlist_id): def download_playlist(playlist):
name, _ = get_playlist_info(playlist_id) """Downloads all the songs from a playlist"""
playlist_songs = [song for song in get_playlist_songs(playlist_id) if song[TRACK][ID]]
playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK][ID]]
p_bar = tqdm(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True) p_bar = tqdm(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
for song in p_bar: for song in p_bar:
download_track(song[TRACK][ID], sanitize_data(name.strip()) + '/', disable_progressbar=True, download_track(song[TRACK][ID], sanitize_data(playlist[NAME].strip()) + '/',
create_m3u_file=True) disable_progressbar=True)
p_bar.set_description(song[TRACK][NAME]) p_bar.set_description(song[TRACK][NAME])
def download_playlist(playlists, playlist_number):
"""Downloads all the songs from a playlist"""
download_playlist_with_id(playlists[int(playlist_number) - 1][ID])
def download_from_user_playlist(): def download_from_user_playlist():
""" Select which playlist(s) to download """ """ Select which playlist(s) to download """
playlists = get_all_playlists() playlists = get_all_playlists()

View File

@ -1,6 +1,7 @@
import os import os
from typing import Optional, Tuple from typing import Optional, Tuple
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.metadata import EpisodeId from librespot.metadata import EpisodeId
from tqdm import tqdm from tqdm import tqdm
@ -8,24 +9,25 @@ from const import NAME, ERROR, SHOW, ITEMS, ID, ROOT_PODCAST_PATH, CHUNK_SIZE
from utils import sanitize_data, create_download_directory, MusicFormat from utils import sanitize_data, create_download_directory, MusicFormat
from zspotify import ZSpotify from zspotify import ZSpotify
EPISODE_INFO_URL = 'https://api.spotify.com/v1/episodes' EPISODE_INFO_URL = 'https://api.spotify.com/v1/episodes'
SHOWS_URL = 'https://api.spotify.com/v1/shows' SHOWS_URL = 'https://api.spotify.com/v1/shows'
def get_episode_info(episode_id) -> Tuple[Optional[str], Optional[str]]: def get_episode_info(episode_id_str) -> Tuple[Optional[str], Optional[str]]:
info = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id}') info = ZSpotify.invoke_url(f'{EPISODE_INFO_URL}/{episode_id_str}')
if ERROR in info: if ERROR in info:
return None, None return None, None
return sanitize_data(info[SHOW][NAME]), sanitize_data(info[NAME]) return sanitize_data(info[SHOW][NAME]), sanitize_data(info[NAME])
def get_show_episodes(show_id) -> list: def get_show_episodes(show_id_str) -> list:
episodes = [] episodes = []
offset = 0 offset = 0
limit = 50 limit = 50
while True: while True:
resp = ZSpotify.invoke_url_with_params(f'{SHOWS_URL}/{show_id}/episodes', limit=limit, offset=offset) resp = ZSpotify.invoke_url_with_params(f'{SHOWS_URL}/{show_id_str}/episodes', limit=limit, offset=offset)
offset += limit offset += limit
for episode in resp[ITEMS]: for episode in resp[ITEMS]:
episodes.append(episode[ID]) episodes.append(episode[ID])
@ -40,7 +42,7 @@ def download_episode(episode_id) -> None:
extra_paths = podcast_name + '/' extra_paths = podcast_name + '/'
if not podcast_name: if podcast_name is None:
print('### SKIPPING: (EPISODE NOT FOUND) ###') print('### SKIPPING: (EPISODE NOT FOUND) ###')
else: else:
filename = podcast_name + ' - ' + episode_name filename = podcast_name + ' - ' + episode_name
@ -54,11 +56,11 @@ def download_episode(episode_id) -> None:
total_size = stream.input_stream.size total_size = stream.input_stream.size
with open(download_directory + filename + MusicFormat.OGG.value, with open(download_directory + filename + MusicFormat.OGG.value,
'wb') as file, tqdm( 'wb') as file, tqdm(
desc=filename, desc=filename,
total=total_size, total=total_size,
unit='B', unit='B',
unit_scale=True, unit_scale=True,
unit_divisor=1024 unit_divisor=1024
) as bar: ) as bar:
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1): for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1):
bar.update(file.write( bar.update(file.write(

View File

@ -1,4 +1,3 @@
import math
import os import os
import time import time
from typing import Any, Tuple, List from typing import Any, Tuple, List
@ -10,7 +9,7 @@ from tqdm import tqdm
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \ from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, CHUNK_SIZE, \ RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, SPLIT_ALBUM_DISCS, ROOT_PATH, DOWNLOAD_FORMAT, CHUNK_SIZE, \
SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT, DURATION_MS SKIP_EXISTING_FILES, ANTI_BAN_WAIT_TIME, OVERRIDE_AUTO_WAIT
from utils import sanitize_data, set_audio_tags, set_music_thumbnail, create_download_directory, \ from utils import sanitize_data, set_audio_tags, set_music_thumbnail, create_download_directory, \
MusicFormat MusicFormat
from zspotify import ZSpotify from zspotify import ZSpotify
@ -33,7 +32,7 @@ def get_saved_tracks() -> list:
return songs return songs
def get_song_info(song_id) -> Tuple[List[str], str, str, Any, Any, Any, Any, Any, Any, Any]: def get_song_info(song_id) -> Tuple[List[str], str, str, Any, Any, Any, Any, Any, Any]:
""" Retrieves metadata for downloaded songs """ """ Retrieves metadata for downloaded songs """
info = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token') info = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
@ -48,123 +47,82 @@ def get_song_info(song_id) -> Tuple[List[str], str, str, Any, Any, Any, Any, Any
track_number = info[TRACKS][0][TRACK_NUMBER] track_number = info[TRACKS][0][TRACK_NUMBER]
scraped_song_id = info[TRACKS][0][ID] scraped_song_id = info[TRACKS][0][ID]
is_playable = info[TRACKS][0][IS_PLAYABLE] is_playable = info[TRACKS][0][IS_PLAYABLE]
duration = math.ceil(info[TRACKS][0][DURATION_MS] / 1000)
return (artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, return artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable
duration)
# noinspection PyBroadException # noinspection PyBroadException
def download_track(track_id: str, extra_paths: str = '', prefix: bool = False, prefix_value='', def download_track(track_id: str, extra_paths='', prefix=False, prefix_value='', disable_progressbar=False) -> None:
disable_progressbar: bool = False,
create_m3u_file: bool = False) -> None:
""" Downloads raw song audio from Spotify """ """ Downloads raw song audio from Spotify """
try: try:
(artists, album_name, name, image_url, release_year, disc_number, (artists, album_name, name, image_url, release_year, disc_number,
track_number, scraped_song_id, is_playable, duration) = get_song_info(track_id) track_number, scraped_song_id, is_playable) = get_song_info(track_id)
song_name, filename, m3u_filename = process_track_metadata(artists, name, disc_number, extra_paths, prefix,
prefix_value, create_m3u_file) if ZSpotify.get_config(SPLIT_ALBUM_DISCS):
except Exception: download_directory = os.path.join(os.path.dirname(
__file__), ZSpotify.get_config(ROOT_PATH), extra_paths, f'Disc {disc_number}')
else:
download_directory = os.path.join(os.path.dirname(
__file__), ZSpotify.get_config(ROOT_PATH), extra_paths)
song_name = artists[0] + ' - ' + name
if prefix:
song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit(
) else f'{prefix_value} - {song_name}'
filename = os.path.join(
download_directory, f'{song_name}.{ZSpotify.get_config(DOWNLOAD_FORMAT)}')
except Exception as e:
print('### SKIPPING SONG - FAILED TO QUERY METADATA ###') print('### SKIPPING SONG - FAILED TO QUERY METADATA ###')
print(e)
else: else:
try: try:
track_info = ( if not is_playable:
artists, album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, print('\n### SKIPPING:', song_name,
is_playable, duration) '(SONG IS UNAVAILABLE) ###')
playlist_info = (create_m3u_file, m3u_filename) else:
creat_track(track_id, extra_paths, song_name, filename, disable_progressbar, track_info, playlist_info) if os.path.isfile(filename) and os.path.getsize(filename) and ZSpotify.get_config(SKIP_EXISTING_FILES):
except Exception: print('\n### SKIPPING:', song_name,
'(SONG ALREADY EXISTS) ###')
else:
if track_id != scraped_song_id:
track_id = scraped_song_id
track_id = TrackId.from_base62(track_id)
stream = ZSpotify.get_content_stream(
track_id, ZSpotify.DOWNLOAD_QUALITY)
create_download_directory(download_directory)
total_size = stream.input_stream.size
with open(filename, 'wb') as file, tqdm(
desc=song_name,
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
disable=disable_progressbar
) as p_bar:
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1):
p_bar.update(file.write(
stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE))))
if ZSpotify.get_config(DOWNLOAD_FORMAT) == 'mp3':
convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name,
release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url)
if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT):
time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME))
except Exception as e:
print('### SKIPPING:', song_name, print('### SKIPPING:', song_name,
'(GENERAL DOWNLOAD ERROR) ###') '(GENERAL DOWNLOAD ERROR) ###')
print(e)
if os.path.exists(filename): if os.path.exists(filename):
os.remove(filename) os.remove(filename)
def process_track_metadata(artists: list, name: str, disc_number: Any, extra_paths: str, prefix: bool,
prefix_value: str,
create_m3u_file: bool):
m3u_filename = None
if create_m3u_file:
download_directory = os.path.join(os.path.dirname(
__file__), ZSpotify.get_config(ROOT_PATH), extra_paths, extra_paths[:-1])
m3u_filename = f'{download_directory}.m3u'
if ZSpotify.get_config(SPLIT_ALBUM_DISCS):
download_directory = os.path.join(os.path.dirname(
__file__), ZSpotify.get_config(ROOT_PATH), extra_paths, f'Disc {disc_number}')
else:
download_directory = os.path.join(os.path.dirname(
__file__), ZSpotify.get_config(ROOT_PATH), extra_paths)
song_name = artists[0] + ' - ' + name
if prefix:
song_name = f'{prefix_value.zfill(2)} - {song_name}' if prefix_value.isdigit(
) else f'{prefix_value} - {song_name}'
filename = os.path.join(
download_directory, f'{song_name}.{ZSpotify.get_config(DOWNLOAD_FORMAT)}')
return song_name, filename, m3u_filename
def creat_track(track_id, extra_paths: str, song_name: str, filename: str, disable_progressbar: bool, track_info: tuple,
playlist_info: tuple):
(artists, album_name, name, image_url, release_year, disc_number,
track_number, scraped_song_id, is_playable, duration) = track_info
create_m3u, m3u_filename = playlist_info
if not is_playable:
print(f'\n### SKIPPING: {song_name} (SONG IS UNAVAILABLE) ###')
else:
if os.path.isfile(filename) and os.path.getsize(filename) and ZSpotify.get_config(SKIP_EXISTING_FILES):
playlist_data = f'#EXTINF:{duration}, {artists[0]} - {name}\n{os.path.abspath(filename)}'
create_playlist_file(create_m3u, playlist_data, m3u_filename)
print(f'\n### SKIPPING: {song_name} (SONG ALREADY EXISTS) ###')
else:
if track_id != scraped_song_id:
track_id = scraped_song_id
track_id = TrackId.from_base62(track_id)
stream = ZSpotify.get_content_stream(track_id, ZSpotify.DOWNLOAD_QUALITY)
create_download_directory(ZSpotify.get_config(ROOT_PATH) + extra_paths)
total_size = stream.input_stream.size
write_stream_to_file(stream, filename, song_name, total_size, disable_progressbar, track_info,
playlist_info)
def write_stream_to_file(stream, filename: str, song_name: str, total_size: Any, disable_progressbar: bool,
track_info: tuple, playlist_info: tuple):
(artists, album_name, name, image_url, release_year, disc_number,
track_number, scraped_song_id, is_playable, duration) = track_info
create_m3u, m3u_filename = playlist_info
with open(filename, 'wb') as file, tqdm(
desc=song_name,
total=total_size,
unit='B',
unit_scale=True,
unit_divisor=1024,
disable=disable_progressbar
) as p_bar:
for _ in range(int(total_size / ZSpotify.get_config(CHUNK_SIZE)) + 1):
p_bar.update(file.write(
stream.input_stream.stream().read(ZSpotify.get_config(CHUNK_SIZE))))
playlist_data = f'#EXTINF:{duration}, {artists[0]} - {name}\n{os.path.abspath(filename)}'
if ZSpotify.get_config(DOWNLOAD_FORMAT) == MusicFormat.MP3.value:
convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name,
release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url)
if not ZSpotify.get_config(OVERRIDE_AUTO_WAIT):
time.sleep(ZSpotify.get_config(ANTI_BAN_WAIT_TIME))
create_playlist_file(create_m3u, playlist_data, m3u_filename)
def create_playlist_file(create_m3u: bool, playlist_data: str, m3u_filename: str):
if create_m3u and m3u_filename and playlist_data:
with open(m3u_filename, 'a+') as pfile:
if os.path.getsize(m3u_filename) == 0:
pfile.write('#EXTM3U\n')
pfile.write(playlist_data + '\n')
def convert_audio_format(filename) -> None: def convert_audio_format(filename) -> None:
""" Converts raw audio into playable mp3 """ """ Converts raw audio into playable mp3 """
# print('### CONVERTING TO ' + MUSIC_FORMAT.upper() + ' ###') # print('### CONVERTING TO ' + MUSIC_FORMAT.upper() + ' ###')
@ -174,4 +132,5 @@ def convert_audio_format(filename) -> None:
bitrate = '320k' bitrate = '320k'
else: else:
bitrate = '160k' bitrate = '160k'
raw_audio.export(filename, format=ZSpotify.get_config(DOWNLOAD_FORMAT), bitrate=bitrate) raw_audio.export(filename, format=ZSpotify.get_config(
DOWNLOAD_FORMAT), bitrate=bitrate)

View File

@ -3,19 +3,19 @@ import platform
import re import re
import time import time
from enum import Enum from enum import Enum
from typing import List, Tuple, Match from typing import List, Tuple
import music_tag import music_tag
import requests import requests
from const import SANITIZE, ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \ from const import SANITIZE, ARTIST, TRACKTITLE, ALBUM, YEAR, DISCNUMBER, TRACKNUMBER, ARTWORK, \
WINDOWS_SYSTEM, TRACK_ID, ALBUM_ID, PLAYLIST_ID, EPISODE_ID, SHOW_ID, ARTIST_ID WINDOWS_SYSTEM
class MusicFormat(str, Enum): class MusicFormat(str, Enum):
MP3 = 'mp3', MP3 = 'mp3',
OGG = 'ogg', OGG = 'ogg',
def create_download_directory(download_path: str) -> None: def create_download_directory(download_path: str) -> None:
os.makedirs(download_path, exist_ok=True) os.makedirs(download_path, exist_ok=True)
@ -136,23 +136,46 @@ def regex_input_for_urls(search_input) -> Tuple[str, str, str, str, str, str]:
search_input, search_input,
) )
return ( if track_uri_search is not None or track_url_search is not None:
extract_info_from_regex_response(TRACK_ID, track_uri_search, track_url_search), track_id_str = (track_uri_search
if track_uri_search is not None else
extract_info_from_regex_response(ALBUM_ID, album_uri_search, album_url_search), track_url_search).group('TrackID')
extract_info_from_regex_response(PLAYLIST_ID, playlist_uri_search, playlist_url_search),
extract_info_from_regex_response(EPISODE_ID, episode_uri_search, episode_url_search),
extract_info_from_regex_response(SHOW_ID, show_uri_search, show_url_search),
extract_info_from_regex_response(ARTIST_ID, artist_uri_search, artist_url_search)
)
def extract_info_from_regex_response(key, uri_data: Match[str], url_data: Match[str]):
if uri_data or url_data:
return (uri_data if uri_data else url_data).group(key)
else: else:
return None track_id_str = None
if album_uri_search is not None or album_url_search is not None:
album_id_str = (album_uri_search
if album_uri_search is not None else
album_url_search).group('AlbumID')
else:
album_id_str = None
if playlist_uri_search is not None or playlist_url_search is not None:
playlist_id_str = (playlist_uri_search
if playlist_uri_search is not None else
playlist_url_search).group('PlaylistID')
else:
playlist_id_str = None
if episode_uri_search is not None or episode_url_search is not None:
episode_id_str = (episode_uri_search
if episode_uri_search is not None else
episode_url_search).group('EpisodeID')
else:
episode_id_str = None
if show_uri_search is not None or show_url_search is not None:
show_id_str = (show_uri_search
if show_uri_search is not None else
show_url_search).group('ShowID')
else:
show_id_str = None
if artist_uri_search is not None or artist_url_search is not None:
artist_id_str = (artist_uri_search
if artist_uri_search is not None else
artist_url_search).group('ArtistID')
else:
artist_id_str = None
return track_id_str, album_id_str, playlist_id_str, episode_id_str, show_id_str, artist_id_str