From b5987179e24c1ec3b18d9bd9a44521d7c0cad47a Mon Sep 17 00:00:00 2001 From: Sourcery AI <> Date: Tue, 24 Oct 2023 08:25:30 +0000 Subject: [PATCH] 'Refactored by Sourcery' --- examples/player/main.py | 33 +++--- examples/server/main.py | 84 ++++++++-------- librespot/__init__.py | 2 +- librespot/audio/__init__.py | 106 ++++++++++---------- librespot/audio/decoders.py | 33 +++--- librespot/audio/decrypt.py | 4 +- librespot/audio/format.py | 2 +- librespot/audio/storage.py | 17 ++-- librespot/core.py | 188 ++++++++++++++--------------------- librespot/crypto.py | 21 ++-- librespot/mercury.py | 39 ++++---- librespot/metadata.py | 48 +++++---- librespot/util.py | 4 +- librespot/zeroconf.py | 56 +++++------ librespot_player/__init__.py | 21 ++-- 15 files changed, 315 insertions(+), 343 deletions(-) diff --git a/examples/player/main.py b/examples/player/main.py index 1c1c0dd..1f8a4c0 100644 --- a/examples/player/main.py +++ b/examples/player/main.py @@ -28,9 +28,9 @@ def client(): splash() cmd = input("Player >>> ") args = cmd.split(" ") - if args[0] == "exit" or args[0] == "quit": + if args[0] in ["exit", "quit"]: return - if (args[0] == "p" or args[0] == "play") and len(args) == 2: + if args[0] in ["p", "play"] and len(args) == 2: track_uri_search = re.search( r"^spotify:track:(?P[0-9a-zA-Z]{22})$", args[1]) track_url_search = re.search( @@ -43,44 +43,37 @@ def client(): track_url_search).group("TrackID") play(track_id_str) wait() - if args[0] == "q" or args[0] == "quality": + if args[0] in ["q", "quality"]: if len(args) == 1: - print("Current Quality: " + quality.name) + print(f"Current Quality: {quality.name}") wait() elif len(args) == 2: - if args[1] == "normal" or args[1] == "96": + if args[1] in ["normal", "96"]: quality = AudioQuality.NORMAL - elif args[1] == "high" or args[1] == "160": + elif args[1] in ["high", "160"]: quality = AudioQuality.HIGH - elif args[1] == "veryhigh" or args[1] == "320": + elif args[1] in ["veryhigh", "320"]: quality = AudioQuality.VERY_HIGH - print("Set Quality to %s" % quality.name) + print(f"Set Quality to {quality.name}") wait() - if (args[0] == "s" or args[0] == "search") and len(args) >= 2: + if args[0] in ["s", "search"] and len(args) >= 2: token = session.tokens().get("user-read-email") resp = requests.get( "https://api.spotify.com/v1/search", - { - "limit": "5", - "offset": "0", - "q": cmd[2:], - "type": "track" - }, - headers={"Authorization": "Bearer %s" % token}, + {"limit": "5", "offset": "0", "q": cmd[2:], "type": "track"}, + headers={"Authorization": f"Bearer {token}"}, ) - i = 1 tracks = resp.json()["tracks"]["items"] - for track in tracks: + for i, track in enumerate(tracks, start=1): print("%d, %s | %s" % ( i, track["name"], ",".join([artist["name"] for artist in track["artists"]]), )) - i += 1 position = -1 while True: num_str = input("Select [1-5]: ") - if num_str == "exit" or num_str == "quit": + if num_str in ["exit", "quit"]: return try: num = int(num_str) diff --git a/examples/server/main.py b/examples/server/main.py index 1f707bd..e9020f5 100644 --- a/examples/server/main.py +++ b/examples/server/main.py @@ -75,46 +75,52 @@ def main(): def response(client: socket.socket, uri: str, header: dict, body: bytes) -> tuple[str, list, bytes, bool]: - if re.search(r"^/audio/track/([0-9a-zA-Z]{22})$", uri) is not None: - track_id_search = re.search( - r"^/audio/track/(?P[0-9a-zA-Z]{22})$", uri) - track_id_str = track_id_search.group("TrackID") - track_id = TrackId.from_base62(track_id_str) - stream = session.content_feeder().load( - track_id, VorbisOnlyAudioQuality(AudioQuality.VERY_HIGH), False, - None) - start = 0 - end = stream.input_stream.stream().size() - if header.get("range") is not None: - range_search = re.search( - "^bytes=(?P[0-9]+?)-(?P[0-9]+?)$", - header.get("range")) - if range_search is not None: - start = int(range_search.group("start")) - end = (int(range_search.group("end")) - if int(range_search.group("end")) <= - stream.input_stream.stream().size() else - stream.input_stream.stream().size()) - stream.input_stream.stream().skip(start) - client.send(b"HTTP/1.0 200 OK\r\n") - client.send(b"Access-Control-Allow-Origin: *\r\n") - client.send(b"Content-Length: " + - (str(stream.input_stream.stream().size()).encode() if - stream.input_stream.stream().size() == end else "{}-{}/{}" - .format(start, end, - stream.input_stream.stream().size()).encode()) + - b"\r\n") - client.send(b"Content-Type: audio/ogg\r\n") - client.send(b"\r\n") - while True: - if (stream.input_stream.stream().pos() >= - stream.input_stream.stream().size()): - break - byte = stream.input_stream.stream().read(1) - client.send(byte) - return "", [], b"", True - else: + if re.search(r"^/audio/track/([0-9a-zA-Z]{22})$", uri) is None: return HttpCode.http_404, [], HttpCode.http_404.encode(), False + track_id_search = re.search( + r"^/audio/track/(?P[0-9a-zA-Z]{22})$", uri) + track_id_str = track_id_search.group("TrackID") + track_id = TrackId.from_base62(track_id_str) + stream = session.content_feeder().load( + track_id, VorbisOnlyAudioQuality(AudioQuality.VERY_HIGH), False, + None) + start = 0 + end = stream.input_stream.stream().size() + if header.get("range") is not None: + range_search = re.search( + "^bytes=(?P[0-9]+?)-(?P[0-9]+?)$", + header.get("range")) + if range_search is not None: + start = int(range_search.group("start")) + end = (int(range_search.group("end")) + if int(range_search.group("end")) <= + stream.input_stream.stream().size() else + stream.input_stream.stream().size()) + stream.input_stream.stream().skip(start) + client.send(b"HTTP/1.0 200 OK\r\n") + client.send(b"Access-Control-Allow-Origin: *\r\n") + client.send( + ( + ( + b"Content-Length: " + + ( + str(stream.input_stream.stream().size()).encode() + if stream.input_stream.stream().size() == end + else f"{start}-{end}/{stream.input_stream.stream().size()}".encode() + ) + ) + + b"\r\n" + ) + ) + client.send(b"Content-Type: audio/ogg\r\n") + client.send(b"\r\n") + while True: + if (stream.input_stream.stream().pos() >= + stream.input_stream.stream().size()): + break + byte = stream.input_stream.stream().read(1) + client.send(byte) + return "", [], b"", True if __name__ == "__main__": diff --git a/librespot/__init__.py b/librespot/__init__.py index 9503e25..27f6c81 100644 --- a/librespot/__init__.py +++ b/librespot/__init__.py @@ -18,7 +18,7 @@ class Version: @staticmethod def version_string(): - return "librespot-python " + Version.version_name + return f"librespot-python {Version.version_name}" @staticmethod def system_info_string(): diff --git a/librespot/audio/__init__.py b/librespot/audio/__init__.py index afbbbfb..3b1a9f8 100644 --- a/librespot/audio/__init__.py +++ b/librespot/audio/__init__.py @@ -219,8 +219,9 @@ class AbsChunkedInputStream(io.BytesIO, HaltListener): @staticmethod def from_stream_error(stream_error: int): - return AbsChunkedInputStream \ - .ChunkException("Failed due to stream error, code: {}".format(stream_error)) + return AbsChunkedInputStream.ChunkException( + f"Failed due to stream error, code: {stream_error}" + ) class AudioKeyManager(PacketsReceiver, Closeable): @@ -240,8 +241,7 @@ class AudioKeyManager(PacketsReceiver, Closeable): seq = struct.unpack(">i", payload.read(4))[0] callback = self.__callbacks.get(seq) if callback is None: - self.logger.warning( - "Couldn't find callback for seq: {}".format(seq)) + self.logger.warning(f"Couldn't find callback for seq: {seq}") return if packet.is_cmd(Packet.Type.aes_key): key = payload.read(16) @@ -251,8 +251,8 @@ class AudioKeyManager(PacketsReceiver, Closeable): callback.error(code) else: self.logger.warning( - "Couldn't handle packet, cmd: {}, length: {}".format( - packet.cmd, len(packet.payload))) + f"Couldn't handle packet, cmd: {packet.cmd}, length: {len(packet.payload)}" + ) def get_audio_key(self, gid: bytes, @@ -276,8 +276,8 @@ class AudioKeyManager(PacketsReceiver, Closeable): if retry: return self.get_audio_key(gid, file_id, False) raise RuntimeError( - "Failed fetching audio key! gid: {}, fileId: {}".format( - util.bytes_to_hex(gid), util.bytes_to_hex(file_id))) + f"Failed fetching audio key! gid: {util.bytes_to_hex(gid)}, fileId: {util.bytes_to_hex(file_id)}" + ) return key class Callback: @@ -302,8 +302,7 @@ class AudioKeyManager(PacketsReceiver, Closeable): self.__reference_lock.notify_all() def error(self, code: int) -> None: - self.__audio_key_manager.logger.fatal( - "Audio key error, code: {}".format(code)) + self.__audio_key_manager.logger.fatal(f"Audio key error, code: {code}") with self.__reference_lock: self.__reference.put(None) self.__reference_lock.notify_all() @@ -359,8 +358,9 @@ class CdnFeedHelper: CdnFeedHelper._LOGGER.warning("Couldn't resolve redirect!") url = resp.url - CdnFeedHelper._LOGGER.debug("Fetched external url for {}: {}".format( - util.bytes_to_hex(episode.gid), url)) + CdnFeedHelper._LOGGER.debug( + f"Fetched external url for {util.bytes_to_hex(episode.gid)}: {url}" + ) streamer = session.cdn().stream_external_episode( episode, url, halt_listener) @@ -411,10 +411,10 @@ class CdnManager: def get_head(self, file_id: bytes): response = self.__session.client() \ - .get(self.__session.get_user_attribute("head-files-url", "https://heads-fa.spotify.com/head/{file_id}") + .get(self.__session.get_user_attribute("head-files-url", "https://heads-fa.spotify.com/head/{file_id}") .replace("{file_id}", util.bytes_to_hex(file_id))) if response.status_code != 200: - raise IOError("{}".format(response.status_code)) + raise IOError(f"{response.status_code}") body = response.content if body is None: raise IOError("Response body is empty!") @@ -446,8 +446,12 @@ class CdnManager: ) def get_audio_url(self, file_id: bytes): - response = self.__session.api()\ - .send("GET", "/storage-resolve/files/audio/interactive/{}".format(util.bytes_to_hex(file_id)), None, None) + response = self.__session.api().send( + "GET", + f"/storage-resolve/files/audio/interactive/{util.bytes_to_hex(file_id)}", + None, + None, + ) if response.status_code != 200: raise IOError(response.status_code) body = response.content @@ -457,11 +461,11 @@ class CdnManager: proto.ParseFromString(body) if proto.result == StorageResolve.StorageResolveResponse.Result.CDN: url = random.choice(proto.cdnurl) - self.logger.debug("Fetched CDN url for {}: {}".format( - util.bytes_to_hex(file_id), url)) + self.logger.debug(f"Fetched CDN url for {util.bytes_to_hex(file_id)}: {url}") return url raise CdnManager.CdnException( - "Could not retrieve CDN url! result: {}".format(proto.result)) + f"Could not retrieve CDN url! result: {proto.result}" + ) class CdnException(Exception): pass @@ -508,7 +512,7 @@ class CdnManager: expires_str = str(expires_list[0]) except TypeError: expires_str = "" - if token_str != "None" and len(token_str) != 0: + if token_str not in ["None", ""]: expire_at = None split = token_str.split("~") for s in split: @@ -521,17 +525,16 @@ class CdnManager: break if expire_at is None: self.__expiration = -1 - self.__cdn_manager.logger.warning( - "Invalid __token__ in CDN url: {}".format(url)) + self.__cdn_manager.logger.warning(f"Invalid __token__ in CDN url: {url}") return self.__expiration = expire_at * 1000 - elif expires_str != "None" and len(expires_str) != 0: + elif expires_str not in ["None", ""]: expires_at = None expires_str = expires_str.split("~")[0] expires_at = int(expires_str) if expires_at is None: self.__expiration = -1 - self.__cdn_manager.logger.warning("Invalid Expires param in CDN url: {}".format(url)) + self.__cdn_manager.logger.warning(f"Invalid Expires param in CDN url: {url}") return self.__expiration = expires_at * 1000 else: @@ -539,8 +542,9 @@ class CdnManager: i = token_url.query.index("_") except ValueError: self.__expiration = -1 - self.__cdn_manager.logger \ - .warning("Couldn't extract expiration, invalid parameter in CDN url: {}".format(url)) + self.__cdn_manager.logger.warning( + f"Couldn't extract expiration, invalid parameter in CDN url: {url}" + ) return self.__expiration = int(token_url.query[:i]) * 1000 @@ -594,8 +598,8 @@ class CdnManager: if self.__internal_stream.is_closed(): return self.__session.logger.debug( - "Chunk {}/{} completed, cached: {}, stream: {}".format( - chunk_index + 1, self.chunks, cached, self.describe())) + f"Chunk {chunk_index + 1}/{self.chunks} completed, cached: {cached}, stream: {self.describe()}" + ) self.buffer[chunk_index] = self.__audio_decrypt.decrypt_chunk( chunk_index, chunk) self.__internal_stream.notify_chunk_available(chunk_index) @@ -608,9 +612,8 @@ class CdnManager: def describe(self) -> str: if self.__stream_id.is_episode(): - return "episode_gid: {}".format( - self.__stream_id.get_episode_gid()) - return "file_id: {}".format(self.__stream_id.get_file_id()) + return f"episode_gid: {self.__stream_id.get_episode_gid()}" + return f"file_id: {self.__stream_id.get_file_id()}" def decrypt_time_ms(self) -> int: return self.__audio_decrypt.decrypt_time_ms() @@ -619,8 +622,7 @@ class CdnManager: response = self.request(index) self.write_chunk(response.buffer, index, False) - def request(self, chunk: int = None, range_start: int = None, range_end: int = None)\ - -> CdnManager.InternalResponse: + def request(self, chunk: int = None, range_start: int = None, range_end: int = None) -> CdnManager.InternalResponse: if chunk is None and range_start is None and range_end is None: raise TypeError() if chunk is not None: @@ -628,9 +630,7 @@ class CdnManager: range_end = (chunk + 1) * ChannelManager.chunk_size - 1 response = self.__session.client().get( self.__cdn_url.url, - headers={ - "Range": "bytes={}-{}".format(range_start, range_end) - }, + headers={"Range": f"bytes={range_start}-{range_end}"}, ) if response.status_code != 206: raise IOError(response.status_code) @@ -695,8 +695,8 @@ class NormalizationData: self.album_peak = album_peak self._LOGGER.debug( - "Loaded normalization data, track_gain: {}, track_peak: {}, album_gain: {}, album_peak: {}" - .format(track_gain_db, track_peak, album_gain_db, album_peak)) + f"Loaded normalization data, track_gain: {track_gain_db}, track_peak: {track_peak}, album_gain: {album_gain_db}, album_peak: {album_peak}" + ) @staticmethod def read(input_stream: AbsChunkedInputStream) -> NormalizationData: @@ -738,7 +738,7 @@ class PlayableContentFeeder: if type(playable_id) is EpisodeId: return self.load_episode(playable_id, audio_quality_picker, preload, halt_listener) - raise TypeError("Unknown content: {}".format(playable_id)) + raise TypeError(f"Unknown content: {playable_id}") def load_stream(self, file: Metadata.AudioFile, track: Metadata.Track, episode: Metadata.Episode, preload: bool, @@ -753,14 +753,13 @@ class PlayableContentFeeder: return CdnFeedHelper.load_episode(self.__session, episode, file, response, preload, halt_lister) if response.result == StorageResolve.StorageResolveResponse.Result.STORAGE: - if track is None: - pass + pass elif response.result == StorageResolve.StorageResolveResponse.Result.RESTRICTED: raise RuntimeError("Content is restricted!") elif response.result == StorageResolve.StorageResolveResponse.Response.UNRECOGNIZED: raise RuntimeError("Content is unrecognized!") else: - raise RuntimeError("Unknown result: {}".format(response.result)) + raise RuntimeError(f"Unknown result: {response.result}") def load_episode(self, episode_id: EpisodeId, audio_quality_picker: AudioQualityPicker, preload: bool, @@ -772,8 +771,8 @@ class PlayableContentFeeder: file = audio_quality_picker.get_file(episode.audio) if file is None: self.logger.fatal( - "Couldn't find any suitable audio file, available: {}".format( - episode.audio)) + f"Couldn't find any suitable audio file, available: {episode.audio}" + ) return self.load_stream(file, None, episode, preload, halt_listener) def load_track(self, track_id_or_track: typing.Union[TrackId, @@ -791,8 +790,8 @@ class PlayableContentFeeder: file = audio_quality_picker.get_file(track.file) if file is None: self.logger.fatal( - "Couldn't find any suitable audio file, available: {}".format( - track.file)) + f"Couldn't find any suitable audio file, available: {track.file}" + ) raise FeederException() return self.load_stream(file, track, None, preload, halt_listener) @@ -800,9 +799,9 @@ class PlayableContentFeeder: self, track: Metadata.Track) -> typing.Union[Metadata.Track, None]: if len(track.file) > 0: return track - for alt in track.alternative: - if len(alt.file) > 0: - return Metadata.Track( + return next( + ( + Metadata.Track( gid=track.gid, name=track.name, album=track.album, @@ -821,8 +820,13 @@ class PlayableContentFeeder: earliest_live_timestamp=track.earliest_live_timestamp, has_lyrics=track.has_lyrics, availability=track.availability, - licensor=track.licensor) - return None + licensor=track.licensor, + ) + for alt in track.alternative + if len(alt.file) > 0 + ), + None, + ) def resolve_storage_interactive( self, file_id: bytes, diff --git a/librespot/audio/decoders.py b/librespot/audio/decoders.py index ddb726c..92f5875 100644 --- a/librespot/audio/decoders.py +++ b/librespot/audio/decoders.py @@ -35,16 +35,16 @@ class AudioQuality(enum.Enum): AudioFile.AAC_48, ]: return AudioQuality.VERY_HIGH - raise RuntimeError("Unknown format: {}".format(format)) + raise RuntimeError(f"Unknown format: {format}") def get_matches(self, files: typing.List[AudioFile]) -> typing.List[AudioFile]: - file_list = [] - for file in files: - if hasattr(file, "format") and AudioQuality.get_quality( - file.format) == self: - file_list.append(file) - return file_list + return [ + file + for file in files + if hasattr(file, "format") + and AudioQuality.get_quality(file.format) == self + ] class VorbisOnlyAudioQuality(AudioQualityPicker): @@ -56,11 +56,15 @@ class VorbisOnlyAudioQuality(AudioQualityPicker): @staticmethod def get_vorbis_file(files: typing.List[Metadata.AudioFile]): - for file in files: - if file.HasField("format") and SuperAudioFormat.get( - file.format) == SuperAudioFormat.VORBIS: - return file - return None + return next( + ( + file + for file in files + if file.HasField("format") + and SuperAudioFormat.get(file.format) == SuperAudioFormat.VORBIS + ), + None, + ) def get_file(self, files: typing.List[Metadata.AudioFile]): matches: typing.List[Metadata.AudioFile] = self.preferred.get_matches( @@ -72,9 +76,8 @@ class VorbisOnlyAudioQuality(AudioQualityPicker): files) if vorbis is not None: self.logger.warning( - "Using {} because preferred {} couldn't be found.".format( - Metadata.AudioFile.Format.Name(vorbis.format), - self.preferred)) + f"Using {Metadata.AudioFile.Format.Name(vorbis.format)} because preferred {self.preferred} couldn't be found." + ) else: self.logger.fatal( "Couldn't find any Vorbis file, available: {}") diff --git a/librespot/audio/decrypt.py b/librespot/audio/decrypt.py index e72cd37..73f9b2e 100644 --- a/librespot/audio/decrypt.py +++ b/librespot/audio/decrypt.py @@ -32,8 +32,8 @@ class AesAudioDecrypt(AudioDecrypt): new_buffer.write(decrypted_buffer) if count != len(decrypted_buffer): raise RuntimeError( - "Couldn't process all data, actual: {}, expected: {}". - format(len(decrypted_buffer), count)) + f"Couldn't process all data, actual: {len(decrypted_buffer)}, expected: {count}" + ) iv += self.iv_diff self.decrypt_total_time += time.time_ns() - start self.decrypt_count += 1 diff --git a/librespot/audio/format.py b/librespot/audio/format.py index 24cb6d9..e8e9034 100644 --- a/librespot/audio/format.py +++ b/librespot/audio/format.py @@ -29,4 +29,4 @@ class SuperAudioFormat(enum.Enum): Metadata.AudioFile.Format.AAC_24_NORM, ]: return SuperAudioFormat.AAC - raise RuntimeError("Unknown audio format: {}".format(audio_format)) + raise RuntimeError(f"Unknown audio format: {audio_format}") diff --git a/librespot/audio/storage.py b/librespot/audio/storage.py index 51321e6..640f02f 100644 --- a/librespot/audio/storage.py +++ b/librespot/audio/storage.py @@ -51,8 +51,8 @@ class ChannelManager(Closeable, PacketsReceiver): channel = self.channels.get(chunk_id) if channel is None: self.logger.warning( - "Couldn't find channel, id: {}, received: {}".format( - chunk_id, len(packet.payload))) + f"Couldn't find channel, id: {chunk_id}, received: {len(packet.payload)}" + ) return channel.add_to_queue(payload) elif packet.is_cmd(Packet.Type.channel_error): @@ -60,15 +60,14 @@ class ChannelManager(Closeable, PacketsReceiver): channel = self.channels.get(chunk_id) if channel is None: self.logger.warning( - "Dropping channel error, id: {}, code: {}".format( - chunk_id, - struct.unpack(">H", payload.read(2))[0])) + f'Dropping channel error, id: {chunk_id}, code: {struct.unpack(">H", payload.read(2))[0]}' + ) return channel.stream_error(struct.unpack(">H", payload.read(2))[0]) else: self.logger.warning( - "Couldn't handle packet, cmd: {}, payload: {}".format( - packet.cmd, util.bytes_to_hex(packet.payload))) + f"Couldn't handle packet, cmd: {packet.cmd}, payload: {util.bytes_to_hex(packet.payload)}" + ) def close(self) -> None: self.executor_service.shutdown() @@ -95,7 +94,7 @@ class ChannelManager(Closeable, PacketsReceiver): lambda: ChannelManager.Channel.Handler(self)) def _handle(self, payload: bytes) -> bool: - if len(payload) == 0: + if not payload: if not self.__header: self.__file.write_chunk(payload, self.__chunk_index, False) return True @@ -106,7 +105,7 @@ class ChannelManager(Closeable, PacketsReceiver): length: int while len(payload.buffer) > 0: length = payload.read_short() - if not length > 0: + if length <= 0: break header_id = payload.read_byte() header_data = payload.read(length - 1) diff --git a/librespot/core.py b/librespot/core.py index f7d602d..06f39ff 100644 --- a/librespot/core.py +++ b/librespot/core.py @@ -71,7 +71,7 @@ class ApiClient(Closeable): def __init__(self, session: Session): self.__session = session - self.__base_url = "https://{}".format(ApResolver.get_random_spclient()) + self.__base_url = f"https://{ApResolver.get_random_spclient()}" def build_request( self, @@ -94,8 +94,7 @@ class ApiClient(Closeable): if self.__client_token_str is None: resp = self.__client_token() self.__client_token_str = resp.granted_token.token - self.logger.debug("Updated client token: {}".format( - self.__client_token_str)) + self.logger.debug(f"Updated client token: {self.__client_token_str}") request = requests.PreparedRequest() request.method = method @@ -103,8 +102,9 @@ class ApiClient(Closeable): request.headers = {} if headers is not None: request.headers = headers - request.headers["Authorization"] = "Bearer {}".format( - self.__session.tokens().get("playlist-read")) + request.headers[ + "Authorization" + ] = f'Bearer {self.__session.tokens().get("playlist-read")}' request.headers["client-token"] = self.__client_token_str request.url = self.__base_url + suffix return request @@ -127,9 +127,9 @@ class ApiClient(Closeable): :param bytes]: """ - response = self.__session.client().send( - self.build_request(method, suffix, headers, body)) - return response + return self.__session.client().send( + self.build_request(method, suffix, headers, body) + ) def put_connect_state(self, connection_id: str, proto: Connect.PutStateRequest) -> None: @@ -141,7 +141,7 @@ class ApiClient(Closeable): """ response = self.send( "PUT", - "/connect-state/v1/devices/{}".format(self.__session.device_id()), + f"/connect-state/v1/devices/{self.__session.device_id()}", { "Content-Type": "application/protobuf", "X-Spotify-Connection-Id": connection_id, @@ -150,11 +150,12 @@ class ApiClient(Closeable): ) if response.status_code == 413: self.logger.warning( - "PUT state payload is too large: {} bytes uncompressed.". - format(len(proto.SerializeToString()))) + f"PUT state payload is too large: {len(proto.SerializeToString())} bytes uncompressed." + ) elif response.status_code != 200: - self.logger.warning("PUT state returned {}. headers: {}".format( - response.status_code, response.headers)) + self.logger.warning( + f"PUT state returned {response.status_code}. headers: {response.headers}" + ) def get_metadata_4_track(self, track: TrackId) -> Metadata.Track: """ @@ -162,9 +163,7 @@ class ApiClient(Closeable): :param track: TrackId: """ - response = self.send("GET", - "/metadata/4/track/{}".format(track.hex_id()), - None, None) + response = self.send("GET", f"/metadata/4/track/{track.hex_id()}", None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: @@ -179,9 +178,9 @@ class ApiClient(Closeable): :param episode: EpisodeId: """ - response = self.send("GET", - "/metadata/4/episode/{}".format(episode.hex_id()), - None, None) + response = self.send( + "GET", f"/metadata/4/episode/{episode.hex_id()}", None, None + ) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: @@ -196,9 +195,7 @@ class ApiClient(Closeable): :param album: AlbumId: """ - response = self.send("GET", - "/metadata/4/album/{}".format(album.hex_id()), - None, None) + response = self.send("GET", f"/metadata/4/album/{album.hex_id()}", None, None) ApiClient.StatusCodeException.check_status(response) body = response.content @@ -214,9 +211,9 @@ class ApiClient(Closeable): :param artist: ArtistId: """ - response = self.send("GET", - "/metadata/4/artist/{}".format(artist.hex_id()), - None, None) + response = self.send( + "GET", f"/metadata/4/artist/{artist.hex_id()}", None, None + ) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: @@ -231,9 +228,7 @@ class ApiClient(Closeable): :param show: ShowId: """ - response = self.send("GET", - "/metadata/4/show/{}".format(show.hex_id()), None, - None) + response = self.send("GET", f"/metadata/4/show/{show.hex_id()}", None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: @@ -249,9 +244,7 @@ class ApiClient(Closeable): :param _id: PlaylistId: """ - response = self.send("GET", - "/playlist/v2/playlist/{}".format(_id.id()), None, - None) + response = self.send("GET", f"/playlist/v2/playlist/{_id.id()}", None, None) ApiClient.StatusCodeException.check_status(response) body = response.content if body is None: @@ -337,8 +330,7 @@ class ApResolver: :returns: The resulting object will be returned """ - response = requests.get("{}?type={}".format(ApResolver.base_url, - service_type)) + response = requests.get(f"{ApResolver.base_url}?type={service_type}") if response.status_code != 200: if response.status_code == 502: raise RuntimeError( @@ -417,8 +409,7 @@ class DealerClient(Closeable): """ with self.__message_listeners_lock: if listener in self.__message_listeners: - raise TypeError( - "A listener for {} has already been added.".format(uris)) + raise TypeError(f"A listener for {uris} has already been added.") self.__message_listeners[listener] = uris self.__message_listeners_lock.notify_all() @@ -431,8 +422,7 @@ class DealerClient(Closeable): """ with self.__request_listeners_lock: if uri in self.__request_listeners: - raise TypeError( - "A listener for '{}' has already been added.".format(uri)) + raise TypeError(f"A listener for '{uri}' has already been added.") self.__request_listeners[uri] = listener self.__request_listeners_lock.notify_all() @@ -445,10 +435,7 @@ class DealerClient(Closeable): self.__connection = DealerClient.ConnectionHolder( self.__session, self, - "wss://{}/?access_token={}".format( - ApResolver.get_random_dealer(), - self.__session.tokens().get("playlist-read"), - ), + f'wss://{ApResolver.get_random_dealer()}/?access_token={self.__session.tokens().get("playlist-read")}', ) def connection_invalided(self) -> None: @@ -558,10 +545,11 @@ class DealerClient(Closeable): """ with self.__request_listeners_lock: - request_listeners = {} - for key, value in self.__request_listeners.items(): - if value != listener: - request_listeners[key] = value + request_listeners = { + key: value + for key, value in self.__request_listeners.items() + if value != listener + } self.__request_listeners = request_listeners def wait_for_listener(self) -> None: @@ -573,9 +561,7 @@ class DealerClient(Closeable): def __get_headers(self, obj: typing.Any) -> dict[str, str]: headers = obj.get("headers") - if headers is None: - return {} - return headers + return {} if headers is None else headers class ConnectionHolder(Closeable): """ """ @@ -632,11 +618,8 @@ class DealerClient(Closeable): self.__dealer_client.handle_request(obj) elif typ == MessageType.PONG: self.__received_pong = True - elif typ == MessageType.PING: - pass - else: - raise RuntimeError("Unknown message type for {}".format( - typ.value)) + elif typ != MessageType.PING: + raise RuntimeError(f"Unknown message type for {typ.value}") def on_open(self, ws: websocket.WebSocketApp): """ @@ -722,11 +705,9 @@ class EventService(Closeable): add_user_field("Accept-Language", "en").add_user_field( "X-ClientTimeStamp", int(time.time() * 1000)).add_payload_part(body).build()) - self.logger.debug("Event sent. body: {}, result: {}".format( - body, resp.status_code)) + self.logger.debug(f"Event sent. body: {body}, result: {resp.status_code}") except IOError as ex: - self.logger.error("Failed sending event: {} {}".format( - event_builder, ex)) + self.logger.error(f"Failed sending event: {event_builder} {ex}") def send_event(self, event_or_builder: typing.Union[GenericEvent, EventBuilder]): @@ -815,10 +796,9 @@ class EventService(Closeable): self.body.write(b"\x09") self.body.write(bytes([c])) return self - if s is not None: - self.body.write(b"\x09") - self.append_no_delimiter(s) - return self + self.body.write(b"\x09") + self.append_no_delimiter(s) + return self def to_array(self) -> bytes: """ """ @@ -851,7 +831,7 @@ class MessageType(enum.Enum): return MessageType.PONG if _typ == MessageType.REQUEST.value: return MessageType.REQUEST - raise TypeError("Unknown MessageType: {}".format(_typ)) + raise TypeError(f"Unknown MessageType: {_typ}") class Session(Closeable, MessageListener, SubListener): @@ -904,8 +884,9 @@ class Session(Closeable, MessageListener, SubListener): self.connection = Session.ConnectionHolder.create(address, None) self.__inner = inner self.__keys = DiffieHellman() - self.logger.info("Created new session! device_id: {}, ap: {}".format( - inner.device_id, address)) + self.logger.info( + f"Created new session! device_id: {inner.device_id}, ap: {address}" + ) def api(self) -> ApiClient: """ """ @@ -952,8 +933,7 @@ class Session(Closeable, MessageListener, SubListener): self.__auth_lock_bool = False self.__auth_lock.notify_all() self.dealer().connect() - self.logger.info("Authenticated as {}!".format( - self.__ap_welcome.canonical_username)) + self.logger.info(f"Authenticated as {self.__ap_welcome.canonical_username}!") self.mercury().interested_in("spotify:user:attributes:update", self) self.dealer().add_message_listener( self, ["hm://connect-state/v1/connect/logout"]) @@ -985,8 +965,7 @@ class Session(Closeable, MessageListener, SubListener): def close(self) -> None: """Close instance""" - self.logger.info("Closing session. device_id: {}".format( - self.__inner.device_id)) + self.logger.info(f"Closing session. device_id: {self.__inner.device_id}") self.__closing = True if self.__dealer_client is not None: self.__dealer_client.close() @@ -1012,8 +991,7 @@ class Session(Closeable, MessageListener, SubListener): self.__ap_welcome = None self.cipher_pair = None self.__closed = True - self.logger.info("Closed session. device_id: {}".format( - self.__inner.device_id)) + self.logger.info(f"Closed session. device_id: {self.__inner.device_id}") def connect(self) -> None: """Connect to the Spotify Server""" @@ -1119,8 +1097,7 @@ class Session(Closeable, MessageListener, SubListener): :param conf: Configuration: """ - client = requests.Session() - return client + return requests.Session() def dealer(self) -> DealerClient: """ """ @@ -1152,8 +1129,7 @@ class Session(Closeable, MessageListener, SubListener): attributes_update.ParseFromString(resp.payload) for pair in attributes_update.pairs_list: self.__user_attributes[pair.key] = pair.value - self.logger.info("Updated user attribute: {} -> {}".format( - pair.key, pair.value)) + self.logger.info(f"Updated user attribute: {pair.key} -> {pair.value}") def get_user_attribute(self, key: str, fallback: str = None) -> str: """ @@ -1206,8 +1182,7 @@ class Session(Closeable, MessageListener, SubListener): return for i in range(len(product)): self.__user_attributes[product[i].tag] = product[i].text - self.logger.debug("Parsed product info: {}".format( - self.__user_attributes)) + self.logger.debug(f"Parsed product info: {self.__user_attributes}") def preferred_locale(self) -> str: """ """ @@ -1229,8 +1204,9 @@ class Session(Closeable, MessageListener, SubListener): ), True, ) - self.logger.info("Re-authenticated as {}!".format( - self.__ap_welcome.canonical_username)) + self.logger.info( + f"Re-authenticated as {self.__ap_welcome.canonical_username}!" + ) def reconnecting(self) -> bool: """ """ @@ -1349,7 +1325,7 @@ class Session(Closeable, MessageListener, SubListener): self.close() raise Session.SpotifyAuthenticationException(ap_login_failed) else: - raise RuntimeError("Unknown CMD 0x" + packet.cmd.hex()) + raise RuntimeError(f"Unknown CMD 0x{packet.cmd.hex()}") def __send_unchecked(self, cmd: bytes, payload: bytes) -> None: self.cipher_pair.send_encoded(self.connection, cmd, payload) @@ -1373,10 +1349,7 @@ class Session(Closeable, MessageListener, SubListener): preferred_locale = "en" def __init__(self, conf: Session.Configuration = None): - if conf is None: - self.conf = Session.Configuration.Builder().build() - else: - self.conf = conf + self.conf = Session.Configuration.Builder().build() if conf is None else conf def set_preferred_locale(self, locale: str) -> Session.AbsBuilder: """ @@ -1385,7 +1358,7 @@ class Session(Closeable, MessageListener, SubListener): """ if len(locale) != 2: - raise TypeError("Invalid locale: {}".format(locale)) + raise TypeError(f"Invalid locale: {locale}") self.preferred_locale = locale return self @@ -1518,9 +1491,7 @@ class Session(Closeable, MessageListener, SubListener): type_int = self.read_blob_int(blob) type_ = Authentication.AuthenticationType.Name(type_int) if type_ is None: - raise IOError( - TypeError( - "Unknown AuthenticationType: {}".format(type_int))) + raise IOError(TypeError(f"Unknown AuthenticationType: {type_int}")) blob.read(1) l = self.read_blob_int(blob) auth_data = blob.read(l) @@ -2098,7 +2069,7 @@ class SearchManager: """ """ def __init__(self, status_code: int): - super().__init__("Search failed with code {}.".format(status_code)) + super().__init__(f"Search failed with code {status_code}.") class SearchRequest: """ """ @@ -2112,19 +2083,19 @@ class SearchManager: def __init__(self, query: str): self.query = query - if query == "": + if not query: raise TypeError def build_url(self) -> str: """ """ url = SearchManager.base_url + urllib.parse.quote(self.query) url += "?entityVersion=2" - url += "&catalogue=" + urllib.parse.quote(self.__catalogue) - url += "&country=" + urllib.parse.quote(self.__country) - url += "&imageSize=" + urllib.parse.quote(self.__image_size) - url += "&limit=" + str(self.__limit) - url += "&locale=" + urllib.parse.quote(self.__locale) - url += "&username=" + urllib.parse.quote(self.__username) + url += f"&catalogue={urllib.parse.quote(self.__catalogue)}" + url += f"&country={urllib.parse.quote(self.__country)}" + url += f"&imageSize={urllib.parse.quote(self.__image_size)}" + url += f"&limit={str(self.__limit)}" + url += f"&locale={urllib.parse.quote(self.__locale)}" + url += f"&username={urllib.parse.quote(self.__username)}" return url def get_catalogue(self) -> str: @@ -2224,10 +2195,9 @@ class TokenProvider: :param scopes: typing.List[str]: """ - for token in self.__tokens: - if token.has_scopes(scopes): - return token - return None + return next( + (token for token in self.__tokens if token.has_scopes(scopes)), None + ) def get(self, scope: str) -> str: """ @@ -2244,7 +2214,7 @@ class TokenProvider: """ scopes = list(scopes) - if len(scopes) == 0: + if not scopes: raise RuntimeError("The token doesn't have any scope") token = self.find_token_with_all_scopes(scopes) if token is not None: @@ -2253,15 +2223,15 @@ class TokenProvider: else: return token self.logger.debug( - "Token expired or not suitable, requesting again. scopes: {}, old_token: {}" - .format(scopes, token)) + f"Token expired or not suitable, requesting again. scopes: {scopes}, old_token: {token}" + ) response = self._session.mercury().send_sync_json( MercuryRequests.request_token(self._session.device_id(), ",".join(scopes))) token = TokenProvider.StoredToken(response) self.logger.debug( - "Updated token successfully! scopes: {}, new_token: {}".format( - scopes, token)) + f"Updated token successfully! scopes: {scopes}, new_token: {token}" + ) self.__tokens.append(token) return token @@ -2290,10 +2260,7 @@ class TokenProvider: :param scope: str: """ - for s in self.scopes: - if s == scope: - return True - return False + return any(s == scope for s in self.scopes) def has_scopes(self, sc: typing.List[str]) -> bool: """ @@ -2301,7 +2268,4 @@ class TokenProvider: :param sc: typing.List[str]: """ - for s in sc: - if not self.has_scope(s): - return False - return True + return all(self.has_scope(s) for s in sc) diff --git a/librespot/crypto.py b/librespot/crypto.py index 6ae8c23..e69fd86 100644 --- a/librespot/crypto.py +++ b/librespot/crypto.py @@ -167,14 +167,19 @@ class Packet: @staticmethod def parse(val: typing.Union[bytes, None]) -> typing.Union[bytes, None]: - for cmd in [ - Packet.Type.__dict__[attr] for attr in Packet.Type.__dict__ - if re.search("__.+?__", attr) is None - and type(Packet.Type.__dict__[attr]) is bytes - ]: - if cmd == val: - return cmd - return None + return next( + ( + cmd + for cmd in [ + Packet.Type.__dict__[attr] + for attr in Packet.Type.__dict__ + if re.search("__.+?__", attr) is None + and type(Packet.Type.__dict__[attr]) is bytes + ] + if cmd == val + ), + None, + ) @staticmethod def for_method(method: str) -> bytes: diff --git a/librespot/mercury.py b/librespot/mercury.py index 450b98f..d9e7421 100644 --- a/librespot/mercury.py +++ b/librespot/mercury.py @@ -62,7 +62,7 @@ class MercuryClient(Closeable, PacketsReceiver): elif seq_length == 8: seq = struct.unpack(">q", payload.read(8))[0] else: - raise RuntimeError("Unknown seq length: {}".format(seq_length)) + raise RuntimeError(f"Unknown seq length: {seq_length}") flags = payload.read(1) parts = struct.unpack(">H", payload.read(2))[0] partial = self.__partials.get(seq) @@ -70,8 +70,8 @@ class MercuryClient(Closeable, PacketsReceiver): partial = [] self.__partials[seq] = partial self.logger.debug( - "Handling packet, cmd: 0x{}, seq: {}, flags: {}, parts: {}".format( - util.bytes_to_hex(packet.cmd), seq, flags, parts)) + f"Handling packet, cmd: 0x{util.bytes_to_hex(packet.cmd)}, seq: {seq}, flags: {flags}, parts: {parts}" + ) for _ in range(parts): size = struct.unpack(">H", payload.read(2))[0] buffer = payload.read(size) @@ -92,9 +92,8 @@ class MercuryClient(Closeable, PacketsReceiver): dispatched = True if not dispatched: self.logger.debug( - "Couldn't dispatch Mercury event seq: {}, uri: {}, code: {}, payload: {}" - .format(seq, header.uri, header.status_code, - response.payload)) + f"Couldn't dispatch Mercury event seq: {seq}, uri: {header.uri}, code: {header.status_code}, payload: {response.payload}" + ) elif (packet.is_cmd(Packet.Type.mercury_req) or packet.is_cmd(Packet.Type.mercury_sub) or packet.is_cmd(Packet.Type.mercury_sub)): @@ -104,14 +103,14 @@ class MercuryClient(Closeable, PacketsReceiver): callback.response(response) else: self.logger.warning( - "Skipped Mercury response, seq: {}, uri: {}, code: {}". - format(seq, response.uri, response.status_code)) + f"Skipped Mercury response, seq: {seq}, uri: {response.uri}, code: {response.status_code}" + ) with self.__remove_callback_lock: self.__remove_callback_lock.notify_all() else: self.logger.warning( - "Couldn't handle packet, seq: {}, uri: {}, code: {}".format( - seq, header.uri, header.status_code)) + f"Couldn't handle packet, seq: {seq}, uri: {header.uri}, code: {header.status_code}" + ) def interested_in(self, uri: str, listener: SubListener) -> None: self.__subscriptions.append( @@ -141,8 +140,8 @@ class MercuryClient(Closeable, PacketsReceiver): seq = self.__seq_holder self.__seq_holder += 1 self.logger.debug( - "Send Mercury request, seq: {}, uri: {}, method: {}".format( - seq, request.header.uri, request.header.method)) + f"Send Mercury request, seq: {seq}, uri: {request.header.uri}, method: {request.header.method}" + ) buffer.write(struct.pack(">H", 4)) buffer.write(struct.pack(">i", seq)) buffer.write(b"\x01") @@ -173,8 +172,8 @@ class MercuryClient(Closeable, PacketsReceiver): response = callback.wait_response() if response is None: raise IOError( - "Request timeout out, {} passed, yet no response. seq: {}". - format(self.mercury_request_timeout, seq)) + f"Request timeout out, {self.mercury_request_timeout} passed, yet no response. seq: {seq}" + ) return response except queue.Empty as e: raise IOError(e) @@ -204,7 +203,7 @@ class MercuryClient(Closeable, PacketsReceiver): else: self.__subscriptions.append( MercuryClient.InternalSubListener(uri, listener, True)) - self.logger.debug("Subscribed successfully to {}!".format(uri)) + self.logger.debug(f"Subscribed successfully to {uri}!") def unsubscribe(self, uri) -> None: """ @@ -219,7 +218,7 @@ class MercuryClient(Closeable, PacketsReceiver): if subscription.matches(uri): self.__subscriptions.remove(subscription) break - self.logger.debug("Unsubscribed successfully from {}!".format(uri)) + self.logger.debug(f"Unsubscribed successfully from {uri}!") class Callback: def response(self, response: MercuryClient.Response) -> None: @@ -257,7 +256,7 @@ class MercuryClient(Closeable, PacketsReceiver): code: int def __init__(self, response: MercuryClient.Response): - super().__init__("status: {}".format(response.status_code)) + super().__init__(f"status: {response.status_code}") self.code = response.status_code class PubSubException(MercuryException): @@ -303,9 +302,9 @@ class MercuryRequests: def request_token(device_id, scope): return JsonMercuryRequest( RawMercuryRequest.get( - "hm://keymaster/token/authenticated?scope={}&client_id={}&device_id={}" - .format(scope, MercuryRequests.keymaster_client_id, - device_id))) + f"hm://keymaster/token/authenticated?scope={scope}&client_id={MercuryRequests.keymaster_client_id}&device_id={device_id}" + ) + ) class RawMercuryRequest: diff --git a/librespot/metadata.py b/librespot/metadata.py index a5e01e9..dd093aa 100644 --- a/librespot/metadata.py +++ b/librespot/metadata.py @@ -40,13 +40,15 @@ class PlayableId: return TrackId.from_uri(uri) if EpisodeId.pattern.search(uri) is not None: return EpisodeId.from_uri(uri) - raise TypeError("Unknown uri: {}".format(uri)) + raise TypeError(f"Unknown uri: {uri}") @staticmethod def is_supported(uri: str): - return (not uri.startswith("spotify:local:") - and not uri == "spotify:delimiter" - and not uri == "spotify:meta:delimiter") + return ( + not uri.startswith("spotify:local:") + and uri != "spotify:delimiter" + and uri != "spotify:meta:delimiter" + ) @staticmethod def should_play(track: ContextTrack): @@ -76,13 +78,13 @@ class PlaylistId(SpotifyId): if matcher is not None: playlist_id = matcher.group(1) return PlaylistId(playlist_id) - raise TypeError("Not a Spotify playlist ID: {}.".format(uri)) + raise TypeError(f"Not a Spotify playlist ID: {uri}.") def id(self) -> str: return self.__id def to_spotify_uri(self) -> str: - return "spotify:playlist:" + self.__id + return f"spotify:playlist:{self.__id}" class UnsupportedId(PlayableId): @@ -115,7 +117,7 @@ class AlbumId(SpotifyId): if matcher is not None: album_id = matcher.group(1) return AlbumId(util.bytes_to_hex(AlbumId.base62.decode(album_id.encode(), 16))) - raise TypeError("Not a Spotify album ID: {}.".format(uri)) + raise TypeError(f"Not a Spotify album ID: {uri}.") @staticmethod def from_base62(base62: str) -> AlbumId: @@ -126,14 +128,13 @@ class AlbumId(SpotifyId): return AlbumId(hex_str) def to_mercury_uri(self) -> str: - return "hm://metadata/4/album/{}".format(self.__hex_id) + return f"hm://metadata/4/album/{self.__hex_id}" def hex_id(self) -> str: return self.__hex_id def to_spotify_uri(self) -> str: - return "spotify:album:{}".format( - AlbumId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()) + return f"spotify:album:{AlbumId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}" class ArtistId(SpotifyId): @@ -151,7 +152,7 @@ class ArtistId(SpotifyId): artist_id = matcher.group(1) return ArtistId( util.bytes_to_hex(ArtistId.base62.decode(artist_id.encode(), 16))) - raise TypeError("Not a Spotify artist ID: {}".format(uri)) + raise TypeError(f"Not a Spotify artist ID: {uri}") @staticmethod def from_base62(base62: str) -> ArtistId: @@ -162,11 +163,10 @@ class ArtistId(SpotifyId): return ArtistId(hex_str) def to_mercury_uri(self) -> str: - return "hm://metadata/4/artist/{}".format(self.__hex_id) + return f"hm://metadata/4/artist/{self.__hex_id}" def to_spotify_uri(self) -> str: - return "spotify:artist:{}".format( - ArtistId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()) + return f"spotify:artist:{ArtistId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}" def hex_id(self) -> str: return self.__hex_id @@ -186,7 +186,7 @@ class EpisodeId(SpotifyId, PlayableId): episode_id = matcher.group(1) return EpisodeId( util.bytes_to_hex(PlayableId.base62.decode(episode_id.encode(), 16))) - raise TypeError("Not a Spotify episode ID: {}".format(uri)) + raise TypeError(f"Not a Spotify episode ID: {uri}") @staticmethod def from_base62(base62: str) -> EpisodeId: @@ -198,11 +198,10 @@ class EpisodeId(SpotifyId, PlayableId): return EpisodeId(hex_str) def to_mercury_uri(self) -> str: - return "hm://metadata/4/episode/{}".format(self.__hex_id) + return f"hm://metadata/4/episode/{self.__hex_id}" def to_spotify_uri(self) -> str: - return "Spotify:episode:{}".format( - PlayableId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()) + return f"Spotify:episode:{PlayableId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}" def hex_id(self) -> str: return self.__hex_id @@ -225,7 +224,7 @@ class ShowId(SpotifyId): if matcher is not None: show_id = matcher.group(1) return ShowId(util.bytes_to_hex(ShowId.base62.decode(show_id.encode(), 16))) - raise TypeError("Not a Spotify show ID: {}".format(uri)) + raise TypeError(f"Not a Spotify show ID: {uri}") @staticmethod def from_base62(base62: str) -> ShowId: @@ -236,11 +235,10 @@ class ShowId(SpotifyId): return ShowId(hex_str) def to_mercury_uri(self) -> str: - return "hm://metadata/4/show/{}".format(self.__hex_id) + return f"hm://metadata/4/show/{self.__hex_id}" def to_spotify_uri(self) -> str: - return "spotify:show:{}".format( - ShowId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()) + return f"spotify:show:{ShowId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}" def hex_id(self) -> str: return self.__hex_id @@ -260,7 +258,7 @@ class TrackId(PlayableId, SpotifyId): track_id = search.group(1) return TrackId( util.bytes_to_hex(PlayableId.base62.decode(track_id.encode(), 16))) - raise RuntimeError("Not a Spotify track ID: {}".format(uri)) + raise RuntimeError(f"Not a Spotify track ID: {uri}") @staticmethod def from_base62(base62: str) -> TrackId: @@ -271,10 +269,10 @@ class TrackId(PlayableId, SpotifyId): return TrackId(hex_str) def to_mercury_uri(self) -> str: - return "hm://metadata/4/track/{}".format(self.__hex_id) + return f"hm://metadata/4/track/{self.__hex_id}" def to_spotify_uri(self) -> str: - return "spotify:track:{}".format(TrackId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()) + return f"spotify:track:{TrackId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}" def hex_id(self) -> str: return self.__hex_id diff --git a/librespot/util.py b/librespot/util.py index 28d7441..67362df 100644 --- a/librespot/util.py +++ b/librespot/util.py @@ -33,7 +33,7 @@ def int_to_bytes(i: int): def random_hex_string(length: int): - buffer = Random.get_random_bytes(int(length / 2)) + buffer = Random.get_random_bytes(length // 2) return bytes_to_hex(buffer) @@ -73,7 +73,7 @@ class Base62: len(message), source_base, target_base) if length == -1 else length out = b"" source = message - while len(source) > 0: + while source: quotient = b"" remainder = 0 for b in source: diff --git a/librespot/zeroconf.py b/librespot/zeroconf.py index afe6847..cd64509 100644 --- a/librespot/zeroconf.py +++ b/librespot/zeroconf.py @@ -68,19 +68,20 @@ class ZeroconfServer(Closeable): self.__zeroconf = zeroconf.Zeroconf() self.__service_info = zeroconf.ServiceInfo( ZeroconfServer.service, - inner.device_name + "." + ZeroconfServer.service, + f"{inner.device_name}.{ZeroconfServer.service}", listen_port, 0, - 0, { + 0, + { "CPath": "/", "VERSION": "1.0", "STACK": "SP", }, - self.get_useful_hostname() + ".", + f"{self.get_useful_hostname()}.", addresses=[ - socket.inet_aton( - socket.gethostbyname(self.get_useful_hostname())) - ]) + socket.inet_aton(socket.gethostbyname(self.get_useful_hostname())) + ], + ) self.__zeroconf.register_service(self.__service_info) threading.Thread(target=self.__zeroconf.start, name="zeroconf-multicast-dns-server").start() @@ -102,9 +103,7 @@ class ZeroconfServer(Closeable): def get_useful_hostname(self) -> str: host = socket.gethostname() - if host == "localhost": - pass - else: + if host != "localhost": return host def handle_add_user(self, __socket: socket.socket, params: dict[str, str], @@ -122,8 +121,7 @@ class ZeroconfServer(Closeable): self.logger.error("Missing clientKey!") with self.__connection_lock: if username == self.__connecting_username: - self.logger.info( - "{} is already trying to connect.".format(username)) + self.logger.info(f"{username} is already trying to connect.") __socket.send(http_version.encode()) __socket.send(b" 403 Forbidden") __socket.send(self.__eol) @@ -164,8 +162,9 @@ class ZeroconfServer(Closeable): self.close_session() with self.__connection_lock: self.__connecting_username = username - self.logger.info("Accepted new user from {}. [deviceId: {}]".format( - params.get("deviceName"), self.__inner.device_id)) + self.logger.info( + f'Accepted new user from {params.get("deviceName")}. [deviceId: {self.__inner.device_id}]' + ) response = json.dumps(self.__default_successful_add_user) __socket.send(http_version.encode()) __socket.send(b" 200 OK") @@ -176,12 +175,12 @@ class ZeroconfServer(Closeable): __socket.send(self.__eol) __socket.send(response.encode()) self.__session = Session.Builder(self.__inner.conf) \ - .set_device_id(self.__inner.device_id) \ - .set_device_name(self.__inner.device_name) \ - .set_device_type(self.__inner.device_type) \ - .set_preferred_locale(self.__inner.preferred_locale) \ - .blob(username, decrypted) \ - .create() + .set_device_id(self.__inner.device_id) \ + .set_device_name(self.__inner.device_name) \ + .set_device_type(self.__inner.device_type) \ + .set_preferred_locale(self.__inner.preferred_locale) \ + .blob(username, decrypted) \ + .create() with self.__connection_lock: self.__connecting_username = None for session_listener in self.__session_listeners: @@ -214,7 +213,7 @@ class ZeroconfServer(Closeable): return valid def parse_path(self, path: str) -> dict[str, str]: - url = "https://host" + path + url = f"https://host{path}" parsed = {} params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query) for key, values in params.items(): @@ -250,8 +249,8 @@ class ZeroconfServer(Closeable): self.__socket.listen(5) self.__zeroconf_server = zeroconf_server self.__zeroconf_server.logger.info( - "Zeroconf HTTP server started successfully on port {}!".format( - port)) + f"Zeroconf HTTP server started successfully on port {port}!" + ) def close(self) -> None: pass @@ -271,7 +270,8 @@ class ZeroconfServer(Closeable): request_line = request.readline().strip().split(b" ") if len(request_line) != 3: self.__zeroconf_server.logger.warning( - "Unexpected request line: {}".format(request_line)) + f"Unexpected request line: {request_line}" + ) method = request_line[0].decode() path = request_line[1].decode() http_version = request_line[2].decode() @@ -284,14 +284,13 @@ class ZeroconfServer(Closeable): headers[split[0].decode()] = split[1].strip().decode() if not self.__zeroconf_server.has_valid_session(): self.__zeroconf_server.logger.debug( - "Handling request: {}, {}, {}, headers: {}".format( - method, path, http_version, headers)) + f"Handling request: {method}, {path}, {http_version}, headers: {headers}" + ) params = {} if method == "POST": content_type = headers.get("Content-Type") if content_type != "application/x-www-form-urlencoded": - self.__zeroconf_server.logger.error( - "Bad Content-Type: {}".format(content_type)) + self.__zeroconf_server.logger.error(f"Bad Content-Type: {content_type}") return content_length_str = headers.get("Content-Length") if content_length_str is None: @@ -324,8 +323,7 @@ class ZeroconfServer(Closeable): elif action == "getInfo": self.__zeroconf_server.handle_get_info(__socket, http_version) else: - self.__zeroconf_server.logger.warning( - "Unknown action: {}".format(action)) + self.__zeroconf_server.logger.warning(f"Unknown action: {action}") class Inner: conf: typing.Final[Session.Configuration] diff --git a/librespot_player/__init__.py b/librespot_player/__init__.py index 630986e..08c7ae0 100644 --- a/librespot_player/__init__.py +++ b/librespot_player/__init__.py @@ -72,16 +72,15 @@ class DeviceStateHandler(Closeable, MessageListener, RequestListener): def put_connect_state(self, request: Connect.PutStateRequest): self.__session.api().put_connect_state(self.__connection_id, request) - self.logger.info("Put state. [ts: {}, connId: {}, reason: {}]".format( - request.client_side_timestamp, self.__connection_id, - request.put_state_reason)) + self.logger.info( + f"Put state. [ts: {request.client_side_timestamp}, connId: {self.__connection_id}, reason: {request.put_state_reason}]" + ) def update_connection_id(self, newer: str) -> None: newer = urllib.parse.unquote(newer) if self.__connection_id is None or self.__connection_id != newer: self.__connection_id = newer - self.logger.debug( - "Updated Spotify-Connection-Id: {}".format(newer)) + self.logger.debug(f"Updated Spotify-Connection-Id: {newer}") class Player: @@ -195,10 +194,14 @@ class StateWrapper(MessageListener): self.__player = player self.__device = DeviceStateHandler(session, conf) self.__conf = conf - session.dealer().add_message_listener(self, [ - "spotify:user:attributes:update", "hm://playlist/", - "hm://collection/collection/" + session.username() + "/json" - ]) + session.dealer().add_message_listener( + self, + [ + "spotify:user:attributes:update", + "hm://playlist/", + f"hm://collection/collection/{session.username()}/json", + ], + ) def on_message(self, uri: str, headers: typing.Dict[str, str], payload: bytes):