Added options to regulate terminal output (splash, progress, errors, skips, ...)

This commit is contained in:
Mike Schwörer 2021-11-19 18:45:04 +01:00
parent c8c6c7d8cc
commit de0f8b30b1
No known key found for this signature in database
GPG Key ID: D3C7172E0A70F8CF
8 changed files with 69 additions and 51 deletions

View File

@ -1,6 +1,5 @@
from tqdm import tqdm
from const import ITEMS, ARTISTS, NAME, ID
from termoutput import Printer
from track import download_track
from utils import fix_filename
from zspotify import ZSpotify
@ -48,7 +47,7 @@ def download_album(album):
""" Downloads songs from an album """
artist, album_name = get_album_name(album)
tracks = get_album_tracks(album)
for n, track in tqdm(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)):
for n, track in Printer.progress(enumerate(tracks, start=1), unit_scale=True, unit='Song', total=len(tracks)):
download_track('album', track[ID], extra_keys={'album_num': str(n).zfill(2), 'artist': artist, 'album': album_name, 'album_id': album}, disable_progressbar=True)

View File

@ -7,6 +7,7 @@ from const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALB
OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME
from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
from podcast import download_episode, get_show_episodes
from termoutput import Printer, PrintChannel
from track import download_track, get_saved_tracks
from utils import fix_filename, splash, split_input, regex_input_for_urls
from zspotify import ZSpotify
@ -18,16 +19,13 @@ def client(args) -> None:
""" Connects to spotify to perform query's and get songs to download """
ZSpotify(args)
if not args.no_splash:
splash()
Printer.print(PrintChannel.SPLASH, splash())
if ZSpotify.check_premium():
if not args.no_splash:
print('[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n')
Printer.print(PrintChannel.SPLASH, '[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n')
ZSpotify.DOWNLOAD_QUALITY = AudioQuality.VERY_HIGH
else:
if not args.no_splash:
print('[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n')
Printer.print(PrintChannel.SPLASH, '[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n')
ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH
if args.download:
@ -40,7 +38,7 @@ def client(args) -> None:
download_from_urls(urls)
else:
print(f'File {filename} not found.\n')
Printer.print(PrintChannel.ERRORS, f'File {filename} not found.\n')
if args.urls:
download_from_urls(args.urls)
@ -51,11 +49,9 @@ def client(args) -> None:
if args.liked_songs:
for song in get_saved_tracks():
if not song[TRACK][NAME]:
print(
'### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###')
Printer.print(PrintChannel.ERRORS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
else:
download_track('liked', song[TRACK][ID])
print('\n')
if args.search_spotify:
search_text = ''
@ -88,7 +84,6 @@ def download_from_urls(urls: list[str]) -> bool:
name, _ = get_playlist_info(playlist_id)
for song in playlist_songs:
download_track('playlist', song[TRACK][ID], extra_keys={'playlist': name})
print('\n')
elif episode_id is not None:
download = True
download_episode(episode_id)

View File

@ -2,6 +2,7 @@ import json
import os
import sys
from typing import Any
from enum import Enum
CONFIG_FILE_PATH = '../zs_config.json'
@ -21,6 +22,11 @@ BITRATE = 'BITRATE'
SONG_ARCHIVE = 'SONG_ARCHIVE'
CREDENTIALS_LOCATION = 'CREDENTIALS_LOCATION'
OUTPUT = 'OUTPUT'
PRINT_SPLASH = 'PRINT_SPLASH'
PRINT_SKIPS = 'PRINT_SKIPS'
PRINT_DOWNLOAD_PROGRESS = 'PRINT_DOWNLOAD_PROGRESS'
PRINT_ERRORS = 'PRINT_ERRORS'
PRINT_DOWNLOADS = 'PRINT_DOWNLOADS'
CONFIG_VALUES = {
ROOT_PATH: { 'default': '../ZSpotify Music/', 'type': str, 'arg': '--root-path' },
@ -39,6 +45,11 @@ CONFIG_VALUES = {
SONG_ARCHIVE: { 'default': '.song_archive', 'type': str, 'arg': '--song-archive' },
CREDENTIALS_LOCATION: { 'default': 'credentials.json', 'type': str, 'arg': '--credentials-location' },
OUTPUT: { 'default': '', 'type': str, 'arg': '--output' },
PRINT_SPLASH: { 'default': 'True', 'type': bool, 'arg': '--print-splash' },
PRINT_SKIPS: { 'default': 'True', 'type': bool, 'arg': '--print-skips' },
PRINT_DOWNLOAD_PROGRESS: { 'default': 'True', 'type': bool, 'arg': '--print-download-progress' },
PRINT_ERRORS: { 'default': 'True', 'type': bool, 'arg': '--print-errors' },
PRINT_DOWNLOADS: { 'default': 'False', 'type': bool, 'arg': '--print-downloads' },
}
OUTPUT_DEFAULT_PLAYLIST = '{playlist}/{artist} - {song_name}.{ext}'
@ -86,6 +97,9 @@ class Config:
if key.lower() in vars(args) and vars(args)[key.lower()] is not None:
cls.Values[key] = cls.parse_arg_value(key, vars(args)[key.lower()])
if args.no_splash:
cls.Values[PRINT_SPLASH] = False
@classmethod
def get_default_json(cls) -> Any:
r = {}

View File

@ -1,6 +1,5 @@
from tqdm import tqdm
from const import ITEMS, ID, TRACK, NAME
from termoutput import Printer
from track import download_track
from utils import fix_filename, split_input
from zspotify import ZSpotify
@ -51,7 +50,7 @@ def download_playlist(playlist):
"""Downloads all the songs from a playlist"""
playlist_songs = [song for song in get_playlist_songs(playlist[ID]) if song[TRACK][ID]]
p_bar = tqdm(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
p_bar = Printer.progress(playlist_songs, unit='song', total=len(playlist_songs), unit_scale=True)
enum = 1
for song in p_bar:
download_track('extplaylist', song[TRACK][ID], extra_keys={'playlist': playlist[NAME], 'playlist_num': str(enum).zfill(2)}, disable_progressbar=True)

View File

@ -3,9 +3,9 @@ from typing import Optional, Tuple
from librespot.audio.decoders import VorbisOnlyAudioQuality
from librespot.metadata import EpisodeId
from tqdm import tqdm
from const import (ERROR, ID, ITEMS, NAME, SHOW)
from termoutput import PrintChannel, Printer
from utils import create_download_directory, fix_filename
from zspotify import ZSpotify
@ -70,7 +70,7 @@ def download_episode(episode_id) -> None:
extra_paths = podcast_name + '/'
if podcast_name is None:
print('### SKIPPING: (EPISODE NOT FOUND) ###')
Printer.print(PrintChannel.ERRORS, '### SKIPPING: (EPISODE NOT FOUND) ###')
else:
filename = podcast_name + ' - ' + episode_name
@ -98,16 +98,10 @@ def download_episode(episode_id) -> None:
and os.path.getsize(filepath) == total_size
and ZSpotify.CONFIG.get_skip_existing_files()
):
print(
"\n### SKIPPING:",
podcast_name,
"-",
episode_name,
"(EPISODE ALREADY EXISTS) ###",
)
Printer.print(PrintChannel.SKIPS, "\n### SKIPPING: " + podcast_name + " - " + episode_name + " (EPISODE ALREADY EXISTS) ###")
return
with open(filepath, 'wb') as file, tqdm(
with open(filepath, 'wb') as file, Printer.progress(
desc=filename,
total=total_size,
unit='B',

26
zspotify/termoutput.py Normal file
View File

@ -0,0 +1,26 @@
from enum import Enum
from tqdm import tqdm
from config import PRINT_SPLASH, PRINT_SKIPS, PRINT_DOWNLOAD_PROGRESS, PRINT_ERRORS, PRINT_DOWNLOADS
from zspotify import ZSpotify
class PrintChannel(Enum):
SPLASH = PRINT_SPLASH
SKIPS = PRINT_SKIPS
DOWNLOAD_PROGRESS = PRINT_DOWNLOAD_PROGRESS
ERRORS = PRINT_ERRORS
DOWNLOADS = PRINT_DOWNLOADS
class Printer:
@staticmethod
def print(channel: PrintChannel, msg: str) -> None:
if ZSpotify.CONFIG.get(channel.value):
print(msg)
@staticmethod
def progress(iterable=None, desc=None, total=None, unit='it', disable=False, unit_scale=False, unit_divisor=1000):
if not ZSpotify.CONFIG.get(PrintChannel.DOWNLOAD_PROGRESS.value):
disable = True
return tqdm(iterable=iterable, desc=desc, total=total, disable=disable, unit=unit, unit_scale=unit_scale, unit_divisor=unit_divisor)

View File

@ -7,10 +7,10 @@ from librespot.audio.decoders import AudioQuality
from librespot.metadata import TrackId
from ffmpy import FFmpeg
from pydub import AudioSegment
from tqdm import tqdm
from const import TRACKS, ALBUM, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS
from termoutput import Printer, PrintChannel
from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive
from zspotify import ZSpotify
@ -111,21 +111,18 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
except Exception as e:
print('### SKIPPING SONG - FAILED TO QUERY METADATA ###')
print(e)
Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
else:
try:
if not is_playable:
print('\n### SKIPPING:', song_name,
'(SONG IS UNAVAILABLE) ###')
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
else:
if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files():
print('\n### SKIPPING:', song_name,
'(SONG ALREADY EXISTS) ###')
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded():
print('\n### SKIPPING:', song_name,
'(SONG ALREADY DOWNLOADED ONCE) ###')
Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
else:
if track_id != scraped_song_id:
@ -136,7 +133,7 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
create_download_directory(filedir)
total_size = stream.input_stream.size
with open(filename, 'wb') as file, tqdm(
with open(filename, 'wb') as file, Printer.progress(
desc=song_name,
total=total_size,
unit='B',
@ -152,10 +149,11 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
time.sleep(pause)
convert_audio_format(filename)
set_audio_tags(filename, artists, name, album_name,
release_year, disc_number, track_number)
set_audio_tags(filename, artists, name, album_name, release_year, disc_number, track_number)
set_music_thumbnail(filename, image_url)
Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, os.path.dirname(__file__))}" ###' + "\n")
# add song id to archive file
if ZSpotify.CONFIG.get_skip_previously_downloaded():
add_to_archive(scraped_song_id, os.path.basename(filename), artists[0], name)
@ -166,9 +164,8 @@ def download_track(mode: str, track_id: str, extra_keys={}, disable_progressbar=
if not ZSpotify.CONFIG.get_anti_ban_wait_time():
time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time())
except Exception as e:
print('### SKIPPING:', song_name,
'(GENERAL DOWNLOAD ERROR) ###')
print(e)
Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
Printer.print(PrintChannel.ERRORS, str(e) + "\n")
if os.path.exists(filename):
os.remove(filename)

View File

@ -86,12 +86,6 @@ def get_downloaded_song_duration(filename: str) -> float:
return duration
def wait(seconds: int = 3) -> None:
""" Pause for a set number of seconds """
for second in range(seconds)[::-1]:
print(f'\rWait for {second + 1} second(s)...', end='')
time.sleep(1)
def split_input(selection) -> List[str]:
""" Returns a list of inputted strings """
@ -106,15 +100,15 @@ def split_input(selection) -> List[str]:
return inputs
def splash() -> None:
def splash() -> str:
""" Displays splash screen """
print("""
return """
""")
"""
def clear() -> None: