mirror of
https://github.com/kokarare1212/librespot-python.git
synced 2024-10-03 17:56:49 +02:00
85 lines
2.6 KiB
Python
85 lines
2.6 KiB
Python
from __future__ import annotations
|
|
from librespot.core import Session, TimeProvider
|
|
from librespot.mercury import MercuryRequests
|
|
import logging
|
|
|
|
|
|
class TokenProvider:
|
|
_LOGGER: logging = logging.getLogger(__name__)
|
|
_TOKEN_EXPIRE_THRESHOLD = 10
|
|
_session: Session = None
|
|
_tokens: list[TokenProvider.StoredToken] = []
|
|
|
|
def __init__(self, session: Session):
|
|
self._session = session
|
|
|
|
def find_token_with_all_scopes(
|
|
self, scopes: list[str]) -> TokenProvider.StoredToken:
|
|
for token in self._tokens:
|
|
if token.has_scopes(scopes):
|
|
return token
|
|
|
|
# noinspection PyTypeChecker
|
|
return None
|
|
|
|
def get_token(self, *scopes) -> TokenProvider.StoredToken:
|
|
scopes = list(scopes)
|
|
if len(scopes) == 0:
|
|
raise RuntimeError()
|
|
|
|
token = self.find_token_with_all_scopes(scopes)
|
|
if token is not None:
|
|
if token.expired():
|
|
self._tokens.remove(token)
|
|
else:
|
|
return token
|
|
|
|
self._LOGGER.debug(
|
|
"Token expired or not suitable, requesting again. scopes: {}, old_token: {}"
|
|
.format(scopes, token))
|
|
resp = self._session.mercury().send_sync_json(
|
|
MercuryRequests.request_token(self._session.device_id(),
|
|
",".join(scopes)))
|
|
token = TokenProvider.StoredToken(resp)
|
|
|
|
self._LOGGER.debug(
|
|
"Updated token successfully! scopes: {}, new_token: {}".format(
|
|
scopes, token))
|
|
self._tokens.append(token)
|
|
|
|
return token
|
|
|
|
def get(self, scope: str) -> str:
|
|
return self.get_token(scope).access_token
|
|
|
|
class StoredToken:
|
|
expires_in: int
|
|
access_token: str
|
|
scopes: list[str]
|
|
timestamp: int
|
|
|
|
def __init__(self, obj):
|
|
self.timestamp = TimeProvider.TimeProvider().current_time_millis()
|
|
self.expires_in = obj["expiresIn"]
|
|
self.access_token = obj["accessToken"]
|
|
self.scopes = obj["scope"]
|
|
|
|
def expired(self) -> bool:
|
|
return self.timestamp + (
|
|
self.expires_in - TokenProvider._TOKEN_EXPIRE_THRESHOLD
|
|
) * 1000 < TimeProvider.TimeProvider().current_time_millis()
|
|
|
|
def has_scope(self, scope: str) -> bool:
|
|
for s in self.scopes:
|
|
if s == scope:
|
|
return True
|
|
|
|
return False
|
|
|
|
def has_scopes(self, sc: list[str]) -> bool:
|
|
for s in sc:
|
|
if not self.has_scope(s):
|
|
return False
|
|
|
|
return True
|