1# Copyright 2004-2005 Joe Wreschnig, Michael Urman
2#           2009-2014 Christoph Reiter
3#
4# This program is free software; you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation; either version 2 of the License, or
7# (at your option) any later version.
8
9import sys
10import base64
11
12import mutagen
13from mutagen.flac import Picture, error as FLACError
14from mutagen.id3 import ID3
15
16from quodlibet import config
17from quodlibet import const
18from quodlibet.util.path import get_temp_cover_file
19
20from ._audio import AudioFile, translate_errors, AudioFileError
21from ._image import EmbeddedImage, APICType
22
23
# Migrate old layout: expose this module under its former import names too.
sys.modules.update(dict.fromkeys(
    ("formats.flac", "formats.oggvorbis"), sys.modules[__name__]))
27
28
class MutagenVCFile(AudioFile):
    """Base class for mutagen-backed files tagged with Vorbis comments.

    Subclasses set `MutagenType` to the mutagen class used to open the
    file and override `format`.
    """

    format = "Unknown Mutagen + vorbiscomment"
    MutagenType = None

    can_change_images = True

    def __init__(self, filename, audio=None):
        """Read stream info and tags of `filename`.

        Args:
            filename: path of the file to load
            audio: an already opened mutagen object for `filename`, or None
                to open the file here using `MutagenType`
        """

        # If we've already done a type probe, use the results of that to
        # avoid reopening the file.
        if audio is None:
            with translate_errors():
                audio = self.MutagenType(filename)
        self["~#length"] = audio.info.length
        # Not every stream info object provides the attributes below
        try:
            self["~#bitrate"] = int(audio.info.bitrate / 1000)
        except AttributeError:
            pass
        try:
            self["~#channels"] = audio.info.channels
        except AttributeError:
            pass
        try:
            self["~#samplerate"] = audio.info.sample_rate
        except AttributeError:
            pass
        if audio.tags and audio.tags.vendor:
            self["~encoding"] = audio.tags.vendor
        # mutagen keys are lower cased
        for key, value in (audio.tags or {}).items():
            self[key] = "\n".join(value)
        self.__post_read()
        self.sanitize(filename)

    def __post_read_total(self, main, fallback, single):
        """Fold separate total tags into a single "number/total" value.

        The total is taken from `main` if present, else from `fallback`,
        else from the part after the slash of `single`. The consumed
        `main` (or `fallback`) key is removed and the result is stored
        under `single`.
        """

        one = None
        total = None

        if single in self:
            parts = self[single].split("/", 1)
            if parts[0]:
                one = parts[0]
            if len(parts) > 1:
                total = parts[1]
            del self[single]

        if main in self:
            total = self[main]
            del self[main]
        elif fallback in self:
            total = self[fallback]
            del self[fallback]

        final = None
        if one is not None:
            final = one
        if total is not None:
            if final is None:
                final = "/" + total
            else:
                final += "/" + total

        if final is not None:
            self[single] = final

    def __post_read(self):
        """Convert special tags read from the file into internal values.

        Ratings and play counts become "~#rating" / "~#playcount", image
        tags only set `has_images`, and track/disc totals get merged into
        the number tags.
        """

        email = config.get("editing", "save_email").strip()
        # Keys are lower cased, so the suffixes have to be as well
        suffixes = [s.lower() for s in ["", ":" + const.EMAIL, ":" + email]]
        maps = {"rating": float, "playcount": int}
        for keyed_key, func in maps.items():
            for subkey in suffixes:
                key = keyed_key + subkey
                if key in self:
                    try:
                        self["~#" + keyed_key] = func(self[key])
                    except ValueError:
                        # Unparsable value: drop it without setting anything
                        pass
                    del self[key]

        if "metadata_block_picture" in self:
            self.has_images = True
            del self["metadata_block_picture"]

        if "coverart" in self:
            self.has_images = True
            del self["coverart"]

        if "coverartmime" in self:
            del self["coverartmime"]

        self.__post_read_total("tracktotal", "totaltracks", "tracknumber")
        self.__post_read_total("disctotal", "totaldiscs", "discnumber")

    def get_images(self):
        """Returns a list of all embedded images, sorted by their sort key.

        Entries which can't be decoded are skipped. If the file can't be
        opened an empty list is returned.
        """

        try:
            audio = self.MutagenType(self["~filename"])
        except Exception:
            return []

        # metadata_block_picture
        images = []
        for data in audio.get("metadata_block_picture", []):
            try:
                cover = Picture(base64.b64decode(data))
            except (TypeError, ValueError, FLACError):
                # b64decode raises binascii.Error (a ValueError) under
                # Python 3 and TypeError under Python 2 for bad input
                continue

            f = get_temp_cover_file(cover.data)
            images.append(EmbeddedImage(
                f, cover.mime, cover.width, cover.height, cover.depth,
                cover.type))

        # coverart + coverartmime
        cover = audio.get("coverart")
        try:
            cover = cover and base64.b64decode(cover[0])
        except (TypeError, ValueError):
            cover = None

        if cover:
            mime = audio.get("coverartmime")
            mime = (mime and mime[0]) or "image/"
            f = get_temp_cover_file(cover)
            images.append(EmbeddedImage(f, mime))

        images.sort(key=lambda c: c.sort_key)

        return images

    def get_primary_image(self):
        """Returns the primary embedded image or None.

        A front cover from "metadata_block_picture" is preferred, then the
        first decodable picture, then the "coverart" tag. If nothing is
        found `has_images` gets reset to False.
        """

        try:
            audio = self.MutagenType(self["~filename"])
        except Exception:
            return None

        pictures = []
        for data in audio.get("metadata_block_picture", []):
            try:
                pictures.append(Picture(base64.b64decode(data)))
            except (TypeError, FLACError, ValueError):
                pass

        cover = None
        for pic in pictures:
            if pic.type == APICType.COVER_FRONT:
                cover = pic
                break
            cover = cover or pic

        if cover:
            f = get_temp_cover_file(cover.data)
            return EmbeddedImage(
                f, cover.mime, cover.width, cover.height, cover.depth,
                cover.type)

        cover = audio.get("coverart")
        try:
            cover = cover and base64.b64decode(cover[0])
        except (TypeError, ValueError):
            cover = None

        if not cover:
            self.has_images = False
            return None

        mime = audio.get("coverartmime")
        mime = (mime and mime[0]) or "image/"
        f = get_temp_cover_file(cover)
        return EmbeddedImage(f, mime)

    def clear_images(self):
        """Delete all embedded images"""

        with translate_errors():
            audio = self.MutagenType(self["~filename"])
            audio.pop("metadata_block_picture", None)
            audio.pop("coverart", None)
            audio.pop("coverartmime", None)
            audio.save()

        self.has_images = False

    def set_image(self, image):
        """Replaces all embedded images by the passed image

        Raises:
            AudioFileError: if the file can't be opened or saved, or the
                image can't be read
        """

        with translate_errors():
            audio = self.MutagenType(self["~filename"])

        try:
            data = image.read()
        except EnvironmentError as e:
            raise AudioFileError(e)

        pic = Picture()
        pic.data = data
        pic.type = APICType.COVER_FRONT
        pic.mime = image.mime_type
        pic.width = image.width
        pic.height = image.height
        pic.depth = image.color_depth

        # Drop the coverart tags and overwrite any existing picture tags
        audio.pop("coverart", None)
        audio.pop("coverartmime", None)
        audio["metadata_block_picture"] = base64.b64encode(
            pic.write()).decode("ascii")

        with translate_errors():
            audio.save()

        self.has_images = True

    def can_change(self, k=None):
        """Like the base implementation, but additionally refuses the
        rating, play count and image tags, which are handled internally.
        """

        if k is None:
            return super(MutagenVCFile, self).can_change(None)

        lowered = k.lower()
        return (super(MutagenVCFile, self).can_change(k) and
                lowered not in ["rating", "playcount",
                                "metadata_block_picture",
                                "coverart", "coverartmime"] and
                not lowered.startswith("rating:") and
                not lowered.startswith("playcount:"))

    def __prep_write(self, comments):
        """Clear `comments` for writing and add rating/play count tags.

        Image tags and the ratings/play counts saved under other e-mail
        addresses are kept, everything else gets removed. Our own rating
        and play count are only added if "save_to_songs" is enabled.
        """

        email = config.get("editing", "save_email").strip()
        # Keys are lower cased, so compare lower cased like when reading
        own_emails = [const.EMAIL.lower(), email.lower()]
        # Iterate over a snapshot, entries get deleted along the way
        for key in list(comments.keys()):
            if key.startswith("rating:") or key.startswith("playcount:"):
                if key.split(":", 1)[1].lower() in own_emails:
                    del comments[key]
            elif key not in ["metadata_block_picture", "coverart",
                             "coverartmime"]:
                del comments[key]

        if config.getboolean("editing", "save_to_songs"):
            email = email or const.EMAIL
            if self.has_rating:
                comments["rating:" + email] = str(self("~#rating"))
            playcount = self.get("~#playcount", 0)
            if playcount != 0:
                comments["playcount:" + email] = str(playcount)

    def __prep_write_total(self, comments, main, fallback, single):
        """Split a "number/total" value back into separate tags.

        The number ends up under `single` and the total under `main`.
        Explicit `main` values win over the total taken from `single`;
        `fallback` values are written under `main` if that is still
        unset and under `fallback` otherwise.
        """

        lower = self.as_lowercased()

        for k in [main, fallback, single]:
            if k in comments:
                del comments[k]

        if single in lower:
            parts = lower[single].split("/", 1)

            if parts[0]:
                comments[single] = [parts[0]]

            if len(parts) > 1:
                comments[main] = [parts[1]]

        if main in lower:
            comments[main] = lower.list(main)

        if fallback in lower:
            if main in comments:
                comments[fallback] = lower.list(fallback)
            else:
                comments[main] = lower.list(fallback)

    def write(self):
        """Write the tags to the file.

        Raises:
            AudioFileError: if opening or saving the file fails
        """

        with translate_errors():
            audio = self.MutagenType(self["~filename"])
        if audio.tags is None:
            audio.add_tags()

        self.__prep_write(audio.tags)

        lower = self.as_lowercased()
        for key in lower.realkeys():
            audio.tags[key] = lower.list(key)

        # Has to come after copying the tags, it rewrites the number tags
        self.__prep_write_total(audio.tags,
                                "tracktotal", "totaltracks", "tracknumber")
        self.__prep_write_total(audio.tags,
                                "disctotal", "totaldiscs", "discnumber")

        with translate_errors():
            audio.save()
        self.sanitize()
317
from mutagen.oggvorbis import OggVorbis
from mutagen.flac import FLAC, FLACNoHeaderError
from mutagen.oggflac import OggFLAC
from mutagen.oggspeex import OggSpeex
from mutagen.oggtheora import OggTheora
from mutagen.oggopus import OggOpus

# File name extensions registered by this module
extensions = [".ogg", ".oga", ".flac", ".oggflac", ".spx", ".ogv", ".opus"]

# mutagen types passed as options to mutagen.File() in loader()
ogg_formats = [OggVorbis, FLAC, OggFLAC, OggSpeex, OggTheora, OggOpus]
345
346
class OggFile(MutagenVCFile):
    """Ogg Vorbis files, opened through mutagen's OggVorbis"""

    MutagenType = OggVorbis
    format = "Ogg Vorbis"
    mimes = ["audio/vorbis", "audio/ogg; codecs=vorbis"]
351
352
class OggFLACFile(MutagenVCFile):
    """Ogg FLAC files, opened through mutagen's OggFLAC"""

    MutagenType = OggFLAC
    format = "Ogg FLAC"
    mimes = ["audio/x-oggflac", "audio/ogg; codecs=flac"]
357
358
class OggSpeexFile(MutagenVCFile):
    """Ogg Speex files, opened through mutagen's OggSpeex"""

    MutagenType = OggSpeex
    format = "Ogg Speex"
    mimes = ["audio/x-speex", "audio/ogg; codecs=speex"]
363
364
class OggTheoraFile(MutagenVCFile):
    """Ogg Theora files, opened through mutagen's OggTheora"""

    MutagenType = OggTheora
    format = "Ogg Theora"
    mimes = ["video/x-theora", "video/ogg; codecs=theora"]
369
370
class OggOpusFile(MutagenVCFile):
    """Ogg Opus files, opened through mutagen's OggOpus"""

    MutagenType = OggOpus
    format = "Ogg Opus"
    mimes = ["audio/ogg; codecs=opus"]

    def __init__(self, *args, **kwargs):
        """Load the file like the base class, then fix up the sample rate"""

        super(OggOpusFile, self).__init__(*args, **kwargs)
        # Always report 48000, replacing whatever the base class has set
        self["~#samplerate"] = 48000
379
380
class FLACFile(MutagenVCFile):
    """FLAC files, opened through mutagen's FLAC.

    Besides the vorbis comment based images handled by the base class,
    the pictures stored in the FLAC file itself are supported as well.
    """

    format = "FLAC"
    mimes = ["audio/x-flac", "application/x-flac"]
    MutagenType = FLAC

    def __init__(self, filename, audio=None):
        """Read stream info and tags of `filename`.

        Args:
            filename: path of the file to load
            audio: an already opened FLAC object for `filename`, or None
                to open the file here
        """

        if audio is None:
            with translate_errors():
                audio = FLAC(filename)
        super(FLACFile, self).__init__(filename, audio)
        if audio.pictures:
            self.has_images = True
        self["~#bitdepth"] = audio.info.bits_per_sample

    def get_images(self):
        """Returns a list of all embedded images, sorted by their sort key.

        Contains the vorbis comment images found by the base class plus
        the FLAC pictures. If the file can't be opened only the former
        are returned.
        """

        images = super(FLACFile, self).get_images()

        try:
            tag = FLAC(self["~filename"])
        except Exception:
            return images

        for cover in tag.pictures:
            fileobj = get_temp_cover_file(cover.data)
            images.append(EmbeddedImage(
                fileobj, cover.mime, cover.width, cover.height, cover.depth,
                cover.type))

        images.sort(key=lambda c: c.sort_key)

        return images

    def get_primary_image(self):
        """Returns the primary embedded image or None.

        FLAC pictures take precedence; without any the vorbis comment
        images of the base class are tried.
        """

        try:
            tag = FLAC(self["~filename"])
        except Exception:
            return None

        covers = tag.pictures
        if not covers:
            return super(FLACFile, self).get_primary_image()

        # min() returns the first of equally ranked pictures, which is
        # what a stable sort followed by taking the first item gives
        cover = min(covers, key=lambda c: APICType.sort_key(c.type))

        fileobj = get_temp_cover_file(cover.data)
        return EmbeddedImage(
            fileobj, cover.mime, cover.width, cover.height, cover.depth,
            cover.type)

    def clear_images(self):
        """Delete all embedded images"""

        with translate_errors():
            tag = FLAC(self["~filename"])
            tag.clear_pictures()
            tag.save()

        # clear vcomment tags
        super(FLACFile, self).clear_images()

        self.has_images = False

    def set_image(self, image):
        """Replaces all embedded images by the passed image

        Raises:
            AudioFileError: if the file can't be opened or saved, or the
                image can't be read
        """

        with translate_errors():
            tag = FLAC(self["~filename"])

        try:
            data = image.read()
        except EnvironmentError as e:
            raise AudioFileError(e)

        pic = Picture()
        pic.data = data
        pic.type = APICType.COVER_FRONT
        pic.mime = image.mime_type
        pic.width = image.width
        pic.height = image.height
        pic.depth = image.color_depth

        # Remove the existing pictures first, add_picture() only appends
        tag.clear_pictures()
        tag.add_picture(pic)

        with translate_errors():
            tag.save()

        # clear vcomment tags
        super(FLACFile, self).clear_images()

        self.has_images = True

    def write(self):
        """Delete any ID3 tag of the file, then write the tags.

        Raises:
            AudioFileError: if deleting the ID3 tag or writing fails
        """

        if ID3 is not None:
            with translate_errors():
                ID3().delete(filename=self["~filename"])
        super(FLACFile, self).write()
480
# All classes of this module which have a mutagen type assigned. Built with
# a comprehension so that no loop variable is left behind in the module
# namespace.
types = [
    candidate for candidate in list(globals().values())
    if getattr(candidate, 'MutagenType', None)
]
485
486
def loader(filename):
    """Probe `filename` and load it with the matching class of this module.

    Returns:
        AudioFile
    Raises:
        AudioFileError
    """

    with translate_errors():
        probed = mutagen.File(filename, options=ogg_formats)
        if probed is None and FLAC is not None:
            # FLAC with ID3: retry with the FLAC type directly
            try:
                probed = FLAC(filename)
            except FLACNoHeaderError:
                probed = None

        if probed is not None:
            # Pick the class whose MutagenType is exactly the probed type
            probed_type = type(probed)
            for candidate in globals().values():
                if getattr(candidate, 'MutagenType', None) is probed_type:
                    return candidate(filename, probed)

        raise AudioFileError("file type could not be determined")
510