1# Copyright (c) 2014-2021 Cedric Bellegarde <cedric.bellegarde@adishatz.org> 2# This program is free software: you can redistribute it and/or modify 3# it under the terms of the GNU General Public License as published by 4# the Free Software Foundation, either version 3 of the License, or 5# (at your option) any later version. 6# This program is distributed in the hope that it will be useful, 7# but WITHOUT ANY WARRANTY; without even the implied warranty of 8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 9# GNU General Public License for more details. 10# You should have received a copy of the GNU General Public License 11# along with this program. If not, see <http://www.gnu.org/licenses/>. 12 13from gi.repository import Gst, GstPbutils, GLib, Gio 14 15from re import match 16from gettext import gettext as _ 17 18from lollypop.define import App 19from lollypop.logger import Logger 20from lollypop.utils_file import decodeUnicode, splitUnicode 21from lollypop.utils import format_artist_name, get_iso_date_from_string 22from lollypop.tag_frame_text import FrameTextTag 23from lollypop.tag_frame_lang import FrameLangTag 24 25 26class Discoverer: 27 """ 28 Discover tags 29 """ 30 31 def __init__(self): 32 """ 33 Init tag reader 34 """ 35 36 self._discoverer = GstPbutils.Discoverer.new(10 * Gst.SECOND) 37 38 def get_info(self, uri): 39 """ 40 Return information for file at uri 41 @param uri as str 42 @Exception GLib.Error 43 @return GstPbutils.DiscovererInfo 44 """ 45 info = self._discoverer.discover_uri(uri) 46 return info 47 48 49class TagReader: 50 """ 51 Scanner tag reader 52 """ 53 54 __STRING = ["title", "artist", "composer", "conductor", 55 "musicbrainz-albumid", "musicbrainz-trackid", 56 "musicbrainz-artistid", "musicbrainz-albumartistid", 57 "version", "performer", "artist-sortname", 58 "album-artist-sortname", "interpreted-by", "album-artist", 59 "album", "genre", "lyrics", "publisher"] 60 __INT = ["album-disc-number", "track-number"] 61 __DOUBLE = ["beats-per-minute"] 62 63 def __init__(self): 64 """ 65 Init tag reader 66 """ 67 pass 68 69 def get_title(self, tags, filepath): 70 """ 71 Return title for tags 72 @param tags as Gst.TagList 73 @param filepath as string 74 @return title as string 75 """ 76 if tags is None: 77 return GLib.path_get_basename(filepath) 78 title = self.__get(tags, ["title"]) 79 if not title: 80 title = GLib.path_get_basename(filepath) 81 return title 82 83 def get_artists(self, tags): 84 """ 85 Return artists for tags 86 @param tags as Gst.TagList 87 @return string like "artist1;artist2;..." 88 """ 89 if tags is None: 90 return _("Unknown") 91 return self.__get(tags, ["artist"]) 92 93 def get_composers(self, tags): 94 """ 95 Return composers for tags 96 @param tags as Gst.TagList 97 @return string like "composer1;composer2;..." 98 """ 99 if tags is None: 100 return _("Unknown") 101 return self.__get(tags, ["composer"]) 102 103 def get_conductors(self, tags): 104 """ 105 Return conductors for tags 106 @param tags as Gst.TagList 107 @return string like "conductor1;conductor2;..." 108 """ 109 if tags is None: 110 return _("Unknown") 111 return self.__get(tags, ["conductor"]) 112 113 def get_mb_id(self, tags, name): 114 """ 115 Get MusicBrainz ID 116 @param tags as Gst.TagList 117 @param name as str 118 @return str 119 """ 120 if tags is None or not name: 121 return "" 122 return self.__get(tags, ["musicbrainz-" + name]) 123 124 def get_mb_album_id(self, tags): 125 """ 126 Get album id (musicbrainz) 127 @param tags as Gst.TagList 128 @return str 129 """ 130 return self.get_mb_id(tags, 'albumid') 131 132 def get_mb_track_id(self, tags): 133 """ 134 Get recording id (musicbrainz) 135 @param tags as Gst.TagList 136 @return str 137 """ 138 return self.get_mb_id(tags, 'trackid') 139 140 def get_mb_artist_id(self, tags): 141 """ 142 Get artist id (musicbrainz) 143 @param tags as Gst.TagList 144 @return str 145 """ 146 return self.get_mb_id(tags, 'artistid') 147 148 def get_mb_album_artist_id(self, tags): 149 """ 150 Get album artist id (musicbrainz) 151 @param tags as Gst.TagList 152 @return str 153 """ 154 return self.get_mb_id(tags, 'albumartistid') 155 156 def get_version(self, tags): 157 """ 158 Get recording version 159 @param tags as Gst.TagList 160 @return str 161 """ 162 if tags is None: 163 return "" 164 return self.__get(tags, ["version"]) 165 166 def get_performers(self, tags): 167 """ 168 Return performers for tags 169 @param tags as Gst.TagList 170 @return string like "performer1;performer2;..." 171 """ 172 if tags is None: 173 return _("Unknown") 174 return self.__get(tags, ["performer"]) 175 176 def get_artist_sortnames(self, tags): 177 """ 178 Return artist sort names 179 @param tags as Gst.TagList 180 @return artist sort names as "str;str" 181 """ 182 if tags is None: 183 return "" 184 return self.__get(tags, ["artist-sortname"]) 185 186 def get_album_artist_sortnames(self, tags): 187 """ 188 Return album artist sort names 189 @param tags as Gst.TagList 190 @return artist sort names as "str;str" 191 """ 192 if tags is None: 193 return "" 194 return self.__get(tags, ["album-artist-sortname"]) 195 196 def get_remixers(self, tags): 197 """ 198 Get remixers tag 199 @param tags as Gst.TagList 200 @return artist sort names as "str,str" 201 """ 202 if tags is None: 203 return _("Unknown") 204 remixers = self.__get(tags, ["interpreted-by"]) 205 if not remixers: 206 remixers = self.__get_extended(tags, ["REMIXER"]) 207 return remixers 208 209 def get_album_artists(self, tags): 210 """ 211 Return album artists for tags 212 @param tags as Gst.TagList 213 @return album artist as string or None 214 """ 215 if tags is None: 216 return _("Unknown") 217 return self.__get(tags, ["album-artist"]) 218 219 def get_album_name(self, tags): 220 """ 221 Return album for tags 222 @param tags as Gst.TagList 223 @return album name as string 224 """ 225 if tags is None: 226 return _("Unknown") 227 album = self.__get(tags, ["album"]) 228 if not album: 229 album = _("Unknown") 230 return album 231 232 def get_genres(self, tags): 233 """ 234 Return genres for tags 235 @param tags as Gst.TagList 236 @return string like "genre1;genre2;..." 237 """ 238 if tags is None: 239 return _("Unknown") 240 genres = self.__get(tags, ["genre"]) 241 if not genres: 242 genres = _("Unknown") 243 return genres 244 245 def get_discname(self, tags): 246 """ 247 Return disc name 248 @param tags as Gst.TagList 249 @return disc name as str 250 """ 251 return self.__get_extended(tags, ['PART', 'DISCSUBTITLE']) 252 253 def get_discnumber(self, tags): 254 """ 255 Return disc number for tags 256 @param tags as Gst.TagList 257 @return disc number as int 258 """ 259 if tags is None: 260 return 0 261 discnumber = self.__get(tags, ["album-disc-number"]) 262 if not discnumber: 263 discnumber = 0 264 return discnumber 265 266 def get_compilation(self, tags): 267 """ 268 Return True if album is a compilation 269 @param tags as Gst.TagList 270 @return bool 271 """ 272 if tags is None: 273 return False 274 try: 275 compilation = self.__get_private_string(tags, "TCMP", False) 276 if not compilation: 277 compilation = self.__get_extended(tags, ["COMPILATION"]) 278 if compilation: 279 return bool(compilation) 280 except Exception as e: 281 Logger.error("TagReader::get_compilation(): %s" % e) 282 return False 283 284 def get_tracknumber(self, tags, filename): 285 """ 286 Return track number for tags 287 @param tags as Gst.TagList 288 @param filename as str 289 @return track number as int 290 """ 291 if tags is not None: 292 tracknumber = self.__get(tags, ["track-number"]) 293 else: 294 tracknumber = None 295 if not tracknumber: 296 # Guess from filename 297 m = match("^([0-9]*)[ ]*-", filename) 298 if m: 299 try: 300 tracknumber = int(m.group(1)) 301 except: 302 tracknumber = 0 303 else: 304 tracknumber = 0 305 return min(abs(tracknumber), GLib.MAXINT32) 306 307 def get_year(self, tags): 308 """ 309 Return track year for tags 310 @param tags as Gst.TagList 311 @return year and timestamp (int, int) 312 """ 313 try: 314 (exists_date, date) = tags.get_date_index("date", 0) 315 (exists_datetime, datetime) = tags.get_date_time_index("datetime", 316 0) 317 year = timestamp = None 318 if exists_datetime: 319 if datetime.has_year(): 320 year = datetime.get_year() 321 if datetime.has_month(): 322 month = datetime.get_month() 323 else: 324 month = 1 325 if datetime.has_day(): 326 day = datetime.get_day() 327 else: 328 day = 1 329 if exists_date and date.valid(): 330 year = date.get_year() 331 month = date.get_month() 332 day = date.get_day() 333 334 if year is not None: 335 gst_datetime = Gst.DateTime.new_local_time( 336 year, month, day, 0, 0, 0) 337 glib_datetime = gst_datetime.to_g_date_time() 338 timestamp = glib_datetime.to_unix() 339 return (year, timestamp) 340 except Exception as e: 341 Logger.error("TagReader::get_year(): %s", e) 342 return (None, None) 343 344 def get_original_year(self, tags): 345 """ 346 Return original release year 347 @param tags as Gst.TagList 348 @return year and timestamp (int, int) 349 """ 350 def get_id3(): 351 date_string = self.__get_private_string(tags, "TDOR", False) 352 try: 353 date = get_iso_date_from_string(date_string) 354 datetime = GLib.DateTime.new_from_iso8601(date, None) 355 return (datetime.get_year(), datetime.to_unix()) 356 except: 357 pass 358 return (None, None) 359 360 def get_ogg(): 361 try: 362 date_string = self.__get_extended(tags, ['ORIGINALDATE']) 363 date = get_iso_date_from_string(date_string) 364 datetime = GLib.DateTime.new_from_iso8601(date, None) 365 return (datetime.get_year(), datetime.to_unix()) 366 except: 367 pass 368 return (None, None) 369 370 if tags is None: 371 return None 372 values = get_id3() 373 if values[0] is None: 374 values = get_ogg() 375 return values 376 377 def get_bpm(self, tags): 378 """ 379 Get BPM from tags 380 @param tags as Gst.TagList 381 @return int/None 382 """ 383 bpm = self.__get(tags, ["beats-per-minute"]) 384 if not bpm: 385 bpm = None 386 return bpm 387 388 def get_popm(self, tags): 389 """ 390 Get popularity tag 391 @param tags as Gst.TagList 392 @return int 393 """ 394 try: 395 if tags is None: 396 return 0 397 size = tags.get_tag_size("private-id3v2-frame") 398 for i in range(0, size): 399 (exists, sample) = tags.get_sample_index("private-id3v2-frame", 400 i) 401 if not exists: 402 continue 403 (exists, m) = sample.get_buffer().map(Gst.MapFlags.READ) 404 if not exists: 405 continue 406 # Gstreamer 1.18 API breakage 407 try: 408 bytes = m.data.tobytes() 409 except: 410 bytes = m.data 411 412 if len(bytes) > 4 and bytes[0:4] == b"POPM": 413 try: 414 popm = bytes.split(b"\x00")[6][0] 415 except: 416 popm = 0 417 if popm == 0: 418 value = 0 419 elif popm >= 1 and popm < 64: 420 value = 1 421 elif popm >= 64 and popm < 128: 422 value = 2 423 elif popm >= 128 and popm < 196: 424 value = 3 425 elif popm >= 196 and popm < 255: 426 value = 4 427 elif popm == 255: 428 value = 5 429 else: 430 value = 0 431 return value 432 except Exception as e: 433 Logger.warning("TagReader::get_popm(): %s", e) 434 return 0 435 436 def get_lyrics(self, tags): 437 """ 438 Return lyrics for tags 439 @parma tags as Gst.TagList 440 @return lyrics as str 441 """ 442 def get_mp4(): 443 return self.__get(tags, ["lyrics"]) 444 445 def get_id3(): 446 return self.__get_private_string(tags, "USLT", True) 447 448 def get_ogg(): 449 return self.__get_extended(tags, ["LYRICS"]) 450 451 if tags is None: 452 return "" 453 lyrics = get_mp4() 454 if not lyrics: 455 lyrics = get_id3() 456 if not lyrics: 457 lyrics = get_ogg() 458 return lyrics 459 460 def get_synced_lyrics(self, tags): 461 """ 462 Return synced lyrics for tags 463 @parma tags as Gst.TagList 464 @return lyrics as ([str, int]) 465 """ 466 def decode_lyrics(bytes_list, encoding): 467 lyrics = [] 468 try: 469 for frame in bytes_list: 470 (l, t) = splitUnicode(frame, encoding) 471 if l: 472 lyrics.append((decodeUnicode(l, encoding), 473 int.from_bytes(t[1:4], "big"))) 474 except Exception as e: 475 Logger.warning( 476 "TagReader::get_synced_lyrics.decode_lyrics(): %s", e) 477 return lyrics 478 479 def get_id3(): 480 try: 481 b = self.__get_private_bytes(tags, "SYLT") 482 if b: 483 frame = b[10:] 484 encoding = frame[0:1] 485 string = decode_lyrics(frame.split(b"\n"), encoding) 486 if string is not None: 487 return string 488 except Exception as e: 489 Logger.warning("TagReader::get_synced_lyrics.get_id3(): %s", e) 490 return "" 491 492 if tags is None: 493 return "" 494 lyrics = get_id3() 495 return lyrics 496 497 def add_artists(self, artists, sortnames, mb_artist_id=""): 498 """ 499 Add artists to db 500 @param artists as str 501 @param sortnames as str 502 @param mb_artist_id as str 503 @return ([int], [int]): (added artist ids, artist ids) 504 """ 505 artist_ids = [] 506 added_artist_ids = [] 507 artistsplit = artists.split(";") 508 sortsplit = sortnames.split(";") 509 sortlen = len(sortsplit) 510 mbidsplit = mb_artist_id.split(";") 511 mbidlen = len(mbidsplit) 512 if len(artistsplit) != mbidlen: 513 mbidsplit = [] 514 mbidlen = 0 515 i = 0 516 for artist in artistsplit: 517 artist = artist.strip() 518 if artist != "": 519 if i >= mbidlen or mbidsplit[i] == "": 520 mbid = None 521 else: 522 mbid = mbidsplit[i].strip() 523 # Get artist id, add it if missing 524 (artist_id, db_name) = App().artists.get_id(artist, mbid) 525 if i >= sortlen or sortsplit[i] == "": 526 sortname = None 527 else: 528 sortname = sortsplit[i].strip() 529 if artist_id is None: 530 if sortname is None: 531 sortname = format_artist_name(artist) 532 artist_id = App().artists.add(artist, sortname, mbid) 533 added_artist_ids.append(artist_id) 534 else: 535 # artists.get_id() is NOCASE, check if we need to update 536 # artist name 537 if db_name != artist: 538 App().artists.set_name(artist_id, artist) 539 if sortname is not None: 540 App().artists.set_sortname(artist_id, sortname) 541 if mbid is not None: 542 App().artists.set_mb_artist_id(artist_id, mbid) 543 i += 1 544 artist_ids.append(artist_id) 545 return (added_artist_ids, artist_ids) 546 547 def add_genres(self, genres): 548 """ 549 Add genres to db 550 @param genres as string 551 @return ([int], [int]): (added genre ids, genre ids) 552 """ 553 genre_ids = [] 554 added_genre_ids = [] 555 for genre in genres.split(";"): 556 genre = genre.strip() 557 if genre != "": 558 # Get genre id, add genre if missing 559 genre_id = App().genres.get_id(genre) 560 if genre_id is None: 561 genre_id = App().genres.add(genre) 562 added_genre_ids.append(genre_id) 563 genre_ids.append(genre_id) 564 return (added_genre_ids, genre_ids) 565 566 def add_album(self, album_name, mb_album_id, lp_album_id, artist_ids, 567 uri, loved, popularity, rate, synced, mtime, storage_type): 568 """ 569 Add album to db 570 @param album_name as str 571 @param mb_album_id as str 572 @param lp_album_id as str 573 @param artist_ids as [int] 574 @param uri as str 575 @param loved as bool 576 @param popularity as int 577 @param rate as int 578 @param synced as int 579 @param mtime as int 580 @param storage_type as StorageType 581 @return (added as bool, album_id as int) 582 @commit needed 583 """ 584 added = False 585 if uri.find("://") != -1: 586 f = Gio.File.new_for_uri(uri) 587 parent = f.get_parent() 588 if parent is not None: 589 uri = parent.get_uri() 590 album_id = App().albums.get_id(album_name, mb_album_id, artist_ids) 591 # Check storage type did not changed, remove album then 592 if album_id is not None: 593 current_storage_type = App().albums.get_storage_type(album_id) 594 if current_storage_type != storage_type: 595 App().tracks.remove_album(album_id) 596 App().tracks.clean(False) 597 App().albums.clean(False) 598 App().artists.clean(False) 599 album_id = None 600 if album_id is None: 601 added = True 602 album_id = App().albums.add(album_name, mb_album_id, lp_album_id, 603 artist_ids, uri, loved, popularity, 604 rate, synced, mtime, storage_type) 605 # Check if path did not change 606 elif App().albums.get_uri(album_id) != uri: 607 App().albums.set_uri(album_id, uri) 608 return (added, album_id) 609 610####################### 611# PRIVATE # 612####################### 613 def __get_extended(self, tags, keys): 614 """ 615 Return tag from tags following keys 616 @param tags as Gst.TagList 617 @param keys as [str] 618 @return Tag as str 619 """ 620 if tags is None: 621 return "" 622 items = [] 623 try: 624 for i in range(tags.get_tag_size("extended-comment")): 625 (exists, read) = tags.get_string_index("extended-comment", i) 626 for key in keys: 627 if exists and read.startswith(key + "="): 628 items.append("".join(read.split("=")[1:])) 629 except Exception as e: 630 Logger.error("TagReader::__get_extended(): %s", e) 631 return ";".join(items) 632 633 def __get(self, tags, keys): 634 """ 635 Return tag from tags following keys 636 Only handles string/uint/double 637 @param tags as Gst.TagList 638 @param keys as [str] 639 @return Tag as str/int/double. Empty string if does not exist 640 """ 641 if tags is None: 642 return "" 643 items = [] 644 try: 645 for key in keys: 646 for i in range(tags.get_tag_size(key)): 647 if key in self.__STRING: 648 (exists, read) = tags.get_string_index(key, i) 649 if exists and read.strip(" "): 650 items.append(read) 651 elif key in self.__INT: 652 (exists, read) = tags.get_uint_index(key, i) 653 if exists: 654 return read 655 elif key in self.__DOUBLE: 656 (exists, read) = tags.get_double_index(key, i) 657 if exists: 658 return read 659 else: 660 Logger.error("Missing key" % key) 661 except Exception as e: 662 Logger.error("TagReader::__get(): %s", e) 663 return ";".join(items) 664 665 def __get_private_bytes(self, tags, key): 666 """ 667 Get key from private frame 668 @param tags as Gst.TagList 669 @param key as str 670 @return frame as bytes 671 """ 672 try: 673 size = tags.get_tag_size("private-id3v2-frame") 674 encoded_key = key.encode("utf-8") 675 for i in range(0, size): 676 (exists, sample) = tags.get_sample_index( 677 "private-id3v2-frame", 678 i) 679 if not exists: 680 continue 681 (exists, m) = sample.get_buffer().map(Gst.MapFlags.READ) 682 if not exists: 683 continue 684 # Gstreamer 1.18 API breakage 685 try: 686 b = m.data.tobytes() 687 except: 688 b = m.data 689 690 if b[0:len(encoded_key)] != encoded_key: 691 continue 692 return b 693 except Exception as e: 694 Logger.error("TagReader::__get_private_bytes(): %s" % e) 695 return b"" 696 697 def __get_private_string(self, tags, key, lang): 698 """ 699 Get key from private frame 700 @param tags as Gst.TagList 701 @param key as str 702 @param lang as bool 703 @return Tag as str 704 """ 705 try: 706 b = self.__get_private_bytes(tags, key) 707 if lang: 708 frame = FrameLangTag(b) 709 else: 710 frame = FrameTextTag(b) 711 if frame.key == key: 712 return frame.string 713 except Exception as e: 714 Logger.error("TagReader::__get_private(): %s" % e) 715 return "" 716