1# -*- coding: utf-8 -*- 2# Copyright (C) 2017 Boris Pruessmann 3# 4# This program is free software; you can redistribute it and/or modify 5# it under the terms of the GNU General Public License as published by 6# the Free Software Foundation; either version 2 of the License, or 7# (at your option) any later version. 8 9"""Read and write DSF audio stream information and tags.""" 10 11 12import sys 13import struct 14 15from ._compat import cBytesIO, reraise, endswith 16 17from mutagen import FileType, StreamInfo 18from mutagen._util import cdata, MutagenError, loadfile, convert_error 19from mutagen.id3 import ID3 20from mutagen.id3._util import ID3NoHeaderError, error as ID3Error 21 22 23__all__ = ["DSF", "Open", "delete"] 24 25 26class error(MutagenError): 27 pass 28 29 30class DSFChunk(object): 31 """A generic chunk of a DSFFile.""" 32 33 chunk_offset = 0 34 chunk_header = " " 35 chunk_size = -1 36 37 def __init__(self, fileobj, create=False): 38 self.fileobj = fileobj 39 40 if not create: 41 self.chunk_offset = fileobj.tell() 42 self.load() 43 44 def load(self): 45 raise NotImplementedError 46 47 def write(self): 48 raise NotImplementedError 49 50 51class DSDChunk(DSFChunk): 52 """Represents the first chunk of a DSF file""" 53 54 CHUNK_SIZE = 28 55 56 total_size = 0 57 offset_metdata_chunk = 0 58 59 def __init__(self, fileobj, create=False): 60 super(DSDChunk, self).__init__(fileobj, create) 61 62 if create: 63 self.chunk_header = b"DSD " 64 self.chunk_size = DSDChunk.CHUNK_SIZE 65 66 def load(self): 67 data = self.fileobj.read(DSDChunk.CHUNK_SIZE) 68 if len(data) != DSDChunk.CHUNK_SIZE: 69 raise error("DSF chunk truncated") 70 71 self.chunk_header = data[0:4] 72 if self.chunk_header != b"DSD ": 73 raise error("DSF dsd header not found") 74 75 self.chunk_size = cdata.ulonglong_le(data[4:12]) 76 if self.chunk_size != DSDChunk.CHUNK_SIZE: 77 raise error("DSF dsd header size mismatch") 78 79 self.total_size = cdata.ulonglong_le(data[12:20]) 80 self.offset_metdata_chunk = cdata.ulonglong_le(data[20:28]) 81 82 def write(self): 83 f = cBytesIO() 84 f.write(self.chunk_header) 85 f.write(struct.pack("<Q", DSDChunk.CHUNK_SIZE)) 86 f.write(struct.pack("<Q", self.total_size)) 87 f.write(struct.pack("<Q", self.offset_metdata_chunk)) 88 89 self.fileobj.seek(self.chunk_offset) 90 self.fileobj.write(f.getvalue()) 91 92 def pprint(self): 93 return (u"DSD Chunk (Total file size = %d, " 94 u"Pointer to Metadata chunk = %d)" % ( 95 self.total_size, self.offset_metdata_chunk)) 96 97 98class FormatChunk(DSFChunk): 99 100 CHUNK_SIZE = 52 101 102 VERSION = 1 103 104 FORMAT_DSD_RAW = 0 105 """Format ID: DSD Raw""" 106 107 format_version = VERSION 108 format_id = FORMAT_DSD_RAW 109 channel_type = 1 110 channel_num = 1 111 sampling_frequency = 2822400 112 bits_per_sample = 1 113 sample_count = 0 114 block_size_per_channel = 4096 115 116 def __init__(self, fileobj, create=False): 117 super(FormatChunk, self).__init__(fileobj, create) 118 119 if create: 120 self.chunk_header = b"fmt " 121 self.chunk_size = FormatChunk.CHUNK_SIZE 122 123 def load(self): 124 data = self.fileobj.read(FormatChunk.CHUNK_SIZE) 125 if len(data) != FormatChunk.CHUNK_SIZE: 126 raise error("DSF chunk truncated") 127 128 self.chunk_header = data[0:4] 129 if self.chunk_header != b"fmt ": 130 raise error("DSF fmt header not found") 131 132 self.chunk_size = cdata.ulonglong_le(data[4:12]) 133 if self.chunk_size != FormatChunk.CHUNK_SIZE: 134 raise error("DSF dsd header size mismatch") 135 136 self.format_version = cdata.uint_le(data[12:16]) 137 if self.format_version != FormatChunk.VERSION: 138 raise error("Unsupported format version") 139 140 self.format_id = cdata.uint_le(data[16:20]) 141 if self.format_id != FormatChunk.FORMAT_DSD_RAW: 142 raise error("Unsupported format ID") 143 144 self.channel_type = cdata.uint_le(data[20:24]) 145 self.channel_num = cdata.uint_le(data[24:28]) 146 self.sampling_frequency = cdata.uint_le(data[28:32]) 147 self.bits_per_sample = cdata.uint_le(data[32:36]) 148 self.sample_count = cdata.ulonglong_le(data[36:44]) 149 150 def pprint(self): 151 return u"fmt Chunk (Channel Type = %d, Channel Num = %d, " \ 152 u"Sampling Frequency = %d, %.2f seconds)" % \ 153 (self.channel_type, self.channel_num, self.sampling_frequency, 154 self.length) 155 156 157class DataChunk(DSFChunk): 158 159 CHUNK_SIZE = 12 160 161 data = "" 162 163 def __init__(self, fileobj, create=False): 164 super(DataChunk, self).__init__(fileobj, create) 165 166 if create: 167 self.chunk_header = b"data" 168 self.chunk_size = DataChunk.CHUNK_SIZE 169 170 def load(self): 171 data = self.fileobj.read(DataChunk.CHUNK_SIZE) 172 if len(data) != DataChunk.CHUNK_SIZE: 173 raise error("DSF chunk truncated") 174 175 self.chunk_header = data[0:4] 176 if self.chunk_header != b"data": 177 raise error("DSF data header not found") 178 179 self.chunk_size = cdata.ulonglong_le(data[4:12]) 180 if self.chunk_size < DataChunk.CHUNK_SIZE: 181 raise error("DSF data header size mismatch") 182 183 def pprint(self): 184 return u"data Chunk (Chunk Offset = %d, Chunk Size = %d)" % ( 185 self.chunk_offset, self.chunk_size) 186 187 188class _DSFID3(ID3): 189 """A DSF file with ID3v2 tags""" 190 191 @convert_error(IOError, error) 192 def _pre_load_header(self, fileobj): 193 fileobj.seek(0) 194 id3_location = DSDChunk(fileobj).offset_metdata_chunk 195 if id3_location == 0: 196 raise ID3NoHeaderError("File has no existing ID3 tag") 197 198 fileobj.seek(id3_location) 199 200 @convert_error(IOError, error) 201 @loadfile(writable=True) 202 def save(self, filething=None, v2_version=4, v23_sep='/', padding=None): 203 """Save ID3v2 data to the DSF file""" 204 205 fileobj = filething.fileobj 206 fileobj.seek(0) 207 208 dsd_header = DSDChunk(fileobj) 209 if dsd_header.offset_metdata_chunk == 0: 210 # create a new ID3 chunk at the end of the file 211 fileobj.seek(0, 2) 212 213 # store reference to ID3 location 214 dsd_header.offset_metdata_chunk = fileobj.tell() 215 dsd_header.write() 216 217 try: 218 data = self._prepare_data( 219 fileobj, dsd_header.offset_metdata_chunk, self.size, 220 v2_version, v23_sep, padding) 221 except ID3Error as e: 222 reraise(error, e, sys.exc_info()[2]) 223 224 fileobj.seek(dsd_header.offset_metdata_chunk) 225 fileobj.write(data) 226 fileobj.truncate() 227 228 # Update total file size 229 dsd_header.total_size = fileobj.tell() 230 dsd_header.write() 231 232 233class DSFInfo(StreamInfo): 234 """DSF audio stream information. 235 236 Information is parsed from the fmt chunk of the DSF file. 237 238 Attributes: 239 length (`float`): audio length, in seconds. 240 channels (`int`): The number of audio channels. 241 sample_rate (`int`): 242 Sampling frequency, in Hz. 243 (2822400, 5644800, 11289600, or 22579200) 244 bits_per_sample (`int`): The audio sample size. 245 bitrate (`int`): The audio bitrate. 246 """ 247 248 def __init__(self, fmt_chunk): 249 self.fmt_chunk = fmt_chunk 250 251 @property 252 def length(self): 253 return float(self.fmt_chunk.sample_count) / self.sample_rate 254 255 @property 256 def channels(self): 257 return self.fmt_chunk.channel_num 258 259 @property 260 def sample_rate(self): 261 return self.fmt_chunk.sampling_frequency 262 263 @property 264 def bits_per_sample(self): 265 return self.fmt_chunk.bits_per_sample 266 267 @property 268 def bitrate(self): 269 return self.sample_rate * self.bits_per_sample * self.channels 270 271 def pprint(self): 272 return u"%d channel DSF @ %d bits, %s Hz, %.2f seconds" % ( 273 self.channels, self.bits_per_sample, self.sample_rate, self.length) 274 275 276class DSFFile(object): 277 278 dsd_chunk = None 279 fmt_chunk = None 280 data_chunk = None 281 282 def __init__(self, fileobj): 283 self.dsd_chunk = DSDChunk(fileobj) 284 self.fmt_chunk = FormatChunk(fileobj) 285 self.data_chunk = DataChunk(fileobj) 286 287 288class DSF(FileType): 289 """An DSF audio file. 290 291 Arguments: 292 filething (filething) 293 294 Attributes: 295 info (`DSFInfo`) 296 tags (`mutagen.id3.ID3Tags` or `None`) 297 """ 298 299 _mimes = ["audio/dsf"] 300 301 @staticmethod 302 def score(filename, fileobj, header): 303 return header.startswith(b"DSD ") * 2 + \ 304 endswith(filename.lower(), ".dsf") 305 306 def add_tags(self): 307 """Add a DSF tag block to the file.""" 308 309 if self.tags is None: 310 self.tags = _DSFID3() 311 else: 312 raise error("an ID3 tag already exists") 313 314 @convert_error(IOError, error) 315 @loadfile() 316 def load(self, filething, **kwargs): 317 dsf_file = DSFFile(filething.fileobj) 318 319 try: 320 self.tags = _DSFID3(filething.fileobj, **kwargs) 321 except ID3NoHeaderError: 322 self.tags = None 323 except ID3Error as e: 324 raise error(e) 325 else: 326 self.tags.filename = self.filename 327 328 self.info = DSFInfo(dsf_file.fmt_chunk) 329 330 @loadfile(writable=True) 331 def delete(self, filething=None): 332 self.tags = None 333 delete(filething) 334 335 336@convert_error(IOError, error) 337@loadfile(method=False, writable=True) 338def delete(filething): 339 """Remove tags from a file. 340 341 Args: 342 filething (filething) 343 Raises: 344 mutagen.MutagenError 345 """ 346 347 dsf_file = DSFFile(filething.fileobj) 348 349 if dsf_file.dsd_chunk.offset_metdata_chunk != 0: 350 id3_location = dsf_file.dsd_chunk.offset_metdata_chunk 351 dsf_file.dsd_chunk.offset_metdata_chunk = 0 352 dsf_file.dsd_chunk.write() 353 354 filething.fileobj.seek(id3_location) 355 filething.fileobj.truncate() 356 357 358Open = DSF 359