#!/usr/bin/python

# Audio Tools, a module and set of tools for manipulating audio data
# Copyright (C) 2007-2014  Brian Langenberger

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA


import re
import struct
import sys
from audiotools import (AudioFile, MetaData)


# the four item keys which together hold a track's ReplayGain values
_REPLAYGAIN_KEYS = (b"replaygain_track_gain",
                    b"replaygain_track_peak",
                    b"replaygain_album_gain",
                    b"replaygain_album_peak")

# size of an APEv2 tag header or footer, in bytes
_TAG_FOOTER_SIZE = 32


# takes a pair of integers (or None) for the current and total values
# returns a unicode string of their combined pair
# for example, __number_pair__(2, 3) returns u"2/3"
# whereas      __number_pair__(4, None) returns u"4"
# only None counts as "empty", so __number_pair__(4, 0) returns u"4/0"
# and a missing current value is rendered as a placeholder 0
def __number_pair__(current, total):
    def empty(i):
        return i is None

    unslashed_format = u"%d"
    slashed_format = u"%d/%d"

    if empty(current) and empty(total):
        return unslashed_format % (0,)
    elif (not empty(current)) and empty(total):
        return unslashed_format % (current,)
    elif empty(current) and (not empty(total)):
        return slashed_format % (0, total)
    else:
        # neither current nor total is empty
        return slashed_format % (current, total)


def limited_transfer_data(from_function, to_function, max_bytes):
    """transfers up to max_bytes from from_function to to_function
    or as many bytes as from_function generates as strings

    from_function takes a byte count and returns a string of data,
    returning an empty string once exhausted
    to_function takes a string of data
    nothing is transferred if max_bytes is 0 or negative"""

    BUFFER_SIZE = 0x100000

    while max_bytes > 0:
        # never request more than is still needed
        # so the source isn't read past the limit
        s = from_function(min(BUFFER_SIZE, max_bytes))
        if len(s) == 0:
            break
        if len(s) > max_bytes:
            # in case from_function returns more than requested
            s = s[0:max_bytes]
        to_function(s)
        max_bytes -= len(s)


def _read_tag_footer(f):
    """returns the final 32 bytes of the given seekable file object
    or None if the file is too short to contain them

    the file's size is checked first because seeking to
    a negative offset from the end of a too-short file
    raises an error on real files instead of performing a short read"""

    f.seek(0, 2)
    if f.tell() < _TAG_FOOTER_SIZE:
        return None

    f.seek(-_TAG_FOOTER_SIZE, 2)
    footer = f.read(_TAG_FOOTER_SIZE)
    if len(footer) < _TAG_FOOTER_SIZE:
        return None
    else:
        return footer


class ApeTagItem(object):
    """a single item in the ApeTag, typically a unicode value"""

    # item value length, read-only flag, item type, padding
    FORMAT = "32u 1u 2u 29p"

    def __init__(self, item_type, read_only, key, data):
        """fields are as follows:

        item_type is 0 = UTF-8, 1 = binary, 2 = external, 3 = reserved
        read_only is 1 if the item is read only
        key       is a bytes object of the item's key
        data      is a bytes object of the data itself
        """

        self.type = item_type
        self.read_only = read_only

        assert(isinstance(key, bytes))
        self.key = key
        assert(isinstance(data, bytes))
        self.data = data

    def __eq__(self, item):
        """returns True if item has the same type, read_only flag,
        key and data as this one"""

        for attr in ["type", "read_only", "key", "data"]:
            if ((not hasattr(item, attr)) or
                (getattr(self, attr) != getattr(item, attr))):
                return False
        return True

    def __ne__(self, item):
        # Python 2 doesn't derive "!=" from __eq__
        return not self.__eq__(item)

    def total_size(self):
        """returns total size of item in bytes"""

        # 4 byte length + 4 byte flags + key + NUL terminator + data
        return 4 + 4 + len(self.key) + 1 + len(self.data)

    def copy(self):
        """returns a duplicate ApeTagItem"""

        return ApeTagItem(self.type,
                          self.read_only,
                          self.key,
                          self.data)

    def __repr__(self):
        return "ApeTagItem(%s,%s,%s,%s)" % \
            (repr(self.type),
             repr(self.read_only),
             repr(self.key),
             repr(self.data))

    def raw_info_pair(self):
        """returns a human-readable key/value pair of item data"""

        key = self.key.decode('ascii')

        if self.type == 0:    # text
            if self.read_only:
                return (key,
                        u"(read only) %s" % (self.data.decode('utf-8')))
            else:
                return (key, self.data.decode('utf-8'))
        elif self.type == 1:  # binary
            return (key, u"(binary) %d bytes" % (len(self.data)))
        elif self.type == 2:  # external
            return (key, u"(external) %d bytes" % (len(self.data)))
        else:                 # reserved
            return (key, u"(reserved) %d bytes" % (len(self.data)))

    if sys.version_info[0] >= 3:
        def __str__(self):
            return self.__unicode__()
    else:
        def __str__(self):
            return self.data

    def __unicode__(self):
        """returns the item's data as a unicode string
        with trailing NUL bytes removed
        and undecodable bytes replaced"""

        return self.data.rstrip(b"\x00").decode('utf-8', 'replace')

    def number(self):
        """returns the track/album_number portion of a slashed number pair

        returns None if the item contains no integer
        or if the number is a placeholder 0 in front of a total"""

        unicode_value = self.__unicode__()

        int_string = re.search(r'\d+', unicode_value)
        if int_string is None:
            return None

        int_value = int(int_string.group(0))
        if (int_value == 0) and (u"/" in unicode_value):
            total_value = re.search(r'\d+',
                                    unicode_value.split(u"/")[1])
            if total_value is not None:
                # don't return placeholder 0 value
                # when a _total value is present
                # but _number value is 0
                return None
            else:
                return int_value
        else:
            return int_value

    def total(self):
        """returns the track/album_total portion of a slashed number pair

        returns None if the item has no slash
        or no integer after the slash"""

        unicode_value = self.__unicode__()

        if u"/" not in unicode_value:
            return None

        int_string = re.search(r'\d+', unicode_value.split(u"/")[1])

        if int_string is not None:
            return int(int_string.group(0))
        else:
            return None

    @classmethod
    def parse(cls, reader):
        """returns an ApeTagItem parsed from the given BitstreamReader"""

        (item_value_length,
         read_only,
         encoding) = reader.parse(cls.FORMAT)

        # the key is a NUL-terminated string
        key = []
        c = reader.read_bytes(1)
        while c != b"\x00":
            key.append(c)
            c = reader.read_bytes(1)

        value = reader.read_bytes(item_value_length)

        return cls(encoding, read_only, b"".join(key), value)

    def build(self, writer):
        """writes the ApeTagItem values to the given BitstreamWriter"""

        writer.build("%s %db 8u %db" % (self.FORMAT,
                                        len(self.key),
                                        len(self.data)),
                     (len(self.data),
                      self.read_only,
                      self.type,
                      self.key, 0, self.data))

    @classmethod
    def binary(cls, key, data):
        """returns an ApeTagItem of binary data

        key is an ASCII string, data is a binary string"""

        return cls(1, 0, key, data)

    @classmethod
    def external(cls, key, data):
        """returns an ApeTagItem of external data

        key is an ASCII string, data is a binary string"""

        return cls(2, 0, key, data)

    @classmethod
    def string(cls, key, data):
        """returns an ApeTagItem of text data

        key is a bytes object, data is a unicode string"""

        assert(isinstance(key, bytes))
        assert(isinstance(data,
                          str if (sys.version_info[0] >= 3) else unicode))

        return cls(0, 0, key, data.encode('utf-8', 'replace'))


class ApeTag(MetaData):
    """a complete APEv2 tag"""

    # preamble, version, tag size, item count,
    # read only, item encoding, padding,
    # is header, no footer, has header, reserved
    HEADER_FORMAT = "8b 32u 32u 32u 1u 2u 26p 1u 1u 1u 64p"

    ITEM = ApeTagItem

    ATTRIBUTE_MAP = {'track_name': b'Title',
                     'track_number': b'Track',
                     'track_total': b'Track',
                     'album_number': b'Media',
                     'album_total': b'Media',
                     'album_name': b'Album',
                     'artist_name': b'Artist',
                     # "Performer" is not a defined APEv2 key
                     # it would be nice to have, yet would not be standard
                     'performer_name': b'Performer',
                     'composer_name': b'Composer',
                     'conductor_name': b'Conductor',
                     'ISRC': b'ISRC',
                     'catalog': b'Catalog',
                     'copyright': b'Copyright',
                     'publisher': b'Publisher',
                     'year': b'Year',
                     'date': b'Record Date',
                     'comment': b'Comment'}

    # items whose values are "number" or "number/total" pairs
    INTEGER_ITEMS = (b'Track', b'Media')

    def __init__(self, tags, contains_header=True, contains_footer=True):
        """constructs an ApeTag from a list of ApeTagItem objects"""

        for tag in tags:
            assert(isinstance(tag, ApeTagItem))
        # bypass our own __setattr__, which handles metadata fields
        MetaData.__setattr__(self, "tags", list(tags))
        MetaData.__setattr__(self, "contains_header", contains_header)
        MetaData.__setattr__(self, "contains_footer", contains_footer)

    def __repr__(self):
        return "ApeTag(%s, %s, %s)" % (repr(self.tags),
                                       repr(self.contains_header),
                                       repr(self.contains_footer))

    def total_size(self):
        """returns the minimum size of the total ApeTag, in bytes"""

        size = 0
        if self.contains_header:
            size += 32
        for tag in self.tags:
            size += tag.total_size()
        if self.contains_footer:
            size += 32
        return size

    def __eq__(self, metadata):
        if isinstance(metadata, ApeTag):
            # two ApeTags are equal if they have the same keys
            # and the same data for each key
            if set(self.keys()) != set(metadata.keys()):
                return False

            for tag in self.tags:
                try:
                    if tag.data != metadata[tag.key].data:
                        return False
                except KeyError:
                    return False
            return True
        elif isinstance(metadata, MetaData):
            return MetaData.__eq__(self, metadata)
        else:
            return False

    def keys(self):
        """returns a list of all item keys, in tag order"""

        return [tag.key for tag in self.tags]

    def __contains__(self, key):
        for tag in self.tags:
            if tag.key == key:
                return True
        return False

    def __getitem__(self, key):
        """returns the first item with the given key

        raises KeyError if no such item exists"""

        assert(isinstance(key, bytes))

        for tag in self.tags:
            if tag.key == key:
                return tag
        raise KeyError(key)

    def get(self, key, default=None):
        """returns the first item with the given key
        or default if no such item exists"""

        assert(isinstance(key, bytes))

        try:
            return self[key]
        except KeyError:
            return default

    def __setitem__(self, key, value):
        """replaces the first item with the given key
        or appends value if no such item exists"""

        assert(isinstance(key, bytes))

        for (i, tag) in enumerate(self.tags):
            if tag.key == key:
                self.tags[i] = value
                return
        self.tags.append(value)

    def index(self, key):
        """returns the index of the first item with the given key

        raises ValueError if no such item exists"""

        assert(isinstance(key, bytes))

        for (i, tag) in enumerate(self.tags):
            if tag.key == key:
                return i
        raise ValueError(key)

    def __delitem__(self, key):
        """removes all items with the given key

        raises KeyError if no such item exists"""

        assert(isinstance(key, bytes))

        new_tags = [tag for tag in self.tags if tag.key != key]
        if len(new_tags) < len(self.tags):
            self.tags = new_tags
        else:
            raise KeyError(key)

    def __getattr__(self, attr):
        if attr in self.ATTRIBUTE_MAP:
            try:
                if attr in {'track_number', 'album_number'}:
                    return self[self.ATTRIBUTE_MAP[attr]].number()
                elif attr in {'track_total', 'album_total'}:
                    return self[self.ATTRIBUTE_MAP[attr]].total()
                else:
                    return self[self.ATTRIBUTE_MAP[attr]].__unicode__()
            except KeyError:
                return None
        elif attr in MetaData.FIELDS:
            # a MetaData field with no APEv2 equivalent
            return None
        else:
            return MetaData.__getattribute__(self, attr)

    # if an attribute is updated (e.g. self.track_name)
    # make sure to update the corresponding dict pair
    def __setattr__(self, attr, value):
        def swap_number(unicode_value, new_number):
            # replaces the first integer in the string, if any
            return re.sub(r'\d+', u"%d" % (new_number), unicode_value, 1)

        def swap_slashed_number(unicode_value, new_number):
            # replaces the first integer after the slash
            # or appends a slashed number if there is no slash
            if u"/" in unicode_value:
                (first, second) = unicode_value.split(u"/", 1)
                return u"/".join([first, swap_number(second, new_number)])
            else:
                return u"/".join([unicode_value, u"%d" % (new_number)])

        if attr in self.ATTRIBUTE_MAP:
            key = self.ATTRIBUTE_MAP[attr]
            if value is not None:
                if attr in {'track_number', 'album_number'}:
                    try:
                        current_value = self[key].__unicode__()
                        self[key] = self.ITEM.string(
                            key, swap_number(current_value, value))
                    except KeyError:
                        self[key] = self.ITEM.string(
                            key, __number_pair__(value, None))
                elif attr in {'track_total', 'album_total'}:
                    try:
                        current_value = self[key].__unicode__()
                        self[key] = self.ITEM.string(
                            key, swap_slashed_number(current_value, value))
                    except KeyError:
                        self[key] = self.ITEM.string(
                            key, __number_pair__(None, value))
                else:
                    self[key] = self.ITEM.string(key, value)
            else:
                # setting a field to None deletes it
                delattr(self, attr)
        else:
            MetaData.__setattr__(self, attr, value)

    def __delattr__(self, attr):
        def zero_number(unicode_value):
            # replaces the first integer in the string with 0
            return re.sub(r'\d+', u"0", unicode_value, 1)

        if attr in self.ATTRIBUTE_MAP:
            key = self.ATTRIBUTE_MAP[attr]

            if attr in {'track_number', 'album_number'}:
                try:
                    tag = self[key]
                    if tag.total() is None:
                        # if no slashed _total field, delete entire tag
                        del(self[key])
                    else:
                        # otherwise replace initial portion with 0
                        self[key] = self.ITEM.string(
                            key, zero_number(tag.__unicode__()))
                except KeyError:
                    # no tag to delete
                    pass
            elif attr in {'track_total', 'album_total'}:
                try:
                    tag = self[key]
                    if tag.total() is not None:
                        if tag.number() is not None:
                            # keep only the portion before the slash
                            self[key] = self.ITEM.string(
                                key,
                                tag.__unicode__().split(u"/", 1)[0].rstrip())
                        else:
                            del(self[key])
                    else:
                        # no total portion, so nothing to do
                        pass
                except KeyError:
                    # no tag to delete portion of
                    pass
            else:
                try:
                    del(self[key])
                except KeyError:
                    pass
        elif attr in MetaData.FIELDS:
            pass
        else:
            MetaData.__delattr__(self, attr)

    @classmethod
    def converted(cls, metadata):
        """converts a MetaData object to an ApeTag object"""

        if metadata is None:
            return None
        elif isinstance(metadata, ApeTag):
            return ApeTag([tag.copy() for tag in metadata.tags],
                          contains_header=metadata.contains_header,
                          contains_footer=metadata.contains_footer)
        else:
            tags = cls([])

            for (field, value) in metadata.filled_fields():
                if field in cls.ATTRIBUTE_MAP:
                    setattr(tags, field, value)

            for image in metadata.images():
                tags.add_image(image)

            return tags

    def raw_info(self):
        """returns the ApeTag as a human-readable unicode string"""

        from os import linesep
        from audiotools import output_table

        # align tag values on the "=" sign
        table = output_table()

        for tag in self.tags:
            row = table.row()
            (key, value) = tag.raw_info_pair()
            row.add_column(key, "right")
            row.add_column(u" = ")
            row.add_column(value)

        return (u"APEv2:" + linesep + linesep.join(table.format()))

    @classmethod
    def supports_images(cls):
        """returns True"""

        return True

    def __parse_image__(self, key, type):
        """returns an Image built from the item with the given key

        the item's data is a NUL-terminated UTF-8 description
        followed by the image data itself
        raises KeyError if no such item exists"""

        from audiotools import Image

        raw = self[key].data
        if b"\x00" in raw:
            (description, image_data) = raw.split(b"\x00", 1)
        else:
            # malformed item with no description terminator
            # (reading byte-by-byte until NUL would never finish here)
            # NOTE(review): treating it all as image data is a guess
            (description, image_data) = (b"", raw)

        return Image.new(image_data,
                         description.decode('utf-8', 'replace'),
                         type)

    def add_image(self, image):
        """embeds an Image object in this metadata

        only front and back covers are stored;
        images of any other type are ignored"""

        from audiotools import FRONT_COVER, BACK_COVER

        if image.type == FRONT_COVER:
            key = b'Cover Art (front)'
        elif image.type == BACK_COVER:
            key = b'Cover Art (back)'
        else:
            return

        self[key] = self.ITEM.binary(
            key,
            image.description.encode('utf-8', 'replace') +
            b"\x00" +
            image.data)

    def delete_image(self, image):
        """deletes an Image object from this metadata"""

        from audiotools import FRONT_COVER, BACK_COVER

        if (image.type == FRONT_COVER) and b'Cover Art (front)' in self:
            del(self[b'Cover Art (front)'])
        elif (image.type == BACK_COVER) and b'Cover Art (back)' in self:
            del(self[b'Cover Art (back)'])

    def images(self):
        """returns a list of embedded Image objects"""

        from audiotools import FRONT_COVER, BACK_COVER

        # APEv2 supports only one value per key
        # so a single front and back cover are all that is possible
        img = []
        if b'Cover Art (front)' in self:
            img.append(self.__parse_image__(b'Cover Art (front)',
                                            FRONT_COVER))
        if b'Cover Art (back)' in self:
            img.append(self.__parse_image__(b'Cover Art (back)',
                                            BACK_COVER))
        return img

    @classmethod
    def read(cls, apefile):
        """returns an ApeTag object from an APEv2 tagged file object

        may return None if the file object has no tag"""

        from audiotools.bitstream import BitstreamReader, parse

        tag_footer = _read_tag_footer(apefile)

        if tag_footer is None:
            # not enough bytes for an ApeV2 tag
            return None

        (preamble,
         version,
         tag_size,
         item_count,
         read_only,
         item_encoding,
         is_header,
         no_footer,
         has_header) = parse(cls.HEADER_FORMAT, True, tag_footer)

        if (preamble != b"APETAGEX") or (version != 2000):
            return None

        # tag_size covers the items and footer but not the header
        # so this seeks to the start of the first item
        apefile.seek(-tag_size, 2)
        reader = BitstreamReader(apefile, True)

        return cls([ApeTagItem.parse(reader) for i in range(item_count)],
                   contains_header=has_header,
                   contains_footer=True)

    def build(self, writer):
        """outputs an APEv2 tag to BitstreamWriter"""

        # the stored tag size includes the footer but not the header
        tag_size = sum(tag.total_size() for tag in self.tags) + 32

        if self.contains_header:
            writer.build(ApeTag.HEADER_FORMAT,
                         (b"APETAGEX",               # preamble
                          2000,                      # version
                          tag_size,                  # tag size
                          len(self.tags),            # item count
                          0,                         # read only
                          0,                         # encoding
                          1,                         # is header
                          not self.contains_footer,  # no footer
                          self.contains_header))     # has header

        for tag in self.tags:
            tag.build(writer)

        if self.contains_footer:
            writer.build(ApeTag.HEADER_FORMAT,
                         (b"APETAGEX",               # preamble
                          2000,                      # version
                          tag_size,                  # tag size
                          len(self.tags),            # item count
                          0,                         # read only
                          0,                         # encoding
                          0,                         # is header
                          not self.contains_footer,  # no footer
                          self.contains_header))     # has header

    def clean(self):
        """returns a (ApeTag, fixes_performed) tuple
        where ApeTag is a cleaned-up copy of this tag
        and fixes_performed is a list of unicode strings
        describing each fix"""

        from audiotools.text import (CLEAN_REMOVE_DUPLICATE_TAG,
                                     CLEAN_REMOVE_TRAILING_WHITESPACE,
                                     CLEAN_REMOVE_LEADING_WHITESPACE,
                                     CLEAN_FIX_TAG_FORMATTING,
                                     CLEAN_REMOVE_EMPTY_TAG)

        fixes_performed = []
        used_tags = set()
        tag_items = []
        for tag in self.tags:
            # keys are compared case-insensitively
            if tag.key.upper() in used_tags:
                fixes_performed.append(
                    CLEAN_REMOVE_DUPLICATE_TAG %
                    {"field": tag.key.decode('ascii')})
            elif tag.type == 0:
                used_tags.add(tag.key.upper())
                text = tag.__unicode__()

                # check trailing whitespace
                fix1 = text.rstrip()
                if fix1 != text:
                    fixes_performed.append(
                        CLEAN_REMOVE_TRAILING_WHITESPACE %
                        {"field": tag.key.decode('ascii')})

                # check leading whitespace
                fix2 = fix1.lstrip()
                if fix2 != fix1:
                    fixes_performed.append(
                        CLEAN_REMOVE_LEADING_WHITESPACE %
                        {"field": tag.key.decode('ascii')})

                if tag.key in self.INTEGER_ITEMS:
                    if u"/" in fix2:
                        # item is a slashed field of some sort
                        (current, total) = fix2.split(u"/", 1)
                        current_int = re.search(r'\d+', current)
                        total_int = re.search(r'\d+', total)
                        if (current_int is None) and (total_int is None):
                            # neither side contains an integer value
                            # so ignore it altogether
                            fix3 = fix2
                        elif ((current_int is not None) and
                              (total_int is None)):
                            fix3 = u"%d" % (int(current_int.group(0)))
                        elif ((current_int is None) and
                              (total_int is not None)):
                            fix3 = u"%d/%d" % (0, int(total_int.group(0)))
                        else:
                            # both sides contain an int
                            fix3 = u"%d/%d" % (int(current_int.group(0)),
                                               int(total_int.group(0)))
                    else:
                        # item contains no slash
                        current_int = re.search(r'\d+', fix2)
                        if current_int is not None:
                            # item contains an integer
                            fix3 = u"%d" % (int(current_int.group(0)),)
                        else:
                            # item contains no integer value so ignore it
                            # (although 'Track' should only contain
                            #  integers, 'Media' may contain strings
                            #  so it may be best to simply ignore that case)
                            fix3 = fix2

                    if fix3 != fix2:
                        fixes_performed.append(
                            CLEAN_FIX_TAG_FORMATTING %
                            {"field": tag.key.decode('ascii')})
                else:
                    fix3 = fix2

                if len(fix3) > 0:
                    tag_items.append(ApeTagItem.string(tag.key, fix3))
                else:
                    fixes_performed.append(
                        CLEAN_REMOVE_EMPTY_TAG %
                        {"field": tag.key.decode('ascii')})
            else:
                # non-text items are passed through unchanged
                used_tags.add(tag.key.upper())
                tag_items.append(tag)

        return (self.__class__(tag_items,
                               self.contains_header,
                               self.contains_footer),
                fixes_performed)


class ApeTaggedAudio(object):
    """a class for handling audio formats with APEv2 tags

    this class presumes there will be a filename attribute which
    can be opened and checked for tags, or written if necessary"""

    @classmethod
    def supports_metadata(cls):
        """returns True if this audio type supports MetaData"""

        return True

    def get_metadata(self):
        """returns an ApeTag object, or None

        raises IOError if unable to read the file"""

        with open(self.filename, "rb") as f:
            return ApeTag.read(f)

    def update_metadata(self, metadata):
        """takes this track's current MetaData object
        as returned by get_metadata() and sets this track's metadata
        with any fields updated in that object

        raises IOError if unable to write the file
        """

        if metadata is None:
            return
        elif not isinstance(metadata, ApeTag):
            from audiotools.text import ERR_FOREIGN_METADATA
            raise ValueError(ERR_FOREIGN_METADATA)

        from audiotools.bitstream import parse, BitstreamWriter

        # size of the tag already in the file, or None if there isn't one
        old_tag_size = None

        with open(self.filename, "r+b") as f:
            tag_footer = _read_tag_footer(f)

            if tag_footer is not None:
                (preamble,
                 version,
                 tag_size,
                 item_count,
                 read_only,
                 item_encoding,
                 is_header,
                 no_footer,
                 has_header) = parse(ApeTag.HEADER_FORMAT, True, tag_footer)

                if (preamble == b'APETAGEX') and (version == 2000):
                    # the stored tag size doesn't include the header
                    if has_header:
                        old_tag_size = 32 + tag_size
                    else:
                        old_tag_size = tag_size

            if ((old_tag_size is not None) and
                (metadata.total_size() >= old_tag_size)):
                # metadata has grown
                # so overwrite the old tag in place
                # and let the new one run past the old end of file
                f.seek(-old_tag_size, 2)
                writer = BitstreamWriter(f, True)
                metadata.build(writer)
                writer.close()
                return

        if old_tag_size is None:
            # no existing ApeTag, so simply append a fresh tag
            with BitstreamWriter(open(self.filename, "ab"), True) as writer:
                metadata.build(writer)
        else:
            # metadata has shrunk
            # so rewrite file with smaller metadata
            from audiotools import TemporaryFile
            from os.path import getsize

            # copy everything but the last "old_tag_size" bytes
            # from existing file to rewritten file
            new_apev2 = TemporaryFile(self.filename)

            with open(self.filename, "rb") as old_apev2:
                limited_transfer_data(
                    old_apev2.read,
                    new_apev2.write,
                    getsize(self.filename) - old_tag_size)

            # append new tag to rewritten file
            with BitstreamWriter(new_apev2, True) as writer:
                metadata.build(writer)
                # closing writer closes new_apev2 also

    def set_metadata(self, metadata):
        """takes a MetaData object and sets this track's metadata

        raises IOError if unable to write the file"""

        if metadata is None:
            return self.delete_metadata()

        from audiotools.bitstream import BitstreamWriter

        old_metadata = self.get_metadata()
        new_metadata = ApeTag.converted(metadata)

        if old_metadata is not None:
            # transfer ReplayGain tags from old metadata to new metadata
            for tag in _REPLAYGAIN_KEYS:
                try:
                    # if old_metadata has tag, shift it over
                    new_metadata[tag] = old_metadata[tag]
                except KeyError:
                    try:
                        # otherwise, if new_metadata has tag, delete it
                        del(new_metadata[tag])
                    except KeyError:
                        # if neither has tag, ignore it
                        continue

            # transfer Cuesheet from old metadata to new metadata
            if b"Cuesheet" in old_metadata:
                new_metadata[b"Cuesheet"] = old_metadata[b"Cuesheet"]
            elif b"Cuesheet" in new_metadata:
                del(new_metadata[b"Cuesheet"])

            self.update_metadata(new_metadata)
        else:
            # delete ReplayGain tags from new metadata
            for tag in _REPLAYGAIN_KEYS:
                try:
                    del(new_metadata[tag])
                except KeyError:
                    continue

            # delete Cuesheet from new metadata
            if b"Cuesheet" in new_metadata:
                del(new_metadata[b"Cuesheet"])

            # no existing metadata, so simply append a fresh tag
            with BitstreamWriter(open(self.filename, "ab"), True) as writer:
                new_metadata.build(writer)

    def delete_metadata(self):
        """deletes the track's MetaData

        raises IOError if unable to write the file"""

        # get_replay_gain() and get_cuesheet() aren't defined by this class
        # and are presumed to be supplied by the class it's mixed into
        if ((self.get_replay_gain() is not None) or
            (self.get_cuesheet() is not None)):
            # non-textual metadata is present and needs preserving
            self.set_metadata(MetaData())
        else:
            # no non-textual metadata, so wipe out the entire block
            from os import access, R_OK, W_OK
            from audiotools.bitstream import parse

            if not access(self.filename, R_OK | W_OK):
                raise IOError(self.filename)

            with open(self.filename, "rb") as f:
                tag_footer = _read_tag_footer(f)

            if tag_footer is None:
                # file is too short to hold a tag, so nothing to delete
                return

            (preamble,
             version,
             tag_size,
             item_count,
             read_only,
             item_encoding,
             is_header,
             no_footer,
             has_header) = parse(ApeTag.HEADER_FORMAT, True, tag_footer)

            if (preamble == b'APETAGEX') and (version == 2000):
                from audiotools import TemporaryFile
                from os.path import getsize

                # there's existing metadata to delete
                # so rewrite file without trailing metadata tag
                if has_header:
                    old_tag_size = 32 + tag_size
                else:
                    old_tag_size = tag_size

                # copy everything but the last "old_tag_size" bytes
                # from existing file to rewritten file
                new_apev2 = TemporaryFile(self.filename)

                with open(self.filename, "rb") as old_apev2:
                    limited_transfer_data(
                        old_apev2.read,
                        new_apev2.write,
                        getsize(self.filename) - old_tag_size)

                # only closed once the copy has succeeded
                new_apev2.close()


class ApeGainedAudio(object):
    """a class for handling audio formats which store
    ReplayGain values in APEv2 tags

    this class presumes get_metadata() and update_metadata()
    methods are present"""

    @classmethod
    def supports_replay_gain(cls):
        """returns True if this class supports ReplayGain"""

        return True

    def get_replay_gain(self):
        """returns a ReplayGain object of our ReplayGain values

        returns None if we have no values"""

        from audiotools import ReplayGain

        metadata = self.get_metadata()
        if metadata is None:
            return None

        if set(_REPLAYGAIN_KEYS).issubset(metadata.keys()):
            # we have ReplayGain data
            try:
                # gain values carry a trailing " dB" to strip off
                return ReplayGain(
                    metadata[
                        b'replaygain_track_gain'].__unicode__()[0:-len(" dB")],
                    metadata[
                        b'replaygain_track_peak'].__unicode__(),
                    metadata[
                        b'replaygain_album_gain'].__unicode__()[0:-len(" dB")],
                    metadata[
                        b'replaygain_album_peak'].__unicode__())
            except ValueError:
                return None
        else:
            return None

    def set_replay_gain(self, replaygain):
        """given a ReplayGain object, sets the track's gain to those values

        may raise IOError if unable to read or write the file"""

        if replaygain is None:
            return self.delete_replay_gain()

        metadata = self.get_metadata()
        if metadata is None:
            metadata = ApeTag([])

        metadata[b"replaygain_track_gain"] = ApeTagItem.string(
            b"replaygain_track_gain",
            u"%+1.2f dB" % (replaygain.track_gain))
        metadata[b"replaygain_track_peak"] = ApeTagItem.string(
            b"replaygain_track_peak",
            u"%1.6f" % (replaygain.track_peak))
        metadata[b"replaygain_album_gain"] = ApeTagItem.string(
            b"replaygain_album_gain",
            u"%+1.2f dB" % (replaygain.album_gain))
        metadata[b"replaygain_album_peak"] = ApeTagItem.string(
            b"replaygain_album_peak",
            u"%1.6f" % (replaygain.album_peak))

        self.update_metadata(metadata)

    def delete_replay_gain(self):
        """removes ReplayGain values from file, if any

        may raise IOError if unable to modify the file"""

        metadata = self.get_metadata()
        if metadata is not None:
            for field in _REPLAYGAIN_KEYS:
                try:
                    del(metadata[field])
                except KeyError:
                    pass

            self.update_metadata(metadata)


class ApeAudio(ApeTaggedAudio, AudioFile):
    """a Monkey's Audio file"""

    SUFFIX = "ape"
    NAME = SUFFIX
    DEFAULT_COMPRESSION = "5000"
    COMPRESSION_MODES = tuple([str(x * 1000) for x in range(1, 6)])
    BINARIES = ("mac",)

    # the following are packed, little-endian header layouts

    # id (4 bytes), version
    FILE_HEAD = struct.Struct("<4sH")

    # version >= 3.98
    # padding, descriptor_bytes, header_bytes, seektable_bytes,
    # header_data_bytes, frame_data_bytes, frame_data_bytes_high,
    # terminating_data_bytes, md5 (16 bytes)
    APE_DESCRIPTOR = struct.Struct("<H7I16s")

    # compression_level, format_flags, blocks_per_frame,
    # final_frame_blocks, total_frames, bits_per_sample,
    # number_of_channels, sample_rate
    APE_HEADER = struct.Struct("<HHIIIHHI")

    # version <= 3.97
    # compression_level, format_flags, number_of_channels,
    # sample_rate, header_bytes, terminating_bytes,
    # total_frames, final_frame_blocks
    APE_HEADER_OLD = struct.Struct("<HHHIIIII")

    def __init__(self, filename):
        """filename is a plain string"""

        AudioFile.__init__(self, filename)

        (self.__samplespersec__,
         self.__channels__,
         self.__bitspersample__,
         self.__totalsamples__) = ApeAudio.__ape_info__(filename)

    @classmethod
    def is_type(cls, file):
        """returns True if the given file object describes this format

        takes a seekable file pointer rewound to the start of the file"""

        # file is read in binary mode, so compare against bytes
        return file.read(4) == b"MAC "

    def lossless(self):
        """returns True"""

        return True

    @classmethod
    def supports_foreign_riff_chunks(cls):
        """returns True"""

        return True

    def has_foreign_riff_chunks(self):
        """returns True"""

        # FIXME - this isn't strictly true
        # I'll need a way to detect foreign chunks in APE's stream
        # without decoding it first,
        # but since I'm not supporting APE anyway, I'll take the lazy way out
        return True

    def bits_per_sample(self):
        """returns an integer number of bits-per-sample this track contains"""

        return self.__bitspersample__

    def channels(self):
        """returns an integer number of channels this track contains"""

        return self.__channels__

    def total_frames(self):
        """returns the total PCM frames of the track as an integer"""

        return self.__totalsamples__

    def sample_rate(self):
        """returns the rate of the track's audio as an integer number of Hz"""

        return self.__samplespersec__

    @classmethod
    def __ape_info__(cls, filename):
        """returns a (sample_rate, channels, bits_per_sample, total_frames)
        tuple of integers read from the given file's headers

        raises IOError if unable to open the file
        raises InvalidFile if the headers are invalid or truncated"""

        from audiotools import InvalidFile
        from audiotools.text import ERR_APE_INVALID_HEADER

        def read_fields(f, layout):
            # returns a tuple of the layout's fields read from the file
            data = f.read(layout.size)
            if len(data) < layout.size:
                raise InvalidFile(ERR_APE_INVALID_HEADER)
            return layout.unpack(data)

        with open(filename, 'rb') as f:
            (file_id, version) = read_fields(f, cls.FILE_HEAD)

            if file_id != b'MAC ':
                raise InvalidFile(ERR_APE_INVALID_HEADER)

            if version >= 3980:  # the latest APE file type
                # the descriptor's fields aren't needed
                # but it must be read to reach the header
                read_fields(f, cls.APE_DESCRIPTOR)

                (compression_level,
                 format_flags,
                 blocks_per_frame,
                 final_frame_blocks,
                 total_frames,
                 bits_per_sample,
                 number_of_channels,
                 sample_rate) = read_fields(f, cls.APE_HEADER)

                return (sample_rate,
                        number_of_channels,
                        bits_per_sample,
                        ((total_frames - 1) * blocks_per_frame) +
                        final_frame_blocks)
            else:                # old-style APE file (obsolete)
                (compression_level,
                 format_flags,
                 number_of_channels,
                 sample_rate,
                 header_bytes,
                 terminating_bytes,
                 total_frames,
                 final_frame_blocks) = read_fields(f, cls.APE_HEADER_OLD)

                # old-style files don't store blocks-per-frame
                # so it's determined from version and compression level
                if version >= 3950:
                    blocks_per_frame = 0x48000
                elif ((version >= 3900) or
                      ((version >= 3800) and
                       (compression_level == 4000))):
                    blocks_per_frame = 0x12000
                else:
                    blocks_per_frame = 0x2400

                # nor bits-per-sample, which comes from the format flags
                if format_flags & 0x01:
                    bits_per_sample = 8
                elif format_flags & 0x08:
                    bits_per_sample = 24
                else:
                    bits_per_sample = 16

                return (sample_rate,
                        number_of_channels,
                        bits_per_sample,
                        ((total_frames - 1) * blocks_per_frame) +
                        final_frame_blocks)

    def to_wave(self, wave_filename):
        """writes the contents of this file to the given .wav filename string

        NOTE: the decoder's exit status is not checked,
        so a failed decode is not reported to the caller"""

        from audiotools import BIN
        from audiotools import transfer_data
        import subprocess
        import os

        def decode(ape_filename):
            # runs the external decoder with its output discarded
            with open(os.devnull, "wb") as devnull:
                sub = subprocess.Popen([BIN['mac'],
                                        ape_filename,
                                        wave_filename,
                                        '-d'],
                                       stdout=devnull,
                                       stderr=devnull)
                sub.wait()

        if self.filename.endswith(".ape"):
            decode(self.filename)
        else:
            # decode from a temporary copy of the file
            # which has the ".ape" suffix
            import tempfile

            ape = tempfile.NamedTemporaryFile(suffix='.ape')
            try:
                with open(self.filename, 'rb') as f:
                    transfer_data(f.read, ape.write)
                ape.flush()
                decode(ape.name)
            finally:
                ape.close()

    @classmethod
    def from_wave(cls, filename, wave_filename, compression=None):
        """encodes a new AudioFile from an existing .wav file

        takes a filename string, wave_filename string
        of an existing WaveAudio file
        and an optional compression level string
        encodes a new audio file from the wave's data
        at the given filename with the specified compression level
        and returns a new ApeAudio object"""

        from audiotools import BIN
        import subprocess
        import os

        if str(compression) not in cls.COMPRESSION_MODES:
            compression = cls.DEFAULT_COMPRESSION

        with open(os.devnull, "wb") as devnull:
            sub = subprocess.Popen([BIN['mac'],
                                    wave_filename,
                                    filename,
                                    "-c%s" % (compression)],
                                   stdout=devnull,
                                   stderr=devnull)
            sub.wait()

        return ApeAudio(filename)