1#
2# pylast -
3#     A Python interface to Last.fm and Libre.fm
4#
5# Copyright 2008-2010 Amr Hassan
6# Copyright 2013-2021 hugovk
7#
8# Licensed under the Apache License, Version 2.0 (the "License");
9# you may not use this file except in compliance with the License.
10# You may obtain a copy of the License at
11#
12#     https://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS,
16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19#
20# https://github.com/pylast/pylast
21
22import collections
23import hashlib
24import html.entities
25import logging
26import os
27import shelve
28import ssl
29import tempfile
30import time
31import xml.dom
32from http.client import HTTPSConnection
33from urllib.parse import quote_plus
34from xml.dom import Node, minidom
35
36try:
37    # Python 3.8+
38    import importlib.metadata as importlib_metadata
39except ImportError:
40    # Python 3.7 and lower
41    import importlib_metadata
42
# Package metadata exposed as module-level dunder attributes.
__author__ = "Amr Hassan, hugovk, Mice Pápai"
__copyright__ = "Copyright (C) 2008-2010 Amr Hassan, 2013-2021 hugovk, 2017 Mice Pápai"
__license__ = "apache2"
__email__ = "amr.hassan@gmail.com"
# Looked up at import time from the metadata of the installed distribution
# whose name equals this module's __name__.
__version__ = importlib_metadata.version(__name__)
48
49
# Numeric web service status codes.
# NOTE(review): presumably these mirror the "code" attribute of the <error>
# element that the web service returns -- confirm against the API docs.
# 1 : This error does not exist
STATUS_INVALID_SERVICE = 2
STATUS_INVALID_METHOD = 3
STATUS_AUTH_FAILED = 4
STATUS_INVALID_FORMAT = 5
STATUS_INVALID_PARAMS = 6
STATUS_INVALID_RESOURCE = 7
STATUS_OPERATION_FAILED = 8
STATUS_INVALID_SK = 9
STATUS_INVALID_API_KEY = 10
STATUS_OFFLINE = 11
STATUS_SUBSCRIBERS_ONLY = 12
STATUS_INVALID_SIGNATURE = 13
STATUS_TOKEN_UNAUTHORIZED = 14
STATUS_TOKEN_EXPIRED = 15
STATUS_TEMPORARILY_UNAVAILABLE = 16
STATUS_LOGIN_REQUIRED = 17
STATUS_TRIAL_EXPIRED = 18
# 19 : This error does not exist
STATUS_NOT_ENOUGH_CONTENT = 20
STATUS_NOT_ENOUGH_MEMBERS = 21
STATUS_NOT_ENOUGH_FANS = 22
STATUS_NOT_ENOUGH_NEIGHBOURS = 23
STATUS_NO_PEAK_RADIO = 24
STATUS_RADIO_NOT_FOUND = 25
STATUS_API_KEY_SUSPENDED = 26
STATUS_DEPRECATED = 27
# 28 : This error is not documented
STATUS_RATE_LIMIT_EXCEEDED = 29
79
# Time-period identifiers.
# NOTE(review): presumably passed verbatim as the "period" parameter of
# chart-style API calls; not referenced in this part of the file -- confirm.
PERIOD_OVERALL = "overall"
PERIOD_7DAYS = "7day"
PERIOD_1MONTH = "1month"
PERIOD_3MONTHS = "3month"
PERIOD_6MONTHS = "6month"
PERIOD_12MONTHS = "12month"
86
# Language identifiers. Each network maps these values to a host name through
# the "domain_names" dict handed to _Network.
DOMAIN_ENGLISH = 0
DOMAIN_GERMAN = 1
DOMAIN_SPANISH = 2
DOMAIN_FRENCH = 3
DOMAIN_ITALIAN = 4
DOMAIN_POLISH = 5
DOMAIN_PORTUGUESE = 6
DOMAIN_SWEDISH = 7
DOMAIN_TURKISH = 8
DOMAIN_RUSSIAN = 9
DOMAIN_JAPANESE = 10
DOMAIN_CHINESE = 11
99
# Image size identifiers, ordered from smallest to largest.
# NOTE(review): presumably used to pick one of several cover/artist image
# URLs; not referenced in this part of the file -- confirm.
SIZE_SMALL = 0
SIZE_MEDIUM = 1
SIZE_LARGE = 2
SIZE_EXTRA_LARGE = 3
SIZE_MEGA = 4

# Sort orders for image listings (not referenced in this part of the file).
IMAGES_ORDER_POPULARITY = "popularity"
IMAGES_ORDER_DATE = "dateadded"
108
109
# Single-letter codes describing where a scrobbled track came from
# (not referenced in this part of the file).
SCROBBLE_SOURCE_USER = "P"
SCROBBLE_SOURCE_NON_PERSONALIZED_BROADCAST = "R"
SCROBBLE_SOURCE_PERSONALIZED_BROADCAST = "E"
SCROBBLE_SOURCE_LASTFM = "L"
SCROBBLE_SOURCE_UNKNOWN = "U"

# Single-letter codes describing what the listener did with a track; the
# empty string stands for a plain play (not referenced in this part of the
# file).
SCROBBLE_MODE_PLAYED = ""
SCROBBLE_MODE_LOVED = "L"
SCROBBLE_MODE_BANNED = "B"
SCROBBLE_MODE_SKIPPED = "S"
120
# Minimum spacing between two web service calls, in seconds
# (section 4.4 of https://www.last.fm/api/tos).
DELAY_TIME = 0.2

# TLS context used for the HTTPS connections to the web service; the
# standard library defaults are sane on Python > 3.4.
SSL_CONTEXT = ssl.create_default_context()

# Library logger. The NullHandler keeps the library silent unless the
# application configures logging itself.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
129
130
class _Network:
    """
    A music social network website such as Last.fm or
    one with a Last.fm-compatible API.

    Holds the API credentials together with the proxy, rate-limiting and
    caching settings, and acts as a factory for artist, track, album, user,
    tag and country objects bound to this network.
    """

    def __init__(
        self,
        name,
        homepage,
        ws_server,
        api_key,
        api_secret,
        session_key,
        username,
        password_hash,
        domain_names,
        urls,
        token=None,
    ):
        """
        name: the name of the network
        homepage: the homepage URL
        ws_server: the webservices server as a (host, path) pair
        api_key: a provided API_KEY
        api_secret: a provided API_SECRET
        session_key: a generated session_key or None
        username: a username of a valid user
        password_hash: the output of pylast.md5(password) where password is
            the user's password
        domain_names: a dict mapping each DOMAIN_* value to a string domain
            name
        urls: a dict mapping types to URLs
        token: an authentication token to retrieve a session

        If no session_key is given, one is obtained right here in the
        constructor: from token when it is provided, otherwise from
        username and password_hash (this needs api_key and api_secret too).

        Either a valid session_key or a combination of username and
        password_hash must be present for scrobbling.

        You should use a preconfigured network object through a
        get_*_network(...) method instead of creating an object
        of this class, unless you know what you're doing.
        """

        self.name = name
        self.homepage = homepage
        self.ws_server = ws_server
        self.api_key = api_key
        self.api_secret = api_secret
        self.session_key = session_key
        self.username = username
        self.password_hash = password_hash
        self.domain_names = domain_names
        self.urls = urls

        self.cache_backend = None
        self.proxy_enabled = False
        self.proxy = None
        self.last_call_time = 0
        self.limit_rate = False

        # Load session_key and username from authentication token if provided
        if token and not self.session_key:
            sk_gen = SessionKeyGenerator(self)
            self.session_key, self.username = sk_gen.get_web_auth_session_key_username(
                url=None, token=token
            )

        # Generate a session_key if necessary
        if (
            (self.api_key and self.api_secret)
            and not self.session_key
            and (self.username and self.password_hash)
        ):
            sk_gen = SessionKeyGenerator(self)
            self.session_key = sk_gen.get_session_key(self.username, self.password_hash)

    def __str__(self):
        return f"{self.name} Network"

    def get_artist(self, artist_name):
        """
        Return an Artist object
        """

        return Artist(artist_name, self)

    def get_track(self, artist, title):
        """
        Return a Track object
        """

        return Track(artist, title, self)

    def get_album(self, artist, title):
        """
        Return an Album object
        """

        return Album(artist, title, self)

    def get_authenticated_user(self):
        """
        Returns the authenticated user
        """

        return AuthenticatedUser(self)

    def get_country(self, country_name):
        """
        Returns a country object
        """

        return Country(country_name, self)

    def get_user(self, username):
        """
        Returns a user object
        """

        return User(username, self)

    def get_tag(self, name):
        """
        Returns a tag object
        """

        return Tag(name, self)

    def _get_language_domain(self, domain_language):
        """
        Returns the mapped domain name of the network to a DOMAIN_* value,
        or None when the network has no entry for that value.
        """

        return self.domain_names.get(domain_language)

    def _get_url(self, domain, url_type):
        """
        Returns the https URL template for url_type on the given DOMAIN_*
        value. The %(...)s placeholders of the template are left untouched.
        """
        return "https://{}/{}".format(
            self._get_language_domain(domain), self.urls[url_type]
        )

    def _get_ws_auth(self):
        """
        Returns an (API_KEY, API_SECRET, SESSION_KEY) tuple.
        """
        return self.api_key, self.api_secret, self.session_key

    def _delay_call(self):
        """
        Makes sure that web service calls are at least DELAY_TIME seconds
        apart, sleeping for the missing time if necessary.
        """
        wait = DELAY_TIME - (time.time() - self.last_call_time)

        if wait > 0:
            # Never wait longer than one full interval, even if the wall
            # clock was moved backwards since the previous call.
            time.sleep(min(wait, DELAY_TIME))

        # Record the moment the call is actually released (after any sleep);
        # storing the pre-sleep time would let the next call through early.
        self.last_call_time = time.time()

    def get_top_artists(self, limit=None, cacheable=True):
        """Returns the most played artists as a sequence of TopItem objects."""

        params = {}
        if limit:
            params["limit"] = limit

        doc = _Request(self, "chart.getTopArtists", params).execute(cacheable)

        return _extract_top_artists(doc, self)

    def get_top_tracks(self, limit=None, cacheable=True):
        """Returns the most played tracks as a sequence of TopItem objects."""

        params = {}
        if limit:
            params["limit"] = limit

        doc = _Request(self, "chart.getTopTracks", params).execute(cacheable)

        seq = []
        for node in doc.getElementsByTagName("track"):
            title = _extract(node, "name")
            # The second <name> inside a <track> node is the artist's name.
            artist = _extract(node, "name", 1)
            weight = _number(_extract(node, "playcount"))
            seq.append(TopItem(Track(artist, title, self), weight))

        return seq

    def get_top_tags(self, limit=None, cacheable=True):
        """Returns the most used tags as a sequence of TopItem objects."""

        # Last.fm has no "limit" parameter for tag.getTopTags
        # so we need to get all (250) and then limit locally
        doc = _Request(self, "tag.getTopTags").execute(cacheable)

        seq = []
        for node in doc.getElementsByTagName("tag"):
            if limit and len(seq) >= limit:
                break
            tag = Tag(_extract(node, "name"), self)
            weight = _number(_extract(node, "count"))
            seq.append(TopItem(tag, weight))

        return seq

    def get_geo_top_artists(self, country, limit=None, cacheable=True):
        """Get the most popular artists on Last.fm by country.
        Parameters:
        country (Required) : A country name, as defined by the ISO 3166-1
            country names standard.
        limit (Optional) : The number of results to fetch per page.
            Defaults to 50.
        """
        params = {"country": country}

        if limit:
            params["limit"] = limit

        doc = _Request(self, "geo.getTopArtists", params).execute(cacheable)

        return _extract_top_artists(doc, self)

    def get_geo_top_tracks(self, country, location=None, limit=None, cacheable=True):
        """Get the most popular tracks on Last.fm last week by country.
        Parameters:
        country (Required) : A country name, as defined by the ISO 3166-1
            country names standard
        location (Optional) : A metro name, to fetch the charts for
            (must be within the country specified)
        limit (Optional) : The number of results to fetch per page.
            Defaults to 50.

        Returns a sequence of TopItem objects whose weight is the listener
        count exactly as extracted from the response (it is not passed
        through _number(), unlike in get_top_tracks()).
        """
        params = {"country": country}

        if location:
            params["location"] = location
        if limit:
            params["limit"] = limit

        doc = _Request(self, "geo.getTopTracks", params).execute(cacheable)

        seq = []
        for node in doc.getElementsByTagName("track"):
            title = _extract(node, "name")
            # The second <name> inside a <track> node is the artist's name.
            artist = _extract(node, "name", 1)
            listeners = _extract(node, "listeners")

            seq.append(TopItem(Track(artist, title, self), listeners))

        return seq

    def enable_proxy(self, host, port):
        """Enable a default web proxy"""

        self.proxy = [host, _number(port)]
        self.proxy_enabled = True

    def disable_proxy(self):
        """Disable using the web proxy"""

        self.proxy_enabled = False

    def is_proxy_enabled(self):
        """Returns True if a web proxy is enabled."""

        return self.proxy_enabled

    def _get_proxy(self):
        """Returns proxy details as a [host, port] list (None if never set)."""

        return self.proxy

    def enable_rate_limit(self):
        """Enables rate limiting for this network"""
        self.limit_rate = True

    def disable_rate_limit(self):
        """Disables rate limiting for this network"""
        self.limit_rate = False

    def is_rate_limited(self):
        """Return True if web service calls are rate limited"""
        return self.limit_rate

    def enable_caching(self, file_path=None):
        """Enables caching request-wide for all cacheable calls.

        * file_path: A file path for the backend storage file. If
        none is given, the backend creates a temporary file itself.
        """

        if file_path:
            self.cache_backend = _ShelfCacheBackend(file_path)
        else:
            self.cache_backend = _ShelfCacheBackend.create_shelf()

    def disable_caching(self):
        """Disables all caching features."""

        self.cache_backend = None

    def is_caching_enabled(self):
        """Returns True if caching is enabled."""

        return self.cache_backend is not None

    def _get_cache_backend(self):
        """Returns the current cache backend, or None if caching is off."""

        return self.cache_backend

    def search_for_album(self, album_name):
        """Searches for an album by its name. Returns an AlbumSearch object.
        Use get_next_page() to retrieve sequences of results."""

        return AlbumSearch(album_name, self)

    def search_for_artist(self, artist_name):
        """Searches for an artist by its name. Returns an ArtistSearch object.
        Use get_next_page() to retrieve sequences of results."""

        return ArtistSearch(artist_name, self)

    def search_for_track(self, artist_name, track_name):
        """Searches for a track by its name and its artist. Set artist to an
        empty string if not available.
        Returns a TrackSearch object.
        Use get_next_page() to retrieve sequences of results."""

        return TrackSearch(artist_name, track_name, self)

    def get_track_by_mbid(self, mbid):
        """Looks up a track by its MusicBrainz ID"""

        params = {"mbid": mbid}

        doc = _Request(self, "track.getInfo", params).execute(True)

        # First <name> is the track title, the second one the artist's name.
        return Track(_extract(doc, "name", 1), _extract(doc, "name"), self)

    def get_artist_by_mbid(self, mbid):
        """Looks up an artist by its MusicBrainz ID"""

        params = {"mbid": mbid}

        doc = _Request(self, "artist.getInfo", params).execute(True)

        return Artist(_extract(doc, "name"), self)

    def get_album_by_mbid(self, mbid):
        """Looks up an album by its MusicBrainz ID"""

        params = {"mbid": mbid}

        doc = _Request(self, "album.getInfo", params).execute(True)

        return Album(_extract(doc, "artist"), _extract(doc, "name"), self)

    def update_now_playing(
        self,
        artist,
        title,
        album=None,
        album_artist=None,
        duration=None,
        track_number=None,
        mbid=None,
        context=None,
    ):
        """
        Used to notify Last.fm that a user has started listening to a track.

            Parameters:
                artist (Required) : The artist name
                title (Required) : The track title
                album (Optional) : The album name.
                album_artist (Optional) : The album artist - if this differs
                    from the track artist.
                duration (Optional) : The length of the track in seconds.
                track_number (Optional) : The track number of the track on the
                    album.
                mbid (Optional) : The MusicBrainz Track ID.
                context (Optional) : Sub-client version
                    (not public, only enabled for certain API keys)
        """

        params = {"track": title, "artist": artist}

        # Optional values are sent only when truthy, under their API names.
        optional = (
            ("album", album),
            ("albumArtist", album_artist),
            ("context", context),
            ("trackNumber", track_number),
            ("mbid", mbid),
            ("duration", duration),
        )
        params.update((key, value) for key, value in optional if value)

        _Request(self, "track.updateNowPlaying", params).execute()

    def scrobble(
        self,
        artist,
        title,
        timestamp,
        album=None,
        album_artist=None,
        track_number=None,
        duration=None,
        stream_id=None,
        context=None,
        mbid=None,
    ):

        """Used to add a track-play to a user's profile.

        Parameters:
            artist (Required) : The artist name.
            title (Required) : The track name.
            timestamp (Required) : The time the track started playing, in UNIX
                timestamp format (integer number of seconds since 00:00:00,
                January 1st 1970 UTC). This must be in the UTC time zone.
            album (Optional) : The album name.
            album_artist (Optional) : The album artist - if this differs from
                the track artist.
            context (Optional) : Sub-client version (not public, only enabled
                for certain API keys)
            stream_id (Optional) : The stream id for this track received from
                the radio.getPlaylist service.
            track_number (Optional) : The track number of the track on the
                album.
            mbid (Optional) : The MusicBrainz Track ID.
            duration (Optional) : The length of the track in seconds.
        """

        return self.scrobble_many(
            (
                {
                    "artist": artist,
                    "title": title,
                    "timestamp": timestamp,
                    "album": album,
                    "album_artist": album_artist,
                    "track_number": track_number,
                    "duration": duration,
                    "stream_id": stream_id,
                    "context": context,
                    "mbid": mbid,
                },
            )
        )

    def scrobble_many(self, tracks):
        """
        Used to scrobble a batch of tracks at once. The parameter tracks is
        an iterable of dicts per track containing the keyword arguments as
        if passed to the scrobble() method.

        The tracks are sent in their given order, at most 50 per
        track.scrobble request. Every dict needs the keys "artist" and
        "title"; all other keys are sent only when present and truthy.
        An empty iterable sends nothing.
        """

        batch_size = 50

        # scrobble() keyword -> name of the matching track.scrobble parameter.
        optional_args = (
            ("timestamp", "timestamp"),
            ("album", "album"),
            ("album_artist", "albumArtist"),
            ("context", "context"),
            ("stream_id", "streamID"),
            ("track_number", "trackNumber"),
            ("mbid", "mbid"),
            ("duration", "duration"),
        )

        tracks = list(tracks)

        for start in range(0, len(tracks), batch_size):
            params = {}

            # Parameters are indexed per request, so i restarts for each batch.
            for i, track in enumerate(tracks[start : start + batch_size]):
                params[f"artist[{i}]"] = track["artist"]
                params[f"track[{i}]"] = track["title"]

                for arg, maps_to in optional_args:
                    if track.get(arg):
                        params[f"{maps_to}[{i}]"] = track[arg]

            _Request(self, "track.scrobble", params).execute()
643
class LastFMNetwork(_Network):

    """A preconfigured _Network object for Last.fm

    api_key: a provided API_KEY
    api_secret: a provided API_SECRET
    session_key: a generated session_key or None
    username: a username of a valid user
    password_hash: the output of pylast.md5(password) where password is the
        user's password
    token: an authentication token to retrieve a session

    When username and password_hash are given without a session_key, the
    session_key is generated automatically.

    Scrobbling needs either a valid session_key or the combination of
    username and password_hash.

    Most read-only webservices only require an api_key and an api_secret, see
    about obtaining them from:
    https://www.last.fm/api/account
    """

    def __init__(
        self,
        api_key="",
        api_secret="",
        session_key="",
        username="",
        password_hash="",
        token="",
    ):
        # English lives on the bare host; every other language is a path
        # suffix on the same host.
        localized = (
            (DOMAIN_GERMAN, "de"),
            (DOMAIN_SPANISH, "es"),
            (DOMAIN_FRENCH, "fr"),
            (DOMAIN_ITALIAN, "it"),
            (DOMAIN_POLISH, "pl"),
            (DOMAIN_PORTUGUESE, "pt"),
            (DOMAIN_SWEDISH, "sv"),
            (DOMAIN_TURKISH, "tr"),
            (DOMAIN_RUSSIAN, "ru"),
            (DOMAIN_JAPANESE, "ja"),
            (DOMAIN_CHINESE, "zh"),
        )
        domain_names = {DOMAIN_ENGLISH: "www.last.fm"}
        domain_names.update(
            (domain, f"www.last.fm/{suffix}") for domain, suffix in localized
        )

        # URL templates relative to the language domain.
        urls = {
            "album": "music/%(artist)s/%(album)s",
            "artist": "music/%(artist)s",
            "country": "place/%(country_name)s",
            "tag": "tag/%(name)s",
            "track": "music/%(artist)s/_/%(title)s",
            "user": "user/%(name)s",
        }

        super().__init__(
            name="Last.fm",
            homepage="https://www.last.fm",
            ws_server=("ws.audioscrobbler.com", "/2.0/"),
            api_key=api_key,
            api_secret=api_secret,
            session_key=session_key,
            username=username,
            password_hash=password_hash,
            token=token,
            domain_names=domain_names,
            urls=urls,
        )

    def __repr__(self):
        credentials = (
            self.api_key,
            self.api_secret,
            self.session_key,
            self.username,
            self.password_hash,
        )
        quoted = ", ".join(f"'{value}'" for value in credentials)
        return f"pylast.LastFMNetwork({quoted})"
721
722
class LibreFMNetwork(_Network):
    """
    A preconfigured _Network object for Libre.fm

    api_key: a provided API_KEY
    api_secret: a provided API_SECRET
    session_key: a generated session_key or None
    username: a username of a valid user
    password_hash: the output of pylast.md5(password) where password is the
        user's password

    When username and password_hash are given without a session_key, the
    session_key is generated automatically.
    """

    def __init__(
        self, api_key="", api_secret="", session_key="", username="", password_hash=""
    ):
        # Libre.fm serves every language from the same host.
        all_domains = (
            DOMAIN_ENGLISH,
            DOMAIN_GERMAN,
            DOMAIN_SPANISH,
            DOMAIN_FRENCH,
            DOMAIN_ITALIAN,
            DOMAIN_POLISH,
            DOMAIN_PORTUGUESE,
            DOMAIN_SWEDISH,
            DOMAIN_TURKISH,
            DOMAIN_RUSSIAN,
            DOMAIN_JAPANESE,
            DOMAIN_CHINESE,
        )
        domain_names = dict.fromkeys(all_domains, "libre.fm")

        # URL templates relative to the domain.
        urls = {
            "album": "artist/%(artist)s/album/%(album)s",
            "artist": "artist/%(artist)s",
            "country": "place/%(country_name)s",
            "tag": "tag/%(name)s",
            "track": "music/%(artist)s/_/%(title)s",
            "user": "user/%(name)s",
        }

        super().__init__(
            name="Libre.fm",
            homepage="https://libre.fm",
            ws_server=("libre.fm", "/2.0/"),
            api_key=api_key,
            api_secret=api_secret,
            session_key=session_key,
            username=username,
            password_hash=password_hash,
            domain_names=domain_names,
            urls=urls,
        )

    def __repr__(self):
        credentials = (
            self.api_key,
            self.api_secret,
            self.session_key,
            self.username,
            self.password_hash,
        )
        quoted = ", ".join(f"'{value}'" for value in credentials)
        return f"pylast.LibreFMNetwork({quoted})"
787
788
class _ShelfCacheBackend:
    """Cache backend for cacheable requests, stored in a shelve file."""

    def __init__(self, file_path=None, flag=None):
        """
        file_path: path of the shelf file to open
        flag: optional open flag handed to shelve.open(); when None the
            shelve default is used
        """
        open_kwargs = {} if flag is None else {"flag": flag}
        self.shelf = shelve.open(file_path, **open_kwargs)
        # In-memory copy of the stored keys, used for membership tests.
        self.cache_keys = set(self.shelf)

    def __contains__(self, key):
        return key in self.cache_keys

    def __iter__(self):
        return iter(self.shelf)

    def get_xml(self, key):
        """Returns the value stored under key."""
        return self.shelf[key]

    def set_xml(self, key, xml_string):
        """Stores xml_string under key and remembers the key."""
        self.cache_keys.add(key)
        self.shelf[key] = xml_string

    @classmethod
    def create_shelf(cls):
        """Returns a backend on top of a freshly created temporary file."""
        handle, path = tempfile.mkstemp(prefix="pylast_tmp_")
        # Only the path is needed; shelve opens the file on its own.
        os.close(handle)
        return cls(file_path=path, flag="n")
817
818
class _Request:
    """Representing an abstract web service operation."""

    def __init__(self, network, method_name, params=None):
        """
        network: the network object the request is sent to
        method_name: the web service method, e.g. "track.getInfo"
        params: optional dict of extra request parameters

        The request is signed right away when the network has a session key.
        """
        logger.debug(method_name)

        if params is None:
            params = {}

        self.network = network
        # Every value is passed through _unicode() before it is stored.
        self.params = {key: _unicode(params[key]) for key in params}

        (self.api_key, self.api_secret, self.session_key) = network._get_ws_auth()

        self.params["api_key"] = self.api_key
        self.params["method"] = method_name

        if network.is_caching_enabled():
            self.cache = network._get_cache_backend()

        if self.session_key:
            self.params["sk"] = self.session_key
            self.sign_it()

    def sign_it(self):
        """Sign this request, unless it already carries a signature."""

        if "api_sig" not in self.params:
            self.params["api_sig"] = self._get_signature()

    def _get_signature(self):
        """
        Returns a 32-character hexadecimal md5 hash of the signature string.

        The signature string is every parameter name followed by its value,
        in sorted name order, followed by the API secret.
        """

        pairs = "".join(name + self.params[name] for name in sorted(self.params))

        return md5(pairs + self.api_secret)

    def _get_cache_key(self):
        """
        The cache key is the sha1 hex digest of the concatenated sorted
        names and values, leaving out the credentials and the signature.
        """

        excluded = ("api_sig", "api_key", "sk")

        cache_key = "".join(
            name + self.params[name]
            for name in sorted(self.params)
            if name not in excluded
        )

        return hashlib.sha1(cache_key.encode("utf-8")).hexdigest()

    def _get_cached_response(self):
        """
        Returns the cached response, downloading and storing it first when
        it is not in the cache yet.
        """

        # Compute the key once so that lookup, store and read all agree.
        cache_key = self._get_cache_key()

        if cache_key not in self.cache:
            self.cache.set_xml(cache_key, self._download_response())

        return self.cache.get_xml(cache_key)

    def _is_cached(self):
        """Returns True if the request is already in cache."""

        return self._get_cache_key() in self.cache

    def _download_response(self):
        """
        Returns a response body string from the server.

        Raises NetworkError when the request cannot be sent,
        MalformedResponseError when the response cannot be read, and WSError
        when the web service reports an error.
        """

        if self.network.limit_rate:
            self.network._delay_call()

        # "username" travels in the query string instead of the POST body.
        # Work on a copy: self.params also feeds the cache key and must not
        # change as a side effect of sending the request.
        params = dict(self.params)
        username = params.pop("username", None)
        # Escape the value so special characters cannot break the URL.
        query = f"?username={quote_plus(username)}" if username is not None else ""

        data = "&".join(
            "=".join((name, quote_plus(_string(value))))
            for name, value in params.items()
        )

        headers = {
            "Content-type": "application/x-www-form-urlencoded",
            "Accept-Charset": "utf-8",
            "User-Agent": "pylast/" + __version__,
        }

        (host_name, host_subdir) = self.network.ws_server

        if self.network.is_proxy_enabled():
            # Connect to the proxy and ask it for the absolute URL.
            proxy = self.network._get_proxy()
            conn = HTTPSConnection(context=SSL_CONTEXT, host=proxy[0], port=proxy[1])
            url = f"https://{host_name}{host_subdir}{query}"
        else:
            conn = HTTPSConnection(context=SSL_CONTEXT, host=host_name)
            url = f"{host_subdir}{query}"

        # The connection is closed on every path, including failures.
        try:
            try:
                conn.request(method="POST", url=url, body=data, headers=headers)
            except Exception as e:
                raise NetworkError(self.network, e) from e

            try:
                response = conn.getresponse()
                if response.status in [500, 502, 503, 504]:
                    # NOTE: this WSError is caught by the except clause below
                    # and reaches the caller wrapped in a
                    # MalformedResponseError (existing behaviour, kept as is).
                    raise WSError(
                        self.network,
                        response.status,
                        "Connection to the API failed with HTTP code "
                        + str(response.status),
                    )
                response_text = _unicode(response.read())
            except Exception as e:
                raise MalformedResponseError(self.network, e) from e

            self._check_response_for_errors(response_text)
        finally:
            conn.close()

        return response_text

    def execute(self, cacheable=False):
        """
        Returns the XML DOM response of the POST Request from the server.

        The cache is used only when cacheable is true and caching is enabled
        on the network.
        """

        if self.network.is_caching_enabled() and cacheable:
            response = self._get_cached_response()
        else:
            response = self._download_response()

        # The "opensearch:" prefix is removed before the text is parsed.
        return minidom.parseString(_string(response).replace("opensearch:", ""))

    def _check_response_for_errors(self, response):
        """
        Checks the response for errors and raises one if any exists.

        Raises MalformedResponseError when the response is not parseable XML
        and WSError when the status of the <lfm> element is not "ok".
        """

        try:
            doc = minidom.parseString(_string(response).replace("opensearch:", ""))
        except Exception as e:
            raise MalformedResponseError(self.network, e) from e

        root = doc.getElementsByTagName("lfm")[0]

        if root.getAttribute("status") != "ok":
            error = doc.getElementsByTagName("error")[0]
            status = error.getAttribute("code")
            details = error.firstChild.data.strip()
            raise WSError(self.network, status, details)
998
999
class SessionKeyGenerator:
    """Generates session keys for a network.

    There are two ways of obtaining a session key:

    1) Web Authentication:
        a. network = get_*_network(API_KEY, API_SECRET)
        b. sg = SessionKeyGenerator(network)
        c. url = sg.get_web_auth_url()
        d. Ask the user to open the URL and authorize you, and wait for it.
        e. session_key = sg.get_web_auth_session_key(url)
    2) Username and Password Authentication:
        a. network = get_*_network(API_KEY, API_SECRET)
        b. username = input("Please enter your username: ")
        c. password_hash = pylast.md5(input("Please enter your password: "))
        d. session_key = SessionKeyGenerator(network).get_session_key(
            username, password_hash)

    A session key's lifetime is infinite, unless the user revokes the rights
    of the given API Key.

    If you create a Network object with just a API_KEY and API_SECRET and a
    username and a password_hash, a SESSION_KEY will be automatically generated
    for that network and stored in it so you don't have to do this manually,
    unless you want to.
    """

    def __init__(self, network):
        # Maps each URL handed out by get_web_auth_url() to its token.
        self.web_auth_tokens = {}
        self.network = network

    def _get_web_auth_token(self):
        """
        Fetches a fresh web authentication token from the network.
        The token has to be authorized by the user (see get_web_auth_url)
        before a session can be created from it.
        """

        token_request = _Request(self.network, "auth.getToken")

        # By default a request is signed only when a session key is
        # provided, so signing is requested explicitly.
        token_request.sign_it()

        token_node = token_request.execute().getElementsByTagName("token")[0]
        return token_node.firstChild.data

    def get_web_auth_url(self):
        """
        Returns the URL the user must open to authorize this application.
        Once the user has done so, call get_web_auth_session_key(url).
        The token is remembered under the returned URL.
        """

        token = self._get_web_auth_token()

        url = "{homepage}/api/auth/?api_key={api}&token={token}".format(
            homepage=self.network.homepage, api=self.network.api_key, token=token
        )

        self.web_auth_tokens[url] = token
        return url

    def get_web_auth_session_key_username(self, url, token=""):
        """
        Returns a (session key, username) tuple for a web authorization
        process identified by its URL.

        A token remembered for the URL takes precedence over the token
        argument.
        """

        token = self.web_auth_tokens.get(url, token)

        session_request = _Request(self.network, "auth.getSession", {"token": token})

        # By default a request is signed only when a session key is
        # provided, so signing is requested explicitly.
        session_request.sign_it()

        doc = session_request.execute()

        def first_text(tag_name):
            return doc.getElementsByTagName(tag_name)[0].firstChild.data

        return first_text("key"), first_text("name")

    def get_web_auth_session_key(self, url, token=""):
        """
        Returns only the session key of a web authorization process
        identified by its URL.
        """
        key, _ = self.get_web_auth_session_key_username(url, token)
        return key

    def get_session_key(self, username, password_hash):
        """
        Returns a session key obtained from a username and the md5 hash of
        the user's password.
        """

        request = _Request(
            self.network,
            "auth.getMobileSession",
            {"username": username, "authToken": md5(username + password_hash)},
        )

        # By default a request is signed only when a session key is
        # provided, so signing is requested explicitly.
        request.sign_it()

        return _extract(request.execute(), "key")
1105
1106
# Named tuple record types; field names are listed in positional order.
TopItem = collections.namedtuple("TopItem", "item weight")
SimilarItem = collections.namedtuple("SimilarItem", "item match")
LibraryItem = collections.namedtuple("LibraryItem", "item playcount tagcount")
PlayedTrack = collections.namedtuple(
    "PlayedTrack", "track album playback_date timestamp"
)
LovedTrack = collections.namedtuple("LovedTrack", "track date timestamp")
ImageSizes = collections.namedtuple(
    "ImageSizes", "original large largesquare medium small extralarge"
)
Image = collections.namedtuple(
    "Image", "title url dateadded format owner sizes votes"
)
1120
1121
def _string_output(func):
    """
    Decorator that passes the wrapped function's return value through
    _string().

    The wrapper keeps the wrapped function's name and docstring, so that
    decorated methods such as __str__ remain recognisable in tracebacks and
    during introspection. Only positional arguments are forwarded.
    """
    # Imported here so the decorator does not rely on a module-level import.
    import functools

    @functools.wraps(func)
    def r(*args):
        return _string(func(*args))

    return r
1127
1128
class _BaseObject:
    """An abstract webservices object."""

    network = None

    def __init__(self, network, ws_prefix):
        """
        # Parameters:
            * network: The network object that requests are sent through.
            * ws_prefix: The prefix put in front of web service method
              names (for example "artist" or "geo").
        """
        self.network = network
        self.ws_prefix = ws_prefix

    def _request(self, method_name, cacheable=False, params=None):
        """
        Builds a _Request for method_name and returns the result of
        executing it.

        When params is None or empty, this object's _get_params() is used.
        """
        if not params:
            params = self._get_params()

        return _Request(self.network, method_name, params).execute(cacheable)

    def _get_params(self):
        """Returns the most common set of parameters between all objects."""

        return {}

    def __hash__(self):
        params = self._get_params()

        # Convert any ints (or whatever) into strings so they can be joined
        # with the parameter names.
        fingerprint = "".join(
            list(params.keys()) + [str(value) for value in params.values()]
        )

        return hash(self.network) + hash(str(type(self)) + fingerprint.lower())

    def _extract_cdata_from_request(self, method_name, tag_name, params):
        """
        Runs a cacheable request and returns the stripped text of the first
        tag_name element, or None when that element has no children.

        Raises IndexError when the response has no tag_name element.
        """
        doc = self._request(method_name, True, params)

        first_child = doc.getElementsByTagName(tag_name)[0].firstChild

        if first_child is None:
            return None

        return first_child.wholeText.strip()

    def _get_things(
        self, method, thing_type, params=None, cacheable=True, stream=False
    ):
        """
        Returns the most played thing_types by this thing as TopItem entries.

        # Parameters:
            * method: Web service method name without the ws_prefix.
            * thing_type: Callable invoked as thing_type(artist, title,
              network) for every node.
            * params: Request parameters; "limit" defaults to 50 when absent.
              When None, this object's _get_params() is used.
            * cacheable: Forwarded to _collect_nodes.
            * stream: When true a generator is returned instead of a list.
        """

        if params is None:
            # The limit lookup below needs a dict; fall back to this
            # object's own parameters, the same default _request() applies.
            params = self._get_params()

        def _stream_get_things():
            limit = params.get("limit", 50)
            nodes = _collect_nodes(
                limit,
                self,
                self.ws_prefix + "." + method,
                cacheable,
                params,
                stream=stream,
            )
            for node in nodes:
                title = _extract(node, "name")
                # Index 1 selects the second "name" value, used as the artist.
                artist = _extract(node, "name", 1)
                playcount = _number(_extract(node, "playcount"))

                yield TopItem(thing_type(artist, title, self.network), playcount)

        return _stream_get_things() if stream else list(_stream_get_things())

    def get_wiki_published_date(self):
        """
        Returns the published date of the wiki.
        Only for Album/Track.
        """
        return self.get_wiki("published")

    def get_wiki_summary(self):
        """
        Returns the summary of the wiki.
        Only for Album/Track.
        """
        return self.get_wiki("summary")

    def get_wiki_content(self):
        """
        Returns the content of the wiki.
        Only for Album/Track.
        """
        return self.get_wiki("content")

    def get_wiki(self, section):
        """
        Returns a section of the wiki, or None when the response has no
        wiki element.
        Only for Album/Track.
        section can be "content", "summary" or
            "published" (for published date)
        """

        doc = self._request(self.ws_prefix + ".getInfo", True)

        wiki_nodes = doc.getElementsByTagName("wiki")
        if not wiki_nodes:
            return None

        return _extract(wiki_nodes[0], section)
1229
1230
class _Chartable(_BaseObject):
    """Common functions for classes with charts."""

    def __init__(self, network, ws_prefix):
        super().__init__(network=network, ws_prefix=ws_prefix)

    def get_weekly_chart_dates(self):
        """Returns a list of From and To tuples for the available charts."""

        doc = self._request(self.ws_prefix + ".getWeeklyChartList", True)

        return [
            (node.getAttribute("from"), node.getAttribute("to"))
            for node in doc.getElementsByTagName("chart")
        ]

    def get_weekly_album_charts(self, from_date=None, to_date=None):
        """
        Returns the weekly album charts for the week starting from the
        from_date value to the to_date value.
        Only for User.
        """
        return self.get_weekly_charts("album", from_date, to_date)

    def get_weekly_artist_charts(self, from_date=None, to_date=None):
        """
        Returns the weekly artist charts for the week starting from the
        from_date value to the to_date value.
        Only for User.
        """
        return self.get_weekly_charts("artist", from_date, to_date)

    def get_weekly_track_charts(self, from_date=None, to_date=None):
        """
        Returns the weekly track charts for the week starting from the
        from_date value to the to_date value.
        Only for User.
        """
        return self.get_weekly_charts("track", from_date, to_date)

    def get_weekly_charts(self, chart_kind, from_date=None, to_date=None):
        """
        Returns the weekly charts for the week starting from the
        from_date value to the to_date value, as a list of TopItem
        (item, playcount) entries.

        chart_kind should be one of "album", "artist" or "track"
        (case-insensitive). The date range is only sent when both from_date
        and to_date are given.

        Raises ValueError for any other chart_kind.
        """
        kind = chart_kind.lower()

        # Resolve the class through an explicit table rather than eval()-ing
        # a caller-supplied string.
        chart_types = {"album": Album, "artist": Artist, "track": Track}
        if kind not in chart_types:
            raise ValueError(
                "chart_kind must be one of 'album', 'artist' or 'track', "
                f"not {chart_kind!r}"
            )
        chart_type = chart_types[kind]
        method = ".getWeekly" + kind.title() + "Chart"

        params = self._get_params()
        if from_date and to_date:
            params["from"] = from_date
            params["to"] = to_date

        doc = self._request(self.ws_prefix + method, True, params)

        seq = []
        for node in doc.getElementsByTagName(kind):
            # Compare the normalised kind so that e.g. "Artist" takes the
            # artist branch too; Artist takes no separate artist argument.
            if kind == "artist":
                item = chart_type(_extract(node, "name"), self.network)
            else:
                item = chart_type(
                    _extract(node, "artist"), _extract(node, "name"), self.network
                )
            weight = _number(_extract(node, "playcount"))
            seq.append(TopItem(item, weight))

        return seq
1300
1301
class _Taggable(_BaseObject):
    """Common functions for classes with tags."""

    def __init__(self, network, ws_prefix):
        super().__init__(network=network, ws_prefix=ws_prefix)

    def add_tags(self, tags):
        """Adds one or several tags.
        * tags: A sequence of tag names or Tag objects.
        """

        for tag in tags:
            self.add_tag(tag)

    def add_tag(self, tag):
        """Adds one tag.
        * tag: a tag name or a Tag object.
        """

        if isinstance(tag, Tag):
            tag = tag.get_name()

        params = self._get_params()
        params["tags"] = tag

        self._request(self.ws_prefix + ".addTags", False, params)

    def remove_tag(self, tag):
        """Remove a user's tag from this object.
        * tag: a tag name or a Tag object.
        """

        if isinstance(tag, Tag):
            tag = tag.get_name()

        params = self._get_params()
        params["tag"] = tag

        self._request(self.ws_prefix + ".removeTag", False, params)

    def get_tags(self):
        """Returns a list of the tags set by the user to this object."""

        # Uncacheable because it can be dynamically changed by the user.
        params = self._get_params()

        doc = self._request(self.ws_prefix + ".getTags", False, params)

        return [Tag(name, self.network) for name in _extract_all(doc, "name")]

    def remove_tags(self, tags):
        """Removes one or several tags from this object.
        * tags: a sequence of tag names or Tag objects.
        """

        for tag in tags:
            self.remove_tag(tag)

    def clear_tags(self):
        """Clears all the user-set tags."""

        # remove_tags() takes a single sequence argument, so the tag list is
        # passed as is rather than unpacked into positional arguments.
        self.remove_tags(self.get_tags())

    def set_tags(self, tags):
        """Sets this object's tags to only those tags.
        * tags: a sequence of tag names or Tag objects.

        Names are compared case-insensitively: tags on the server that are
        not wanted are removed, and wanted tags that are missing are added.
        """

        current_names = [tag.get_name() for tag in self.get_tags()]

        # Accept Tag objects as documented by reducing them to their names.
        wanted_names = [
            tag.get_name() if isinstance(tag, Tag) else tag for tag in tags
        ]

        current_folded = {name.lower() for name in current_names}
        wanted_folded = {name.lower() for name in wanted_names}

        to_remove = [
            name for name in current_names if name.lower() not in wanted_folded
        ]
        to_add = [
            name for name in wanted_names if name.lower() not in current_folded
        ]

        self.remove_tags(to_remove)
        self.add_tags(to_add)

    def get_top_tags(self, limit=None):
        """
        Returns a list of the most frequently used Tags on this object, as
        TopItem(Tag, count) entries.

        The count is stored as returned by _extract (it is not converted
        with _number). When limit is truthy only the first limit entries
        are returned.
        """

        doc = self._request(self.ws_prefix + ".getTopTags", True)

        seq = []
        for element in doc.getElementsByTagName("tag"):
            tag_name = _extract(element, "name")
            tag_count = _extract(element, "count")

            seq.append(TopItem(Tag(tag_name, self.network), tag_count))

        if limit:
            seq = seq[:limit]

        return seq
1419
1420
class PyLastError(Exception):
    """Generic exception raised by PyLast; base class of its other errors."""
1425
1426
class WSError(PyLastError):
    """Exception related to the Network web service"""

    def __init__(self, network, status, details):
        """
        # Parameters:
            * network: The network the failing request was sent to.
            * status: The error identifier (see get_id()).
            * details: The error message; it is what str() reports.
        """
        self.network = network
        self.status = status
        self.details = details

    @_string_output
    def __str__(self):
        return self.details

    def get_id(self):
        """Returns the exception ID exactly as it was given to the
        constructor (no conversion is applied), from one of the following:
        STATUS_INVALID_SERVICE = 2
        STATUS_INVALID_METHOD = 3
        STATUS_AUTH_FAILED = 4
        STATUS_INVALID_FORMAT = 5
        STATUS_INVALID_PARAMS = 6
        STATUS_INVALID_RESOURCE = 7
        STATUS_OPERATION_FAILED = 8
        STATUS_INVALID_SK = 9
        STATUS_INVALID_API_KEY = 10
        STATUS_OFFLINE = 11
        STATUS_SUBSCRIBERS_ONLY = 12
        STATUS_INVALID_SIGNATURE = 13
        STATUS_TOKEN_UNAUTHORIZED = 14
        STATUS_TOKEN_EXPIRED = 15
        STATUS_TEMPORARILY_UNAVAILABLE = 16
        STATUS_LOGIN_REQUIRED = 17
        STATUS_TRIAL_EXPIRED = 18
        STATUS_NOT_ENOUGH_CONTENT = 20
        STATUS_NOT_ENOUGH_MEMBERS  = 21
        STATUS_NOT_ENOUGH_FANS = 22
        STATUS_NOT_ENOUGH_NEIGHBOURS = 23
        STATUS_NO_PEAK_RADIO = 24
        STATUS_RADIO_NOT_FOUND = 25
        STATUS_API_KEY_SUSPENDED = 26
        STATUS_DEPRECATED = 27
        STATUS_RATE_LIMIT_EXCEEDED = 29
        """

        error_id = self.status
        return error_id
1469
1470
class MalformedResponseError(PyLastError):
    """Exception conveying a malformed response from the music network."""

    def __init__(self, network, underlying_error):
        # The original exception is kept so callers can inspect the cause.
        self.underlying_error = underlying_error
        self.network = network

    def __str__(self):
        template = "Malformed response from {}. Underlying error: {}"
        network_name = self.network.name
        cause = str(self.underlying_error)
        return template.format(network_name, cause)
1482
1483
class NetworkError(PyLastError):
    """Exception conveying a problem in sending a request to Last.fm"""

    def __init__(self, network, underlying_error):
        # The original exception is kept so callers can inspect the cause.
        self.underlying_error = underlying_error
        self.network = network

    def __str__(self):
        cause = str(self.underlying_error)
        return "NetworkError: %s" % cause
1493
1494
class _Opus(_Taggable):
    """An album or track."""

    artist = None
    title = None
    username = None

    __hash__ = _BaseObject.__hash__

    def __init__(self, artist, title, network, ws_prefix, username=None, info=None):
        """
        Create an opus instance.
        # Parameters:
            * artist: An artist name or an Artist object.
            * title: The album or track title.
            * network: The network object requests are sent through.
            * ws_prefix: 'album' or 'track'
            * username: User whose play count get_userplaycount() reports;
              defaults to network.username.
            * info: Optional dict of already known data; get_cover_image()
              stores the image list under its "image" key.
        """

        if info is None:
            info = {}

        super().__init__(network=network, ws_prefix=ws_prefix)

        if isinstance(artist, Artist):
            self.artist = artist
        else:
            self.artist = Artist(artist, self.network)

        self.title = title
        self.username = (
            username if username else network.username
        )  # Default to current user
        self.info = info

    def __repr__(self):
        return "pylast.{}({}, {}, {})".format(
            self.ws_prefix.title(),
            repr(self.artist.name),
            repr(self.title),
            repr(self.network),
        )

    @_string_output
    def __str__(self):
        return _unicode("%s - %s") % (self.get_artist().get_name(), self.get_title())

    def __eq__(self, other):
        """Equal when both are the same type and their titles and artist
        names match case-insensitively."""
        if type(self) != type(other):
            return False
        a = self.get_title().lower()
        b = other.get_title().lower()
        c = self.get_artist().get_name().lower()
        d = other.get_artist().get_name().lower()
        return (a == b) and (c == d)

    def __ne__(self, other):
        return not self == other

    def _get_params(self):
        return {
            "artist": self.get_artist().get_name(),
            self.ws_prefix: self.get_title(),
        }

    def get_artist(self):
        """Returns the associated Artist object."""

        return self.artist

    def get_cover_image(self, size=SIZE_EXTRA_LARGE):
        """
        Returns a URI to the cover image
        size can be one of:
            SIZE_EXTRA_LARGE
            SIZE_LARGE
            SIZE_MEDIUM
            SIZE_SMALL
        """
        # The image list is requested once and then kept in self.info.
        if "image" not in self.info:
            self.info["image"] = _extract_all(
                self._request(self.ws_prefix + ".getInfo", cacheable=True), "image"
            )
        return self.info["image"][size]

    def get_title(self, properly_capitalized=False):
        """Returns the album or track title.
        If properly_capitalized is true, the title is downloaded first,
        overwriting the stored one."""
        if properly_capitalized:
            self.title = _extract(
                self._request(self.ws_prefix + ".getInfo", True), "name"
            )

        return self.title

    def get_name(self, properly_capitalized=False):
        """Returns the album or track title (alias to get_title())."""

        return self.get_title(properly_capitalized)

    def get_playcount(self):
        """Returns the number of plays on the network"""

        return _number(
            _extract(
                self._request(self.ws_prefix + ".getInfo", cacheable=True), "playcount"
            )
        )

    def get_userplaycount(self):
        """Returns the number of plays by a given username, or None when no
        username is set."""

        if not self.username:
            return None

        params = self._get_params()
        params["username"] = self.username

        doc = self._request(self.ws_prefix + ".getInfo", True, params)
        return _number(_extract(doc, "userplaycount"))

    def get_listener_count(self):
        """Returns the number of listeners on the network"""

        return _number(
            _extract(
                self._request(self.ws_prefix + ".getInfo", cacheable=True), "listeners"
            )
        )

    def get_mbid(self):
        """Returns the MusicBrainz ID of the album or track, or None when
        the response carries no mbid element or the element is empty."""

        doc = self._request(self.ws_prefix + ".getInfo", cacheable=True)

        try:
            lfm = doc.getElementsByTagName("lfm")[0]
            # Only direct children are considered at each level.
            opus = next(self._get_children_by_tag_name(lfm, self.ws_prefix))
            mbid = next(self._get_children_by_tag_name(opus, "mbid"))
        except StopIteration:
            return None

        # An empty <mbid/> element has no text child to read a value from.
        if mbid.firstChild is None:
            return None

        return mbid.firstChild.nodeValue

    def _get_children_by_tag_name(self, node, tag_name):
        """Yields the direct element children of node whose tag is tag_name
        ("*" matches every element)."""
        for child in node.childNodes:
            if child.nodeType == child.ELEMENT_NODE and (
                tag_name == "*" or child.tagName == tag_name
            ):
                yield child
1642
1643
class Album(_Opus):
    """An album."""

    __hash__ = _Opus.__hash__

    def __init__(self, artist, title, network, username=None, info=None):
        # Albums always use the "album" web service prefix.
        super().__init__(artist, title, network, "album", username, info)

    def get_tracks(self):
        """Returns the list of Tracks on this album."""

        info_doc = self._request(self.ws_prefix + ".getInfo", cacheable=True)
        return _extract_tracks(info_doc, self.network)

    def get_url(self, domain_name=DOMAIN_ENGLISH):
        """Returns the URL of the album page on the network.
        # Parameters:
        * domain_name str: The network's language domain. Possible values:
            o DOMAIN_ENGLISH
            o DOMAIN_GERMAN
            o DOMAIN_SPANISH
            o DOMAIN_FRENCH
            o DOMAIN_ITALIAN
            o DOMAIN_POLISH
            o DOMAIN_PORTUGUESE
            o DOMAIN_SWEDISH
            o DOMAIN_TURKISH
            o DOMAIN_RUSSIAN
            o DOMAIN_JAPANESE
            o DOMAIN_CHINESE
        """

        # Values substituted into the URL pattern supplied by the network.
        substitutions = {
            "artist": _url_safe(self.get_artist().get_name()),
            "album": _url_safe(self.get_title()),
        }

        url_pattern = self.network._get_url(domain_name, self.ws_prefix)
        return url_pattern % substitutions
1684
1685
class Artist(_Taggable):
    """An artist."""

    name = None
    username = None

    __hash__ = _BaseObject.__hash__

    def __init__(self, name, network, username=None, info=None):
        """Create an artist object.
        # Parameters:
            * name str: The artist's name.
            * network: The network object requests are sent through.
            * username: User whose play count get_userplaycount() reports.
            * info: Optional dict stored on the instance as self.info.
        """

        super().__init__(network=network, ws_prefix="artist")

        self.name = name
        self.username = username
        self.info = {} if info is None else info

    def __repr__(self):
        return f"pylast.Artist({repr(self.get_name())}, {repr(self.network)})"

    def __unicode__(self):
        return str(self.get_name())

    @_string_output
    def __str__(self):
        return self.__unicode__()

    def __eq__(self, other):
        # Only objects of exactly the same type can be equal; names are
        # compared case-insensitively.
        if type(self) is not type(other):
            return False
        return self.get_name().lower() == other.get_name().lower()

    def __ne__(self, other):
        return not self == other

    def _get_params(self):
        return {self.ws_prefix: self.get_name()}

    def _get_info(self):
        # Cacheable artist.getInfo request shared by the getters below.
        return self._request(self.ws_prefix + ".getInfo", True)

    def get_name(self, properly_capitalized=False):
        """Returns the name of the artist.
        When properly_capitalized is true the name is downloaded first and
        replaces the stored one."""

        if properly_capitalized:
            self.name = _extract(self._get_info(), "name")

        return self.name

    def get_correction(self):
        """Returns the corrected artist name."""

        correction_doc = self._request(self.ws_prefix + ".getCorrection")
        return _extract(correction_doc, "name")

    def get_playcount(self):
        """Returns the number of plays on the network."""

        return _number(_extract(self._get_info(), "playcount"))

    def get_userplaycount(self):
        """Returns the number of plays by a given username, or None when no
        username is set."""

        if not self.username:
            return

        params = self._get_params()
        params["username"] = self.username

        info_doc = self._request(self.ws_prefix + ".getInfo", True, params)
        return _number(_extract(info_doc, "userplaycount"))

    def get_mbid(self):
        """Returns the MusicBrainz ID of this artist."""

        return _extract(self._get_info(), "mbid")

    def get_listener_count(self):
        """Returns the number of listeners on the network."""

        # The value is fetched on first use and then kept on the instance.
        if not hasattr(self, "listener_count"):
            self.listener_count = _number(_extract(self._get_info(), "listeners"))

        return self.listener_count

    def is_streamable(self):
        """Returns True if the artist is streamable."""

        streamable = _number(_extract(self._get_info(), "streamable"))
        return bool(streamable)

    def get_bio(self, section, language=None):
        """
        Returns a section of the bio, or None when the response does not
        contain that section.
        section can be "content", "summary" or
            "published" (for published date)
        """
        params = None
        if language:
            params = self._get_params()
            params["lang"] = language

        try:
            return self._extract_cdata_from_request(
                self.ws_prefix + ".getInfo", section, params
            )
        except IndexError:
            return None

    def get_bio_published_date(self):
        """Returns the date on which the artist's biography was published."""
        return self.get_bio("published")

    def get_bio_summary(self, language=None):
        """Returns the summary of the artist's biography."""
        return self.get_bio("summary", language)

    def get_bio_content(self, language=None):
        """Returns the content of the artist's biography."""
        return self.get_bio("content", language)

    def get_similar(self, limit=None):
        """Returns the similar artists on the network as SimilarItem
        (Artist, match) entries."""

        params = self._get_params()
        if limit:
            params["limit"] = limit

        doc = self._request(self.ws_prefix + ".getSimilar", True, params)

        names = _extract_all(doc, "name")
        matches = _extract_all(doc, "match")

        # Names and matches are paired up by position.
        return [
            SimilarItem(Artist(name, self.network), _number(matches[position]))
            for position, name in enumerate(names)
        ]

    def get_top_albums(self, limit=None, cacheable=True, stream=False):
        """Returns a list of the top albums."""
        params = self._get_params()
        if limit:
            params["limit"] = limit

        return self._get_things("getTopAlbums", Album, params, cacheable, stream=stream)

    def get_top_tracks(self, limit=None, cacheable=True, stream=False):
        """Returns a list of the most played Tracks by this artist."""
        params = self._get_params()
        if limit:
            params["limit"] = limit

        return self._get_things("getTopTracks", Track, params, cacheable, stream=stream)

    def get_url(self, domain_name=DOMAIN_ENGLISH):
        """Returns the URL of the artist page on the network.
        # Parameters:
        * domain_name: The network's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        substitutions = {"artist": _url_safe(self.get_name())}
        url_pattern = self.network._get_url(domain_name, "artist")
        return url_pattern % substitutions
1884
1885
class Country(_BaseObject):
    """A country at Last.fm."""

    name = None

    __hash__ = _BaseObject.__hash__

    def __init__(self, name, network):
        """
        # Parameters:
            * name: The country name.
            * network: The network object requests are sent through.
        """
        super().__init__(network=network, ws_prefix="geo")

        self.name = name

    def __repr__(self):
        return f"pylast.Country({repr(self.name)}, {repr(self.network)})"

    @_string_output
    def __str__(self):
        return self.get_name()

    def __eq__(self, other):
        # Objects of another type are never equal. Without this check the
        # comparison fails on objects lacking get_name(), and could report
        # equality with an object whose hash differs (the hash includes the
        # type).
        if type(self) is not type(other):
            return False
        return self.get_name().lower() == other.get_name().lower()

    def __ne__(self, other):
        return not self == other

    def _get_params(self):  # TODO can move to _BaseObject
        return {"country": self.get_name()}

    def get_name(self):
        """Returns the country name."""

        return self.name

    def get_top_artists(self, limit=None, cacheable=True):
        """Returns a sequence of the most played artists."""
        params = self._get_params()
        if limit:
            params["limit"] = limit

        doc = self._request("geo.getTopArtists", cacheable, params)

        # NOTE(review): the Country object itself, not self.network, is
        # handed to _extract_top_artists - confirm the helper expects that.
        return _extract_top_artists(doc, self)

    def get_top_tracks(self, limit=None, cacheable=True, stream=False):
        """Returns a sequence of the most played tracks"""
        params = self._get_params()
        if limit:
            params["limit"] = limit

        return self._get_things("getTopTracks", Track, params, cacheable, stream=stream)

    def get_url(self, domain_name=DOMAIN_ENGLISH):
        """Returns the URL of the country page on the network.
        * domain_name: The network's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        country_name = _url_safe(self.get_name())

        return self.network._get_url(domain_name, "country") % {
            "country_name": country_name
        }
1959
1960
class Library(_BaseObject):
    """A user's Last.fm library."""

    user = None

    __hash__ = _BaseObject.__hash__

    def __init__(self, user, network):
        super().__init__(network=network, ws_prefix="library")

        # Accept either a ready-made User object or anything else (e.g. a
        # user name), which is wrapped in a User bound to the same network.
        self.user = user if isinstance(user, User) else User(user, self.network)

    def __repr__(self):
        return f"pylast.Library({self.user!r}, {self.network!r})"

    @_string_output
    def __str__(self):
        return f"{self.get_user()!r}'s Library"

    def _get_params(self):
        owner_name = self.user.get_name()
        return {"user": owner_name}

    def get_user(self):
        """Returns the user who owns this library."""
        return self.user

    def get_artists(self, limit=50, cacheable=True, stream=False):
        """
        Returns the library's artists as LibraryItem objects, each holding
        an Artist with its playcount and tagcount.
        if limit==None it will return all (may take a while)
        If stream=True a generator is returned instead of a list.
        """

        def _iter_library_items():
            method_name = self.ws_prefix + ".getArtists"
            artist_nodes = _collect_nodes(
                limit, self, method_name, cacheable, stream=stream
            )
            for artist_node in artist_nodes:
                artist_name = _extract(artist_node, "name")

                plays = _number(_extract(artist_node, "playcount"))
                tags = _number(_extract(artist_node, "tagcount"))

                yield LibraryItem(Artist(artist_name, self.network), plays, tags)

        items = _iter_library_items()
        if stream:
            return items
        return list(items)
2008
2009
class Tag(_Chartable):
    """A Last.fm object tag."""

    name = None

    __hash__ = _BaseObject.__hash__

    def __init__(self, name, network):
        super().__init__(network=network, ws_prefix="tag")

        self.name = name

    def __repr__(self):
        return f"pylast.Tag({repr(self.name)}, {repr(self.network)})"

    @_string_output
    def __str__(self):
        return self.get_name()

    def __eq__(self, other):
        """
        Case-insensitive comparison of tag names.

        Anything that is not a Tag compares unequal instead of raising
        AttributeError, mirroring User.__eq__.
        """
        if not isinstance(other, Tag):
            return False
        return self.get_name().lower() == other.get_name().lower()

    def __ne__(self, other):
        return not self == other

    def _get_params(self):
        return {self.ws_prefix: self.get_name()}

    def get_name(self, properly_capitalized=False):
        """
        Returns the name of the tag.

        * properly_capitalized: If True, the name is first re-read from a
          getInfo request and stored on this object, replacing the name it
          was created with.
        """

        if properly_capitalized:
            self.name = _extract(
                self._request(self.ws_prefix + ".getInfo", True), "name"
            )

        return self.name

    def get_top_albums(self, limit=None, cacheable=True):
        """
        Returns a list of the top albums.

        * limit: Sent as the "limit" request parameter when truthy.
        * cacheable: Passed through to the request.
        """
        params = self._get_params()
        if limit:
            params["limit"] = limit

        doc = self._request(self.ws_prefix + ".getTopAlbums", cacheable, params)

        return _extract_top_albums(doc, self.network)

    def get_top_tracks(self, limit=None, cacheable=True, stream=False):
        """
        Returns a list of the most played Tracks for this tag.

        * limit: Sent as the "limit" request parameter when truthy.
        * cacheable: Passed through to _get_things.
        * stream: Passed through to _get_things.
        """
        params = self._get_params()
        if limit:
            params["limit"] = limit

        return self._get_things("getTopTracks", Track, params, cacheable, stream=stream)

    def get_top_artists(self, limit=None, cacheable=True):
        """
        Returns a sequence of the most played artists.

        * limit: Sent as the "limit" request parameter when truthy.
        * cacheable: Passed through to the request.
        """

        params = self._get_params()
        if limit:
            params["limit"] = limit

        doc = self._request(self.ws_prefix + ".getTopArtists", cacheable, params)

        return _extract_top_artists(doc, self.network)

    def get_url(self, domain_name=DOMAIN_ENGLISH):
        """Returns the URL of the tag page on the network.
        * domain_name: The network's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        name = _url_safe(self.get_name())

        return self.network._get_url(domain_name, "tag") % {"name": name}
2097
2098
class Track(_Opus):
    """A Last.fm track."""

    __hash__ = _Opus.__hash__

    def __init__(self, artist, title, network, username=None, info=None):
        super().__init__(artist, title, network, "track", username, info)

    def get_correction(self):
        """Returns the corrected track name."""

        correction_doc = self._request(self.ws_prefix + ".getCorrection")
        return _extract(correction_doc, "name")

    def get_duration(self):
        """Returns the track duration."""

        info_doc = self._request(self.ws_prefix + ".getInfo", True)
        raw_duration = _extract(info_doc, "duration")

        return _number(raw_duration)

    def get_userloved(self):
        """
        Whether the user loved this track

        Returns None when this track has no username set.
        """

        if not self.username:
            return None

        request_params = self._get_params()
        request_params["username"] = self.username

        info_doc = self._request(self.ws_prefix + ".getInfo", True, request_params)
        return bool(_number(_extract(info_doc, "userloved")))

    def is_streamable(self):
        """Returns True if the track is available at Last.fm."""

        info_doc = self._request(self.ws_prefix + ".getInfo", True)
        streamable_flag = _extract(info_doc, "streamable")
        return streamable_flag == "1"

    def is_fulltrack_available(self):
        """Returns True if the full track is available for streaming."""

        info_doc = self._request(self.ws_prefix + ".getInfo", True)
        streamable_node = info_doc.getElementsByTagName("streamable")[0]
        return streamable_node.getAttribute("fulltrack") == "1"

    def get_album(self):
        """
        Returns the album object of this track.

        An album title already present in self.info is used without a
        request; otherwise getInfo is queried and None is returned when the
        response holds no album element.
        """
        known_info = self.info
        if "album" in known_info and known_info["album"] is not None:
            return Album(self.artist, known_info["album"], self.network)

        info_doc = self._request(self.ws_prefix + ".getInfo", True)

        album_nodes = info_doc.getElementsByTagName("album")
        if len(album_nodes) == 0:
            return None

        first_album = album_nodes[0]
        return Album(
            _extract(first_album, "artist"),
            _extract(first_album, "title"),
            self.network,
        )

    def love(self):
        """Adds the track to the user's loved tracks."""

        self._request(self.ws_prefix + ".love")

    def unlove(self):
        """Removes the track from the user's loved tracks."""

        self._request(self.ws_prefix + ".unlove")

    def get_similar(self, limit=None):
        """
        Returns similar tracks for this track on the network,
        based on listening data.

        * limit: Sent as the "limit" request parameter when truthy.

        The result is a list of SimilarItem objects pairing a Track with its
        match value.
        """

        request_params = self._get_params()
        if limit:
            request_params["limit"] = limit

        similar_doc = self._request(
            self.ws_prefix + ".getSimilar", True, request_params
        )

        def _to_similar_item(track_node):
            # The first "name" element is the track title, the second one
            # is the artist name.
            title = _extract(track_node, "name")
            artist = _extract(track_node, "name", 1)
            score = _number(_extract(track_node, "match"))
            return SimilarItem(Track(artist, title, self.network), score)

        return [
            _to_similar_item(track_node)
            for track_node in similar_doc.getElementsByTagName(self.ws_prefix)
        ]

    def get_url(self, domain_name=DOMAIN_ENGLISH):
        """Returns the URL of the album or track page on the network.
        # Parameters:
        * domain_name str: The network's language domain. Possible values:
            o DOMAIN_ENGLISH
            o DOMAIN_GERMAN
            o DOMAIN_SPANISH
            o DOMAIN_FRENCH
            o DOMAIN_ITALIAN
            o DOMAIN_POLISH
            o DOMAIN_PORTUGUESE
            o DOMAIN_SWEDISH
            o DOMAIN_TURKISH
            o DOMAIN_RUSSIAN
            o DOMAIN_JAPANESE
            o DOMAIN_CHINESE
        """

        substitutions = {
            "artist": _url_safe(self.get_artist().get_name()),
            "title": _url_safe(self.get_title()),
        }
        url_template = self.network._get_url(domain_name, self.ws_prefix)

        return url_template % substitutions
2218
2219
class User(_Chartable):
    """A Last.fm user."""

    name = None

    __hash__ = _BaseObject.__hash__

    def __init__(self, user_name, network):
        super().__init__(network=network, ws_prefix="user")

        self.name = user_name

    def __repr__(self):
        return f"pylast.User({repr(self.name)}, {repr(self.network)})"

    @_string_output
    def __str__(self):
        return self.get_name()

    def __eq__(self, other):
        """Users are equal when both are User objects with the same name."""
        if isinstance(other, User):
            return self.get_name() == other.get_name()
        else:
            return False

    def __ne__(self, other):
        return not self == other

    def _get_params(self):
        return {self.ws_prefix: self.get_name()}

    def _extract_played_track(self, track_node):
        """
        Builds a PlayedTrack from a track node.

        The timestamp is the "uts" attribute of the node's date element, or
        None when the node has no date element (presumably the case for a
        now-playing entry - confirm against the API) instead of raising
        IndexError.
        """
        title = _extract(track_node, "name")
        track_artist = _extract(track_node, "artist")
        date = _extract(track_node, "date")
        album = _extract(track_node, "album")
        date_nodes = track_node.getElementsByTagName("date")
        timestamp = date_nodes[0].getAttribute("uts") if date_nodes else None
        return PlayedTrack(
            Track(track_artist, title, self.network), album, date, timestamp
        )

    def get_name(self, properly_capitalized=False):
        """
        Returns the user name.

        * properly_capitalized: If True, the name is first re-read from a
          getInfo request and stored on this object, replacing the name it
          was created with.
        """

        if properly_capitalized:
            self.name = _extract(
                self._request(self.ws_prefix + ".getInfo", True), "name"
            )

        return self.name

    def get_friends(self, limit=50, cacheable=False, stream=False):
        """
        Returns a list of the user's friends.

        If stream=True, a generator of User objects is returned instead.
        """

        def _get_friends():
            for node in _collect_nodes(
                limit, self, self.ws_prefix + ".getFriends", cacheable, stream=stream
            ):
                yield User(_extract(node, "name"), self.network)

        return _get_friends() if stream else list(_get_friends())

    def get_loved_tracks(self, limit=50, cacheable=True, stream=False):
        """
        Returns this user's loved track as a sequence of LovedTrack objects in
        reverse order of their timestamp, all the way back to the first track.

        If limit==None, it will try to pull all the available data.
        If stream=True, it will yield tracks as soon as a page has been retrieved.

        This method uses caching. Enable caching only if you're pulling a
        large amount of data.
        """

        def _get_loved_tracks():
            params = self._get_params()
            if limit:
                params["limit"] = limit

            for track in _collect_nodes(
                limit,
                self,
                self.ws_prefix + ".getLovedTracks",
                cacheable,
                params,
                stream=stream,
            ):
                try:
                    # The second "name" element is the artist name; entries
                    # without one are skipped.
                    artist = _extract(track, "name", 1)
                except IndexError:  # pragma: no cover
                    continue
                title = _extract(track, "name")
                date = _extract(track, "date")
                timestamp = track.getElementsByTagName("date")[0].getAttribute("uts")

                yield LovedTrack(Track(artist, title, self.network), date, timestamp)

        return _get_loved_tracks() if stream else list(_get_loved_tracks())

    def get_now_playing(self):
        """
        Returns the currently playing track, or None if nothing is playing.
        """

        params = self._get_params()
        params["limit"] = "1"

        doc = self._request(self.ws_prefix + ".getRecentTracks", False, params)

        tracks = doc.getElementsByTagName("track")

        if len(tracks) == 0:
            return None

        e = tracks[0]

        # Only a track flagged with the "nowplaying" attribute counts.
        if not e.hasAttribute("nowplaying"):
            return None

        artist = _extract(e, "artist")
        title = _extract(e, "name")
        info = {"album": _extract(e, "album"), "image": _extract_all(e, "image")}

        return Track(artist, title, self.network, self.name, info=info)

    def get_recent_tracks(
        self,
        limit=10,
        cacheable=True,
        time_from=None,
        time_to=None,
        stream=False,
        now_playing=False,
    ):
        """
        Returns this user's played track as a sequence of PlayedTrack objects
        in reverse order of playtime, all the way back to the first track.

        Parameters:
        limit : If None, it will try to pull all the available data.
        time_from (Optional) : Beginning timestamp of a range - only display
        scrobbles after this time, in UNIX timestamp format (integer
        number of seconds since 00:00:00, January 1st 1970 UTC). This
        must be in the UTC time zone. Sent as the "from" request parameter.
        time_to (Optional) : End timestamp of a range - only display scrobbles
        before this time, in UNIX timestamp format (integer number of
        seconds since 00:00:00, January 1st 1970 UTC). This must be in
        the UTC time zone. Sent as the "to" request parameter.
        stream: If True, it will yield tracks as soon as a page has been retrieved.
        now_playing: If True, a track flagged as now playing is kept in the
        results instead of being skipped.

        This method uses caching. Enable caching only if you're pulling a
        large amount of data.
        """

        def _get_recent_tracks():
            params = self._get_params()
            if limit:
                params["limit"] = limit + 1  # in case we remove the now playing track
            if time_from:
                params["from"] = time_from
            if time_to:
                params["to"] = time_to

            track_count = 0
            for track_node in _collect_nodes(
                limit + 1 if limit else None,
                self,
                self.ws_prefix + ".getRecentTracks",
                cacheable,
                params,
                stream=stream,
            ):
                if track_node.hasAttribute("nowplaying") and not now_playing:
                    continue  # to prevent the now playing track from sneaking in

                # One extra node was requested above, so stop once the
                # caller's limit has been reached.
                if limit and track_count >= limit:
                    break
                yield self._extract_played_track(track_node=track_node)
                track_count += 1

        return _get_recent_tracks() if stream else list(_get_recent_tracks())

    def get_country(self):
        """
        Returns the user's country as a Country object, or None when the
        profile holds no country (missing or the literal string "None").
        """

        doc = self._request(self.ws_prefix + ".getInfo", True)

        country = _extract(doc, "country")

        if country is None or country == "None":
            return None
        else:
            return Country(country, self.network)

    def is_subscriber(self):
        """Returns whether the user is a subscriber or not. True or False."""

        doc = self._request(self.ws_prefix + ".getInfo", True)

        return _extract(doc, "subscriber") == "1"

    def get_playcount(self):
        """Returns the user's playcount so far."""

        doc = self._request(self.ws_prefix + ".getInfo", True)

        return _number(_extract(doc, "playcount"))

    def get_registered(self):
        """Returns the user's registration date."""

        doc = self._request(self.ws_prefix + ".getInfo", True)

        return _extract(doc, "registered")

    def get_unixtime_registered(self):
        """Returns the user's registration date as a UNIX timestamp."""

        doc = self._request(self.ws_prefix + ".getInfo", True)

        return int(doc.getElementsByTagName("registered")[0].getAttribute("unixtime"))

    def get_tagged_albums(self, tag, limit=None, cacheable=True):
        """Returns the albums tagged by a user."""

        params = self._get_params()
        params["tag"] = tag
        params["taggingtype"] = "album"
        if limit:
            params["limit"] = limit
        doc = self._request(self.ws_prefix + ".getpersonaltags", cacheable, params)
        return _extract_albums(doc, self.network)

    def get_tagged_artists(self, tag, limit=None, cacheable=True):
        """
        Returns the artists tagged by a user.

        * cacheable: Whether to cache results. Defaults to True, which is
          what this method always used before the parameter existed.
        """

        params = self._get_params()
        params["tag"] = tag
        params["taggingtype"] = "artist"
        if limit:
            params["limit"] = limit
        doc = self._request(self.ws_prefix + ".getpersonaltags", cacheable, params)
        return _extract_artists(doc, self.network)

    def get_tagged_tracks(self, tag, limit=None, cacheable=True):
        """Returns the tracks tagged by a user."""

        params = self._get_params()
        params["tag"] = tag
        params["taggingtype"] = "track"
        if limit:
            params["limit"] = limit
        doc = self._request(self.ws_prefix + ".getpersonaltags", cacheable, params)
        return _extract_tracks(doc, self.network)

    def get_top_albums(self, period=PERIOD_OVERALL, limit=None, cacheable=True):
        """Returns the top albums played by a user.
        * period: The period of time. Possible values:
          o PERIOD_OVERALL
          o PERIOD_7DAYS
          o PERIOD_1MONTH
          o PERIOD_3MONTHS
          o PERIOD_6MONTHS
          o PERIOD_12MONTHS
        """

        params = self._get_params()
        params["period"] = period
        if limit:
            params["limit"] = limit

        doc = self._request(self.ws_prefix + ".getTopAlbums", cacheable, params)

        return _extract_top_albums(doc, self.network)

    def get_top_artists(self, period=PERIOD_OVERALL, limit=None, cacheable=True):
        """Returns the top artists played by a user.
        * period: The period of time. Possible values:
          o PERIOD_OVERALL
          o PERIOD_7DAYS
          o PERIOD_1MONTH
          o PERIOD_3MONTHS
          o PERIOD_6MONTHS
          o PERIOD_12MONTHS
        * cacheable: Whether to cache results. Defaults to True, which is
          what this method always used before the parameter existed.
        """

        params = self._get_params()
        params["period"] = period
        if limit:
            params["limit"] = limit

        doc = self._request(self.ws_prefix + ".getTopArtists", cacheable, params)

        return _extract_top_artists(doc, self.network)

    def get_top_tags(self, limit=None, cacheable=True):
        """
        Returns a sequence of the top tags used by this user with their counts
        as TopItem objects.
        * limit: The limit of how many tags to return.
        * cacheable: Whether to cache results.
        """

        params = self._get_params()
        if limit:
            params["limit"] = limit

        doc = self._request(self.ws_prefix + ".getTopTags", cacheable, params)

        # The count is passed on exactly as extracted (not converted with
        # _number).
        return [
            TopItem(Tag(_extract(node, "name"), self.network), _extract(node, "count"))
            for node in doc.getElementsByTagName("tag")
        ]

    def get_top_tracks(
        self, period=PERIOD_OVERALL, limit=None, cacheable=True, stream=False
    ):
        """Returns the top tracks played by a user.
        * period: The period of time. Possible values:
          o PERIOD_OVERALL
          o PERIOD_7DAYS
          o PERIOD_1MONTH
          o PERIOD_3MONTHS
          o PERIOD_6MONTHS
          o PERIOD_12MONTHS
        """

        params = self._get_params()
        params["period"] = period
        # NOTE(review): unlike the sibling methods, "limit" is always set
        # here, even when it is None. _get_things is defined elsewhere and
        # may depend on the key being present - confirm before changing.
        params["limit"] = limit

        return self._get_things("getTopTracks", Track, params, cacheable, stream=stream)

    def get_track_scrobbles(self, artist, track, cacheable=False, stream=False):
        """
        Get a list of this user's scrobbles of this artist's track,
        including scrobble time.

        If stream=True, a generator is returned instead of a list.
        """
        params = self._get_params()
        params["artist"] = artist
        params["track"] = track

        def _get_track_scrobbles():
            for track_node in _collect_nodes(
                None,
                self,
                self.ws_prefix + ".getTrackScrobbles",
                cacheable,
                params,
                stream=stream,
            ):
                yield self._extract_played_track(track_node)

        return _get_track_scrobbles() if stream else list(_get_track_scrobbles())

    def get_image(self, size=SIZE_EXTRA_LARGE):
        """
        Returns the user's avatar
        size can be one of:
            SIZE_EXTRA_LARGE
            SIZE_LARGE
            SIZE_MEDIUM
            SIZE_SMALL
        """

        doc = self._request(self.ws_prefix + ".getInfo", True)

        # size is used as an index into the list of extracted image values.
        return _extract_all(doc, "image")[size]

    def get_url(self, domain_name=DOMAIN_ENGLISH):
        """Returns the URL of the user page on the network.
        * domain_name: The network's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        name = _url_safe(self.get_name())

        return self.network._get_url(domain_name, "user") % {"name": name}

    def get_library(self):
        """Returns the associated Library object."""

        return Library(self, self.network)
2619
2620
class AuthenticatedUser(User):
    """A User whose name is taken from the network's username attribute."""

    def __init__(self, network):
        super().__init__(user_name=network.username, network=network)

    def _get_params(self):
        current_name = self.get_name()
        return dict(user=current_name)

    def get_name(self, properly_capitalized=False):
        """Returns the name of the authenticated user."""
        resolved_name = super().get_name(properly_capitalized=properly_capitalized)
        return resolved_name
2631
2632
class _Search(_BaseObject):
    """An abstract class. Use one of its derivatives."""

    def __init__(self, ws_prefix, search_terms, network):
        super().__init__(network, ws_prefix)

        self._ws_prefix = ws_prefix
        self.search_terms = search_terms

        # No page has been fetched yet; pages are numbered from 1.
        self._last_page_index = 0

    def _get_params(self):
        # Hand out a fresh dict so callers can add keys without touching
        # the stored search terms.
        return {key: self.search_terms[key] for key in self.search_terms.keys()}

    def get_total_result_count(self):
        """Returns the total count of all the results."""

        search_doc = self._request(self._ws_prefix + ".search", True)
        total_results = _extract(search_doc, "totalResults")

        return total_results

    def _retrieve_page(self, page_index):
        """Returns the node of matches to be processed"""

        page_params = self._get_params()
        page_params["page"] = str(page_index)
        search_doc = self._request(self._ws_prefix + ".search", True, page_params)

        matches_tag = self._ws_prefix + "matches"
        return search_doc.getElementsByTagName(matches_tag)[0]

    def _retrieve_next_page(self):
        """Advances the page counter and returns that page's matches node."""
        next_index = self._last_page_index + 1
        self._last_page_index = next_index
        return self._retrieve_page(next_index)
2671
2672
class AlbumSearch(_Search):
    """Search for an album by name."""

    def __init__(self, album_name, network):
        terms = {"album": album_name}
        super().__init__(ws_prefix="album", search_terms=terms, network=network)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Album objects."""

        matches_node = self._retrieve_next_page()

        # Each Album carries the image values found in its result node.
        return [
            Album(
                _extract(album_node, "artist"),
                _extract(album_node, "name"),
                self.network,
                info={"image": _extract_all(album_node, "image")},
            )
            for album_node in matches_node.getElementsByTagName("album")
        ]
2698
2699
class ArtistSearch(_Search):
    """Search for an artist by artist name."""

    def __init__(self, artist_name, network):
        terms = {"artist": artist_name}
        super().__init__(ws_prefix="artist", search_terms=terms, network=network)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Artist objects."""

        matches_node = self._retrieve_next_page()

        def _to_artist(artist_node):
            found = Artist(
                _extract(artist_node, "name"),
                self.network,
                info={"image": _extract_all(artist_node, "image")},
            )
            # The listener count from the search result is attached to the
            # object as an attribute.
            found.listener_count = _number(_extract(artist_node, "listeners"))
            return found

        return [
            _to_artist(artist_node)
            for artist_node in matches_node.getElementsByTagName("artist")
        ]
2724
2725
class TrackSearch(_Search):
    """
    Search for a track by track title. If you don't want to narrow the results
    down by specifying the artist name, set it to empty string.
    """

    def __init__(self, artist_name, track_title, network):
        terms = {"track": track_title, "artist": artist_name}
        super().__init__(ws_prefix="track", search_terms=terms, network=network)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Track objects."""

        matches_node = self._retrieve_next_page()

        def _to_track(track_node):
            found = Track(
                _extract(track_node, "artist"),
                _extract(track_node, "name"),
                self.network,
                info={"image": _extract_all(track_node, "image")},
            )
            # The listener count from the search result is attached to the
            # object as an attribute.
            found.listener_count = _number(_extract(track_node, "listeners"))
            return found

        return [
            _to_track(track_node)
            for track_node in matches_node.getElementsByTagName("track")
        ]
2756
2757
def md5(text):
    """
    Returns the md5 hash of a string.

    The input is converted with _unicode, encoded as UTF-8, and the digest
    is returned in hexadecimal form.
    """

    encoded = _unicode(text).encode("utf-8")
    digest = hashlib.md5()
    digest.update(encoded)

    return digest.hexdigest()
2765
2766
def _unicode(text):
    """
    Returns text as a str: bytes are decoded as UTF-8, str is returned
    unchanged, and anything else is converted with str().
    """
    if isinstance(text, str):
        return text
    if isinstance(text, bytes):
        return str(text, "utf-8")
    return str(text)
2774
2775
def _string(string):
    """Returns string unchanged if it is a str, otherwise str(string)."""
    return string if isinstance(string, str) else str(string)
2780
2781
def cleanup_nodes(doc):
    """
    Remove text nodes containing only whitespace

    Only the direct children of the document element are examined. The
    document is modified in place and also returned.
    """
    root = doc.documentElement
    # Iterate over a snapshot: removeChild() mutates the live childNodes
    # list, and removing from it while iterating would skip the node that
    # follows each removed one.
    for node in list(root.childNodes):
        if node.nodeType == Node.TEXT_NODE and node.nodeValue.isspace():
            root.removeChild(node)
    return doc
2790
2791
def _collect_nodes(limit, sender, method_name, cacheable, params=None, stream=False):
    """
    Returns a sequence of dom.Node objects about as close to limit as possible

    * limit: Maximum number of nodes to return; a falsy value means no limit.
    * sender: Object providing _request() and _get_params().
    * method_name: Web service method name passed to sender._request().
    * cacheable: Passed through to sender._request().
    * params: Request parameters. When falsy, sender._get_params() is used.
      The dict is modified in place: its "page" key is set for every request.
    * stream: If True a generator is returned, otherwise a list.

    Raises PyLastError when a page request fails three times in a row, or
    when a response carries no total pages attribute.
    """
    if not params:
        params = sender._get_params()

    def _stream_collect_nodes():
        node_count = 0
        page = 1
        end_of_pages = False

        # Keep fetching pages until the last page was read or enough nodes
        # were yielded.
        while not end_of_pages and (not limit or (limit and node_count < limit)):
            params["page"] = str(page)

            # Up to three attempts per page, one second apart.
            tries = 1
            while True:
                try:
                    doc = sender._request(method_name, cacheable, params)
                    break  # success
                except Exception as e:
                    if tries >= 3:
                        raise PyLastError() from e
                    # Wait and try again
                    time.sleep(1)
                    tries += 1

            doc = cleanup_nodes(doc)

            # break if there are no child nodes
            if not doc.documentElement.childNodes:
                break
            # The first child of the document element holds the results and
            # the paging attributes.
            main = doc.documentElement.childNodes[0]

            # The attribute name is accepted in both spellings.
            if main.hasAttribute("totalPages") or main.hasAttribute("totalpages"):
                total_pages = _number(
                    main.getAttribute("totalPages") or main.getAttribute("totalpages")
                )
            else:
                raise PyLastError("No total pages attribute")

            # Yield every non-text child, stopping at the limit.
            for node in main.childNodes:
                if not node.nodeType == xml.dom.Node.TEXT_NODE and (
                    not limit or (node_count < limit)
                ):
                    node_count += 1
                    yield node

            end_of_pages = page >= total_pages

            page += 1

    return _stream_collect_nodes() if stream else list(_stream_collect_nodes())
2845
2846
2847def _extract(node, name, index=0):
2848    """Extracts a value from the xml string"""
2849
2850    nodes = node.getElementsByTagName(name)
2851
2852    if len(nodes):
2853        if nodes[index].firstChild:
2854            return _unescape_htmlentity(nodes[index].firstChild.data.strip())
2855    else:
2856        return None
2857
2858
2859def _extract_all(node, name, limit_count=None):
2860    """Extracts all the values from the xml string. returning a list."""
2861
2862    seq = []
2863
2864    for i in range(0, len(node.getElementsByTagName(name))):
2865        if len(seq) == limit_count:
2866            break
2867
2868        seq.append(_extract(node, name, i))
2869
2870    return seq
2871
2872
2873def _extract_top_artists(doc, network):
2874    # TODO Maybe include the _request here too?
2875    seq = []
2876    for node in doc.getElementsByTagName("artist"):
2877        name = _extract(node, "name")
2878        playcount = _extract(node, "playcount")
2879
2880        seq.append(TopItem(Artist(name, network), playcount))
2881
2882    return seq
2883
2884
2885def _extract_top_albums(doc, network):
2886    # TODO Maybe include the _request here too?
2887    seq = []
2888    for node in doc.getElementsByTagName("album"):
2889        name = _extract(node, "name")
2890        artist = _extract(node, "name", 1)
2891        playcount = _extract(node, "playcount")
2892        info = {"image": _extract_all(node, "image")}
2893
2894        seq.append(TopItem(Album(artist, name, network, info=info), playcount))
2895
2896    return seq
2897
2898
2899def _extract_artists(doc, network):
2900    seq = []
2901    for node in doc.getElementsByTagName("artist"):
2902        seq.append(Artist(_extract(node, "name"), network))
2903    return seq
2904
2905
2906def _extract_albums(doc, network):
2907    seq = []
2908    for node in doc.getElementsByTagName("album"):
2909        name = _extract(node, "name")
2910        artist = _extract(node, "name", 1)
2911        seq.append(Album(artist, name, network))
2912    return seq
2913
2914
2915def _extract_tracks(doc, network):
2916    seq = []
2917    for node in doc.getElementsByTagName("track"):
2918        name = _extract(node, "name")
2919        artist = _extract(node, "name", 1)
2920        seq.append(Track(artist, name, network))
2921    return seq
2922
2923
def _url_safe(text):
    """
    Makes a text safe to use in a URL: it is converted to a string,
    passed through quote_plus twice and finally lower-cased.
    """
    encoded_once = quote_plus(_string(text))
    encoded_twice = quote_plus(encoded_once)
    return encoded_twice.lower()
2928
2929
2930def _number(string):
2931    """
2932    Extracts an int from a string.
2933    Returns a 0 if None or an empty string was passed.
2934    """
2935
2936    if not string:
2937        return 0
2938    else:
2939        try:
2940            return int(string)
2941        except ValueError:
2942            return float(string)
2943
2944
2945def _unescape_htmlentity(string):
2946    mapping = html.entities.name2codepoint
2947    for key in mapping:
2948        string = string.replace("&%s;" % key, chr(mapping[key]))
2949
2950    return string
2951
2952
2953# End of file
2954