from __future__ import annotations from librespot import util from librespot.crypto import Packet from librespot.proto import Mercury_pb2 as Mercury, Pubsub_pb2 as Pubsub from librespot.structure import Closeable, PacketsReceiver, SubListener import io import json import logging import queue import struct import threading import typing if typing.TYPE_CHECKING: from librespot.core import Session class JsonMercuryRequest: request: RawMercuryRequest def __init__(self, request: RawMercuryRequest): self.request = request class MercuryClient(Closeable, PacketsReceiver): logger = logging.getLogger("Librespot:MercuryClient") mercury_request_timeout = 3 __callbacks: typing.Dict[int, Callback] = {} __remove_callback_lock = threading.Condition() __partials: typing.Dict[int, typing.List[bytes]] = {} __seq_holder = 0 __seq_holder_lock = threading.Condition() __session: Session __subscriptions: typing.List[InternalSubListener] = [] __subscriptions_lock = threading.Condition() def __init__(self, session: Session): self.__session = session def close(self) -> None: """ Close the MercuryClient instance """ if len(self.__subscriptions) != 0: for listener in self.__subscriptions: if listener.is_sub: self.unsubscribe(listener.uri) else: self.not_interested_in(listener.listener) if len(self.__callbacks) != 0: with self.__remove_callback_lock: self.__remove_callback_lock.wait(self.mercury_request_timeout) self.__callbacks.clear() def dispatch(self, packet: Packet) -> None: payload = io.BytesIO(packet.payload) seq_length = struct.unpack(">H", payload.read(2))[0] if seq_length == 2: seq = struct.unpack(">H", payload.read(2))[0] elif seq_length == 4: seq = struct.unpack(">i", payload.read(4))[0] elif seq_length == 8: seq = struct.unpack(">q", payload.read(8))[0] else: raise RuntimeError("Unknown seq length: {}".format(seq_length)) flags = payload.read(1) parts = struct.unpack(">H", payload.read(2))[0] partial = self.__partials.get(seq) if partial is None or flags == 0: partial = [] self.__partials[seq] = partial self.logger.debug( "Handling packet, cmd: 0x{}, seq: {}, flags: {}, parts: {}".format( util.bytes_to_hex(packet.cmd), seq, flags, parts)) for _ in range(parts): size = struct.unpack(">H", payload.read(2))[0] buffer = payload.read(size) partial.append(buffer) self.__partials[seq] = partial if flags != b"\x01": return self.__partials.pop(seq) header = Mercury.Header() header.ParseFromString(partial[0]) response = MercuryClient.Response(header, partial) if packet.is_cmd(Packet.Type.mercury_event): dispatched = False with self.__subscriptions_lock: for sub in self.__subscriptions: if sub.matches(header.uri): sub.dispatch(response) dispatched = True if not dispatched: self.logger.debug( "Couldn't dispatch Mercury event seq: {}, uri: {}, code: {}, payload: {}" .format(seq, header.uri, header.status_code, response.payload)) elif (packet.is_cmd(Packet.Type.mercury_req) or packet.is_cmd(Packet.Type.mercury_sub) or packet.is_cmd(Packet.Type.mercury_sub)): callback = self.__callbacks.get(seq) self.__callbacks.pop(seq) if callback is not None: callback.response(response) else: self.logger.warning( "Skipped Mercury response, seq: {}, uri: {}, code: {}". format(seq, response.uri, response.status_code)) with self.__remove_callback_lock: self.__remove_callback_lock.notify_all() else: self.logger.warning( "Couldn't handle packet, seq: {}, uri: {}, code: {}".format( seq, header.uri, header.status_code)) def interested_in(self, uri: str, listener: SubListener) -> None: self.__subscriptions.append( MercuryClient.InternalSubListener(uri, listener, False)) def not_interested_in(self, listener: SubListener) -> None: try: for subscription in self.__subscriptions: if subscription.listener is listener: self.__subscriptions.remove(subscription) break except ValueError: pass def send(self, request: RawMercuryRequest, callback) -> int: """ Send the Mercury request Args: request: RawMercuryRequest callback: Callback function Returns: MercuryClient.Response """ buffer = io.BytesIO() seq: int with self.__seq_holder_lock: seq = self.__seq_holder self.__seq_holder += 1 self.logger.debug( "Send Mercury request, seq: {}, uri: {}, method: {}".format( seq, request.header.uri, request.header.method)) buffer.write(struct.pack(">H", 4)) buffer.write(struct.pack(">i", seq)) buffer.write(b"\x01") buffer.write(struct.pack(">H", 1 + len(request.payload))) header_bytes = request.header.SerializeToString() buffer.write(struct.pack(">H", len(header_bytes))) buffer.write(header_bytes) for part in request.payload: buffer.write(struct.pack(">H", len(part))) buffer.write(part) buffer.seek(0) cmd = Packet.Type.for_method(request.header.method) self.__session.send(cmd, buffer.read()) self.__callbacks[seq] = callback return seq def send_sync(self, request: RawMercuryRequest) -> Response: """ Send the Mercury request Args: request: RawMercuryRequest Returns: MercuryClient.Response """ callback = MercuryClient.SyncCallback() seq = self.send(request, callback) try: response = callback.wait_response() if response is None: raise IOError( "Request timeout out, {} passed, yet no response. seq: {}". format(self.mercury_request_timeout, seq)) return response except queue.Empty as e: raise IOError(e) def send_sync_json(self, request: JsonMercuryRequest) -> typing.Any: response = self.send_sync(request.request) if 200 <= response.status_code < 300: return json.loads(response.payload) raise MercuryClient.MercuryException(response) def subscribe(self, uri: str, listener: SubListener) -> None: """ Subscribe URI Args: uri: listener: """ response = self.send_sync(RawMercuryRequest.sub(uri)) if response.status_code != 200: raise RuntimeError(response) if len(response.payload) > 0: for payload in response.payload: sub = Pubsub.Subscription() sub.ParseFromString(payload) self.__subscriptions.append( MercuryClient.InternalSubListener(sub.uri, listener, True)) else: self.__subscriptions.append( MercuryClient.InternalSubListener(uri, listener, True)) self.logger.debug("Subscribed successfully to {}!".format(uri)) def unsubscribe(self, uri) -> None: """ Unsubscribe URI Args: uri: """ response = self.send_sync(RawMercuryRequest.unsub(uri)) if response.status_code != 200: raise RuntimeError(response) for subscription in self.__subscriptions: if subscription.matches(uri): self.__subscriptions.remove(subscription) break self.logger.debug("Unsubscribed successfully from {}!".format(uri)) class Callback: def response(self, response: MercuryClient.Response) -> None: raise NotImplementedError class InternalSubListener: uri: str listener: SubListener is_sub: bool def __init__(self, uri: str, listener: SubListener, is_sub: bool): self.uri = uri self.listener = listener self.is_sub = is_sub def matches(self, uri: str) -> bool: """ Compare with the URI given Args: uri: URI to be compared Returns: bool """ return uri.startswith(self.uri) def dispatch(self, response: MercuryClient.Response) -> None: """ Dispatch the event response Args: response: Response generated by the event """ self.listener.event(response) class MercuryException(Exception): code: int def __init__(self, response: MercuryClient.Response): super().__init__("status: {}".format(response.status_code)) self.code = response.status_code class PubSubException(MercuryException): pass class Response: uri: str payload: bytes status_code: int def __init__(self, header: Mercury.Header, payload: list[bytes]): self.uri = header.uri self.status_code = header.status_code self.payload = b"".join(payload[1:]) class SyncCallback(Callback): __reference = queue.Queue() def response(self, response: MercuryClient.Response) -> None: """ Set the response :param response: :return: """ self.__reference.put(response) self.__reference.task_done() def wait_response(self) -> typing.Any: return self.__reference.get( timeout=MercuryClient.mercury_request_timeout) class MercuryRequests: keymaster_client_id = "65b708073fc0480ea92a077233ca87bd" @staticmethod def get_root_playlists(username: str): """ @TODO implement function """ @staticmethod def request_token(device_id, scope): return JsonMercuryRequest( RawMercuryRequest.get( "hm://keymaster/token/authenticated?scope={}&client_id={}&device_id={}" .format(scope, MercuryRequests.keymaster_client_id, device_id))) class RawMercuryRequest: header: Mercury.Header payload: typing.List[bytes] def __init__(self, header: Mercury.Header, payload: typing.List[bytes]): self.header = header self.payload = payload @staticmethod def sub(uri: str): return RawMercuryRequest.new_builder().set_uri(uri).set_method( "SUB").build() @staticmethod def unsub(uri: str): return RawMercuryRequest.new_builder().set_uri(uri).set_method( "UNSUB").build() @staticmethod def get(uri: str): return RawMercuryRequest.new_builder().set_uri(uri).set_method( "GET").build() @staticmethod def send(uri: str, part: bytes): return RawMercuryRequest.new_builder().set_uri(uri) \ .add_payload_part(part).set_method("SEND").build() @staticmethod def post(uri: str, part: bytes): return RawMercuryRequest.new_builder().set_uri(uri) \ .set_method("POST").add_payload_part(part).build() @staticmethod def new_builder(): return RawMercuryRequest.Builder() class Builder: header_dict: dict payload: typing.List[bytes] def __init__(self): self.header_dict = {} self.payload = [] def set_uri(self, uri: str): self.header_dict["uri"] = uri return self def set_content_type(self, content_type: str): self.header_dict["content_type"] = content_type return self def set_method(self, method: str): self.header_dict["method"] = method return self def add_user_field(self, field: Mercury.UserField = None, key: str = None, value: str = None): if field is None and (key is None or value is None): return self try: self.header_dict["user_fields"] except KeyError: self.header_dict["user_fields"] = [] if field is not None: self.header_dict["user_fields"].append(field) if key is not None and value is not None: self.header_dict["user_fields"].append( Mercury.UserField(key=key, value=value.encode())) return self def add_payload_part(self, part: bytes): self.payload.append(part) return self def add_protobuf_payload(self, msg): return self.add_payload_part(msg) def build(self): return RawMercuryRequest(Mercury.Header(**self.header_dict), self.payload)