Restyled by black

This commit is contained in:
Restyled.io 2021-05-21 03:50:07 +00:00
parent 9baf1c06d6
commit efe2840fbb
1 changed files with 92 additions and 59 deletions

View File

@ -30,9 +30,11 @@ class CdnManager:
self._session = session self._session = session
def get_head(self, file_id: bytes): def get_head(self, file_id: bytes):
resp = self._session.client() \ resp = self._session.client().get(
.get(self._session.get_user_attribute("head-files-url", "https://heads-fa.spotify.com/head/{file_id}") self._session.get_user_attribute(
.replace("{file_id}", Utils.bytes_to_hex(file_id))) "head-files-url", "https://heads-fa.spotify.com/head/{file_id}"
).replace("{file_id}", Utils.bytes_to_hex(file_id))
)
if resp.status_code != 200: if resp.status_code != 200:
raise IOError("{}".format(resp.status_code)) raise IOError("{}".format(resp.status_code))
@ -43,27 +45,45 @@ class CdnManager:
return body return body
def stream_external_episode(self, episode: Metadata.Episode, def stream_external_episode(
external_url: str, self, episode: Metadata.Episode, external_url: str, halt_listener: HaltListener
halt_listener: HaltListener): ):
return CdnManager.Streamer(self._session, StreamId(episode), return CdnManager.Streamer(
SuperAudioFormat.MP3, self._session,
CdnManager.CdnUrl(self, None, external_url), StreamId(episode),
self._session.cache(), NoopAudioDecrypt(), SuperAudioFormat.MP3,
halt_listener) CdnManager.CdnUrl(self, None, external_url),
self._session.cache(),
NoopAudioDecrypt(),
halt_listener,
)
def stream_file(self, file: Metadata.AudioFile, key: bytes, url: str, def stream_file(
halt_listener: HaltListener): self,
return CdnManager.Streamer(self._session, StreamId.StreamId(file), file: Metadata.AudioFile,
SuperAudioFormat.get(file.format), key: bytes,
CdnManager.CdnUrl(self, file.file_id, url), url: str,
self._session.cache(), AesAudioDecrypt(key), halt_listener: HaltListener,
halt_listener) ):
return CdnManager.Streamer(
self._session,
StreamId.StreamId(file),
SuperAudioFormat.get(file.format),
CdnManager.CdnUrl(self, file.file_id, url),
self._session.cache(),
AesAudioDecrypt(key),
halt_listener,
)
def get_audio_url(self, file_id: bytes): def get_audio_url(self, file_id: bytes):
resp = self._session.api().send( resp = self._session.api().send(
"GET", "/storage-resolve/files/audio/interactive/{}".format( "GET",
Utils.bytes_to_hex(file_id)), None, None) "/storage-resolve/files/audio/interactive/{}".format(
Utils.bytes_to_hex(file_id)
),
None,
None,
)
if resp.status_code != 200: if resp.status_code != 200:
raise IOError(resp.status_code) raise IOError(resp.status_code)
@ -76,11 +96,13 @@ class CdnManager:
proto.ParseFromString(body) proto.ParseFromString(body)
if proto.result == StorageResolve.StorageResolveResponse.Result.CDN: if proto.result == StorageResolve.StorageResolveResponse.Result.CDN:
url = random.choice(proto.cdnurl) url = random.choice(proto.cdnurl)
self._LOGGER.debug("Fetched CDN url for {}: {}".format( self._LOGGER.debug(
Utils.bytes_to_hex(file_id), url)) "Fetched CDN url for {}: {}".format(Utils.bytes_to_hex(file_id), url)
)
return url return url
raise CdnManager.CdnException( raise CdnManager.CdnException(
"Could not retrieve CDN url! result: {}".format(proto.result)) "Could not retrieve CDN url! result: {}".format(proto.result)
)
class CdnException(Exception): class CdnException(Exception):
pass pass
@ -134,13 +156,14 @@ class CdnManager:
continue continue
if s[0][:i] == "exp": if s[0][:i] == "exp":
expire_at = int(s[0][i + 1:]) expire_at = int(s[0][i + 1 :])
break break
if expire_at is None: if expire_at is None:
self._expiration = -1 self._expiration = -1
self._cdnManager._LOGGER.warning( self._cdnManager._LOGGER.warning(
"Invalid __token__ in CDN url: {}".format(url)) "Invalid __token__ in CDN url: {}".format(url)
)
return return
self._expiration = expire_at * 1000 self._expiration = expire_at * 1000
@ -150,8 +173,10 @@ class CdnManager:
except ValueError: except ValueError:
self._expiration = -1 self._expiration = -1
self._cdnManager._LOGGER.warning( self._cdnManager._LOGGER.warning(
"Couldn't extract expiration, invalid parameter in CDN url: " "Couldn't extract expiration, invalid parameter in CDN url: ".format(
.format(url)) url
)
)
return return
self._expiration = int(token_url.query[:i]) * 1000 self._expiration = int(token_url.query[:i]) * 1000
@ -159,8 +184,10 @@ class CdnManager:
else: else:
self._expiration = -1 self._expiration = -1
class Streamer(GeneralAudioStream.GeneralAudioStream, class Streamer(
GeneralWritableStream.GeneralWritableStream): GeneralAudioStream.GeneralAudioStream,
GeneralWritableStream.GeneralWritableStream,
):
_session: Session = None _session: Session = None
_streamId: StreamId = None _streamId: StreamId = None
_executorService = concurrent.futures.ThreadPoolExecutor() _executorService = concurrent.futures.ThreadPoolExecutor()
@ -175,10 +202,16 @@ class CdnManager:
_internalStream: CdnManager.Streamer.InternalStream = None _internalStream: CdnManager.Streamer.InternalStream = None
_haltListener: HaltListener = None _haltListener: HaltListener = None
def __init__(self, session: Session, stream_id: StreamId, def __init__(
audio_format: SuperAudioFormat, cdn_url, self,
cache: CacheManager, audio_decrypt: AudioDecrypt, session: Session,
halt_listener: HaltListener): stream_id: StreamId,
audio_format: SuperAudioFormat,
cdn_url,
cache: CacheManager,
audio_decrypt: AudioDecrypt,
halt_listener: HaltListener,
):
self._session = session self._session = session
self._streamId = stream_id self._streamId = stream_id
self._audioFormat = audio_format self._audioFormat = audio_format
@ -186,39 +219,38 @@ class CdnManager:
self._cdnUrl = cdn_url self._cdnUrl = cdn_url
self._haltListener = halt_listener self._haltListener = halt_listener
resp = self.request(range_start=0, resp = self.request(range_start=0, range_end=ChannelManager.CHUNK_SIZE - 1)
range_end=ChannelManager.CHUNK_SIZE - 1)
content_range = resp._headers.get("Content-Range") content_range = resp._headers.get("Content-Range")
if content_range is None: if content_range is None:
raise IOError("Missing Content-Range header!") raise IOError("Missing Content-Range header!")
split = Utils.split(content_range, "/") split = Utils.split(content_range, "/")
self._size = int(split[1]) self._size = int(split[1])
self._chunks = int( self._chunks = int(math.ceil(self._size / ChannelManager.CHUNK_SIZE))
math.ceil(self._size / ChannelManager.CHUNK_SIZE))
first_chunk = resp._buffer first_chunk = resp._buffer
self._available = [False for _ in range(self._chunks)] self._available = [False for _ in range(self._chunks)]
self._requested = [False for _ in range(self._chunks)] self._requested = [False for _ in range(self._chunks)]
self._buffer = [bytearray() for _ in range(self._chunks)] self._buffer = [bytearray() for _ in range(self._chunks)]
self._internalStream = CdnManager.Streamer.InternalStream( self._internalStream = CdnManager.Streamer.InternalStream(self, False)
self, False)
self._requested[0] = True self._requested[0] = True
self.write_chunk(first_chunk, 0, False) self.write_chunk(first_chunk, 0, False)
def write_chunk(self, chunk: bytes, chunk_index: int, def write_chunk(self, chunk: bytes, chunk_index: int, cached: bool) -> None:
cached: bool) -> None:
if self._internalStream.is_closed(): if self._internalStream.is_closed():
return return
self._session._LOGGER.debug( self._session._LOGGER.debug(
"Chunk {}/{} completed, cached: {}, stream: {}".format( "Chunk {}/{} completed, cached: {}, stream: {}".format(
chunk_index + 1, self._chunks, cached, self.describe())) chunk_index + 1, self._chunks, cached, self.describe()
)
)
self._buffer[chunk_index] = self._audioDecrypt.decrypt_chunk( self._buffer[chunk_index] = self._audioDecrypt.decrypt_chunk(
chunk_index, chunk) chunk_index, chunk
)
self._internalStream.notify_chunk_available(chunk_index) self._internalStream.notify_chunk_available(chunk_index)
def stream(self) -> AbsChunkedInputStream: def stream(self) -> AbsChunkedInputStream:
@ -229,8 +261,7 @@ class CdnManager:
def describe(self) -> str: def describe(self) -> str:
if self._streamId.is_episode(): if self._streamId.is_episode():
return "episode_gid: {}".format( return "episode_gid: {}".format(self._streamId.get_episode_gid())
self._streamId.get_episode_gid())
return "file_id: {}".format(self._streamId.get_file_id()) return "file_id: {}".format(self._streamId.get_file_id())
def decrypt_time_ms(self) -> int: def decrypt_time_ms(self) -> int:
@ -240,10 +271,9 @@ class CdnManager:
resp = self.request(index) resp = self.request(index)
self.write_chunk(resp._buffer, index, False) self.write_chunk(resp._buffer, index, False)
def request(self, def request(
chunk: int = None, self, chunk: int = None, range_start: int = None, range_end: int = None
range_start: int = None, ) -> CdnManager.InternalResponse:
range_end: int = None) -> CdnManager.InternalResponse:
if chunk is None and range_start is None and range_end is None: if chunk is None and range_start is None and range_end is None:
raise TypeError() raise TypeError()
@ -251,12 +281,10 @@ class CdnManager:
range_start = ChannelManager.CHUNK_SIZE * chunk range_start = ChannelManager.CHUNK_SIZE * chunk
range_end = (chunk + 1) * ChannelManager.CHUNK_SIZE - 1 range_end = (chunk + 1) * ChannelManager.CHUNK_SIZE - 1
resp = self._session.client().get(self._cdnUrl._url, resp = self._session.client().get(
headers={ self._cdnUrl._url,
"Range": headers={"Range": "bytes={}-{}".format(range_start, range_end)},
"bytes={}-{}".format( )
range_start, range_end)
})
if resp.status_code != 206: if resp.status_code != 206:
raise IOError(resp.status_code) raise IOError(resp.status_code)
@ -291,16 +319,21 @@ class CdnManager:
def request_chunk_from_stream(self, index: int) -> None: def request_chunk_from_stream(self, index: int) -> None:
self.streamer._executorService.submit( self.streamer._executorService.submit(
lambda: self.streamer.request_chunk(index)) lambda: self.streamer.request_chunk(index)
)
def stream_read_halted(self, chunk: int, _time: int) -> None: def stream_read_halted(self, chunk: int, _time: int) -> None:
if self.streamer._haltListener is not None: if self.streamer._haltListener is not None:
self.streamer._executorService.submit( self.streamer._executorService.submit(
lambda: self.streamer._haltListener.stream_read_halted( lambda: self.streamer._haltListener.stream_read_halted(
chunk, _time)) chunk, _time
)
)
def stream_read_resumed(self, chunk: int, _time: int) -> None: def stream_read_resumed(self, chunk: int, _time: int) -> None:
if self.streamer._haltListener is not None: if self.streamer._haltListener is not None:
self.streamer._executorService.submit( self.streamer._executorService.submit(
lambda: self.streamer._haltListener. lambda: self.streamer._haltListener.stream_read_resumed(
stream_read_resumed(chunk, _time)) chunk, _time
)
)