diff --git a/.gitignore b/.gitignore
index 7b1899b..0b54e6f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -84,4 +84,6 @@ prof/
*.csv
# Json session files
-tidal*.json
\ No newline at end of file
+tidal*.json
+*.m3u8
+*.mpd
\ No newline at end of file
diff --git a/README.rst b/README.rst
index 2f90d95..6e84d17 100644
--- a/README.rst
+++ b/README.rst
@@ -21,6 +21,21 @@ Install from `PyPI `_ using ``pip``:
$ pip install tidalapi
+GStreamer
+------------
+
+Playback of certain audio qualities
+Certain streaming qualities require gstreamer bad-plugins, e.g.:
+```
+sudo apt-get install gstreamer1.0-plugins-bad
+```
+This is mandatory to be able to play m4a streams and for playback of mpegdash or hls streams. Otherwise, you will likely get an error:
+```
+WARNING [MainThread] mopidy.audio.actor Could not find a application/x-hls decoder to handle media.
+WARNING [MainThread] mopidy.audio.gst GStreamer warning: No decoder available for type 'application/x-hls'.
+ERROR [MainThread] mopidy.audio.gst GStreamer error: Your GStreamer installation is missing a plug-in.
+```
+
Usage
-------------
diff --git a/examples/pkce_login.py b/examples/pkce_login.py
index d3a8f0e..4ccd2f5 100644
--- a/examples/pkce_login.py
+++ b/examples/pkce_login.py
@@ -15,7 +15,7 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
#
-"""simple.py: A simple example script that describes how to get started using tidalapi"""
+"""pkce_login.py: A simple example script that describes how to use PKCE login and MPEG-DASH streams"""
import tidalapi
from tidalapi import Quality
@@ -29,18 +29,40 @@
# Override the required playback quality, if necessary
# Note: Set the quality according to your subscription.
-# Low: Quality.low_96k
-# Normal: Quality.low_320k
-# HiFi: Quality.high_lossless
-# HiFi+ Quality.hi_res_lossless
+# Low: Quality.low_96k (m4a 96k)
+# Normal: Quality.low_320k (m4a 320k)
+# HiFi: Quality.high_lossless (FLAC)
+# HiFi+ Quality.hi_res (FLAC MQA)
+# HiFi+ Quality.hi_res_lossless (FLAC HI_RES)
session.audio_quality = Quality.hi_res_lossless.value
-
-album = session.album("110827651") # Let's Rock // The Black Keys
+#album_id = "77640617" # U2 / Achtung Baby (Max quality: HI_RES MQA, 16bit/44100Hz)
+#album_id = "110827651" # The Black Keys / Let's Rock (Max quality: LOSSLESS FLAC, 24bit/48000Hz)
+album_id = "77646169" # Beck / Sea Change (Max quality: HI_RES_LOSSLESS FLAC, 24bit/192000Hz)
+album = session.album(album_id)
tracks = album.tracks()
# list album tracks
for track in tracks:
- print(track.name)
- # MPEG-DASH Stream is only supported when HiRes mode is used!
- print(track.get_stream())
- for artist in track.artists:
- print(' by: ', artist.name)
\ No newline at end of file
+ print("{}: '{}' by '{}'".format(track.id, track.name, track.artist.name))
+ stream = track.get_stream()
+ print("MimeType:{}".format(stream.manifest_mime_type))
+
+ manifest = stream.get_stream_manifest()
+ print("track:{}, (quality:{}, codec:{}, {}bit/{}Hz)".format(track.id,
+ stream.audio_quality,
+ manifest.get_codecs(),
+ stream.bit_depth,
+ stream.sample_rate))
+ if stream.is_MPD:
+ # HI_RES_LOSSLESS quality supported when using MPEG-DASH stream (PKCE only!)
+ # 1. Export as MPD manifest
+ mpd = stream.get_manifest_data()
+ # 2. Export as HLS m3u8 playlist
+ hls = manifest.get_hls()
+ # with open("{}_{}.mpd".format(album_id, track.id), "w") as my_file:
+ # my_file.write(mpd)
+ # with open("{}_{}.m3u8".format(album_id, track.id), "w") as my_file:
+ # my_file.write(hls)
+ elif stream.is_BTS:
+ # Direct URL (m4a or flac) is available for Quality < HI_RES_LOSSLESS
+ url = manifest.get_urls()
+ break
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 396cde1..0cd3746 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tidalapi"
-version = "0.7.4"
+version = "0.7.5"
description = "Unofficial API for TIDAL music streaming service."
authors = ["Thomas Amland "]
maintainers = ["tehkillerbee "]
diff --git a/tests/test_album.py b/tests/test_album.py
index 6bce38d..a783444 100644
--- a/tests/test_album.py
+++ b/tests/test_album.py
@@ -24,6 +24,7 @@
import tidalapi
from tidalapi.album import Album
+from tidalapi.exceptions import MetadataNotAvailable, ObjectNotFound
from .cover import verify_image_cover, verify_video_cover
@@ -110,12 +111,19 @@ def test_default_image_used_if_no_cover_art(mocker):
def test_similar(session):
album = session.album(108043414)
for alb in album.similar():
- if alb.id == 64522277:
- # Album with no similar albums should trigger AttributeError (response: 404)
- with pytest.raises(AttributeError):
- alb.similar()
- else:
- assert isinstance(alb.similar()[0], tidalapi.Album)
+ assert isinstance(alb.similar()[0], tidalapi.Album)
+ # if alb.id == 64522277:
+ # # Album with no similar albums should trigger MetadataNotAvailable (response: 404)
+ # # TODO Find an album with no similar albums related to it
+ # with pytest.raises(MetadataNotAvailable):
+ # alb.similar()
+ # else:
+ # assert isinstance(alb.similar()[0], tidalapi.Album)
+
+
+def test_album_not_found(session):
+ with pytest.raises(ObjectNotFound):
+ session.album(123456789)
def test_review(session):
diff --git a/tests/test_artist.py b/tests/test_artist.py
index 9fd15a6..82b021f 100644
--- a/tests/test_artist.py
+++ b/tests/test_artist.py
@@ -21,6 +21,7 @@
import requests
import tidalapi
+from tidalapi.exceptions import ObjectNotFound
from .cover import verify_image_cover
@@ -42,6 +43,11 @@ def test_artist(session):
assert requests.get(artist.image(160)).status_code == 200
+def test_artist_not_found(session):
+ with pytest.raises(ObjectNotFound):
+ session.artist(123456789)
+
+
def test_get_albums(session):
artist = session.artist(16147)
albums = [
diff --git a/tests/test_media.py b/tests/test_media.py
index 1f4ee78..03a360e 100644
--- a/tests/test_media.py
+++ b/tests/test_media.py
@@ -24,6 +24,7 @@
from dateutil import tz
import tidalapi
+from tidalapi.exceptions import MetadataNotAvailable
from .cover import verify_image_resolution, verify_video_resolution
@@ -47,7 +48,7 @@ def test_track(session):
assert track.version is None
assert (
track.copyright
- == "(P) 2019 MER under exclusive license to Sony Music Entertainment Sweden AB"
+ == "(P) 2019 Kreatell Music under exclusive license to Sony Music Entertainment Sweden AB"
)
assert track.isrc == "NOG841907010"
assert track.explicit is False
@@ -75,8 +76,8 @@ def test_lyrics(session):
def test_no_lyrics(session):
track = session.track(17626400)
- # Tracks with no lyrics should trigger AttributeError (response: 404)
- with pytest.raises(AttributeError):
+ # Tracks with no lyrics should trigger MetadataNotAvailable (response: 404)
+ with pytest.raises(MetadataNotAvailable):
track.lyrics()
diff --git a/tests/test_mix.py b/tests/test_mix.py
index 5c03eaf..ba0f934 100644
--- a/tests/test_mix.py
+++ b/tests/test_mix.py
@@ -17,7 +17,10 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
+import pytest
+
import tidalapi
+from tidalapi.exceptions import ObjectNotFound
from .cover import verify_image_cover
@@ -32,3 +35,13 @@ def test_image(session):
mixes = session.mixes()
first = next(iter(mixes))
verify_image_cover(session, first, [320, 640, 1500])
+
+
+def test_mix_unavailable(session):
+ with pytest.raises(ObjectNotFound):
+ mix = session.mix("12345678")
+
+
+def test_mixv2_unavailable(session):
+ with pytest.raises(ObjectNotFound):
+ mix = session.mixv2("12345678")
diff --git a/tests/test_page.py b/tests/test_page.py
index 2e42ee7..b9e6606 100644
--- a/tests/test_page.py
+++ b/tests/test_page.py
@@ -44,8 +44,8 @@ def test_get_explore_items(session):
genres = explore.categories[1].show_more()
iterator = iter(genres)
next(iterator)
- assert next(iterator).title == "Africa"
assert next(iterator).title == "Blues"
+ assert next(iterator).title == "Classical"
def test_for_you(session):
@@ -97,7 +97,7 @@ def test_page_links(session):
def test_genres(session):
genres = session.genres()
first = next(iter(genres))
- assert first.title == "Africa"
+ assert first.title == "Blues"
assert isinstance(next(iter(first.get())), tidalapi.Playlist)
# NOTE local genres seems broken, and the first entry is no longer available
diff --git a/tests/test_playlist.py b/tests/test_playlist.py
index b8b4c2f..7c9e1a2 100644
--- a/tests/test_playlist.py
+++ b/tests/test_playlist.py
@@ -23,6 +23,7 @@
from dateutil import tz
import tidalapi
+from tidalapi.exceptions import ObjectNotFound
from .cover import verify_image_cover, verify_image_resolution
@@ -78,6 +79,11 @@ def test_updated_playlist(session):
assert creator.name == "user"
+def test_playlist_not_found(session):
+ with pytest.raises(ObjectNotFound):
+ session.playlist("12345678")
+
+
def test_video_playlist(session):
playlist = session.playlist("aa3611ff-5b25-4bbe-8ce4-36c678c3438f")
assert playlist.id == "aa3611ff-5b25-4bbe-8ce4-36c678c3438f"
diff --git a/tidalapi/__init__.py b/tidalapi/__init__.py
index 8eec358..cf50f98 100644
--- a/tidalapi/__init__.py
+++ b/tidalapi/__init__.py
@@ -19,4 +19,4 @@
User,
)
-__version__ = "0.7.4"
+__version__ = "0.7.5"
diff --git a/tidalapi/album.py b/tidalapi/album.py
index f381947..08bc40f 100644
--- a/tidalapi/album.py
+++ b/tidalapi/album.py
@@ -23,6 +23,7 @@
import dateutil.parser
+from tidalapi.exceptions import MetadataNotAvailable, ObjectNotFound
from tidalapi.types import JsonObj
if TYPE_CHECKING:
@@ -31,6 +32,7 @@
from tidalapi.page import Page
from tidalapi.session import Session
+
DEFAULT_ALBUM_IMAGE = (
"https://tidal.com/browse/assets/images/defaultImages/defaultAlbumImage.png"
)
@@ -68,11 +70,16 @@ class Album:
def __init__(self, session: "Session", album_id: Optional[str]):
self.session = session
- self.requests = session.request
+ self.request = session.request
self.artist = session.artist()
self.id = album_id
+
if self.id:
- self.requests.map_request(f"albums/{album_id}", parse=self.parse)
+ request = self.request.request("GET", "albums/%s" % self.id)
+ if request.status_code and request.status_code == 404:
+ raise ObjectNotFound("Album not found")
+ else:
+ self.request.map_json(request.json(), parse=self.parse)
def parse(
self,
@@ -127,7 +134,7 @@ def parse(
@property
def year(self) -> Optional[int]:
- """Convenience function to get the year using :class:`available_release_date`
+ """Get the year using :class:`available_release_date`
:return: An :any:`python:int` containing the year the track was released
"""
@@ -155,7 +162,8 @@ def tracks(self, limit: Optional[int] = None, offset: int = 0) -> List["Track"]:
:return: A list of the :class:`Tracks <.Track>` in the album.
"""
params = {"limit": limit, "offset": offset}
- tracks = self.requests.map_request(
+
+ tracks = self.request.map_request(
"albums/%s/tracks" % self.id, params, parse=self.session.parse_track
)
assert isinstance(tracks, list)
@@ -169,7 +177,7 @@ def items(self, limit: int = 100, offset: int = 0) -> List[Union["Track", "Video
:return: A list of :class:`Tracks<.Track>` and :class:`Videos`<.Video>`
"""
params = {"offset": offset, "limit": limit}
- items = self.requests.map_request(
+ items = self.request.map_request(
"albums/%s/items" % self.id, params=params, parse=self.session.parse_media
)
assert isinstance(items, list)
@@ -226,17 +234,18 @@ def page(self) -> "Page":
return self.session.page.get("pages/album", params={"albumId": self.id})
def similar(self) -> List["Album"]:
- """Retrieve albums similar to the current one. AttributeError is raised, when no
- similar albums exists.
+ """Retrieve albums similar to the current one. MetadataNotAvailable is raised,
+ when no similar albums exist.
:return: A :any:`list` of similar albums
"""
- json_obj = self.requests.map_request("albums/%s/similar" % self.id)
- if json_obj.get("status"):
- assert json_obj.get("status") == 404
- raise AttributeError("No similar albums exist for this album")
+ request = self.request.request("GET", "albums/%s/similar" % self.id)
+ if request.status_code and request.status_code == 404:
+ raise MetadataNotAvailable("No similar albums exist for this album")
else:
- albums = self.requests.map_json(json_obj, parse=self.session.parse_album)
+ albums = self.request.map_json(
+ request.json(), parse=self.session.parse_album
+ )
assert isinstance(albums, list)
return cast(List["Album"], albums)
@@ -247,7 +256,7 @@ def review(self) -> str:
:raises: :class:`requests.HTTPError` if there isn't a review yet
"""
# morguldir: TODO: Add parsing of wimplinks?
- review = self.requests.request("GET", "albums/%s/review" % self.id).json()[
+ review = self.request.request("GET", "albums/%s/review" % self.id).json()[
"text"
]
assert isinstance(review, str)
diff --git a/tidalapi/artist.py b/tidalapi/artist.py
index e8edc0b..6671bca 100644
--- a/tidalapi/artist.py
+++ b/tidalapi/artist.py
@@ -24,8 +24,9 @@
from typing import TYPE_CHECKING, List, Mapping, Optional, Union, cast
import dateutil.parser
-from typing_extensions import Never
+from typing_extensions import NoReturn
+from tidalapi.exceptions import ObjectNotFound
from tidalapi.types import JsonObj
if TYPE_CHECKING:
@@ -45,17 +46,26 @@ class Artist:
bio: Optional[str] = None
def __init__(self, session: "Session", artist_id: Optional[str]):
+ """Initialize the :class:`Artist` object, given a TIDAL artist ID :param
+ session: The current TIDAL :class:`Session` :param str artist_id: TIDAL artist
+ ID :raises: Raises :class:`exceptions.ObjectNotFound`"""
self.session = session
self.request = self.session.request
self.id = artist_id
+
if self.id:
- self.request.map_request(f"artists/{artist_id}", parse=self.parse_artist)
+ request = self.request.request("GET", "artists/%s" % self.id)
+ if request.status_code and request.status_code == 404:
+ raise ObjectNotFound("Artist not found")
+ else:
+ self.request.map_json(request.json(), parse=self.parse_artist)
def parse_artist(self, json_obj: JsonObj) -> "Artist":
- """
+ """Parses a TIDAL artist, replaces the current :class:`Artist` object. Made for
+ use within the python tidalapi module.
- :param json_obj:
- :return:
+ :param json_obj: :class:`JsonObj` containing the artist metadata
+ :return: Returns a copy of the :class:`Artist` object
"""
self.id = json_obj["id"]
self.name = json_obj["name"]
@@ -81,11 +91,11 @@ def parse_artist(self, json_obj: JsonObj) -> "Artist":
return copy.copy(self)
def parse_artists(self, json_obj: List[JsonObj]) -> List["Artist"]:
- """Parses a TIDAL artist, replaces the current artist object. Made for use
- inside of the python tidalapi module.
+ """Parses a list of TIDAL artists, returns a list of :class:`Artist` objects
+ Made for use within the python tidalapi module.
- :param json_obj: Json data returned from api.tidal.com containing an artist
- :return: Returns a copy of the original :exc: 'Artist': object
+ :param List[JsonObj] json_obj: List of :class:`JsonObj` containing the artist metadata for each artist
+ :return: Returns a list of :class:`Artist` objects
"""
return list(map(self.parse_artist, json_obj))
@@ -198,7 +208,7 @@ def get_radio(self) -> List["Track"]:
),
)
- def items(self) -> List[Never]:
+ def items(self) -> List[NoReturn]:
"""The artist page does not supply any items. This only exists for symmetry with
other model types.
diff --git a/tidalapi/exceptions.py b/tidalapi/exceptions.py
new file mode 100644
index 0000000..acef849
--- /dev/null
+++ b/tidalapi/exceptions.py
@@ -0,0 +1,34 @@
+class AuthenticationError(Exception):
+ pass
+
+
+class AssetNotAvailable(Exception):
+ pass
+
+
+class URLNotAvailable(Exception):
+ pass
+
+
+class StreamNotAvailable(Exception):
+ pass
+
+
+class MetadataNotAvailable(Exception):
+ pass
+
+
+class ObjectNotFound(Exception):
+ pass
+
+
+class UnknownManifestFormat(Exception):
+ pass
+
+
+class ManifestDecodeError(Exception):
+ pass
+
+
+class MPDNotAvailableError(Exception):
+ pass
diff --git a/tidalapi/media.py b/tidalapi/media.py
index ee0838f..1987264 100644
--- a/tidalapi/media.py
+++ b/tidalapi/media.py
@@ -25,7 +25,7 @@
import copy
from abc import abstractmethod
-from datetime import datetime
+from datetime import datetime, timedelta
from enum import Enum
from typing import TYPE_CHECKING, List, Optional, Union, cast
@@ -34,6 +34,22 @@
if TYPE_CHECKING:
import tidalapi
+import base64
+import json
+
+from isodate import parse_duration
+from mpegdash.parser import MPEGDASHParser
+
+from tidalapi.exceptions import (
+ AssetNotAvailable,
+ ManifestDecodeError,
+ MetadataNotAvailable,
+ MPDNotAvailableError,
+ ObjectNotFound,
+ StreamNotAvailable,
+ UnknownManifestFormat,
+ URLNotAvailable,
+)
from tidalapi.types import JsonObj
@@ -51,6 +67,84 @@ class VideoQuality(Enum):
low = "LOW"
+class AudioMode(Enum):
+ stereo = "STEREO"
+ sony_360 = "SONY_360RA"
+ dolby_atmos = "DOLBY_ATMOS"
+
+
+# class MediaMetadataTags(Enum):
+# mqa = 'MQA'
+# hires_lossless = 'HIRES_LOSSLESS'
+# lossless = 'LOSSLESS'
+# sony_360 = 'SONY_360RA'
+# dolby_atmos = 'DOLBY_ATMOS'
+
+
+class AudioExtensions(Enum):
+ FLAC = ".flac"
+ M4A = ".m4a"
+ MP4 = ".mp4"
+
+
+class VideoExtensions(Enum):
+ TS = ".ts"
+
+
+class ManifestMimeType(Enum):
+ # EMU: str = "application/vnd.tidal.emu"
+ # APPL: str = "application/vnd.apple.mpegurl"
+ MPD: str = "application/dash+xml"
+ BTS: str = "application/vnd.tidal.bts"
+ VIDEO: str = "video/mp2t"
+
+
+class Codec:
+ MP3: str = "MP3"
+ AAC: str = "AAC"
+ M4A: str = "MP4A"
+ FLAC: str = "FLAC"
+ MQA: str = "MQA"
+ Atmos: str = "EAC3"
+ AC4: str = "AC4"
+ SONY360RA: str = "MHA1"
+ LowResCodecs: [str] = [MP3, AAC, M4A]
+ PremiumCodecs: [str] = [MQA, Atmos, AC4]
+ HQCodecs: [str] = PremiumCodecs + [FLAC]
+
+
+class MimeType:
+ audio_mpeg = "audio/mpeg"
+ audio_mp3 = "audio/mp3"
+ audio_m4a = "audio/m4a"
+ audio_flac = "audio/flac"
+ audio_xflac = "audio/x-flac"
+ audio_eac3 = "audio/eac3"
+ audio_ac4 = "audio/mp4"
+ audio_m3u8 = "audio/mpegurl"
+ video_mp4 = "video/mp4"
+ video_m3u8 = "video/mpegurl"
+ audio_map = {
+ Codec.MP3: audio_mp3,
+ Codec.AAC: audio_m4a,
+ Codec.M4A: audio_m4a,
+ Codec.FLAC: audio_xflac,
+ Codec.MQA: audio_xflac,
+ Codec.Atmos: audio_eac3,
+ Codec.AC4: audio_ac4,
+ }
+
+ @staticmethod
+ def from_audio_codec(codec):
+ return MimeType.audio_map.get(codec, MimeType.audio_m4a)
+
+ @staticmethod
+ def is_FLAC(mime_type):
+ return (
+ True if mime_type in [MimeType.audio_flac, MimeType.audio_xflac] else False
+ )
+
+
class Media:
"""Base class for generic media, specifically :class:`Track` and :class:`Video`
@@ -191,13 +285,29 @@ def _get(self, media_id: str) -> "Track":
:param media_id: TIDAL's identifier of the track
:return: A :class:`Track` object containing all the information about the track
"""
- parse = self.parse_track
- track = self.requests.map_request("tracks/%s" % media_id, parse=parse)
- assert not isinstance(track, list)
- return cast("Track", track)
+
+ request = self.requests.request("GET", "tracks/%s" % media_id)
+ if request.status_code and request.status_code == 404:
+ # TODO Handle track not found or not available due to permissions
+ raise ObjectNotFound("Track not found or unavailable")
+ else:
+ json_obj = request.json()
+ track = self.requests.map_json(json_obj, parse=self.parse_track)
+ assert not isinstance(track, list)
+ return cast("Track", track)
def get_url(self) -> str:
- assert not self.session.is_pkce
+ """Retrieves the URL for a track.
+
+ :return: A `str` object containing the direct track URL
+ :raises: A :class:`exceptions.URLNotAvailable` if no URL is available for this track
+ """
+ if self.session.is_pkce:
+ raise URLNotAvailable(
+ "Track URL not available with quality:'{}'".format(
+ self.session.config.quality
+ )
+ )
params = {
"urlusagemode": "STREAM",
"audioquality": self.session.config.quality,
@@ -206,20 +316,23 @@ def get_url(self) -> str:
request = self.requests.request(
"GET", "tracks/%s/urlpostpaywall" % self.id, params
)
- return cast(str, request.json()["urls"][0])
+ if request.status_code and request.status_code == 404:
+ raise URLNotAvailable("URL not available for this track")
+ else:
+ json_obj = request.json()
+ return cast(str, json_obj["urls"][0])
def lyrics(self) -> "Lyrics":
"""Retrieves the lyrics for a song.
:return: A :class:`Lyrics` object containing the lyrics
- :raises: A :class:`requests.HTTPError` if there aren't any lyrics
+ :raises: A :class:`exceptions.MetadataNotAvailable` if there aren't any lyrics
"""
-
- json_obj = self.requests.map_request("tracks/%s/lyrics" % self.id)
- if json_obj.get("status"):
- assert json_obj.get("status") == 404
- raise AttributeError("No lyrics exists for this track")
+ request = self.requests.request("GET", "tracks/%s/lyrics" % self.id)
+ if request.status_code and request.status_code == 404:
+ raise MetadataNotAvailable("No lyrics exists for this track")
else:
+ json_obj = request.json()
lyrics = self.requests.map_json(json_obj, parse=Lyrics().parse)
assert not isinstance(lyrics, list)
return cast("Lyrics", lyrics)
@@ -229,30 +342,44 @@ def get_track_radio(self, limit: int = 100) -> List["Track"]:
to this track.
:return: A list of :class:`Tracks `
+ :raises: A :class:`exceptions.MetadataNotAvailable` if no track radio is available
"""
params = {"limit": limit}
- tracks = self.requests.map_request(
- "tracks/%s/radio" % self.id, params=params, parse=self.session.parse_track
+
+ request = self.requests.request(
+ "GET", "tracks/%s/radio" % self.id, params=params
)
- assert isinstance(tracks, list)
- return cast(List["Track"], tracks)
+ if request.status_code and request.status_code == 404:
+ raise MetadataNotAvailable("Track radio not available for this track")
+ else:
+ json_obj = request.json()
+ tracks = self.requests.map_json(json_obj, parse=self.session.parse_track)
+ assert isinstance(tracks, list)
+ return cast(List["Track"], tracks)
def get_stream(self) -> "Stream":
"""Retrieves the track streaming object, allowing for audio transmission.
:return: A :class:`Stream` object which holds audio file properties and
parameters needed for streaming via `MPEG-DASH` protocol.
+ :raises: A :class:`exceptions.StreamNotAvailable` if there is no stream available for this track
"""
params = {
"playbackmode": "STREAM",
"audioquality": self.session.config.quality,
"assetpresentation": "FULL",
}
- stream = self.requests.map_request(
- "tracks/%s/playbackinfopostpaywall" % self.id, params, parse=Stream().parse
+
+ request = self.requests.request(
+ "GET", "tracks/%s/playbackinfopostpaywall" % self.id, params
)
- assert not isinstance(stream, list)
- return cast("Stream", stream)
+ if request.status_code and request.status_code == 404:
+ raise StreamNotAvailable("Stream not available for this track")
+ else:
+ json_obj = request.json()
+ stream = self.requests.map_json(json_obj, parse=Stream().parse)
+ assert not isinstance(stream, list)
+ return cast("Stream", stream)
class Stream:
@@ -263,11 +390,18 @@ class Stream:
"""
track_id: int = -1
- audio_mode: str = ""
- audio_quality: str = "LOW"
+ audio_mode: str = AudioMode.stereo.value # STEREO, SONY_360RA, DOLBY_ATMOS
+ audio_quality: str = Quality.low_96k.value # LOW, HIGH, LOSSLESS, HI_RES
manifest_mime_type: str = ""
manifest_hash: str = ""
manifest: str = ""
+ asset_presentation: str = "FULL"
+ album_replay_gain: float = 1.0
+ album_peak_amplitude: float = 1.0
+ track_replay_gain: float = 1.0
+ track_peak_amplitude: float = 1.0
+ bit_depth: int = 16
+ sample_rate: int = 44100
def parse(self, json_obj: JsonObj) -> "Stream":
self.track_id = json_obj["trackId"]
@@ -276,11 +410,285 @@ def parse(self, json_obj: JsonObj) -> "Stream":
self.manifest_mime_type = json_obj["manifestMimeType"]
self.manifest_hash = json_obj["manifestHash"]
self.manifest = json_obj["manifest"]
+ self.album_replay_gain = json_obj["albumReplayGain"]
+ self.album_peak_amplitude = json_obj["albumPeakAmplitude"]
+ self.track_replay_gain = json_obj["trackReplayGain"]
+ self.track_peak_amplitude = json_obj["trackPeakAmplitude"]
+ if not (
+ self.audio_quality == Quality.low_96k.value
+ or self.audio_quality == Quality.low_320k.value
+ or self.audio_quality == Quality.hi_res.value
+ ):
+ # Bit depth, Sample rate not available for low quality modes. Assuming 16bit/44100Hz
+ self.bit_depth = json_obj["bitDepth"]
+ self.sample_rate = json_obj["sampleRate"]
return copy.copy(self)
+ def get_stream_manifest(self) -> "StreamManifest":
+ return StreamManifest(self)
+
+ def get_manifest_data(self) -> str:
+ try:
+ # Stream Manifest is base64 encoded.
+ return base64.b64decode(self.manifest).decode("utf-8")
+ except:
+ raise ManifestDecodeError
+
+ @property
+ def is_MPD(self) -> bool:
+ return True if ManifestMimeType.MPD.value in self.manifest_mime_type else False
+
+ @property
+ def is_BTS(self) -> bool:
+ return True if ManifestMimeType.BTS.value in self.manifest_mime_type else False
+
+
+class StreamManifest:
+ """An object containing a parsed StreamManifest."""
+
+ manifest: str = None
+ manifest_mime_type: str = None
+ manifest_parsed: str = None
+ codecs: str = None # MP3, AAC, FLAC, ALAC, MQA, EAC3, AC4, MHA1
+ encryption_key = None
+ encryption_type = None
+ sample_rate: int = 44100
+ urls: [str] = []
+ mime_type: MimeType = MimeType.audio_mpeg
+ file_extension = None
+ dash_info: DashInfo = None
+
+ def __init__(self, stream: Stream):
+ self.manifest = stream.manifest
+ self.manifest_mime_type = stream.manifest_mime_type
+ if stream.is_MPD:
+ # See https://ottverse.com/structure-of-an-mpeg-dash-mpd/ for more details
+ self.dash_info = DashInfo.from_mpd(stream.get_manifest_data())
+ self.urls = self.dash_info.urls
+ self.codecs = self.dash_info.codecs
+ self.mime_type = self.dash_info.mime_type
+ self.sample_rate = self.dash_info.audio_sampling_rate
+ # TODO: Handle encryption key.
+ self.encryption_type = "NONE"
+ self.encryption_key = None
+ elif stream.is_BTS:
+ # Stream Manifest is base64 encoded.
+ self.manifest_parsed = stream.get_manifest_data()
+ # JSON string to object.
+ stream_manifest = json.loads(self.manifest_parsed)
+ # TODO: Handle more than one download URL
+ self.urls = stream_manifest["urls"]
+ self.codecs = stream_manifest["codecs"].upper().split(".")[0]
+ self.mime_type = stream_manifest["mimeType"]
+ self.encryption_type = stream_manifest["encryptionType"]
+ self.encryption_key = (
+ stream_manifest["encryptionKey"] if self.is_encrypted else None
+ )
+ else:
+ raise UnknownManifestFormat
+
+ self.file_extension = self.get_file_extension(self.urls[0], self.codecs)
+
+ def get_urls(self) -> [str]:
+ if self.is_MPD:
+ return self.urls
+ else:
+ return self.urls[0]
+
+ def get_hls(self) -> str:
+ if self.is_MPD:
+ return self.dash_info.get_hls()
+ else:
+ raise MPDNotAvailableError("HLS stream requires MPD MetaData")
+
+ def get_codecs(self) -> str:
+ return self.codecs
+
+ def get_sampling_rate(self) -> int:
+ return self.dash_info.audio_sampling_rate
+
+ @staticmethod
+ def get_mimetype(stream_codec, stream_url: Optional[str] = None) -> str:
+ if stream_codec:
+ return MimeType.from_audio_codec(stream_codec)
+ if not stream_url:
+ return MimeType.audio_m4a
+ else:
+ if AudioExtensions.FLAC.value in stream_url:
+ return MimeType.audio_xflac
+ elif AudioExtensions.MP4.value in stream_url:
+ return MimeType.audio_m4a
+
+ @staticmethod
+ def get_file_extension(stream_url: str, stream_codec: Optional[str] = None) -> str:
+ if AudioExtensions.FLAC.value in stream_url:
+ result: str = AudioExtensions.FLAC.value
+ elif AudioExtensions.MP4.value in stream_url:
+ if "ac4" in stream_codec or "mha1" in stream_codec:
+ result = ".mp4"
+ elif "flac" in stream_codec:
+ result = ".flac"
+ else:
+ result = ".m4a"
+ result: str = AudioExtensions.MP4.value
+ elif VideoExtensions.TS.value in stream_url:
+ result: str = VideoExtensions.TS.value
+ else:
+ result: str = AudioExtensions.M4A.value
+
+ return result
+
+ @property
+ def is_encrypted(self) -> bool:
+ return True if self.encryption_key else False
+
+ @property
+ def is_MPD(self) -> bool:
+ return True if ManifestMimeType.MPD.value in self.manifest_mime_type else False
+
+ @property
+ def is_BTS(self) -> bool:
+ return True if ManifestMimeType.BTS.value in self.manifest_mime_type else False
+
+
+class DashInfo:
+ """An object containing the decoded MPEG-DASH / MPD manifest."""
+
+ duration: datetime = timedelta()
+ content_type: str = "audio"
+ mime_type: MimeType = MimeType.audio_ac4
+ codecs: str = Codec.FLAC
+ first_url: str = ""
+ media_url: str = ""
+ timescale: int = 44100
+ audio_sampling_rate: int = 44100
+ chunk_size: int = -1
+ last_chunk_size: int = -1
+ urls: [str] = [""]
+
+ @staticmethod
+ def from_stream(stream) -> "DashInfo":
+ try:
+ if stream.is_MPD and not stream.is_encrypted:
+ return DashInfo(stream.get_manifest_data())
+ except:
+ raise ManifestDecodeError
+
+ @staticmethod
+ def from_mpd(mpd_manifest) -> "DashInfo":
+ try:
+ return DashInfo(mpd_manifest)
+ except:
+ raise ManifestDecodeError
+
+ def __init__(self, mpd_xml):
+ mpd = MPEGDASHParser.parse(
+ mpd_xml.split("")[1]
+ )
+
+ self.duration = parse_duration(mpd.media_presentation_duration)
+ self.content_type = mpd.periods[0].adaptation_sets[0].content_type
+ self.mime_type = mpd.periods[0].adaptation_sets[0].mime_type
+ self.codecs = mpd.periods[0].adaptation_sets[0].representations[0].codecs
+ self.first_url = (
+ mpd.periods[0]
+ .adaptation_sets[0]
+ .representations[0]
+ .segment_templates[0]
+ .initialization
+ )
+ self.media_url = (
+ mpd.periods[0]
+ .adaptation_sets[0]
+ .representations[0]
+ .segment_templates[0]
+ .media
+ )
+ # self.startNumber = mpd.periods[0].adaptation_sets[0].representations[0].segment_templates[0].start_number
+ self.timescale = (
+ mpd.periods[0]
+ .adaptation_sets[0]
+ .representations[0]
+ .segment_templates[0]
+ .timescale
+ )
+ self.audio_sampling_rate = int(
+ mpd.periods[0].adaptation_sets[0].representations[0].audio_sampling_rate
+ )
+ self.chunk_size = (
+ mpd.periods[0]
+ .adaptation_sets[0]
+ .representations[0]
+ .segment_templates[0]
+ .segment_timelines[0]
+ .Ss[0]
+ .d
+ )
+ # self.chunkcount = mpd.periods[0].adaptation_sets[0].representations[0].segment_templates[0].segment_timelines[0].Ss[0].r + 1
+ self.last_chunk_size = (
+ mpd.periods[0]
+ .adaptation_sets[0]
+ .representations[0]
+ .segment_templates[0]
+ .segment_timelines[0]
+ .Ss[1]
+ .d
+ )
+
+ self.urls = self.get_urls(mpd)
+
+ @staticmethod
+ def get_urls(mpd) -> list[str]:
+ # min segments count; i.e. .initialization + the very first of .media;
+ # See https://developers.broadpeak.io/docs/foundations-dash
+ segments_count = 1 + 1
+
+ for s in (
+ mpd.periods[0]
+ .adaptation_sets[0]
+ .representations[0]
+ .segment_templates[0]
+ .segment_timelines[0]
+ .Ss
+ ):
+ segments_count += s.r if s.r else 1
+
+ # Populate segment urls.
+ segment_template = (
+ mpd.periods[0]
+ .adaptation_sets[0]
+ .representations[0]
+ .segment_templates[0]
+ .media
+ )
+ stream_urls: list[str] = []
+
+ for index in range(segments_count):
+ stream_urls.append(segment_template.replace("$Number$", str(index)))
+
+ return stream_urls
+
+ def get_hls(self) -> str:
+ hls = "#EXTM3U\n"
+ hls += "#EXT-X-TARGETDURATION:%s\n" % int(self.duration.seconds)
+ hls += "#EXT-X-VERSION:3\n"
+ items = self.urls
+ chunk_duration = "#EXTINF:%0.3f,\n" % (
+ float(self.chunk_size) / float(self.timescale)
+ )
+ hls += "\n".join(chunk_duration + item for item in items[0:-1])
+ chunk_duration = "#EXTINF:%0.3f,\n" % (
+ float(self.last_chunk_size) / float(self.timescale)
+ )
+ hls += "\n" + chunk_duration + items[-1] + "\n"
+ hls += "#EXT-X-ENDLIST\n"
+ return hls
+
class Lyrics:
+ """An object containing lyrics for a track."""
+
track_id: int = -1
provider: str = ""
provider_track_id: int = -1
@@ -328,21 +736,36 @@ def _get(self, media_id: str) -> Video:
:param media_id: TIDAL's identifier of the video
:return: A :class:`Video` object containing all the information about the video.
"""
- parse = self.parse_video
- video = self.requests.map_request("videos/%s" % media_id, parse=parse)
- assert not isinstance(video, list)
- return cast("Video", video)
+
+ request = self.requests.request("GET", "videos/%s" % self.id)
+ if request.status_code and request.status_code == 404:
+ raise ObjectNotFound("Video not found or unavailable")
+ else:
+ json_obj = request.json()
+ video = self.requests.map_json(json_obj, parse=self.parse_video)
+ assert not isinstance(video, list)
+ return cast("Video", video)
def get_url(self) -> str:
+ """Retrieves the URL for a video.
+
+ :return: A `str` object containing the direct video URL
+ :raises: A :class:`exceptions.URLNotAvailable` if no URL is available for this video
+ """
params = {
"urlusagemode": "STREAM",
"videoquality": self.session.config.video_quality,
"assetpresentation": "FULL",
}
+
request = self.requests.request(
"GET", "videos/%s/urlpostpaywall" % self.id, params
)
- return cast(str, request.json()["urls"][0])
+ if request.status_code and request.status_code == 404:
+ raise URLNotAvailable("URL not available for this video")
+ else:
+ json_obj = request.json()
+ return cast(str, json_obj["urls"][0])
def image(self, width: int = 1080, height: int = 720) -> str:
if (width, height) not in [(160, 107), (480, 320), (750, 500), (1080, 720)]:
diff --git a/tidalapi/mix.py b/tidalapi/mix.py
index 86a3dd8..ecd69c5 100644
--- a/tidalapi/mix.py
+++ b/tidalapi/mix.py
@@ -27,6 +27,7 @@
import dateutil.parser
+from tidalapi.exceptions import ObjectNotFound
from tidalapi.types import JsonObj
if TYPE_CHECKING:
@@ -93,13 +94,16 @@ def get(self, mix_id: Optional[str] = None) -> "Mix":
mix_id = self.id
params = {"mixId": mix_id, "deviceType": "BROWSER"}
- parse = self.session.parse_page
- result = self.request.map_request("pages/mix", parse=parse, params=params)
- assert not isinstance(result, list)
- self._retrieved = True
- self.__dict__.update(result.categories[0].__dict__)
- self._items = result.categories[1].items
- return self
+ request = self.request.request("GET", "pages/mix", params=params)
+ if request.status_code and request.status_code == 404:
+ raise ObjectNotFound("Mix not found")
+ else:
+ result = self.session.parse_page(request.json())
+ assert not isinstance(result, list)
+ self._retrieved = True
+ self.__dict__.update(result.categories[0].__dict__)
+ self._items = result.categories[1].items
+ return self
def parse(self, json_obj: JsonObj) -> "Mix":
"""Parse a mix into a :class:`Mix`, replaces the calling object.
@@ -167,6 +171,8 @@ class MixV2:
"""A mix from TIDALs v2 api endpoint, weirdly, it is used in only one place
currently."""
+ # tehkillerbee: TODO Doesn't look like this is using the v2 endpoint anyways!?
+
date_added: Optional[datetime] = None
title: str = ""
id: str = ""
@@ -196,13 +202,16 @@ def get(self, mix_id: Optional[str] = None) -> "Mix":
mix_id = self.id
params = {"mixId": mix_id, "deviceType": "BROWSER"}
- parse = self.session.parse_page
- result = self.request.map_request("pages/mix", parse=parse, params=params)
- assert not isinstance(result, list)
- self._retrieved = True
- self.__dict__.update(result.categories[0].__dict__)
- self._items = result.categories[1].items
- return self
+ request = self.request.request("GET", "pages/mix", params=params)
+ if request.status_code and request.status_code == 404:
+ raise ObjectNotFound("Mix not found")
+ else:
+ result = self.session.parse_page(request.json())
+ assert not isinstance(result, list)
+ self._retrieved = True
+ self.__dict__.update(result.categories[0].__dict__)
+ self._items = result.categories[1].items
+ return self
def parse(self, json_obj: JsonObj) -> "MixV2":
"""Parse a mix into a :class:`MixV2`, replaces the calling object.
diff --git a/tidalapi/playlist.py b/tidalapi/playlist.py
index d818871..efaa463 100644
--- a/tidalapi/playlist.py
+++ b/tidalapi/playlist.py
@@ -24,6 +24,7 @@
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional, Sequence, Union, cast
+from tidalapi.exceptions import ObjectNotFound
from tidalapi.types import JsonObj
from tidalapi.user import LoggedInUser
@@ -62,12 +63,15 @@ class Playlist:
def __init__(self, session: "Session", playlist_id: Optional[str]):
self.id = playlist_id
self.session = session
- self.requests = session.request
+ self.request = session.request
self._base_url = "playlists/%s"
if playlist_id:
- request = self.requests.request("GET", self._base_url % playlist_id)
- self._etag = request.headers["etag"]
- self.parse(request.json())
+ request = self.request.request("GET", self._base_url % self.id)
+ if request.status_code and request.status_code == 404:
+ raise ObjectNotFound("Playlist not found")
+ else:
+ self._etag = request.headers["etag"]
+ self.parse(request.json())
def parse(self, json_obj: JsonObj) -> "Playlist":
"""Parses a playlist from tidal, replaces the current playlist object.
@@ -144,12 +148,12 @@ def tracks(self, limit: Optional[int] = None, offset: int = 0) -> List["Track"]:
:return: A list of :class:`Tracks <.Track>`
"""
params = {"limit": limit, "offset": offset}
- request = self.requests.request(
+ request = self.request.request(
"GET", self._base_url % self.id + "/tracks", params=params
)
self._etag = request.headers["etag"]
return list(
- self.requests.map_json(
+ self.request.map_json(
json_obj=request.json(), parse=self.session.parse_track
)
)
@@ -162,12 +166,12 @@ def items(self, limit: int = 100, offset: int = 0) -> List[Union["Track", "Video
:return: A list of :class:`Tracks<.Track>` and :class:`Videos<.Video>`
"""
params = {"limit": limit, "offset": offset}
- request = self.requests.request(
+ request = self.request.request(
"GET", self._base_url % self.id + "/items", params=params
)
self._etag = request.headers["etag"]
return list(
- self.requests.map_json(request.json(), parse=self.session.parse_media)
+ self.request.map_json(request.json(), parse=self.session.parse_media)
)
def image(self, dimensions: int = 480) -> str:
@@ -213,9 +217,9 @@ def wide_image(self, width: int = 1080, height: int = 720) -> str:
class UserPlaylist(Playlist):
def _reparse(self) -> None:
- request = self.requests.request("GET", self._base_url % self.id)
+ request = self.request.request("GET", self._base_url % self.id)
self._etag = request.headers["etag"]
- self.requests.map_json(request.json(), parse=self.parse)
+ self.request.map_json(request.json(), parse=self.parse)
def edit(
self, title: Optional[str] = None, description: Optional[str] = None
@@ -226,10 +230,10 @@ def edit(
description = self.description
data = {"title": title, "description": description}
- self.requests.request("POST", self._base_url % self.id, data=data)
+ self.request.request("POST", self._base_url % self.id, data=data)
def delete(self) -> None:
- self.requests.request("DELETE", self._base_url % self.id)
+ self.request.request("DELETE", self._base_url % self.id)
def add(self, media_ids: List[str]) -> None:
data = {
@@ -239,7 +243,7 @@ def add(self, media_ids: List[str]) -> None:
}
params = {"limit": 100}
headers = {"If-None-Match": self._etag} if self._etag else None
- self.requests.request(
+ self.request.request(
"POST",
self._base_url % self.id + "/items",
params=params,
@@ -250,14 +254,14 @@ def add(self, media_ids: List[str]) -> None:
def remove_by_index(self, index: int) -> None:
headers = {"If-None-Match": self._etag} if self._etag else None
- self.requests.request(
+ self.request.request(
"DELETE", (self._base_url + "/items/%i") % (self.id, index), headers=headers
)
def remove_by_indices(self, indices: Sequence[int]) -> None:
headers = {"If-None-Match": self._etag} if self._etag else None
track_index_string = ",".join([str(x) for x in indices])
- self.requests.request(
+ self.request.request(
"DELETE",
(self._base_url + "/tracks/%s") % (self.id, track_index_string),
headers=headers,
diff --git a/tidalapi/session.py b/tidalapi/session.py
index bb578ca..1e25954 100644
--- a/tidalapi/session.py
+++ b/tidalapi/session.py
@@ -50,6 +50,7 @@
import requests
+from tidalapi.exceptions import *
from tidalapi.types import JsonObj
from . import album, artist, genre, media, mix, page, playlist, request, user
@@ -57,7 +58,7 @@
if TYPE_CHECKING:
from tidalapi.user import FetchedUser, LoggedInUser, PlaylistCreator
-log = logging.getLogger("__NAME__")
+log = logging.getLogger(__name__)
SearchTypes: List[Optional[Any]] = [
artist.Artist,
album.Album,
@@ -380,6 +381,7 @@ def load_oauth_session(
access_token: str,
refresh_token: Optional[str] = None,
expiry_time: Optional[datetime.datetime] = None,
+ is_pkce: Optional[bool] = False,
) -> bool:
"""Login to TIDAL using details from a previous OAuth login, automatically
refreshes expired access tokens if refresh_token is supplied as well.
@@ -389,12 +391,14 @@ def load_oauth_session(
:param refresh_token: (Optional) A refresh token that lets you get a new access
token after it has expired
:param expiry_time: (Optional) The datetime the access token will expire
+ :param is_pkce: (Optional) Is session pkce?
:return: True if we believe the login was successful, otherwise false.
"""
self.token_type = token_type
self.access_token = access_token
self.refresh_token = refresh_token
self.expiry_time = expiry_time
+ self.is_pkce = is_pkce
request = self.request.request("GET", "sessions")
json = request.json()
@@ -434,35 +438,32 @@ def login(self, username: str, password: str) -> bool:
return True
def login_session_file(
- self, session_file: Path, do_pkce: Optional[bool] = False
+ self,
+ session_file: Path,
+ do_pkce: Optional[bool] = False,
) -> bool:
"""Logs in to the TIDAL api using an existing OAuth/PKCE session file. If no
session json file exists, a new one will be created after successful login.
:param session_file: The session json file
:param do_pkce: Perform PKCE login. Default: Use OAuth logon
+ :param fn_print: function for printing login prompts
:return: Returns true if we think the login was successful.
"""
- try:
- # attempt to reload existing session from file
- with open(session_file) as f:
- log.info("Loading OAuth session from %s...", session_file)
- data = json.load(f)
- self._load_session_from_file(**data)
- except Exception as e:
- log.info("Could not load OAuth session from %s: %s", session_file, e)
+ self.load_session_from_file(session_file)
+ # Session could not be loaded, attempt to create a new session
if not self.check_login():
if do_pkce:
- log.info("Creating new PKCE session...")
+ log.info("Creating new session (PKCE)...")
self.login_pkce()
else:
- log.info("Creating new OAuth session...")
+ log.info("Creating new session (OAuth)...")
self.login_oauth_simple()
if self.check_login():
log.info("TIDAL Login OK")
- self._save_session_to_file(session_file)
+ self.save_session_to_file(session_file)
return True
else:
log.info("TIDAL Login KO")
@@ -598,7 +599,7 @@ def login_oauth(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]:
login, future = self._login_with_link()
return login, future
- def _save_session_to_file(self, oauth_file: Path):
+ def save_session_to_file(self, session_file: Path):
# create a new session
if self.check_login():
# store current session session
@@ -607,22 +608,31 @@ def _save_session_to_file(self, oauth_file: Path):
"session_id": {"data": self.session_id},
"access_token": {"data": self.access_token},
"refresh_token": {"data": self.refresh_token},
+ "is_pkce": {"data": self.is_pkce},
# "expiry_time": {"data": self.expiry_time},
}
- with oauth_file.open("w") as outfile:
+ with session_file.open("w") as outfile:
json.dump(data, outfile)
- self._oauth_saved = True
- def _load_session_from_file(self, **data):
+ def load_session_from_file(self, session_file: Path):
+ try:
+ with open(session_file) as f:
+ log.info("Loading session from %s...", session_file)
+ data = json.load(f)
+ except Exception as e:
+ log.info("Could not load session from %s: %s", session_file, e)
+ return False
+
assert self, "No session loaded"
args = {
"token_type": data.get("token_type", {}).get("data"),
"access_token": data.get("access_token", {}).get("data"),
"refresh_token": data.get("refresh_token", {}).get("data"),
+ "is_pkce": data.get("is_pkce", {}).get("data"),
# "expiry_time": data.get("expiry_time", {}).get("data"),
}
- self.load_oauth_session(**args)
+ return self.load_oauth_session(**args)
def _login_with_link(self) -> Tuple[LinkLogin, concurrent.futures.Future[Any]]:
url = "https://auth.tidal.com/v1/oauth2/device_authorization"
@@ -699,12 +709,19 @@ def token_refresh(self, refresh_token: str) -> bool:
params = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
- "client_id": self.config.client_id,
- "client_secret": self.config.client_secret,
+ "client_id": self.config.client_id_pkce
+ if self.is_pkce
+ else self.config.client_id,
+ "client_secret": self.config.client_secret_pkce
+ if self.is_pkce
+ else self.config.client_secret,
}
request = self.request_session.post(url, params)
json = request.json()
+ if request.status_code != 200:
+ raise AuthenticationError("Authentication failed")
+ # raise AuthenticationError(Authentication failed json["error"], json["error_description"])
if not request.ok:
log.warning("The refresh token has expired, a new login is required.")
return False
@@ -808,8 +825,11 @@ def playlist(
:param playlist_id: (Optional) The TIDAL id of the playlist. You may want access to the methods without an id.
:return: Returns a :class:`.Playlist` object that has access to the session instance used.
"""
-
- return playlist.Playlist(session=self, playlist_id=playlist_id).factory()
+ try:
+ return playlist.Playlist(session=self, playlist_id=playlist_id).factory()
+ except ObjectNotFound:
+ log.warning("Playlist '%s' is unavailable", playlist_id)
+ raise
def track(
self, track_id: Optional[str] = None, with_album: bool = False
@@ -822,14 +842,16 @@ def track(
:param with_album: (Optional) Whether to fetch the complete :class:`.Album` for the track or not
:return: Returns a :class:`.Track` object that has access to the session instance used.
"""
-
- item = media.Track(session=self, media_id=track_id)
- if item.album and with_album:
- album = self.album(item.album.id)
- if album:
- item.album = album
-
- return item
+ try:
+ item = media.Track(session=self, media_id=track_id)
+ if item.album and with_album:
+ alb = self.album(item.album.id)
+ if alb:
+ item.album = alb
+ return item
+ except ObjectNotFound:
+ log.warning("Track '%s' is unavailable", track_id)
+ raise
def video(self, video_id: Optional[str] = None) -> media.Video:
"""Function to create a Video object with access to the session instance in a
@@ -839,8 +861,11 @@ def video(self, video_id: Optional[str] = None) -> media.Video:
:param video_id: (Optional) The TIDAL id of the Video. You may want access to the methods without an id.
:return: Returns a :class:`.Video` object that has access to the session instance used.
"""
-
- return media.Video(session=self, media_id=video_id)
+ try:
+ return media.Video(session=self, media_id=video_id)
+ except ObjectNotFound:
+ log.warning("Video '%s' is unavailable", video_id)
+ raise
def artist(self, artist_id: Optional[str] = None) -> artist.Artist:
"""Function to create a Artist object with access to the session instance in a
@@ -850,8 +875,11 @@ def artist(self, artist_id: Optional[str] = None) -> artist.Artist:
:param artist_id: (Optional) The TIDAL id of the Artist. You may want access to the methods without an id.
:return: Returns a :class:`.Artist` object that has access to the session instance used.
"""
-
- return artist.Artist(session=self, artist_id=artist_id)
+ try:
+ return artist.Artist(session=self, artist_id=artist_id)
+ except ObjectNotFound:
+ log.warning("Artist '%s' is unavailable", artist_id)
+ raise
def album(self, album_id: Optional[str] = None) -> album.Album:
"""Function to create a Album object with access to the session instance in a
@@ -861,8 +889,11 @@ def album(self, album_id: Optional[str] = None) -> album.Album:
:param album_id: (Optional) The TIDAL id of the Album. You may want access to the methods without an id.
:return: Returns a :class:`.Album` object that has access to the session instance used.
"""
-
- return album.Album(session=self, album_id=album_id)
+ try:
+ return album.Album(session=self, album_id=album_id)
+ except ObjectNotFound:
+ log.warning("Album '%s' is unavailable", album_id)
+ raise
def mix(self, mix_id: Optional[str] = None) -> mix.Mix:
"""Function to create a mix object with access to the session instance smoothly
@@ -871,8 +902,11 @@ def mix(self, mix_id: Optional[str] = None) -> mix.Mix:
:param mix_id: (Optional) The TIDAL id of the Mix. You may want access to the mix methods without an id.
:return: Returns a :class:`.Mix` object that has access to the session instance used.
"""
-
- return mix.Mix(session=self, mix_id=mix_id)
+ try:
+ return mix.Mix(session=self, mix_id=mix_id)
+ except ObjectNotFound:
+ log.warning("Mix '%s' is unavailable", mix_id)
+ raise
def mixv2(self, mix_id=None) -> mix.MixV2:
"""Function to create a mix object with access to the session instance smoothly
@@ -882,8 +916,11 @@ def mixv2(self, mix_id=None) -> mix.MixV2:
:param mix_id: (Optional) The TIDAL id of the Mix. You may want access to the mix methods without an id.
:return: Returns a :class:`.MixV2` object that has access to the session instance used.
"""
-
- return mix.MixV2(session=self, mix_id=mix_id)
+ try:
+ return mix.MixV2(session=self, mix_id=mix_id)
+ except ObjectNotFound:
+ log.warning("Mix '%s' is unavailable", mix_id)
+ raise
def get_user(
self, user_id: Optional[int] = None