1# -*- coding: utf-8 -*-
2# Copyright (C) 2017  Boris Pruessmann
3#
4# This program is free software; you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation; either version 2 of the License, or
7# (at your option) any later version.
8
9"""Read and write DSF audio stream information and tags."""
10
11
12import sys
13import struct
14
15from ._compat import cBytesIO, reraise, endswith
16
17from mutagen import FileType, StreamInfo
18from mutagen._util import cdata, MutagenError, loadfile, convert_error
19from mutagen.id3 import ID3
20from mutagen.id3._util import ID3NoHeaderError, error as ID3Error
21
22
23__all__ = ["DSF", "Open", "delete"]
24
25
26class error(MutagenError):
27    pass
28
29
30class DSFChunk(object):
31    """A generic chunk of a DSFFile."""
32
33    chunk_offset = 0
34    chunk_header = "    "
35    chunk_size = -1
36
37    def __init__(self, fileobj, create=False):
38        self.fileobj = fileobj
39
40        if not create:
41            self.chunk_offset = fileobj.tell()
42            self.load()
43
44    def load(self):
45        raise NotImplementedError
46
47    def write(self):
48        raise NotImplementedError
49
50
51class DSDChunk(DSFChunk):
52    """Represents the first chunk of a DSF file"""
53
54    CHUNK_SIZE = 28
55
56    total_size = 0
57    offset_metdata_chunk = 0
58
59    def __init__(self, fileobj, create=False):
60        super(DSDChunk, self).__init__(fileobj, create)
61
62        if create:
63            self.chunk_header = b"DSD "
64            self.chunk_size = DSDChunk.CHUNK_SIZE
65
66    def load(self):
67        data = self.fileobj.read(DSDChunk.CHUNK_SIZE)
68        if len(data) != DSDChunk.CHUNK_SIZE:
69            raise error("DSF chunk truncated")
70
71        self.chunk_header = data[0:4]
72        if self.chunk_header != b"DSD ":
73            raise error("DSF dsd header not found")
74
75        self.chunk_size = cdata.ulonglong_le(data[4:12])
76        if self.chunk_size != DSDChunk.CHUNK_SIZE:
77            raise error("DSF dsd header size mismatch")
78
79        self.total_size = cdata.ulonglong_le(data[12:20])
80        self.offset_metdata_chunk = cdata.ulonglong_le(data[20:28])
81
82    def write(self):
83        f = cBytesIO()
84        f.write(self.chunk_header)
85        f.write(struct.pack("<Q", DSDChunk.CHUNK_SIZE))
86        f.write(struct.pack("<Q", self.total_size))
87        f.write(struct.pack("<Q", self.offset_metdata_chunk))
88
89        self.fileobj.seek(self.chunk_offset)
90        self.fileobj.write(f.getvalue())
91
92    def pprint(self):
93        return (u"DSD Chunk (Total file size = %d, "
94                u"Pointer to Metadata chunk = %d)" % (
95                    self.total_size, self.offset_metdata_chunk))
96
97
98class FormatChunk(DSFChunk):
99
100    CHUNK_SIZE = 52
101
102    VERSION = 1
103
104    FORMAT_DSD_RAW = 0
105    """Format ID: DSD Raw"""
106
107    format_version = VERSION
108    format_id = FORMAT_DSD_RAW
109    channel_type = 1
110    channel_num = 1
111    sampling_frequency = 2822400
112    bits_per_sample = 1
113    sample_count = 0
114    block_size_per_channel = 4096
115
116    def __init__(self, fileobj, create=False):
117        super(FormatChunk, self).__init__(fileobj, create)
118
119        if create:
120            self.chunk_header = b"fmt "
121            self.chunk_size = FormatChunk.CHUNK_SIZE
122
123    def load(self):
124        data = self.fileobj.read(FormatChunk.CHUNK_SIZE)
125        if len(data) != FormatChunk.CHUNK_SIZE:
126            raise error("DSF chunk truncated")
127
128        self.chunk_header = data[0:4]
129        if self.chunk_header != b"fmt ":
130            raise error("DSF fmt header not found")
131
132        self.chunk_size = cdata.ulonglong_le(data[4:12])
133        if self.chunk_size != FormatChunk.CHUNK_SIZE:
134            raise error("DSF dsd header size mismatch")
135
136        self.format_version = cdata.uint_le(data[12:16])
137        if self.format_version != FormatChunk.VERSION:
138            raise error("Unsupported format version")
139
140        self.format_id = cdata.uint_le(data[16:20])
141        if self.format_id != FormatChunk.FORMAT_DSD_RAW:
142            raise error("Unsupported format ID")
143
144        self.channel_type = cdata.uint_le(data[20:24])
145        self.channel_num = cdata.uint_le(data[24:28])
146        self.sampling_frequency = cdata.uint_le(data[28:32])
147        self.bits_per_sample = cdata.uint_le(data[32:36])
148        self.sample_count = cdata.ulonglong_le(data[36:44])
149
150    def pprint(self):
151        return u"fmt Chunk (Channel Type = %d, Channel Num = %d, " \
152               u"Sampling Frequency = %d, %.2f seconds)" % \
153               (self.channel_type, self.channel_num, self.sampling_frequency,
154                self.length)
155
156
157class DataChunk(DSFChunk):
158
159    CHUNK_SIZE = 12
160
161    data = ""
162
163    def __init__(self, fileobj, create=False):
164        super(DataChunk, self).__init__(fileobj, create)
165
166        if create:
167            self.chunk_header = b"data"
168            self.chunk_size = DataChunk.CHUNK_SIZE
169
170    def load(self):
171        data = self.fileobj.read(DataChunk.CHUNK_SIZE)
172        if len(data) != DataChunk.CHUNK_SIZE:
173            raise error("DSF chunk truncated")
174
175        self.chunk_header = data[0:4]
176        if self.chunk_header != b"data":
177            raise error("DSF data header not found")
178
179        self.chunk_size = cdata.ulonglong_le(data[4:12])
180        if self.chunk_size < DataChunk.CHUNK_SIZE:
181            raise error("DSF data header size mismatch")
182
183    def pprint(self):
184        return u"data Chunk (Chunk Offset = %d, Chunk Size = %d)" % (
185            self.chunk_offset, self.chunk_size)
186
187
188class _DSFID3(ID3):
189    """A DSF file with ID3v2 tags"""
190
191    @convert_error(IOError, error)
192    def _pre_load_header(self, fileobj):
193        fileobj.seek(0)
194        id3_location = DSDChunk(fileobj).offset_metdata_chunk
195        if id3_location == 0:
196            raise ID3NoHeaderError("File has no existing ID3 tag")
197
198        fileobj.seek(id3_location)
199
200    @convert_error(IOError, error)
201    @loadfile(writable=True)
202    def save(self, filething=None, v2_version=4, v23_sep='/', padding=None):
203        """Save ID3v2 data to the DSF file"""
204
205        fileobj = filething.fileobj
206        fileobj.seek(0)
207
208        dsd_header = DSDChunk(fileobj)
209        if dsd_header.offset_metdata_chunk == 0:
210            # create a new ID3 chunk at the end of the file
211            fileobj.seek(0, 2)
212
213            # store reference to ID3 location
214            dsd_header.offset_metdata_chunk = fileobj.tell()
215            dsd_header.write()
216
217        try:
218            data = self._prepare_data(
219                fileobj, dsd_header.offset_metdata_chunk, self.size,
220                v2_version, v23_sep, padding)
221        except ID3Error as e:
222            reraise(error, e, sys.exc_info()[2])
223
224        fileobj.seek(dsd_header.offset_metdata_chunk)
225        fileobj.write(data)
226        fileobj.truncate()
227
228        # Update total file size
229        dsd_header.total_size = fileobj.tell()
230        dsd_header.write()
231
232
233class DSFInfo(StreamInfo):
234    """DSF audio stream information.
235
236    Information is parsed from the fmt chunk of the DSF file.
237
238    Attributes:
239        length (`float`): audio length, in seconds.
240        channels (`int`): The number of audio channels.
241        sample_rate (`int`):
242            Sampling frequency, in Hz.
243            (2822400, 5644800, 11289600, or 22579200)
244        bits_per_sample (`int`): The audio sample size.
245        bitrate (`int`): The audio bitrate.
246    """
247
248    def __init__(self, fmt_chunk):
249        self.fmt_chunk = fmt_chunk
250
251    @property
252    def length(self):
253        return float(self.fmt_chunk.sample_count) / self.sample_rate
254
255    @property
256    def channels(self):
257        return self.fmt_chunk.channel_num
258
259    @property
260    def sample_rate(self):
261        return self.fmt_chunk.sampling_frequency
262
263    @property
264    def bits_per_sample(self):
265        return self.fmt_chunk.bits_per_sample
266
267    @property
268    def bitrate(self):
269        return self.sample_rate * self.bits_per_sample * self.channels
270
271    def pprint(self):
272        return u"%d channel DSF @ %d bits, %s Hz, %.2f seconds" % (
273            self.channels, self.bits_per_sample, self.sample_rate, self.length)
274
275
276class DSFFile(object):
277
278    dsd_chunk = None
279    fmt_chunk = None
280    data_chunk = None
281
282    def __init__(self, fileobj):
283        self.dsd_chunk = DSDChunk(fileobj)
284        self.fmt_chunk = FormatChunk(fileobj)
285        self.data_chunk = DataChunk(fileobj)
286
287
288class DSF(FileType):
289    """An DSF audio file.
290
291    Arguments:
292        filething (filething)
293
294    Attributes:
295        info (`DSFInfo`)
296        tags (`mutagen.id3.ID3Tags` or `None`)
297    """
298
299    _mimes = ["audio/dsf"]
300
301    @staticmethod
302    def score(filename, fileobj, header):
303        return header.startswith(b"DSD ") * 2 + \
304            endswith(filename.lower(), ".dsf")
305
306    def add_tags(self):
307        """Add a DSF tag block to the file."""
308
309        if self.tags is None:
310            self.tags = _DSFID3()
311        else:
312            raise error("an ID3 tag already exists")
313
314    @convert_error(IOError, error)
315    @loadfile()
316    def load(self, filething, **kwargs):
317        dsf_file = DSFFile(filething.fileobj)
318
319        try:
320            self.tags = _DSFID3(filething.fileobj, **kwargs)
321        except ID3NoHeaderError:
322            self.tags = None
323        except ID3Error as e:
324            raise error(e)
325        else:
326            self.tags.filename = self.filename
327
328        self.info = DSFInfo(dsf_file.fmt_chunk)
329
330    @loadfile(writable=True)
331    def delete(self, filething=None):
332        self.tags = None
333        delete(filething)
334
335
336@convert_error(IOError, error)
337@loadfile(method=False, writable=True)
338def delete(filething):
339    """Remove tags from a file.
340
341    Args:
342        filething (filething)
343    Raises:
344        mutagen.MutagenError
345    """
346
347    dsf_file = DSFFile(filething.fileobj)
348
349    if dsf_file.dsd_chunk.offset_metdata_chunk != 0:
350        id3_location = dsf_file.dsd_chunk.offset_metdata_chunk
351        dsf_file.dsd_chunk.offset_metdata_chunk = 0
352        dsf_file.dsd_chunk.write()
353
354        filething.fileobj.seek(id3_location)
355        filething.fileobj.truncate()
356
357
358Open = DSF
359