librespot-python/librespot/core.py

1914 lines
71 KiB
Python
Raw Normal View History

2021-09-12 05:58:24 +02:00
from __future__ import annotations
from Cryptodome import Random
from Cryptodome.Cipher import AES
2021-09-12 05:58:24 +02:00
from Cryptodome.Hash import HMAC, SHA1
from Cryptodome.Protocol.KDF import PBKDF2
2021-09-12 05:58:24 +02:00
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import PKCS1_v1_5
2021-09-12 23:47:53 +02:00
from librespot import util, Version
2021-09-12 05:58:24 +02:00
from librespot.audio import AudioKeyManager, CdnManager, PlayableContentFeeder
from librespot.audio.storage import ChannelManager
from librespot.cache import CacheManager
from librespot.crypto import CipherPair, DiffieHellman, Packet
from librespot.mercury import MercuryClient, MercuryRequests, RawMercuryRequest
2023-01-03 02:16:51 +01:00
from librespot.metadata import AlbumId, ArtistId, EpisodeId, ShowId, TrackId, PlaylistId
from librespot.proto import Authentication_pb2 as Authentication, ClientToken_pb2 as ClientToken, Connect_pb2 as Connect, Connectivity_pb2 as Connectivity, Keyexchange_pb2 as Keyexchange, Metadata_pb2 as Metadata, Playlist4External_pb2 as Playlist4External
2021-09-12 09:56:20 +02:00
from librespot.proto.ExplicitContentPubsub_pb2 import UserAttributesUpdate
from librespot.structure import Closeable, MessageListener, RequestListener, SubListener
2021-09-12 05:58:24 +02:00
import base64
2023-05-21 11:55:13 +02:00
import binascii
2021-09-12 05:58:24 +02:00
import concurrent.futures
import defusedxml.ElementTree
import enum
2021-09-12 09:56:20 +02:00
import gzip
2021-09-12 05:58:24 +02:00
import io
import json
import logging
import os
import random
import requests
import sched
import socket
import struct
import threading
import time
import typing
2021-09-12 23:47:53 +02:00
import urllib.parse
2021-09-12 09:56:20 +02:00
import websocket
2021-09-12 05:58:24 +02:00
class ApiClient(Closeable):
    """Client for the HTTP API served by a randomly resolved spclient host.

    Every request carries a bearer token taken from the session's token
    provider (scope "playlist-read") and a client token that is fetched on
    first use and then cached.
    """

    logger = logging.getLogger("Librespot:ApiClient")
    __base_url: str
    # Cached client token; stays None until the first request needs it or
    # set_client_token() supplies one.
    __client_token_str: str = None
    __session: Session

    def __init__(self, session: Session):
        """Bind the client to *session* and resolve an spclient base url."""
        self.__session = session
        self.__base_url = "https://{}".format(ApResolver.get_random_spclient())

    def build_request(
            self, method: str, suffix: str,
            headers: typing.Union[None, typing.Dict[str, str]],
            body: typing.Union[None, bytes]) -> requests.PreparedRequest:
        """Build an authorised, fully prepared request.

        Args:
            method: HTTP method
            suffix: Path appended to the base url
            headers: Extra request headers, or None
            body: Raw request body, or None
        Returns:
            A prepared request carrying the Authorization and client-token
            headers in addition to the supplied ones
        """
        if self.__client_token_str is None:
            resp = self.__client_token()
            self.__client_token_str = resp.granted_token.token
            self.logger.debug("Updated client token: {}".format(
                self.__client_token_str))
        # Work on a copy so the caller's dict is never mutated.
        merged_headers = dict(headers) if headers is not None else {}
        merged_headers["Authorization"] = "Bearer {}".format(
            self.__session.tokens().get("playlist-read"))
        merged_headers["client-token"] = self.__client_token_str
        request = requests.PreparedRequest()
        # PreparedRequest has no "data" attribute that is ever read when the
        # request is sent; prepare() is what stores the payload in .body and
        # sets Content-Length.
        request.prepare(
            method=method,
            url=self.__base_url + suffix,
            headers=merged_headers,
            data=body,
        )
        return request

    def send(self, method: str, suffix: str,
             headers: typing.Union[None, typing.Dict[str, str]],
             body: typing.Union[None, bytes]) -> requests.Response:
        """Build a request and send it through the session's HTTP client.

        Args:
            method: HTTP method
            suffix: Path appended to the base url
            headers: Extra request headers, or None
            body: Raw request body, or None
        Returns:
            The HTTP response
        """
        return self.__session.client().send(
            self.build_request(method, suffix, headers, body))

    def put_connect_state(self, connection_id: str,
                          proto: Connect.PutStateRequest) -> None:
        """PUT the serialized connect state for this session's device.

        Non-200 answers are only logged, never raised.

        Args:
            connection_id: Value for the X-Spotify-Connection-Id header
            proto: State message to serialize and upload
        """
        payload = proto.SerializeToString()
        response = self.send(
            "PUT",
            "/connect-state/v1/devices/{}".format(self.__session.device_id()),
            {
                "Content-Type": "application/protobuf",
                "X-Spotify-Connection-Id": connection_id
            },
            payload,
        )
        if response.status_code == 413:
            self.logger.warning(
                "PUT state payload is too large: {} bytes uncompressed.".
                format(len(payload)))
        elif response.status_code != 200:
            self.logger.warning("PUT state returned {}. headers: {}".format(
                response.status_code, response.headers))

    def _fetch_proto(self, suffix: str, proto_type,
                     missing_body_error=IOError):
        """GET *suffix* and parse the response body into a new *proto_type*.

        Args:
            suffix: Path appended to the base url
            proto_type: Protobuf message class to instantiate and fill
            missing_body_error: Exception class raised when the response
                has no body
        Returns:
            The parsed protobuf message
        Raises:
            ApiClient.StatusCodeException: If the status code is not 200
        """
        response = self.send("GET", suffix, None, None)
        ApiClient.StatusCodeException.check_status(response)
        body = response.content
        if body is None:
            raise missing_body_error()
        proto = proto_type()
        proto.ParseFromString(body)
        return proto

    def get_metadata_4_track(self, track: TrackId) -> Metadata.Track:
        """Fetch the metadata of a track."""
        # This getter historically raises RuntimeError (not IOError) on a
        # missing body; kept so existing callers see the same exception.
        return self._fetch_proto(
            "/metadata/4/track/{}".format(track.hex_id()), Metadata.Track,
            RuntimeError)

    def get_metadata_4_episode(self, episode: EpisodeId) -> Metadata.Episode:
        """Fetch the metadata of an episode."""
        return self._fetch_proto(
            "/metadata/4/episode/{}".format(episode.hex_id()),
            Metadata.Episode)

    def get_metadata_4_album(self, album: AlbumId) -> Metadata.Album:
        """Fetch the metadata of an album."""
        return self._fetch_proto(
            "/metadata/4/album/{}".format(album.hex_id()), Metadata.Album)

    def get_metadata_4_artist(self, artist: ArtistId) -> Metadata.Artist:
        """Fetch the metadata of an artist."""
        return self._fetch_proto(
            "/metadata/4/artist/{}".format(artist.hex_id()), Metadata.Artist)

    def get_metadata_4_show(self, show: ShowId) -> Metadata.Show:
        """Fetch the metadata of a show."""
        return self._fetch_proto(
            "/metadata/4/show/{}".format(show.hex_id()), Metadata.Show)

    def get_playlist(self,
                     _id: PlaylistId) -> Playlist4External.SelectedListContent:
        """Fetch the contents of a playlist."""
        return self._fetch_proto(
            "/playlist/v2/playlist/{}".format(_id.id()),
            Playlist4External.SelectedListContent)

    def set_client_token(self, client_token):
        """Use *client_token* instead of requesting one on first use."""
        self.__client_token_str = client_token

    def __client_token(self):
        """Request a client token from clienttoken.spotify.com.

        Returns:
            The parsed ClientTokenResponse message
        Raises:
            ApiClient.StatusCodeException: If the status code is not 200
        """
        proto_req = ClientToken.ClientTokenRequest(
            request_type=ClientToken.ClientTokenRequestType.
            REQUEST_CLIENT_DATA_REQUEST,
            client_data=ClientToken.ClientDataRequest(
                client_id=MercuryRequests.keymaster_client_id,
                client_version=Version.version_name,
                connectivity_sdk_data=Connectivity.ConnectivitySdkData(
                    device_id=self.__session.device_id(),
                    platform_specific_data=Connectivity.PlatformSpecificData(
                        windows=Connectivity.NativeWindowsData(
                            something1=10,
                            something3=21370,
                            something4=2,
                            something6=9,
                            something7=332,
                            something8=33404,
                            something10=True,
                        ), ),
                ),
            ),
        )
        resp = requests.post("https://clienttoken.spotify.com/v1/clienttoken",
                             proto_req.SerializeToString(),
                             headers={
                                 "Accept": "application/x-protobuf",
                                 "Content-Encoding": "",
                             })
        ApiClient.StatusCodeException.check_status(resp)
        proto_resp = ClientToken.ClientTokenResponse()
        proto_resp.ParseFromString(resp.content)
        return proto_resp

    class StatusCodeException(IOError):
        """Raised when a response carries a status code other than 200."""

        code: int

        def __init__(self, response: requests.Response):
            super().__init__(response.status_code)
            self.code = response.status_code

        @staticmethod
        def check_status(response: requests.Response) -> None:
            """Raise StatusCodeException unless the status code is 200."""
            if response.status_code != 200:
                raise ApiClient.StatusCodeException(response)
class ApResolver:
    """Static helpers that look up service endpoints through apresolve."""

    base_url = "https://apresolve.spotify.com/"

    @staticmethod
    def request(service_type: str) -> typing.Any:
        """
        Query apresolve for one service type

        Args:
            service_type: Unique ID for service name
        Returns:
            The decoded JSON body of the response
        """
        url = "{}?type={}".format(ApResolver.base_url, service_type)
        return requests.get(url).json()

    @staticmethod
    def get_random_of(service_type: str) -> str:
        """
        Pick one random endpoint for the given service type

        Args:
            service_type: Unique ID for service name
        Returns:
            One entry chosen at random from the resolved list
        Raises:
            RuntimeError: If the resolved list is missing or empty
        """
        candidates = ApResolver.request(service_type).get(service_type)
        if candidates is None or len(candidates) == 0:
            raise RuntimeError("No ApResolve url available")
        return random.choice(candidates)

    @staticmethod
    def get_random_dealer() -> str:
        """
        Returns:
            A random endpoint of type "dealer"
        """
        return ApResolver.get_random_of("dealer")

    @staticmethod
    def get_random_spclient() -> str:
        """
        Returns:
            A random endpoint of type "spclient"
        """
        return ApResolver.get_random_of("spclient")

    @staticmethod
    def get_random_accesspoint() -> str:
        """
        Returns:
            A random endpoint of type "accesspoint"
        """
        return ApResolver.get_random_of("accesspoint")
2021-09-12 09:56:20 +02:00
class DealerClient(Closeable):
    """Dispatches dealer websocket messages and requests to listeners.

    Message listeners are matched by uri prefix, request listeners by
    message-ident prefix; matching listeners are run on a worker pool.
    All mutable state is per instance, so closing one client does not
    affect another.
    """

    logger = logging.getLogger("Librespot:DealerClient")
    __connection: typing.Union[ConnectionHolder, None] = None
    __last_scheduled_reconnection: typing.Union[sched.Event, None] = None
    __message_listeners: typing.Dict[MessageListener, typing.List[str]]
    __message_listeners_lock: threading.Condition
    __request_listeners: typing.Dict[str, RequestListener]
    __request_listeners_lock: threading.Condition
    __scheduler: sched.scheduler
    __session: Session
    __worker: concurrent.futures.ThreadPoolExecutor

    def __init__(self, session: Session):
        """Create a client for *session* with empty listener tables."""
        self.__session = session
        self.__connection = None
        self.__last_scheduled_reconnection = None
        # These used to be class attributes and were therefore shared by
        # every DealerClient; close() on one instance shut down the executor
        # of all of them.
        self.__message_listeners = {}
        self.__message_listeners_lock = threading.Condition()
        self.__request_listeners = {}
        self.__request_listeners_lock = threading.Condition()
        self.__scheduler = sched.scheduler()
        self.__worker = concurrent.futures.ThreadPoolExecutor()

    def add_message_listener(self, listener: MessageListener,
                             uris: list[str]) -> None:
        """Register *listener* for every uri starting with one of *uris*.

        Raises:
            TypeError: If the listener is already registered
        """
        with self.__message_listeners_lock:
            if listener in self.__message_listeners:
                raise TypeError(
                    "A listener for {} has already been added.".format(uris))
            self.__message_listeners[listener] = uris
            # Wake up wait_for_listener().
            self.__message_listeners_lock.notify_all()

    def add_request_listener(self, listener: RequestListener, uri: str):
        """Register *listener* for requests whose ident starts with *uri*.

        Raises:
            TypeError: If a listener for *uri* is already registered
        """
        with self.__request_listeners_lock:
            if uri in self.__request_listeners:
                raise TypeError(
                    "A listener for '{}' has already been added.".format(uri))
            self.__request_listeners[uri] = listener
            self.__request_listeners_lock.notify_all()

    def close(self) -> None:
        """Shut down the worker pool."""
        self.__worker.shutdown()

    def connect(self) -> None:
        """Create a connection holder for a randomly resolved dealer."""
        self.__connection = DealerClient.ConnectionHolder(
            self.__session, self, "wss://{}/?access_token={}".format(
                ApResolver.get_random_dealer(),
                self.__session.tokens().get("playlist-read")))

    def connection_invalided(self) -> None:
        """Drop the current connection and queue a reconnect in 10 seconds."""
        self.__connection = None
        self.logger.debug("Scheduled reconnection attempt in 10 seconds...")

        def reconnect():
            self.__last_scheduled_reconnection = None
            self.connect()

        # NOTE(review): nothing in the visible code calls run() on this
        # scheduler, so the queued event only fires if it is driven
        # elsewhere - confirm.
        self.__last_scheduled_reconnection = self.__scheduler.enter(
            10, 1, reconnect)

    def handle_message(self, obj: typing.Any) -> None:
        """Decode a "message" frame and hand it to matching listeners.

        Args:
            obj: Parsed JSON frame; "uri", "headers" and "payloads" are read
        """
        uri = obj.get("uri")
        headers = self.__get_headers(obj)
        payloads = obj.get("payloads")
        decoded_payloads: typing.Any
        if payloads is None:
            decoded_payloads = b""
        elif headers.get("Content-Type") in ("application/json",
                                             "plain/text"):
            decoded_payloads = payloads
        else:
            # assumes "payloads" is a list of base64 strings - TODO confirm.
            # b64decode() rejects a list, so decode chunk by chunk; a bare
            # string is still accepted as a single chunk.
            if isinstance(payloads, (str, bytes)):
                chunks = [payloads]
            else:
                chunks = payloads
            decoded_payloads = b"".join(
                base64.b64decode(chunk) for chunk in chunks)
            if headers.get("Transfer-Encoding") == "gzip":
                decoded_payloads = gzip.decompress(decoded_payloads)
        interesting = False
        with self.__message_listeners_lock:
            for listener, keys in self.__message_listeners.items():
                if any(uri.startswith(key) for key in keys):
                    interesting = True
                    # Pass the listener as an argument: a closure over the
                    # loop variable would see whichever listener the loop
                    # reached by the time the worker runs.
                    self.__worker.submit(listener.on_message, uri, headers,
                                         decoded_payloads)
        if not interesting:
            self.logger.debug("Couldn't dispatch message: {}".format(uri))

    def handle_request(self, obj: typing.Any) -> None:
        """Decode a "request" frame and hand it to matching listeners.

        Args:
            obj: Parsed JSON frame; "message_ident", "key", "headers" and
                "payload" are read
        """
        mid = obj.get("message_ident")
        key = obj.get("key")
        headers = self.__get_headers(obj)
        payload = obj.get("payload")
        if headers.get("Transfer-Encoding") == "gzip":
            gz = base64.b64decode(payload.get("compressed"))
            payload = json.loads(gzip.decompress(gz))
        pid = payload.get("message_id")
        sender = payload.get("sent_by_device_id")
        command = payload.get("command")
        self.logger.debug(
            "Received request. [mid: {}, key: {}, pid: {}, sender: {}, command: {}]"
            .format(mid, key, pid, sender, command))
        interesting = False
        with self.__request_listeners_lock:
            for mid_prefix, listener in self.__request_listeners.items():
                if mid.startswith(mid_prefix):
                    interesting = True
                    # Listener is bound at submit time (see handle_message).
                    self.__worker.submit(self.__dispatch_request, listener,
                                         mid, pid, sender, command, key)
        if not interesting:
            self.logger.debug("Couldn't dispatch request: {}".format(mid))

    def __dispatch_request(self, listener: RequestListener, mid, pid, sender,
                           command, key) -> None:
        """Run one request listener and send its result back as a reply."""
        result = listener.on_request(mid, pid, sender, command)
        connection = self.__connection
        if connection is not None:
            connection.send_reply(key, result)
        self.logger.warning("Handled request. [key: {}, result: {}]".format(
            key, result))

    def remove_message_listener(self, listener: MessageListener) -> None:
        """Unregister *listener*; unknown listeners are ignored."""
        with self.__message_listeners_lock:
            self.__message_listeners.pop(listener, None)

    def remove_request_listener(self, listener: RequestListener) -> None:
        """Unregister *listener* from every prefix it was registered for."""
        with self.__request_listeners_lock:
            self.__request_listeners = {
                key: value
                for key, value in self.__request_listeners.items()
                if value != listener
            }

    def wait_for_listener(self) -> None:
        """Block until at least one message listener is registered."""
        with self.__message_listeners_lock:
            # The previous check was inverted: it returned when no listener
            # existed and blocked as soon as one did. Loop on the predicate
            # so a spurious wake-up cannot slip through either.
            while not self.__message_listeners:
                self.__message_listeners_lock.wait()

    def __get_headers(self, obj: typing.Any) -> dict[str, str]:
        """Return the frame's "headers" mapping, or {} when absent."""
        headers = obj.get("headers")
        if headers is None:
            return {}
        return headers

    class ConnectionHolder(Closeable):
        """Wraps one WebSocketApp together with its ping bookkeeping."""

        __closed = False
        __dealer_client: DealerClient
        # None until on_open() schedules the first ping.
        __last_scheduled_ping: typing.Union[sched.Event, None] = None
        __received_pong = False
        __scheduler: sched.scheduler
        __session: Session
        __url: str
        __ws: websocket.WebSocketApp

        def __init__(self, session: Session, dealer_client: DealerClient,
                     url: str):
            self.__session = session
            self.__dealer_client = dealer_client
            self.__url = url
            self.__last_scheduled_ping = None
            self.__scheduler = sched.scheduler()
            # NOTE(review): the on_* callbacks below are not handed to
            # WebSocketApp here and run_forever() is not called in the
            # visible code - presumably wired up elsewhere; confirm.
            self.__ws = websocket.WebSocketApp(url)

        def close(self):
            """Close the websocket once and cancel any pending ping."""
            if not self.__closed:
                self.__ws.close()
                self.__closed = True
            pending_ping = self.__last_scheduled_ping
            if pending_ping is not None:
                self.__last_scheduled_ping = None
                try:
                    self.__scheduler.cancel(pending_ping)
                except ValueError:
                    # The event already ran and left the queue.
                    pass

        def on_failure(self, ws: websocket.WebSocketApp, error):
            """Close the holder after a websocket error (unless closed)."""
            if self.__closed:
                return
            self.__dealer_client.logger.warning(
                "An exception occurred. Reconnecting...")
            self.close()

        def on_message(self, ws: websocket.WebSocketApp, text: str):
            """Parse one JSON frame and route it by its "type" field."""
            obj = json.loads(text)
            self.__dealer_client.wait_for_listener()
            typ = MessageType.parse(obj.get("type"))
            if typ == MessageType.MESSAGE:
                self.__dealer_client.handle_message(obj)
            elif typ == MessageType.REQUEST:
                self.__dealer_client.handle_request(obj)
            elif typ == MessageType.PONG:
                self.__received_pong = True
            elif typ == MessageType.PING:
                pass
            else:
                raise RuntimeError("Unknown message type for {}".format(
                    typ.value))

        def on_open(self, ws: websocket.WebSocketApp):
            """Schedule the recurring ping once the socket is open."""
            if self.__closed:
                self.__dealer_client.logger.critical(
                    "I wonder what happened here... Terminating. [closed: {}]".
                    format(self.__closed))
                # Do not schedule pings on a holder that is already closed.
                return
            self.__dealer_client.logger.debug(
                "Dealer connected! [url: {}]".format(self.__url))

            def ping():
                self.send_ping()
                self.__received_pong = False

                def check_pong():
                    if self.__last_scheduled_ping is None:
                        return
                    if not self.__received_pong:
                        self.__dealer_client.logger.warning(
                            "Did not receive ping in 3 seconds. Reconnecting..."
                        )
                        self.close()
                        return
                    self.__received_pong = False

                self.__scheduler.enter(3, 1, check_pong)
                self.__last_scheduled_ping = self.__scheduler.enter(
                    30, 1, ping)

            # NOTE(review): as with the reconnect scheduler, nothing visible
            # calls run() on this scheduler - confirm where it is driven.
            self.__last_scheduled_ping = self.__scheduler.enter(30, 1, ping)

        def send_ping(self):
            """Send a ping frame."""
            self.__ws.send("{\"type\":\"ping\"}")

        def send_reply(self, key: str, result: DealerClient.RequestResult):
            """Send the reply frame for the request identified by *key*."""
            reply = {
                "type": "reply",
                "key": key,
                "payload": {
                    "success": result == DealerClient.RequestResult.SUCCESS
                },
            }
            # The hand-built string lacked its closing brace and did not
            # escape the key; json.dumps produces a well-formed frame.
            self.__ws.send(json.dumps(reply, separators=(",", ":")))

    class RequestResult(enum.Enum):
        """Result codes a request listener can return."""

        UNKNOWN_SEND_COMMAND_RESULT = 0
        SUCCESS = 1
        DEVICE_NOT_FOUND = 2
        CONTEXT_PLAYER_ERROR = 3
        DEVICE_DISAPPEARED = 4
        UPSTREAM_ERROR = 5
        DEVICE_DOES_NOT_SUPPORT_COMMAND = 6
        RATE_LIMITED = 7
2021-09-12 05:58:24 +02:00
class EventService(Closeable):
    """Posts events to hm://event-service/v1/events on a worker pool."""

    logger = logging.getLogger("Librespot:EventService")
    __session: Session
    __worker: concurrent.futures.ThreadPoolExecutor

    def __init__(self, session: Session):
        self.__session = session
        # Per-instance pool: as a class attribute it was shared, so close()
        # on one service stopped event delivery for every other one.
        self.__worker = concurrent.futures.ThreadPoolExecutor()

    def __worker_callback(self, event_builder: EventBuilder):
        """Send one built event over mercury; IOError is logged, not raised."""
        try:
            body = event_builder.to_array()
            # NOTE(review): the timestamp is passed as an int while the
            # other user field is a str - confirm add_user_field accepts it.
            resp = self.__session.mercury().send_sync(
                RawMercuryRequest.Builder().set_uri(
                    "hm://event-service/v1/events").set_method("POST").
                add_user_field("Accept-Language", "en").add_user_field(
                    "X-ClientTimeStamp",
                    int(time.time() * 1000)).add_payload_part(body).build())
            self.logger.debug("Event sent. body: {}, result: {}".format(
                body, resp.status_code))
        except IOError as ex:
            self.logger.error("Failed sending event: {} {}".format(
                event_builder, ex))

    def send_event(self, event_or_builder: typing.Union[GenericEvent,
                                                        EventBuilder]):
        """Queue an event for sending.

        Args:
            event_or_builder: An EventBuilder, or a GenericEvent whose
                build() yields one
        Raises:
            TypeError: If the argument is neither of the two
        """
        # isinstance instead of an exact type match: GenericEvent.build()
        # is abstract, so real events are always subclasses and were all
        # rejected before.
        if isinstance(event_or_builder, EventService.GenericEvent):
            builder = event_or_builder.build()
        elif isinstance(event_or_builder, EventService.EventBuilder):
            builder = event_or_builder
        else:
            raise TypeError()
        self.__worker.submit(lambda: self.__worker_callback(builder))

    def language(self, lang: str):
        """Send a LANGUAGE event carrying *lang*."""
        event = EventService.EventBuilder(EventService.Type.LANGUAGE)
        event.append(s=lang)
        # The event used to be built and then dropped without being sent.
        self.send_event(event)

    def close(self):
        """Shut down the worker pool."""
        self.__worker.shutdown()

    class Type(enum.Enum):
        """Event kinds; each value is (event id, second header field)."""

        LANGUAGE = ("812", 1)
        FETCHED_FILE_ID = ("274", 3)
        NEW_SESSION_ID = ("557", 3)
        NEW_PLAYBACK_ID = ("558", 1)
        TRACK_PLAYED = ("372", 1)
        TRACK_TRANSITION = ("12", 37)
        CDN_REQUEST = ("10", 20)
        eventId: str
        # The member values above supply ints here.
        unknown: int

        def __init__(self, event_id: str, unknown: int):
            self.eventId = event_id
            self.unknown = unknown

    class GenericEvent:
        """Base class for events that can build their own EventBuilder."""

        def build(self) -> EventService.EventBuilder:
            raise NotImplementedError

    class EventBuilder:
        """Accumulates an event body as tab-delimited (0x09) fields."""

        body: io.BytesIO

        def __init__(self, event_type: EventService.Type):
            """Start the body with the two fields of *event_type*."""
            self.body = io.BytesIO()
            self.append_no_delimiter(event_type.value[0])
            self.append(event_type.value[1])

        def append_no_delimiter(self, s: str = None) -> None:
            """Write *s* encoded, without a leading delimiter."""
            if s is None:
                s = ""
            self.body.write(s.encode())

        def append(self,
                   c: int = None,
                   s: str = None) -> EventService.EventBuilder:
            """Write a delimiter followed by exactly one of *c* or *s*.

            Args:
                c: A single byte value
                s: A string, written encoded
            Returns:
                This builder, for chaining
            Raises:
                TypeError: If both or neither argument is given
            """
            if (c is None) == (s is None):
                raise TypeError()
            self.body.write(b"\x09")
            if c is not None:
                self.body.write(bytes([c]))
            else:
                self.append_no_delimiter(s)
            return self

        def to_array(self) -> bytes:
            """Return everything written so far."""
            return self.body.getvalue()
2021-09-12 09:56:20 +02:00
class MessageType(enum.Enum):
    """Frame types recognised by the dealer connection."""

    MESSAGE = "message"
    PING = "ping"
    PONG = "pong"
    REQUEST = "request"

    @staticmethod
    def parse(_typ: str):
        """Map a type string onto its MessageType member.

        Args:
            _typ: Value of a frame's "type" field
        Returns:
            The member whose value equals *_typ*
        Raises:
            TypeError: If no member has that value
        """
        # Members iterate in definition order, the same order the explicit
        # comparisons used.
        for member in MessageType:
            if _typ == member.value:
                return member
        raise TypeError("Unknown MessageType: {}".format(_typ))
2021-09-12 09:56:20 +02:00
class Session(Closeable, MessageListener, SubListener):
2021-09-12 05:58:24 +02:00
cipher_pair: typing.Union[CipherPair, None]
2021-09-12 11:40:51 +02:00
country_code: str = "EN"
2021-09-12 05:58:24 +02:00
connection: typing.Union[ConnectionHolder, None]
logger = logging.getLogger("Librespot:Session")
scheduled_reconnect: typing.Union[sched.Event, None] = None
scheduler = sched.scheduler(time.time)
__api: ApiClient
__ap_welcome: Authentication.APWelcome
2021-11-21 13:57:57 +01:00
__audio_key_manager: typing.Union[AudioKeyManager, None] = None
2021-09-12 05:58:24 +02:00
__auth_lock = threading.Condition()
__auth_lock_bool = False
__cache_manager: typing.Union[CacheManager, None]
__cdn_manager: typing.Union[CdnManager, None]
2021-11-21 13:57:57 +01:00
__channel_manager: typing.Union[ChannelManager, None] = None
2021-09-12 05:58:24 +02:00
__client: typing.Union[requests.Session, None]
__closed = False
__closing = False
__content_feeder: typing.Union[PlayableContentFeeder, None]
2021-11-21 13:57:57 +01:00
__dealer_client: typing.Union[DealerClient, None] = None
__event_service: typing.Union[EventService, None] = None
2021-09-12 05:58:24 +02:00
__keys: DiffieHellman
__mercury_client: MercuryClient
2021-11-21 13:57:57 +01:00
__receiver: typing.Union[Receiver, None] = None
2021-09-12 11:40:51 +02:00
__search: typing.Union[SearchManager, None]
2021-09-12 05:58:24 +02:00
__server_key = b"\xac\xe0F\x0b\xff\xc20\xaf\xf4k\xfe\xc3\xbf\xbf\x86=" \
b"\xa1\x91\xc6\xcc3l\x93\xa1O\xb3\xb0\x16\x12\xac\xacj" \
b"\xf1\x80\xe7\xf6\x14\xd9B\x9d\xbe.4fC\xe3b\xd22z\x1a" \
b"\r\x92;\xae\xdd\x14\x02\xb1\x81U\x05a\x04\xd5,\x96\xa4" \
b"L\x1e\xcc\x02J\xd4\xb2\x0c\x00\x1f\x17\xed\xc2/\xc45" \
b"!\xc8\xf0\xcb\xae\xd2\xad\xd7+\x0f\x9d\xb3\xc52\x1a*" \
b"\xfeY\xf3Z\r\xach\xf1\xfab\x1e\xfb,\x8d\x0c\xb79-\x92" \
b"G\xe3\xd75\x1am\xbd$\xc2\xae%[\x88\xff\xabs)\x8a\x0b" \
b"\xcc\xcd\x0cXg1\x89\xe8\xbd4\x80xJ_\xc9k\x89\x9d\x95k" \
b"\xfc\x86\xd7O3\xa6x\x17\x96\xc9\xc3-\r2\xa5\xab\xcd\x05'" \
b"\xe2\xf7\x10\xa3\x96\x13\xc4/\x99\xc0'\xbf\xed\x04\x9c" \
b"<'X\x04\xb6\xb2\x19\xf9\xc1/\x02\xe9Hc\xec\xa1\xb6B\xa0" \
b"\x9dH%\xf8\xb3\x9d\xd0\xe8j\xf9HM\xa1\xc2\xba\x860B\xea" \
b"\x9d\xb3\x08l\x19\x0eH\xb3\x9df\xeb\x00\x06\xa2Z\xee\xa1" \
b"\x1b\x13\x87<\xd7\x19\xe6U\xbd"
2023-05-21 11:55:13 +02:00
__stored_str: str = ""
2021-09-12 05:58:24 +02:00
__token_provider: typing.Union[TokenProvider, None]
__user_attributes = {}
def __init__(self, inner: Inner, address: str) -> None:
    """
    Create a session and its connection holder

    Args:
        inner: Object supplying conf and device_id for this session
        address: Address handed to ConnectionHolder.create
    """
    # The class-level dict would be shared (and mutated) by every Session;
    # give each instance its own.
    self.__user_attributes = {}
    # These are only annotated on the class. Seed them so the accessors
    # raise their RuntimeError instead of AttributeError before login.
    self.__api = None
    self.__ap_welcome = None
    self.__cache_manager = None
    self.__cdn_manager = None
    self.__content_feeder = None
    self.__mercury_client = None
    self.__search = None
    self.__token_provider = None
    self.cipher_pair = None
    self.__client = Session.create_client(inner.conf)
    self.connection = Session.ConnectionHolder.create(address, None)
    self.__inner = inner
    self.__keys = DiffieHellman()
    self.logger.info("Created new session! device_id: {}, ap: {}".format(
        inner.device_id, address))
2021-09-12 05:58:24 +02:00
def api(self) -> ApiClient:
    """
    Get the ApiClient created by authenticate()

    Returns:
        The session's ApiClient
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    # The attribute is only annotated on the class, so it may not exist at
    # all before authenticate(); treat "missing" like None.
    try:
        api_client = self.__api
    except AttributeError:
        api_client = None
    if api_client is None:
        raise RuntimeError("Session isn't authenticated!")
    return api_client
def ap_welcome(self):
    """
    Get the stored APWelcome message

    Returns:
        The session's APWelcome
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    # Only annotated on the class: may be missing entirely before login.
    try:
        welcome = self.__ap_welcome
    except AttributeError:
        welcome = None
    if welcome is None:
        raise RuntimeError("Session isn't authenticated!")
    return welcome
def audio_key(self) -> AudioKeyManager:
    """
    Get the AudioKeyManager created by authenticate()

    Returns:
        The session's AudioKeyManager
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    manager = self.__audio_key_manager
    if manager is not None:
        return manager
    raise RuntimeError("Session isn't authenticated!")
def authenticate(self,
                 credential: Authentication.LoginCredentials) -> None:
    """
    Log in to Spotify and create the per-session services

    Args:
        credential: Spotify account login information
    """
    logout_uri = "hm://connect-state/v1/connect/logout"
    attributes_uri = "spotify:user:attributes:update"
    self.__authenticate_partial(credential, False)
    auth_lock = self.__auth_lock
    with auth_lock:
        # Construction order is unchanged from the original sequence.
        self.__mercury_client = MercuryClient(self)
        self.__token_provider = TokenProvider(self)
        self.__audio_key_manager = AudioKeyManager(self)
        self.__channel_manager = ChannelManager(self)
        self.__api = ApiClient(self)
        self.__cdn_manager = CdnManager(self)
        self.__content_feeder = PlayableContentFeeder(self)
        self.__cache_manager = CacheManager(self)
        self.__dealer_client = DealerClient(self)
        self.__search = SearchManager(self)
        self.__event_service = EventService(self)
        # Clear the flag and wake every thread waiting on the lock.
        self.__auth_lock_bool = False
        auth_lock.notify_all()
    self.dealer().connect()
    username = self.__ap_welcome.canonical_username
    self.logger.info("Authenticated as {}!".format(username))
    self.mercury().interested_in(attributes_uri, self)
    self.dealer().add_message_listener(self, [logout_uri])
2021-09-12 05:58:24 +02:00
def cache(self) -> CacheManager:
    """
    Get the CacheManager created by authenticate()

    Returns:
        The session's CacheManager
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    # Only annotated on the class: may be missing entirely before login.
    try:
        manager = self.__cache_manager
    except AttributeError:
        manager = None
    if manager is None:
        raise RuntimeError("Session isn't authenticated!")
    return manager
def cdn(self) -> CdnManager:
    """
    Get the CdnManager created by authenticate()

    Returns:
        The session's CdnManager
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    # Only annotated on the class: may be missing entirely before login.
    try:
        manager = self.__cdn_manager
    except AttributeError:
        manager = None
    if manager is None:
        raise RuntimeError("Session isn't authenticated!")
    return manager
def channel(self) -> ChannelManager:
    """
    Get the ChannelManager created by authenticate()

    Returns:
        The session's ChannelManager
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    manager = self.__channel_manager
    if manager is not None:
        return manager
    raise RuntimeError("Session isn't authenticated!")
def client(self) -> requests.Session:
    """
    Get the HTTP client of this session

    Returns:
        The requests session created in the constructor, or None once the
        session has been closed
    """
    http_client = self.__client
    return http_client
def close(self) -> None:
    """
    Close the session and release everything it holds
    """
    device_id = self.__inner.device_id
    self.logger.info("Closing session. device_id: {}".format(device_id))
    self.__closing = True

    # Services are torn down first, then the receiver, the HTTP client and
    # finally the access point connection.
    dealer_client = self.__dealer_client
    if dealer_client is not None:
        dealer_client.close()
        self.__dealer_client = None

    # The audio key manager is simply dropped.
    if self.__audio_key_manager is not None:
        self.__audio_key_manager = None

    channel_manager = self.__channel_manager
    if channel_manager is not None:
        channel_manager.close()
        self.__channel_manager = None

    event_service = self.__event_service
    if event_service is not None:
        event_service.close()
        self.__event_service = None

    receiver = self.__receiver
    if receiver is not None:
        receiver.stop()
        self.__receiver = None

    http_client = self.__client
    if http_client is not None:
        http_client.close()
        self.__client = None

    connection = self.connection
    if connection is not None:
        connection.close()
        self.connection = None

    with self.__auth_lock:
        self.__ap_welcome = None
        self.cipher_pair = None
        self.__closed = True

    self.logger.info("Closed session. device_id: {}".format(
        self.__inner.device_id))
2021-09-12 05:58:24 +02:00
def connect(self) -> None:
    """
    Connect to the Spotify Server

    Sends the ClientHello, verifies the signature on the server's
    Diffie-Hellman key, answers the HMAC challenge and stores the resulting
    cipher pair.

    Raises:
        RuntimeError: If the signature check fails or the server answers
            the challenge response with a message
    """
    acc = Session.Accumulator()
    hello_prefix = b"\x00\x04"

    # Send ClientHello
    nonce = Random.get_random_bytes(0x10)
    dh_hello = Keyexchange.LoginCryptoDiffieHellmanHello(
        gc=self.__keys.public_key_bytes(), server_keys_known=1)
    client_hello_proto = Keyexchange.ClientHello(
        build_info=Version.standard_build_info(),
        client_nonce=nonce,
        cryptosuites_supported=[
            Keyexchange.Cryptosuite.CRYPTO_SUITE_SHANNON
        ],
        login_crypto_hello=Keyexchange.LoginCryptoHelloUnion(
            diffie_hellman=dh_hello, ),
        padding=b"\x1e",
    )
    client_hello_bytes = client_hello_proto.SerializeToString()
    # Frame length covers the 2-byte prefix and the 4-byte length field.
    hello_frame_length = 2 + 4 + len(client_hello_bytes)
    self.connection.write(hello_prefix)
    self.connection.write_int(hello_frame_length)
    self.connection.write(client_hello_bytes)
    self.connection.flush()
    # Everything exchanged so far is mirrored into the accumulator; it is
    # the input of the challenge HMACs below.
    acc.write(hello_prefix)
    acc.write_int(hello_frame_length)
    acc.write(client_hello_bytes)

    # Read APResponseMessage
    ap_response_message_length = self.connection.read_int()
    acc.write_int(ap_response_message_length)
    ap_response_message_bytes = self.connection.read(
        ap_response_message_length - 4)
    acc.write(ap_response_message_bytes)
    ap_response_message_proto = Keyexchange.APResponseMessage()
    ap_response_message_proto.ParseFromString(ap_response_message_bytes)
    dh_challenge = (ap_response_message_proto.challenge.
                    login_crypto_challenge.diffie_hellman)
    shared_key = util.int_to_bytes(
        self.__keys.compute_shared_key(dh_challenge.gs))

    # Check gs_signature
    rsa = RSA.construct((int.from_bytes(self.__server_key, "big"), 65537))
    verifier = PKCS1_v1_5.new(rsa)
    gs_digest = SHA1.new()
    gs_digest.update(dh_challenge.gs)
    if not verifier.verify(gs_digest, dh_challenge.gs_signature):
        raise RuntimeError("Failed signature check!")

    # Solve challenge: five SHA1 HMACs (20 bytes each) are concatenated.
    digests = []
    for round_number in range(1, 6):
        mac = HMAC.new(shared_key, digestmod=SHA1)
        mac.update(acc.read())
        mac.update(bytes([round_number]))
        digests.append(mac.digest())
    key_material = b"".join(digests)
    # Bytes 0-19 key the challenge HMAC, 20-51 and 52-83 feed the ciphers.
    mac = HMAC.new(key_material[:20], digestmod=SHA1)
    mac.update(acc.read())
    challenge = mac.digest()
    client_response_plaintext_proto = Keyexchange.ClientResponsePlaintext(
        crypto_response=Keyexchange.CryptoResponseUnion(),
        login_crypto_response=Keyexchange.LoginCryptoResponseUnion(
            diffie_hellman=Keyexchange.LoginCryptoDiffieHellmanResponse(
                hmac=challenge)),
        pow_response=Keyexchange.PoWResponseUnion(),
    )
    client_response_plaintext_bytes = (
        client_response_plaintext_proto.SerializeToString())
    self.connection.write_int(4 + len(client_response_plaintext_bytes))
    self.connection.write(client_response_plaintext_bytes)
    self.connection.flush()

    # Any answer arriving within the timeout is treated as a failure
    # message; a read timeout is the success path.
    try:
        self.connection.set_timeout(1)
        scrap = self.connection.read(4)
        if len(scrap) == 4:
            failure_length = struct.unpack(">i", scrap)[0] - 4
            payload = self.connection.read(failure_length)
            failed = Keyexchange.APResponseMessage()
            failed.ParseFromString(payload)
            raise RuntimeError(failed)
    except socket.timeout:
        pass
    finally:
        self.connection.set_timeout(0)

    with self.__auth_lock:
        self.cipher_pair = CipherPair(key_material[20:52],
                                      key_material[52:84])
        self.__auth_lock_bool = True
    self.logger.info("Connection successfully!")
def content_feeder(self) -> PlayableContentFeeder:
    """
    Get the PlayableContentFeeder created by authenticate()

    Returns:
        The session's PlayableContentFeeder
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    # Only annotated on the class: may be missing entirely before login.
    try:
        feeder = self.__content_feeder
    except AttributeError:
        feeder = None
    if feeder is None:
        raise RuntimeError("Session isn't authenticated!")
    return feeder
@staticmethod
def create_client(conf: Configuration) -> requests.Session:
    """
    Create the HTTP client used by a session

    Args:
        conf: Session configuration (not consulted at present)
    Returns:
        A fresh requests session
    """
    return requests.Session()
2021-09-12 09:56:20 +02:00
def dealer(self) -> DealerClient:
    """
    Get the DealerClient created by authenticate()

    Returns:
        The session's DealerClient
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    dealer_client = self.__dealer_client
    if dealer_client is not None:
        return dealer_client
    raise RuntimeError("Session isn't authenticated!")
2021-09-12 05:58:24 +02:00
def device_id(self) -> str:
    """
    Returns:
        The device_id of the inner object given to the constructor
    """
    inner = self.__inner
    return inner.device_id
2021-09-14 14:05:52 +02:00
def device_name(self) -> str:
    """
    Returns:
        The device_name of the inner object given to the constructor
    """
    inner = self.__inner
    return inner.device_name
def device_type(self) -> Connect.DeviceType:
    """
    Returns:
        The device_type of the inner object given to the constructor
    """
    inner = self.__inner
    return inner.device_type
2021-09-12 09:56:20 +02:00
def event(self, resp: MercuryClient.Response) -> None:
    """
    Handle a mercury event this session subscribed to

    Only "spotify:user:attributes:update" is acted on: each key/value pair
    of the payload is stored in the user attributes and logged.

    Args:
        resp: The received mercury response
    """
    if resp.uri != "spotify:user:attributes:update":
        return
    update = UserAttributesUpdate()
    update.ParseFromString(resp.payload)
    for pair in update.pairs_list:
        self.__user_attributes[pair.key] = pair.value
        self.logger.info("Updated user attribute: {} -> {}".format(
            pair.key, pair.value))
2021-09-12 09:56:20 +02:00
2021-09-12 05:58:24 +02:00
def get_user_attribute(self, key: str, fallback: str = None) -> str:
    """
    Look up a user attribute

    Args:
        key: Attribute name
        fallback: Value returned when the attribute is missing or None
    Returns:
        The stored value, or fallback
    """
    value = self.__user_attributes.get(key)
    if value is None:
        return fallback
    return value
2021-09-12 05:58:24 +02:00
def is_valid(self) -> bool:
    """
    Check whether the session is authenticated and connected

    Returns:
        False when the session is closed; otherwise True only if both an
        APWelcome and a connection are present
    """
    if self.__closed:
        return False
    self.__wait_auth_lock()
    # __ap_welcome is only annotated on the class, so before the first
    # login it may not exist; that means "not valid", not an error.
    try:
        welcome = self.__ap_welcome
    except AttributeError:
        welcome = None
    return welcome is not None and self.connection is not None
def mercury(self) -> MercuryClient:
    """
    Get the MercuryClient created by authenticate()

    Returns:
        The session's MercuryClient
    Raises:
        RuntimeError: If the session isn't authenticated
    """
    self.__wait_auth_lock()
    # Only annotated on the class: may be missing entirely before login.
    try:
        mercury_client = self.__mercury_client
    except AttributeError:
        mercury_client = None
    if mercury_client is None:
        raise RuntimeError("Session isn't authenticated!")
    return mercury_client
def on_message(self, uri: str, headers: typing.Dict[str, str],
               payload: bytes):
    """
    Handle a dealer message: a logout message closes the session

    Args:
        uri: Uri of the received message
        headers: Message headers (unused)
        payload: Message payload (unused)
    """
    if uri != "hm://connect-state/v1/connect/logout":
        return
    self.close()
2021-09-12 05:58:24 +02:00
def parse_product_info(self, data) -> None:
    """
    Parse product information

    The tag/text of every child of the first element below the document
    root is stored as a user attribute.

    Args:
        data: Raw product information
    """
    products = defusedxml.ElementTree.fromstring(data)
    # A root without children used to raise IndexError on products[0].
    if products is None or len(products) == 0:
        return
    product = products[0]
    for attribute in product:
        self.__user_attributes[attribute.tag] = attribute.text
    self.logger.debug("Parsed product info: {}".format(
        self.__user_attributes))
2021-09-12 05:58:24 +02:00
2021-09-12 11:40:51 +02:00
def preferred_locale(self) -> str:
    """
    Returns:
        The preferred_locale of the inner object given to the constructor
    """
    inner = self.__inner
    return inner.preferred_locale
2021-09-12 05:58:24 +02:00
def reconnect(self) -> None:
    """
    Reconnect to the Spotify Server
    Closes the current connection (and stops its receiver) if there is one,
    opens a connection to a random access point, then re-authenticates with
    the reusable credentials from the last welcome message.
    """
    current = self.connection
    if current is not None:
        current.close()
        self.__receiver.stop()
    access_point = ApResolver.get_random_accesspoint()
    self.connection = Session.ConnectionHolder.create(
        access_point, self.__inner.conf)
    self.connect()
    welcome = self.__ap_welcome
    credentials = Authentication.LoginCredentials(
        typ=welcome.reusable_auth_credentials_type,
        username=welcome.canonical_username,
        auth_data=welcome.reusable_auth_credentials,
    )
    self.__authenticate_partial(credentials, True)
    # Re-read the welcome message: authentication replaces it.
    self.logger.info("Re-authenticated as {}!".format(
        self.__ap_welcome.canonical_username))
2021-09-12 05:58:24 +02:00
def reconnecting(self) -> bool:
    """
    Check whether the session is between connections
    Returns:
        True when the session is neither closing nor closed and currently
        has no connection
    """
    if self.__closing or self.__closed:
        return False
    return self.connection is None
2021-09-12 11:40:51 +02:00
def search(self) -> SearchManager:
    """
    Get the search manager after waiting on the auth lock
    Returns:
        The session's SearchManager
    Raises:
        RuntimeError: If no search manager has been set
    """
    self.__wait_auth_lock()
    manager = self.__search
    if manager is not None:
        return manager
    raise RuntimeError("Session isn't authenticated!")
2021-09-12 05:58:24 +02:00
def send(self, cmd: bytes, payload: bytes):
    """
    Send data to socket using send_unchecked
    Args:
        cmd: Command
        payload: Payload
    Raises:
        RuntimeError: If the session is closed
    """
    broken_while_closing = self.__closing and self.connection is None
    if broken_while_closing:
        self.logger.debug("Connection was broken while closing.")
        return
    if self.__closed:
        raise RuntimeError("Session is closed!")
    with self.__auth_lock:
        # Block until notified while the cipher is missing or the auth flag
        # is still set.
        must_wait = self.cipher_pair is None or self.__auth_lock_bool
        if must_wait:
            self.__auth_lock.wait()
        self.__send_unchecked(cmd, payload)
def tokens(self) -> TokenProvider:
    """
    Get the token provider after waiting on the auth lock
    Returns:
        The session's TokenProvider
    Raises:
        RuntimeError: If no token provider has been set
    """
    self.__wait_auth_lock()
    provider = self.__token_provider
    if provider is not None:
        return provider
    raise RuntimeError("Session isn't authenticated!")
2021-09-12 11:40:51 +02:00
def username(self):
    """
    Get the username of the logged-in account
    Returns:
        canonical_username from the stored welcome message
    """
    welcome = self.__ap_welcome
    return welcome.canonical_username
2023-05-21 11:55:13 +02:00
def stored(self):
    """
    Get the stored credential string
    Returns:
        The value currently held in the session's stored-credentials string
    """
    credential_string = self.__stored_str
    return credential_string
def __authenticate_partial(self,
                           credential: Authentication.LoginCredentials,
                           remove_lock: bool) -> None:
    """
    Login to Spotify
    Args:
        credential: Spotify account login information
        remove_lock: When True, clear the auth flag and wake every thread
            waiting on the auth lock once the welcome has been handled
    Raises:
        RuntimeError: If there is no cipher pair yet, or the reply is
            neither a welcome nor an auth failure
        Session.SpotifyAuthenticationException: If the server rejects the login
        TypeError: If credentials should be stored but no file path is set
    """
    if self.cipher_pair is None:
        raise RuntimeError("Connection not established!")
    system_info = Authentication.SystemInfo(
        os=Authentication.Os.OS_UNKNOWN,
        cpu_family=Authentication.CpuFamily.CPU_UNKNOWN,
        system_information_string=Version.system_info_string(),
        device_id=self.__inner.device_id,
    )
    login_request = Authentication.ClientResponseEncrypted(
        login_credentials=credential,
        system_info=system_info,
        version_string=Version.version_string(),
    )
    self.__send_unchecked(Packet.Type.login,
                          login_request.SerializeToString())
    packet = self.cipher_pair.receive_encoded(self.connection)

    if not packet.is_cmd(Packet.Type.ap_welcome):
        if packet.is_cmd(Packet.Type.auth_failure):
            ap_login_failed = Keyexchange.APLoginFailed()
            ap_login_failed.ParseFromString(packet.payload)
            self.close()
            raise Session.SpotifyAuthenticationException(ap_login_failed)
        raise RuntimeError("Unknown CMD 0x" + packet.cmd.hex())

    # Welcome received: remember it and start the packet receiver.
    self.__ap_welcome = Authentication.APWelcome()
    self.__ap_welcome.ParseFromString(packet.payload)
    self.__receiver = Session.Receiver(self)
    self.__send_unchecked(Packet.Type.unknown_0x0f,
                          Random.get_random_bytes(0x14))
    locale_payload = (b"\x00\x00\x10\x00\x02preferred-locale" +
                      self.__inner.preferred_locale.encode())
    self.__send_unchecked(Packet.Type.preferred_locale, locale_payload)

    if remove_lock:
        with self.__auth_lock:
            self.__auth_lock_bool = False
            self.__auth_lock.notify_all()

    if not self.__inner.conf.store_credentials:
        return
    reusable = self.__ap_welcome.reusable_auth_credentials
    reusable_type = Authentication.AuthenticationType.Name(
        self.__ap_welcome.reusable_auth_credentials_type)
    credentials_path = self.__inner.conf.stored_credentials_file
    if credentials_path is None:
        raise TypeError("The file path to be saved is not specified")
    # The same object is kept as a base64 string and written to the file.
    stored_credentials = {
        "username": self.__ap_welcome.canonical_username,
        "credentials": base64.b64encode(reusable).decode(),
        "type": reusable_type,
    }
    self.__stored_str = base64.b64encode(
        json.dumps(stored_credentials).encode()).decode()
    with open(credentials_path, "w") as f:
        json.dump(stored_credentials, f)
def __send_unchecked(self, cmd: bytes, payload: bytes) -> None:
    """
    Send a packet through the cipher pair without any state or lock checks
    Args:
        cmd: Command
        payload: Payload
    """
    cipher = self.cipher_pair
    cipher.send_encoded(self.connection, cmd, payload)
def __wait_auth_lock(self) -> None:
    """
    Block on the auth lock while the cipher is missing or the auth flag is set
    Returns immediately (with a debug log) when the session is closing and
    has no connection.
    Raises:
        RuntimeError: If the session is closed
    """
    broken_while_closing = self.__closing and self.connection is None
    if broken_while_closing:
        self.logger.debug("Connection was broken while closing.")
        return
    if self.__closed:
        raise RuntimeError("Session is closed!")
    with self.__auth_lock:
        must_wait = self.cipher_pair is None or self.__auth_lock_bool
        if must_wait:
            self.__auth_lock.wait()
class AbsBuilder:
    """
    Base builder holding the device and configuration settings of a session
    """
    conf = None
    device_id = None
    device_name = "librespot-python"
    device_type = Connect.DeviceType.COMPUTER
    preferred_locale = "en"

    def __init__(self, conf: Session.Configuration = None):
        """
        Args:
            conf: Session configuration; a default one is built when omitted
        """
        if conf is None:
            self.conf = Session.Configuration.Builder().build()
        else:
            self.conf = conf

    def set_preferred_locale(self, locale: str) -> Session.AbsBuilder:
        """
        Set preferred_locale
        Args:
            locale: Two-character locale
        Returns:
            Builder
        Raises:
            TypeError: If locale is not exactly 2 characters long
        """
        if len(locale) != 2:
            raise TypeError("Invalid locale: {}".format(locale))
        self.preferred_locale = locale
        return self

    def set_device_name(self, device_name: str) -> Session.AbsBuilder:
        """
        Set device_name
        Args:
            device_name: Device name
        Returns:
            Builder
        """
        self.device_name = device_name
        return self

    def set_device_id(self, device_id: str) -> Session.AbsBuilder:
        """
        Set device_id
        Args:
            device_id: 40-character device ID, or None to clear it
        Returns:
            Builder
        Raises:
            TypeError: If device_id is given but is not 40 characters long
        """
        # Validate the argument itself. Checking the previously stored ID
        # skipped validation on the first call and crashed on len(None)
        # when clearing an already-set ID.
        if device_id is not None and len(device_id) != 40:
            raise TypeError("Device ID must be 40 chars long.")
        self.device_id = device_id
        return self

    def set_device_type(
            self, device_type: Connect.DeviceType) -> Session.AbsBuilder:
        """
        Set device_type
        Args:
            device_type: Device type
        Returns:
            Builder
        """
        self.device_type = device_type
        return self
class Accumulator:
    """
    Append-only byte buffer with big-endian integer helpers
    """
    __buffer: io.BytesIO

    def __init__(self):
        self.__buffer = io.BytesIO()

    def read(self) -> bytes:
        """
        Read all buffer
        Returns:
            Everything written so far; the write position is left untouched
        """
        return self.__buffer.getvalue()

    def write(self, data: bytes) -> None:
        """
        Write data to buffer
        Args:
            data: Bytes to be written
        """
        self.__buffer.write(data)

    def write_int(self, data: int) -> None:
        """
        Write a big-endian signed 32-bit integer to buffer
        Args:
            data: Integer to be written
        """
        packed = struct.pack(">i", data)
        self.write(packed)

    def write_short(self, data: int) -> None:
        """
        Write a big-endian signed 16-bit integer to buffer
        Args:
            data: Short integer to be written
        """
        packed = struct.pack(">h", data)
        self.write(packed)
class Builder(AbsBuilder):
    """
    Session builder that also selects the login credentials
    """
    login_credentials: Authentication.LoginCredentials = None

    def blob(self, username: str, blob: bytes) -> Session.Builder:
        """
        Create credential from an encrypted blob
        Args:
            username: Account username the blob belongs to
            blob: Base64-encoded encrypted blob
        Returns:
            Builder
        Raises:
            TypeError: If the device ID has not been set yet
        """
        if self.device_id is None:
            raise TypeError("You must specify the device ID first.")
        self.login_credentials = self.decrypt_blob(self.device_id,
                                                   username, blob)
        return self

    def decrypt_blob(
            self, device_id: str, username: str,
            encrypted_blob: bytes) -> Authentication.LoginCredentials:
        """
        Decrypt a blob into login credentials
        Args:
            device_id: Device ID the key is derived from
            username: Username used as the PBKDF2 salt
            encrypted_blob: Base64-encoded encrypted blob
        Returns:
            Login credentials carrying the type and auth data from the blob
        Raises:
            IOError: If the blob carries an unknown authentication type
        """
        encrypted_blob = base64.b64decode(encrypted_blob)
        sha1 = SHA1.new()
        sha1.update(device_id.encode())
        secret = sha1.digest()
        base_key = PBKDF2(secret,
                          username.encode(),
                          20,
                          0x100,
                          hmac_hash_module=SHA1)
        sha1 = SHA1.new()
        sha1.update(base_key)
        key = sha1.digest() + b"\x00\x00\x00\x14"
        aes = AES.new(key, AES.MODE_ECB)
        decrypted_blob = bytearray(aes.decrypt(encrypted_blob))
        # Walking backwards, XOR every byte with the byte 16 positions
        # before it.
        for index in range(len(decrypted_blob) - 1, 0x10 - 1, -1):
            decrypted_blob[index] ^= decrypted_blob[index - 0x10]
        blob = io.BytesIO(decrypted_blob)
        # Layout as consumed here: a byte, a length-prefixed field that is
        # skipped, a byte, the type, a byte, then length-prefixed auth data.
        blob.read(1)
        skipped_length = self.read_blob_int(blob)
        blob.read(skipped_length)
        blob.read(1)
        type_int = self.read_blob_int(blob)
        try:
            type_ = Authentication.AuthenticationType.Name(type_int)
        except ValueError as ex:
            # Name() raises for unknown numbers instead of returning None,
            # so this is the only place the intended IOError can be raised.
            raise IOError(
                TypeError(
                    "Unknown AuthenticationType: {}".format(type_int))
            ) from ex
        blob.read(1)
        auth_length = self.read_blob_int(blob)
        auth_data = blob.read(auth_length)
        return Authentication.LoginCredentials(
            auth_data=auth_data,
            typ=type_,
            username=username,
        )

    def read_blob_int(self, buffer: io.BytesIO) -> int:
        """
        Read a one- or two-byte variable-length integer
        Args:
            buffer: Stream positioned at the integer
        Returns:
            The first byte if its high bit is clear; otherwise its low 7
            bits combined with the next byte shifted left by 7
        """
        lo = buffer.read(1)
        if (int(lo[0]) & 0x80) == 0:
            return int(lo[0])
        hi = buffer.read(1)
        return (int(lo[0]) & 0x7f) | (int(hi[0]) << 7)

    def stored(self, stored_credentials_str: str):
        """
        Create credential from stored string
        Args:
            stored_credentials_str: Base64-encoded JSON credential string
        Returns:
            Builder
        """
        try:
            obj = json.loads(base64.b64decode(stored_credentials_str))
        except ValueError:
            # binascii.Error, json.JSONDecodeError and UnicodeDecodeError
            # are all ValueError subclasses; an unreadable string leaves
            # the credentials unset.
            return self
        self._set_stored_credentials(obj)
        return self

    def stored_file(self,
                    stored_credentials: str = None) -> Session.Builder:
        """
        Create credential from stored file
        Args:
            stored_credentials: credential file path; defaults to the
                configured stored_credentials_file
        Returns:
            Builder
        """
        if stored_credentials is None:
            stored_credentials = self.conf.stored_credentials_file
        if os.path.isfile(stored_credentials):
            try:
                with open(stored_credentials) as f:
                    obj = json.load(f)
            except json.JSONDecodeError:
                pass
            else:
                self._set_stored_credentials(obj)
        return self

    def user_pass(self, username: str, password: str) -> Session.Builder:
        """
        Create credential from username and password
        Args:
            username: Spotify's account username
            password: Spotify's account password
        Returns:
            Builder
        """
        self.login_credentials = Authentication.LoginCredentials(
            username=username,
            typ=Authentication.AuthenticationType.AUTHENTICATION_USER_PASS,
            auth_data=password.encode(),
        )
        return self

    def create(self) -> Session:
        """
        Create the Session instance
        Returns:
            Session instance
        Raises:
            RuntimeError: If no login credentials have been selected
        """
        if self.login_credentials is None:
            raise RuntimeError("You must select an authentication method.")
        session = Session(
            Session.Inner(
                self.device_type,
                self.device_name,
                self.preferred_locale,
                self.conf,
                self.device_id,
            ),
            ApResolver.get_random_accesspoint(),
        )
        session.connect()
        session.authenticate(self.login_credentials)
        return session

    def _set_stored_credentials(self, obj) -> None:
        """
        Set login_credentials from a decoded stored-credentials object
        Malformed objects (missing keys, non-mapping JSON, unknown type
        name, invalid base64) are ignored and leave the credentials as-is.
        Args:
            obj: Decoded JSON with "type", "username" and "credentials" keys
        """
        try:
            self.login_credentials = Authentication.LoginCredentials(
                typ=Authentication.AuthenticationType.Value(obj["type"]),
                username=obj["username"],
                auth_data=base64.b64decode(obj["credentials"]),
            )
        except (KeyError, TypeError, ValueError):
            pass
class Configuration:
    """
    Settings container for a session: cache, stored credentials, fetching
    """
    # Proxy settings are not wired in yet. The planned fields are:
    # proxyEnabled: bool, proxyType: Proxy.Type, proxyAddress: str,
    # proxyPort: int, proxyAuth: bool, proxyUsername: str, proxyPassword: str

    # Cache
    cache_enabled: bool
    cache_dir: str
    do_cache_clean_up: bool

    # Stored credentials
    store_credentials: bool
    stored_credentials_file: str

    # Fetching
    retry_on_chunk_error: bool

    def __init__(
            self,
            cache_enabled: bool,
            cache_dir: str,
            do_cache_clean_up: bool,
            store_credentials: bool,
            stored_credentials_file: str,
            retry_on_chunk_error: bool,
    ):
        """
        Args:
            cache_enabled: Cache enabled
            cache_dir: Cache directory
            do_cache_clean_up: Do cache clean up
            store_credentials: Store credentials
            stored_credentials_file: Stored credential file
            retry_on_chunk_error: Retry on chunk error
        """
        # Fetching
        self.retry_on_chunk_error = retry_on_chunk_error
        # Stored credentials
        self.stored_credentials_file = stored_credentials_file
        self.store_credentials = store_credentials
        # Cache
        self.do_cache_clean_up = do_cache_clean_up
        self.cache_dir = cache_dir
        self.cache_enabled = cache_enabled

    class Builder:
        """
        Fluent builder for Configuration; every setter returns the builder
        """
        # Proxy setters (set_proxy_enabled, set_proxy_type,
        # set_proxy_address, set_proxy_auth, set_proxy_username,
        # set_proxy_password) are not implemented yet.

        # Cache
        cache_enabled: bool = True
        cache_dir: str = os.path.join(os.getcwd(), "cache")
        do_cache_clean_up: bool = True

        # Stored credentials
        store_credentials: bool = True
        stored_credentials_file: str = os.path.join(
            os.getcwd(), "credentials.json")

        # Fetching
        retry_on_chunk_error: bool = True

        def set_cache_enabled(
                self,
                cache_enabled: bool) -> Session.Configuration.Builder:
            """
            Set cache_enabled
            Args:
                cache_enabled: Cache enabled
            Returns:
                Builder
            """
            self.cache_enabled = cache_enabled
            return self

        def set_cache_dir(self,
                          cache_dir: str) -> Session.Configuration.Builder:
            """
            Set cache_dir
            Args:
                cache_dir: Cache directory
            Returns:
                Builder
            """
            self.cache_dir = cache_dir
            return self

        def set_do_cache_clean_up(
                self,
                do_cache_clean_up: bool) -> Session.Configuration.Builder:
            """
            Set do_cache_clean_up
            Args:
                do_cache_clean_up: Do cache clean up
            Returns:
                Builder
            """
            self.do_cache_clean_up = do_cache_clean_up
            return self

        def set_store_credentials(
                self,
                store_credentials: bool) -> Session.Configuration.Builder:
            """
            Set store_credentials
            Args:
                store_credentials: Store credentials
            Returns:
                Builder
            """
            self.store_credentials = store_credentials
            return self

        def set_stored_credential_file(
                self, stored_credential_file: str
        ) -> Session.Configuration.Builder:
            """
            Set stored_credential_file
            Args:
                stored_credential_file: Stored credential file
            Returns:
                Builder
            """
            self.stored_credentials_file = stored_credential_file
            return self

        def set_retry_on_chunk_error(
                self, retry_on_chunk_error: bool
        ) -> Session.Configuration.Builder:
            """
            Set retry_on_chunk_error
            Args:
                retry_on_chunk_error: Retry on chunk error
            Returns:
                Builder
            """
            self.retry_on_chunk_error = retry_on_chunk_error
            return self

        def build(self) -> Session.Configuration:
            """
            Build Configuration instance
            Returns:
                Session.Configuration
            """
            return Session.Configuration(
                cache_enabled=self.cache_enabled,
                cache_dir=self.cache_dir,
                do_cache_clean_up=self.do_cache_clean_up,
                store_credentials=self.store_credentials,
                stored_credentials_file=self.stored_credentials_file,
                retry_on_chunk_error=self.retry_on_chunk_error,
            )
class ConnectionHolder:
    """
    Socket wrapper with a write buffer and big-endian integer helpers
    Writes are collected in a buffer until flush() sends them.
    """
    __buffer: io.BytesIO
    __socket: socket.socket

    def __init__(self, sock: socket.socket):
        """
        Args:
            sock: Socket to wrap
        """
        self.__buffer = io.BytesIO()
        self.__socket = sock

    @staticmethod
    def create(address: str, conf) \
            -> Session.ConnectionHolder:
        """
        Create the ConnectionHolder instance
        Args:
            address: Address to connect, as "host:port"
            conf: Configuration (currently unused)
        Returns:
            ConnectionHolder instance
        """
        parts = address.split(":")
        ap_address = parts[0]
        ap_port = int(parts[1])
        sock = socket.socket()
        sock.connect((ap_address, ap_port))
        return Session.ConnectionHolder(sock)

    def close(self) -> None:
        """
        Close the connection
        """
        self.__socket.close()

    def flush(self) -> None:
        """
        Flush data to socket
        The buffer is reset only after everything was sent; a broken pipe
        is ignored and leaves the buffer in place.
        """
        try:
            # sendall() keeps writing until the whole buffer is out;
            # send() may stop after a partial write and the rest would be
            # lost when the buffer is reset.
            self.__socket.sendall(self.__buffer.getvalue())
            self.__buffer = io.BytesIO()
        except BrokenPipeError:
            pass

    def read(self, length: int) -> bytes:
        """
        Read data from socket
        Args:
            length: Reading length
        Returns:
            Exactly length bytes, or fewer only when the peer closed the
            connection before that many arrived
        """
        # recv() may return fewer bytes than requested, so keep reading
        # until the full length is collected or the stream ends.
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self.__socket.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def read_int(self) -> int:
        """
        Read integer from socket
        Returns:
            Big-endian signed 32-bit integer from socket
        """
        return struct.unpack(">i", self.read(4))[0]

    def read_short(self) -> int:
        """
        Read short integer from socket
        Returns:
            Big-endian signed 16-bit integer from socket
        """
        return struct.unpack(">h", self.read(2))[0]

    def set_timeout(self, seconds: float) -> None:
        """
        Set socket's timeout
        Args:
            seconds: Number of seconds until timeout; 0 disables the timeout
        """
        self.__socket.settimeout(None if seconds == 0 else seconds)

    def write(self, data: bytes) -> None:
        """
        Write data to buffer
        Args:
            data: Bytes to be written
        """
        self.__buffer.write(data)

    def write_int(self, data: int) -> None:
        """
        Write data to buffer
        Args:
            data: Integer to be written
        """
        self.write(struct.pack(">i", data))

    def write_short(self, data: int) -> None:
        """
        Write data to buffer
        Args:
            data: Short integer to be written
        """
        self.write(struct.pack(">h", data))
class Inner:
    """
    Device and configuration settings attached to a session
    """
    device_type: Connect.DeviceType = None
    device_name: str
    device_id: str
    conf = None
    preferred_locale: str

    def __init__(
            self,
            device_type: Connect.DeviceType,
            device_name: str,
            preferred_locale: str,
            conf: Session.Configuration,
            device_id: str = None,
    ):
        """
        Args:
            device_type: Device type
            device_name: Device name
            preferred_locale: Preferred locale
            conf: Session configuration
            device_id: Device ID; a random 40-character hex string is
                generated when omitted
        """
        self.preferred_locale = preferred_locale
        self.conf = conf
        self.device_type = device_type
        self.device_name = device_name
        if device_id is None:
            device_id = util.random_hex_string(40)
        self.device_id = device_id
2021-09-12 05:58:24 +02:00
class Receiver:
    """
    Background thread that reads packets from the session and dispatches them
    """
    __session: Session
    __thread: threading.Thread
    __running: bool = True

    def __init__(self, session):
        """
        Start the daemon receiver thread
        Args:
            session: Session whose connection is read
        """
        self.__session = session
        self.__thread = threading.Thread(target=self.run)
        self.__thread.daemon = True
        self.__thread.name = "session-packet-receiver"
        self.__thread.start()

    def stop(self) -> None:
        """
        Ask the receive loop to exit
        """
        self.__running = False

    def run(self) -> None:
        """
        Receive Packet thread function
        """
        self.__session.logger.info("Session.Receiver started")
        while self.__running:
            packet: Packet
            cmd: bytes
            try:
                packet = self.__session.cipher_pair.receive_encoded(
                    self.__session.connection)
                cmd = Packet.Type.parse(packet.cmd)
                if cmd is None:
                    self.__session.logger.info(
                        "Skipping unknown command cmd: 0x{}, payload: {}".
                        format(util.bytes_to_hex(packet.cmd),
                               packet.payload))
                    continue
            except (RuntimeError, ConnectionResetError) as ex:
                # A read failure after stop() is expected; otherwise log it
                # and reconnect. Either way this loop ends.
                if self.__running:
                    self.__session.logger.fatal(
                        "Failed reading packet! {}".format(ex))
                    self.__session.reconnect()
                break
            if not self.__running:
                break
            if cmd == Packet.Type.ping:
                # Each ping re-arms the reconnect timer before being
                # answered with a pong.
                if self.__session.scheduled_reconnect is not None:
                    self.__session.scheduler.cancel(
                        self.__session.scheduled_reconnect)

                def anonymous():
                    self.__session.logger.warning(
                        "Socket timed out. Reconnecting...")
                    self.__session.reconnect()

                self.__session.scheduled_reconnect = self.__session.scheduler.enter(
                    2 * 60 + 5, 1, anonymous)
                self.__session.send(Packet.Type.pong, packet.payload)
            elif cmd == Packet.Type.pong_ack:
                continue
            elif cmd == Packet.Type.country_code:
                # Written to the public attribute: a double-underscore name
                # here is mangled with this class's name, so it would never
                # reach the attribute read elsewhere as session.country_code.
                self.__session.country_code = packet.payload.decode()
                self.__session.logger.info(
                    "Received country_code: {}".format(
                        self.__session.country_code))
            elif cmd == Packet.Type.license_version:
                license_version = io.BytesIO(packet.payload)
                license_id = struct.unpack(">h",
                                           license_version.read(2))[0]
                if license_id != 0:
                    buffer = license_version.read()
                    self.__session.logger.info(
                        "Received license_version: {}, {}".format(
                            license_id, buffer.decode()))
                else:
                    self.__session.logger.info(
                        "Received license_version: {}".format(license_id))
            elif cmd == Packet.Type.unknown_0x10:
                self.__session.logger.debug("Received 0x10: {}".format(
                    util.bytes_to_hex(packet.payload)))
            elif cmd in [
                    Packet.Type.mercury_sub, Packet.Type.mercury_unsub,
                    Packet.Type.mercury_event, Packet.Type.mercury_req
            ]:
                self.__session.mercury().dispatch(packet)
            elif cmd in [Packet.Type.aes_key, Packet.Type.aes_key_error]:
                self.__session.audio_key().dispatch(packet)
            elif cmd in [
                    Packet.Type.channel_error, Packet.Type.stream_chunk_res
            ]:
                self.__session.channel().dispatch(packet)
            elif cmd == Packet.Type.product_info:
                self.__session.parse_product_info(packet.payload)
            else:
                self.__session.logger.info("Skipping {}".format(
                    util.bytes_to_hex(cmd)))
2021-09-12 05:58:24 +02:00
class SpotifyAuthenticationException(Exception):
    """
    Raised when the server rejects a login; the message is the error code name
    """

    def __init__(self, login_failed: Keyexchange.APLoginFailed):
        """
        Args:
            login_failed: Login failure message carrying the error code
        """
        error_name = Keyexchange.ErrorCode.Name(login_failed.error_code)
        super().__init__(error_name)
2021-09-12 05:58:24 +02:00
2021-09-12 11:40:51 +02:00
class SearchManager:
    """
    Runs search requests over the session's Mercury client
    """
    base_url = "hm://searchview/km/v4/search/"
    __session: Session

    def __init__(self, session: Session):
        self.__session = session

    def request(self, request: SearchRequest) -> typing.Any:
        """
        Send a search request
        Username, country and locale left empty on the request are filled
        in from the session before sending.
        Args:
            request: Search request
        Returns:
            Decoded JSON payload of the response
        Raises:
            SearchManager.SearchException: If the status code is not 200
        """
        if request.get_username() == "":
            request.set_username(self.__session.username())
        if request.get_country() == "":
            request.set_country(self.__session.country_code)
        if request.get_locale() == "":
            request.set_locale(self.__session.preferred_locale())
        client = self.__session.mercury()
        builder = RawMercuryRequest.new_builder()
        mercury_request = builder.set_method("GET").set_uri(
            request.build_url()).build()
        response = client.send_sync(mercury_request)
        if response.status_code != 200:
            raise SearchManager.SearchException(response.status_code)
        return json.loads(response.payload)

    class SearchException(Exception):
        """
        Raised when a search returns a non-200 status code
        """

        def __init__(self, status_code: int):
            message = "Search failed with code {}.".format(status_code)
            super().__init__(message)

    class SearchRequest:
        """
        Search query plus its optional parameters, with fluent setters
        """
        query: typing.Final[str]
        __catalogue = ""
        __country = ""
        __image_size = ""
        __limit = 10
        __locale = ""
        __username = ""

        def __init__(self, query: str):
            """
            Args:
                query: Search text
            Raises:
                TypeError: If query is empty
            """
            self.query = query
            if query == "":
                raise TypeError

        def build_url(self) -> str:
            """
            Build the search URL
            Returns:
                base_url followed by the quoted query and the parameters
            """
            quote = urllib.parse.quote
            parameters = [
                ("catalogue", quote(self.__catalogue)),
                ("country", quote(self.__country)),
                ("imageSize", quote(self.__image_size)),
                ("limit", str(self.__limit)),
                ("locale", quote(self.__locale)),
                ("username", quote(self.__username)),
            ]
            pieces = [SearchManager.base_url, quote(self.query),
                      "?entityVersion=2"]
            for name, value in parameters:
                pieces.append("&" + name + "=" + value)
            return "".join(pieces)

        def get_catalogue(self) -> str:
            return self.__catalogue

        def get_country(self) -> str:
            return self.__country

        def get_image_size(self) -> str:
            return self.__image_size

        def get_limit(self) -> int:
            return self.__limit

        def get_locale(self) -> str:
            return self.__locale

        def get_username(self) -> str:
            return self.__username

        def set_catalogue(self, catalogue: str) -> SearchManager.SearchRequest:
            self.__catalogue = catalogue
            return self

        def set_country(self, country: str) -> SearchManager.SearchRequest:
            self.__country = country
            return self

        def set_image_size(self,
                           image_size: str) -> SearchManager.SearchRequest:
            self.__image_size = image_size
            return self

        def set_limit(self, limit: int) -> SearchManager.SearchRequest:
            self.__limit = limit
            return self

        def set_locale(self, locale: str) -> SearchManager.SearchRequest:
            self.__locale = locale
            return self

        def set_username(self, username: str) -> SearchManager.SearchRequest:
            self.__username = username
            return self
2021-09-12 05:58:24 +02:00
class TokenProvider:
    """
    Caches access tokens per instance and requests new ones over Mercury
    """
    logger = logging.getLogger("Librespot:TokenProvider")
    # Seconds subtracted from a token's lifetime when checking expiry.
    token_expire_threshold = 10
    _session: Session
    __tokens: typing.List[StoredToken]

    def __init__(self, session: Session):
        """
        Args:
            session: Session used to request tokens
        """
        self._session = session
        # Per-instance cache: a class-level list would be shared by every
        # provider, and therefore across sessions.
        self.__tokens = []

    def find_token_with_all_scopes(
            self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]:
        """
        Find a cached token covering all scopes (expiry is not checked)
        Args:
            scopes: Required scopes
        Returns:
            The first matching token, or None
        """
        for token in self.__tokens:
            if token.has_scopes(scopes):
                return token
        return None

    def get(self, scope: str) -> str:
        """
        Get an access token string for one scope
        Args:
            scope: Required scope
        Returns:
            Access token
        """
        return self.get_token(scope).access_token

    def get_token(self, *scopes) -> StoredToken:
        """
        Get a token covering all scopes, requesting a new one if needed
        Args:
            scopes: Required scopes
        Returns:
            A cached, unexpired token or a newly requested one
        Raises:
            RuntimeError: If no scope is given
        """
        scopes = list(scopes)
        if not scopes:
            raise RuntimeError("The token doesn't have any scope")
        token = self.find_token_with_all_scopes(scopes)
        if token is not None:
            if token.expired():
                self.__tokens.remove(token)
            else:
                return token
        self.logger.debug(
            "Token expired or not suitable, requesting again. scopes: {}, old_token: {}"
            .format(scopes, token))
        response = self._session.mercury().send_sync_json(
            MercuryRequests.request_token(self._session.device_id(),
                                          ",".join(scopes)))
        token = TokenProvider.StoredToken(response)
        self.logger.debug(
            "Updated token successfully! scopes: {}, new_token: {}".format(
                scopes, token))
        self.__tokens.append(token)
        return token

    class StoredToken:
        """
        Access token with its scopes, lifetime and creation time
        """
        expires_in: int  # lifetime in seconds, from "expiresIn"
        access_token: str
        scopes: typing.List[str]
        timestamp: int  # creation time in microseconds since the epoch

        def __init__(self, obj):
            """
            Args:
                obj: Mapping with "expiresIn", "accessToken" and "scope" keys
            """
            self.timestamp = time.time_ns() // 1000
            self.expires_in = obj["expiresIn"]
            self.access_token = obj["accessToken"]
            self.scopes = obj["scope"]

        def expired(self) -> bool:
            """
            Check whether the token is past its lifetime minus the threshold
            Returns:
                True if expired
            """
            # The timestamp is in microseconds, so the lifetime in seconds
            # has to be scaled by 1,000,000. Scaling by 1,000 expired
            # tokens about a thousand times too early.
            lifetime_us = (self.expires_in -
                           TokenProvider.token_expire_threshold) * 1_000_000
            return self.timestamp + lifetime_us < time.time_ns() // 1000

        def has_scope(self, scope: str) -> bool:
            """
            Check whether the token covers one scope
            """
            return scope in self.scopes

        def has_scopes(self, sc: typing.List[str]) -> bool:
            """
            Check whether the token covers every scope in sc
            """
            return all(self.has_scope(s) for s in sc)