# Created By: Virgil Dupras
# Created On: 2004/12/10
# Copyright 2010 Hardcoded Software (http://www.hardcoded.net)

# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license

from . import id3v1
from . import id3v2
import struct
from struct import unpack

from .util import tryint, FileOrPath

HEADER_SIZE = 4

ID_MPEG1 = 3
ID_MPEG2 = 2
ID_MPEG25 = 0

ID_LAYER1 = 3
ID_LAYER2 = 2
ID_LAYER3 = 1

MPEG_SYNC = 0xffe00000  # 11 bits set
MPEG_PAD = 0x200  # padding flag mask (bit 9)

BR_NULL = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

BR_M1_L1 = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448, 0)
BR_M1_L2 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384, 0)
BR_M1_L3 = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 0)
BR_M2_L1 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256, 0)
BR_M2_L23 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160, 0)

# Tables below are indexed by the raw header ids (see ID_MPEG* / ID_LAYER*).
BR_M1 = (BR_NULL, BR_M1_L3, BR_M1_L2, BR_M1_L1)
BR_M2 = (BR_NULL, BR_M2_L23, BR_M2_L23, BR_M2_L1)
BR_NULLS = (BR_NULL, BR_NULL, BR_NULL, BR_NULL)

BR_LIST = (BR_M2, BR_NULLS, BR_M2, BR_M1)

SR_NULL = (0, 0, 0, 0)

SR_M1 = (44100, 48000, 32000, 0)
SR_M2 = (22050, 24000, 16000, 0)
SR_M25 = (11025, 12000, 8000, 0)

SR_LIST = (SR_M25, SR_NULL, SR_M2, SR_M1)

SPF_NULL = (0, 0, 0, 0)

SPF_M1 = (0, 1152, 1152, 384)
SPF_M2 = (0, 576, 1152, 384)

SPF_LIST = (SPF_M2, SPF_NULL, SPF_M2, SPF_M1)

MPEG_CM_STEREO = 0
MPEG_CM_JOINT_STEREO = 1
MPEG_CM_DUAL_CHANNEL = 2
MPEG_CM_MONO = 3
MPEG_CM_UNKNOWN = 4

MAX_SEEK_BYTES = 4096


def get_vbr_offset(version, channel_mode):
    """Return the offset of the VBR header, counted after the MPEG frame header.

    The offset depends on the MPEG version id (``ID_MPEG*``) and on whether the
    channel mode is ``MPEG_CM_MONO``.
    """
    is_mono = channel_mode == MPEG_CM_MONO
    if version == ID_MPEG1:
        return 17 if is_mono else 32
    return 9 if is_mono else 17


def get_vbr_coefficient(version, layer):
    """Return the coefficient used by ``Mpeg.bitrate`` to turn an average frame
    size into a bitrate, for the given MPEG version id and layer id.
    """
    if version == ID_MPEG1:
        return 48 if layer == ID_LAYER1 else 144
    if layer == ID_LAYER1:
        return 24
    if layer == ID_LAYER2:
        return 144
    return 72


class MpegFrameHeader:
    """Decoded 4-byte MPEG audio frame header.

    :param data: The header as an unsigned 32-bit big-endian integer.

    ``valid`` is True only when the sync bits are set and the bitrate, sample
    rate and samples-per-frame lookups all give non-zero values. ``size`` is the
    frame length in bytes (header included) and is 0 for invalid headers.
    """
    def __init__(self, data):
        # data = HEADER_SIZE bytes integer
        self.valid = False
        self.mpeg_id = 0
        self.layer = 0
        self.channel_mode = MPEG_CM_UNKNOWN
        self.bitrate = 0
        self.sample_rate = 0
        self.sample_count = 0
        self.padding_size = 0
        self.size = 0
        if (data & MPEG_SYNC) != MPEG_SYNC:
            return
        self.mpeg_id = (data >> 19) & 0x3
        self.layer = (data >> 17) & 0x3
        self.channel_mode = (data >> 6) & 0x3
        br_id = (data >> 12) & 0xf
        fr_id = (data >> 10) & 0x3
        self.bitrate = BR_LIST[self.mpeg_id][self.layer][br_id]
        self.sample_rate = SR_LIST[self.mpeg_id][fr_id]
        self.sample_count = SPF_LIST[self.mpeg_id][self.layer]
        # Layer I frames are made of 4-byte slots, the other layers of 1-byte slots.
        # Padding adds exactly one slot.
        slot_size = 4 if self.layer == ID_LAYER1 else 1
        self.padding_size = slot_size if data & MPEG_PAD else 0
        if not (self.sample_count and self.bitrate and self.sample_rate):
            # Reserved/free-format values: the frame length cannot be computed.
            return
        self.valid = True
        # The slot count is truncated *before* being converted to bytes, so that a
        # Layer I frame length is always a multiple of 4. For layers II and III
        # (slot_size == 1) this is ((sample_count // 8) * bitrate * 1000) // sample_rate.
        bits_per_slot = 8 * slot_size
        slots = ((self.sample_count // bits_per_slot) * self.bitrate * 1000) // self.sample_rate
        self.size = slots * slot_size + self.padding_size


class XingHeader:
    """'Xing' VBR header.

    :param data: Bytes starting at the header's 4-byte identifier (128 bytes are
                 read by ``get_vbr_info``).

    ``frames`` and ``size`` are read as big-endian 32-bit integers from bytes
    8-11 and 12-15, ``scale`` is byte 119. When ``data`` is too short to hold
    these fields (truncated file), the header is flagged invalid and all fields
    are 0 instead of raising.
    """
    MIN_SIZE = 120

    def __init__(self, data):  # data is a 128 bytes str
        complete = len(data) >= self.MIN_SIZE
        self.valid = complete and data[:4] == b'Xing'
        if complete:
            self.frames = unpack('!I', data[8:12])[0]
            self.size = unpack('!I', data[12:16])[0]
            self.scale = data[119]
        else:
            self.frames = 0
            self.size = 0
            self.scale = 0


class FhgHeader:
    """'VBRI' (Fraunhofer) VBR header.

    :param data: Bytes starting at the header's 4-byte identifier (18 bytes are
                 read by ``get_vbr_info``).

    ``size`` is read from bytes 10-13, ``frames`` from bytes 14-17 (both
    big-endian 32-bit integers) and ``scale`` from byte 9. When ``data`` is too
    short (truncated file), the header is flagged invalid and all fields are 0
    instead of raising.
    """
    MIN_SIZE = 18

    def __init__(self, data):
        complete = len(data) >= self.MIN_SIZE
        self.valid = complete and data[:4] == b'VBRI'
        if complete:
            self.frames = unpack('!I', data[14:18])[0]
            self.size = unpack('!I', data[10:14])[0]
            self.scale = unpack('B', data[9:10])[0]
        else:
            self.frames = 0
            self.size = 0
            self.scale = 0


class ComputedVBRHeader:
    """VBR information obtained by walking every frame of a ``FrameBrowser``."""
    def __init__(self, frame_browser):
        self.valid = True
        self.frames, self.size = frame_browser.stats()


def _header_at(data, offset):
    """Return the ``MpegFrameHeader`` found at ``offset`` in ``data``, or None when
    fewer than ``HEADER_SIZE`` bytes are available there.
    """
    chunk = data[offset:offset + HEADER_SIZE]
    if len(chunk) < HEADER_SIZE:
        return None
    return MpegFrameHeader(unpack('!I', chunk)[0])


class FrameBrowser:
    """Walks the MPEG frames of ``fp``, starting at its current position.

    ``frame`` is the current ``MpegFrameHeader``, ``position`` its offset in the
    file and ``frame_index`` the number of ``next()`` calls since the first frame.
    """
    def __init__(self, fp):
        self.fp = fp
        self.frame_index = 0
        if not self._read():
            self._seek()
        self.initial_position = self.position

    def _read(self):
        """Read a frame header at the current file position. Returns its validity."""
        self.position = tryint(self.fp.tell())
        data = self.fp.read(HEADER_SIZE)
        try:
            self.frame = MpegFrameHeader(unpack("!I", data)[0])
        except struct.error:
            # Less than HEADER_SIZE bytes left: treat as an invalid frame.
            self.frame = MpegFrameHeader(0)
        return self.frame.valid

    def _seek(self):
        """Look for the first frame in the ``MAX_SEEK_BYTES`` following ``position``.

        Valid ID3v2 tags found in that window are skipped first. On success,
        ``position`` and ``frame`` are updated and True is returned.
        """
        while True:
            self.fp.seek(self.position, 0)
            data = self.fp.read(MAX_SEEK_BYTES)
            tag_index = data.find(id3v2.ID_ID3)
            if tag_index > -1:
                self.fp.seek(self.position + tag_index, 0)
                h = id3v2.Header(self.fp)
                if h.valid:
                    skip = tag_index + h.tagsize
                    # Only start over when we actually move forward, otherwise a
                    # zero-sized tag would make us loop on the same spot forever.
                    if skip > 0:
                        self.position += skip
                        continue
            return self._scan(data)

    def _scan(self, data):
        """Search ``data`` for a valid frame header directly followed by another one."""
        # A mpeg header starts with 11 set bits, which means that there is a \xff
        # char followed by a char that is \xe0 or more.
        index = data.find(b'\xff')
        while index > -1:
            candidate = _header_at(data, index)
            if candidate is None:
                # Not enough bytes left for a header here, nor at any later index.
                break
            if candidate.valid:
                # A lone sync pattern can show up by chance. Require a second valid
                # header right where this frame is supposed to end.
                following = _header_at(data, index + candidate.size)
                if following is not None and following.valid:
                    self.position += index
                    self.frame = candidate
                    return True
            index = data.find(b'\xff', index + 1)
        return False

    def first(self):
        """Go back to the first frame and return it."""
        self.fp.seek(self.initial_position, 0)
        self.frame_index = 0
        self._read()
        return self.frame

    def __next__(self):
        """Move to the next frame and return it (check its ``valid`` flag)."""
        if self.frame.valid:
            self.fp.seek(self.position + self.frame.size, 0)
        self._read()
        self.frame_index += 1
        return self.frame

    def stats(self):
        """Iterates over all frames and return (frame_count, total_size)"""
        if not self.first().valid:
            # No frame at all: don't report a phantom frame of size 0.
            return (0, 0)
        size = self.frame.size
        while next(self).valid:
            size += self.frame.size
        return (self.frame_index, size)


def get_vbr_info(fp, b):
    """Return the VBR header for the stream browsed by ``b``, or None.

    ``fp`` must be positioned at the start of the first frame. A 'Xing' or 'VBRI'
    header is returned when its identifier is found at the expected offset.
    Otherwise, the 4 following frames are compared with the first one: if one of
    them has a different bitrate, a ``ComputedVBRHeader`` is returned. None means
    that no sign of variable bitrate was found.
    """
    fheader = b.frame
    vbr_offset = get_vbr_offset(fheader.mpeg_id, fheader.channel_mode)
    fp.seek(vbr_offset + 4, 1)
    vbr_id = fp.read(4)
    fp.seek(-4, 1)  # the header classes expect data starting at the identifier
    if vbr_id == b'Xing':
        return XingHeader(fp.read(128))
    if vbr_id == b'VBRI':
        return FhgHeader(fp.read(18))
    br = b.frame.bitrate
    for _ in range(4):
        if next(b).bitrate != br:
            return ComputedVBRHeader(b)
    return None


class Mpeg:
    '''The class used to handle MPEG metadata.

    :param infile: The file object or path to process.

    :ivar int ~mpeg.Mpeg.audio_offset: The offset, in bytes, at which audio data starts in the file.
    :ivar int ~mpeg.Mpeg.duration: The duration of the audio file (in whole seconds).
    :ivar ~mpeg.Mpeg.id3v1: The ID3 version 1 metadata, if present.
    :vartype id3v1: :class:`hsaudiotag.id3v1.Id3v1`
    :ivar ~mpeg.Mpeg.id3v2: The ID3 version 2 metadata, if present.
    :vartype id3v2: :class:`hsaudiotag.id3v2.Id3v2`
    :ivar int ~mpeg.Mpeg.size: The size of the file, in bytes.
    :ivar ~mpeg.Mpeg.vbr: The VBR header (``XingHeader``, ``FhgHeader`` or
        ``ComputedVBRHeader``), or ``None`` when no sign of variable bitrate was found.
    :ivar bool ~mpeg.Mpeg.valid: Whether the file could correctly be read or not.
    '''
    def __init__(self, infile):
        with FileOrPath(infile) as fp:
            self.id3v1 = id3v1.Id3v1(fp)
            self.id3v2 = id3v2.Id3v2(fp)
            if self.id3v2.exists and (self.id3v2.position == id3v2.POS_BEGIN):
                start_offset = self.id3v2.size
            else:
                start_offset = 0
            fp.seek(start_offset, 0)
            b = FrameBrowser(fp)
            self._frameheader = b.frame
            self.audio_offset = b.position
            fp.seek(b.position, 0)  # Needed for VBR seeking
            self.vbr = get_vbr_info(fp, b)
            fp.seek(0, 2)
            self.size = tryint(fp.tell())
            if self.bitrate:
                # (audio_size * 8) / (bitrate * 1000) == audio_size / (bitrate * 125)
                self.duration = self.audio_size // (self.bitrate * 125)
                # 'and self.id3v2.duration' is there to avoid reading the mpeg frames when
                # there is no TLEN in the tag.
                if self.id3v2.exists and self.id3v2.duration and (self.id3v2.duration != self.duration):
                    # Tag duration and guessed duration differ. Read all frames.
                    _, size = b.stats()
                    self.duration = size // (self.bitrate * 125)
            else:
                self.duration = 0
            self.valid = self._frameheader.valid

    # --- Properties
    @property
    def tag(self):
        '''The :class:`hsaudiotag.id3v2.Id3v2` or :class:`hsaudiotag.id3v1.Id3v1`
        metadata object associated with the MPEG file, or ``None`` if there is none.
        '''
        if self.id3v2.exists:
            return self.id3v2
        if self.id3v1.exists:
            return self.id3v1
        return None

    @property
    def audio_size(self):
        '''The size of the audio part of the file in bytes.'''
        result = self.size - self.id3v1.size - self.audio_offset
        if self.id3v2.position == id3v2.POS_END:
            result -= self.id3v2.size
        return result

    @property
    def bitrate(self):
        '''The bitrate of the audio file.'''
        if self.vbr and (self.vbr.frames > 0):
            # Average bitrate, derived from the average frame size.
            coeff = get_vbr_coefficient(self._frameheader.mpeg_id, self._frameheader.layer)
            pad = self._frameheader.padding_size
            sr = self._frameheader.sample_rate
            size_per_frame = self.vbr.size // self.vbr.frames
            return ((size_per_frame - pad) * sr) // (coeff * 1000)
        return self._frameheader.bitrate

    @property
    def sample_rate(self):
        '''The sample rate of the audio file.'''
        return self._frameheader.sample_rate