1#!/usr/bin/python 2 3# Audio Tools, a module and set of tools for manipulating audio data 4# Copyright (C) 2007-2014 Brian Langenberger 5 6# This program is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 2 of the License, or 9# (at your option) any later version. 10 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 19 20 21from audiotools import (AudioFile, MetaData, InvalidFile, Image, 22 WaveContainer, AiffContainer, 23 Sheet, SheetTrack, SheetIndex) 24from audiotools.vorbiscomment import VorbisComment 25from audiotools.id3 import skip_id3v2_comment 26 27 28class InvalidFLAC(InvalidFile): 29 pass 30 31 32class FlacMetaDataBlockTooLarge(Exception): 33 """raised if one attempts to build a FlacMetaDataBlock too large""" 34 35 pass 36 37 38class FlacMetaData(MetaData): 39 """a class for managing a native FLAC's metadata""" 40 41 def __init__(self, blocks): 42 MetaData.__setattr__(self, "block_list", list(blocks)) 43 44 def has_block(self, block_id): 45 """returns True if the given block ID is present""" 46 47 return block_id in (b.BLOCK_ID for b in self.block_list) 48 49 def add_block(self, block): 50 """adds the given block to our list of blocks""" 51 52 # the specification only requires that STREAMINFO be first 53 # the rest are largely arbitrary, 54 # though I like to keep PADDING as the last block for aesthetic reasons 55 PREFERRED_ORDER = [Flac_STREAMINFO.BLOCK_ID, 56 Flac_SEEKTABLE.BLOCK_ID, 57 Flac_CUESHEET.BLOCK_ID, 58 
Flac_VORBISCOMMENT.BLOCK_ID, 59 Flac_PICTURE.BLOCK_ID, 60 Flac_APPLICATION.BLOCK_ID, 61 Flac_PADDING.BLOCK_ID] 62 63 stop_blocks = set( 64 PREFERRED_ORDER[PREFERRED_ORDER.index(block.BLOCK_ID) + 1:]) 65 66 for (index, old_block) in enumerate(self.block_list): 67 if old_block.BLOCK_ID in stop_blocks: 68 self.block_list.insert(index, block) 69 break 70 else: 71 self.block_list.append(block) 72 73 def get_block(self, block_id): 74 """returns the first instance of the given block_id 75 76 may raise IndexError if the block is not in our list of blocks""" 77 78 for block in self.block_list: 79 if block.BLOCK_ID == block_id: 80 return block 81 else: 82 raise IndexError() 83 84 def get_blocks(self, block_id): 85 """returns all instances of the given block_id in our list of blocks""" 86 87 return [b for b in self.block_list if (b.BLOCK_ID == block_id)] 88 89 def replace_blocks(self, block_id, blocks): 90 """replaces all instances of the given block_id with 91 blocks taken from the given list 92 93 if insufficient matching blocks are present, 94 this uses add_block() to populate the remainder 95 96 if additional matching blocks are present, 97 they are removed 98 """ 99 100 new_blocks = [] 101 102 for block in self.block_list: 103 if block.BLOCK_ID == block_id: 104 if len(blocks) > 0: 105 new_blocks.append(blocks.pop(0)) 106 else: 107 pass 108 else: 109 new_blocks.append(block) 110 111 self.block_list = new_blocks 112 113 while len(blocks) > 0: 114 self.add_block(blocks.pop(0)) 115 116 def __setattr__(self, attr, value): 117 if attr in self.FIELDS: 118 try: 119 vorbis_comment = self.get_block(Flac_VORBISCOMMENT.BLOCK_ID) 120 except IndexError: 121 # add VORBIS comment block if necessary 122 from audiotools import VERSION 123 124 vorbis_comment = Flac_VORBISCOMMENT( 125 [], u"Python Audio Tools %s" % (VERSION)) 126 127 self.add_block(vorbis_comment) 128 129 setattr(vorbis_comment, attr, value) 130 else: 131 MetaData.__setattr__(self, attr, value) 132 133 def __getattr__(self, 
attr): 134 if attr in self.FIELDS: 135 try: 136 return getattr(self.get_block(Flac_VORBISCOMMENT.BLOCK_ID), 137 attr) 138 except IndexError: 139 # no VORBIS comment block, so all values are None 140 return None 141 else: 142 return MetaData.__getattribute__(self, attr) 143 144 def __delattr__(self, attr): 145 if attr in self.FIELDS: 146 try: 147 delattr(self.get_block(Flac_VORBISCOMMENT.BLOCK_ID), attr) 148 except IndexError: 149 # no VORBIS comment block, so nothing to delete 150 pass 151 else: 152 MetaData.__delattr__(self, attr) 153 154 @classmethod 155 def converted(cls, metadata): 156 """takes a MetaData object and returns a FlacMetaData object""" 157 158 if metadata is None: 159 return None 160 elif isinstance(metadata, FlacMetaData): 161 return cls([block.copy() for block in metadata.block_list]) 162 else: 163 return cls([Flac_VORBISCOMMENT.converted(metadata)] + 164 [Flac_PICTURE.converted(image) 165 for image in metadata.images()] + 166 [Flac_PADDING(4096)]) 167 168 def add_image(self, image): 169 """embeds an Image object in this metadata""" 170 171 self.add_block(Flac_PICTURE.converted(image)) 172 173 def delete_image(self, image): 174 """deletes an image object from this metadata""" 175 176 self.block_list = [b for b in self.block_list 177 if not ((b.BLOCK_ID == Flac_PICTURE.BLOCK_ID) and 178 (b == image))] 179 180 def images(self): 181 """returns a list of embedded Image objects""" 182 183 return self.get_blocks(Flac_PICTURE.BLOCK_ID) 184 185 @classmethod 186 def supports_images(cls): 187 """returns True""" 188 189 return True 190 191 def clean(self): 192 """returns (FlacMetaData, [fixes]) tuple 193 194 where FlacMetaData is a new MetaData object fixed of problems 195 and fixes is a list of Unicode strings of fixes performed 196 """ 197 198 from audiotools.text import (CLEAN_FLAC_REORDERED_STREAMINFO, 199 CLEAN_FLAC_MULITPLE_STREAMINFO, 200 CLEAN_FLAC_MULTIPLE_VORBISCOMMENT, 201 CLEAN_FLAC_MULTIPLE_SEEKTABLE, 202 CLEAN_FLAC_MULTIPLE_CUESHEET, 203 
CLEAN_FLAC_UNDEFINED_BLOCK) 204 205 fixes_performed = [] 206 cleaned_blocks = [] 207 208 for block in self.block_list: 209 if block.BLOCK_ID == Flac_STREAMINFO.BLOCK_ID: 210 # reorder STREAMINFO block to be first, if necessary 211 if len(cleaned_blocks) == 0: 212 cleaned_blocks.append(block) 213 elif cleaned_blocks[0].BLOCK_ID != block.BLOCK_ID: 214 fixes_performed.append( 215 CLEAN_FLAC_REORDERED_STREAMINFO) 216 cleaned_blocks.insert(0, block) 217 else: 218 fixes_performed.append( 219 CLEAN_FLAC_MULITPLE_STREAMINFO) 220 elif block.BLOCK_ID == Flac_VORBISCOMMENT.BLOCK_ID: 221 if block.BLOCK_ID in [b.BLOCK_ID for b in cleaned_blocks]: 222 # remove redundant VORBIS_COMMENT blocks 223 fixes_performed.append( 224 CLEAN_FLAC_MULTIPLE_VORBISCOMMENT) 225 else: 226 # recursively clean up the text fields in FlacVorbisComment 227 (block, block_fixes) = block.clean() 228 cleaned_blocks.append(block) 229 fixes_performed.extend(block_fixes) 230 elif block.BLOCK_ID == Flac_PICTURE.BLOCK_ID: 231 # recursively clean up any image blocks 232 (block, block_fixes) = block.clean() 233 cleaned_blocks.append(block) 234 fixes_performed.extend(block_fixes) 235 elif block.BLOCK_ID == Flac_APPLICATION.BLOCK_ID: 236 cleaned_blocks.append(block) 237 elif block.BLOCK_ID == Flac_SEEKTABLE.BLOCK_ID: 238 # remove redundant seektable, if necessary 239 if block.BLOCK_ID in [b.BLOCK_ID for b in cleaned_blocks]: 240 fixes_performed.append( 241 CLEAN_FLAC_MULTIPLE_SEEKTABLE) 242 else: 243 (block, block_fixes) = block.clean() 244 cleaned_blocks.append(block) 245 fixes_performed.extend(block_fixes) 246 elif block.BLOCK_ID == Flac_CUESHEET.BLOCK_ID: 247 # remove redundant cuesheet, if necessary 248 if block.BLOCK_ID in [b.BLOCK_ID for b in cleaned_blocks]: 249 fixes_performed.append( 250 CLEAN_FLAC_MULTIPLE_CUESHEET) 251 else: 252 cleaned_blocks.append(block) 253 elif block.BLOCK_ID == Flac_PADDING.BLOCK_ID: 254 cleaned_blocks.append(block) 255 else: 256 # remove undefined blocks 257 
fixes_performed.append(CLEAN_FLAC_UNDEFINED_BLOCK) 258 259 return (self.__class__(cleaned_blocks), fixes_performed) 260 261 def __repr__(self): 262 return "FlacMetaData(%s)" % (self.block_list) 263 264 @classmethod 265 def parse(cls, reader): 266 """returns a FlacMetaData object from the given BitstreamReader 267 which has already parsed the 4-byte 'fLaC' file ID""" 268 269 block_list = [] 270 271 last = 0 272 273 while last != 1: 274 (last, block_type, block_length) = reader.parse("1u7u24u") 275 276 if block_type == 0: # STREAMINFO 277 block_list.append( 278 Flac_STREAMINFO.parse(reader)) 279 elif block_type == 1: # PADDING 280 block_list.append( 281 Flac_PADDING.parse(reader, block_length)) 282 elif block_type == 2: # APPLICATION 283 block_list.append( 284 Flac_APPLICATION.parse(reader, block_length)) 285 elif block_type == 3: # SEEKTABLE 286 block_list.append( 287 Flac_SEEKTABLE.parse(reader, block_length // 18)) 288 elif block_type == 4: # VORBIS_COMMENT 289 block_list.append( 290 Flac_VORBISCOMMENT.parse(reader)) 291 elif block_type == 5: # CUESHEET 292 block_list.append( 293 Flac_CUESHEET.parse(reader)) 294 elif block_type == 6: # PICTURE 295 block_list.append( 296 Flac_PICTURE.parse(reader)) 297 elif (block_type >= 7) and (block_type <= 126): 298 from audiotools.text import ERR_FLAC_RESERVED_BLOCK 299 raise ValueError(ERR_FLAC_RESERVED_BLOCK % (block_type)) 300 else: 301 from audiotools.text import ERR_FLAC_INVALID_BLOCK 302 raise ValueError(ERR_FLAC_INVALID_BLOCK) 303 304 return cls(block_list) 305 306 def raw_info(self): 307 """returns human-readable metadata as a unicode string""" 308 309 from os import linesep 310 311 return linesep.join( 312 [u"FLAC Tags:"] + [block.raw_info() for block in self.blocks()]) 313 314 def blocks(self): 315 """yields FlacMetaData's individual metadata blocks""" 316 317 for block in self.block_list: 318 yield block 319 320 def build(self, writer): 321 """writes the FlacMetaData to the given BitstreamWriter 322 not including 
the 4-byte 'fLaC' file ID""" 323 324 from audiotools import iter_last 325 326 for (last_block, 327 block) in iter_last(iter([b for b in self.blocks() 328 if (b.size() < (2 ** 24))])): 329 if not last_block: 330 writer.build("1u7u24u", (0, block.BLOCK_ID, block.size())) 331 else: 332 writer.build("1u7u24u", (1, block.BLOCK_ID, block.size())) 333 334 block.build(writer) 335 336 def size(self): 337 """returns the size of all metadata blocks 338 including the block headers 339 but not including the 4-byte 'fLaC' file ID""" 340 341 return sum(4 + b.size() for b in self.block_list) 342 343 344class Flac_STREAMINFO(object): 345 BLOCK_ID = 0 346 347 def __init__(self, minimum_block_size, maximum_block_size, 348 minimum_frame_size, maximum_frame_size, 349 sample_rate, channels, bits_per_sample, 350 total_samples, md5sum): 351 """all values are non-negative integers except for md5sum 352 which is a 16-byte binary string""" 353 354 self.minimum_block_size = minimum_block_size 355 self.maximum_block_size = maximum_block_size 356 self.minimum_frame_size = minimum_frame_size 357 self.maximum_frame_size = maximum_frame_size 358 self.sample_rate = sample_rate 359 self.channels = channels 360 self.bits_per_sample = bits_per_sample 361 self.total_samples = total_samples 362 self.md5sum = md5sum 363 364 def copy(self): 365 """returns a duplicate of this metadata block""" 366 367 return Flac_STREAMINFO(self.minimum_block_size, 368 self.maximum_block_size, 369 self.minimum_frame_size, 370 self.maximum_frame_size, 371 self.sample_rate, 372 self.channels, 373 self.bits_per_sample, 374 self.total_samples, 375 self.md5sum) 376 377 def __eq__(self, block): 378 for attr in ["minimum_block_size", 379 "maximum_block_size", 380 "minimum_frame_size", 381 "maximum_frame_size", 382 "sample_rate", 383 "channels", 384 "bits_per_sample", 385 "total_samples", 386 "md5sum"]: 387 if ((not hasattr(block, attr)) or (getattr(self, attr) != 388 getattr(block, attr))): 389 return False 390 else: 391 return 
True 392 393 def __repr__(self): 394 return ("Flac_STREAMINFO(%s)" % 395 ",".join(["%s=%s" % (key, repr(getattr(self, key))) 396 for key in ["minimum_block_size", 397 "maximum_block_size", 398 "minimum_frame_size", 399 "maximum_frame_size", 400 "sample_rate", 401 "channels", 402 "bits_per_sample", 403 "total_samples", 404 "md5sum"]])) 405 406 def raw_info(self): 407 """returns a human-readable version of this metadata block 408 as unicode""" 409 410 from audiotools import hex_string 411 from os import linesep 412 413 return linesep.join( 414 [u" STREAMINFO:", 415 u" minimum block size = %d" % (self.minimum_block_size), 416 u" maximum block size = %d" % (self.maximum_block_size), 417 u" minimum frame size = %d" % (self.minimum_frame_size), 418 u" maximum frame size = %d" % (self.maximum_frame_size), 419 u" sample rate = %d" % (self.sample_rate), 420 u" channels = %d" % (self.channels), 421 u" bits-per-sample = %d" % (self.bits_per_sample), 422 u" total samples = %d" % (self.total_samples), 423 u" MD5 sum = %s" % (hex_string(self.md5sum))]) 424 425 @classmethod 426 def parse(cls, reader): 427 """returns this metadata block from a BitstreamReader""" 428 429 values = reader.parse("16u16u24u24u20u3u5u36U16b") 430 values[5] += 1 # channels 431 values[6] += 1 # bits-per-sample 432 return cls(*values) 433 434 def build(self, writer): 435 """writes this metadata block to a BitstreamWriter""" 436 437 writer.build("16u16u24u24u20u3u5u36U16b", 438 (self.minimum_block_size, 439 self.maximum_block_size, 440 self.minimum_frame_size, 441 self.maximum_frame_size, 442 self.sample_rate, 443 self.channels - 1, 444 self.bits_per_sample - 1, 445 self.total_samples, 446 self.md5sum)) 447 448 def size(self): 449 """the size of this metadata block 450 not including the 4-byte block header""" 451 452 return 34 453 454 455class Flac_PADDING(object): 456 BLOCK_ID = 1 457 458 def __init__(self, length): 459 self.length = length 460 461 def copy(self): 462 """returns a duplicate of this 
metadata block""" 463 464 return Flac_PADDING(self.length) 465 466 def __repr__(self): 467 return "Flac_PADDING(%d)" % (self.length) 468 469 def raw_info(self): 470 """returns a human-readable version of this metadata block 471 as unicode""" 472 473 from os import linesep 474 475 return linesep.join( 476 [u" PADDING:", 477 u" length = %d" % (self.length)]) 478 479 @classmethod 480 def parse(cls, reader, block_length): 481 """returns this metadata block from a BitstreamReader""" 482 483 reader.skip_bytes(block_length) 484 return cls(length=block_length) 485 486 def build(self, writer): 487 """writes this metadata block to a BitstreamWriter""" 488 489 writer.write_bytes(b"\x00" * self.length) 490 491 def size(self): 492 """the size of this metadata block 493 not including the 4-byte block header""" 494 495 return self.length 496 497 498class Flac_APPLICATION(object): 499 BLOCK_ID = 2 500 501 def __init__(self, application_id, data): 502 self.application_id = application_id 503 self.data = data 504 505 def __eq__(self, block): 506 for attr in ["application_id", "data"]: 507 if ((not hasattr(block, attr)) or (getattr(self, attr) != 508 getattr(block, attr))): 509 return False 510 else: 511 return True 512 513 def copy(self): 514 """returns a duplicate of this metadata block""" 515 516 return Flac_APPLICATION(self.application_id, 517 self.data) 518 519 def __repr__(self): 520 return "Flac_APPLICATION(%s, %s)" % (repr(self.application_id), 521 repr(self.data)) 522 523 def raw_info(self): 524 """returns a human-readable version of this metadata block 525 as unicode""" 526 527 from os import linesep 528 529 return u" APPLICATION:%s %s (%d bytes)" % \ 530 (linesep, 531 self.application_id.decode('ascii'), 532 len(self.data)) 533 534 @classmethod 535 def parse(cls, reader, block_length): 536 """returns this metadata block from a BitstreamReader""" 537 538 return cls(application_id=reader.read_bytes(4), 539 data=reader.read_bytes(block_length - 4)) 540 541 def build(self, 
writer): 542 """writes this metadata block to a BitstreamWriter""" 543 544 writer.write_bytes(self.application_id) 545 writer.write_bytes(self.data) 546 547 def size(self): 548 """the size of this metadata block 549 not including the 4-byte block header""" 550 551 return len(self.application_id) + len(self.data) 552 553 554class Flac_SEEKTABLE(object): 555 BLOCK_ID = 3 556 557 def __init__(self, seekpoints): 558 """seekpoints is a list of 559 (PCM frame offset, byte offset, PCM frame count) tuples""" 560 self.seekpoints = seekpoints 561 562 def __eq__(self, block): 563 if hasattr(block, "seekpoints"): 564 return self.seekpoints == block.seekpoints 565 else: 566 return False 567 568 def copy(self): 569 """returns a duplicate of this metadata block""" 570 571 return Flac_SEEKTABLE(self.seekpoints[:]) 572 573 def __repr__(self): 574 return "Flac_SEEKTABLE(%s)" % (repr(self.seekpoints)) 575 576 def raw_info(self): 577 """returns a human-readable version of this metadata block 578 as unicode""" 579 580 from os import linesep 581 582 return linesep.join( 583 [u" SEEKTABLE:", 584 u" first sample file offset frame samples"] + 585 [u" %14.1d %13.1X %15.d" % seekpoint 586 for seekpoint in self.seekpoints]) 587 588 @classmethod 589 def parse(cls, reader, total_seekpoints): 590 """returns this metadata block from a BitstreamReader""" 591 592 return cls([tuple(reader.parse("64U64U16u")) 593 for i in range(total_seekpoints)]) 594 595 def build(self, writer): 596 """writes this metadata block to a BitstreamWriter""" 597 598 for seekpoint in self.seekpoints: 599 writer.build("64U64U16u", seekpoint) 600 601 def size(self): 602 """the size of this metadata block 603 not including the 4-byte block header""" 604 605 from audiotools.bitstream import format_size 606 607 return (format_size("64U64U16u") // 8) * len(self.seekpoints) 608 609 def clean(self): 610 """removes any empty seek points 611 and ensures PCM frame offset and byte offset 612 are both incrementing""" 613 614 
fixes_performed = [] 615 nonempty_points = [seekpoint for seekpoint in self.seekpoints 616 if (seekpoint[2] != 0)] 617 618 if len(nonempty_points) != len(self.seekpoints): 619 from audiotools.text import CLEAN_FLAC_REMOVE_SEEKPOINTS 620 fixes_performed.append(CLEAN_FLAC_REMOVE_SEEKPOINTS) 621 622 ascending_order = list(set(nonempty_points)) 623 ascending_order.sort() 624 625 if ascending_order != nonempty_points: 626 from audiotools.text import CLEAN_FLAC_REORDER_SEEKPOINTS 627 fixes_performed.append(CLEAN_FLAC_REORDER_SEEKPOINTS) 628 629 return (Flac_SEEKTABLE(ascending_order), fixes_performed) 630 631 632class Flac_VORBISCOMMENT(VorbisComment): 633 BLOCK_ID = 4 634 635 def copy(self): 636 """returns a duplicate of this metadata block""" 637 638 return Flac_VORBISCOMMENT(self.comment_strings[:], 639 self.vendor_string) 640 641 def __repr__(self): 642 return "Flac_VORBISCOMMENT(%s, %s)" % \ 643 (repr(self.comment_strings), repr(self.vendor_string)) 644 645 def raw_info(self): 646 """returns a human-readable version of this metadata block 647 as unicode""" 648 649 from os import linesep 650 from audiotools import output_table 651 652 # align the text strings on the "=" sign, if any 653 654 table = output_table() 655 656 for comment in self.comment_strings: 657 row = table.row() 658 row.add_column(u" " * 4) 659 if u"=" in comment: 660 (tag, value) = comment.split(u"=", 1) 661 row.add_column(tag, "right") 662 row.add_column(u"=") 663 row.add_column(value) 664 else: 665 row.add_column(comment) 666 row.add_column(u"") 667 row.add_column(u"") 668 669 return (u" VORBIS_COMMENT:" + linesep + 670 u" %s" % (self.vendor_string) + linesep + 671 linesep.join(table.format())) 672 673 @classmethod 674 def converted(cls, metadata): 675 """converts a MetaData object to a Flac_VORBISCOMMENT object""" 676 677 if (metadata is None) or (isinstance(metadata, Flac_VORBISCOMMENT)): 678 return metadata 679 else: 680 # make VorbisComment do all the work, 681 # then lift its data into a new 
Flac_VORBISCOMMENT 682 metadata = VorbisComment.converted(metadata) 683 return cls(metadata.comment_strings, 684 metadata.vendor_string) 685 686 @classmethod 687 def parse(cls, reader): 688 """returns this metadata block from a BitstreamReader""" 689 690 reader.set_endianness(True) 691 try: 692 vendor_string = \ 693 reader.read_bytes(reader.read(32)).decode('utf-8', 'replace') 694 695 return cls([reader.read_bytes(reader.read(32)).decode('utf-8', 696 'replace') 697 for i in range(reader.read(32))], 698 vendor_string) 699 finally: 700 reader.set_endianness(False) 701 702 def build(self, writer): 703 """writes this metadata block to a BitstreamWriter""" 704 705 writer.set_endianness(True) 706 try: 707 vendor_string = self.vendor_string.encode('utf-8') 708 writer.write(32, len(vendor_string)) 709 writer.write_bytes(vendor_string) 710 writer.write(32, len(self.comment_strings)) 711 for comment_string in self.comment_strings: 712 comment_string = comment_string.encode('utf-8') 713 writer.write(32, len(comment_string)) 714 writer.write_bytes(comment_string) 715 finally: 716 writer.set_endianness(False) 717 718 def size(self): 719 """the size of this metadata block 720 not including the 4-byte block header""" 721 722 return (4 + len(self.vendor_string.encode('utf-8')) + 723 4 + 724 sum(4 + len(comment.encode('utf-8')) 725 for comment in self.comment_strings)) 726 727 728class Flac_CUESHEET(Sheet): 729 BLOCK_ID = 5 730 731 def __init__(self, catalog_number, lead_in_samples, is_cdda, tracks): 732 """catalog_number is a 128 byte ASCII string, padded with NULLs 733 lead_in_samples is typically 2 seconds of samples 734 is_cdda is 1 if audio if from CDDA, 0 otherwise 735 tracks is a list of Flac_CHESHEET_track objects""" 736 737 assert(isinstance(catalog_number, bytes)) 738 assert(isinstance(lead_in_samples, int) or 739 isinstance(lead_in_samples, long)) 740 assert(is_cdda in {1, 0}) 741 742 self.__catalog_number__ = catalog_number 743 self.__lead_in_samples__ = lead_in_samples 
744 self.__is_cdda__ = is_cdda 745 self.__tracks__ = tracks 746 747 def copy(self): 748 """returns a duplicate of this metadata block""" 749 750 return Flac_CUESHEET(self.__catalog_number__, 751 self.__lead_in_samples__, 752 self.__is_cdda__, 753 [track.copy() for track in self.__tracks__]) 754 755 def __eq__(self, cuesheet): 756 if isinstance(cuesheet, Flac_CUESHEET): 757 return ((self.__catalog_number__ == 758 cuesheet.__catalog_number__) and 759 (self.__lead_in_samples__ == 760 cuesheet.__lead_in_samples__) and 761 (self.__is_cdda__ == cuesheet.__is_cdda__) and 762 (self.__tracks__ == cuesheet.__tracks__)) 763 else: 764 return Sheet.__eq__(self, cuesheet) 765 766 def __repr__(self): 767 return ("Flac_CUESHEET(%s)" % 768 ",".join(["%s=%s" % (key, 769 repr(getattr(self, "__" + key + "__"))) 770 for key in ["catalog_number", 771 "lead_in_samples", 772 "is_cdda", 773 "tracks"]])) 774 775 def raw_info(self): 776 """returns a human-readable version of this metadata block 777 as unicode""" 778 779 from os import linesep 780 781 return linesep.join( 782 [u" CUESHEET:", 783 u" catalog number = %s" % 784 (self.__catalog_number__.decode('ascii', 'replace')), 785 u" lead-in samples = %d" % (self.__lead_in_samples__), 786 u" is CDDA = %d" % (self.__is_cdda__)] + 787 [track.raw_info(4) for track in self.__tracks__]) 788 789 @classmethod 790 def parse(cls, reader): 791 """returns this metadata block from a BitstreamReader""" 792 793 (catalog_number, 794 lead_in_samples, 795 is_cdda, 796 track_count) = reader.parse("128b64U1u2071p8u") 797 return cls(catalog_number, 798 lead_in_samples, 799 is_cdda, 800 [Flac_CUESHEET_track.parse(reader) 801 for i in range(track_count)]) 802 803 def build(self, writer): 804 """writes this metadata block to a BitstreamWriter""" 805 806 writer.build("128b64U1u2071p8u", 807 (self.__catalog_number__, 808 self.__lead_in_samples__, 809 self.__is_cdda__, 810 len(self.__tracks__))) 811 for track in self.__tracks__: 812 track.build(writer) 813 814 def 
size(self): 815 """the size of this metadata block 816 not including the 4-byte block header""" 817 818 return (396 + # format_size("128b64U1u2071p8u") // 8 819 sum(t.size() for t in self.__tracks__)) 820 821 def __len__(self): 822 # don't include lead-out track 823 return len(self.__tracks__) - 1 824 825 def __getitem__(self, index): 826 # don't include lead-out track 827 return self.__tracks__[0:-1][index] 828 829 def track_length(self, track_number): 830 """given a track_number (typically starting from 1) 831 returns the length of the track as a Fraction number of seconds 832 or None if the length is to the remainder of the stream 833 (typically for the last track in the album) 834 835 may raise KeyError if the track is not found""" 836 837 initial_track = self.track(track_number) 838 if (track_number + 1) in self.track_numbers(): 839 next_track = self.track(track_number + 1) 840 return (next_track.index(1).offset() - 841 initial_track.index(1).offset()) 842 else: 843 # getting track length of final track 844 845 from fractions import Fraction 846 847 lead_out_track = self.__tracks__[-1] 848 final_index = initial_track.index(1) 849 return (Fraction(lead_out_track.__offset__, 850 final_index.__sample_rate__) - 851 final_index.offset()) 852 853 def get_metadata(self): 854 """returns MetaData of Sheet, or None 855 this metadata often contains information such as catalog number 856 or CD-TEXT values""" 857 858 catalog = self.__catalog_number__.rstrip(b"\x00") 859 if len(catalog) > 0: 860 from audiotools import MetaData 861 862 return MetaData(catalog=catalog.decode("ascii", "replace")) 863 else: 864 return None 865 866 def set_track(self, audiofile): 867 """sets the AudioFile this cuesheet belongs to 868 869 this is necessary becuase FLAC's CUESHEET block 870 doesn't store the file's sample rate 871 which is needed to convert sample offsets to seconds""" 872 873 for track in self: 874 track.set_track(audiofile) 875 876 @classmethod 877 def converted(cls, sheet, 
total_pcm_frames, sample_rate, is_cdda=True): 878 """given a Sheet object, total PCM frames, sample rate and 879 optional boolean indicating whether cuesheet is CD audio 880 returns a Flac_CUESHEET object from that data""" 881 882 def pad(u, chars): 883 if u is not None: 884 s = u.encode("ascii", "replace") 885 return s[0:chars] + (b"\x00" * (chars - len(s))) 886 else: 887 return b"\x00" * chars 888 889 metadata = sheet.get_metadata() 890 if (metadata is not None) and (metadata.catalog is not None): 891 catalog_number = pad(metadata.catalog.encode("ascii", "replace"), 892 128) 893 else: 894 catalog_number = b"\x00" * 128 895 896 # assume standard 2 second disc lead-in 897 # and append empty lead-out track 898 return cls(catalog_number=catalog_number, 899 lead_in_samples=sample_rate * 2, 900 is_cdda=(1 if is_cdda else 0), 901 tracks=([Flac_CUESHEET_track.converted(t, sample_rate) 902 for t in sheet] + 903 [Flac_CUESHEET_track(offset=total_pcm_frames, 904 number=170, 905 ISRC=b"\x00" * 12, 906 track_type=0, 907 pre_emphasis=0, 908 index_points=[])])) 909 910 911class Flac_CUESHEET_track(SheetTrack): 912 def __init__(self, offset, number, ISRC, track_type, pre_emphasis, 913 index_points): 914 """offset is the track's first index point's offset 915 from the start of the stream, in PCM frames 916 number is the track number, typically starting from 1 917 ISRC is a 12 byte ASCII string, padded with NULLs 918 track_type is 0 for audio, 1 for non-audio 919 pre_emphasis is 0 for no, 1 for yes 920 index_points is a list of Flac_CUESHEET_index objects""" 921 922 assert(isinstance(offset, int) or isinstance(offset, long)) 923 assert(isinstance(number, int)) 924 assert(isinstance(ISRC, bytes)) 925 assert(track_type in {0, 1}) 926 assert(pre_emphasis in {0, 1}) 927 928 self.__offset__ = offset 929 self.__number__ = number 930 self.__ISRC__ = ISRC 931 self.__track_type__ = track_type 932 self.__pre_emphasis__ = pre_emphasis 933 self.__index_points__ = index_points 934 # the file 
this track belongs to 935 self.__filename__ = "" 936 937 @classmethod 938 def converted(cls, sheet_track, sample_rate): 939 """given a SheetTrack object and stream's sample rate, 940 returns a Flac_CUESHEET_track object""" 941 942 def pad(u, chars): 943 if u is not None: 944 s = u.encode("ascii", "replace") 945 return s[0:chars] + (b"\x00" * (chars - len(s))) 946 else: 947 return b"\x00" * chars 948 949 if len(sheet_track) > 0: 950 offset = int(sheet_track[0].offset() * sample_rate) 951 else: 952 # track with no index points 953 offset = 0 954 955 metadata = sheet_track.get_metadata() 956 957 if metadata is not None: 958 ISRC = pad(metadata.ISRC, 12) 959 else: 960 ISRC = b"\x00" * 12 961 962 return cls(offset=offset, 963 number=sheet_track.number(), 964 ISRC=ISRC, 965 track_type=(0 if sheet_track.is_audio() else 1), 966 pre_emphasis=(1 if sheet_track.pre_emphasis() else 0), 967 index_points=[Flac_CUESHEET_index.converted( 968 index, offset, sample_rate) for index in sheet_track]) 969 970 def copy(self): 971 """returns a duplicate of this metadata block""" 972 973 return Flac_CUESHEET_track(self.__offset__, 974 self.__number__, 975 self.__ISRC__, 976 self.__track_type__, 977 self.__pre_emphasis__, 978 [index.copy() for index in 979 self.__index_points__]) 980 981 def __repr__(self): 982 return ("Flac_CUESHEET_track(%s)" % 983 ",".join(["%s=%s" % (key, 984 repr(getattr(self, "__" + key + "__"))) 985 for key in ["offset", 986 "number", 987 "ISRC", 988 "track_type", 989 "pre_emphasis", 990 "index_points"]])) 991 992 def raw_info(self, indent): 993 """returns a human-readable version of this track as unicode""" 994 995 from os import linesep 996 997 lines = [((u"track : %(number)3.d " + 998 u"offset : %(offset)9.d " + 999 u"ISRC : %(ISRC)s") % 1000 {"number": self.__number__, 1001 "offset": self.__offset__, 1002 "type": self.__track_type__, 1003 "pre_emphasis": self.__pre_emphasis__, 1004 "ISRC": self.__ISRC__.strip(b"\x00").decode('ascii', 1005 'replace')}) 1006 ] + 
[i.raw_info(1) for i in self.__index_points__] 1007 1008 return linesep.join( 1009 [u" " * indent + line for line in lines]) 1010 1011 def __eq__(self, track): 1012 if isinstance(track, Flac_CUESHEET_track): 1013 return ((self.__offset__ == track.__offset__) and 1014 (self.__number__ == track.__number__) and 1015 (self.__ISRC__ == track.__ISRC__) and 1016 (self.__track_type__ == track.__track_type__) and 1017 (self.__pre_emphasis__ == track.__pre_emphasis__) and 1018 (self.__index_points__ == track.__index_points__)) 1019 else: 1020 return SheetTrack.__eq__(self, track) 1021 1022 @classmethod 1023 def parse(cls, reader): 1024 """returns this cuesheet track from a BitstreamReader""" 1025 1026 (offset, 1027 number, 1028 ISRC, 1029 track_type, 1030 pre_emphasis, 1031 index_points) = reader.parse("64U8u12b1u1u110p8u") 1032 return cls(offset, number, ISRC, track_type, pre_emphasis, 1033 [Flac_CUESHEET_index.parse(reader, offset) 1034 for i in range(index_points)]) 1035 1036 def build(self, writer): 1037 """writes this cuesheet track to a BitstreamWriter""" 1038 1039 writer.build("64U8u12b1u1u110p8u", 1040 (self.__offset__, 1041 self.__number__, 1042 self.__ISRC__, 1043 self.__track_type__, 1044 self.__pre_emphasis__, 1045 len(self.__index_points__))) 1046 for index_point in self.__index_points__: 1047 index_point.build(writer) 1048 1049 def size(self): 1050 return (36 + # format_size("64U8u12b1u1u110p8u") // 8 1051 sum(i.size() for i in self.__index_points__)) 1052 1053 def __len__(self): 1054 return len(self.__index_points__) 1055 1056 def __getitem__(self, index): 1057 return self.__index_points__[index] 1058 1059 def number(self): 1060 """return SheetTrack's number, starting from 1""" 1061 1062 return self.__number__ 1063 1064 def get_metadata(self): 1065 """returns SheetTrack's MetaData, or None""" 1066 1067 isrc = self.__ISRC__.rstrip(b"\x00") 1068 if len(isrc) > 0: 1069 from audiotools import MetaData 1070 1071 return MetaData(ISRC=isrc.decode("ascii", "replace")) 
class Flac_CUESHEET_index(SheetIndex):
    """a single index point belonging to a FLAC CUESHEET track"""

    def __init__(self, track_offset, offset, number, sample_rate=44100):
        """track_offset is the offset of the index's track, in PCM frames

        offset is the index's offset relative to track_offset,
        in PCM frames
        number is the index's number, typically starting from 1
        (a number of 0 indicates a track pre-gap)
        sample_rate is the divisor used by offset()
        to turn PCM frames into a Fraction"""

        self.__track_offset__ = track_offset
        self.__offset__ = offset
        self.__number__ = number
        self.__sample_rate__ = sample_rate

    @classmethod
    def converted(cls, sheet_index, track_offset, sample_rate):
        """builds a Flac_CUESHEET_index from a SheetIndex object,
        the track's offset (in PCM frames) and a sample rate"""

        # the sheet's offset is scaled by sample_rate and truncated to an
        # int before being made relative to the start of the track
        absolute_frames = int(sheet_index.offset() * sample_rate)

        return cls(track_offset=track_offset,
                   offset=absolute_frames - track_offset,
                   number=sheet_index.number(),
                   sample_rate=sample_rate)

    def copy(self):
        """returns a duplicate of this index point"""

        return Flac_CUESHEET_index(self.__track_offset__,
                                   self.__offset__,
                                   self.__number__,
                                   self.__sample_rate__)

    def __repr__(self):
        fields = (self.__track_offset__,
                  self.__offset__,
                  self.__number__,
                  self.__sample_rate__)

        return "Flac_CUESHEET_index(%s)" % (", ".join(map(repr, fields)))

    def __eq__(self, index):
        # anything which isn't one of our own is compared generically
        if not isinstance(index, Flac_CUESHEET_index):
            return SheetIndex.__eq__(self, index)

        # only the relative offset and the number take part in the
        # comparison; track offset and sample rate do not
        return ((self.__offset__ == index.__offset__) and
                (self.__number__ == index.__number__))

    @classmethod
    def parse(cls, reader, track_offset):
        """reads a cuesheet index from a BitstreamReader

        track_offset is the offset of the enclosing track, in PCM frames
        the sample rate is left at the constructor's default"""

        fields = reader.parse("64U8u24p")
        (offset, number) = fields

        return cls(track_offset=track_offset,
                   offset=offset,
                   number=number)

    def build(self, writer):
        """writes this cuesheet index to a BitstreamWriter"""

        fields = (self.__offset__, self.__number__)
        writer.build("64U8u24p", fields)

    def size(self):
        """returns the size of this index point, in bytes"""

        # format_size("64U8u24p") // 8
        return 12

    def raw_info(self, indent):
        """returns this index point as a single line of unicode text
        prefixed by the given number of spaces"""

        padding = u" " * indent
        relative_offset = u"+%d" % (self.__offset__)

        return padding + (u"index : %3.2d offset : %9.9s" %
                          (self.__number__, relative_offset))

    def number(self):
        """returns the index point's number"""

        return self.__number__

    def offset(self):
        """returns the index point's absolute offset
        as a Fraction of PCM frames over the sample rate"""

        from fractions import Fraction

        absolute_frames = self.__track_offset__ + self.__offset__

        return Fraction(absolute_frames, self.__sample_rate__)

    def set_track(self, audiofile):
        """takes this index's sample rate from the given AudioFile"""

        self.__sample_rate__ = audiofile.sample_rate()
def __getattr__(self, attr):
    """resolves the Image-style "type" attribute from picture_type

    every other attribute is looked up via Image.__getattribute__"""

    if attr != "type":
        return Image.__getattribute__(self, attr)

    # convert FLAC picture_type to Image type
    #
    # | Item         | FLAC Picture ID | Image type |
    # |--------------+-----------------+------------|
    # | Other        |               0 |          4 |
    # | Front Cover  |               3 |          0 |
    # | Back Cover   |               4 |          1 |
    # | Leaflet Page |               5 |          2 |
    # | Media        |               6 |          3 |

    from audiotools import (FRONT_COVER,
                            BACK_COVER,
                            LEAFLET_PAGE,
                            MEDIA,
                            OTHER)

    image_types = {0: OTHER,
                   3: FRONT_COVER,
                   4: BACK_COVER,
                   5: LEAFLET_PAGE,
                   6: MEDIA}

    # FLAC picture types without an Image equivalent are reported as OTHER
    return image_types.get(self.picture_type, OTHER)
def raw_info(self):
    """returns a human-readable version of this metadata block
    as unicode, one field per line"""

    from os import linesep

    # (line template, attribute name) for each field which is formatted
    # directly from an attribute, in display order
    field_templates = ((u" picture type = %d", "picture_type"),
                       (u" MIME type = %s", "mime_type"),
                       (u" description = %s", "description"),
                       (u" width = %d", "width"),
                       (u" height = %d", "height"),
                       (u" color depth = %d", "color_depth"),
                       (u" color count = %d", "color_count"))

    lines = [u" PICTURE:"]
    for (template, attribute) in field_templates:
        lines.append(template % (getattr(self, attribute)))

    # the image data itself is summarized by its length in bytes
    lines.append(u" bytes = %d" % (len(self.data)))

    return linesep.join(lines)
def size(self):
    """the size of this metadata block in bytes,
    not including the 4-byte block header"""

    encoded_mime_type = self.mime_type.encode('ascii')
    encoded_description = self.description.encode('utf-8')

    # eight 4-byte fields are always present:
    # picture_type, MIME type length, description length,
    # width, height, color_depth, color_count and data length
    fixed_fields = 8 * 4

    return (fixed_fields +
            len(encoded_mime_type) +
            len(encoded_description) +
            len(self.data))
def channel_mask(self):
    """returns a ChannelMask object of this track's channel layout

    for more than 2 channels, the mask stored in the VORBIS_COMMENT
    block's WAVEFORMATEXTENSIBLE_CHANNEL_MASK tag is used if it can be
    read; otherwise a default mask for the channel count is returned"""

    from audiotools import ChannelMask

    channel_count = self.channels()

    if channel_count <= 2:
        return ChannelMask.from_channels(channel_count)

    try:
        metadata = self.get_metadata()
        if metadata is None:
            # proceed to generate channel mask
            raise ValueError()

        vorbis_comment = metadata.get_block(Flac_VORBISCOMMENT.BLOCK_ID)
        mask_text = vorbis_comment[u"WAVEFORMATEXTENSIBLE_CHANNEL_MASK"][0]
        tagged_mask = ChannelMask(int(mask_text, 16))

        if len(tagged_mask) != channel_count:
            # channel count mismatch in given mask
            return ChannelMask(0)

        return tagged_mask
    except (IndexError, KeyError, ValueError):
        # if there is no VORBIS_COMMENT block
        # or no WAVEFORMATEXTENSIBLE_CHANNEL_MASK in that block
        # or it's not an integer,
        # use FLAC's default mask based on channels
        default_fields = {
            3: ("front_left", "front_right", "front_center"),
            4: ("front_left", "front_right",
                "back_left", "back_right"),
            5: ("front_left", "front_right", "front_center",
                "back_left", "back_right"),
            6: ("front_left", "front_right", "front_center",
                "back_left", "back_right",
                "low_frequency"),
            7: ("front_left", "front_right", "front_center",
                "low_frequency", "back_center",
                "side_left", "side_right"),
            8: ("front_left", "front_right", "front_center",
                "low_frequency",
                "back_left", "back_right",
                "side_left", "side_right")}.get(channel_count)

        if default_fields is None:
            # shouldn't be able to happen
            return ChannelMask(0)

        return ChannelMask.from_fields(
            **dict((field, True) for field in default_fields))
def get_metadata(self):
    """returns this file's metadata as parsed by FlacMetaData.parse()

    returns None only if the "fLaC" signature isn't found
    at the stream's offset

    raises IOError if unable to read the file"""

    from audiotools.bitstream import BitstreamReader

    # FlacAudio *always* returns a FlacMetaData object
    # even if the blocks aren't present
    # so there's no need to test for None

    with BitstreamReader(open(self.filename, 'rb'), False) as reader:
        # skip whatever precedes the FLAC stream itself
        reader.seek(self.__stream_offset__, 0)

        signature = reader.read_bytes(4)
        if signature != b"fLaC":
            # shouldn't be able to get here
            return None

        return FlacMetaData.parse(reader)
def set_metadata(self, metadata):
    """takes a MetaData object and sets this track's metadata

    this metadata includes track name, album name, and so on

    the given metadata is converted to this file's METADATA_CLASS
    and merged into the file's current metadata as follows:

    * its VORBIS_COMMENT block (if any) replaces the file's,
      except that the vendor string, REPLAYGAIN_* tags,
      WAVEFORMATEXTENSIBLE_CHANNEL_MASK and CDTOC are taken from
      the file's current block rather than from the new one
    * its PICTURE blocks replace the file's PICTURE blocks
    * all other blocks in the file are left as-is

    a metadata of None is handed to delete_metadata()

    raises IOError if unable to read or write the file"""

    # tags whose values are always dictated by the file itself
    # rather than by the incoming metadata
    REPLAYGAIN_KEYS = (u"REPLAYGAIN_TRACK_GAIN",
                       u"REPLAYGAIN_TRACK_PEAK",
                       u"REPLAYGAIN_ALBUM_GAIN",
                       u"REPLAYGAIN_ALBUM_PEAK",
                       u"REPLAYGAIN_REFERENCE_LOUDNESS")
    CHANNEL_MASK_KEY = u"WAVEFORMATEXTENSIBLE_CHANNEL_MASK"
    CDTOC_KEY = u"CDTOC"

    if metadata is None:
        return self.delete_metadata()

    new_metadata = self.METADATA_CLASS.converted(metadata)

    old_metadata = self.get_metadata()
    if old_metadata is None:
        # this shouldn't happen
        old_metadata = FlacMetaData([])

    # replace old metadata's VORBIS_COMMENT with one from new metadata
    # (if any)
    if new_metadata.has_block(Flac_VORBISCOMMENT.BLOCK_ID):
        new_vorbiscomment = new_metadata.get_block(
            Flac_VORBISCOMMENT.BLOCK_ID)

        if old_metadata.has_block(Flac_VORBISCOMMENT.BLOCK_ID):
            # both new and old metadata have a VORBIS_COMMENT block

            old_vorbiscomment = old_metadata.get_block(
                Flac_VORBISCOMMENT.BLOCK_ID)

            # update vendor string from our current VORBIS_COMMENT block
            new_vorbiscomment.vendor_string = \
                old_vorbiscomment.vendor_string

            # update REPLAYGAIN_* tags from
            # our current VORBIS_COMMENT block,
            # removing those the current block doesn't have
            for key in REPLAYGAIN_KEYS:
                try:
                    new_vorbiscomment[key] = old_vorbiscomment[key]
                except KeyError:
                    new_vorbiscomment[key] = []

            # update WAVEFORMATEXTENSIBLE_CHANNEL_MASK
            # from our current VORBIS_COMMENT block, if any
            if (((self.channels() > 2) or
                 (self.bits_per_sample() > 16)) and
                    (CHANNEL_MASK_KEY in old_vorbiscomment.keys())):
                new_vorbiscomment[CHANNEL_MASK_KEY] = \
                    old_vorbiscomment[CHANNEL_MASK_KEY]
            elif CHANNEL_MASK_KEY in new_vorbiscomment.keys():
                # otherwise a mask carried in by the new metadata
                # is removed
                new_vorbiscomment[CHANNEL_MASK_KEY] = []

            # update CDTOC from our current VORBIS_COMMENT block, if any
            try:
                new_vorbiscomment[CDTOC_KEY] = old_vorbiscomment[CDTOC_KEY]
            except KeyError:
                new_vorbiscomment[CDTOC_KEY] = []

            old_metadata.replace_blocks(Flac_VORBISCOMMENT.BLOCK_ID,
                                        [new_vorbiscomment])
        else:
            # new metadata has VORBIS_COMMENT block,
            # but old metadata does not

            # remove REPLAYGAIN_* tags from new VORBIS_COMMENT block
            for key in REPLAYGAIN_KEYS:
                new_vorbiscomment[key] = []

            # update WAVEFORMATEXTENSIBLE_CHANNEL_MASK
            # from our actual mask if necessary
            if (self.channels() > 2) or (self.bits_per_sample() > 16):
                # channel_mask() returns a ChannelMask object,
                # so convert it to a plain integer before hex formatting
                # (as to_pcm() does when it passes the mask along)
                new_vorbiscomment[CHANNEL_MASK_KEY] = [
                    u"0x%.4X" % (int(self.channel_mask()))]

            # remove CDTOC from new VORBIS_COMMENT block
            new_vorbiscomment[CDTOC_KEY] = []

            old_metadata.add_block(new_vorbiscomment)

    # (if new metadata has no VORBIS_COMMENT block,
    #  the old one is left untouched)

    # replace old metadata's PICTURE blocks with those from new metadata
    old_metadata.replace_blocks(
        Flac_PICTURE.BLOCK_ID,
        new_metadata.get_blocks(Flac_PICTURE.BLOCK_ID))

    # everything else remains as-is

    self.update_metadata(old_metadata)
def delete_cuesheet(self):
    """deletes embedded Sheet object, if any

    this removes all CUESHEET blocks along with any CDTOC tag
    in the VORBIS_COMMENT block, then updates the file's metadata

    Raises IOError if a problem occurs when updating the file"""

    metadata = self.get_metadata()

    # wipe out any CUESHEET blocks
    metadata.replace_blocks(Flac_CUESHEET.BLOCK_ID, [])

    # then erase any CDTOC tags
    try:
        vorbiscomment = metadata.get_block(Flac_VORBISCOMMENT.BLOCK_ID)
        # only delete the tag when it's actually present,
        # the same way set_cuesheet() guards its own removal,
        # so an absent tag is never handed to "del"
        if u"CDTOC" in vorbiscomment:
            del(vorbiscomment[u"CDTOC"])
    except IndexError:
        # no VORBIS_COMMENT block, so there's no CDTOC tag to erase
        pass

    self.update_metadata(metadata)
@classmethod
def from_pcm(cls, filename, pcmreader,
             compression=None,
             total_pcm_frames=None,
             encoding_function=None):
    """encodes a new file from PCM data

    takes a filename string, PCMReader object,
    optional compression level string,
    optional total_pcm_frames integer
    and optional encoding_function to call in place of encode_flac
    encodes a new audio file from pcmreader's data
    at the given filename with the specified compression level
    and returns a new FlacAudio object

    if the encode fails, the file at filename is unlinked,
    and pcmreader is closed once encoding has been attempted"""

    from audiotools.encoders import encode_flac
    from audiotools import EncodingError
    from audiotools import UnsupportedChannelCount
    from audiotools import BufferedPCMReader
    from audiotools import CounterPCMReader
    from audiotools import __default_quality__

    # None is never one of the valid modes,
    # so a single membership test covers a missing level too
    if compression not in cls.COMPRESSION_MODES:
        compression = __default_quality__(cls.NAME)

    # encoder keyword arguments for each compression level
    presets = {
        "0": {"block_size": 1152,
              "max_lpc_order": 0,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 3},
        "1": {"block_size": 1152,
              "max_lpc_order": 0,
              "adaptive_mid_side": True,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 3},
        "2": {"block_size": 1152,
              "max_lpc_order": 0,
              "exhaustive_model_search": True,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 3},
        "3": {"block_size": 4096,
              "max_lpc_order": 6,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 4},
        "4": {"block_size": 4096,
              "max_lpc_order": 8,
              "adaptive_mid_side": True,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 4},
        "5": {"block_size": 4096,
              "max_lpc_order": 8,
              "mid_side": True,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 5},
        "6": {"block_size": 4096,
              "max_lpc_order": 8,
              "mid_side": True,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 6},
        "7": {"block_size": 4096,
              "max_lpc_order": 8,
              "mid_side": True,
              "exhaustive_model_search": True,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 6},
        "8": {"block_size": 4096,
              "max_lpc_order": 12,
              "mid_side": True,
              "exhaustive_model_search": True,
              "min_residual_partition_order": 0,
              "max_residual_partition_order": 6}}
    encoding_options = presets[compression]

    if pcmreader.channels > 8:
        raise UnsupportedChannelCount(filename, pcmreader.channels)

    reader_mask = pcmreader.channel_mask
    if reader_mask == 0:
        # undefined mask: use a default for up to 6 channels,
        # and leave it undefined beyond that
        if pcmreader.channels <= 6:
            channel_mask = {1: 0x0004,
                            2: 0x0003,
                            3: 0x0007,
                            4: 0x0033,
                            5: 0x0037,
                            6: 0x003F}[pcmreader.channels]
        else:
            channel_mask = 0
    elif (reader_mask in
          {0x0001,    # 1ch - mono
           0x0004,    # 1ch - mono
           0x0003,    # 2ch - left, right
           0x0007,    # 3ch - left, right, center
           0x0033,    # 4ch - left, right, back left, back right
           0x0603,    # 4ch - left, right, side left, side right
           0x0037,    # 5ch - L, R, C, back left, back right
           0x0607,    # 5ch - L, R, C, side left, side right
           0x003F,    # 6ch - L, R, C, LFE, back left, back right
           0x060F}):  # 6ch - L, R, C, LFE, side left, side right
        channel_mask = reader_mask
    else:
        from audiotools import UnsupportedChannelMask

        raise UnsupportedChannelMask(filename, reader_mask)

    if total_pcm_frames is None:
        padding_size = 4096
    else:
        # one seekpoint per 10 * sample_rate PCM frames, rounded up,
        # with the padding enlarged by 4 + 18 bytes per expected point
        (whole_intervals, leftover_frames) = divmod(
            total_pcm_frames, pcmreader.sample_rate * 10)
        expected_seekpoints = (whole_intervals +
                               (1 if leftover_frames else 0))
        padding_size = 4096 + 4 + (expected_seekpoints * 18)
        # count the frames actually read so they can be verified below
        pcmreader = CounterPCMReader(pcmreader)

    try:
        if encoding_function is None:
            encoder = encode_flac
        else:
            encoder = encoding_function

        offsets = encoder(filename,
                          pcmreader=BufferedPCMReader(pcmreader),
                          padding_size=padding_size,
                          **encoding_options)

        if total_pcm_frames is not None:
            if total_pcm_frames != pcmreader.frames_written:
                from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH
                raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH)

        flac = FlacAudio(filename)
        metadata = flac.get_metadata()
        assert(metadata is not None)

        # generate SEEKTABLE from encoder offsets and add it to metadata
        seekpoint_interval = pcmreader.sample_rate * 10
        frame_offsets = [(byte_offset, pcm_frames)
                         for (byte_offset, pcm_frames) in offsets]
        metadata.add_block(
            flac.seektable(frame_offsets, seekpoint_interval))

        # if channels or bps is too high,
        # automatically generate and add channel mask
        needs_mask = ((pcmreader.channels > 2) or
                      (pcmreader.bits_per_sample > 16))
        if needs_mask and (channel_mask != 0):
            vorbis = metadata.get_block(Flac_VORBISCOMMENT.BLOCK_ID)
            vorbis[u"WAVEFORMATEXTENSIBLE_CHANNEL_MASK"] = [
                u"0x%.4X" % (channel_mask)]

        flac.update_metadata(metadata)

        return flac
    except (IOError, ValueError) as err:
        cls.__unlink__(filename)
        raise EncodingError(str(err))
    except Exception:
        cls.__unlink__(filename)
        raise
    finally:
        pcmreader.close()
def has_foreign_wave_chunks(self):
    """returns True if the audio file contains non-audio RIFF chunks

    that is, if any APPLICATION metadata block
    has an application ID of b'riff'

    during transcoding, if the source audio file has foreign RIFF chunks
    and the target audio format supports foreign RIFF chunks,
    conversion should be routed through .wav conversion
    to avoid losing those chunks

    returns False if the metadata can't be read"""

    try:
        application_blocks = self.get_metadata().get_blocks(
            Flac_APPLICATION.BLOCK_ID)
        application_ids = [block.application_id
                           for block in application_blocks]
        return b'riff' in application_ids
    except IOError:
        return False
@classmethod
def from_wave(cls, filename, header, pcmreader, footer, compression=None):
    """encodes a new file from wave data

    takes a filename string, header string,
    PCMReader object, footer string
    and optional compression level string
    encodes a new audio file from pcmreader's data
    at the given filename with the specified compression level
    and returns the new audio file object built by cls.from_pcm()

    the header and footer are split into their individual RIFF chunks
    and each chunk is stored in its own b"riff" APPLICATION
    metadata block of the encoded file

    may raise EncodingError if some problem occurs when
    encoding the input file, if the header or footer is malformed,
    or if the amount of PCM data doesn't match the sizes
    given in the header"""

    from io import BytesIO
    from audiotools.bitstream import BitstreamReader
    from audiotools.bitstream import BitstreamRecorder
    from audiotools.bitstream import format_byte_size
    # NOTE(review): pad_data is imported here but not used by this method
    from audiotools.wav import (pad_data, WaveAudio)
    from audiotools import (EncodingError, CounterPCMReader)

    # split header and footer into distinct chunks
    # header_len / footer_len count down the bytes not yet consumed
    header_len = len(header)
    footer_len = len(footer)
    fmt_found = False
    # APPLICATION blocks accumulated in the order the chunks are found
    blocks = []
    try:
        # read everything from start of header to "data<size>"
        # chunk header
        r = BitstreamReader(BytesIO(header), True)
        (riff, remaining_size, wave) = r.parse("4b 32u 4b")
        if riff != b"RIFF":
            from audiotools.text import ERR_WAV_NOT_WAVE
            raise EncodingError(ERR_WAV_NOT_WAVE)
        elif wave != b"WAVE":
            from audiotools.text import ERR_WAV_INVALID_WAVE
            raise EncodingError(ERR_WAV_INVALID_WAVE)
        else:
            # the 12-byte "RIFF<size>WAVE" preamble gets a block of its own
            block_data = BitstreamRecorder(True)
            block_data.build("4b 32u 4b", (riff, remaining_size, wave))
            blocks.append(Flac_APPLICATION(b"riff", block_data.data()))
            # presumably remaining_size excludes the 8 bytes of the
            # "RIFF" ID and size field themselves; total_size is checked
            # against header + PCM data + footer once encoding is done
            total_size = remaining_size + 8
            header_len -= format_byte_size("4b 32u 4b")

        while header_len:
            block_data = BitstreamRecorder(True)
            (chunk_id, chunk_size) = r.parse("4b 32u")
            # ensure chunk ID is valid
            if (not frozenset(chunk_id).issubset(
                    WaveAudio.PRINTABLE_ASCII)):
                from audiotools.text import ERR_WAV_INVALID_CHUNK
                raise EncodingError(ERR_WAV_INVALID_CHUNK)
            else:
                header_len -= format_byte_size("4b 32u")
                block_data.build("4b 32u", (chunk_id, chunk_size))

            if chunk_id == b"data":
                # transfer only "data" chunk header to APPLICATION block
                if header_len != 0:
                    # the header must end right after "data<size>"
                    from audiotools.text import ERR_WAV_HEADER_EXTRA_DATA
                    raise EncodingError(ERR_WAV_HEADER_EXTRA_DATA %
                                        (header_len))
                elif not fmt_found:
                    # "fmt " must have been seen before "data"
                    from audiotools.text import ERR_WAV_NO_FMT_CHUNK
                    raise EncodingError(ERR_WAV_NO_FMT_CHUNK)
                else:
                    blocks.append(
                        Flac_APPLICATION(b"riff", block_data.data()))
                    # remembered for the footer's pad byte
                    # and for the final size check
                    data_chunk_size = chunk_size
                    # leaving via "break" skips the loop's "else" clause
                    break
            elif chunk_id == b"fmt ":
                if not fmt_found:
                    fmt_found = True
                    if chunk_size % 2:
                        # transfer padded chunk to APPLICATION block
                        # (an odd-sized chunk is read with one extra byte)
                        block_data.write_bytes(
                            r.read_bytes(chunk_size + 1))
                        header_len -= (chunk_size + 1)
                    else:
                        # transfer un-padded chunk to APPLICATION block
                        block_data.write_bytes(
                            r.read_bytes(chunk_size))
                        header_len -= chunk_size

                    blocks.append(
                        Flac_APPLICATION(b"riff", block_data.data()))
                else:
                    from audiotools.text import ERR_WAV_MULTIPLE_FMT
                    raise EncodingError(ERR_WAV_MULTIPLE_FMT)
            else:
                # any other chunk is carried over verbatim
                if chunk_size % 2:
                    # transfer padded chunk to APPLICATION block
                    block_data.write_bytes(r.read_bytes(chunk_size + 1))
                    header_len -= (chunk_size + 1)
                else:
                    # transfer un-padded chunk to APPLICATION block
                    block_data.write_bytes(r.read_bytes(chunk_size))
                    header_len -= chunk_size

                blocks.append(Flac_APPLICATION(b"riff", block_data.data()))
        else:
            # the header was exhausted without reaching a "data" chunk
            from audiotools.text import ERR_WAV_NO_DATA_CHUNK
            raise EncodingError(ERR_WAV_NO_DATA_CHUNK)
    except IOError:
        # the header ran out of bytes in the middle of a chunk
        from audiotools.text import ERR_WAV_HEADER_IOERROR
        raise EncodingError(ERR_WAV_HEADER_IOERROR)

    try:
        # read everything from start of footer to end of footer
        r = BitstreamReader(BytesIO(footer), True)
        # skip initial footer pad byte
        # (expected only when the "data" chunk's size is odd)
        if data_chunk_size % 2:
            r.skip_bytes(1)
            footer_len -= 1

        while footer_len:
            block_data = BitstreamRecorder(True)
            (chunk_id, chunk_size) = r.parse("4b 32u")

            if (not frozenset(chunk_id).issubset(
                    WaveAudio.PRINTABLE_ASCII)):
                # ensure chunk ID is valid
                from audiotools.text import ERR_WAV_INVALID_CHUNK
                raise EncodingError(ERR_WAV_INVALID_CHUNK)
            elif chunk_id == b"fmt ":
                # multiple "fmt " chunks is an error
                from audiotools.text import ERR_WAV_MULTIPLE_FMT
                raise EncodingError(ERR_WAV_MULTIPLE_FMT)
            elif chunk_id == b"data":
                # multiple "data" chunks is an error
                from audiotools.text import ERR_WAV_MULTIPLE_DATA
                raise EncodingError(ERR_WAV_MULTIPLE_DATA)
            else:
                footer_len -= format_byte_size("4b 32u")
                block_data.build("4b 32u", (chunk_id, chunk_size))

                if chunk_size % 2:
                    # transfer padded chunk to APPLICATION block
                    block_data.write_bytes(r.read_bytes(chunk_size + 1))
                    footer_len -= (chunk_size + 1)
                else:
                    # transfer un-padded chunk to APPLICATION block
                    block_data.write_bytes(r.read_bytes(chunk_size))
                    footer_len -= chunk_size

                blocks.append(Flac_APPLICATION(b"riff", block_data.data()))
    except IOError:
        # the footer ran out of bytes in the middle of a chunk
        from audiotools.text import ERR_WAV_FOOTER_IOERROR
        raise EncodingError(ERR_WAV_FOOTER_IOERROR)

    # wrap the reader so the amount of PCM data encoded can be checked
    counter = CounterPCMReader(pcmreader)

    # perform standard FLAC encode from PCMReader
    flac = cls.from_pcm(filename, counter, compression)

    data_bytes_written = counter.bytes_written()

    # ensure processed PCM data equals size of "data" chunk
    if data_bytes_written != data_chunk_size:
        # remove the newly encoded file before reporting the error
        cls.__unlink__(filename)
        from audiotools.text import ERR_WAV_TRUNCATED_DATA_CHUNK
        raise EncodingError(ERR_WAV_TRUNCATED_DATA_CHUNK)

    # ensure total size of header + PCM + footer matches wav's header
    if (len(header) + data_bytes_written + len(footer)) != total_size:
        cls.__unlink__(filename)
        from audiotools.text import ERR_WAV_INVALID_SIZE
        raise EncodingError(ERR_WAV_INVALID_SIZE)

    # add chunks as APPLICATION metadata blocks
    metadata = flac.get_metadata()
    for block in blocks:
        metadata.add_block(block)
    flac.update_metadata(metadata)

    # return encoded FLAC file
    return flac
@classmethod
def from_aiff(cls, filename, header, pcmreader, footer, compression=None):
    """encodes a new file from AIFF data

    takes a filename string, header string,
    PCMReader object, footer string
    and optional compression level string
    encodes a new audio file from pcmreader's data
    at the given filename with the specified compression level
    and returns a new object of this class

    the FORM descriptor and each chunk found in header and footer
    are stored in their own "aiff" APPLICATION metadata block,
    in the order in which they are encountered

    header + pcm data + footer should always result
    in the original AIFF file being restored
    without need for any padding bytes

    may raise EncodingError if some problem occurs when
    encoding the input file"""

    from io import BytesIO
    from audiotools.bitstream import BitstreamReader
    from audiotools.bitstream import BitstreamRecorder
    from audiotools.bitstream import format_byte_size
    from audiotools.aiff import AiffAudio
    from audiotools import (EncodingError, CounterPCMReader)

    def transfer_chunk_data(reader, recorder, size):
        # an odd-sized chunk is followed by a single pad byte
        # which is transferred along with the chunk's data
        # returns the number of bytes consumed from the reader
        padded_size = size + (size % 2)
        recorder.write_bytes(reader.read_bytes(padded_size))
        return padded_size

    # split header and footer into distinct chunks
    header_len = len(header)
    footer_len = len(footer)
    comm_found = False
    blocks = []
    try:
        # read everything from start of header to "SSND<size>"
        # chunk header
        r = BitstreamReader(BytesIO(header), False)
        (form, remaining_size, aiff) = r.parse("4b 32u 4b")
        if form != b"FORM":
            from audiotools.text import ERR_AIFF_NOT_AIFF
            raise EncodingError(ERR_AIFF_NOT_AIFF)
        elif aiff != b"AIFF":
            from audiotools.text import ERR_AIFF_INVALID_AIFF
            raise EncodingError(ERR_AIFF_INVALID_AIFF)
        else:
            block_data = BitstreamRecorder(0)
            block_data.build("4b 32u 4b", (form, remaining_size, aiff))
            # the application ID must be the binary string b"aiff"
            # like every other block built here,
            # since that is what aiff_header_footer() compares against
            blocks.append(Flac_APPLICATION(b"aiff", block_data.data()))
            # the size field doesn't count the 8 bytes of ID and size
            total_size = remaining_size + 8
            header_len -= format_byte_size("4b 32u 4b")

        while header_len:
            block_data = BitstreamRecorder(0)
            (chunk_id, chunk_size) = r.parse("4b 32u")
            # ensure chunk ID is valid
            if (not frozenset(chunk_id).issubset(
                    AiffAudio.PRINTABLE_ASCII)):
                from audiotools.text import ERR_AIFF_INVALID_CHUNK
                raise EncodingError(ERR_AIFF_INVALID_CHUNK)
            else:
                header_len -= format_byte_size("4b 32u")
                block_data.build("4b 32u", (chunk_id, chunk_size))

            if chunk_id == b"SSND":
                from audiotools.text import (ERR_AIFF_HEADER_EXTRA_SSND,
                                             ERR_AIFF_HEADER_MISSING_SSND,
                                             ERR_AIFF_NO_COMM_CHUNK)

                # transfer only "SSND" chunk header to APPLICATION block
                # (including 8 bytes after ID/size header)
                if header_len > 8:
                    raise EncodingError(ERR_AIFF_HEADER_EXTRA_SSND)
                elif header_len < 8:
                    raise EncodingError(ERR_AIFF_HEADER_MISSING_SSND)
                elif not comm_found:
                    raise EncodingError(ERR_AIFF_NO_COMM_CHUNK)
                else:
                    block_data.write_bytes(r.read_bytes(8))
                    blocks.append(
                        Flac_APPLICATION(b"aiff", block_data.data()))
                    # those 8 bytes are part of the chunk
                    # but not part of the PCM data
                    ssnd_chunk_size = (chunk_size - 8)
                    break
            elif chunk_id == b"COMM":
                from audiotools.text import ERR_AIFF_MULTIPLE_COMM_CHUNKS

                if comm_found:
                    # multiple "COMM" chunks is an error
                    raise EncodingError(ERR_AIFF_MULTIPLE_COMM_CHUNKS)

                comm_found = True
                # transfer whole chunk to APPLICATION block
                header_len -= transfer_chunk_data(r,
                                                  block_data,
                                                  chunk_size)
                blocks.append(Flac_APPLICATION(b"aiff", block_data.data()))
            else:
                # transfer whole chunk to APPLICATION block
                header_len -= transfer_chunk_data(r,
                                                  block_data,
                                                  chunk_size)
                blocks.append(Flac_APPLICATION(b"aiff", block_data.data()))
        else:
            # header was exhausted without reaching an "SSND" chunk
            from audiotools.text import ERR_AIFF_NO_SSND_CHUNK
            raise EncodingError(ERR_AIFF_NO_SSND_CHUNK)
    except IOError:
        from audiotools.text import ERR_AIFF_HEADER_IOERROR
        raise EncodingError(ERR_AIFF_HEADER_IOERROR)

    try:
        # read everything from start of footer to end of footer
        r = BitstreamReader(BytesIO(footer), False)
        # skip initial footer pad byte
        if ssnd_chunk_size % 2:
            r.skip_bytes(1)
            footer_len -= 1

        while footer_len:
            block_data = BitstreamRecorder(0)
            (chunk_id, chunk_size) = r.parse("4b 32u")

            if (not frozenset(chunk_id).issubset(
                    AiffAudio.PRINTABLE_ASCII)):
                # ensure chunk ID is valid
                from audiotools.text import ERR_AIFF_INVALID_CHUNK
                raise EncodingError(ERR_AIFF_INVALID_CHUNK)
            elif chunk_id == b"COMM":
                # multiple "COMM" chunks is an error
                from audiotools.text import ERR_AIFF_MULTIPLE_COMM_CHUNKS
                raise EncodingError(ERR_AIFF_MULTIPLE_COMM_CHUNKS)
            elif chunk_id == b"SSND":
                # multiple "SSND" chunks is an error
                from audiotools.text import ERR_AIFF_MULTIPLE_SSND_CHUNKS
                raise EncodingError(ERR_AIFF_MULTIPLE_SSND_CHUNKS)
            else:
                footer_len -= format_byte_size("4b 32u")
                block_data.build("4b 32u", (chunk_id, chunk_size))

                # transfer whole chunk to APPLICATION block
                footer_len -= transfer_chunk_data(r,
                                                  block_data,
                                                  chunk_size)
                blocks.append(Flac_APPLICATION(b"aiff", block_data.data()))
    except IOError:
        from audiotools.text import ERR_AIFF_FOOTER_IOERROR
        raise EncodingError(ERR_AIFF_FOOTER_IOERROR)

    counter = CounterPCMReader(pcmreader)

    # perform standard FLAC encode from PCMReader
    flac = cls.from_pcm(filename, counter, compression)

    ssnd_bytes_written = counter.bytes_written()

    # ensure processed PCM data equals size of "SSND" chunk
    if ssnd_bytes_written != ssnd_chunk_size:
        cls.__unlink__(filename)
        from audiotools.text import ERR_AIFF_TRUNCATED_SSND_CHUNK
        raise EncodingError(ERR_AIFF_TRUNCATED_SSND_CHUNK)

    # ensure total size of header + PCM + footer matches aiff's header
    if (len(header) + ssnd_bytes_written + len(footer)) != total_size:
        cls.__unlink__(filename)
        from audiotools.text import ERR_AIFF_INVALID_SIZE
        raise EncodingError(ERR_AIFF_INVALID_SIZE)

    # add chunks as APPLICATION metadata blocks
    metadata = flac.get_metadata()
    if metadata is not None:
        for block in blocks:
            metadata.add_block(block)
        flac.update_metadata(metadata)

    # return encoded FLAC file
    return flac
def bits_per_sample(self):
    """returns this track's bits-per-sample as an integer

    the value is the one cached from the file's STREAMINFO block"""

    sample_depth = self.__bitspersample__
    return sample_depth


def channels(self):
    """returns this track's channel count as an integer

    the value is the one cached from the file's STREAMINFO block"""

    channel_count = self.__channels__
    return channel_count


def total_frames(self):
    """returns this track's total number of PCM frames as an integer

    the value is the one cached from the file's STREAMINFO block"""

    pcm_frame_count = self.__total_frames__
    return pcm_frame_count


def sample_rate(self):
    """returns this track's sample rate as an integer number of Hz

    the value is the one cached from the file's STREAMINFO block"""

    rate_in_hz = self.__samplerate__
    return rate_in_hz
@classmethod
def supports_replay_gain(cls):
    """returns True if this class supports ReplayGain"""

    return True


def get_replay_gain(self):
    """returns a ReplayGain object of our ReplayGain values

    returns None if we have no values,
    which includes the file having no readable metadata,
    no VORBIS_COMMENT block, lacking any of the four
    track/album gain/peak fields, or holding values
    that ReplayGain rejects with ValueError"""

    try:
        metadata = self.get_metadata()
        # get_metadata() may return None instead of raising
        # (OggFlacAudio's does when the metadata fails to parse)
        if metadata is None:
            return None
        vorbis_metadata = metadata.get_block(Flac_VORBISCOMMENT.BLOCK_ID)
    except (IndexError, IOError):
        return None

    # field names are compared without regard to case
    if (not {u'REPLAYGAIN_TRACK_PEAK', u'REPLAYGAIN_TRACK_GAIN',
             u'REPLAYGAIN_ALBUM_PEAK', u'REPLAYGAIN_ALBUM_GAIN'}.issubset(
            [key.upper() for key in vorbis_metadata.keys()])):
        return None

    # we have ReplayGain data
    from audiotools import ReplayGain

    try:
        # gain values have their trailing " dB" units cut off
        return ReplayGain(
            vorbis_metadata[u'REPLAYGAIN_TRACK_GAIN'][0][0:-len(" dB")],
            vorbis_metadata[u'REPLAYGAIN_TRACK_PEAK'][0],
            vorbis_metadata[u'REPLAYGAIN_ALBUM_GAIN'][0][0:-len(" dB")],
            vorbis_metadata[u'REPLAYGAIN_ALBUM_PEAK'][0])
    except ValueError:
        return None


def set_replay_gain(self, replaygain):
    """given a ReplayGain object, sets the track's gain to those values

    if replaygain is None, any existing values are removed instead
    a VORBIS_COMMENT block is added if the file doesn't have one

    may raise IOError if unable to modify the file"""

    if replaygain is None:
        return self.delete_replay_gain()

    metadata = self.get_metadata()

    if metadata.has_block(Flac_VORBISCOMMENT.BLOCK_ID):
        vorbis_comment = metadata.get_block(Flac_VORBISCOMMENT.BLOCK_ID)
    else:
        from audiotools import VERSION

        vorbis_comment = Flac_VORBISCOMMENT(
            [], u"Python Audio Tools %s" % (VERSION))
        metadata.add_block(vorbis_comment)

    vorbis_comment[u"REPLAYGAIN_TRACK_GAIN"] = [
        u"%1.2f dB" % (replaygain.track_gain)]
    vorbis_comment[u"REPLAYGAIN_TRACK_PEAK"] = [
        u"%1.8f" % (replaygain.track_peak)]
    vorbis_comment[u"REPLAYGAIN_ALBUM_GAIN"] = [
        u"%1.2f dB" % (replaygain.album_gain)]
    vorbis_comment[u"REPLAYGAIN_ALBUM_PEAK"] = [
        u"%1.8f" % (replaygain.album_peak)]
    vorbis_comment[u"REPLAYGAIN_REFERENCE_LOUDNESS"] = [u"89.0 dB"]

    self.update_metadata(metadata)


def delete_replay_gain(self):
    """removes ReplayGain values from file, if any

    files without a VORBIS_COMMENT block are left untouched

    may raise IOError if unable to modify the file"""

    metadata = self.get_metadata()

    if metadata.has_block(Flac_VORBISCOMMENT.BLOCK_ID):
        vorbis_comment = metadata.get_block(Flac_VORBISCOMMENT.BLOCK_ID)

        for field in [u"REPLAYGAIN_TRACK_GAIN",
                      u"REPLAYGAIN_TRACK_PEAK",
                      u"REPLAYGAIN_ALBUM_GAIN",
                      u"REPLAYGAIN_ALBUM_PEAK",
                      u"REPLAYGAIN_REFERENCE_LOUDNESS"]:
            try:
                del(vorbis_comment[field])
            except KeyError:
                # field wasn't present to begin with
                pass

        self.update_metadata(metadata)
performed as Unicode strings 2731 2732 raises IOError if unable to write the file or its metadata 2733 raises ValueError if the file has errors of some sort 2734 """ 2735 2736 import os.path 2737 2738 def seektable_valid(seektable, metadata_offset, input_file): 2739 from audiotools.bitstream import BitstreamReader 2740 reader = BitstreamReader(input_file, False) 2741 2742 for (pcm_frame_offset, 2743 seekpoint_offset, 2744 pcm_frame_count) in seektable.seekpoints: 2745 reader.seek(seekpoint_offset + metadata_offset) 2746 try: 2747 (sync_code, 2748 reserved1, 2749 reserved2) = reader.parse( 2750 "14u 1u 1p 4p 4p 4p 3p 1u") 2751 if (((sync_code != 0x3FFE) or 2752 (reserved1 != 0) or 2753 (reserved2 != 0))): 2754 return False 2755 except IOError: 2756 return False 2757 else: 2758 return True 2759 2760 fixes_performed = [] 2761 with open(self.filename, "rb") as input_f: 2762 # remove ID3 tags from before and after FLAC stream 2763 stream_size = os.path.getsize(self.filename) 2764 2765 stream_offset = skip_id3v2_comment(input_f) 2766 if stream_offset > 0: 2767 from audiotools.text import CLEAN_FLAC_REMOVE_ID3V2 2768 fixes_performed.append(CLEAN_FLAC_REMOVE_ID3V2) 2769 stream_size -= stream_offset 2770 2771 try: 2772 input_f.seek(-128, 2) 2773 if input_f.read(3) == b'TAG': 2774 from audiotools.text import CLEAN_FLAC_REMOVE_ID3V1 2775 fixes_performed.append(CLEAN_FLAC_REMOVE_ID3V1) 2776 stream_size -= 128 2777 except IOError: 2778 # file isn't 128 bytes long 2779 pass 2780 2781 if output_filename is not None: 2782 with open(output_filename, "wb") as output_f: 2783 input_f.seek(stream_offset, 0) 2784 while stream_size > 0: 2785 s = input_f.read(4096) 2786 if len(s) > stream_size: 2787 s = s[0:stream_size] 2788 output_f.write(s) 2789 stream_size -= len(s) 2790 2791 output_track = self.__class__(output_filename) 2792 2793 metadata = self.get_metadata() 2794 metadata_size = metadata.size() 2795 2796 # fix empty MD5SUM 2797 if self.__md5__ == b"\x00" * 16: 2798 from hashlib 
import md5 2799 from audiotools import transfer_framelist_data 2800 2801 md5sum = md5() 2802 transfer_framelist_data( 2803 self.to_pcm(), 2804 md5sum.update, 2805 signed=True, 2806 big_endian=False) 2807 metadata.get_block( 2808 Flac_STREAMINFO.BLOCK_ID).md5sum = md5sum.digest() 2809 from audiotools.text import CLEAN_FLAC_POPULATE_MD5 2810 fixes_performed.append(CLEAN_FLAC_POPULATE_MD5) 2811 2812 # fix missing WAVEFORMATEXTENSIBLE_CHANNEL_MASK 2813 if (((self.channels() > 2) or 2814 (self.bits_per_sample() > 16))): 2815 from audiotools.text import CLEAN_FLAC_ADD_CHANNELMASK 2816 2817 try: 2818 vorbis_comment = metadata.get_block( 2819 Flac_VORBISCOMMENT.BLOCK_ID) 2820 except IndexError: 2821 from audiotools import VERSION 2822 2823 vorbis_comment = Flac_VORBISCOMMENT( 2824 [], u"Python Audio Tools %s" % (VERSION)) 2825 2826 if ((u"WAVEFORMATEXTENSIBLE_CHANNEL_MASK" not in 2827 vorbis_comment.keys())): 2828 fixes_performed.append(CLEAN_FLAC_ADD_CHANNELMASK) 2829 vorbis_comment[ 2830 u"WAVEFORMATEXTENSIBLE_CHANNEL_MASK"] = \ 2831 [u"0x%.4X" % (int(self.channel_mask()))] 2832 2833 metadata.replace_blocks( 2834 Flac_VORBISCOMMENT.BLOCK_ID, 2835 [vorbis_comment]) 2836 2837 if metadata.has_block(Flac_SEEKTABLE.BLOCK_ID): 2838 # fix an invalid SEEKTABLE, if necessary 2839 if (not seektable_valid( 2840 metadata.get_block(Flac_SEEKTABLE.BLOCK_ID), 2841 stream_offset + 4 + metadata_size, 2842 input_f)): 2843 from audiotools.text import CLEAN_FLAC_FIX_SEEKTABLE 2844 2845 fixes_performed.append(CLEAN_FLAC_FIX_SEEKTABLE) 2846 2847 metadata.replace_blocks(Flac_SEEKTABLE.BLOCK_ID, 2848 [self.seektable()]) 2849 else: 2850 # add SEEKTABLE block if not present 2851 from audiotools.text import CLEAN_FLAC_ADD_SEEKTABLE 2852 2853 fixes_performed.append(CLEAN_FLAC_ADD_SEEKTABLE) 2854 2855 metadata.add_block(self.seektable()) 2856 2857 # fix remaining metadata problems 2858 # which automatically shifts STREAMINFO to the right place 2859 # (the message indicating the fix has already been 
class OggFlacMetaData(FlacMetaData):
    """a class for managing the metadata of a FLAC stream
    held in an Ogg container"""

    @classmethod
    def converted(cls, metadata):
        """takes a MetaData object and returns an OggFlacMetaData object

        returns None if metadata is None
        blocks of a FlacMetaData object are copied as-is
        any other MetaData is converted to a VORBIS_COMMENT block
        plus one PICTURE block per image"""

        if metadata is None:
            return None
        elif isinstance(metadata, FlacMetaData):
            return cls([block.copy() for block in metadata.block_list])
        else:
            return cls([Flac_VORBISCOMMENT.converted(metadata)] +
                       [Flac_PICTURE.converted(image)
                        for image in metadata.images()])

    def __repr__(self):
        return ("OggFlacMetaData(%s)" % (repr(self.block_list)))

    @classmethod
    def parse(cls, packetreader):
        """returns an OggFlacMetaData object from the given ogg.PacketReader

        the first packet is read as the Ogg FLAC header
        with its embedded STREAMINFO block,
        followed by as many metadata packets as that header indicates

        raises IOError or ValueError if an error occurs reading MetaData"""

        from io import BytesIO
        from audiotools.bitstream import BitstreamReader, parse

        # only the header packet count and STREAMINFO fields are used
        # the signatures and versions are not verified here
        (packet_byte,
         ogg_signature,
         major_version,
         minor_version,
         header_packets,
         flac_signature,
         block_type,
         block_length,
         minimum_block_size,
         maximum_block_size,
         minimum_frame_size,
         maximum_frame_size,
         sample_rate,
         channels,
         bits_per_sample,
         total_samples,
         md5sum) = parse(
            "8u 4b 8u 8u 16u 4b 8u 24u 16u 16u 24u 24u 20u 3u 5u 36U 16b",
            False,
            packetreader.read_packet())

        # channels and bits-per-sample are stored minus one
        block_list = [Flac_STREAMINFO(minimum_block_size=minimum_block_size,
                                      maximum_block_size=maximum_block_size,
                                      minimum_frame_size=minimum_frame_size,
                                      maximum_frame_size=maximum_frame_size,
                                      sample_rate=sample_rate,
                                      channels=channels + 1,
                                      bits_per_sample=bits_per_sample + 1,
                                      total_samples=total_samples,
                                      md5sum=md5sum)]

        for i in range(header_packets):
            packet = BitstreamReader(BytesIO(packetreader.read_packet()),
                                     False)
            (block_type, length) = packet.parse("1p 7u 24u")
            # a block of type 0 (STREAMINFO) matches no branch
            # and so is skipped without being added to the list
            if block_type == 1:    # PADDING
                block_list.append(Flac_PADDING.parse(packet, length))
            elif block_type == 2:  # APPLICATION
                block_list.append(Flac_APPLICATION.parse(packet, length))
            elif block_type == 3:  # SEEKTABLE
                block_list.append(Flac_SEEKTABLE.parse(packet, length // 18))
            elif block_type == 4:  # VORBIS_COMMENT
                block_list.append(Flac_VORBISCOMMENT.parse(packet))
            elif block_type == 5:  # CUESHEET
                block_list.append(Flac_CUESHEET.parse(packet))
            elif block_type == 6:  # PICTURE
                block_list.append(Flac_PICTURE.parse(packet))
            elif (block_type >= 7) and (block_type <= 126):
                from audiotools.text import ERR_FLAC_RESERVED_BLOCK
                raise ValueError(ERR_FLAC_RESERVED_BLOCK % (block_type))
            elif block_type == 127:
                from audiotools.text import ERR_FLAC_INVALID_BLOCK
                raise ValueError(ERR_FLAC_INVALID_BLOCK)

        return cls(block_list)

    def build(self, pagewriter, serial_number):
        """pagewriter is an ogg.PageWriter object

        serial_number is the bitstream serial number
        given to every page written

        writes the Ogg FLAC header packet followed by one packet
        per metadata block and returns new sequence number"""

        from audiotools.bitstream import build, BitstreamRecorder, format_size
        from audiotools.ogg import packet_to_pages

        # build extended Ogg FLAC STREAMINFO block
        # which will always occupy its own page
        streaminfo = self.get_block(Flac_STREAMINFO.BLOCK_ID)

        # all our non-STREAMINFO blocks that are small enough
        # to fit in the output stream
        valid_blocks = [b for b in self.blocks()
                        if ((b.BLOCK_ID != Flac_STREAMINFO.BLOCK_ID) and
                            (b.size() < (2 ** 24)))]

        page = next(packet_to_pages(
            build("8u 4b 8u 8u 16u " +
                  "4b 8u 24u 16u 16u 24u 24u 20u 3u 5u 36U 16b",
                  False,
                  (0x7F,
                   b"FLAC",
                   1,
                   0,
                   len(valid_blocks),
                   b"fLaC",
                   0,
                   format_size("16u 16u 24u 24u 20u 3u 5u 36U 16b") // 8,
                   streaminfo.minimum_block_size,
                   streaminfo.maximum_block_size,
                   streaminfo.minimum_frame_size,
                   streaminfo.maximum_frame_size,
                   streaminfo.sample_rate,
                   streaminfo.channels - 1,
                   streaminfo.bits_per_sample - 1,
                   streaminfo.total_samples,
                   streaminfo.md5sum)),
            bitstream_serial_number=serial_number,
            starting_sequence_number=0))

        page.stream_beginning = True
        pagewriter.write(page)

        sequence_number = 1

        # pack remaining metadata blocks into Ogg packets
        for (i, block) in enumerate(valid_blocks, 1):
            packet = BitstreamRecorder(False)
            # only the final block has its leading flag bit set
            packet.build("1u 7u 24u",
                         (1 if (i == len(valid_blocks)) else 0,
                          block.BLOCK_ID, block.size()))
            block.build(packet)
            for page in packet_to_pages(
                    packet.data(),
                    bitstream_serial_number=serial_number,
                    starting_sequence_number=sequence_number):
                pagewriter.write(page)
                sequence_number += 1

        return sequence_number
def channels(self):
    """returns an integer number of channels this track contains"""

    return self.__channels__


def total_frames(self):
    """returns the total PCM frames of the track as an integer"""

    return self.__total_frames__


def sample_rate(self):
    """returns the rate of the track's audio as an integer number of Hz"""

    return self.__samplerate__


@classmethod
def supports_metadata(cls):
    """returns True if this audio type supports MetaData"""

    return True


def get_metadata(self):
    """returns a MetaData object, or None

    returns None if the metadata packets fail to parse
    with a ValueError
    raises IOError if unable to read the file"""

    from audiotools.ogg import PacketReader, PageReader

    try:
        with open(self.filename, "rb") as f:
            return OggFlacMetaData.parse(PacketReader(PageReader(f)))
    except ValueError:
        return None


def update_metadata(self, metadata):
    """takes this track's current MetaData object
    as returned by get_metadata() and sets this track's metadata
    with any fields updated in that object

    does nothing if metadata is None

    raises ValueError if metadata is not an OggFlacMetaData object
    raises IOError if unable to write the file
    """

    import os

    if metadata is None:
        return None
    elif not isinstance(metadata, OggFlacMetaData):
        from audiotools.text import ERR_FOREIGN_METADATA
        raise ValueError(ERR_FOREIGN_METADATA)
    elif not os.access(self.filename, os.W_OK):
        raise IOError(self.filename)

    from audiotools.ogg import (PageReader, PacketReader, PageWriter)
    from audiotools import TemporaryFile

    # always overwrite Ogg FLAC with fresh metadata
    #
    # The trouble with Ogg FLAC padding is that Ogg header overhead
    # requires a variable amount of overhead bytes per Ogg page
    # which makes it very difficult to calculate how many
    # bytes to allocate to the PADDING packet.
    # We'd have to build a bunch of empty pages for padding
    # then go back and fill-in the initial padding page's length
    # field before re-checksumming it.

    original_ogg = PageReader(open(self.filename, "rb"))
    try:
        new_ogg = PageWriter(TemporaryFile(self.filename))

        # skip the metadata packets in the original file
        OggFlacMetaData.parse(PacketReader(original_ogg))

        # write our new comment blocks to the new file
        sequence_number = metadata.build(new_ogg, self.__serial_number__)

        # transfer the remaining pages from the original file
        # to the new file, renumbering them to follow the new metadata
        while True:
            page = original_ogg.read()
            page.sequence_number = sequence_number
            sequence_number += 1
            new_ogg.write(page)
            if page.stream_end:
                break
    finally:
        # don't leak the original file's handle if something fails
        original_ogg.close()

    # only reached on success, and only after the original is closed
    new_ogg.close()
def to_pcm(self):
    """returns a PCMReader object containing the track's PCM data

    if the decoder raises IOError or ValueError when opened,
    a PCMReaderError carrying this track's stream parameters
    is returned in its place"""

    from audiotools import decoders
    from audiotools import PCMReaderError

    try:
        mask = self.channel_mask()
        return decoders.OggFlacDecoder(self.filename, mask)
    except (IOError, ValueError) as err:
        # The only time this is likely to occur is
        # if the Ogg FLAC is modified between when OggFlacAudio
        # is initialized and when to_pcm() is called.
        message = str(err)
        rate = self.sample_rate()
        channel_count = self.channels()
        mask_value = int(self.channel_mask())
        sample_depth = self.bits_per_sample()
        return PCMReaderError(error_message=message,
                              sample_rate=rate,
                              channels=channel_count,
                              channel_mask=mask_value,
                              bits_per_sample=sample_depth)
right, center 3281 0x0033, # 4ch - left, right, back left, back right 3282 0x0603, # 4ch - left, right, side left, side right 3283 0x0037, # 5ch - L, R, C, back left, back right 3284 0x0607, # 5ch - L, R, C, side left, side right 3285 0x003F, # 6ch - L, R, C, LFE, back left, back right 3286 0x060F)): # 6ch - L, R, C, LFE, side left, side right 3287 channel_mask = pcmreader.channel_mask 3288 else: 3289 from audiotools import UnsupportedChannelMask 3290 3291 raise UnsupportedChannelMask(filename, pcmreader.channel_mask) 3292 3293 if total_pcm_frames is not None: 3294 pcmreader = CounterPCMReader(pcmreader) 3295 3296 devnull = open(os.devnull, 'wb') 3297 3298 sub = subprocess.Popen([BIN['flac']] + lax + 3299 ["-s", "-f", "-%s" % (compression), 3300 "-V", "--ogg", 3301 "--endian=little", 3302 "--channels=%d" % (pcmreader.channels), 3303 "--bps=%d" % (pcmreader.bits_per_sample), 3304 "--sample-rate=%d" % (pcmreader.sample_rate), 3305 "--sign=signed", 3306 "--force-raw-format", 3307 "-o", filename, "-"], 3308 stdin=subprocess.PIPE, 3309 stdout=devnull, 3310 stderr=devnull, 3311 preexec_fn=ignore_sigint) 3312 3313 try: 3314 transfer_framelist_data(pcmreader, sub.stdin.write) 3315 except (ValueError, IOError) as err: 3316 try: 3317 sub.stdin.close() 3318 except: 3319 pass 3320 sub.wait() 3321 cls.__unlink__(filename) 3322 raise EncodingError(str(err)) 3323 except Exception: 3324 try: 3325 sub.stdin.close() 3326 except: 3327 pass 3328 sub.wait() 3329 cls.__unlink__(filename) 3330 raise 3331 finally: 3332 devnull.close() 3333 3334 sub.stdin.close() 3335 3336 if ((total_pcm_frames is not None) and 3337 (total_pcm_frames != pcmreader.frames_written)): 3338 from audiotools.text import ERR_TOTAL_PCM_FRAMES_MISMATCH 3339 cls.__unlink__(filename) 3340 raise EncodingError(ERR_TOTAL_PCM_FRAMES_MISMATCH) 3341 3342 if sub.wait() == 0: 3343 oggflac = OggFlacAudio(filename) 3344 if ((((pcmreader.channels > 2) or 3345 (pcmreader.bits_per_sample > 16)) and 3346 (channel_mask != 0))): 3347 
metadata = oggflac.get_metadata() 3348 vorbis = metadata.get_block(Flac_VORBISCOMMENT.BLOCK_ID) 3349 vorbis[u"WAVEFORMATEXTENSIBLE_CHANNEL_MASK"] = [ 3350 u"0x%.4X" % (channel_mask)] 3351 oggflac.update_metadata(metadata) 3352 return oggflac 3353 else: 3354 raise EncodingError(u"error encoding file with flac") 3355 3356 def clean(self, output_filename): 3357 """cleans the file of known data and metadata problems 3358 3359 output_filename is an optional filename of the fixed file 3360 if present, a new AudioFile is written to that path 3361 otherwise, only a dry-run is performed and no new file is written 3362 3363 return list of fixes performed as Unicode strings 3364 3365 raises IOError if unable to write the file or its metadata 3366 raises ValueError if the file has errors of some sort 3367 """ 3368 3369 import os.path 3370 3371 fixes_performed = [] 3372 with open(self.filename, "rb") as input_f: 3373 # remove ID3 tags from before and after FLAC stream 3374 stream_size = os.path.getsize(self.filename) 3375 3376 stream_offset = skip_id3v2_comment(input_f) 3377 if stream_offset > 0: 3378 from audiotools.text import CLEAN_FLAC_REMOVE_ID3V2 3379 fixes_performed.append(CLEAN_FLAC_REMOVE_ID3V2) 3380 stream_size -= stream_offset 3381 3382 try: 3383 input_f.seek(-128, 2) 3384 if input_f.read(3) == b'TAG': 3385 from audiotools.text import CLEAN_FLAC_REMOVE_ID3V1 3386 fixes_performed.append(CLEAN_FLAC_REMOVE_ID3V1) 3387 stream_size -= 128 3388 except IOError: 3389 # file isn't 128 bytes long 3390 pass 3391 3392 if output_filename is not None: 3393 with open(output_filename, "wb") as output_f: 3394 input_f.seek(stream_offset, 0) 3395 while stream_size > 0: 3396 s = input_f.read(4096) 3397 if len(s) > stream_size: 3398 s = s[0:stream_size] 3399 output_f.write(s) 3400 stream_size -= len(s) 3401 3402 output_track = self.__class__(output_filename) 3403 3404 metadata = self.get_metadata() 3405 metadata_size = metadata.size() 3406 3407 # fix empty MD5SUM 3408 if self.__md5__ == 
b"\x00" * 16: 3409 from hashlib import md5 3410 from audiotools import transfer_framelist_data 3411 3412 md5sum = md5() 3413 transfer_framelist_data( 3414 self.to_pcm(), 3415 md5sum.update, 3416 signed=True, 3417 big_endian=False) 3418 metadata.get_block( 3419 Flac_STREAMINFO.BLOCK_ID).md5sum = md5sum.digest() 3420 from audiotools.text import CLEAN_FLAC_POPULATE_MD5 3421 fixes_performed.append(CLEAN_FLAC_POPULATE_MD5) 3422 3423 # fix missing WAVEFORMATEXTENSIBLE_CHANNEL_MASK 3424 if (((self.channels() > 2) or 3425 (self.bits_per_sample() > 16))): 3426 from audiotools.text import CLEAN_FLAC_ADD_CHANNELMASK 3427 3428 try: 3429 vorbis_comment = metadata.get_block( 3430 Flac_VORBISCOMMENT.BLOCK_ID) 3431 except IndexError: 3432 from audiotools import VERSION 3433 3434 vorbis_comment = Flac_VORBISCOMMENT( 3435 [], u"Python Audio Tools %s" % (VERSION)) 3436 3437 if ((u"WAVEFORMATEXTENSIBLE_CHANNEL_MASK" not in 3438 vorbis_comment.keys())): 3439 fixes_performed.append(CLEAN_FLAC_ADD_CHANNELMASK) 3440 vorbis_comment[ 3441 u"WAVEFORMATEXTENSIBLE_CHANNEL_MASK"] = \ 3442 [u"0x%.4X" % (int(self.channel_mask()))] 3443 3444 metadata.replace_blocks( 3445 Flac_VORBISCOMMENT.BLOCK_ID, 3446 [vorbis_comment]) 3447 3448 # fix remaining metadata problems 3449 # which automatically shifts STREAMINFO to the right place 3450 # (the message indicating the fix has already been output) 3451 (metadata, metadata_fixes) = metadata.clean() 3452 if output_filename is not None: 3453 output_track.update_metadata(metadata) 3454 3455 return fixes_performed + metadata_fixes 3456