1# 2# pylast - 3# A Python interface to Last.fm and Libre.fm 4# 5# Copyright 2008-2010 Amr Hassan 6# Copyright 2013-2021 hugovk 7# 8# Licensed under the Apache License, Version 2.0 (the "License"); 9# you may not use this file except in compliance with the License. 10# You may obtain a copy of the License at 11# 12# https://www.apache.org/licenses/LICENSE-2.0 13# 14# Unless required by applicable law or agreed to in writing, software 15# distributed under the License is distributed on an "AS IS" BASIS, 16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17# See the License for the specific language governing permissions and 18# limitations under the License. 19# 20# https://github.com/pylast/pylast 21 22import collections 23import hashlib 24import html.entities 25import logging 26import os 27import shelve 28import ssl 29import tempfile 30import time 31import xml.dom 32from http.client import HTTPSConnection 33from urllib.parse import quote_plus 34from xml.dom import Node, minidom 35 36try: 37 # Python 3.8+ 38 import importlib.metadata as importlib_metadata 39except ImportError: 40 # Python 3.7 and lower 41 import importlib_metadata 42 43__author__ = "Amr Hassan, hugovk, Mice Pápai" 44__copyright__ = "Copyright (C) 2008-2010 Amr Hassan, 2013-2021 hugovk, 2017 Mice Pápai" 45__license__ = "apache2" 46__email__ = "amr.hassan@gmail.com" 47__version__ = importlib_metadata.version(__name__) 48 49 50# 1 : This error does not exist 51STATUS_INVALID_SERVICE = 2 52STATUS_INVALID_METHOD = 3 53STATUS_AUTH_FAILED = 4 54STATUS_INVALID_FORMAT = 5 55STATUS_INVALID_PARAMS = 6 56STATUS_INVALID_RESOURCE = 7 57STATUS_OPERATION_FAILED = 8 58STATUS_INVALID_SK = 9 59STATUS_INVALID_API_KEY = 10 60STATUS_OFFLINE = 11 61STATUS_SUBSCRIBERS_ONLY = 12 62STATUS_INVALID_SIGNATURE = 13 63STATUS_TOKEN_UNAUTHORIZED = 14 64STATUS_TOKEN_EXPIRED = 15 65STATUS_TEMPORARILY_UNAVAILABLE = 16 66STATUS_LOGIN_REQUIRED = 17 67STATUS_TRIAL_EXPIRED = 18 68# 19 : This error does not exist 
69STATUS_NOT_ENOUGH_CONTENT = 20 70STATUS_NOT_ENOUGH_MEMBERS = 21 71STATUS_NOT_ENOUGH_FANS = 22 72STATUS_NOT_ENOUGH_NEIGHBOURS = 23 73STATUS_NO_PEAK_RADIO = 24 74STATUS_RADIO_NOT_FOUND = 25 75STATUS_API_KEY_SUSPENDED = 26 76STATUS_DEPRECATED = 27 77# 28 : This error is not documented 78STATUS_RATE_LIMIT_EXCEEDED = 29 79 80PERIOD_OVERALL = "overall" 81PERIOD_7DAYS = "7day" 82PERIOD_1MONTH = "1month" 83PERIOD_3MONTHS = "3month" 84PERIOD_6MONTHS = "6month" 85PERIOD_12MONTHS = "12month" 86 87DOMAIN_ENGLISH = 0 88DOMAIN_GERMAN = 1 89DOMAIN_SPANISH = 2 90DOMAIN_FRENCH = 3 91DOMAIN_ITALIAN = 4 92DOMAIN_POLISH = 5 93DOMAIN_PORTUGUESE = 6 94DOMAIN_SWEDISH = 7 95DOMAIN_TURKISH = 8 96DOMAIN_RUSSIAN = 9 97DOMAIN_JAPANESE = 10 98DOMAIN_CHINESE = 11 99 100SIZE_SMALL = 0 101SIZE_MEDIUM = 1 102SIZE_LARGE = 2 103SIZE_EXTRA_LARGE = 3 104SIZE_MEGA = 4 105 106IMAGES_ORDER_POPULARITY = "popularity" 107IMAGES_ORDER_DATE = "dateadded" 108 109 110SCROBBLE_SOURCE_USER = "P" 111SCROBBLE_SOURCE_NON_PERSONALIZED_BROADCAST = "R" 112SCROBBLE_SOURCE_PERSONALIZED_BROADCAST = "E" 113SCROBBLE_SOURCE_LASTFM = "L" 114SCROBBLE_SOURCE_UNKNOWN = "U" 115 116SCROBBLE_MODE_PLAYED = "" 117SCROBBLE_MODE_LOVED = "L" 118SCROBBLE_MODE_BANNED = "B" 119SCROBBLE_MODE_SKIPPED = "S" 120 121# Delay time in seconds from section 4.4 of https://www.last.fm/api/tos 122DELAY_TIME = 0.2 123 124# Python >3.4 has sane defaults 125SSL_CONTEXT = ssl.create_default_context() 126 127logger = logging.getLogger(__name__) 128logging.getLogger(__name__).addHandler(logging.NullHandler()) 129 130 131class _Network: 132 """ 133 A music social network website such as Last.fm or 134 one with a Last.fm-compatible API. 
def __init__(
    self,
    name,
    homepage,
    ws_server,
    api_key,
    api_secret,
    session_key,
    username,
    password_hash,
    domain_names,
    urls,
    token=None,
):
    """
    Store the network configuration and obtain a session key if possible.

    name: the name of the network
    homepage: the homepage URL
    ws_server: the URL of the webservices server
    api_key: a provided API_KEY
    api_secret: a provided API_SECRET
    session_key: a generated session_key or None
    username: a username of a valid user
    password_hash: the output of pylast.md5(password) where password is
        the user's password
    domain_names: a dict mapping each DOMAIN_* value to a string domain
        name
    urls: a dict mapping types to URLs
    token: an authentication token to retrieve a session

    When no session_key is given, one is requested during construction:
    first from ``token`` (which also sets the username), otherwise from
    username + password_hash, provided api_key and api_secret are set.

    Either a valid session_key or a combination of username and
    password_hash must be present for scrobbling.

    You should use a preconfigured network object through a
    get_*_network(...) method instead of creating an object
    of this class, unless you know what you're doing.
    """

    # Identity and credentials, stored exactly as given.
    self.name = name
    self.homepage = homepage
    self.ws_server = ws_server
    self.api_key = api_key
    self.api_secret = api_secret
    self.session_key = session_key
    self.username = username
    self.password_hash = password_hash
    self.domain_names = domain_names
    self.urls = urls

    # Transport defaults: no cache, no proxy, no rate limiting.
    self.cache_backend = None
    self.proxy_enabled = False
    self.proxy = None
    self.last_call_time = 0
    self.limit_rate = False

    # These must come after the attributes above: SessionKeyGenerator is
    # handed this instance, which therefore has to be fully configured.
    if token and not self.session_key:
        generator = SessionKeyGenerator(self)
        (
            self.session_key,
            self.username,
        ) = generator.get_web_auth_session_key_username(url=None, token=token)

    has_api_credentials = self.api_key and self.api_secret
    has_login = self.username and self.password_hash
    if has_api_credentials and not self.session_key and has_login:
        generator = SessionKeyGenerator(self)
        self.session_key = generator.get_session_key(
            self.username, self.password_hash
        )
def _delay_call(self):
    """
    Makes sure that web service calls are at least DELAY_TIME seconds apart.

    Sleeps for the remainder of the interval when the previous call was
    released less than DELAY_TIME seconds ago, then records the release
    time of this call in ``self.last_call_time``.
    """
    elapsed = time.time() - self.last_call_time

    if elapsed < DELAY_TIME:
        time.sleep(DELAY_TIME - elapsed)

    # Take a fresh timestamp after any sleep. Storing the pre-sleep time
    # would make the next call believe more time has passed than really
    # has, letting two calls land closer together than DELAY_TIME.
    self.last_call_time = time.time()
"tag.getTopTags").execute(cacheable) 330 331 seq = [] 332 for node in doc.getElementsByTagName("tag"): 333 if limit and len(seq) >= limit: 334 break 335 tag = Tag(_extract(node, "name"), self) 336 weight = _number(_extract(node, "count")) 337 seq.append(TopItem(tag, weight)) 338 339 return seq 340 341 def get_geo_top_artists(self, country, limit=None, cacheable=True): 342 """Get the most popular artists on Last.fm by country. 343 Parameters: 344 country (Required) : A country name, as defined by the ISO 3166-1 345 country names standard. 346 limit (Optional) : The number of results to fetch per page. 347 Defaults to 50. 348 """ 349 params = {"country": country} 350 351 if limit: 352 params["limit"] = limit 353 354 doc = _Request(self, "geo.getTopArtists", params).execute(cacheable) 355 356 return _extract_top_artists(doc, self) 357 358 def get_geo_top_tracks(self, country, location=None, limit=None, cacheable=True): 359 """Get the most popular tracks on Last.fm last week by country. 360 Parameters: 361 country (Required) : A country name, as defined by the ISO 3166-1 362 country names standard 363 location (Optional) : A metro name, to fetch the charts for 364 (must be within the country specified) 365 limit (Optional) : The number of results to fetch per page. 366 Defaults to 50. 
def enable_proxy(self, host, port):
    """Enable a default web proxy.

    Stores ``[host, _number(port)]`` as the proxy details and switches
    proxy usage on for this network.
    """
    proxy_port = _number(port)

    self.proxy = [host, proxy_port]
    self.proxy_enabled = True
def is_caching_enabled(self):
    """Returns True if a cache backend is currently set on this network."""

    return self.cache_backend is not None
def update_now_playing(
    self,
    artist,
    title,
    album=None,
    album_artist=None,
    duration=None,
    track_number=None,
    mbid=None,
    context=None,
):
    """
    Used to notify Last.fm that a user has started listening to a track.

    Sends a ``track.updateNowPlaying`` request. Optional values are only
    included in the request when they are truthy.

    Parameters:
        artist (Required) : The artist name
        title (Required) : The track title
        album (Optional) : The album name.
        album_artist (Optional) : The album artist - if this differs
            from the track artist.
        duration (Optional) : The length of the track in seconds.
        track_number (Optional) : The track number of the track on the
            album.
        mbid (Optional) : The MusicBrainz Track ID.
        context (Optional) : Sub-client version
            (not public, only enabled for certain API keys)
    """

    # (API parameter name, value) pairs, in the order they are added.
    optional_params = (
        ("album", album),
        ("albumArtist", album_artist),
        ("context", context),
        ("trackNumber", track_number),
        ("mbid", mbid),
        ("duration", duration),
    )

    params = {"track": title, "artist": artist}
    for api_name, value in optional_params:
        if value:
            params[api_name] = value

    _Request(self, "track.updateNowPlaying", params).execute()
def scrobble_many(self, tracks):
    """
    Used to scrobble a batch of tracks at once.

    tracks: an iterable of dicts, one per track, containing the keyword
        arguments as if passed to the scrobble() method. "artist" and
        "title" must be present (a missing key raises KeyError); every
        other argument is sent only when present and truthy.

    The tracks are sent in batches of at most 50 per ``track.scrobble``
    request, one request after another. At least one request is always
    issued, even for an empty ``tracks``.
    """

    batch_size = 50

    # (scrobble() keyword, API parameter name) pairs, built once rather
    # than once per track. The order fixes the parameter insertion order.
    optional_args = (
        ("timestamp", "timestamp"),
        ("album", "album"),
        ("album_artist", "albumArtist"),
        ("context", "context"),
        ("stream_id", "streamID"),
        ("track_number", "trackNumber"),
        ("mbid", "mbid"),
        ("duration", "duration"),
    )

    # Materialise once so generators and other non-sliceable iterables work.
    remaining = list(tracks)

    # Iterate instead of recursing so the number of batches is not bounded
    # by the interpreter's recursion limit.
    while True:
        batch, remaining = remaining[:batch_size], remaining[batch_size:]

        params = {}
        for i, track in enumerate(batch):
            params["artist[%d]" % i] = track["artist"]
            params["track[%d]" % i] = track["title"]

            for arg, api_name in optional_args:
                value = track.get(arg)
                if value:
                    params["%s[%d]" % (api_name, i)] = value

        _Request(self, "track.scrobble", params).execute()

        if not remaining:
            break
def __repr__(self):
    """Return a constructor-like string for this LastFMNetwork.

    The string lists, single-quoted and in this order: api_key,
    api_secret, session_key, username and password_hash.
    """
    fields = (
        self.api_key,
        self.api_secret,
        self.session_key,
        self.username,
        self.password_hash,
    )
    quoted = ["'%s'" % value for value in fields]
    return "pylast.LastFMNetwork(%s)" % ", ".join(quoted)
class _ShelfCacheBackend:
    """Used as a backend for caching cacheable requests.

    Responses are kept in a ``shelve`` shelf. The set of known keys is
    mirrored in ``cache_keys`` and used for membership tests.
    """

    def __init__(self, file_path=None, flag=None):
        """Open the shelf at file_path, passing flag on only when given."""
        open_kwargs = {} if flag is None else {"flag": flag}
        self.shelf = shelve.open(file_path, **open_kwargs)
        self.cache_keys = set(self.shelf.keys())

    def __contains__(self, key):
        """Return True if key was in the shelf when opened or added since."""
        return key in self.cache_keys

    def __iter__(self):
        """Iterate over the keys currently stored in the shelf."""
        return iter(self.shelf.keys())

    def get_xml(self, key):
        """Return the value stored under key."""
        return self.shelf[key]

    def set_xml(self, key, xml_string):
        """Store xml_string under key and remember the key."""
        self.cache_keys.add(key)
        self.shelf[key] = xml_string

    @classmethod
    def create_shelf(cls):
        """Create a backend on a fresh temporary file (prefix pylast_tmp_)."""
        handle, path = tempfile.mkstemp(prefix="pylast_tmp_")
        # Only the path is needed; shelve opens the file itself.
        os.close(handle)
        return cls(file_path=path, flag="n")
def _download_response(self):
    """Returns a response body string from the server.

    POSTs the url-encoded request parameters to the network's web service
    host, or through the configured proxy when one is enabled. A
    "username" parameter, if present, is removed from ``self.params`` and
    sent in the URL query string instead of the body.

    Raises:
        NetworkError: sending the request failed.
        MalformedResponseError: reading the response failed, or the
            server answered with HTTP 500, 502, 503 or 504.
        Any error raised by ``_check_response_for_errors`` for the body.

    The connection is closed on every exit path.
    """

    if self.network.limit_rate:
        self.network._delay_call()

    username = self.params.pop("username", None)
    query = f"?username={username}" if username is not None else ""

    data = "&".join(
        "=".join((name, quote_plus(_string(self.params[name]))))
        for name in self.params.keys()
    )

    headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Accept-Charset": "utf-8",
        "User-Agent": "pylast/" + __version__,
    }

    (host_name, host_subdir) = self.network.ws_server

    if self.network.is_proxy_enabled():
        # Through a proxy the request line carries the absolute URL.
        conn = HTTPSConnection(
            context=SSL_CONTEXT,
            host=self.network._get_proxy()[0],
            port=self.network._get_proxy()[1],
        )
        url = f"https://{host_name}{host_subdir}{query}"
    else:
        conn = HTTPSConnection(context=SSL_CONTEXT, host=host_name)
        url = f"{host_subdir}{query}"

    # One try/finally around the whole exchange, so the connection is
    # released when sending or reading fails, not only on success.
    try:
        try:
            conn.request(method="POST", url=url, body=data, headers=headers)
        except Exception as e:
            raise NetworkError(self.network, e) from e

        try:
            response = conn.getresponse()
            if response.status in [500, 502, 503, 504]:
                # NOTE(review): this WSError is caught by the handler just
                # below and re-raised as MalformedResponseError. Kept
                # as-is so callers see the same exception type as before.
                raise WSError(
                    self.network,
                    response.status,
                    "Connection to the API failed with HTTP code "
                    + str(response.status),
                )
            response_text = _unicode(response.read())
        except Exception as e:
            raise MalformedResponseError(self.network, e) from e

        self._check_response_for_errors(response_text)
    finally:
        conn.close()

    return response_text
def get_web_auth_url(self):
    """
    Build the URL the user must open to authorize this API key.

    A new token is fetched for every call and remembered under the
    returned URL in ``self.web_auth_tokens``. After the user has
    authorized the application, call get_web_auth_session_key(url).
    """

    token = self._get_web_auth_token()

    fields = {
        "homepage": self.network.homepage,
        "api": self.network.api_key,
        "token": token,
    }
    url = "{homepage}/api/auth/?api_key={api}&token={token}".format(**fields)

    self.web_auth_tokens[url] = token

    return url
def get_web_auth_session_key(self, url, token=""):
    """
    Retrieves the session key of a web authorization process by its URL.

    Delegates to get_web_auth_session_key_username() and drops the
    username from its result.
    """
    key_and_username = self.get_web_auth_session_key_username(url, token)
    session_key, _ = key_and_username
    return session_key
def _string_output(func):
    """Decorator that passes the return value of func through _string().

    The wrapper keeps func's metadata (``__name__``, ``__doc__``, ...) and
    forwards both positional and keyword arguments.
    """
    # Imported locally so this block does not depend on a module-level
    # import being present.
    import functools

    @functools.wraps(func)
    def r(*args, **kwargs):
        return _string(func(*args, **kwargs))

    return r
return first_child.wholeText.strip() 1167 1168 def _get_things( 1169 self, method, thing_type, params=None, cacheable=True, stream=False 1170 ): 1171 """Returns a list of the most played thing_types by this thing.""" 1172 1173 def _stream_get_things(): 1174 limit = params.get("limit", 50) 1175 nodes = _collect_nodes( 1176 limit, 1177 self, 1178 self.ws_prefix + "." + method, 1179 cacheable, 1180 params, 1181 stream=stream, 1182 ) 1183 for node in nodes: 1184 title = _extract(node, "name") 1185 artist = _extract(node, "name", 1) 1186 playcount = _number(_extract(node, "playcount")) 1187 1188 yield TopItem(thing_type(artist, title, self.network), playcount) 1189 1190 return _stream_get_things() if stream else list(_stream_get_things()) 1191 1192 def get_wiki_published_date(self): 1193 """ 1194 Returns the summary of the wiki. 1195 Only for Album/Track. 1196 """ 1197 return self.get_wiki("published") 1198 1199 def get_wiki_summary(self): 1200 """ 1201 Returns the summary of the wiki. 1202 Only for Album/Track. 1203 """ 1204 return self.get_wiki("summary") 1205 1206 def get_wiki_content(self): 1207 """ 1208 Returns the summary of the wiki. 1209 Only for Album/Track. 1210 """ 1211 return self.get_wiki("content") 1212 1213 def get_wiki(self, section): 1214 """ 1215 Returns a section of the wiki. 1216 Only for Album/Track. 
1217 section can be "content", "summary" or 1218 "published" (for published date) 1219 """ 1220 1221 doc = self._request(self.ws_prefix + ".getInfo", True) 1222 1223 if len(doc.getElementsByTagName("wiki")) == 0: 1224 return 1225 1226 node = doc.getElementsByTagName("wiki")[0] 1227 1228 return _extract(node, section) 1229 1230 1231class _Chartable(_BaseObject): 1232 """Common functions for classes with charts.""" 1233 1234 def __init__(self, network, ws_prefix): 1235 super().__init__(network=network, ws_prefix=ws_prefix) 1236 1237 def get_weekly_chart_dates(self): 1238 """Returns a list of From and To tuples for the available charts.""" 1239 1240 doc = self._request(self.ws_prefix + ".getWeeklyChartList", True) 1241 1242 seq = [] 1243 for node in doc.getElementsByTagName("chart"): 1244 seq.append((node.getAttribute("from"), node.getAttribute("to"))) 1245 1246 return seq 1247 1248 def get_weekly_album_charts(self, from_date=None, to_date=None): 1249 """ 1250 Returns the weekly album charts for the week starting from the 1251 from_date value to the to_date value. 1252 Only for User. 1253 """ 1254 return self.get_weekly_charts("album", from_date, to_date) 1255 1256 def get_weekly_artist_charts(self, from_date=None, to_date=None): 1257 """ 1258 Returns the weekly artist charts for the week starting from the 1259 from_date value to the to_date value. 1260 Only for User. 1261 """ 1262 return self.get_weekly_charts("artist", from_date, to_date) 1263 1264 def get_weekly_track_charts(self, from_date=None, to_date=None): 1265 """ 1266 Returns the weekly track charts for the week starting from the 1267 from_date value to the to_date value. 1268 Only for User. 1269 """ 1270 return self.get_weekly_charts("track", from_date, to_date) 1271 1272 def get_weekly_charts(self, chart_kind, from_date=None, to_date=None): 1273 """ 1274 Returns the weekly charts for the week starting from the 1275 from_date value to the to_date value. 
class _Taggable(_BaseObject):
    """Common functions for classes with tags."""

    def __init__(self, network, ws_prefix):
        super().__init__(network=network, ws_prefix=ws_prefix)

    def add_tags(self, tags):
        """Adds one or several tags.
        * tags: A sequence of tag names or Tag objects.
        """

        for tag in tags:
            self.add_tag(tag)

    def add_tag(self, tag):
        """Adds one tag.
        * tag: a tag name or a Tag object.
        """

        if isinstance(tag, Tag):
            tag = tag.get_name()

        params = self._get_params()
        params["tags"] = tag

        self._request(self.ws_prefix + ".addTags", False, params)

    def remove_tag(self, tag):
        """Remove a user's tag from this object.
        * tag: a tag name or a Tag object.
        """

        if isinstance(tag, Tag):
            tag = tag.get_name()

        params = self._get_params()
        params["tag"] = tag

        self._request(self.ws_prefix + ".removeTag", False, params)

    def get_tags(self):
        """Returns a list of the tags set by the user to this object."""

        # Uncacheable because it can be dynamically changed by the user.
        params = self._get_params()

        doc = self._request(self.ws_prefix + ".getTags", False, params)

        return [Tag(name, self.network) for name in _extract_all(doc, "name")]

    def remove_tags(self, tags):
        """Removes one or several tags from this object.
        * tags: a sequence of tag names or Tag objects.
        """

        for tag in tags:
            self.remove_tag(tag)

    def clear_tags(self):
        """Clears all the user-set tags."""

        # remove_tags() takes ONE sequence; unpacking the list into
        # positional arguments raised TypeError for any count except one.
        self.remove_tags(self.get_tags())

    def set_tags(self, tags):
        """Sets this object's tags to only those tags.
        * tags: a sequence of tag names or Tag objects.

        Names are compared case-insensitively; only the difference from
        the tags currently on the server is removed/added.
        """

        # Accept Tag objects as documented; they have no .lower().
        new_names = [
            tag.get_name() if isinstance(tag, Tag) else tag for tag in tags
        ]
        old_names = [tag.get_name() for tag in self.get_tags()]

        new_folded = {name.lower() for name in new_names}
        old_folded = {name.lower() for name in old_names}

        to_remove = [name for name in old_names if name.lower() not in new_folded]
        to_add = [name for name in new_names if name.lower() not in old_folded]

        self.remove_tags(to_remove)
        self.add_tags(to_add)

    def get_top_tags(self, limit=None):
        """
        Returns a list of the most frequently used Tags on this object,
        as TopItem(Tag, count) entries, truncated to limit if given.
        """

        doc = self._request(self.ws_prefix + ".getTopTags", True)

        seq = [
            TopItem(
                Tag(_extract(element, "name"), self.network),
                _extract(element, "count"),
            )
            for element in doc.getElementsByTagName("tag")
        ]

        if limit:
            seq = seq[:limit]

        return seq
class WSError(PyLastError):
    """Exception related to the Network web service

    Keeps the network it came from, the numeric status code and the
    details text; str() of the exception is the details text.
    """

    def __init__(self, network, status, details):
        self.network = network
        self.status = status
        self.details = details

    @_string_output
    def __str__(self):
        message = self.details
        return message

    def get_id(self):
        """Returns the exception ID, from one of the following:
        STATUS_INVALID_SERVICE = 2
        STATUS_INVALID_METHOD = 3
        STATUS_AUTH_FAILED = 4
        STATUS_INVALID_FORMAT = 5
        STATUS_INVALID_PARAMS = 6
        STATUS_INVALID_RESOURCE = 7
        STATUS_OPERATION_FAILED = 8
        STATUS_INVALID_SK = 9
        STATUS_INVALID_API_KEY = 10
        STATUS_OFFLINE = 11
        STATUS_SUBSCRIBERS_ONLY = 12
        STATUS_INVALID_SIGNATURE = 13
        STATUS_TOKEN_UNAUTHORIZED = 14
        STATUS_TOKEN_EXPIRED = 15
        STATUS_TEMPORARILY_UNAVAILABLE = 16
        STATUS_LOGIN_REQUIRED = 17
        STATUS_TRIAL_EXPIRED = 18
        STATUS_NOT_ENOUGH_CONTENT = 20
        STATUS_NOT_ENOUGH_MEMBERS = 21
        STATUS_NOT_ENOUGH_FANS = 22
        STATUS_NOT_ENOUGH_NEIGHBOURS = 23
        STATUS_NO_PEAK_RADIO = 24
        STATUS_RADIO_NOT_FOUND = 25
        STATUS_API_KEY_SUSPENDED = 26
        STATUS_DEPRECATED = 27
        STATUS_RATE_LIMIT_EXCEEDED = 29
        """

        status_code = self.status
        return status_code
class _Opus(_Taggable):
    """An album or track."""

    artist = None
    title = None
    username = None

    # Defining __eq__ below would otherwise reset __hash__ to None.
    __hash__ = _BaseObject.__hash__

    def __init__(self, artist, title, network, ws_prefix, username=None, info=None):
        """
        Create an opus instance.
        # Parameters:
            * artist: An artist name or an Artist object.
            * title: The album or track title.
            * network: The network this opus belongs to.
            * ws_prefix: 'album' or 'track'
            * username: User for per-user queries; defaults to
              network.username.
            * info: Optional dict of already-known data (e.g. "image").
        """

        if info is None:
            info = {}

        super().__init__(network=network, ws_prefix=ws_prefix)

        if isinstance(artist, Artist):
            self.artist = artist
        else:
            self.artist = Artist(artist, self.network)

        self.title = title
        self.username = (
            username if username else network.username
        )  # Default to current user
        self.info = info

    def __repr__(self):
        return "pylast.{}({}, {}, {})".format(
            self.ws_prefix.title(),
            repr(self.artist.name),
            repr(self.title),
            repr(self.network),
        )

    @_string_output
    def __str__(self):
        return _unicode("%s - %s") % (self.get_artist().get_name(), self.get_title())

    def __eq__(self, other):
        """Equal when of the same type with case-insensitively equal
        title and artist name."""
        if type(self) != type(other):
            return False
        a = self.get_title().lower()
        b = other.get_title().lower()
        c = self.get_artist().get_name().lower()
        d = other.get_artist().get_name().lower()
        return (a == b) and (c == d)

    def __ne__(self, other):
        return not self == other

    def _get_params(self):
        return {
            "artist": self.get_artist().get_name(),
            self.ws_prefix: self.get_title(),
        }

    def get_artist(self):
        """Returns the associated Artist object."""

        return self.artist

    def get_cover_image(self, size=SIZE_EXTRA_LARGE):
        """
        Returns a URI to the cover image
        size can be one of:
            SIZE_EXTRA_LARGE
            SIZE_LARGE
            SIZE_MEDIUM
            SIZE_SMALL
        """
        # The image list is fetched once and then kept in self.info.
        if "image" not in self.info:
            self.info["image"] = _extract_all(
                self._request(self.ws_prefix + ".getInfo", cacheable=True), "image"
            )
        return self.info["image"][size]

    def get_title(self, properly_capitalized=False):
        """Returns the album or track title.
        If properly_capitalized is set, the title is downloaded first,
        overwriting the given one."""
        if properly_capitalized:
            self.title = _extract(
                self._request(self.ws_prefix + ".getInfo", True), "name"
            )

        return self.title

    def get_name(self, properly_capitalized=False):
        """Returns the album or track title (alias to get_title())."""

        return self.get_title(properly_capitalized)

    def get_playcount(self):
        """Returns the number of plays on the network"""

        return _number(
            _extract(
                self._request(self.ws_prefix + ".getInfo", cacheable=True), "playcount"
            )
        )

    def get_userplaycount(self):
        """Returns the number of plays by a given username,
        or None when no username is set."""

        if not self.username:
            return

        params = self._get_params()
        params["username"] = self.username

        doc = self._request(self.ws_prefix + ".getInfo", True, params)
        return _number(_extract(doc, "userplaycount"))

    def get_listener_count(self):
        """Returns the number of listeners on the network"""

        return _number(
            _extract(
                self._request(self.ws_prefix + ".getInfo", cacheable=True), "listeners"
            )
        )

    def get_mbid(self):
        """Returns the MusicBrainz ID of the album or track,
        or None if the response carries none."""

        doc = self._request(self.ws_prefix + ".getInfo", cacheable=True)

        try:
            lfm = doc.getElementsByTagName("lfm")[0]
            # Only direct children are searched, so an mbid nested deeper
            # in the response is not picked up.
            opus = next(self._get_children_by_tag_name(lfm, self.ws_prefix))
            mbid = next(self._get_children_by_tag_name(opus, "mbid"))
        except StopIteration:
            return None

        # An empty <mbid/> element has no text child to read from.
        if mbid.firstChild is None:
            return None

        return mbid.firstChild.nodeValue

    def _get_children_by_tag_name(self, node, tag_name):
        """Yields the direct element children of node named tag_name
        ("*" matches every element)."""
        for child in node.childNodes:
            if child.nodeType == child.ELEMENT_NODE and (
                tag_name == "*" or child.tagName == tag_name
            ):
                yield child
def get_userplaycount(self):
    """
    Returns the number of plays of this artist by self.username,
    or None when no username is set.
    """
    user = self.username
    if user:
        request_params = self._get_params()
        request_params["username"] = user

        info_doc = self._request(self.ws_prefix + ".getInfo", True, request_params)
        return _number(_extract(info_doc, "userplaycount"))

    return None
def get_bio(self, section, language=None):
    """
    Returns a section of the bio, or None if the response has no such
    section.
    section can be "content", "summary" or
        "published" (for published date)
    language, when given, is sent as the "lang" request parameter.
    """
    request_params = None
    if language:
        request_params = self._get_params()
        request_params["lang"] = language

    try:
        return self._extract_cdata_from_request(
            self.ws_prefix + ".getInfo", section, request_params
        )
    except IndexError:
        # The requested section element is absent from the response.
        return None
class Country(_BaseObject):
    """A country at Last.fm."""

    name = None

    # Defining __eq__ below would otherwise reset __hash__ to None.
    __hash__ = _BaseObject.__hash__

    def __init__(self, name, network):
        super().__init__(network=network, ws_prefix="geo")

        self.name = name

    def __repr__(self):
        return f"pylast.Country({repr(self.name)}, {repr(self.network)})"

    @_string_output
    def __str__(self):
        return self.get_name()

    def __eq__(self, other):
        """Countries are equal when their names match case-insensitively;
        anything that is not a Country compares unequal."""
        if not isinstance(other, Country):
            return False
        return self.get_name().lower() == other.get_name().lower()

    def __ne__(self, other):
        return not self == other

    def _get_params(self):  # TODO can move to _BaseObject
        return {"country": self.get_name()}

    def get_name(self):
        """Returns the country name."""

        return self.name

    def get_top_artists(self, limit=None, cacheable=True):
        """Returns a sequence of the most played artists."""
        params = self._get_params()
        if limit:
            params["limit"] = limit

        doc = self._request("geo.getTopArtists", cacheable, params)

        # Hand over the network (as Tag.get_top_artists does), not this
        # Country object.
        return _extract_top_artists(doc, self.network)

    def get_top_tracks(self, limit=None, cacheable=True, stream=False):
        """Returns a sequence of the most played tracks"""
        params = self._get_params()
        if limit:
            params["limit"] = limit

        return self._get_things("getTopTracks", Track, params, cacheable, stream=stream)

    def get_url(self, domain_name=DOMAIN_ENGLISH):
        """Returns the URL of the country page on the network.
        * domain_name: The network's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        country_name = _url_safe(self.get_name())

        return self.network._get_url(domain_name, "country") % {
            "country_name": country_name
        }
"""Returns the user who owns this library.""" 1988 return self.user 1989 1990 def get_artists(self, limit=50, cacheable=True, stream=False): 1991 """ 1992 Returns a sequence of Album objects 1993 if limit==None it will return all (may take a while) 1994 """ 1995 1996 def _get_artists(): 1997 for node in _collect_nodes( 1998 limit, self, self.ws_prefix + ".getArtists", cacheable, stream=stream 1999 ): 2000 name = _extract(node, "name") 2001 2002 playcount = _number(_extract(node, "playcount")) 2003 tagcount = _number(_extract(node, "tagcount")) 2004 2005 yield LibraryItem(Artist(name, self.network), playcount, tagcount) 2006 2007 return _get_artists() if stream else list(_get_artists()) 2008 2009 2010class Tag(_Chartable): 2011 """A Last.fm object tag.""" 2012 2013 name = None 2014 2015 __hash__ = _BaseObject.__hash__ 2016 2017 def __init__(self, name, network): 2018 super().__init__(network=network, ws_prefix="tag") 2019 2020 self.name = name 2021 2022 def __repr__(self): 2023 return f"pylast.Tag({repr(self.name)}, {repr(self.network)})" 2024 2025 @_string_output 2026 def __str__(self): 2027 return self.get_name() 2028 2029 def __eq__(self, other): 2030 return self.get_name().lower() == other.get_name().lower() 2031 2032 def __ne__(self, other): 2033 return not self == other 2034 2035 def _get_params(self): 2036 return {self.ws_prefix: self.get_name()} 2037 2038 def get_name(self, properly_capitalized=False): 2039 """Returns the name of the tag.""" 2040 2041 if properly_capitalized: 2042 self.name = _extract( 2043 self._request(self.ws_prefix + ".getInfo", True), "name" 2044 ) 2045 2046 return self.name 2047 2048 def get_top_albums(self, limit=None, cacheable=True): 2049 """Returns a list of the top albums.""" 2050 params = self._get_params() 2051 if limit: 2052 params["limit"] = limit 2053 2054 doc = self._request(self.ws_prefix + ".getTopAlbums", cacheable, params) 2055 2056 return _extract_top_albums(doc, self.network) 2057 2058 def get_top_tracks(self, 
def get_top_artists(self, limit=None, cacheable=True):
    """
    Returns a sequence of the most played artists for this tag.

    limit, when truthy, is sent as the "limit" request parameter.
    """
    query = self._get_params()
    if limit:
        query.update(limit=limit)

    method_name = self.ws_prefix + ".getTopArtists"
    response = self._request(method_name, cacheable, query)

    return _extract_top_artists(response, self.network)
def is_fulltrack_available(self):
    """
    Returns True if the full track is available for streaming, i.e. the
    first "streamable" element carries fulltrack="1".
    """
    info_doc = self._request(self.ws_prefix + ".getInfo", True)
    streamable_node = info_doc.getElementsByTagName("streamable")[0]
    fulltrack_flag = streamable_node.getAttribute("fulltrack")
    return fulltrack_flag == "1"
def _extract_played_track(self, track_node):
    """
    Builds a PlayedTrack from one "track" node of a recent-tracks
    response.

    The timestamp is the "uts" attribute of the node's first "date"
    element, or None when the node has no "date" element.
    """
    title = _extract(track_node, "name")
    track_artist = _extract(track_node, "artist")
    date = _extract(track_node, "date")
    album = _extract(track_node, "album")

    # get_recent_tracks(now_playing=True) lets "nowplaying" nodes through;
    # NOTE(review): those presumably carry no <date> - confirm against the API.
    # Indexing [0] unconditionally raised IndexError for any dateless node.
    date_nodes = track_node.getElementsByTagName("date")
    timestamp = date_nodes[0].getAttribute("uts") if date_nodes else None

    return PlayedTrack(
        Track(track_artist, title, self.network), album, date, timestamp
    )
def get_now_playing(self):
    """
    Returns the currently playing track, or None if nothing is playing.

    Asks for the single most recent track (uncached) and only accepts it
    when its node carries the "nowplaying" attribute.
    """
    request_params = self._get_params()
    request_params["limit"] = "1"

    response = self._request(
        self.ws_prefix + ".getRecentTracks", False, request_params
    )
    track_nodes = response.getElementsByTagName("track")

    if not track_nodes or not track_nodes[0].hasAttribute("nowplaying"):
        return None

    newest = track_nodes[0]
    return Track(
        _extract(newest, "artist"),
        _extract(newest, "name"),
        self.network,
        self.name,
        info={
            "album": _extract(newest, "album"),
            "image": _extract_all(newest, "image"),
        },
    )
This 2363 must be in the UTC time zone. 2364 to (Optional) : End timestamp of a range - only display scrobbles 2365 before this time, in UNIX timestamp format (integer number of 2366 seconds since 00:00:00, January 1st 1970 UTC). This must be in 2367 the UTC time zone. 2368 stream: If True, it will yield tracks as soon as a page has been retrieved. 2369 2370 This method uses caching. Enable caching only if you're pulling a 2371 large amount of data. 2372 """ 2373 2374 def _get_recent_tracks(): 2375 params = self._get_params() 2376 if limit: 2377 params["limit"] = limit + 1 # in case we remove the now playing track 2378 if time_from: 2379 params["from"] = time_from 2380 if time_to: 2381 params["to"] = time_to 2382 2383 track_count = 0 2384 for track_node in _collect_nodes( 2385 limit + 1 if limit else None, 2386 self, 2387 self.ws_prefix + ".getRecentTracks", 2388 cacheable, 2389 params, 2390 stream=stream, 2391 ): 2392 if track_node.hasAttribute("nowplaying") and not now_playing: 2393 continue # to prevent the now playing track from sneaking in 2394 2395 if limit and track_count >= limit: 2396 break 2397 yield self._extract_played_track(track_node=track_node) 2398 track_count += 1 2399 2400 return _get_recent_tracks() if stream else list(_get_recent_tracks()) 2401 2402 def get_country(self): 2403 """Returns the name of the country of the user.""" 2404 2405 doc = self._request(self.ws_prefix + ".getInfo", True) 2406 2407 country = _extract(doc, "country") 2408 2409 if country is None or country == "None": 2410 return None 2411 else: 2412 return Country(country, self.network) 2413 2414 def is_subscriber(self): 2415 """Returns whether the user is a subscriber or not. 
def get_tagged_artists(self, tag, limit=None, cacheable=True):
    """
    Returns the artists tagged by a user.

    * tag: The tag name, sent as the "tag" request parameter.
    * limit: If truthy, sent as the "limit" request parameter.
    * cacheable: Passed through to the underlying request, like in
      get_tagged_albums and get_tagged_tracks. The default of True is the
      value this method previously always used.
    """

    params = self._get_params()
    params["tag"] = tag
    params["taggingtype"] = "artist"
    if limit:
        params["limit"] = limit
    doc = self._request(self.ws_prefix + ".getpersonaltags", cacheable, params)
    return _extract_artists(doc, self.network)
def get_top_tags(self, limit=None, cacheable=True):
    """
    Returns the top tags used by this user, each paired with its count in
    a TopItem object.
    * limit: The limit of how many tags to return.
    * cacheable: Whether to cache results.
    """

    request_params = self._get_params()
    if limit:
        request_params["limit"] = limit

    doc = self._request(self.ws_prefix + ".getTopTags", cacheable, request_params)

    return [
        TopItem(
            Tag(_extract(tag_node, "name"), self.network),
            _extract(tag_node, "count"),
        )
        for tag_node in doc.getElementsByTagName("tag")
    ]
def get_track_scrobbles(self, artist, track, cacheable=False, stream=False):
    """
    Returns this user's scrobbles of the given artist's track, including
    the scrobble time, as PlayedTrack objects.

    * artist: Sent as the "artist" request parameter.
    * track: Sent as the "track" request parameter.
    * cacheable: Passed through to the underlying request.
    * stream: If True, a generator is returned instead of a list.
    """
    request_params = self._get_params()
    request_params["artist"] = artist
    request_params["track"] = track

    def _iter_scrobbles():
        # No limit is given, so every available page is collected.
        scrobble_nodes = _collect_nodes(
            None,
            self,
            self.ws_prefix + ".getTrackScrobbles",
            cacheable,
            request_params,
            stream=stream,
        )
        for scrobble_node in scrobble_nodes:
            yield self._extract_played_track(scrobble_node)

    scrobbles = _iter_scrobbles()
    if stream:
        return scrobbles
    return list(scrobbles)
class AuthenticatedUser(User):
    """A User whose name is taken from the network's username."""

    def __init__(self, network):
        authenticated_name = network.username
        super().__init__(user_name=authenticated_name, network=network)

    def _get_params(self):
        """Returns the request parameters identifying this user."""
        user_name = self.get_name()
        return {"user": user_name}

    def get_name(self, properly_capitalized=False):
        """Returns the name of the authenticated user."""
        name = super().get_name(properly_capitalized=properly_capitalized)
        return name
class AlbumSearch(_Search):
    """Search for an album by name."""

    def __init__(self, album_name, network):
        terms = {"album": album_name}
        super().__init__(ws_prefix="album", search_terms=terms, network=network)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Album objects."""

        matches_node = self._retrieve_next_page()

        return [
            Album(
                _extract(album_node, "artist"),
                _extract(album_node, "name"),
                self.network,
                info={"image": _extract_all(album_node, "image")},
            )
            for album_node in matches_node.getElementsByTagName("album")
        ]
def cleanup_nodes(doc):
    """
    Remove text nodes containing only whitespace from the direct children
    of doc.documentElement.

    The document is modified in place and is also returned.
    """
    root = doc.documentElement
    # Iterate over a snapshot: removeChild() mutates root.childNodes, and
    # removing from the live list while looping over it skips the node that
    # follows each removed one (so adjacent whitespace nodes would survive).
    for node in list(root.childNodes):
        if node.nodeType == Node.TEXT_NODE and node.nodeValue.isspace():
            root.removeChild(node)
    return doc
_extract_all(node, name, limit_count=None): 2860 """Extracts all the values from the xml string. returning a list.""" 2861 2862 seq = [] 2863 2864 for i in range(0, len(node.getElementsByTagName(name))): 2865 if len(seq) == limit_count: 2866 break 2867 2868 seq.append(_extract(node, name, i)) 2869 2870 return seq 2871 2872 2873def _extract_top_artists(doc, network): 2874 # TODO Maybe include the _request here too? 2875 seq = [] 2876 for node in doc.getElementsByTagName("artist"): 2877 name = _extract(node, "name") 2878 playcount = _extract(node, "playcount") 2879 2880 seq.append(TopItem(Artist(name, network), playcount)) 2881 2882 return seq 2883 2884 2885def _extract_top_albums(doc, network): 2886 # TODO Maybe include the _request here too? 2887 seq = [] 2888 for node in doc.getElementsByTagName("album"): 2889 name = _extract(node, "name") 2890 artist = _extract(node, "name", 1) 2891 playcount = _extract(node, "playcount") 2892 info = {"image": _extract_all(node, "image")} 2893 2894 seq.append(TopItem(Album(artist, name, network, info=info), playcount)) 2895 2896 return seq 2897 2898 2899def _extract_artists(doc, network): 2900 seq = [] 2901 for node in doc.getElementsByTagName("artist"): 2902 seq.append(Artist(_extract(node, "name"), network)) 2903 return seq 2904 2905 2906def _extract_albums(doc, network): 2907 seq = [] 2908 for node in doc.getElementsByTagName("album"): 2909 name = _extract(node, "name") 2910 artist = _extract(node, "name", 1) 2911 seq.append(Album(artist, name, network)) 2912 return seq 2913 2914 2915def _extract_tracks(doc, network): 2916 seq = [] 2917 for node in doc.getElementsByTagName("track"): 2918 name = _extract(node, "name") 2919 artist = _extract(node, "name", 1) 2920 seq.append(Track(artist, name, network)) 2921 return seq 2922 2923 2924def _url_safe(text): 2925 """Does all kinds of tricks on a text to make it safe to use in a URL.""" 2926 2927 return quote_plus(quote_plus(_string(text))).lower() 2928 2929 2930def _number(string): 
2931 """ 2932 Extracts an int from a string. 2933 Returns a 0 if None or an empty string was passed. 2934 """ 2935 2936 if not string: 2937 return 0 2938 else: 2939 try: 2940 return int(string) 2941 except ValueError: 2942 return float(string) 2943 2944 2945def _unescape_htmlentity(string): 2946 mapping = html.entities.name2codepoint 2947 for key in mapping: 2948 string = string.replace("&%s;" % key, chr(mapping[key])) 2949 2950 return string 2951 2952 2953# End of file 2954