Add dealer support

This commit is contained in:
kokarare1212 2021-09-12 16:56:20 +09:00
parent af4f3c91b9
commit 1eca34a601
No known key found for this signature in database
GPG Key ID: 9FB32C7C7D874F7A
5 changed files with 275 additions and 6 deletions

View File

@ -1,3 +1 @@
# API Reference
Coming soon :)
# API Reference

View File

@ -11,12 +11,14 @@ from librespot.crypto import CipherPair, DiffieHellman, Packet
from librespot.mercury import MercuryClient, MercuryRequests, RawMercuryRequest
from librespot.metadata import AlbumId, ArtistId, EpisodeId, ShowId, TrackId
from librespot.proto import Authentication_pb2 as Authentication, Connect_pb2 as Connect, Keyexchange_pb2 as Keyexchange, Metadata_pb2 as Metadata
from librespot.structure import Closeable, SubListener
from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate
from librespot.structure import Closeable, MessageListener, RequestListener, SubListener
from librespot.version import Version
import base64
import concurrent.futures
import defusedxml.ElementTree
import enum
import gzip
import io
import json
import logging
@ -30,6 +32,7 @@ import struct
import threading
import time
import typing
import websocket
class ApiClient(Closeable):
@ -215,6 +218,217 @@ class ApResolver:
return ApResolver.get_random_of("accesspoint")
class DealerClient(Closeable):
logger = logging.getLogger("Librespot:DealerClient")
__connection: typing.Union[ConnectionHolder, None]
__last_scheduled_reconnection: typing.Union[sched.Event, None]
__message_listeners: typing.Dict[MessageListener, typing.List[str]] = {}
__message_listeners_lock = threading.Condition()
__request_listeners: typing.Dict[str, RequestListener] = {}
__request_listeners_lock = threading.Condition()
__scheduler = sched.scheduler()
__session: Session
__worker = concurrent.futures.ThreadPoolExecutor()
def __init__(self, session: Session):
self.__session = session
def add_message_listener(self, listener: MessageListener, uris: list[str]) -> None:
with self.__message_listeners_lock:
if listener in self.__message_listeners.keys():
raise TypeError("A listener for {} has already been added.".format(uris))
self.__message_listeners[listener] = uris
self.__message_listeners_lock.notify_all()
def add_request_listener(self, listener: RequestListener, uri: str):
with self.__request_listeners_lock:
if uri in self.__request_listeners.keys():
raise TypeError("A listener for '{}' has already been added.".format(uri))
self.__request_listeners[uri] = listener
self.__request_listeners_lock.notify_all()
def close(self) -> None:
self.__worker.shutdown()
def connect(self) -> None:
self.__connection = DealerClient.ConnectionHolder(self.__session, self, "wss://{}/?access_token={}"
.format(ApResolver.get_random_dealer(), self.__session.tokens().get("playlist-read")))
def connection_invalided(self) -> None:
self.__connection = None
self.logger.debug("Scheduled reconnection attempt in 10 seconds...")
def anonymous():
self.__last_scheduled_reconnection = None
self.connect()
self.__last_scheduled_reconnection = self.__scheduler.enter(10, 1, anonymous)
def handle_message(self, obj: typing.Any) -> None:
uri = obj.get("uri")
headers = self.__get_headers(obj)
payloads = obj.get("payloads")
decoded_payloads: typing.Any
if payloads is not None:
if headers.get("Content-Type") == "application/json":
decoded_payloads = payloads
elif headers.get("Content-Type") == "plain/text":
decoded_payloads = payloads
else:
decoded_payloads = base64.b64decode(payloads)
if headers.get("Transfer-Encoding") == "gzip":
decoded_payloads = gzip.decompress(decoded_payloads)
else:
decoded_payloads = b""
interesting = False
with self.__message_listeners_lock:
for listener in self.__message_listeners.keys():
dispatched = False
keys = self.__message_listeners.get(listener)
for key in keys:
if uri.startswith(key) and not dispatched:
interesting = True
def anonymous():
listener.on_message(uri, headers, decoded_payloads)
self.__worker.submit(anonymous)
dispatched = True
if not interesting:
self.logger.debug("Couldn't dispatch message: {}".format(uri))
def handle_request(self, obj: typing.Any) -> None:
mid = obj.get("message_ident")
key = obj.get("key")
headers = self.__get_headers(obj)
payload = obj.get("payload")
if headers.get("Transfer-Encoding") == "gzip":
gz = base64.b64decode(payload.get("compressed"))
payload = json.loads(gzip.decompress(gz))
pid = payload.get("message_id")
sender = payload.get("sent_by_device_id")
command = payload.get("command")
self.logger.debug("Received request. [mid: {}, key: {}, pid: {}, sender: {}, command: {}]".format(mid, key, pid, sender, command))
interesting = False
with self.__request_listeners_lock:
for mid_prefix in self.__request_listeners.keys():
if mid.startswith(mid_prefix):
listener = self.__request_listeners.get(mid_prefix)
interesting = True
def anonymous():
result = listener.on_request(mid, pid, sender, command)
if self.__connection is not None:
self.__connection.send_reply(key, result)
self.logger.warning("Handled request. [key: {}, result: {}]".format(key, result))
self.__worker.submit(anonymous)
if not interesting:
self.logger.debug("Couldn't dispatch request: {}".format(mid))
def remove_message_listener(self, listener: MessageListener) -> None:
with self.__message_listeners_lock:
self.__message_listeners.pop(listener)
def remove_request_listener(self, listener: RequestListener) -> None:
with self.__request_listeners_lock:
for key, value in self.__request_listeners.items():
if value == listener:
self.__request_listeners.pop(key)
break
def wait_for_listener(self) -> None:
with self.__message_listeners_lock:
if self.__message_listeners == {}:
return
self.__message_listeners_lock.wait()
def __get_headers(self, obj: typing.Any) -> dict[str, str]:
headers = obj.get("headers")
if headers is None:
return {}
return headers
class ConnectionHolder(Closeable):
__closed = False
__dealer_client: DealerClient
__last_scheduled_ping: sched.Event
__received_pong = False
__scheduler = sched.scheduler()
__session: Session
__url: str
__ws: websocket.WebSocketApp
def __init__(self, session: Session, dealer_client: DealerClient, url: str):
self.__session = session
self.__dealer_client = dealer_client
self.__url = url
self.__ws = websocket.WebSocketApp(url)
def close(self):
if not self.__closed:
self.__ws.close()
self.__closed = True
if self.__last_scheduled_ping is not None:
self.__scheduler.cancel(self.__last_scheduled_ping)
def on_failure(self, ws: websocket.WebSocketApp, error):
if self.__closed:
return
self.__dealer_client.logger.warning("An exception occurred. Reconnecting...")
self.close()
def on_message(self, ws: websocket.WebSocketApp, text: str):
obj = json.loads(text)
self.__dealer_client.wait_for_listener()
typ = MessageType.parse(obj.get("type"))
if typ == MessageType.MESSAGE:
self.__dealer_client.handle_message(obj)
elif typ == MessageType.REQUEST:
self.__dealer_client.handle_request(obj)
elif typ == MessageType.PONG:
self.__received_pong = True
elif typ == MessageType.PING:
pass
else:
raise RuntimeError("Unknown message type for {}".format(typ.value))
def on_open(self, ws: websocket.WebSocketApp):
if self.__closed:
self.__dealer_client.logger.fatal("I wonder what happened here... Terminating. [closed: {}]".format(self.__closed))
self.__dealer_client.logger.debug("Dealer connected! [url: {}]".format(self.__url))
def anonymous():
self.send_ping()
self.__received_pong = False
def anonymous2():
if self.__last_scheduled_ping is None:
return
if not self.__received_pong:
self.__dealer_client.logger.warning("Did not receive ping in 3 seconds. Reconnecting...")
self.close()
return
self.__received_pong = False
self.__scheduler.enter(3, 1, anonymous2)
self.__last_scheduled_ping = self.__scheduler.enter(30, 1, anonymous)
self.__last_scheduled_ping = self.__scheduler.enter(30, 1, anonymous)
def send_ping(self):
self.__ws.send("{\"type\":\"ping\"}")
def send_reply(self, key: str, result: DealerClient.RequestResult):
success = "true" if result == DealerClient.RequestResult.SUCCESS else "false"
self.__ws.send("{\"type\":\"reply\",\"key\":\"%s\",\"payload\":{\"success\":%s}" % (key, success))
class RequestResult(enum.Enum):
UNKNOWN_SEND_COMMAND_RESULT = 0
SUCCESS = 1
DEVICE_NOT_FOUND = 2
CONTEXT_PLAYER_ERROR = 3
DEVICE_DISAPPEARED = 4
UPSTREAM_ERROR = 5
DEVICE_DOES_NOT_SUPPORT_COMMAND = 6
RATE_LIMITED = 7
class EventService(Closeable):
logger = logging.getLogger("Librespot:EventService")
__session: Session
@ -309,7 +523,27 @@ class EventService(Closeable):
return data
class Session(Closeable, SubListener):
class MessageType(enum.Enum):
MESSAGE = "message"
PING = "ping"
PONG = "pong"
REQUEST = "request"
@staticmethod
def parse(_typ: str):
if _typ == MessageType.MESSAGE.value:
return MessageType.MESSAGE
elif _typ == MessageType.PING.value:
return MessageType.PING
elif _typ == MessageType.PONG.value:
return MessageType.PONG
elif _typ == MessageType.REQUEST.value:
return MessageType.REQUEST
else:
raise TypeError("Unknown MessageType: {}".format(_typ))
class Session(Closeable, MessageListener, SubListener):
cipher_pair: typing.Union[CipherPair, None]
connection: typing.Union[ConnectionHolder, None]
country_code: str
@ -328,6 +562,7 @@ class Session(Closeable, SubListener):
__closed = False
__closing = False
__content_feeder: typing.Union[PlayableContentFeeder, None]
__dealer_client: typing.Union[DealerClient, None]
__event_service: typing.Union[EventService, None]
__keys: DiffieHellman
__mercury_client: MercuryClient
@ -395,12 +630,15 @@ class Session(Closeable, SubListener):
self.__cdn_manager = CdnManager(self)
self.__content_feeder = PlayableContentFeeder(self)
self.__cache_manager = CacheManager(self)
self.__dealer_client = DealerClient(self)
self.__event_service = EventService(self)
self.__auth_lock_bool = False
self.__auth_lock.notify_all()
self.dealer().connect()
self.logger.info("Authenticated as {}!".format(
self.__ap_welcome.canonical_username))
self.mercury().interested_in("spotify:user:attributes:update", self)
self.dealer().add_message_listener(self, ["hm://connect-state/v1/connect/logout"])
def cache(self) -> CacheManager:
self.__wait_auth_lock()
@ -430,6 +668,9 @@ class Session(Closeable, SubListener):
self.logger.info("Closing session. device_id: {}".format(
self.__inner.device_id))
self.__closing = True
if self.__dealer_client is not None:
self.__dealer_client.close()
self.__dealer_client = None
if self.__audio_key_manager is not None:
self.__audio_key_manager = None
if self.__channel_manager is not None:
@ -555,9 +796,23 @@ class Session(Closeable, SubListener):
client = requests.Session()
return client
def dealer(self) -> DealerClient:
self.__wait_auth_lock()
if self.__dealer_client is None:
raise RuntimeError("Session isn't authenticated!")
return self.__dealer_client
def device_id(self) -> str:
return self.__inner.device_id
def event(self, resp: MercuryClient.Response) -> None:
if resp.uri == "spotify:user:attributes:update":
attributes_update = UserAttributesUpdate()
attributes_update.ParseFromString(resp.payload)
for pair in attributes_update.pairs_list:
self.__user_attributes[pair.key] = pair.value
self.logger.info("Updated user attribute: {} -> {}".format(pair.key, pair.value))
def get_user_attribute(self, key: str, fallback: str = None) -> str:
return self.__user_attributes.get(key) if self.__user_attributes.get(
key) is not None else fallback
@ -574,6 +829,10 @@ class Session(Closeable, SubListener):
raise RuntimeError("Session isn't authenticated!")
return self.__mercury_client
def on_message(self, uri: str, headers: typing.Dict[str, str], payload: bytes):
if uri == "hm://connect-state/v1/connect/logout":
self.close()
def parse_product_info(self, data) -> None:
"""
Parse product information

View File

@ -4,6 +4,7 @@ import typing
if typing.TYPE_CHECKING:
from librespot.audio import AbsChunkedInputStream
from librespot.audio.format import SuperAudioFormat
from librespot.core import DealerClient
from librespot.crypto import Packet
from librespot.mercury import MercuryClient
from librespot.proto import Metadata_pb2 as Metadata
@ -55,6 +56,11 @@ class HaltListener:
raise NotImplementedError
class MessageListener:
def on_message(self, uri: str, headers: typing.Dict[str, str], payload: bytes):
raise NotImplementedError
class NoopAudioDecrypt(AudioDecrypt):
def decrypt_chunk(self, chunk_index: int, buffer: bytes):
raise NotImplementedError
@ -68,6 +74,11 @@ class PacketsReceiver:
raise NotImplementedError
class RequestListener:
def on_request(self, mid: str, pid: int, sender: str, command: typing.Any) -> DealerClient.RequestResult:
raise NotImplementedError
class SubListener:
def event(self, resp: MercuryClient.Response) -> None:
raise NotImplementedError

View File

@ -2,4 +2,5 @@ defusedxml==0.7.1
protobuf==3.17.3
pycryptodomex==3.10.1
pyogg==0.6.14a.1
requests==2.26.0
requests==2.26.0
websocket-client==1.2.1