1# Copyright (c) 2014-2021 Cedric Bellegarde <cedric.bellegarde@adishatz.org>
2# This program is free software: you can redistribute it and/or modify
3# it under the terms of the GNU General Public License as published by
4# the Free Software Foundation, either version 3 of the License, or
5# (at your option) any later version.
6# This program is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9# GNU General Public License for more details.
10# You should have received a copy of the GNU General Public License
11# along with this program. If not, see <http://www.gnu.org/licenses/>.
12
13from gi.repository import Gst, GstPbutils, GLib, Gio
14
15from re import match
16from gettext import gettext as _
17
18from lollypop.define import App
19from lollypop.logger import Logger
20from lollypop.utils_file import decodeUnicode, splitUnicode
21from lollypop.utils import format_artist_name, get_iso_date_from_string
22from lollypop.tag_frame_text import FrameTextTag
23from lollypop.tag_frame_lang import FrameLangTag
24
25
class Discoverer:
    """
        Small wrapper around GstPbutils.Discoverer
    """

    def __init__(self):
        """
            Create the underlying discoverer, timeout is 10 * Gst.SECOND
        """
        timeout = 10 * Gst.SECOND
        self._discoverer = GstPbutils.Discoverer.new(timeout)

    def get_info(self, uri):
        """
            Run discovery for file at uri
            @param uri as str
            @Exception GLib.Error
            @return GstPbutils.DiscovererInfo
        """
        return self._discoverer.discover_uri(uri)
47
48
class TagReader:
    """
        Scanner tag reader
        Reads values from Gst.TagList objects and registers artists,
        genres and albums through App()
    """

    # Tag names read as strings (Gst.TagList.get_string_index())
    __STRING = ["title", "artist", "composer", "conductor",
                "musicbrainz-albumid", "musicbrainz-trackid",
                "musicbrainz-artistid", "musicbrainz-albumartistid",
                "version", "performer", "artist-sortname",
                "album-artist-sortname", "interpreted-by", "album-artist",
                "album", "genre", "lyrics", "publisher"]
    # Tag names read as unsigned integers (Gst.TagList.get_uint_index())
    __INT = ["album-disc-number", "track-number"]
    # Tag names read as doubles (Gst.TagList.get_double_index())
    __DOUBLE = ["beats-per-minute"]
62
63    def __init__(self):
64        """
65            Init tag reader
66        """
67        pass
68
69    def get_title(self, tags, filepath):
70        """
71            Return title for tags
72            @param tags as Gst.TagList
73            @param filepath as string
74            @return title as string
75        """
76        if tags is None:
77            return GLib.path_get_basename(filepath)
78        title = self.__get(tags, ["title"])
79        if not title:
80            title = GLib.path_get_basename(filepath)
81        return title
82
83    def get_artists(self, tags):
84        """
85            Return artists for tags
86            @param tags as Gst.TagList
87            @return string like "artist1;artist2;..."
88        """
89        if tags is None:
90            return _("Unknown")
91        return self.__get(tags, ["artist"])
92
93    def get_composers(self, tags):
94        """
95            Return composers for tags
96            @param tags as Gst.TagList
97            @return string like "composer1;composer2;..."
98        """
99        if tags is None:
100            return _("Unknown")
101        return self.__get(tags, ["composer"])
102
103    def get_conductors(self, tags):
104        """
105            Return conductors for tags
106            @param tags as Gst.TagList
107            @return string like "conductor1;conductor2;..."
108        """
109        if tags is None:
110            return _("Unknown")
111        return self.__get(tags, ["conductor"])
112
113    def get_mb_id(self, tags, name):
114        """
115            Get MusicBrainz ID
116            @param tags as Gst.TagList
117            @param name as str
118            @return str
119        """
120        if tags is None or not name:
121            return ""
122        return self.__get(tags, ["musicbrainz-" + name])
123
124    def get_mb_album_id(self, tags):
125        """
126            Get album id (musicbrainz)
127            @param tags as Gst.TagList
128            @return str
129        """
130        return self.get_mb_id(tags, 'albumid')
131
132    def get_mb_track_id(self, tags):
133        """
134            Get recording id (musicbrainz)
135            @param tags as Gst.TagList
136            @return str
137        """
138        return self.get_mb_id(tags, 'trackid')
139
140    def get_mb_artist_id(self, tags):
141        """
142            Get artist id (musicbrainz)
143            @param tags as Gst.TagList
144            @return str
145        """
146        return self.get_mb_id(tags, 'artistid')
147
148    def get_mb_album_artist_id(self, tags):
149        """
150            Get album artist id (musicbrainz)
151            @param tags as Gst.TagList
152            @return str
153        """
154        return self.get_mb_id(tags, 'albumartistid')
155
156    def get_version(self, tags):
157        """
158            Get recording version
159            @param tags as Gst.TagList
160            @return str
161        """
162        if tags is None:
163            return ""
164        return self.__get(tags, ["version"])
165
166    def get_performers(self, tags):
167        """
168            Return performers for tags
169            @param tags as Gst.TagList
170            @return string like "performer1;performer2;..."
171        """
172        if tags is None:
173            return _("Unknown")
174        return self.__get(tags, ["performer"])
175
176    def get_artist_sortnames(self, tags):
177        """
178            Return artist sort names
179            @param tags as Gst.TagList
180            @return artist sort names as "str;str"
181        """
182        if tags is None:
183            return ""
184        return self.__get(tags, ["artist-sortname"])
185
186    def get_album_artist_sortnames(self, tags):
187        """
188            Return album artist sort names
189            @param tags as Gst.TagList
190            @return artist sort names as "str;str"
191        """
192        if tags is None:
193            return ""
194        return self.__get(tags, ["album-artist-sortname"])
195
196    def get_remixers(self, tags):
197        """
198            Get remixers tag
199            @param tags as Gst.TagList
200            @return artist sort names as "str,str"
201        """
202        if tags is None:
203            return _("Unknown")
204        remixers = self.__get(tags, ["interpreted-by"])
205        if not remixers:
206            remixers = self.__get_extended(tags, ["REMIXER"])
207        return remixers
208
209    def get_album_artists(self, tags):
210        """
211            Return album artists for tags
212            @param tags as Gst.TagList
213            @return album artist as string or None
214        """
215        if tags is None:
216            return _("Unknown")
217        return self.__get(tags, ["album-artist"])
218
219    def get_album_name(self, tags):
220        """
221            Return album for tags
222            @param tags as Gst.TagList
223            @return album name as string
224        """
225        if tags is None:
226            return _("Unknown")
227        album = self.__get(tags, ["album"])
228        if not album:
229            album = _("Unknown")
230        return album
231
232    def get_genres(self, tags):
233        """
234            Return genres for tags
235            @param tags as Gst.TagList
236            @return string like "genre1;genre2;..."
237        """
238        if tags is None:
239            return _("Unknown")
240        genres = self.__get(tags, ["genre"])
241        if not genres:
242            genres = _("Unknown")
243        return genres
244
245    def get_discname(self, tags):
246        """
247            Return disc name
248            @param tags as Gst.TagList
249            @return disc name as str
250        """
251        return self.__get_extended(tags, ['PART', 'DISCSUBTITLE'])
252
253    def get_discnumber(self, tags):
254        """
255            Return disc number for tags
256            @param tags as Gst.TagList
257            @return disc number as int
258        """
259        if tags is None:
260            return 0
261        discnumber = self.__get(tags, ["album-disc-number"])
262        if not discnumber:
263            discnumber = 0
264        return discnumber
265
266    def get_compilation(self, tags):
267        """
268            Return True if album is a compilation
269            @param tags as Gst.TagList
270            @return bool
271        """
272        if tags is None:
273            return False
274        try:
275            compilation = self.__get_private_string(tags, "TCMP", False)
276            if not compilation:
277                compilation = self.__get_extended(tags, ["COMPILATION"])
278            if compilation:
279                return bool(compilation)
280        except Exception as e:
281            Logger.error("TagReader::get_compilation(): %s" % e)
282        return False
283
284    def get_tracknumber(self, tags, filename):
285        """
286            Return track number for tags
287            @param tags as Gst.TagList
288            @param filename as str
289            @return track number as int
290        """
291        if tags is not None:
292            tracknumber = self.__get(tags, ["track-number"])
293        else:
294            tracknumber = None
295        if not tracknumber:
296            # Guess from filename
297            m = match("^([0-9]*)[ ]*-", filename)
298            if m:
299                try:
300                    tracknumber = int(m.group(1))
301                except:
302                    tracknumber = 0
303            else:
304                tracknumber = 0
305        return min(abs(tracknumber), GLib.MAXINT32)
306
307    def get_year(self, tags):
308        """
309            Return track year for tags
310            @param tags as Gst.TagList
311            @return year and timestamp (int, int)
312        """
313        try:
314            (exists_date, date) = tags.get_date_index("date", 0)
315            (exists_datetime, datetime) = tags.get_date_time_index("datetime",
316                                                                   0)
317            year = timestamp = None
318            if exists_datetime:
319                if datetime.has_year():
320                    year = datetime.get_year()
321                if datetime.has_month():
322                    month = datetime.get_month()
323                else:
324                    month = 1
325                if datetime.has_day():
326                    day = datetime.get_day()
327                else:
328                    day = 1
329            if exists_date and date.valid():
330                year = date.get_year()
331                month = date.get_month()
332                day = date.get_day()
333
334            if year is not None:
335                gst_datetime = Gst.DateTime.new_local_time(
336                    year, month, day, 0, 0, 0)
337                glib_datetime = gst_datetime.to_g_date_time()
338                timestamp = glib_datetime.to_unix()
339            return (year, timestamp)
340        except Exception as e:
341            Logger.error("TagReader::get_year(): %s", e)
342        return (None, None)
343
344    def get_original_year(self, tags):
345        """
346            Return original release year
347            @param tags as Gst.TagList
348            @return year and timestamp (int, int)
349        """
350        def get_id3():
351            date_string = self.__get_private_string(tags, "TDOR", False)
352            try:
353                date = get_iso_date_from_string(date_string)
354                datetime = GLib.DateTime.new_from_iso8601(date, None)
355                return (datetime.get_year(), datetime.to_unix())
356            except:
357                pass
358            return (None, None)
359
360        def get_ogg():
361            try:
362                date_string = self.__get_extended(tags, ['ORIGINALDATE'])
363                date = get_iso_date_from_string(date_string)
364                datetime = GLib.DateTime.new_from_iso8601(date, None)
365                return (datetime.get_year(), datetime.to_unix())
366            except:
367                pass
368            return (None, None)
369
370        if tags is None:
371            return None
372        values = get_id3()
373        if values[0] is None:
374            values = get_ogg()
375        return values
376
377    def get_bpm(self, tags):
378        """
379            Get BPM from tags
380            @param tags as Gst.TagList
381            @return int/None
382        """
383        bpm = self.__get(tags, ["beats-per-minute"])
384        if not bpm:
385            bpm = None
386        return bpm
387
388    def get_popm(self, tags):
389        """
390            Get popularity tag
391            @param tags as Gst.TagList
392            @return int
393        """
394        try:
395            if tags is None:
396                return 0
397            size = tags.get_tag_size("private-id3v2-frame")
398            for i in range(0, size):
399                (exists, sample) = tags.get_sample_index("private-id3v2-frame",
400                                                         i)
401                if not exists:
402                    continue
403                (exists, m) = sample.get_buffer().map(Gst.MapFlags.READ)
404                if not exists:
405                    continue
406                # Gstreamer 1.18 API breakage
407                try:
408                    bytes = m.data.tobytes()
409                except:
410                    bytes = m.data
411
412                if len(bytes) > 4 and bytes[0:4] == b"POPM":
413                    try:
414                        popm = bytes.split(b"\x00")[6][0]
415                    except:
416                        popm = 0
417                    if popm == 0:
418                        value = 0
419                    elif popm >= 1 and popm < 64:
420                        value = 1
421                    elif popm >= 64 and popm < 128:
422                        value = 2
423                    elif popm >= 128 and popm < 196:
424                        value = 3
425                    elif popm >= 196 and popm < 255:
426                        value = 4
427                    elif popm == 255:
428                        value = 5
429                    else:
430                        value = 0
431                    return value
432        except Exception as e:
433            Logger.warning("TagReader::get_popm(): %s", e)
434        return 0
435
436    def get_lyrics(self, tags):
437        """
438            Return lyrics for tags
439            @parma tags as Gst.TagList
440            @return lyrics as str
441        """
442        def get_mp4():
443            return self.__get(tags, ["lyrics"])
444
445        def get_id3():
446            return self.__get_private_string(tags, "USLT", True)
447
448        def get_ogg():
449            return self.__get_extended(tags, ["LYRICS"])
450
451        if tags is None:
452            return ""
453        lyrics = get_mp4()
454        if not lyrics:
455            lyrics = get_id3()
456        if not lyrics:
457            lyrics = get_ogg()
458        return lyrics
459
460    def get_synced_lyrics(self, tags):
461        """
462            Return synced lyrics for tags
463            @parma tags as Gst.TagList
464            @return lyrics as ([str, int])
465        """
466        def decode_lyrics(bytes_list, encoding):
467            lyrics = []
468            try:
469                for frame in bytes_list:
470                    (l, t) = splitUnicode(frame, encoding)
471                    if l:
472                        lyrics.append((decodeUnicode(l, encoding),
473                                       int.from_bytes(t[1:4], "big")))
474            except Exception as e:
475                Logger.warning(
476                        "TagReader::get_synced_lyrics.decode_lyrics(): %s", e)
477            return lyrics
478
479        def get_id3():
480            try:
481                b = self.__get_private_bytes(tags, "SYLT")
482                if b:
483                    frame = b[10:]
484                    encoding = frame[0:1]
485                    string = decode_lyrics(frame.split(b"\n"), encoding)
486                    if string is not None:
487                        return string
488            except Exception as e:
489                Logger.warning("TagReader::get_synced_lyrics.get_id3(): %s", e)
490            return ""
491
492        if tags is None:
493            return ""
494        lyrics = get_id3()
495        return lyrics
496
497    def add_artists(self, artists, sortnames, mb_artist_id=""):
498        """
499            Add artists to db
500            @param artists as str
501            @param sortnames as str
502            @param mb_artist_id as str
503            @return ([int], [int]): (added artist ids, artist ids)
504        """
505        artist_ids = []
506        added_artist_ids = []
507        artistsplit = artists.split(";")
508        sortsplit = sortnames.split(";")
509        sortlen = len(sortsplit)
510        mbidsplit = mb_artist_id.split(";")
511        mbidlen = len(mbidsplit)
512        if len(artistsplit) != mbidlen:
513            mbidsplit = []
514            mbidlen = 0
515        i = 0
516        for artist in artistsplit:
517            artist = artist.strip()
518            if artist != "":
519                if i >= mbidlen or mbidsplit[i] == "":
520                    mbid = None
521                else:
522                    mbid = mbidsplit[i].strip()
523                # Get artist id, add it if missing
524                (artist_id, db_name) = App().artists.get_id(artist, mbid)
525                if i >= sortlen or sortsplit[i] == "":
526                    sortname = None
527                else:
528                    sortname = sortsplit[i].strip()
529                if artist_id is None:
530                    if sortname is None:
531                        sortname = format_artist_name(artist)
532                    artist_id = App().artists.add(artist, sortname, mbid)
533                    added_artist_ids.append(artist_id)
534                else:
535                    # artists.get_id() is NOCASE, check if we need to update
536                    # artist name
537                    if db_name != artist:
538                        App().artists.set_name(artist_id, artist)
539                    if sortname is not None:
540                        App().artists.set_sortname(artist_id, sortname)
541                    if mbid is not None:
542                        App().artists.set_mb_artist_id(artist_id, mbid)
543                i += 1
544                artist_ids.append(artist_id)
545        return (added_artist_ids, artist_ids)
546
547    def add_genres(self, genres):
548        """
549            Add genres to db
550            @param genres as string
551            @return ([int], [int]): (added genre ids, genre ids)
552        """
553        genre_ids = []
554        added_genre_ids = []
555        for genre in genres.split(";"):
556            genre = genre.strip()
557            if genre != "":
558                # Get genre id, add genre if missing
559                genre_id = App().genres.get_id(genre)
560                if genre_id is None:
561                    genre_id = App().genres.add(genre)
562                    added_genre_ids.append(genre_id)
563                genre_ids.append(genre_id)
564        return (added_genre_ids, genre_ids)
565
566    def add_album(self, album_name, mb_album_id, lp_album_id, artist_ids,
567                  uri, loved, popularity, rate, synced, mtime, storage_type):
568        """
569            Add album to db
570            @param album_name as str
571            @param mb_album_id as str
572            @param lp_album_id as str
573            @param artist_ids as [int]
574            @param uri as str
575            @param loved as bool
576            @param popularity as int
577            @param rate as int
578            @param synced as int
579            @param mtime as int
580            @param storage_type as StorageType
581            @return (added as bool, album_id as int)
582            @commit needed
583        """
584        added = False
585        if uri.find("://") != -1:
586            f = Gio.File.new_for_uri(uri)
587            parent = f.get_parent()
588            if parent is not None:
589                uri = parent.get_uri()
590        album_id = App().albums.get_id(album_name, mb_album_id, artist_ids)
591        # Check storage type did not changed, remove album then
592        if album_id is not None:
593            current_storage_type = App().albums.get_storage_type(album_id)
594            if current_storage_type != storage_type:
595                App().tracks.remove_album(album_id)
596                App().tracks.clean(False)
597                App().albums.clean(False)
598                App().artists.clean(False)
599                album_id = None
600        if album_id is None:
601            added = True
602            album_id = App().albums.add(album_name, mb_album_id, lp_album_id,
603                                        artist_ids, uri, loved, popularity,
604                                        rate, synced, mtime, storage_type)
605        # Check if path did not change
606        elif App().albums.get_uri(album_id) != uri:
607            App().albums.set_uri(album_id, uri)
608        return (added, album_id)
609
610#######################
611# PRIVATE             #
612#######################
613    def __get_extended(self, tags, keys):
614        """
615            Return tag from tags following keys
616            @param tags as Gst.TagList
617            @param keys as [str]
618            @return Tag as str
619        """
620        if tags is None:
621            return ""
622        items = []
623        try:
624            for i in range(tags.get_tag_size("extended-comment")):
625                (exists, read) = tags.get_string_index("extended-comment", i)
626                for key in keys:
627                    if exists and read.startswith(key + "="):
628                        items.append("".join(read.split("=")[1:]))
629        except Exception as e:
630            Logger.error("TagReader::__get_extended(): %s", e)
631        return ";".join(items)
632
633    def __get(self, tags, keys):
634        """
635            Return tag from tags following keys
636            Only handles string/uint/double
637            @param tags as Gst.TagList
638            @param keys as [str]
639            @return Tag as str/int/double. Empty string if does not exist
640        """
641        if tags is None:
642            return ""
643        items = []
644        try:
645            for key in keys:
646                for i in range(tags.get_tag_size(key)):
647                    if key in self.__STRING:
648                        (exists, read) = tags.get_string_index(key, i)
649                        if exists and read.strip(" "):
650                            items.append(read)
651                    elif key in self.__INT:
652                        (exists, read) = tags.get_uint_index(key, i)
653                        if exists:
654                            return read
655                    elif key in self.__DOUBLE:
656                        (exists, read) = tags.get_double_index(key, i)
657                        if exists:
658                            return read
659                    else:
660                        Logger.error("Missing key" % key)
661        except Exception as e:
662            Logger.error("TagReader::__get(): %s", e)
663        return ";".join(items)
664
665    def __get_private_bytes(self, tags, key):
666        """
667            Get key from private frame
668            @param tags as Gst.TagList
669            @param key as str
670            @return frame as bytes
671        """
672        try:
673            size = tags.get_tag_size("private-id3v2-frame")
674            encoded_key = key.encode("utf-8")
675            for i in range(0, size):
676                (exists, sample) = tags.get_sample_index(
677                    "private-id3v2-frame",
678                    i)
679                if not exists:
680                    continue
681                (exists, m) = sample.get_buffer().map(Gst.MapFlags.READ)
682                if not exists:
683                    continue
684                # Gstreamer 1.18 API breakage
685                try:
686                    b = m.data.tobytes()
687                except:
688                    b = m.data
689
690                if b[0:len(encoded_key)] != encoded_key:
691                    continue
692                return b
693        except Exception as e:
694            Logger.error("TagReader::__get_private_bytes(): %s" % e)
695        return b""
696
697    def __get_private_string(self, tags, key, lang):
698        """
699            Get key from private frame
700            @param tags as Gst.TagList
701            @param key as str
702            @param lang as bool
703            @return Tag as str
704        """
705        try:
706            b = self.__get_private_bytes(tags, key)
707            if lang:
708                frame = FrameLangTag(b)
709            else:
710                frame = FrameTextTag(b)
711            if frame.key == key:
712                return frame.string
713        except Exception as e:
714            Logger.error("TagReader::__get_private(): %s" % e)
715        return ""
716