1# Created By: Virgil Dupras
2# Created On: 2004/12/10
3# Copyright 2010 Hardcoded Software (http://www.hardcoded.net)
4
5# This software is licensed under the "BSD" License as described in the "LICENSE" file,
6# which should be included with this package. The terms are also available at
7# http://www.hardcoded.net/licenses/bsd_license
8
9from . import id3v1
10from . import id3v2
11import struct
12from struct import unpack
13
14from .util import tryint, FileOrPath
15
16HEADER_SIZE = 4
17
18ID_MPEG1  = 3
19ID_MPEG2  = 2
20ID_MPEG25 = 0
21
22ID_LAYER1 = 3
23ID_LAYER2 = 2
24ID_LAYER3 = 1
25
26MPEG_SYNC = 0xffe00000  # 11 bits set
27MPEG_PAD  = 0x200  # pad flag mask (pos 20)
28
29BR_NULL   = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
30
31BR_M1_L1  = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448, 0)
32BR_M1_L2  = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384, 0)
33BR_M1_L3  = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 0)
34BR_M2_L1  = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256, 0)
35BR_M2_L23 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 0)
36
37BR_M1 = (BR_NULL, BR_M1_L3, BR_M1_L2, BR_M1_L1)
38BR_M2 = (BR_NULL, BR_M2_L23, BR_M2_L23, BR_M2_L1)
39BR_NULLS = (BR_NULL, BR_NULL, BR_NULL, BR_NULL)
40
41BR_LIST = (BR_M2, BR_NULLS, BR_M2, BR_M1)
42
43SR_NULL = (0, 0, 0, 0)
44
45SR_M1  = (44100, 48000, 32000, 0)
46SR_M2  = (22050, 24000, 16000, 0)
47SR_M25 = (11025, 12000, 8000, 0)
48
49SR_LIST = (SR_M25, SR_NULL, SR_M2, SR_M1)
50
51SPF_NULL = (0, 0, 0, 0)
52
53SPF_M1 = (0, 1152, 1152, 384)
54SPF_M2 = (0, 576, 1152, 384)
55
56SPF_LIST = (SPF_M2, SPF_NULL, SPF_M2, SPF_M1)
57
58MPEG_CM_STEREO = 0
59MPEG_CM_JOINT_STEREO = 1
60MPEG_CM_DUAL_CHANNEL = 2
61MPEG_CM_MONO = 3
62MPEG_CM_UNKNOWN = 4
63
64MAX_SEEK_BYTES = 4096
65
66
67def get_vbr_offset(version, channel_mode):
68    # Depending on mpeg version and mode, the VBR header will be at a different offset
69    # after the mpeg header.
70    if version == ID_MPEG1:
71        if channel_mode == MPEG_CM_MONO:
72            return 17
73        else:
74            return 32
75    else:
76        if channel_mode == MPEG_CM_MONO:
77            return 9
78        else:
79            return 17
80
81
82def get_vbr_coefficient(version, layer):
83    if version == ID_MPEG1:
84        if layer == ID_LAYER1:
85            return 48
86        else:
87            return 144
88    else:
89        if layer == ID_LAYER1:
90            return 24
91        elif layer == ID_LAYER2:
92            return 144
93        else:
94            return 72
95
96
97class MpegFrameHeader:
98    def __init__(self, data):
99        # data = HEADER_SIZE bytes integer
100        self.valid = False
101        self.mpeg_id = 0
102        self.layer = 0
103        self.channel_mode = MPEG_CM_UNKNOWN
104        self.bitrate = 0
105        self.sample_rate = 0
106        self.sample_count = 0
107        self.padding_size = 0
108        self.size = 0
109        if (data & MPEG_SYNC) == MPEG_SYNC:
110            self.valid = True
111            self.mpeg_id = (data >> 19) & 0x3
112            self.layer = (data >> 17) & 0x3
113            self.channel_mode = (data >> 6) & 0x3
114            br_id = (data >> 12) & 0xf
115            fr_id = (data >> 10) & 0x3
116            self.bitrate = BR_LIST[self.mpeg_id][self.layer][br_id]
117            self.sample_rate = SR_LIST[self.mpeg_id][fr_id]
118            self.sample_count = SPF_LIST[self.mpeg_id][self.layer]
119            if data & MPEG_PAD:
120                self.padding_size = (4 if self.layer == ID_LAYER1 else 1)
121            else:
122                self.padding_size = 0
123            if self.sample_count and self.bitrate and self.sample_rate:
124                sc = self.sample_count
125                sr = self.sample_rate
126                br = self.bitrate
127                pad = self.padding_size
128                self.size = (((sc // 8) * br * 1000) // sr) + pad
129            else:
130                self.valid = False
131
132
133class XingHeader:
134    def __init__(self, data):  # data is a 128 bytes str
135        self.valid = data[:4] == b'Xing'
136        self.frames = unpack('!I', data[8:12])[0]
137        self.size = unpack('!I', data[12:16])[0]
138        self.scale = data[119]
139
140
141class FhgHeader:
142    def __init__(self, data):
143        self.valid = data[:4] == b'VBRI'
144        self.frames = unpack('!I', data[14:18])[0]
145        self.size = unpack('!I', data[10:14])[0]
146        self.scale = unpack('B', data[9:10])[0]
147
148
149class ComputedVBRHeader:
150    def __init__(self, frame_browser):
151        self.valid = True
152        self.frames, self.size = frame_browser.stats()
153
154
155class FrameBrowser:
156    def __init__(self, fp):
157        self.fp = fp
158        self.frame_index = 0
159        if not self._read():
160            self._seek()
161        self.initial_position = self.position
162
163    def _read(self):
164        self.position = tryint(self.fp.tell())
165        data = self.fp.read(HEADER_SIZE)
166        try:
167            self.frame = MpegFrameHeader(unpack("!I", data)[0])
168        except struct.error:
169            self.frame = MpegFrameHeader(0)
170        return self.frame.valid
171
172    def _seek(self):
173        # A mpeg header is 11 set bits. Which means that there is a \xff char followed by a char
174        # that is \xe0 or more.
175        self.fp.seek(self.position, 0)
176        data = self.fp.read(MAX_SEEK_BYTES)
177        tag_index = data.find(id3v2.ID_ID3)
178        if tag_index > -1:
179            self.fp.seek(self.position + tag_index, 0)
180            h = id3v2.Header(self.fp)
181            if h.valid:
182                self.position += tag_index + h.tagsize
183                return self._seek()
184        index = data.find(b'\xff')
185        while (index > -1):
186            try:
187                result = MpegFrameHeader(unpack('!I', data[index:index + HEADER_SIZE])[0])
188                if result.valid:
189                    nextindex = index + result.size
190                    try:
191                        next = MpegFrameHeader(unpack('!I', data[nextindex:nextindex + HEADER_SIZE])[0])
192                        if next.valid:
193                            self.position += index
194                            self.frame = result
195                            return True
196                    except struct.error:
197                        pass
198                index = data.find(b'\xff', index + 1)
199            except struct.error:
200                index = -1
201        return False
202
203    def first(self):
204        self.fp.seek(self.initial_position, 0)
205        self.frame_index = 0
206        self._read()
207        return self.frame
208
209    def __next__(self):
210        if self.frame.valid:
211            self.fp.seek(self.position + self.frame.size, 0)
212            self._read()
213            self.frame_index += 1
214        return self.frame
215
216    def stats(self):
217        """Iterates over all frames and return (frame_count, total_size)"""
218        self.first()
219        size = self.frame.size
220        while next(self).valid:
221            size += self.frame.size
222        return (self.frame_index, size)
223
224
225def get_vbr_info(fp, b):
226    fheader = b.frame
227    vbr_offset = get_vbr_offset(fheader.mpeg_id, fheader.channel_mode)
228    fp.seek(vbr_offset + 4, 1)
229    vbr_id = fp.read(4)
230    fp.seek(-4, 1)
231    if vbr_id == b'Xing':
232        return XingHeader(fp.read(128))
233    if vbr_id == b'VBRI':
234        return FhgHeader(fp.read(18))
235    br = b.frame.bitrate
236    for i in range(4):
237        if next(b).bitrate != br:
238            return ComputedVBRHeader(b)
239
240
241class Mpeg:
242    '''The class used to handle MPEG metadata.
243
244    :param infile: The file object or path to process.
245
246    :ivar int ~mpeg.Mpeg.audio_offset: The offset, in bytes, at which audio data starts in the file.
247    :ivar int ~mpeg.Mpeg.duration: The duration of the audio file (in whole seconds).
248    :ivar ~mpeg.Mpeg.id3v1: The ID3 version 1 metadata, if present.
249    :vartype id3v1: :class:`hsaudiotag.id3v1.Id3v1`
250    :ivar ~mpeg.Mpeg.id3v2: The ID3 version 2 metadata, if present.
251    :vartype id3v2: :class:`hsaudiotag.id3v2.Id3v2`
252    :ivar int ~mpeg.Mpeg.size: The size of the file, in bytes.
253    :ivar ComputedVBRHeader ~mpeg.Mpeg.vbr:
254    :ivar bool ~mpeg.Mpeg.valid: Whether the file could correctly be read or not.
255    '''
256    def __init__(self, infile):
257        with FileOrPath(infile) as fp:
258            self.id3v1 = id3v1.Id3v1(fp)
259            self.id3v2 = id3v2.Id3v2(fp)
260            if self.id3v2.exists and (self.id3v2.position == id3v2.POS_BEGIN):
261                start_offset = self.id3v2.size
262            else:
263                start_offset = 0
264            fp.seek(start_offset, 0)
265            b = FrameBrowser(fp)
266            self._frameheader = b.frame
267            self.audio_offset = b.position
268            fp.seek(b.position, 0)  # Needed for VBR seeking
269            self.vbr = get_vbr_info(fp, b)
270            fp.seek(0, 2)
271            self.size = tryint(fp.tell())
272            if self.bitrate:
273                # (audio_size * 8) / (bitrate * 1000) == audio_size / (bitrate * 125)
274                self.duration = self.audio_size // (self.bitrate * 125)
275                # 'and self.id3v2.duration' is there to avoid reading the mpeg frames when there is no TLEN in the tag.
276                if self.id3v2.exists and self.id3v2.duration and (self.id3v2.duration != self.duration):
277                    # Tag duration and guessed durations are wrong. Read all frames
278                    frames, size = b.stats()
279                    self.duration = size // (self.bitrate * 125)
280            else:
281                self.duration = 0
282            self.valid = self._frameheader.valid
283
284    # --- Properties
285    @property
286    def tag(self):
287        '''The :class:`hsaudiotag.id3v2.Id3v2` or :class:`hsaudiotag.id3v1.Id3v1`
288        metadata object associated with the MPEG file.
289        '''
290        if self.id3v2.exists:
291            return self.id3v2
292        elif self.id3v1.exists:
293            return self.id3v1
294
295    @property
296    def audio_size(self):
297        '''The size of the audio part of the file in bytes.'''
298        result = self.size - self.id3v1.size - self.audio_offset
299        if self.id3v2.position == id3v2.POS_END:
300            result -= self.id3v2.size
301        return result
302
303    @property
304    def bitrate(self):
305        '''The bitrate of the audio file.'''
306        if self.vbr and (self.vbr.frames > 0):
307            coeff = get_vbr_coefficient(self._frameheader.mpeg_id, self._frameheader.layer)
308            pad = self._frameheader.padding_size
309            sr = self._frameheader.sample_rate
310            size_per_frame = self.vbr.size // self.vbr.frames
311            return ((size_per_frame - pad) * sr) // (coeff * 1000)
312        else:
313            return self._frameheader.bitrate
314
315    @property
316    def sample_rate(self):
317        '''The sample rate of the audio file.'''
318        return self._frameheader.sample_rate
319