# Copyright 2004-2005 Joe Wreschnig, Michael Urman
#           2009-2014 Christoph Reiter
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

import sys
import base64

import mutagen
from mutagen.flac import FLAC, FLACNoHeaderError, Picture, error as FLACError
from mutagen.id3 import ID3
from mutagen.oggflac import OggFLAC
from mutagen.oggopus import OggOpus
from mutagen.oggspeex import OggSpeex
from mutagen.oggtheora import OggTheora
from mutagen.oggvorbis import OggVorbis

from quodlibet import config
from quodlibet import const
from quodlibet.util.path import get_temp_cover_file

from ._audio import AudioFile, translate_errors, AudioFileError
from ._image import EmbeddedImage, APICType


# Migrate old layout
sys.modules["formats.flac"] = sys.modules[__name__]
sys.modules["formats.oggvorbis"] = sys.modules[__name__]


# Vorbis comment keys that carry embedded image data. They are hidden from
# the user-visible tags and preserved across write().
_IMAGE_KEYS = ["metadata_block_picture", "coverart", "coverartmime"]


def _iter_block_pictures(audio):
    """Yield a Picture for every decodable "metadata_block_picture" value.

    Values that are not valid base64 or that cannot be parsed as a picture
    block are skipped.
    """

    for data in audio.get("metadata_block_picture", []):
        try:
            # On Python 3 invalid base64 raises binascii.Error, which is a
            # ValueError subclass; TypeError covers non-string input.
            picture = Picture(base64.b64decode(data))
        except (TypeError, ValueError, FLACError):
            continue
        yield picture


def _decode_coverart(audio):
    """Decode the "coverart"/"coverartmime" tag pair.

    Returns:
        A (data, mime) tuple, or None if there is no "coverart" value or
        it could not be decoded. The mime type falls back to "image/" when
        "coverartmime" is missing or empty.
    """

    cover = audio.get("coverart")
    try:
        cover = cover and base64.b64decode(cover[0])
    except (TypeError, ValueError):
        cover = None

    if not cover:
        return None

    mime = audio.get("coverartmime")
    mime = (mime and mime[0]) or "image/"
    return cover, mime


class MutagenVCFile(AudioFile):
    """Base class for files whose tags mutagen exposes as Vorbis comments.

    Subclasses set `MutagenType` to the mutagen class used to open the file.
    """

    format = "Unknown Mutagen + vorbiscomment"
    MutagenType = None

    can_change_images = True

    def __init__(self, filename, audio=None):
        """Read stream info and tags from `filename`.

        Args:
            filename: path of the file to load
            audio: an already opened mutagen object for the file, or None
                to open it here with `MutagenType`
        """

        # If we've done a type probe, use the results of that to avoid
        # reopening the file.
        if audio is None:
            with translate_errors():
                audio = self.MutagenType(filename)
        self["~#length"] = audio.info.length

        # Not every stream info type provides all of these attributes
        try:
            self["~#bitrate"] = int(audio.info.bitrate / 1000)
        except AttributeError:
            pass
        try:
            self["~#channels"] = audio.info.channels
        except AttributeError:
            pass
        try:
            self["~#samplerate"] = audio.info.sample_rate
        except AttributeError:
            pass

        if audio.tags and audio.tags.vendor:
            self["~encoding"] = audio.tags.vendor
        # mutagen keys are lower cased
        for key, value in (audio.tags or {}).items():
            self[key] = "\n".join(value)
        self.__post_read()
        self.sanitize(filename)

    def __post_read_total(self, main, fallback, single):
        """Merge a number tag and its total tags into one "n/total" value.

        The result is stored under `single`; `single` and whichever total
        tag was used are removed first. `main` wins over `fallback`, and
        both win over a total embedded in `single` itself.
        """

        one = None
        total = None

        if single in self:
            parts = self[single].split("/", 1)
            if parts[0]:
                one = parts[0]
            if len(parts) > 1:
                total = parts[1]
            del self[single]

        if main in self:
            total = self[main]
            del self[main]
        elif fallback in self:
            total = self[fallback]
            del self[fallback]

        final = one
        if total is not None:
            if final is None:
                final = "/" + total
            else:
                final += "/" + total

        if final is not None:
            self[single] = final

    def __post_read(self):
        """Convert raw comments into internal tags after reading.

        Rating and play count comments (bare, or suffixed with the default
        or the configured e-mail address) become "~#rating"/"~#playcount",
        image carrying comments are dropped and only recorded through
        `has_images`, and the track/disc totals are merged.
        """

        email = config.get("editing", "save_email").strip()
        # Loop invariant: the key suffixes we consider our own
        suffixes = [s.lower() for s in ["", ":" + const.EMAIL, ":" + email]]
        maps = {"rating": float, "playcount": int}
        for keyed_key, func in maps.items():
            for subkey in suffixes:
                key = keyed_key + subkey
                if key in self:
                    try:
                        self["~#" + keyed_key] = func(self[key])
                    except ValueError:
                        # Unparsable value; drop the comment anyway
                        pass
                    del self[key]

        if "metadata_block_picture" in self:
            self.has_images = True
            del self["metadata_block_picture"]

        if "coverart" in self:
            self.has_images = True
            del self["coverart"]

        if "coverartmime" in self:
            del self["coverartmime"]

        self.__post_read_total("tracktotal", "totaltracks", "tracknumber")
        self.__post_read_total("disctotal", "totaldiscs", "discnumber")

    def get_images(self):
        """Returns a list of all embedded images, ordered by `sort_key`.

        Returns an empty list if the file can't be opened. Entries that
        can't be decoded are skipped.
        """

        try:
            audio = self.MutagenType(self["~filename"])
        except Exception:
            return []

        # metadata_block_picture
        images = []
        for cover in _iter_block_pictures(audio):
            f = get_temp_cover_file(cover.data)
            images.append(EmbeddedImage(
                f, cover.mime, cover.width, cover.height, cover.depth,
                cover.type))

        # coverart + coverartmime
        legacy = _decode_coverart(audio)
        if legacy is not None:
            data, mime = legacy
            f = get_temp_cover_file(data)
            images.append(EmbeddedImage(f, mime))

        images.sort(key=lambda c: c.sort_key)

        return images

    def get_primary_image(self):
        """Returns the primary embedded image, or None.

        A front cover from "metadata_block_picture" is preferred, then the
        first decodable picture of any type, then the "coverart" tag. If
        none is found `has_images` gets set to False.
        """

        try:
            audio = self.MutagenType(self["~filename"])
        except Exception:
            return None

        cover = None
        for pic in _iter_block_pictures(audio):
            if pic.type == APICType.COVER_FRONT:
                cover = pic
                break
            # Remember the first picture as a fallback
            cover = cover or pic

        if cover:
            f = get_temp_cover_file(cover.data)
            return EmbeddedImage(
                f, cover.mime, cover.width, cover.height, cover.depth,
                cover.type)

        legacy = _decode_coverart(audio)
        if legacy is None:
            self.has_images = False
            return None

        data, mime = legacy
        f = get_temp_cover_file(data)
        return EmbeddedImage(f, mime)

    def clear_images(self):
        """Delete all embedded images"""

        with translate_errors():
            audio = self.MutagenType(self["~filename"])
            for key in _IMAGE_KEYS:
                audio.pop(key, None)
            audio.save()

        self.has_images = False

    def set_image(self, image):
        """Replaces all embedded images by the passed image

        Raises:
            AudioFileError: if the file can't be opened or saved, or the
                image data can't be read
        """

        with translate_errors():
            audio = self.MutagenType(self["~filename"])

        try:
            data = image.read()
        except EnvironmentError as e:
            raise AudioFileError(e)

        pic = Picture()
        pic.data = data
        pic.type = APICType.COVER_FRONT
        pic.mime = image.mime_type
        pic.width = image.width
        pic.height = image.height
        pic.depth = image.color_depth

        audio.pop("coverart", None)
        audio.pop("coverartmime", None)
        audio["metadata_block_picture"] = base64.b64encode(
            pic.write()).decode("ascii")

        with translate_errors():
            audio.save()

        self.has_images = True

    def can_change(self, k=None):
        """Like the base implementation, but additionally refuses the
        rating, play count and image carrying keys which are managed
        by this class.
        """

        if k is None:
            return super(MutagenVCFile, self).can_change(None)

        lowered = k.lower()
        return (super(MutagenVCFile, self).can_change(k) and
                lowered not in ["rating", "playcount"] + _IMAGE_KEYS and
                not lowered.startswith(("rating:", "playcount:")))

    def __prep_write(self, comments):
        """Strip `comments` down to what must survive a write.

        Rating/play count comments belonging to other e-mail addresses and
        the image carrying comments are kept, everything else is removed.
        If saving to songs is enabled, our own rating and play count
        comments get added.
        """

        email = config.get("editing", "save_email").strip()
        # Iterate over a snapshot since entries get deleted in the loop
        for key in list(comments.keys()):
            if key.startswith(("rating:", "playcount:")):
                if key.split(":", 1)[1] in [const.EMAIL, email]:
                    del comments[key]
            elif key not in _IMAGE_KEYS:
                del comments[key]

        if config.getboolean("editing", "save_to_songs"):
            email = email or const.EMAIL
            if self.has_rating:
                comments["rating:" + email] = str(self("~#rating"))
            playcount = self.get("~#playcount", 0)
            if playcount != 0:
                comments["playcount:" + email] = str(playcount)

    def __prep_write_total(self, comments, main, fallback, single):
        """Split an "n/total" tag back into separate comments.

        The reverse of the merge done after reading: `single` receives the
        number and `main` the total. Explicit `main`/`fallback` tags of
        the song override the total embedded in `single`.
        """

        lower = self.as_lowercased()

        for k in [main, fallback, single]:
            if k in comments:
                del comments[k]

        if single in lower:
            parts = lower[single].split("/", 1)

            if parts[0]:
                comments[single] = [parts[0]]

            if len(parts) > 1:
                comments[main] = [parts[1]]

        if main in lower:
            comments[main] = lower.list(main)

        if fallback in lower:
            if main in comments:
                comments[fallback] = lower.list(fallback)
            else:
                comments[main] = lower.list(fallback)

    def write(self):
        """Save the tags of this song to the file.

        Raises:
            AudioFileError: if the file can't be opened or saved
        """

        with translate_errors():
            audio = self.MutagenType(self["~filename"])
        if audio.tags is None:
            audio.add_tags()

        self.__prep_write(audio.tags)

        lower = self.as_lowercased()
        for key in lower.realkeys():
            audio.tags[key] = lower.list(key)

        self.__prep_write_total(audio.tags,
                                "tracktotal", "totaltracks", "tracknumber")
        self.__prep_write_total(audio.tags,
                                "disctotal", "totaldiscs", "discnumber")

        with translate_errors():
            audio.save()
        self.sanitize()


extensions = [".ogg", ".oga", ".flac", ".oggflac", ".spx", ".ogv", ".opus"]
ogg_formats = [OggVorbis, FLAC, OggFLAC, OggSpeex, OggTheora, OggOpus]


class OggFile(MutagenVCFile):
    format = "Ogg Vorbis"
    mimes = ["audio/vorbis", "audio/ogg; codecs=vorbis"]
    MutagenType = OggVorbis


class OggFLACFile(MutagenVCFile):
    format = "Ogg FLAC"
    mimes = ["audio/x-oggflac", "audio/ogg; codecs=flac"]
    MutagenType = OggFLAC


class OggSpeexFile(MutagenVCFile):
    format = "Ogg Speex"
    mimes = ["audio/x-speex", "audio/ogg; codecs=speex"]
    MutagenType = OggSpeex


class OggTheoraFile(MutagenVCFile):
    format = "Ogg Theora"
    mimes = ["video/x-theora", "video/ogg; codecs=theora"]
    MutagenType = OggTheora


class OggOpusFile(MutagenVCFile):
    format = "Ogg Opus"
    mimes = ["audio/ogg; codecs=opus"]
    MutagenType = OggOpus

    def __init__(self, *args, **kwargs):
        super(OggOpusFile, self).__init__(*args, **kwargs)
        # Overrides whatever sample rate was read from the stream info
        self["~#samplerate"] = 48000


class FLACFile(MutagenVCFile):
    """FLAC file; supports native picture blocks in addition to the
    Vorbis comment based images handled by the base class.
    """

    format = "FLAC"
    mimes = ["audio/x-flac", "application/x-flac"]
    MutagenType = FLAC

    def __init__(self, filename, audio=None):
        if audio is None:
            with translate_errors():
                audio = FLAC(filename)
        super(FLACFile, self).__init__(filename, audio)
        if audio.pictures:
            self.has_images = True
        self["~#bitdepth"] = audio.info.bits_per_sample

    def get_images(self):
        """Returns the images found by the base class plus all FLAC
        picture blocks, ordered by `sort_key`.
        """

        images = super(FLACFile, self).get_images()

        try:
            tag = FLAC(self["~filename"])
        except Exception:
            return images

        for cover in tag.pictures:
            fileobj = get_temp_cover_file(cover.data)
            images.append(EmbeddedImage(
                fileobj, cover.mime, cover.width, cover.height, cover.depth,
                cover.type))

        images.sort(key=lambda c: c.sort_key)

        return images

    def get_primary_image(self):
        """Returns the primary embedded image

        FLAC picture blocks take precedence; without any the base class
        lookup is used.
        """

        try:
            tag = FLAC(self["~filename"])
        except Exception:
            return None

        covers = tag.pictures
        if not covers:
            return super(FLACFile, self).get_primary_image()

        # min() returns the first of equally ranked pictures, same as
        # taking element 0 after a stable sort.
        cover = min(covers, key=lambda c: APICType.sort_key(c.type))

        fileobj = get_temp_cover_file(cover.data)
        return EmbeddedImage(
            fileobj, cover.mime, cover.width, cover.height, cover.depth,
            cover.type)

    def clear_images(self):
        """Delete all embedded images"""

        with translate_errors():
            tag = FLAC(self["~filename"])
            tag.clear_pictures()
            tag.save()

        # clear vcomment tags
        super(FLACFile, self).clear_images()

        self.has_images = False

    def set_image(self, image):
        """Replaces all embedded images by the passed image

        Raises:
            AudioFileError: if the file can't be opened or saved, or the
                image data can't be read
        """

        with translate_errors():
            tag = FLAC(self["~filename"])

        try:
            data = image.read()
        except EnvironmentError as e:
            raise AudioFileError(e)

        pic = Picture()
        pic.data = data
        pic.type = APICType.COVER_FRONT
        pic.mime = image.mime_type
        pic.width = image.width
        pic.height = image.height
        pic.depth = image.color_depth

        # Drop existing picture blocks first so the new image really
        # replaces them instead of being added alongside.
        tag.clear_pictures()
        tag.add_picture(pic)

        with translate_errors():
            tag.save()

        # clear vcomment tags
        super(FLACFile, self).clear_images()

        self.has_images = True

    def write(self):
        # Remove any ID3 tag from the file before writing
        with translate_errors():
            ID3().delete(filename=self["~filename"])
        super(FLACFile, self).write()


# All classes in this module which are bound to a mutagen type
types = [
    value for value in list(globals().values())
    if getattr(value, 'MutagenType', None)]


def loader(filename):
    """
    Returns:
        AudioFile
    Raises:
        AudioFileError
    """

    with translate_errors():
        audio = mutagen.File(filename, options=ogg_formats)
        if audio is None:
            # FLAC with ID3
            try:
                audio = FLAC(filename)
            except FLACNoHeaderError:
                pass
        if audio is None:
            raise AudioFileError("file type could not be determined")
        Kind = type(audio)
        for klass in types:
            if Kind is klass.MutagenType:
                return klass(filename, audio)
        raise AudioFileError("file type could not be determined")