1#!/usr/bin/python
2
3# Audio Tools, a module and set of tools for manipulating audio data
4# Copyright (C) 2007-2014  Brian Langenberger
5
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
20
21import sys
22from audiotools import (AudioFile, MetaData)
23
24
# formats a current/total pair of integers as a unicode string
# either value may be None, which means "not present":
# a missing total drops the slash and a missing current value
# is written as a placeholder 0
# for example, __number_pair__(2, 3)       returns u"2/3"
#              __number_pair__(4, None)    returns u"4"
#              __number_pair__(None, 3)    returns u"0/3"
#              __number_pair__(None, None) returns u"0"
# note that only None counts as missing,
# so __number_pair__(4, 0) returns u"4/0"
def __number_pair__(current, total):
    # a missing "current" value is rendered as 0
    number = 0 if current is None else current

    if total is None:
        # no total, so no slash
        return u"%d" % (number,)

    return u"%d/%d" % (number, total)
45
46
def limited_transfer_data(from_function, to_function, max_bytes):
    """transfers up to max_bytes from from_function to to_function
    or as many bytes as from_function generates as strings

    from_function(size) is called to fetch up to "size" bytes
    and signals the end of its data by returning an empty string
    to_function(data) is called once for each non-empty chunk fetched

    no more than the remaining byte count is ever requested
    from from_function, so a file-like source is left positioned
    just past the transferred data instead of a whole buffer beyond it
    nothing is fetched at all if max_bytes is 0 or less"""

    BUFFER_SIZE = 0x100000

    while max_bytes > 0:
        s = from_function(min(BUFFER_SIZE, max_bytes))
        if len(s) == 0:
            # source is exhausted before reaching max_bytes
            break
        if len(s) > max_bytes:
            # source handed back more than was requested
            # so trim off the excess
            s = s[0:max_bytes]
        to_function(s)
        max_bytes -= len(s)
59
60
class ApeTagItem(object):
    """a single item in the ApeTag, typically a unicode value"""

    # bitstream layout of the 8 bytes that start each item:
    # a 32 bit value length, the 1 bit read-only flag,
    # the 2 bit item type and 29 bits which carry no value
    FORMAT = "32u 1u 2u 29p"

    def __init__(self, item_type, read_only, key, data):
        """fields are as follows:

        item_type is 0 = UTF-8, 1 = binary, 2 = external, 3 = reserved
        read_only is 1 if the item is read only
        key is a bytes object of the item's key
        data is a bytes object of the data itself
        """

        self.type = item_type
        self.read_only = read_only

        assert(isinstance(key, bytes))
        self.key = key
        assert(isinstance(data, bytes))
        self.data = data

    def __eq__(self, item):
        """returns True if item has the same type, read_only flag,
        key and data as this one

        objects missing any of those attributes are never equal"""

        for attr in ["type", "read_only", "key", "data"]:
            if ((not hasattr(item, attr)) or (getattr(self, attr) !=
                                              getattr(item, attr))):
                return False
        return True

    def __ne__(self, item):
        """returns the inverse of __eq__

        Python 2 doesn't derive "!=" from __eq__
        so it must be defined explicitly to keep the two consistent"""

        return not self.__eq__(item)

    def total_size(self):
        """returns total size of item in bytes"""

        # 4 bytes of value length, 4 bytes of flags,
        # the key, its NUL terminator and the value itself
        return 4 + 4 + len(self.key) + 1 + len(self.data)

    def copy(self):
        """returns a duplicate ApeTagItem"""

        return ApeTagItem(self.type,
                          self.read_only,
                          self.key,
                          self.data)

    def __repr__(self):
        return "ApeTagItem(%s,%s,%s,%s)" % \
            (repr(self.type),
             repr(self.read_only),
             repr(self.key),
             repr(self.data))

    def raw_info_pair(self):
        """returns a human-readable key/value pair of item data

        both values are unicode strings
        text items show their decoded value
        while all other item types show only their size in bytes
        undecodable bytes in the key or value are replaced
        rather than raising UnicodeDecodeError,
        since this is meant for displaying possibly malformed tags"""

        key = self.key.decode('ascii', 'replace')

        if self.type == 0:    # text
            text = self.data.decode('utf-8', 'replace')
            if self.read_only:
                return (key, u"(read only) %s" % (text))
            else:
                return (key, text)
        elif self.type == 1:  # binary
            return (key, u"(binary) %d bytes" % (len(self.data)))
        elif self.type == 2:  # external
            return (key, u"(external) %d bytes" % (len(self.data)))
        else:                 # reserved
            return (key, u"(reserved) %d bytes" % (len(self.data)))

    # str() is text under Python 3 but the raw data bytes under Python 2
    if sys.version_info[0] >= 3:
        def __str__(self):
            return self.__unicode__()
    else:
        def __str__(self):
            return self.data

    def __unicode__(self):
        """returns the item's data as a unicode string

        trailing NUL bytes are stripped
        and undecodable UTF-8 is replaced rather than raising"""

        return self.data.rstrip(b"\x00").decode('utf-8', 'replace')

    def number(self):
        """returns the track/album_number portion of a slashed number pair

        this is the first run of digits in the item's text as an integer,
        or None if the text contains no digits at all
        or if that number is 0 and a total is present"""

        import re

        # NOTE: the whole text is searched, so when no digits
        # come before the slash the first digits after it are used
        int_string = re.search(r'\d+', self.__unicode__())
        if int_string is None:
            return None

        int_value = int(int_string.group(0))
        if (int_value == 0) and (self.total() is not None):
            # don't return placeholder 0 value
            # when a _total value is present
            # but _number value is 0
            return None
        else:
            return int_value

    def total(self):
        """returns the track/album_total portion of a slashed number pair

        this is the first run of digits following the first slash
        in the item's text as an integer,
        or None if there is no slash or no digits follow it"""

        import re

        unicode_value = self.__unicode__()

        if u"/" not in unicode_value:
            return None

        int_string = re.search(r'\d+', unicode_value.split(u"/")[1])

        if int_string is not None:
            return int(int_string.group(0))
        else:
            return None

    @classmethod
    def parse(cls, reader):
        """returns an ApeTagItem parsed from the given BitstreamReader

        raises IOError if the reader runs out of data
        before the key's NUL terminator is found"""

        (item_value_length,
         read_only,
         encoding) = reader.parse(cls.FORMAT)

        # the key is a NUL-terminated string of unknown length
        # so it must be read a byte at a time
        key = []
        c = reader.read_bytes(1)
        while c != b"\x00":
            if len(c) == 0:
                # an empty read would otherwise never terminate the loop
                raise IOError("EOF while reading APEv2 item key")
            key.append(c)
            c = reader.read_bytes(1)

        value = reader.read_bytes(item_value_length)

        return cls(encoding, read_only, b"".join(key), value)

    def build(self, writer):
        """writes the ApeTagItem values to the given BitstreamWriter"""

        # item header, followed by the key, a NUL byte and the value
        writer.build("%s %db 8u %db" % (self.FORMAT,
                                        len(self.key),
                                        len(self.data)),
                     (len(self.data),
                      self.read_only,
                      self.type,
                      self.key, 0, self.data))

    @classmethod
    def binary(cls, key, data):
        """returns an ApeTagItem of binary data

        key and data are both bytes objects
        and key should contain only ASCII characters"""

        return cls(1, 0, key, data)

    @classmethod
    def external(cls, key, data):
        """returns an ApeTagItem of external data

        key and data are both bytes objects
        and key should contain only ASCII characters"""

        return cls(2, 0, key, data)

    @classmethod
    def string(cls, key, data):
        """returns an ApeTagItem of text data

        key is a bytes object, data is a unicode string
        which is stored UTF-8 encoded"""

        assert(isinstance(key, bytes))
        # "unicode" is only looked up when running under Python 2
        assert(isinstance(data,
                          str if (sys.version_info[0] >= 3) else unicode))

        return cls(0, 0, key, data.encode('utf-8', 'replace'))
238
239
class ApeTag(MetaData):
    """a complete APEv2 tag"""

    # bitstream layout of the 32 byte tag header/footer:
    # an 8 byte preamble, 32 bit version, 32 bit tag size,
    # 32 bit item count, then the flag bits
    # (read only, 2 bit item encoding, 26 bits carrying no value,
    #  is header, no footer, has header)
    # and a final 64 bits carrying no value
    HEADER_FORMAT = "8b 32u 32u 32u 1u 2u 26p 1u 1u 1u 64p"

    ITEM = ApeTagItem

    # MetaData attribute name -> APEv2 item key
    # number/total pairs share a single "number/total" item
    ATTRIBUTE_MAP = {'track_name': b'Title',
                     'track_number': b'Track',
                     'track_total': b'Track',
                     'album_number': b'Media',
                     'album_total': b'Media',
                     'album_name': b'Album',
                     'artist_name': b'Artist',
                     # "Performer" is not a defined APEv2 key
                     # it would be nice to have, yet would not be standard
                     'performer_name': b'Performer',
                     'composer_name': b'Composer',
                     'conductor_name': b'Conductor',
                     'ISRC': b'ISRC',
                     'catalog': b'Catalog',
                     'copyright': b'Copyright',
                     'publisher': b'Publisher',
                     'year': b'Year',
                     'date': b'Record Date',
                     'comment': b'Comment'}

    # item keys whose values clean() normalizes as "number/total" text
    INTEGER_ITEMS = (b'Track', b'Media')

    def __init__(self, tags, contains_header=True, contains_footer=True):
        """constructs an ApeTag from a list of ApeTagItem objects

        contains_header and contains_footer indicate whether build()
        outputs a 32 byte block before and after the items"""

        for tag in tags:
            assert(isinstance(tag, ApeTagItem))
        # these are plain attributes rather than tag fields
        # so they're stored through MetaData directly
        MetaData.__setattr__(self, "tags", list(tags))
        MetaData.__setattr__(self, "contains_header", contains_header)
        MetaData.__setattr__(self, "contains_footer", contains_footer)

    def __repr__(self):
        return "ApeTag(%s, %s, %s)" % (repr(self.tags),
                                       repr(self.contains_header),
                                       repr(self.contains_footer))

    def total_size(self):
        """returns the minimum size of the total ApeTag, in bytes"""

        size = 0
        if self.contains_header:
            size += 32
        for tag in self.tags:
            size += tag.total_size()
        if self.contains_footer:
            size += 32
        return size

    def __eq__(self, metadata):
        """ApeTags are equal if they have the same set of keys
        and the same data for each key, regardless of item order

        other MetaData objects are compared using MetaData.__eq__
        and anything else is never equal"""

        if isinstance(metadata, ApeTag):
            if set(self.keys()) != set(metadata.keys()):
                return False

            for tag in self.tags:
                try:
                    if tag.data != metadata[tag.key].data:
                        return False
                except KeyError:
                    return False
            return True
        elif isinstance(metadata, MetaData):
            return MetaData.__eq__(self, metadata)
        else:
            return False

    def keys(self):
        """returns a list of item keys, in tag order"""

        return [tag.key for tag in self.tags]

    def __contains__(self, key):
        for tag in self.tags:
            if tag.key == key:
                return True
        return False

    def __getitem__(self, key):
        """returns the first item with the given bytes key

        raises KeyError if no item has that key"""

        assert(isinstance(key, bytes))

        for tag in self.tags:
            if tag.key == key:
                return tag
        raise KeyError(key)

    def get(self, key, default=None):
        """returns the first item with the given bytes key
        or default if no item has that key"""

        assert(isinstance(key, bytes))

        try:
            return self[key]
        except KeyError:
            return default

    def __setitem__(self, key, value):
        """replaces the first item with the given bytes key by value
        or appends value to the tag if no item has that key"""

        assert(isinstance(key, bytes))

        for (i, tag) in enumerate(self.tags):
            if tag.key == key:
                self.tags[i] = value
                return
        self.tags.append(value)

    def index(self, key):
        """returns the position of the first item with the given bytes key

        raises ValueError if no item has that key"""

        assert(isinstance(key, bytes))

        for (i, tag) in enumerate(self.tags):
            if tag.key == key:
                return i
        raise ValueError(key)

    def __delitem__(self, key):
        """removes all items with the given bytes key

        raises KeyError if no item has that key"""

        assert(isinstance(key, bytes))

        new_tags = [tag for tag in self.tags if tag.key != key]
        if len(new_tags) < len(self.tags):
            self.tags = new_tags
        else:
            raise KeyError(key)

    def __getattr__(self, attr):
        """looks up MetaData fields from their corresponding tag items

        number and total fields are returned as integers,
        other mapped fields as unicode strings
        and fields with no item, or no APEv2 mapping, as None"""

        if attr in self.ATTRIBUTE_MAP:
            try:
                if attr in {'track_number', 'album_number'}:
                    return self[self.ATTRIBUTE_MAP[attr]].number()
                elif attr in {'track_total', 'album_total'}:
                    return self[self.ATTRIBUTE_MAP[attr]].total()
                else:
                    return self[self.ATTRIBUTE_MAP[attr]].__unicode__()
            except KeyError:
                return None
        elif attr in MetaData.FIELDS:
            return None
        else:
            return MetaData.__getattribute__(self, attr)

    # if an attribute is updated (e.g. self.track_name)
    # make sure to update the corresponding dict pair
    def __setattr__(self, attr, value):
        """stores MetaData fields in their corresponding tag items

        setting a mapped field to None deletes it
        and attributes with no APEv2 mapping are passed on to MetaData"""

        # NOTE(review): both helpers substitute an existing run of digits
        # so text containing no digits in the relevant portion
        # is left as it was

        def swap_number(unicode_value, new_number):
            # replaces the first run of digits with new_number
            import re

            return re.sub(r'\d+', u"%d" % (new_number), unicode_value, 1)

        def swap_slashed_number(unicode_value, new_number):
            # replaces the first run of digits after the slash
            # or appends "/new_number" if there is no slash
            if u"/" in unicode_value:
                (first, second) = unicode_value.split(u"/", 1)
                return u"/".join([first, swap_number(second, new_number)])
            else:
                return u"/".join([unicode_value, u"%d" % (new_number)])

        if attr in self.ATTRIBUTE_MAP:
            key = self.ATTRIBUTE_MAP[attr]
            if value is not None:
                if attr in {'track_number', 'album_number'}:
                    try:
                        # update number portion of existing item
                        current_value = self[key].__unicode__()
                        self[key] = self.ITEM.string(
                            key, swap_number(current_value, value))
                    except KeyError:
                        # no existing item, so add a new one
                        self[key] = self.ITEM.string(
                            key, __number_pair__(value, None))
                elif attr in {'track_total', 'album_total'}:
                    try:
                        # update total portion of existing item
                        current_value = self[key].__unicode__()
                        self[key] = self.ITEM.string(
                            key, swap_slashed_number(current_value, value))
                    except KeyError:
                        # no existing item, so add a new one
                        self[key] = self.ITEM.string(
                            key, __number_pair__(None, value))
                else:
                    self[key] = self.ITEM.string(key, value)
            else:
                delattr(self, attr)
        else:
            MetaData.__setattr__(self, attr, value)

    def __delattr__(self, attr):
        """deletes MetaData fields from their corresponding tag items

        deleting half of a number/total pair keeps the other half
        and deleting a field which isn't present does nothing"""

        import re

        def zero_number(unicode_value):
            # replaces the first run of digits with a placeholder 0
            return re.sub(r'\d+', u"0", unicode_value, 1)

        if attr in self.ATTRIBUTE_MAP:
            key = self.ATTRIBUTE_MAP[attr]

            if attr in {'track_number', 'album_number'}:
                try:
                    tag = self[key]
                    if tag.total() is None:
                        # if no slashed _total field, delete entire tag
                        del(self[key])
                    else:
                        # otherwise replace initial portion with 0
                        self[key] = self.ITEM.string(
                            key, zero_number(tag.__unicode__()))
                except KeyError:
                    # no tag to delete
                    pass
            elif attr in {'track_total', 'album_total'}:
                try:
                    tag = self[key]
                    if tag.total() is not None:
                        if tag.number() is not None:
                            # keep only the portion before the slash
                            self[key] = self.ITEM.string(
                                key,
                                tag.__unicode__().split(u"/", 1)[0].rstrip())
                        else:
                            # no number portion worth keeping
                            del(self[key])
                    else:
                        # no total portion, so nothing to do
                        pass
                except KeyError:
                    # no tag to delete portion of
                    pass
            else:
                try:
                    del(self[key])
                except KeyError:
                    pass
        elif attr in MetaData.FIELDS:
            pass
        else:
            MetaData.__delattr__(self, attr)

    @classmethod
    def converted(cls, metadata):
        """converts a MetaData object to an ApeTag object

        returns None if metadata is None
        an ApeTag is duplicated item-for-item
        while other MetaData has its mapped fields and images transferred
        """

        if metadata is None:
            return None
        elif isinstance(metadata, ApeTag):
            return ApeTag([tag.copy() for tag in metadata.tags],
                          contains_header=metadata.contains_header,
                          contains_footer=metadata.contains_footer)
        else:
            tags = cls([])

            for (field, value) in metadata.filled_fields():
                if field in cls.ATTRIBUTE_MAP:
                    setattr(tags, field, value)

            for image in metadata.images():
                tags.add_image(image)

            return tags

    def raw_info(self):
        """returns the ApeTag as a human-readable unicode string"""

        from os import linesep
        from audiotools import output_table

        # align tag values on the "=" sign
        table = output_table()

        for tag in self.tags:
            row = table.row()
            (key, value) = tag.raw_info_pair()
            row.add_column(key, "right")
            row.add_column(u" = ")
            row.add_column(value)

        return (u"APEv2:" + linesep + linesep.join(table.format()))

    @classmethod
    def supports_images(cls):
        """returns True"""

        return True

    def __parse_image__(self, key, type):
        """returns an Image built from the binary item with the given key

        the item's data is a UTF-8 encoded description,
        a NUL byte and then the image data itself
        type is the image type passed along to Image.new()

        raises KeyError if no item has the given key"""

        from audiotools import Image

        (description, separator, image_data) = \
            self[key].data.partition(b"\x00")

        if len(separator) == 0:
            # malformed item with no NUL separator
            # (scanning for one a byte at a time would never finish)
            # so treat it all as image data with no description
            (description, image_data) = (b"", description)

        return Image.new(image_data,
                         description.decode('utf-8', 'replace'),
                         type)

    def add_image(self, image):
        """embeds an Image object in this metadata

        only front and back covers are stored,
        replacing any cover of the same type;
        images of other types are ignored"""

        from audiotools import FRONT_COVER, BACK_COVER

        if image.type == FRONT_COVER:
            self[b'Cover Art (front)'] = self.ITEM.binary(
                b'Cover Art (front)',
                image.description.encode('utf-8', 'replace') +
                b"\x00" +
                image.data)
        elif image.type == BACK_COVER:
            self[b'Cover Art (back)'] = self.ITEM.binary(
                b'Cover Art (back)',
                image.description.encode('utf-8', 'replace') +
                b"\x00" +
                image.data)

    def delete_image(self, image):
        """deletes an Image object from this metadata

        the cover item matching the image's type is removed, if present"""

        from audiotools import FRONT_COVER, BACK_COVER

        if (image.type == FRONT_COVER) and (b'Cover Art (front)' in self):
            del(self[b'Cover Art (front)'])
        elif (image.type == BACK_COVER) and (b'Cover Art (back)' in self):
            del(self[b'Cover Art (back)'])

    def images(self):
        """returns a list of embedded Image objects"""

        from audiotools import FRONT_COVER, BACK_COVER

        # APEv2 supports only one value per key
        # so a single front and back cover are all that is possible
        img = []
        if b'Cover Art (front)' in self:
            img.append(self.__parse_image__(b'Cover Art (front)',
                                            FRONT_COVER))
        if b'Cover Art (back)' in self:
            img.append(self.__parse_image__(b'Cover Art (back)',
                                            BACK_COVER))
        return img

    @classmethod
    def read(cls, apefile):
        """returns an ApeTag object from an APEv2 tagged file object

        may return None if the file object has no tag
        which includes files too small to hold a tag footer"""

        from audiotools.bitstream import BitstreamReader, parse

        # check the file's size before seeking backwards from its end
        # since seeking to before the start of a file is an error
        apefile.seek(0, 2)
        if apefile.tell() < 32:
            # not enough bytes for an ApeV2 tag
            return None

        apefile.seek(-32, 2)
        tag_footer = apefile.read(32)

        if len(tag_footer) < 32:
            # not enough bytes for an ApeV2 tag
            return None

        (preamble,
         version,
         tag_size,
         item_count,
         read_only,
         item_encoding,
         is_header,
         no_footer,
         has_header) = parse(cls.HEADER_FORMAT, True, tag_footer)

        if (preamble != b"APETAGEX") or (version != 2000):
            return None

        # tag_size covers the items and footer, so this lands on the first item
        apefile.seek(-tag_size, 2)
        reader = BitstreamReader(apefile, True)

        return cls([ApeTagItem.parse(reader) for i in range(item_count)],
                   contains_header=has_header,
                   contains_footer=True)

    def build(self, writer):
        """outputs an APEv2 tag to BitstreamWriter

        the header and footer are each written only if
        contains_header or contains_footer is set, respectively"""

        # the stored tag size is all the items plus 32 bytes
        # regardless of which of the header and footer are present
        tag_size = sum(tag.total_size() for tag in self.tags) + 32

        if self.contains_header:
            writer.build(ApeTag.HEADER_FORMAT,
                         (b"APETAGEX",               # preamble
                          2000,                      # version
                          tag_size,                  # tag size
                          len(self.tags),            # item count
                          0,                         # read only
                          0,                         # encoding
                          1,                         # is header
                          not self.contains_footer,  # no footer
                          self.contains_header))     # has header

        for tag in self.tags:
            tag.build(writer)

        if self.contains_footer:
            writer.build(ApeTag.HEADER_FORMAT,
                         (b"APETAGEX",               # preamble
                          2000,                      # version
                          tag_size,                  # tag size
                          len(self.tags),            # item count
                          0,                         # read only
                          0,                         # encoding
                          0,                         # is header
                          not self.contains_footer,  # no footer
                          self.contains_header))     # has header

    def clean(self):
        """returns a (ApeTag, fixes_performed) tuple
        where ApeTag is a cleaned-up copy of this tag
        and fixes_performed is a list of unicode strings
        describing each fix made

        items whose key duplicates an earlier one (ignoring case)
        are dropped, text items have surrounding whitespace stripped,
        'Track' and 'Media' text is rewritten as "number/total"
        and text items left empty are dropped;
        non-text items are passed through as-is"""

        import re
        from audiotools.text import (CLEAN_REMOVE_DUPLICATE_TAG,
                                     CLEAN_REMOVE_TRAILING_WHITESPACE,
                                     CLEAN_REMOVE_LEADING_WHITESPACE,
                                     CLEAN_FIX_TAG_FORMATTING,
                                     CLEAN_REMOVE_EMPTY_TAG)

        fixes_performed = []
        used_tags = set()   # upper-cased keys encountered so far
        tag_items = []      # items of the cleaned tag
        for tag in self.tags:
            if tag.key.upper() in used_tags:
                fixes_performed.append(
                    CLEAN_REMOVE_DUPLICATE_TAG %
                    {"field": tag.key.decode('ascii')})
            elif tag.type == 0:
                used_tags.add(tag.key.upper())
                text = tag.__unicode__()

                # check trailing whitespace
                fix1 = text.rstrip()
                if fix1 != text:
                    fixes_performed.append(
                        CLEAN_REMOVE_TRAILING_WHITESPACE %
                        {"field": tag.key.decode('ascii')})

                # check leading whitespace
                fix2 = fix1.lstrip()
                if fix2 != fix1:
                    fixes_performed.append(
                        CLEAN_REMOVE_LEADING_WHITESPACE %
                        {"field": tag.key.decode('ascii')})

                if tag.key in self.INTEGER_ITEMS:
                    if u"/" in fix2:
                        # item is a slashed field of some sort
                        (current, total) = fix2.split(u"/", 1)
                        current_int = re.search(r'\d+', current)
                        total_int = re.search(r'\d+', total)
                        if (current_int is None) and (total_int is None):
                            # neither side contains an integer value
                            # so ignore it altogether
                            fix3 = fix2
                        elif ((current_int is not None) and
                              (total_int is None)):
                            fix3 = u"%d" % (int(current_int.group(0)))
                        elif ((current_int is None) and
                              (total_int is not None)):
                            fix3 = u"%d/%d" % (0, int(total_int.group(0)))
                        else:
                            # both sides contain an int
                            fix3 = u"%d/%d" % (int(current_int.group(0)),
                                               int(total_int.group(0)))
                    else:
                        # item contains no slash
                        current_int = re.search(r'\d+', fix2)
                        if current_int is not None:
                            # item contains an integer
                            fix3 = u"%d" % (int(current_int.group(0)),)
                        else:
                            # item contains no integer value so ignore it
                            # (although 'Track' should only contain
                            # integers, 'Media' may contain strings
                            # so it may be best to simply ignore that case)
                            fix3 = fix2

                    if fix3 != fix2:
                        fixes_performed.append(
                            CLEAN_FIX_TAG_FORMATTING %
                            {"field": tag.key.decode('ascii')})
                else:
                    fix3 = fix2

                if len(fix3) > 0:
                    tag_items.append(ApeTagItem.string(tag.key, fix3))
                else:
                    fixes_performed.append(
                        CLEAN_REMOVE_EMPTY_TAG %
                        {"field": tag.key.decode('ascii')})
            else:
                # non-text items are kept unmodified
                used_tags.add(tag.key.upper())
                tag_items.append(tag)

        return (self.__class__(tag_items,
                               self.contains_header,
                               self.contains_footer),
                fixes_performed)
731
732
733class ApeTaggedAudio(object):
734    """a class for handling audio formats with APEv2 tags
735
736    this class presumes there will be a filename attribute which
737    can be opened and checked for tags, or written if necessary"""
738
739    @classmethod
740    def supports_metadata(cls):
741        """returns True if this audio type supports MetaData"""
742
743        return True
744
745    def get_metadata(self):
746        """returns an ApeTag object, or None
747
748        raises IOError if unable to read the file"""
749
750        with open(self.filename, "rb") as f:
751            return ApeTag.read(f)
752
753    def update_metadata(self, metadata):
754        """takes this track's current MetaData object
755        as returned by get_metadata() and sets this track's metadata
756        with any fields updated in that object
757
758        raises IOError if unable to write the file
759        """
760
761        if metadata is None:
762            return
763        elif not isinstance(metadata, ApeTag):
764            from audiotools.text import ERR_FOREIGN_METADATA
765            raise ValueError(ERR_FOREIGN_METADATA)
766
767        from audiotools.bitstream import parse, BitstreamWriter
768        from audiotools import transfer_data
769
770        f = open(self.filename, "r+b")
771        f.seek(-32, 2)
772        tag_footer = f.read(32)
773
774        if len(tag_footer) < 32:
775            # no existing ApeTag can fit, so append fresh tag
776            f.close()
777            with BitstreamWriter(open(self.filename, "ab"), True) as writer:
778                metadata.build(writer)
779            return
780
781        (preamble,
782         version,
783         tag_size,
784         item_count,
785         read_only,
786         item_encoding,
787         is_header,
788         no_footer,
789         has_header) = parse(ApeTag.HEADER_FORMAT, True, tag_footer)
790
791        if (preamble == b'APETAGEX') and (version == 2000):
792            if has_header:
793                old_tag_size = 32 + tag_size
794            else:
795                old_tag_size = tag_size
796
797            if metadata.total_size() >= old_tag_size:
798                # metadata has grown
799                # so append it to existing file
800                f.seek(-old_tag_size, 2)
801                writer = BitstreamWriter(f, True)
802                metadata.build(writer)
803                writer.close()
804            else:
805                f.close()
806
807                # metadata has shrunk
808                # so rewrite file with smaller metadata
809                from audiotools import TemporaryFile
810                from os.path import getsize
811
812                # copy everything but the last "old_tag_size" bytes
813                # from existing file to rewritten file
814                new_apev2 = TemporaryFile(self.filename)
815
816                with open(self.filename, "rb") as old_apev2:
817                    limited_transfer_data(
818                        old_apev2.read,
819                        new_apev2.write,
820                        getsize(self.filename) - old_tag_size)
821
822                # append new tag to rewritten file
823                with BitstreamWriter(new_apev2, True) as writer:
824                    metadata.build(writer)
825                    # closing writer closes new_apev2 also
826        else:
827            # no existing metadata, so simply append a fresh tag
828            f.close()
829            with BitstreamWriter(open(self.filename, "ab"), True) as writer:
830                metadata.build(writer)
831
832    def set_metadata(self, metadata):
833        """takes a MetaData object and sets this track's metadata
834
835        raises IOError if unable to write the file"""
836
837        if metadata is None:
838            return self.delete_metadata()
839
840        from audiotools.bitstream import BitstreamWriter
841
842        old_metadata = self.get_metadata()
843        new_metadata = ApeTag.converted(metadata)
844
845        if old_metadata is not None:
846            # transfer ReplayGain tags from old metadata to new metadata
847            for tag in [b"replaygain_track_gain",
848                        b"replaygain_track_peak",
849                        b"replaygain_album_gain",
850                        b"replaygain_album_peak"]:
851                try:
852                    # if old_metadata has tag, shift it over
853                    new_metadata[tag] = old_metadata[tag]
854                except KeyError:
855                    try:
856                        # otherwise, if new_metadata has tag, delete it
857                        del(new_metadata[tag])
858                    except KeyError:
859                        # if neither has tag, ignore it
860                        continue
861
862            # transfer Cuesheet from old metadata to new metadata
863            if b"Cuesheet" in old_metadata:
864                new_metadata[b"Cuesheet"] = old_metadata[b"Cuesheet"]
865            elif b"Cuesheet" in new_metadata:
866                del(new_metadata[b"Cuesheet"])
867
868            self.update_metadata(new_metadata)
869        else:
870            # delete ReplayGain tags from new metadata
871            for tag in [b"replaygain_track_gain",
872                        b"replaygain_track_peak",
873                        b"replaygain_album_gain",
874                        b"replaygain_album_peak"]:
875                try:
876                    del(new_metadata[tag])
877                except KeyError:
878                    continue
879
880            # delete Cuesheet from new metadata
881            if b"Cuesheet" in new_metadata:
882                del(new_metadata[b"Cuesheet"])
883
884            # no existing metadata, so simply append a fresh tag
885            with BitstreamWriter(open(self.filename, "ab"), True) as writer:
886                new_metadata.build(writer)
887
888    def delete_metadata(self):
889        """deletes the track's MetaData
890
891        raises IOError if unable to write the file"""
892
893        if ((self.get_replay_gain() is not None) or
894            (self.get_cuesheet() is not None)):
895            # non-textual metadata is present and needs preserving
896            self.set_metadata(MetaData())
897        else:
898            # no non-textual metadata, so wipe out the entire block
899            from os import access, R_OK, W_OK
900            from audiotools.bitstream import BitstreamReader
901            from audiotools import transfer_data
902
903            if not access(self.filename, R_OK | W_OK):
904                raise IOError(self.filename)
905
906            with open(self.filename, "rb") as f:
907                f.seek(-32, 2)
908
909                (preamble,
910                 version,
911                 tag_size,
912                 item_count,
913                 read_only,
914                 item_encoding,
915                 is_header,
916                 no_footer,
917                 has_header) = BitstreamReader(f, True).parse(
918                    ApeTag.HEADER_FORMAT)
919
920            if (preamble == b'APETAGEX') and (version == 2000):
921                from audiotools import TemporaryFile
922                from os.path import getsize
923
924                # there's existing metadata to delete
925                # so rewrite file without trailing metadata tag
926                if has_header:
927                    old_tag_size = 32 + tag_size
928                else:
929                    old_tag_size = tag_size
930
931                # copy everything but the last "old_tag_size" bytes
932                # from existing file to rewritten file
933                new_apev2 = TemporaryFile(self.filename)
934                old_apev2 = open(self.filename, "rb")
935
936                limited_transfer_data(
937                    old_apev2.read,
938                    new_apev2.write,
939                    getsize(self.filename) - old_tag_size)
940
941                old_apev2.close()
942                new_apev2.close()
943
944
class ApeGainedAudio(object):
    """ReplayGain support stored as text items of an APEv2 tag

    the methods here call self.get_metadata() and
    self.update_metadata(), which this class does not define itself"""

    @classmethod
    def supports_replay_gain(cls):
        """returns True, as this class supports ReplayGain"""

        return True

    def get_replay_gain(self):
        """returns a ReplayGain object built from the tag's values

        returns None if there is no tag,
        if any of the four ReplayGain items is missing
        or if ReplayGain rejects the values with a ValueError"""

        from audiotools import ReplayGain

        tag = self.get_metadata()
        if tag is None:
            return None

        required = {b'replaygain_track_gain', b'replaygain_track_peak',
                    b'replaygain_album_gain', b'replaygain_album_peak'}
        if not required.issubset(tag.keys()):
            # incomplete ReplayGain data counts as none at all
            return None

        # gain values have their last len(" dB") characters cut off
        unit_length = len(" dB")

        try:
            track_gain = tag[b'replaygain_track_gain'].__unicode__()
            track_peak = tag[b'replaygain_track_peak'].__unicode__()
            album_gain = tag[b'replaygain_album_gain'].__unicode__()
            album_peak = tag[b'replaygain_album_peak'].__unicode__()

            return ReplayGain(track_gain[0:-unit_length],
                              track_peak,
                              album_gain[0:-unit_length],
                              album_peak)
        except ValueError:
            return None

    def set_replay_gain(self, replaygain):
        """given a ReplayGain object, sets the track's gain to those values

        a replaygain of None is handed off to delete_replay_gain()

        may raise IOError if unable to read or write the file"""

        if replaygain is None:
            return self.delete_replay_gain()

        tag = self.get_metadata()
        if tag is None:
            # no tag yet, so start from an empty one
            tag = ApeTag([])

        # (tag key, text format, ReplayGain attribute)
        for (key, template, attribute) in [
                (b"replaygain_track_gain", u"%+1.2f dB", "track_gain"),
                (b"replaygain_track_peak", u"%1.6f", "track_peak"),
                (b"replaygain_album_gain", u"%+1.2f dB", "album_gain"),
                (b"replaygain_album_peak", u"%1.6f", "album_peak")]:
            tag[key] = ApeTagItem.string(
                key,
                template % (getattr(replaygain, attribute)))

        self.update_metadata(tag)

    def delete_replay_gain(self):
        """removes ReplayGain values from file, if any

        does nothing when the file has no tag at all;
        otherwise the tag is written back
        whether or not any item was actually removed

        may raise IOError if unable to modify the file"""

        tag = self.get_metadata()
        if tag is None:
            return

        for key in (b"replaygain_track_gain",
                    b"replaygain_track_peak",
                    b"replaygain_album_gain",
                    b"replaygain_album_peak"):
            try:
                del(tag[key])
            except KeyError:
                # that value wasn't present to begin with
                continue

        self.update_metadata(tag)
1025
1026
1027class ApeAudio(ApeTaggedAudio, AudioFile):
1028    """a Monkey's Audio file"""
1029
1030    SUFFIX = "ape"
1031    NAME = SUFFIX
1032    DEFAULT_COMPRESSION = "5000"
1033    COMPRESSION_MODES = tuple([str(x * 1000) for x in range(1, 6)])
1034    BINARIES = ("mac",)
1035
1036    # FILE_HEAD = Con.Struct("ape_head",
1037    #                        Con.String('id', 4),
1038    #                        Con.ULInt16('version'))
1039
1040    # #version >= 3.98
1041    # APE_DESCRIPTOR = Con.Struct("ape_descriptor",
1042    #                             Con.ULInt16('padding'),
1043    #                             Con.ULInt32('descriptor_bytes'),
1044    #                             Con.ULInt32('header_bytes'),
1045    #                             Con.ULInt32('seektable_bytes'),
1046    #                             Con.ULInt32('header_data_bytes'),
1047    #                             Con.ULInt32('frame_data_bytes'),
1048    #                             Con.ULInt32('frame_data_bytes_high'),
1049    #                             Con.ULInt32('terminating_data_bytes'),
1050    #                             Con.String('md5', 16))
1051
1052    # APE_HEADER = Con.Struct("ape_header",
1053    #                         Con.ULInt16('compression_level'),
1054    #                         Con.ULInt16('format_flags'),
1055    #                         Con.ULInt32('blocks_per_frame'),
1056    #                         Con.ULInt32('final_frame_blocks'),
1057    #                         Con.ULInt32('total_frames'),
1058    #                         Con.ULInt16('bits_per_sample'),
1059    #                         Con.ULInt16('number_of_channels'),
1060    #                         Con.ULInt32('sample_rate'))
1061
1062    # #version <= 3.97
1063    # APE_HEADER_OLD = Con.Struct("ape_header_old",
1064    #                             Con.ULInt16('compression_level'),
1065    #                             Con.ULInt16('format_flags'),
1066    #                             Con.ULInt16('number_of_channels'),
1067    #                             Con.ULInt32('sample_rate'),
1068    #                             Con.ULInt32('header_bytes'),
1069    #                             Con.ULInt32('terminating_bytes'),
1070    #                             Con.ULInt32('total_frames'),
1071    #                             Con.ULInt32('final_frame_blocks'))
1072
1073    def __init__(self, filename):
1074        """filename is a plain string"""
1075
1076        AudioFile.__init__(self, filename)
1077
1078        (self.__samplespersec__,
1079         self.__channels__,
1080         self.__bitspersample__,
1081         self.__totalsamples__) = ApeAudio.__ape_info__(filename)
1082
1083    @classmethod
1084    def is_type(cls, file):
1085        """returns True if the given file object describes this format
1086
1087        takes a seekable file pointer rewound to the start of the file"""
1088
1089        return file.read(4) == "MAC "
1090
    def lossless(self):
        """returns True unconditionally

        every ApeAudio track is reported as lossless
        without looking at the file"""

        return True
1095
    @classmethod
    def supports_foreign_riff_chunks(cls):
        """returns True unconditionally

        reports that this class supports foreign RIFF chunks"""

        return True
1101
    def has_foreign_riff_chunks(self):
        """returns True unconditionally

        the file itself is never inspected,
        so True is reported even for tracks
        which carry no foreign RIFF chunks"""

        # FIXME - this isn't strictly true
        # I'll need a way to detect foreign chunks in APE's stream
        # without decoding it first,
        # but since I'm not supporting APE anyway, I'll take the lazy way out
        return True
1110
    def bits_per_sample(self):
        """returns an integer number of bits-per-sample this track contains

        this is the value __init__ stored from __ape_info__()"""

        return self.__bitspersample__
1115
    def channels(self):
        """returns an integer number of channels this track contains

        this is the value __init__ stored from __ape_info__()"""

        return self.__channels__
1120
    def total_frames(self):
        """returns the total PCM frames of the track as an integer

        this is the value __init__ stored from __ape_info__()"""

        return self.__totalsamples__
1125
    def sample_rate(self):
        """returns the rate of the track's audio as an integer number of Hz

        this is the value __init__ stored from __ape_info__()"""

        return self.__samplespersec__
1130
1131    @classmethod
1132    def __ape_info__(cls, filename):
1133        f = open(filename, 'rb')
1134        try:
1135            file_head = cls.FILE_HEAD.parse_stream(f)
1136
1137            if file_head.id != 'MAC ':
1138                from audiotools.text import ERR_APE_INVALID_HEADER
1139                raise InvalidFile(ERR_APE_INVALID_HEADER)
1140
1141            if file_head.version >= 3980:  # the latest APE file type
1142                descriptor = cls.APE_DESCRIPTOR.parse_stream(f)
1143                header = cls.APE_HEADER.parse_stream(f)
1144
1145                return (header.sample_rate,
1146                        header.number_of_channels,
1147                        header.bits_per_sample,
1148                        ((header.total_frames - 1) *
1149                         header.blocks_per_frame) +
1150                        header.final_frame_blocks)
1151            else:                           # old-style APE file (obsolete)
1152                header = cls.APE_HEADER_OLD.parse_stream(f)
1153
1154                if file_head.version >= 3950:
1155                    blocks_per_frame = 0x48000
1156                elif ((file_head.version >= 3900) or
1157                      ((file_head.version >= 3800) and
1158                       (header.compression_level == 4000))):
1159                    blocks_per_frame = 0x12000
1160                else:
1161                    blocks_per_frame = 0x2400
1162
1163                if header.format_flags & 0x01:
1164                    bits_per_sample = 8
1165                elif header.format_flags & 0x08:
1166                    bits_per_sample = 24
1167                else:
1168                    bits_per_sample = 16
1169
1170                return (header.sample_rate,
1171                        header.number_of_channels,
1172                        bits_per_sample,
1173                        ((header.total_frames - 1) *
1174                         blocks_per_frame) +
1175                        header.final_frame_blocks)
1176
1177        finally:
1178            f.close()
1179
1180    def to_wave(self, wave_filename):
1181        """writes the contents of this file to the given .wav filename string
1182
1183        raises EncodingError if some error occurs during decoding"""
1184
1185        from audiotools import BIN
1186        from audiotools import transfer_data
1187        import subprocess
1188        import os
1189
1190        if self.filename.endswith(".ape"):
1191            devnull = open(os.devnull, "wb")
1192            sub = subprocess.Popen([BIN['mac'],
1193                                    self.filename,
1194                                    wave_filename,
1195                                    '-d'],
1196                                   stdout=devnull,
1197                                   stderr=devnull)
1198            sub.wait()
1199            devnull.close()
1200        else:
1201            devnull = open(os.devnull, 'ab')
1202            import tempfile
1203            ape = tempfile.NamedTemporaryFile(suffix='.ape')
1204            f = open(self.filename, 'rb')
1205            transfer_data(f.read, ape.write)
1206            f.close()
1207            ape.flush()
1208            sub = subprocess.Popen([BIN['mac'],
1209                                    ape.name,
1210                                    wave_filename,
1211                                    '-d'],
1212                                   stdout=devnull,
1213                                   stderr=devnull)
1214            sub.wait()
1215            ape.close()
1216            devnull.close()
1217
1218    @classmethod
1219    def from_wave(cls, filename, wave_filename, compression=None):
1220        """encodes a new AudioFile from an existing .wav file
1221
1222        takes a filename string, wave_filename string
1223        of an existing WaveAudio file
1224        and an optional compression level string
1225        encodes a new audio file from the wave's data
1226        at the given filename with the specified compression level
1227        and returns a new ApeAudio object"""
1228
1229        from audiotools import BIN
1230        import subprocess
1231        import os
1232
1233        if str(compression) not in cls.COMPRESSION_MODES:
1234            compression = cls.DEFAULT_COMPRESSION
1235
1236        devnull = open(os.devnull, "wb")
1237        sub = subprocess.Popen([BIN['mac'],
1238                                wave_filename,
1239                                filename,
1240                                "-c%s" % (compression)],
1241                               stdout=devnull,
1242                               stderr=devnull)
1243        sub.wait()
1244        devnull.close()
1245        return ApeAudio(filename)
1246