Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 44 additions & 58 deletions librespot/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,6 @@ class Session(Closeable, MessageListener, SubListener):
__dealer_client: typing.Union[DealerClient, None] = None
__event_service: typing.Union[EventService, None] = None
__keys: DiffieHellman
__login5_access_token: typing.Union[str, None] = None
__login5_token_expiry: typing.Union[int, None] = None
__mercury_client: MercuryClient
__receiver: typing.Union[Receiver, None] = None
__search: typing.Union[SearchManager, None]
Expand Down Expand Up @@ -972,7 +970,6 @@ def authenticate(self,

"""
self.__authenticate_partial(credential, False)
self.__authenticate_login5()
with self.__auth_lock:
self.__mercury_client = MercuryClient(self)
self.__token_provider = TokenProvider(self)
Expand Down Expand Up @@ -1208,13 +1205,6 @@ def is_valid(self) -> bool:
self.__wait_auth_lock()
return self.__ap_welcome is not None and self.connection is not None

def login5(self) -> tuple[str, int]:
""" """
self.__wait_auth_lock()
if self.__login5_access_token is None or self.__login5_token_expiry is None:
raise RuntimeError("Session isn't authenticated!")
return self.__login5_access_token, self.__login5_token_expiry

def mercury(self) -> MercuryClient:
""" """
self.__wait_auth_lock()
Expand Down Expand Up @@ -1394,43 +1384,6 @@ def __authenticate_partial(self,
else:
raise RuntimeError("Unknown CMD 0x" + packet.cmd.hex())

def __authenticate_login5(self) -> None:
"""Authenticate using Login5 to get access token"""
login5_request = Login5.LoginRequest()
login5_request.client_info.client_id = MercuryRequests.keymaster_client_id
login5_request.client_info.device_id = self.__inner.device_id

# Set stored credential from APWelcome
if hasattr(self, '_Session__ap_welcome') and self.__ap_welcome:
stored_cred = Login5Credentials.StoredCredential()
stored_cred.username = self.__ap_welcome.canonical_username
stored_cred.data = self.__ap_welcome.reusable_auth_credentials
login5_request.stored_credential.CopyFrom(stored_cred)

response = requests.post(
"https://login5.spotify.com/v3/login",
data=login5_request.SerializeToString(),
headers={
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf"
}
)

if response.status_code == 200:
login5_response = Login5.LoginResponse()
login5_response.ParseFromString(response.content)

if login5_response.HasField('ok'):
self.__login5_access_token = login5_response.ok.access_token
self.__login5_token_expiry = int(time.time()) + login5_response.ok.access_token_expires_in
self.logger.info("Login5 authentication successful, got access token")
else:
self.logger.warning("Login5 authentication failed: {}".format(login5_response.error))
else:
self.logger.warning("Login5 request failed with status: {}".format(response.status_code))
else:
self.logger.error("Login5 authentication failed: No APWelcome found")

def __send_unchecked(self, cmd: bytes, payload: bytes) -> None:
self.cipher_pair.send_encoded(self.connection, cmd, payload)

Expand Down Expand Up @@ -2343,22 +2296,55 @@ def get_token(self, *scopes) -> StoredToken:
if token is not None:
if token.expired():
self.__tokens.remove(token)
self.logger.debug("Login5 token expired, need to re-authenticate")
else:
return token

login5_token = None
login5_access_token, login5_token_expiry = self.__session.login5()
if int(time.time()) < login5_token_expiry - 60: # 60 second buffer
login5_token = TokenProvider.StoredToken({
"expiresIn": login5_token_expiry - int(time.time()),
"accessToken": login5_access_token,
"scope": scopes
})
self.__tokens.append(login5_token)
token = self.login5(scopes)
if token is not None:
self.__tokens.append(token)
self.logger.debug("Using Login5 access token for scopes: {}".format(scopes))
return token

def login5(self, scopes: typing.List[str]) -> typing.Union[StoredToken, None]:
"""Submit Login5 request for a fresh access token"""

if self.__session.ap_welcome():
login5_request = Login5.LoginRequest()
login5_request.client_info.client_id = MercuryRequests.keymaster_client_id
login5_request.client_info.device_id = self.__session.device_id()

stored_cred = Login5Credentials.StoredCredential()
stored_cred.username = self.__session.username()
stored_cred.data = self.__session.ap_welcome().reusable_auth_credentials
login5_request.stored_credential.CopyFrom(stored_cred)

response = requests.post(
"https://login5.spotify.com/v3/login",
data=login5_request.SerializeToString(),
headers={
"Content-Type": "application/x-protobuf",
"Accept": "application/x-protobuf"
})

if response.status_code == 200:
login5_response = Login5.LoginResponse()
login5_response.ParseFromString(response.content)

if login5_response.HasField('ok'):
self.logger.info("Login5 authentication successful, got access token".format(login5_response.ok.access_token))
token = TokenProvider.StoredToken({
"expiresIn": login5_response.ok.access_token_expires_in, # approximately one hour
"accessToken": login5_response.ok.access_token,
"scope": scopes
})
return token
else:
self.logger.warning("Login5 authentication failed: {}".format(login5_response.error))
else:
self.logger.warning("Login5 request failed with status: {}".format(response.status_code))
else:
self.logger.debug("Login5 token expired, need to re-authenticate")
return login5_token
self.logger.error("Login5 authentication failed: No APWelcome found")

class StoredToken:
""" """
Expand Down
2 changes: 1 addition & 1 deletion librespot/proto/spotify/login5/v3/Login5_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading