From 1eca34a60132af6241cf1a711303961674447b7c Mon Sep 17 00:00:00 2001 From: kokarare1212 Date: Sun, 12 Sep 2021 16:56:20 +0900 Subject: [PATCH] Add dealer support --- docs/api.md | 4 +- librespot/core.py | 263 ++++++++++++++++++++++++++- librespot/structure.py | 11 ++ librespot/{Version.py => version.py} | 0 requirements.txt | 3 +- 5 files changed, 275 insertions(+), 6 deletions(-) rename librespot/{Version.py => version.py} (100%) diff --git a/docs/api.md b/docs/api.md index 756262a..b2fad2e 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1,3 +1 @@ -# API Reference - -Coming soon :) +# API Reference \ No newline at end of file diff --git a/librespot/core.py b/librespot/core.py index e7bfd78..a6fd623 100644 --- a/librespot/core.py +++ b/librespot/core.py @@ -11,12 +11,14 @@ from librespot.crypto import CipherPair, DiffieHellman, Packet from librespot.mercury import MercuryClient, MercuryRequests, RawMercuryRequest from librespot.metadata import AlbumId, ArtistId, EpisodeId, ShowId, TrackId from librespot.proto import Authentication_pb2 as Authentication, Connect_pb2 as Connect, Keyexchange_pb2 as Keyexchange, Metadata_pb2 as Metadata -from librespot.structure import Closeable, SubListener +from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate +from librespot.structure import Closeable, MessageListener, RequestListener, SubListener from librespot.version import Version import base64 import concurrent.futures import defusedxml.ElementTree import enum +import gzip import io import json import logging @@ -30,6 +32,7 @@ import struct import threading import time import typing +import websocket class ApiClient(Closeable): @@ -215,6 +218,217 @@ class ApResolver: return ApResolver.get_random_of("accesspoint") +class DealerClient(Closeable): + logger = logging.getLogger("Librespot:DealerClient") + __connection: typing.Union[ConnectionHolder, None] + __last_scheduled_reconnection: typing.Union[sched.Event, None] + __message_listeners: typing.Dict[MessageListener, typing.List[str]] = {} + __message_listeners_lock = threading.Condition() + __request_listeners: typing.Dict[str, RequestListener] = {} + __request_listeners_lock = threading.Condition() + __scheduler = sched.scheduler() + __session: Session + __worker = concurrent.futures.ThreadPoolExecutor() + + def __init__(self, session: Session): + self.__session = session + + def add_message_listener(self, listener: MessageListener, uris: list[str]) -> None: + with self.__message_listeners_lock: + if listener in self.__message_listeners.keys(): + raise TypeError("A listener for {} has already been added.".format(uris)) + self.__message_listeners[listener] = uris + self.__message_listeners_lock.notify_all() + + def add_request_listener(self, listener: RequestListener, uri: str): + with self.__request_listeners_lock: + if uri in self.__request_listeners.keys(): + raise TypeError("A listener for '{}' has already been added.".format(uri)) + self.__request_listeners[uri] = listener + self.__request_listeners_lock.notify_all() + + def close(self) -> None: + self.__worker.shutdown() + + def connect(self) -> None: + self.__connection = DealerClient.ConnectionHolder(self.__session, self, "wss://{}/?access_token={}" + .format(ApResolver.get_random_dealer(), self.__session.tokens().get("playlist-read"))) + + def connection_invalided(self) -> None: + self.__connection = None + self.logger.debug("Scheduled reconnection attempt in 10 seconds...") + + def anonymous(): + self.__last_scheduled_reconnection = None + self.connect() + self.__last_scheduled_reconnection = self.__scheduler.enter(10, 1, anonymous) + + def handle_message(self, obj: typing.Any) -> None: + uri = obj.get("uri") + headers = self.__get_headers(obj) + payloads = obj.get("payloads") + decoded_payloads: typing.Any + if payloads is not None: + if headers.get("Content-Type") == "application/json": + decoded_payloads = payloads + elif headers.get("Content-Type") == "plain/text": + decoded_payloads = payloads + else: + decoded_payloads = base64.b64decode(payloads) + if headers.get("Transfer-Encoding") == "gzip": + decoded_payloads = gzip.decompress(decoded_payloads) + else: + decoded_payloads = b"" + interesting = False + with self.__message_listeners_lock: + for listener in self.__message_listeners.keys(): + dispatched = False + keys = self.__message_listeners.get(listener) + for key in keys: + if uri.startswith(key) and not dispatched: + interesting = True + + def anonymous(): + listener.on_message(uri, headers, decoded_payloads) + self.__worker.submit(anonymous) + dispatched = True + if not interesting: + self.logger.debug("Couldn't dispatch message: {}".format(uri)) + + def handle_request(self, obj: typing.Any) -> None: + mid = obj.get("message_ident") + key = obj.get("key") + headers = self.__get_headers(obj) + payload = obj.get("payload") + if headers.get("Transfer-Encoding") == "gzip": + gz = base64.b64decode(payload.get("compressed")) + payload = json.loads(gzip.decompress(gz)) + pid = payload.get("message_id") + sender = payload.get("sent_by_device_id") + command = payload.get("command") + self.logger.debug("Received request. [mid: {}, key: {}, pid: {}, sender: {}, command: {}]".format(mid, key, pid, sender, command)) + interesting = False + with self.__request_listeners_lock: + for mid_prefix in self.__request_listeners.keys(): + if mid.startswith(mid_prefix): + listener = self.__request_listeners.get(mid_prefix) + interesting = True + + def anonymous(): + result = listener.on_request(mid, pid, sender, command) + if self.__connection is not None: + self.__connection.send_reply(key, result) + self.logger.warning("Handled request. [key: {}, result: {}]".format(key, result)) + self.__worker.submit(anonymous) + if not interesting: + self.logger.debug("Couldn't dispatch request: {}".format(mid)) + + def remove_message_listener(self, listener: MessageListener) -> None: + with self.__message_listeners_lock: + self.__message_listeners.pop(listener) + + def remove_request_listener(self, listener: RequestListener) -> None: + with self.__request_listeners_lock: + for key, value in self.__request_listeners.items(): + if value == listener: + self.__request_listeners.pop(key) + break + + def wait_for_listener(self) -> None: + with self.__message_listeners_lock: + if self.__message_listeners == {}: + return + self.__message_listeners_lock.wait() + + def __get_headers(self, obj: typing.Any) -> dict[str, str]: + headers = obj.get("headers") + if headers is None: + return {} + return headers + + class ConnectionHolder(Closeable): + __closed = False + __dealer_client: DealerClient + __last_scheduled_ping: sched.Event + __received_pong = False + __scheduler = sched.scheduler() + __session: Session + __url: str + __ws: websocket.WebSocketApp + + def __init__(self, session: Session, dealer_client: DealerClient, url: str): + self.__session = session + self.__dealer_client = dealer_client + self.__url = url + self.__ws = websocket.WebSocketApp(url) + + def close(self): + if not self.__closed: + self.__ws.close() + self.__closed = True + if self.__last_scheduled_ping is not None: + self.__scheduler.cancel(self.__last_scheduled_ping) + + def on_failure(self, ws: websocket.WebSocketApp, error): + if self.__closed: + return + self.__dealer_client.logger.warning("An exception occurred. Reconnecting...") + self.close() + + def on_message(self, ws: websocket.WebSocketApp, text: str): + obj = json.loads(text) + self.__dealer_client.wait_for_listener() + typ = MessageType.parse(obj.get("type")) + if typ == MessageType.MESSAGE: + self.__dealer_client.handle_message(obj) + elif typ == MessageType.REQUEST: + self.__dealer_client.handle_request(obj) + elif typ == MessageType.PONG: + self.__received_pong = True + elif typ == MessageType.PING: + pass + else: + raise RuntimeError("Unknown message type for {}".format(typ.value)) + + def on_open(self, ws: websocket.WebSocketApp): + if self.__closed: + self.__dealer_client.logger.fatal("I wonder what happened here... Terminating. [closed: {}]".format(self.__closed)) + self.__dealer_client.logger.debug("Dealer connected! [url: {}]".format(self.__url)) + + def anonymous(): + self.send_ping() + self.__received_pong = False + + def anonymous2(): + if self.__last_scheduled_ping is None: + return + if not self.__received_pong: + self.__dealer_client.logger.warning("Did not receive ping in 3 seconds. Reconnecting...") + self.close() + return + self.__received_pong = False + self.__scheduler.enter(3, 1, anonymous2) + self.__last_scheduled_ping = self.__scheduler.enter(30, 1, anonymous) + self.__last_scheduled_ping = self.__scheduler.enter(30, 1, anonymous) + + def send_ping(self): + self.__ws.send("{\"type\":\"ping\"}") + + def send_reply(self, key: str, result: DealerClient.RequestResult): + success = "true" if result == DealerClient.RequestResult.SUCCESS else "false" + self.__ws.send("{\"type\":\"reply\",\"key\":\"%s\",\"payload\":{\"success\":%s}" % (key, success)) + + class RequestResult(enum.Enum): + UNKNOWN_SEND_COMMAND_RESULT = 0 + SUCCESS = 1 + DEVICE_NOT_FOUND = 2 + CONTEXT_PLAYER_ERROR = 3 + DEVICE_DISAPPEARED = 4 + UPSTREAM_ERROR = 5 + DEVICE_DOES_NOT_SUPPORT_COMMAND = 6 + RATE_LIMITED = 7 + + class EventService(Closeable): logger = logging.getLogger("Librespot:EventService") __session: Session @@ -309,7 +523,27 @@ class EventService(Closeable): return data -class Session(Closeable, SubListener): +class MessageType(enum.Enum): + MESSAGE = "message" + PING = "ping" + PONG = "pong" + REQUEST = "request" + + @staticmethod + def parse(_typ: str): + if _typ == MessageType.MESSAGE.value: + return MessageType.MESSAGE + elif _typ == MessageType.PING.value: + return MessageType.PING + elif _typ == MessageType.PONG.value: + return MessageType.PONG + elif _typ == MessageType.REQUEST.value: + return MessageType.REQUEST + else: + raise TypeError("Unknown MessageType: {}".format(_typ)) + + +class Session(Closeable, MessageListener, SubListener): cipher_pair: typing.Union[CipherPair, None] connection: typing.Union[ConnectionHolder, None] country_code: str @@ -328,6 +562,7 @@ class Session(Closeable, SubListener): __closed = False __closing = False __content_feeder: typing.Union[PlayableContentFeeder, None] + __dealer_client: typing.Union[DealerClient, None] __event_service: typing.Union[EventService, None] __keys: DiffieHellman __mercury_client: MercuryClient @@ -395,12 +630,15 @@ class Session(Closeable, SubListener): self.__cdn_manager = CdnManager(self) self.__content_feeder = PlayableContentFeeder(self) self.__cache_manager = CacheManager(self) + self.__dealer_client = DealerClient(self) self.__event_service = EventService(self) self.__auth_lock_bool = False self.__auth_lock.notify_all() + self.dealer().connect() self.logger.info("Authenticated as {}!".format( self.__ap_welcome.canonical_username)) self.mercury().interested_in("spotify:user:attributes:update", self) + self.dealer().add_message_listener(self, ["hm://connect-state/v1/connect/logout"]) def cache(self) -> CacheManager: self.__wait_auth_lock() @@ -430,6 +668,9 @@ class Session(Closeable, SubListener): self.logger.info("Closing session. device_id: {}".format( self.__inner.device_id)) self.__closing = True + if self.__dealer_client is not None: + self.__dealer_client.close() + self.__dealer_client = None if self.__audio_key_manager is not None: self.__audio_key_manager = None if self.__channel_manager is not None: @@ -555,9 +796,23 @@ class Session(Closeable, SubListener): client = requests.Session() return client + def dealer(self) -> DealerClient: + self.__wait_auth_lock() + if self.__dealer_client is None: + raise RuntimeError("Session isn't authenticated!") + return self.__dealer_client + def device_id(self) -> str: return self.__inner.device_id + def event(self, resp: MercuryClient.Response) -> None: + if resp.uri == "spotify:user:attributes:update": + attributes_update = UserAttributesUpdate() + attributes_update.ParseFromString(resp.payload) + for pair in attributes_update.pairs_list: + self.__user_attributes[pair.key] = pair.value + self.logger.info("Updated user attribute: {} -> {}".format(pair.key, pair.value)) + def get_user_attribute(self, key: str, fallback: str = None) -> str: return self.__user_attributes.get(key) if self.__user_attributes.get( key) is not None else fallback @@ -574,6 +829,10 @@ class Session(Closeable, SubListener): raise RuntimeError("Session isn't authenticated!") return self.__mercury_client + def on_message(self, uri: str, headers: typing.Dict[str, str], payload: bytes): + if uri == "hm://connect-state/v1/connect/logout": + self.close() + def parse_product_info(self, data) -> None: """ Parse product information diff --git a/librespot/structure.py b/librespot/structure.py index 7519729..bbec172 100644 --- a/librespot/structure.py +++ b/librespot/structure.py @@ -4,6 +4,7 @@ import typing if typing.TYPE_CHECKING: from librespot.audio import AbsChunkedInputStream from librespot.audio.format import SuperAudioFormat + from librespot.core import DealerClient from librespot.crypto import Packet from librespot.mercury import MercuryClient from librespot.proto import Metadata_pb2 as Metadata @@ -55,6 +56,11 @@ class HaltListener: raise NotImplementedError +class MessageListener: + def on_message(self, uri: str, headers: typing.Dict[str, str], payload: bytes): + raise NotImplementedError + + class NoopAudioDecrypt(AudioDecrypt): def decrypt_chunk(self, chunk_index: int, buffer: bytes): raise NotImplementedError @@ -68,6 +74,11 @@ class PacketsReceiver: raise NotImplementedError +class RequestListener: + def on_request(self, mid: str, pid: int, sender: str, command: typing.Any) -> DealerClient.RequestResult: + raise NotImplementedError + + class SubListener: def event(self, resp: MercuryClient.Response) -> None: raise NotImplementedError diff --git a/librespot/Version.py b/librespot/version.py similarity index 100% rename from librespot/Version.py rename to librespot/version.py diff --git a/requirements.txt b/requirements.txt index e4ed2da..9fd7329 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ defusedxml==0.7.1 protobuf==3.17.3 pycryptodomex==3.10.1 pyogg==0.6.14a.1 -requests==2.26.0 \ No newline at end of file +requests==2.26.0 +websocket-client==1.2.1 \ No newline at end of file