'Refactored by Sourcery'

This commit is contained in:
Sourcery AI 2023-10-24 08:25:30 +00:00
parent 450d66f434
commit b5987179e2
15 changed files with 315 additions and 343 deletions

View File

@ -28,9 +28,9 @@ def client():
splash()
cmd = input("Player >>> ")
args = cmd.split(" ")
if args[0] == "exit" or args[0] == "quit":
if args[0] in ["exit", "quit"]:
return
if (args[0] == "p" or args[0] == "play") and len(args) == 2:
if args[0] in ["p", "play"] and len(args) == 2:
track_uri_search = re.search(
r"^spotify:track:(?P<TrackID>[0-9a-zA-Z]{22})$", args[1])
track_url_search = re.search(
@ -43,44 +43,37 @@ def client():
track_url_search).group("TrackID")
play(track_id_str)
wait()
if args[0] == "q" or args[0] == "quality":
if args[0] in ["q", "quality"]:
if len(args) == 1:
print("Current Quality: " + quality.name)
print(f"Current Quality: {quality.name}")
wait()
elif len(args) == 2:
if args[1] == "normal" or args[1] == "96":
if args[1] in ["normal", "96"]:
quality = AudioQuality.NORMAL
elif args[1] == "high" or args[1] == "160":
elif args[1] in ["high", "160"]:
quality = AudioQuality.HIGH
elif args[1] == "veryhigh" or args[1] == "320":
elif args[1] in ["veryhigh", "320"]:
quality = AudioQuality.VERY_HIGH
print("Set Quality to %s" % quality.name)
print(f"Set Quality to {quality.name}")
wait()
if (args[0] == "s" or args[0] == "search") and len(args) >= 2:
if args[0] in ["s", "search"] and len(args) >= 2:
token = session.tokens().get("user-read-email")
resp = requests.get(
"https://api.spotify.com/v1/search",
{
"limit": "5",
"offset": "0",
"q": cmd[2:],
"type": "track"
},
headers={"Authorization": "Bearer %s" % token},
{"limit": "5", "offset": "0", "q": cmd[2:], "type": "track"},
headers={"Authorization": f"Bearer {token}"},
)
i = 1
tracks = resp.json()["tracks"]["items"]
for track in tracks:
for i, track in enumerate(tracks, start=1):
print("%d, %s | %s" % (
i,
track["name"],
",".join([artist["name"] for artist in track["artists"]]),
))
i += 1
position = -1
while True:
num_str = input("Select [1-5]: ")
if num_str == "exit" or num_str == "quit":
if num_str in ["exit", "quit"]:
return
try:
num = int(num_str)

View File

@ -75,46 +75,52 @@ def main():
def response(client: socket.socket, uri: str, header: dict,
body: bytes) -> tuple[str, list, bytes, bool]:
if re.search(r"^/audio/track/([0-9a-zA-Z]{22})$", uri) is not None:
track_id_search = re.search(
r"^/audio/track/(?P<TrackID>[0-9a-zA-Z]{22})$", uri)
track_id_str = track_id_search.group("TrackID")
track_id = TrackId.from_base62(track_id_str)
stream = session.content_feeder().load(
track_id, VorbisOnlyAudioQuality(AudioQuality.VERY_HIGH), False,
None)
start = 0
end = stream.input_stream.stream().size()
if header.get("range") is not None:
range_search = re.search(
"^bytes=(?P<start>[0-9]+?)-(?P<end>[0-9]+?)$",
header.get("range"))
if range_search is not None:
start = int(range_search.group("start"))
end = (int(range_search.group("end"))
if int(range_search.group("end")) <=
stream.input_stream.stream().size() else
stream.input_stream.stream().size())
stream.input_stream.stream().skip(start)
client.send(b"HTTP/1.0 200 OK\r\n")
client.send(b"Access-Control-Allow-Origin: *\r\n")
client.send(b"Content-Length: " +
(str(stream.input_stream.stream().size()).encode() if
stream.input_stream.stream().size() == end else "{}-{}/{}"
.format(start, end,
stream.input_stream.stream().size()).encode()) +
b"\r\n")
client.send(b"Content-Type: audio/ogg\r\n")
client.send(b"\r\n")
while True:
if (stream.input_stream.stream().pos() >=
stream.input_stream.stream().size()):
break
byte = stream.input_stream.stream().read(1)
client.send(byte)
return "", [], b"", True
else:
if re.search(r"^/audio/track/([0-9a-zA-Z]{22})$", uri) is None:
return HttpCode.http_404, [], HttpCode.http_404.encode(), False
track_id_search = re.search(
r"^/audio/track/(?P<TrackID>[0-9a-zA-Z]{22})$", uri)
track_id_str = track_id_search.group("TrackID")
track_id = TrackId.from_base62(track_id_str)
stream = session.content_feeder().load(
track_id, VorbisOnlyAudioQuality(AudioQuality.VERY_HIGH), False,
None)
start = 0
end = stream.input_stream.stream().size()
if header.get("range") is not None:
range_search = re.search(
"^bytes=(?P<start>[0-9]+?)-(?P<end>[0-9]+?)$",
header.get("range"))
if range_search is not None:
start = int(range_search.group("start"))
end = (int(range_search.group("end"))
if int(range_search.group("end")) <=
stream.input_stream.stream().size() else
stream.input_stream.stream().size())
stream.input_stream.stream().skip(start)
client.send(b"HTTP/1.0 200 OK\r\n")
client.send(b"Access-Control-Allow-Origin: *\r\n")
client.send(
(
(
b"Content-Length: "
+ (
str(stream.input_stream.stream().size()).encode()
if stream.input_stream.stream().size() == end
else f"{start}-{end}/{stream.input_stream.stream().size()}".encode()
)
)
+ b"\r\n"
)
)
client.send(b"Content-Type: audio/ogg\r\n")
client.send(b"\r\n")
while True:
if (stream.input_stream.stream().pos() >=
stream.input_stream.stream().size()):
break
byte = stream.input_stream.stream().read(1)
client.send(byte)
return "", [], b"", True
if __name__ == "__main__":

View File

@ -18,7 +18,7 @@ class Version:
@staticmethod
def version_string():
return "librespot-python " + Version.version_name
return f"librespot-python {Version.version_name}"
@staticmethod
def system_info_string():

View File

@ -219,8 +219,9 @@ class AbsChunkedInputStream(io.BytesIO, HaltListener):
@staticmethod
def from_stream_error(stream_error: int):
return AbsChunkedInputStream \
.ChunkException("Failed due to stream error, code: {}".format(stream_error))
return AbsChunkedInputStream.ChunkException(
f"Failed due to stream error, code: {stream_error}"
)
class AudioKeyManager(PacketsReceiver, Closeable):
@ -240,8 +241,7 @@ class AudioKeyManager(PacketsReceiver, Closeable):
seq = struct.unpack(">i", payload.read(4))[0]
callback = self.__callbacks.get(seq)
if callback is None:
self.logger.warning(
"Couldn't find callback for seq: {}".format(seq))
self.logger.warning(f"Couldn't find callback for seq: {seq}")
return
if packet.is_cmd(Packet.Type.aes_key):
key = payload.read(16)
@ -251,8 +251,8 @@ class AudioKeyManager(PacketsReceiver, Closeable):
callback.error(code)
else:
self.logger.warning(
"Couldn't handle packet, cmd: {}, length: {}".format(
packet.cmd, len(packet.payload)))
f"Couldn't handle packet, cmd: {packet.cmd}, length: {len(packet.payload)}"
)
def get_audio_key(self,
gid: bytes,
@ -276,8 +276,8 @@ class AudioKeyManager(PacketsReceiver, Closeable):
if retry:
return self.get_audio_key(gid, file_id, False)
raise RuntimeError(
"Failed fetching audio key! gid: {}, fileId: {}".format(
util.bytes_to_hex(gid), util.bytes_to_hex(file_id)))
f"Failed fetching audio key! gid: {util.bytes_to_hex(gid)}, fileId: {util.bytes_to_hex(file_id)}"
)
return key
class Callback:
@ -302,8 +302,7 @@ class AudioKeyManager(PacketsReceiver, Closeable):
self.__reference_lock.notify_all()
def error(self, code: int) -> None:
self.__audio_key_manager.logger.fatal(
"Audio key error, code: {}".format(code))
self.__audio_key_manager.logger.fatal(f"Audio key error, code: {code}")
with self.__reference_lock:
self.__reference.put(None)
self.__reference_lock.notify_all()
@ -359,8 +358,9 @@ class CdnFeedHelper:
CdnFeedHelper._LOGGER.warning("Couldn't resolve redirect!")
url = resp.url
CdnFeedHelper._LOGGER.debug("Fetched external url for {}: {}".format(
util.bytes_to_hex(episode.gid), url))
CdnFeedHelper._LOGGER.debug(
f"Fetched external url for {util.bytes_to_hex(episode.gid)}: {url}"
)
streamer = session.cdn().stream_external_episode(
episode, url, halt_listener)
@ -411,10 +411,10 @@ class CdnManager:
def get_head(self, file_id: bytes):
response = self.__session.client() \
.get(self.__session.get_user_attribute("head-files-url", "https://heads-fa.spotify.com/head/{file_id}")
.get(self.__session.get_user_attribute("head-files-url", "https://heads-fa.spotify.com/head/{file_id}")
.replace("{file_id}", util.bytes_to_hex(file_id)))
if response.status_code != 200:
raise IOError("{}".format(response.status_code))
raise IOError(f"{response.status_code}")
body = response.content
if body is None:
raise IOError("Response body is empty!")
@ -446,8 +446,12 @@ class CdnManager:
)
def get_audio_url(self, file_id: bytes):
response = self.__session.api()\
.send("GET", "/storage-resolve/files/audio/interactive/{}".format(util.bytes_to_hex(file_id)), None, None)
response = self.__session.api().send(
"GET",
f"/storage-resolve/files/audio/interactive/{util.bytes_to_hex(file_id)}",
None,
None,
)
if response.status_code != 200:
raise IOError(response.status_code)
body = response.content
@ -457,11 +461,11 @@ class CdnManager:
proto.ParseFromString(body)
if proto.result == StorageResolve.StorageResolveResponse.Result.CDN:
url = random.choice(proto.cdnurl)
self.logger.debug("Fetched CDN url for {}: {}".format(
util.bytes_to_hex(file_id), url))
self.logger.debug(f"Fetched CDN url for {util.bytes_to_hex(file_id)}: {url}")
return url
raise CdnManager.CdnException(
"Could not retrieve CDN url! result: {}".format(proto.result))
f"Could not retrieve CDN url! result: {proto.result}"
)
class CdnException(Exception):
pass
@ -508,7 +512,7 @@ class CdnManager:
expires_str = str(expires_list[0])
except TypeError:
expires_str = ""
if token_str != "None" and len(token_str) != 0:
if token_str not in ["None", ""]:
expire_at = None
split = token_str.split("~")
for s in split:
@ -521,17 +525,16 @@ class CdnManager:
break
if expire_at is None:
self.__expiration = -1
self.__cdn_manager.logger.warning(
"Invalid __token__ in CDN url: {}".format(url))
self.__cdn_manager.logger.warning(f"Invalid __token__ in CDN url: {url}")
return
self.__expiration = expire_at * 1000
elif expires_str != "None" and len(expires_str) != 0:
elif expires_str not in ["None", ""]:
expires_at = None
expires_str = expires_str.split("~")[0]
expires_at = int(expires_str)
if expires_at is None:
self.__expiration = -1
self.__cdn_manager.logger.warning("Invalid Expires param in CDN url: {}".format(url))
self.__cdn_manager.logger.warning(f"Invalid Expires param in CDN url: {url}")
return
self.__expiration = expires_at * 1000
else:
@ -539,8 +542,9 @@ class CdnManager:
i = token_url.query.index("_")
except ValueError:
self.__expiration = -1
self.__cdn_manager.logger \
.warning("Couldn't extract expiration, invalid parameter in CDN url: {}".format(url))
self.__cdn_manager.logger.warning(
f"Couldn't extract expiration, invalid parameter in CDN url: {url}"
)
return
self.__expiration = int(token_url.query[:i]) * 1000
@ -594,8 +598,8 @@ class CdnManager:
if self.__internal_stream.is_closed():
return
self.__session.logger.debug(
"Chunk {}/{} completed, cached: {}, stream: {}".format(
chunk_index + 1, self.chunks, cached, self.describe()))
f"Chunk {chunk_index + 1}/{self.chunks} completed, cached: {cached}, stream: {self.describe()}"
)
self.buffer[chunk_index] = self.__audio_decrypt.decrypt_chunk(
chunk_index, chunk)
self.__internal_stream.notify_chunk_available(chunk_index)
@ -608,9 +612,8 @@ class CdnManager:
def describe(self) -> str:
if self.__stream_id.is_episode():
return "episode_gid: {}".format(
self.__stream_id.get_episode_gid())
return "file_id: {}".format(self.__stream_id.get_file_id())
return f"episode_gid: {self.__stream_id.get_episode_gid()}"
return f"file_id: {self.__stream_id.get_file_id()}"
def decrypt_time_ms(self) -> int:
return self.__audio_decrypt.decrypt_time_ms()
@ -619,8 +622,7 @@ class CdnManager:
response = self.request(index)
self.write_chunk(response.buffer, index, False)
def request(self, chunk: int = None, range_start: int = None, range_end: int = None)\
-> CdnManager.InternalResponse:
def request(self, chunk: int = None, range_start: int = None, range_end: int = None) -> CdnManager.InternalResponse:
if chunk is None and range_start is None and range_end is None:
raise TypeError()
if chunk is not None:
@ -628,9 +630,7 @@ class CdnManager:
range_end = (chunk + 1) * ChannelManager.chunk_size - 1
response = self.__session.client().get(
self.__cdn_url.url,
headers={
"Range": "bytes={}-{}".format(range_start, range_end)
},
headers={"Range": f"bytes={range_start}-{range_end}"},
)
if response.status_code != 206:
raise IOError(response.status_code)
@ -695,8 +695,8 @@ class NormalizationData:
self.album_peak = album_peak
self._LOGGER.debug(
"Loaded normalization data, track_gain: {}, track_peak: {}, album_gain: {}, album_peak: {}"
.format(track_gain_db, track_peak, album_gain_db, album_peak))
f"Loaded normalization data, track_gain: {track_gain_db}, track_peak: {track_peak}, album_gain: {album_gain_db}, album_peak: {album_peak}"
)
@staticmethod
def read(input_stream: AbsChunkedInputStream) -> NormalizationData:
@ -738,7 +738,7 @@ class PlayableContentFeeder:
if type(playable_id) is EpisodeId:
return self.load_episode(playable_id, audio_quality_picker,
preload, halt_listener)
raise TypeError("Unknown content: {}".format(playable_id))
raise TypeError(f"Unknown content: {playable_id}")
def load_stream(self, file: Metadata.AudioFile, track: Metadata.Track,
episode: Metadata.Episode, preload: bool,
@ -753,14 +753,13 @@ class PlayableContentFeeder:
return CdnFeedHelper.load_episode(self.__session, episode, file,
response, preload, halt_lister)
if response.result == StorageResolve.StorageResolveResponse.Result.STORAGE:
if track is None:
pass
pass
elif response.result == StorageResolve.StorageResolveResponse.Result.RESTRICTED:
raise RuntimeError("Content is restricted!")
elif response.result == StorageResolve.StorageResolveResponse.Response.UNRECOGNIZED:
raise RuntimeError("Content is unrecognized!")
else:
raise RuntimeError("Unknown result: {}".format(response.result))
raise RuntimeError(f"Unknown result: {response.result}")
def load_episode(self, episode_id: EpisodeId,
audio_quality_picker: AudioQualityPicker, preload: bool,
@ -772,8 +771,8 @@ class PlayableContentFeeder:
file = audio_quality_picker.get_file(episode.audio)
if file is None:
self.logger.fatal(
"Couldn't find any suitable audio file, available: {}".format(
episode.audio))
f"Couldn't find any suitable audio file, available: {episode.audio}"
)
return self.load_stream(file, None, episode, preload, halt_listener)
def load_track(self, track_id_or_track: typing.Union[TrackId,
@ -791,8 +790,8 @@ class PlayableContentFeeder:
file = audio_quality_picker.get_file(track.file)
if file is None:
self.logger.fatal(
"Couldn't find any suitable audio file, available: {}".format(
track.file))
f"Couldn't find any suitable audio file, available: {track.file}"
)
raise FeederException()
return self.load_stream(file, track, None, preload, halt_listener)
@ -800,9 +799,9 @@ class PlayableContentFeeder:
self, track: Metadata.Track) -> typing.Union[Metadata.Track, None]:
if len(track.file) > 0:
return track
for alt in track.alternative:
if len(alt.file) > 0:
return Metadata.Track(
return next(
(
Metadata.Track(
gid=track.gid,
name=track.name,
album=track.album,
@ -821,8 +820,13 @@ class PlayableContentFeeder:
earliest_live_timestamp=track.earliest_live_timestamp,
has_lyrics=track.has_lyrics,
availability=track.availability,
licensor=track.licensor)
return None
licensor=track.licensor,
)
for alt in track.alternative
if len(alt.file) > 0
),
None,
)
def resolve_storage_interactive(
self, file_id: bytes,

View File

@ -35,16 +35,16 @@ class AudioQuality(enum.Enum):
AudioFile.AAC_48,
]:
return AudioQuality.VERY_HIGH
raise RuntimeError("Unknown format: {}".format(format))
raise RuntimeError(f"Unknown format: {format}")
def get_matches(self,
files: typing.List[AudioFile]) -> typing.List[AudioFile]:
file_list = []
for file in files:
if hasattr(file, "format") and AudioQuality.get_quality(
file.format) == self:
file_list.append(file)
return file_list
return [
file
for file in files
if hasattr(file, "format")
and AudioQuality.get_quality(file.format) == self
]
class VorbisOnlyAudioQuality(AudioQualityPicker):
@ -56,11 +56,15 @@ class VorbisOnlyAudioQuality(AudioQualityPicker):
@staticmethod
def get_vorbis_file(files: typing.List[Metadata.AudioFile]):
for file in files:
if file.HasField("format") and SuperAudioFormat.get(
file.format) == SuperAudioFormat.VORBIS:
return file
return None
return next(
(
file
for file in files
if file.HasField("format")
and SuperAudioFormat.get(file.format) == SuperAudioFormat.VORBIS
),
None,
)
def get_file(self, files: typing.List[Metadata.AudioFile]):
matches: typing.List[Metadata.AudioFile] = self.preferred.get_matches(
@ -72,9 +76,8 @@ class VorbisOnlyAudioQuality(AudioQualityPicker):
files)
if vorbis is not None:
self.logger.warning(
"Using {} because preferred {} couldn't be found.".format(
Metadata.AudioFile.Format.Name(vorbis.format),
self.preferred))
f"Using {Metadata.AudioFile.Format.Name(vorbis.format)} because preferred {self.preferred} couldn't be found."
)
else:
self.logger.fatal(
"Couldn't find any Vorbis file, available: {}")

View File

@ -32,8 +32,8 @@ class AesAudioDecrypt(AudioDecrypt):
new_buffer.write(decrypted_buffer)
if count != len(decrypted_buffer):
raise RuntimeError(
"Couldn't process all data, actual: {}, expected: {}".
format(len(decrypted_buffer), count))
f"Couldn't process all data, actual: {len(decrypted_buffer)}, expected: {count}"
)
iv += self.iv_diff
self.decrypt_total_time += time.time_ns() - start
self.decrypt_count += 1

View File

@ -29,4 +29,4 @@ class SuperAudioFormat(enum.Enum):
Metadata.AudioFile.Format.AAC_24_NORM,
]:
return SuperAudioFormat.AAC
raise RuntimeError("Unknown audio format: {}".format(audio_format))
raise RuntimeError(f"Unknown audio format: {audio_format}")

View File

@ -51,8 +51,8 @@ class ChannelManager(Closeable, PacketsReceiver):
channel = self.channels.get(chunk_id)
if channel is None:
self.logger.warning(
"Couldn't find channel, id: {}, received: {}".format(
chunk_id, len(packet.payload)))
f"Couldn't find channel, id: {chunk_id}, received: {len(packet.payload)}"
)
return
channel.add_to_queue(payload)
elif packet.is_cmd(Packet.Type.channel_error):
@ -60,15 +60,14 @@ class ChannelManager(Closeable, PacketsReceiver):
channel = self.channels.get(chunk_id)
if channel is None:
self.logger.warning(
"Dropping channel error, id: {}, code: {}".format(
chunk_id,
struct.unpack(">H", payload.read(2))[0]))
f'Dropping channel error, id: {chunk_id}, code: {struct.unpack(">H", payload.read(2))[0]}'
)
return
channel.stream_error(struct.unpack(">H", payload.read(2))[0])
else:
self.logger.warning(
"Couldn't handle packet, cmd: {}, payload: {}".format(
packet.cmd, util.bytes_to_hex(packet.payload)))
f"Couldn't handle packet, cmd: {packet.cmd}, payload: {util.bytes_to_hex(packet.payload)}"
)
def close(self) -> None:
self.executor_service.shutdown()
@ -95,7 +94,7 @@ class ChannelManager(Closeable, PacketsReceiver):
lambda: ChannelManager.Channel.Handler(self))
def _handle(self, payload: bytes) -> bool:
if len(payload) == 0:
if not payload:
if not self.__header:
self.__file.write_chunk(payload, self.__chunk_index, False)
return True
@ -106,7 +105,7 @@ class ChannelManager(Closeable, PacketsReceiver):
length: int
while len(payload.buffer) > 0:
length = payload.read_short()
if not length > 0:
if length <= 0:
break
header_id = payload.read_byte()
header_data = payload.read(length - 1)

View File

@ -71,7 +71,7 @@ class ApiClient(Closeable):
def __init__(self, session: Session):
self.__session = session
self.__base_url = "https://{}".format(ApResolver.get_random_spclient())
self.__base_url = f"https://{ApResolver.get_random_spclient()}"
def build_request(
self,
@ -94,8 +94,7 @@ class ApiClient(Closeable):
if self.__client_token_str is None:
resp = self.__client_token()
self.__client_token_str = resp.granted_token.token
self.logger.debug("Updated client token: {}".format(
self.__client_token_str))
self.logger.debug(f"Updated client token: {self.__client_token_str}")
request = requests.PreparedRequest()
request.method = method
@ -103,8 +102,9 @@ class ApiClient(Closeable):
request.headers = {}
if headers is not None:
request.headers = headers
request.headers["Authorization"] = "Bearer {}".format(
self.__session.tokens().get("playlist-read"))
request.headers[
"Authorization"
] = f'Bearer {self.__session.tokens().get("playlist-read")}'
request.headers["client-token"] = self.__client_token_str
request.url = self.__base_url + suffix
return request
@ -127,9 +127,9 @@ class ApiClient(Closeable):
:param bytes]:
"""
response = self.__session.client().send(
self.build_request(method, suffix, headers, body))
return response
return self.__session.client().send(
self.build_request(method, suffix, headers, body)
)
def put_connect_state(self, connection_id: str,
proto: Connect.PutStateRequest) -> None:
@ -141,7 +141,7 @@ class ApiClient(Closeable):
"""
response = self.send(
"PUT",
"/connect-state/v1/devices/{}".format(self.__session.device_id()),
f"/connect-state/v1/devices/{self.__session.device_id()}",
{
"Content-Type": "application/protobuf",
"X-Spotify-Connection-Id": connection_id,
@ -150,11 +150,12 @@ class ApiClient(Closeable):
)
if response.status_code == 413:
self.logger.warning(
"PUT state payload is too large: {} bytes uncompressed.".
format(len(proto.SerializeToString())))
f"PUT state payload is too large: {len(proto.SerializeToString())} bytes uncompressed."
)
elif response.status_code != 200:
self.logger.warning("PUT state returned {}. headers: {}".format(
response.status_code, response.headers))
self.logger.warning(
f"PUT state returned {response.status_code}. headers: {response.headers}"
)
def get_metadata_4_track(self, track: TrackId) -> Metadata.Track:
"""
@ -162,9 +163,7 @@ class ApiClient(Closeable):
:param track: TrackId:
"""
response = self.send("GET",
"/metadata/4/track/{}".format(track.hex_id()),
None, None)
response = self.send("GET", f"/metadata/4/track/{track.hex_id()}", None, None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
@ -179,9 +178,9 @@ class ApiClient(Closeable):
:param episode: EpisodeId:
"""
response = self.send("GET",
"/metadata/4/episode/{}".format(episode.hex_id()),
None, None)
response = self.send(
"GET", f"/metadata/4/episode/{episode.hex_id()}", None, None
)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
@ -196,9 +195,7 @@ class ApiClient(Closeable):
:param album: AlbumId:
"""
response = self.send("GET",
"/metadata/4/album/{}".format(album.hex_id()),
None, None)
response = self.send("GET", f"/metadata/4/album/{album.hex_id()}", None, None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
@ -214,9 +211,9 @@ class ApiClient(Closeable):
:param artist: ArtistId:
"""
response = self.send("GET",
"/metadata/4/artist/{}".format(artist.hex_id()),
None, None)
response = self.send(
"GET", f"/metadata/4/artist/{artist.hex_id()}", None, None
)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
@ -231,9 +228,7 @@ class ApiClient(Closeable):
:param show: ShowId:
"""
response = self.send("GET",
"/metadata/4/show/{}".format(show.hex_id()), None,
None)
response = self.send("GET", f"/metadata/4/show/{show.hex_id()}", None, None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
@ -249,9 +244,7 @@ class ApiClient(Closeable):
:param _id: PlaylistId:
"""
response = self.send("GET",
"/playlist/v2/playlist/{}".format(_id.id()), None,
None)
response = self.send("GET", f"/playlist/v2/playlist/{_id.id()}", None, None)
ApiClient.StatusCodeException.check_status(response)
body = response.content
if body is None:
@ -337,8 +330,7 @@ class ApResolver:
:returns: The resulting object will be returned
"""
response = requests.get("{}?type={}".format(ApResolver.base_url,
service_type))
response = requests.get(f"{ApResolver.base_url}?type={service_type}")
if response.status_code != 200:
if response.status_code == 502:
raise RuntimeError(
@ -417,8 +409,7 @@ class DealerClient(Closeable):
"""
with self.__message_listeners_lock:
if listener in self.__message_listeners:
raise TypeError(
"A listener for {} has already been added.".format(uris))
raise TypeError(f"A listener for {uris} has already been added.")
self.__message_listeners[listener] = uris
self.__message_listeners_lock.notify_all()
@ -431,8 +422,7 @@ class DealerClient(Closeable):
"""
with self.__request_listeners_lock:
if uri in self.__request_listeners:
raise TypeError(
"A listener for '{}' has already been added.".format(uri))
raise TypeError(f"A listener for '{uri}' has already been added.")
self.__request_listeners[uri] = listener
self.__request_listeners_lock.notify_all()
@ -445,10 +435,7 @@ class DealerClient(Closeable):
self.__connection = DealerClient.ConnectionHolder(
self.__session,
self,
"wss://{}/?access_token={}".format(
ApResolver.get_random_dealer(),
self.__session.tokens().get("playlist-read"),
),
f'wss://{ApResolver.get_random_dealer()}/?access_token={self.__session.tokens().get("playlist-read")}',
)
def connection_invalided(self) -> None:
@ -558,10 +545,11 @@ class DealerClient(Closeable):
"""
with self.__request_listeners_lock:
request_listeners = {}
for key, value in self.__request_listeners.items():
if value != listener:
request_listeners[key] = value
request_listeners = {
key: value
for key, value in self.__request_listeners.items()
if value != listener
}
self.__request_listeners = request_listeners
def wait_for_listener(self) -> None:
@ -573,9 +561,7 @@ class DealerClient(Closeable):
def __get_headers(self, obj: typing.Any) -> dict[str, str]:
headers = obj.get("headers")
if headers is None:
return {}
return headers
return {} if headers is None else headers
class ConnectionHolder(Closeable):
""" """
@ -632,11 +618,8 @@ class DealerClient(Closeable):
self.__dealer_client.handle_request(obj)
elif typ == MessageType.PONG:
self.__received_pong = True
elif typ == MessageType.PING:
pass
else:
raise RuntimeError("Unknown message type for {}".format(
typ.value))
elif typ != MessageType.PING:
raise RuntimeError(f"Unknown message type for {typ.value}")
def on_open(self, ws: websocket.WebSocketApp):
"""
@ -722,11 +705,9 @@ class EventService(Closeable):
add_user_field("Accept-Language", "en").add_user_field(
"X-ClientTimeStamp",
int(time.time() * 1000)).add_payload_part(body).build())
self.logger.debug("Event sent. body: {}, result: {}".format(
body, resp.status_code))
self.logger.debug(f"Event sent. body: {body}, result: {resp.status_code}")
except IOError as ex:
self.logger.error("Failed sending event: {} {}".format(
event_builder, ex))
self.logger.error(f"Failed sending event: {event_builder} {ex}")
def send_event(self, event_or_builder: typing.Union[GenericEvent,
EventBuilder]):
@ -815,10 +796,9 @@ class EventService(Closeable):
self.body.write(b"\x09")
self.body.write(bytes([c]))
return self
if s is not None:
self.body.write(b"\x09")
self.append_no_delimiter(s)
return self
self.body.write(b"\x09")
self.append_no_delimiter(s)
return self
def to_array(self) -> bytes:
""" """
@ -851,7 +831,7 @@ class MessageType(enum.Enum):
return MessageType.PONG
if _typ == MessageType.REQUEST.value:
return MessageType.REQUEST
raise TypeError("Unknown MessageType: {}".format(_typ))
raise TypeError(f"Unknown MessageType: {_typ}")
class Session(Closeable, MessageListener, SubListener):
@ -904,8 +884,9 @@ class Session(Closeable, MessageListener, SubListener):
self.connection = Session.ConnectionHolder.create(address, None)
self.__inner = inner
self.__keys = DiffieHellman()
self.logger.info("Created new session! device_id: {}, ap: {}".format(
inner.device_id, address))
self.logger.info(
f"Created new session! device_id: {inner.device_id}, ap: {address}"
)
def api(self) -> ApiClient:
""" """
@ -952,8 +933,7 @@ class Session(Closeable, MessageListener, SubListener):
self.__auth_lock_bool = False
self.__auth_lock.notify_all()
self.dealer().connect()
self.logger.info("Authenticated as {}!".format(
self.__ap_welcome.canonical_username))
self.logger.info(f"Authenticated as {self.__ap_welcome.canonical_username}!")
self.mercury().interested_in("spotify:user:attributes:update", self)
self.dealer().add_message_listener(
self, ["hm://connect-state/v1/connect/logout"])
@ -985,8 +965,7 @@ class Session(Closeable, MessageListener, SubListener):
def close(self) -> None:
"""Close instance"""
self.logger.info("Closing session. device_id: {}".format(
self.__inner.device_id))
self.logger.info(f"Closing session. device_id: {self.__inner.device_id}")
self.__closing = True
if self.__dealer_client is not None:
self.__dealer_client.close()
@ -1012,8 +991,7 @@ class Session(Closeable, MessageListener, SubListener):
self.__ap_welcome = None
self.cipher_pair = None
self.__closed = True
self.logger.info("Closed session. device_id: {}".format(
self.__inner.device_id))
self.logger.info(f"Closed session. device_id: {self.__inner.device_id}")
def connect(self) -> None:
"""Connect to the Spotify Server"""
@ -1119,8 +1097,7 @@ class Session(Closeable, MessageListener, SubListener):
:param conf: Configuration:
"""
client = requests.Session()
return client
return requests.Session()
def dealer(self) -> DealerClient:
""" """
@ -1152,8 +1129,7 @@ class Session(Closeable, MessageListener, SubListener):
attributes_update.ParseFromString(resp.payload)
for pair in attributes_update.pairs_list:
self.__user_attributes[pair.key] = pair.value
self.logger.info("Updated user attribute: {} -> {}".format(
pair.key, pair.value))
self.logger.info(f"Updated user attribute: {pair.key} -> {pair.value}")
def get_user_attribute(self, key: str, fallback: str = None) -> str:
"""
@ -1206,8 +1182,7 @@ class Session(Closeable, MessageListener, SubListener):
return
for i in range(len(product)):
self.__user_attributes[product[i].tag] = product[i].text
self.logger.debug("Parsed product info: {}".format(
self.__user_attributes))
self.logger.debug(f"Parsed product info: {self.__user_attributes}")
def preferred_locale(self) -> str:
""" """
@ -1229,8 +1204,9 @@ class Session(Closeable, MessageListener, SubListener):
),
True,
)
self.logger.info("Re-authenticated as {}!".format(
self.__ap_welcome.canonical_username))
self.logger.info(
f"Re-authenticated as {self.__ap_welcome.canonical_username}!"
)
def reconnecting(self) -> bool:
""" """
@ -1349,7 +1325,7 @@ class Session(Closeable, MessageListener, SubListener):
self.close()
raise Session.SpotifyAuthenticationException(ap_login_failed)
else:
raise RuntimeError("Unknown CMD 0x" + packet.cmd.hex())
raise RuntimeError(f"Unknown CMD 0x{packet.cmd.hex()}")
def __send_unchecked(self, cmd: bytes, payload: bytes) -> None:
self.cipher_pair.send_encoded(self.connection, cmd, payload)
@ -1373,10 +1349,7 @@ class Session(Closeable, MessageListener, SubListener):
preferred_locale = "en"
def __init__(self, conf: Session.Configuration = None):
if conf is None:
self.conf = Session.Configuration.Builder().build()
else:
self.conf = conf
self.conf = Session.Configuration.Builder().build() if conf is None else conf
def set_preferred_locale(self, locale: str) -> Session.AbsBuilder:
"""
@ -1385,7 +1358,7 @@ class Session(Closeable, MessageListener, SubListener):
"""
if len(locale) != 2:
raise TypeError("Invalid locale: {}".format(locale))
raise TypeError(f"Invalid locale: {locale}")
self.preferred_locale = locale
return self
@ -1518,9 +1491,7 @@ class Session(Closeable, MessageListener, SubListener):
type_int = self.read_blob_int(blob)
type_ = Authentication.AuthenticationType.Name(type_int)
if type_ is None:
raise IOError(
TypeError(
"Unknown AuthenticationType: {}".format(type_int)))
raise IOError(TypeError(f"Unknown AuthenticationType: {type_int}"))
blob.read(1)
l = self.read_blob_int(blob)
auth_data = blob.read(l)
@ -2098,7 +2069,7 @@ class SearchManager:
""" """
def __init__(self, status_code: int):
super().__init__("Search failed with code {}.".format(status_code))
super().__init__(f"Search failed with code {status_code}.")
class SearchRequest:
""" """
@ -2112,19 +2083,19 @@ class SearchManager:
def __init__(self, query: str):
self.query = query
if query == "":
if not query:
raise TypeError
def build_url(self) -> str:
""" """
url = SearchManager.base_url + urllib.parse.quote(self.query)
url += "?entityVersion=2"
url += "&catalogue=" + urllib.parse.quote(self.__catalogue)
url += "&country=" + urllib.parse.quote(self.__country)
url += "&imageSize=" + urllib.parse.quote(self.__image_size)
url += "&limit=" + str(self.__limit)
url += "&locale=" + urllib.parse.quote(self.__locale)
url += "&username=" + urllib.parse.quote(self.__username)
url += f"&catalogue={urllib.parse.quote(self.__catalogue)}"
url += f"&country={urllib.parse.quote(self.__country)}"
url += f"&imageSize={urllib.parse.quote(self.__image_size)}"
url += f"&limit={str(self.__limit)}"
url += f"&locale={urllib.parse.quote(self.__locale)}"
url += f"&username={urllib.parse.quote(self.__username)}"
return url
def get_catalogue(self) -> str:
@ -2224,10 +2195,9 @@ class TokenProvider:
:param scopes: typing.List[str]:
"""
for token in self.__tokens:
if token.has_scopes(scopes):
return token
return None
return next(
(token for token in self.__tokens if token.has_scopes(scopes)), None
)
def get(self, scope: str) -> str:
"""
@ -2244,7 +2214,7 @@ class TokenProvider:
"""
scopes = list(scopes)
if len(scopes) == 0:
if not scopes:
raise RuntimeError("The token doesn't have any scope")
token = self.find_token_with_all_scopes(scopes)
if token is not None:
@ -2253,15 +2223,15 @@ class TokenProvider:
else:
return token
self.logger.debug(
"Token expired or not suitable, requesting again. scopes: {}, old_token: {}"
.format(scopes, token))
f"Token expired or not suitable, requesting again. scopes: {scopes}, old_token: {token}"
)
response = self._session.mercury().send_sync_json(
MercuryRequests.request_token(self._session.device_id(),
",".join(scopes)))
token = TokenProvider.StoredToken(response)
self.logger.debug(
"Updated token successfully! scopes: {}, new_token: {}".format(
scopes, token))
f"Updated token successfully! scopes: {scopes}, new_token: {token}"
)
self.__tokens.append(token)
return token
@ -2290,10 +2260,7 @@ class TokenProvider:
:param scope: str:
"""
for s in self.scopes:
if s == scope:
return True
return False
return any(s == scope for s in self.scopes)
def has_scopes(self, sc: typing.List[str]) -> bool:
"""
@ -2301,7 +2268,4 @@ class TokenProvider:
:param sc: typing.List[str]:
"""
for s in sc:
if not self.has_scope(s):
return False
return True
return all(self.has_scope(s) for s in sc)

View File

@ -167,14 +167,19 @@ class Packet:
@staticmethod
def parse(val: typing.Union[bytes, None]) -> typing.Union[bytes, None]:
for cmd in [
Packet.Type.__dict__[attr] for attr in Packet.Type.__dict__
if re.search("__.+?__", attr) is None
and type(Packet.Type.__dict__[attr]) is bytes
]:
if cmd == val:
return cmd
return None
return next(
(
cmd
for cmd in [
Packet.Type.__dict__[attr]
for attr in Packet.Type.__dict__
if re.search("__.+?__", attr) is None
and type(Packet.Type.__dict__[attr]) is bytes
]
if cmd == val
),
None,
)
@staticmethod
def for_method(method: str) -> bytes:

View File

@ -62,7 +62,7 @@ class MercuryClient(Closeable, PacketsReceiver):
elif seq_length == 8:
seq = struct.unpack(">q", payload.read(8))[0]
else:
raise RuntimeError("Unknown seq length: {}".format(seq_length))
raise RuntimeError(f"Unknown seq length: {seq_length}")
flags = payload.read(1)
parts = struct.unpack(">H", payload.read(2))[0]
partial = self.__partials.get(seq)
@ -70,8 +70,8 @@ class MercuryClient(Closeable, PacketsReceiver):
partial = []
self.__partials[seq] = partial
self.logger.debug(
"Handling packet, cmd: 0x{}, seq: {}, flags: {}, parts: {}".format(
util.bytes_to_hex(packet.cmd), seq, flags, parts))
f"Handling packet, cmd: 0x{util.bytes_to_hex(packet.cmd)}, seq: {seq}, flags: {flags}, parts: {parts}"
)
for _ in range(parts):
size = struct.unpack(">H", payload.read(2))[0]
buffer = payload.read(size)
@ -92,9 +92,8 @@ class MercuryClient(Closeable, PacketsReceiver):
dispatched = True
if not dispatched:
self.logger.debug(
"Couldn't dispatch Mercury event seq: {}, uri: {}, code: {}, payload: {}"
.format(seq, header.uri, header.status_code,
response.payload))
f"Couldn't dispatch Mercury event seq: {seq}, uri: {header.uri}, code: {header.status_code}, payload: {response.payload}"
)
elif (packet.is_cmd(Packet.Type.mercury_req)
or packet.is_cmd(Packet.Type.mercury_sub)
or packet.is_cmd(Packet.Type.mercury_sub)):
@ -104,14 +103,14 @@ class MercuryClient(Closeable, PacketsReceiver):
callback.response(response)
else:
self.logger.warning(
"Skipped Mercury response, seq: {}, uri: {}, code: {}".
format(seq, response.uri, response.status_code))
f"Skipped Mercury response, seq: {seq}, uri: {response.uri}, code: {response.status_code}"
)
with self.__remove_callback_lock:
self.__remove_callback_lock.notify_all()
else:
self.logger.warning(
"Couldn't handle packet, seq: {}, uri: {}, code: {}".format(
seq, header.uri, header.status_code))
f"Couldn't handle packet, seq: {seq}, uri: {header.uri}, code: {header.status_code}"
)
def interested_in(self, uri: str, listener: SubListener) -> None:
self.__subscriptions.append(
@ -141,8 +140,8 @@ class MercuryClient(Closeable, PacketsReceiver):
seq = self.__seq_holder
self.__seq_holder += 1
self.logger.debug(
"Send Mercury request, seq: {}, uri: {}, method: {}".format(
seq, request.header.uri, request.header.method))
f"Send Mercury request, seq: {seq}, uri: {request.header.uri}, method: {request.header.method}"
)
buffer.write(struct.pack(">H", 4))
buffer.write(struct.pack(">i", seq))
buffer.write(b"\x01")
@ -173,8 +172,8 @@ class MercuryClient(Closeable, PacketsReceiver):
response = callback.wait_response()
if response is None:
raise IOError(
"Request timeout out, {} passed, yet no response. seq: {}".
format(self.mercury_request_timeout, seq))
f"Request timeout out, {self.mercury_request_timeout} passed, yet no response. seq: {seq}"
)
return response
except queue.Empty as e:
raise IOError(e)
@ -204,7 +203,7 @@ class MercuryClient(Closeable, PacketsReceiver):
else:
self.__subscriptions.append(
MercuryClient.InternalSubListener(uri, listener, True))
self.logger.debug("Subscribed successfully to {}!".format(uri))
self.logger.debug(f"Subscribed successfully to {uri}!")
def unsubscribe(self, uri) -> None:
"""
@ -219,7 +218,7 @@ class MercuryClient(Closeable, PacketsReceiver):
if subscription.matches(uri):
self.__subscriptions.remove(subscription)
break
self.logger.debug("Unsubscribed successfully from {}!".format(uri))
self.logger.debug(f"Unsubscribed successfully from {uri}!")
class Callback:
def response(self, response: MercuryClient.Response) -> None:
@ -257,7 +256,7 @@ class MercuryClient(Closeable, PacketsReceiver):
code: int
def __init__(self, response: MercuryClient.Response):
super().__init__("status: {}".format(response.status_code))
super().__init__(f"status: {response.status_code}")
self.code = response.status_code
class PubSubException(MercuryException):
@ -303,9 +302,9 @@ class MercuryRequests:
def request_token(device_id, scope):
return JsonMercuryRequest(
RawMercuryRequest.get(
"hm://keymaster/token/authenticated?scope={}&client_id={}&device_id={}"
.format(scope, MercuryRequests.keymaster_client_id,
device_id)))
f"hm://keymaster/token/authenticated?scope={scope}&client_id={MercuryRequests.keymaster_client_id}&device_id={device_id}"
)
)
class RawMercuryRequest:

View File

@ -40,13 +40,15 @@ class PlayableId:
return TrackId.from_uri(uri)
if EpisodeId.pattern.search(uri) is not None:
return EpisodeId.from_uri(uri)
raise TypeError("Unknown uri: {}".format(uri))
raise TypeError(f"Unknown uri: {uri}")
@staticmethod
def is_supported(uri: str):
return (not uri.startswith("spotify:local:")
and not uri == "spotify:delimiter"
and not uri == "spotify:meta:delimiter")
return (
not uri.startswith("spotify:local:")
and uri != "spotify:delimiter"
and uri != "spotify:meta:delimiter"
)
@staticmethod
def should_play(track: ContextTrack):
@ -76,13 +78,13 @@ class PlaylistId(SpotifyId):
if matcher is not None:
playlist_id = matcher.group(1)
return PlaylistId(playlist_id)
raise TypeError("Not a Spotify playlist ID: {}.".format(uri))
raise TypeError(f"Not a Spotify playlist ID: {uri}.")
def id(self) -> str:
return self.__id
def to_spotify_uri(self) -> str:
return "spotify:playlist:" + self.__id
return f"spotify:playlist:{self.__id}"
class UnsupportedId(PlayableId):
@ -115,7 +117,7 @@ class AlbumId(SpotifyId):
if matcher is not None:
album_id = matcher.group(1)
return AlbumId(util.bytes_to_hex(AlbumId.base62.decode(album_id.encode(), 16)))
raise TypeError("Not a Spotify album ID: {}.".format(uri))
raise TypeError(f"Not a Spotify album ID: {uri}.")
@staticmethod
def from_base62(base62: str) -> AlbumId:
@ -126,14 +128,13 @@ class AlbumId(SpotifyId):
return AlbumId(hex_str)
def to_mercury_uri(self) -> str:
return "hm://metadata/4/album/{}".format(self.__hex_id)
return f"hm://metadata/4/album/{self.__hex_id}"
def hex_id(self) -> str:
return self.__hex_id
def to_spotify_uri(self) -> str:
return "spotify:album:{}".format(
AlbumId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode())
return f"spotify:album:{AlbumId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}"
class ArtistId(SpotifyId):
@ -151,7 +152,7 @@ class ArtistId(SpotifyId):
artist_id = matcher.group(1)
return ArtistId(
util.bytes_to_hex(ArtistId.base62.decode(artist_id.encode(), 16)))
raise TypeError("Not a Spotify artist ID: {}".format(uri))
raise TypeError(f"Not a Spotify artist ID: {uri}")
@staticmethod
def from_base62(base62: str) -> ArtistId:
@ -162,11 +163,10 @@ class ArtistId(SpotifyId):
return ArtistId(hex_str)
def to_mercury_uri(self) -> str:
return "hm://metadata/4/artist/{}".format(self.__hex_id)
return f"hm://metadata/4/artist/{self.__hex_id}"
def to_spotify_uri(self) -> str:
return "spotify:artist:{}".format(
ArtistId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode())
return f"spotify:artist:{ArtistId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}"
def hex_id(self) -> str:
return self.__hex_id
@ -186,7 +186,7 @@ class EpisodeId(SpotifyId, PlayableId):
episode_id = matcher.group(1)
return EpisodeId(
util.bytes_to_hex(PlayableId.base62.decode(episode_id.encode(), 16)))
raise TypeError("Not a Spotify episode ID: {}".format(uri))
raise TypeError(f"Not a Spotify episode ID: {uri}")
@staticmethod
def from_base62(base62: str) -> EpisodeId:
@ -198,11 +198,10 @@ class EpisodeId(SpotifyId, PlayableId):
return EpisodeId(hex_str)
def to_mercury_uri(self) -> str:
return "hm://metadata/4/episode/{}".format(self.__hex_id)
return f"hm://metadata/4/episode/{self.__hex_id}"
def to_spotify_uri(self) -> str:
return "Spotify:episode:{}".format(
PlayableId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode())
return f"Spotify:episode:{PlayableId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}"
def hex_id(self) -> str:
return self.__hex_id
@ -225,7 +224,7 @@ class ShowId(SpotifyId):
if matcher is not None:
show_id = matcher.group(1)
return ShowId(util.bytes_to_hex(ShowId.base62.decode(show_id.encode(), 16)))
raise TypeError("Not a Spotify show ID: {}".format(uri))
raise TypeError(f"Not a Spotify show ID: {uri}")
@staticmethod
def from_base62(base62: str) -> ShowId:
@ -236,11 +235,10 @@ class ShowId(SpotifyId):
return ShowId(hex_str)
def to_mercury_uri(self) -> str:
return "hm://metadata/4/show/{}".format(self.__hex_id)
return f"hm://metadata/4/show/{self.__hex_id}"
def to_spotify_uri(self) -> str:
return "spotify:show:{}".format(
ShowId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode())
return f"spotify:show:{ShowId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}"
def hex_id(self) -> str:
return self.__hex_id
@ -260,7 +258,7 @@ class TrackId(PlayableId, SpotifyId):
track_id = search.group(1)
return TrackId(
util.bytes_to_hex(PlayableId.base62.decode(track_id.encode(), 16)))
raise RuntimeError("Not a Spotify track ID: {}".format(uri))
raise RuntimeError(f"Not a Spotify track ID: {uri}")
@staticmethod
def from_base62(base62: str) -> TrackId:
@ -271,10 +269,10 @@ class TrackId(PlayableId, SpotifyId):
return TrackId(hex_str)
def to_mercury_uri(self) -> str:
return "hm://metadata/4/track/{}".format(self.__hex_id)
return f"hm://metadata/4/track/{self.__hex_id}"
def to_spotify_uri(self) -> str:
return "spotify:track:{}".format(TrackId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode())
return f"spotify:track:{TrackId.base62.encode(util.hex_to_bytes(self.__hex_id)).decode()}"
def hex_id(self) -> str:
return self.__hex_id

View File

@ -33,7 +33,7 @@ def int_to_bytes(i: int):
def random_hex_string(length: int):
buffer = Random.get_random_bytes(int(length / 2))
buffer = Random.get_random_bytes(length // 2)
return bytes_to_hex(buffer)
@ -73,7 +73,7 @@ class Base62:
len(message), source_base, target_base) if length == -1 else length
out = b""
source = message
while len(source) > 0:
while source:
quotient = b""
remainder = 0
for b in source:

View File

@ -68,19 +68,20 @@ class ZeroconfServer(Closeable):
self.__zeroconf = zeroconf.Zeroconf()
self.__service_info = zeroconf.ServiceInfo(
ZeroconfServer.service,
inner.device_name + "." + ZeroconfServer.service,
f"{inner.device_name}.{ZeroconfServer.service}",
listen_port,
0,
0, {
0,
{
"CPath": "/",
"VERSION": "1.0",
"STACK": "SP",
},
self.get_useful_hostname() + ".",
f"{self.get_useful_hostname()}.",
addresses=[
socket.inet_aton(
socket.gethostbyname(self.get_useful_hostname()))
])
socket.inet_aton(socket.gethostbyname(self.get_useful_hostname()))
],
)
self.__zeroconf.register_service(self.__service_info)
threading.Thread(target=self.__zeroconf.start,
name="zeroconf-multicast-dns-server").start()
@ -102,9 +103,7 @@ class ZeroconfServer(Closeable):
def get_useful_hostname(self) -> str:
host = socket.gethostname()
if host == "localhost":
pass
else:
if host != "localhost":
return host
def handle_add_user(self, __socket: socket.socket, params: dict[str, str],
@ -122,8 +121,7 @@ class ZeroconfServer(Closeable):
self.logger.error("Missing clientKey!")
with self.__connection_lock:
if username == self.__connecting_username:
self.logger.info(
"{} is already trying to connect.".format(username))
self.logger.info(f"{username} is already trying to connect.")
__socket.send(http_version.encode())
__socket.send(b" 403 Forbidden")
__socket.send(self.__eol)
@ -164,8 +162,9 @@ class ZeroconfServer(Closeable):
self.close_session()
with self.__connection_lock:
self.__connecting_username = username
self.logger.info("Accepted new user from {}. [deviceId: {}]".format(
params.get("deviceName"), self.__inner.device_id))
self.logger.info(
f'Accepted new user from {params.get("deviceName")}. [deviceId: {self.__inner.device_id}]'
)
response = json.dumps(self.__default_successful_add_user)
__socket.send(http_version.encode())
__socket.send(b" 200 OK")
@ -176,12 +175,12 @@ class ZeroconfServer(Closeable):
__socket.send(self.__eol)
__socket.send(response.encode())
self.__session = Session.Builder(self.__inner.conf) \
.set_device_id(self.__inner.device_id) \
.set_device_name(self.__inner.device_name) \
.set_device_type(self.__inner.device_type) \
.set_preferred_locale(self.__inner.preferred_locale) \
.blob(username, decrypted) \
.create()
.set_device_id(self.__inner.device_id) \
.set_device_name(self.__inner.device_name) \
.set_device_type(self.__inner.device_type) \
.set_preferred_locale(self.__inner.preferred_locale) \
.blob(username, decrypted) \
.create()
with self.__connection_lock:
self.__connecting_username = None
for session_listener in self.__session_listeners:
@ -214,7 +213,7 @@ class ZeroconfServer(Closeable):
return valid
def parse_path(self, path: str) -> dict[str, str]:
url = "https://host" + path
url = f"https://host{path}"
parsed = {}
params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
for key, values in params.items():
@ -250,8 +249,8 @@ class ZeroconfServer(Closeable):
self.__socket.listen(5)
self.__zeroconf_server = zeroconf_server
self.__zeroconf_server.logger.info(
"Zeroconf HTTP server started successfully on port {}!".format(
port))
f"Zeroconf HTTP server started successfully on port {port}!"
)
def close(self) -> None:
pass
@ -271,7 +270,8 @@ class ZeroconfServer(Closeable):
request_line = request.readline().strip().split(b" ")
if len(request_line) != 3:
self.__zeroconf_server.logger.warning(
"Unexpected request line: {}".format(request_line))
f"Unexpected request line: {request_line}"
)
method = request_line[0].decode()
path = request_line[1].decode()
http_version = request_line[2].decode()
@ -284,14 +284,13 @@ class ZeroconfServer(Closeable):
headers[split[0].decode()] = split[1].strip().decode()
if not self.__zeroconf_server.has_valid_session():
self.__zeroconf_server.logger.debug(
"Handling request: {}, {}, {}, headers: {}".format(
method, path, http_version, headers))
f"Handling request: {method}, {path}, {http_version}, headers: {headers}"
)
params = {}
if method == "POST":
content_type = headers.get("Content-Type")
if content_type != "application/x-www-form-urlencoded":
self.__zeroconf_server.logger.error(
"Bad Content-Type: {}".format(content_type))
self.__zeroconf_server.logger.error(f"Bad Content-Type: {content_type}")
return
content_length_str = headers.get("Content-Length")
if content_length_str is None:
@ -324,8 +323,7 @@ class ZeroconfServer(Closeable):
elif action == "getInfo":
self.__zeroconf_server.handle_get_info(__socket, http_version)
else:
self.__zeroconf_server.logger.warning(
"Unknown action: {}".format(action))
self.__zeroconf_server.logger.warning(f"Unknown action: {action}")
class Inner:
conf: typing.Final[Session.Configuration]

View File

@ -72,16 +72,15 @@ class DeviceStateHandler(Closeable, MessageListener, RequestListener):
def put_connect_state(self, request: Connect.PutStateRequest):
self.__session.api().put_connect_state(self.__connection_id, request)
self.logger.info("Put state. [ts: {}, connId: {}, reason: {}]".format(
request.client_side_timestamp, self.__connection_id,
request.put_state_reason))
self.logger.info(
f"Put state. [ts: {request.client_side_timestamp}, connId: {self.__connection_id}, reason: {request.put_state_reason}]"
)
def update_connection_id(self, newer: str) -> None:
newer = urllib.parse.unquote(newer)
if self.__connection_id is None or self.__connection_id != newer:
self.__connection_id = newer
self.logger.debug(
"Updated Spotify-Connection-Id: {}".format(newer))
self.logger.debug(f"Updated Spotify-Connection-Id: {newer}")
class Player:
@ -195,10 +194,14 @@ class StateWrapper(MessageListener):
self.__player = player
self.__device = DeviceStateHandler(session, conf)
self.__conf = conf
session.dealer().add_message_listener(self, [
"spotify:user:attributes:update", "hm://playlist/",
"hm://collection/collection/" + session.username() + "/json"
])
session.dealer().add_message_listener(
self,
[
"spotify:user:attributes:update",
"hm://playlist/",
f"hm://collection/collection/{session.username()}/json",
],
)
def on_message(self, uri: str, headers: typing.Dict[str, str],
payload: bytes):