1# -*- coding: utf-8 -*-
2
3import os
4import sys
5from tempfile import mkstemp
6import shutil
7import warnings
8
9from hypothesis.strategies import composite, integers, one_of
10from hypothesis import given
11
12from tests import TestCase, DATA_DIR, get_temp_copy
13from mutagen._compat import cBytesIO, text_type
14from mutagen import File, Metadata, FileType, MutagenError, PaddingInfo
15from mutagen._util import loadfile, get_size
16from mutagen.oggvorbis import OggVorbis
17from mutagen.oggflac import OggFLAC
18from mutagen.oggspeex import OggSpeex
19from mutagen.oggtheora import OggTheora
20from mutagen.oggopus import OggOpus
21from mutagen.mp3 import MP3, EasyMP3
22from mutagen.id3 import ID3FileType
23from mutagen.apev2 import APEv2File
24from mutagen.flac import FLAC
25from mutagen.wavpack import WavPack
26from mutagen.trueaudio import TrueAudio, EasyTrueAudio
27from mutagen.mp4 import MP4
28from mutagen.musepack import Musepack
29from mutagen.monkeysaudio import MonkeysAudio
30from mutagen.optimfrog import OptimFROG
31from mutagen.asf import ASF
32from mutagen.aiff import AIFF
33from mutagen.aac import AAC
34from mutagen.smf import SMF
35from mutagen.dsf import DSF
36from os import devnull
37
38
39class TMetadata(TestCase):
40
41    class FakeMeta(Metadata):
42        def __init__(self):
43            pass
44
45    def test_virtual_constructor(self):
46        self.failUnlessRaises(NotImplementedError, Metadata, cBytesIO())
47
48    def test_load(self):
49        m = Metadata()
50        self.failUnlessRaises(NotImplementedError, m.load, cBytesIO())
51
52    def test_virtual_save(self):
53        self.failUnlessRaises(
54            NotImplementedError, self.FakeMeta().save, cBytesIO())
55        self.failUnlessRaises(
56            NotImplementedError, self.FakeMeta().save, cBytesIO())
57
58    def test_virtual_delete(self):
59        self.failUnlessRaises(
60            NotImplementedError, self.FakeMeta().delete, cBytesIO())
61        self.failUnlessRaises(
62            NotImplementedError, self.FakeMeta().delete, cBytesIO())
63
64
65class TPaddingInfo(TestCase):
66
67    def test_props(self):
68        info = PaddingInfo(10, 100)
69        self.assertEqual(info.size, 100)
70        self.assertEqual(info.padding, 10)
71
72        info = PaddingInfo(-10, 100)
73        self.assertEqual(info.size, 100)
74        self.assertEqual(info.padding, -10)
75
76    def test_default_strategy(self):
77        s = 100000
78        self.assertEqual(PaddingInfo(10, s).get_default_padding(), 10)
79        self.assertEqual(PaddingInfo(-10, s).get_default_padding(), 1124)
80        self.assertEqual(PaddingInfo(0, s).get_default_padding(), 0)
81        self.assertEqual(PaddingInfo(20000, s).get_default_padding(), 1124)
82
83        self.assertEqual(PaddingInfo(10, 0).get_default_padding(), 10)
84        self.assertEqual(PaddingInfo(-10, 0).get_default_padding(), 1024)
85        self.assertEqual(PaddingInfo(1050, 0).get_default_padding(), 1050)
86        self.assertEqual(PaddingInfo(20000, 0).get_default_padding(), 1024)
87
88    def test_repr(self):
89        info = PaddingInfo(10, 100)
90        self.assertEqual(repr(info), "<PaddingInfo size=100 padding=10>")
91
92
93class MyFileType(FileType):
94
95    @loadfile()
96    def load(self, filething, arg=1):
97        self.filename = filething.filename
98        self.fileobj = filething.fileobj
99        self.arg = arg
100
101
102class TFileTypeLoad(TestCase):
103
104    filename = os.path.join(DATA_DIR, "empty.ogg")
105
106    def test_old_argument_handling(self):
107        with warnings.catch_warnings():
108            warnings.simplefilter("ignore")
109            f = MyFileType()
110        self.assertFalse(hasattr(f, "a"))
111
112        f = MyFileType(self.filename)
113        self.assertEquals(f.arg, 1)
114
115        f = MyFileType(self.filename, 42)
116        self.assertEquals(f.arg, 42)
117        self.assertEquals(f.filename, self.filename)
118
119        f = MyFileType(self.filename, arg=42)
120        self.assertEquals(f.arg, 42)
121
122        f = MyFileType(filename=self.filename, arg=42)
123        self.assertEquals(f.arg, 42)
124
125        self.assertRaises(TypeError, MyFileType, self.filename, nope=42)
126        self.assertRaises(TypeError, MyFileType, nope=42)
127        self.assertRaises(TypeError, MyFileType, self.filename, 42, 24)
128
129    def test_both_args(self):
130        # fileobj wins, but filename is saved
131        x = cBytesIO()
132        f = MyFileType(filename="foo", fileobj=x)
133        self.assertTrue(f.fileobj is x)
134        self.assertEquals(f.filename, "foo")
135
136    def test_fileobj(self):
137        x = cBytesIO()
138        f = MyFileType(fileobj=x)
139        self.assertTrue(f.fileobj is x)
140        self.assertTrue(f.filename is None)
141
142    def test_magic(self):
143        x = cBytesIO()
144        f = MyFileType(x)
145        self.assertTrue(f.fileobj is x)
146        self.assertTrue(f.filename is None)
147
148    def test_filething(self):
149        # while load() has that arg, we don't allow it as kwarg, either
150        # pass per arg, or be explicit about the type.
151        x = cBytesIO()
152        self.assertRaises(TypeError, MyFileType, filething=x)
153
154    def test_filename_explicit(self):
155        x = cBytesIO()
156        self.assertRaises(ValueError, MyFileType, filename=x)
157
158
159class TFileType(TestCase):
160
161    def setUp(self):
162        self.vorbis = File(os.path.join(DATA_DIR, "empty.ogg"))
163
164        filename = get_temp_copy(os.path.join(DATA_DIR, "xing.mp3"))
165        self.mp3_notags = File(filename)
166        self.mp3_filename = filename
167
168    def tearDown(self):
169        os.remove(self.mp3_filename)
170
171    def test_delitem_not_there(self):
172        self.failUnlessRaises(KeyError, self.vorbis.__delitem__, "foobar")
173
174    def test_add_tags(self):
175        with warnings.catch_warnings():
176            warnings.simplefilter("ignore")
177            self.failUnlessRaises(NotImplementedError, FileType().add_tags)
178
179    def test_delitem(self):
180        self.vorbis["foobar"] = "quux"
181        del(self.vorbis["foobar"])
182        self.failIf("quux" in self.vorbis)
183
184    def test_save_no_tags(self):
185        self.assertTrue(self.mp3_notags.tags is None)
186        self.assertTrue(self.mp3_notags.filename)
187        self.mp3_notags.save()
188        self.assertTrue(self.mp3_notags.tags is None)
189
190
191class _TestFileObj(object):
192    """A file-like object which fails in various ways"""
193
194    def __init__(self, fileobj, stop_after=-1, fail_after=-1):
195        """
196        Args:
197            stop_after (int): size of data to return on read in total
198            fail_after (int): after this number of operations every method
199                will raise IOError
200        """
201
202        self._fileobj = fileobj
203        self._stop_after = stop_after
204        self._fail_after = fail_after
205
206        self.dataread = 0
207        self.operations = 0
208
209        fileobj.seek(0, 0)
210
211    def _check_fail(self):
212        self.operations += 1
213        if self._fail_after != -1:
214            if self.operations > self._fail_after:
215                raise IOError("fail")
216
217    def tell(self):
218        self._check_fail()
219        return self._fileobj.tell()
220
221    def write(self, data):
222        try:
223            self._check_fail()
224        except IOError:
225            # we use write(b"") to check if the fileobj is writable
226            if len(data):
227                raise
228        self._fileobj.write(data)
229
230    def truncate(self, *args, **kwargs):
231        self._check_fail()
232        self._fileobj.truncate(*args, **kwargs)
233
234    def flush(self):
235        self._fileobj.flush()
236
237    def read(self, size=-1):
238        try:
239            self._check_fail()
240        except IOError:
241            # we use read(0) to test for the file object type, so don't error
242            # out in that case
243            if size != 0:
244                raise
245
246        data = self._fileobj.read(size)
247        self.dataread += len(data)
248        if self._stop_after != -1 and self.dataread > self._stop_after:
249            data = data[:self._stop_after - self.dataread]
250        return data
251
252    def seek(self, offset, whence=0):
253        self._check_fail()
254
255        # make sure we don't go negative
256        if whence == 0:
257            final_position = offset
258        elif whence == 1:
259            final_position = self._fileobj.tell() + offset
260        elif whence == 2:
261            final_position = get_size(self._fileobj) + offset
262        assert final_position >= 0, final_position
263
264        return self._fileobj.seek(offset, whence)
265
266
267def generate_test_file_objects(fileobj, func):
268    """Given a file object yields the same file object which fails differently
269    each time
270    """
271
272    t = _TestFileObj(fileobj)
273    # first figure out how much a successful attempt reads and how many
274    # file object operations it executes.
275    func(t)
276
277    @composite
278    def strategy(draw):
279
280        stop_strat = integers(
281            min_value=0, max_value=t.dataread).map(
282                lambda i: _TestFileObj(fileobj, stop_after=i))
283
284        fail_strat = integers(
285            min_value=0, max_value=t.operations).map(
286                lambda i: _TestFileObj(fileobj, fail_after=i))
287
288        x = draw(one_of(stop_strat, fail_strat))
289        return x
290
291    return strategy()
292
293
294class TAbstractFileType(object):
295
296    PATH = None
297    KIND = None
298
299    def setUp(self):
300        self.filename = get_temp_copy(self.PATH)
301        self.audio = self.KIND(self.filename)
302
303    def tearDown(self):
304        try:
305            os.remove(self.filename)
306        except OSError:
307            pass
308
309    def test_fileobj_load(self):
310        with open(self.filename, "rb") as h:
311            self.KIND(h)
312
313    def test_fileobj_save(self):
314        with open(self.filename, "rb+") as h:
315            f = self.KIND(h)
316            h.seek(0)
317            f.save(h)
318            h.seek(0)
319            f.delete(h)
320
321    def test_module_delete_fileobj(self):
322        mod = sys.modules[self.KIND.__module__]
323        if hasattr(mod, "delete"):
324            with open(self.filename, "rb+") as h:
325                mod.delete(fileobj=h)
326
327    def test_stringio(self):
328        with open(self.filename, "rb") as h:
329            fileobj = cBytesIO(h.read())
330            self.KIND(fileobj)
331            # make sure it's not closed
332            fileobj.read(0)
333
334    def test_testfileobj(self):
335        with open(self.filename, "rb") as h:
336            self.KIND(_TestFileObj(h))
337
338    def test_test_fileobj_load(self):
339        with open(self.filename, "rb") as h:
340
341            @given(generate_test_file_objects(h, self.KIND))
342            def run(t):
343                try:
344                    self.KIND(t)
345                except MutagenError:
346                    pass
347
348            run()
349
350    def test_test_fileobj_save(self):
351        with open(self.filename, "rb+") as h:
352            o = self.KIND(_TestFileObj(h))
353
354            @given(generate_test_file_objects(h, lambda t: o.save(fileobj=t)))
355            def run(t):
356                try:
357                    o.save(fileobj=t)
358                except MutagenError:
359                    pass
360
361            run()
362
363    def test_test_fileobj_delete(self):
364        with open(self.filename, "rb+") as h:
365            o = self.KIND(_TestFileObj(h))
366
367            @given(generate_test_file_objects(
368                h, lambda t: o.delete(fileobj=t)))
369            def run(t):
370                try:
371                    o.delete(fileobj=t)
372                except MutagenError:
373                    pass
374
375            run()
376
377    def test_filename(self):
378        self.assertEqual(self.audio.filename, self.filename)
379
380    def test_file(self):
381        self.assertTrue(isinstance(File(self.PATH), self.KIND))
382
383    def test_not_file(self):
384        self.failUnlessRaises(MutagenError, self.KIND, "/dev/doesnotexist")
385
386    def test_pprint(self):
387        res = self.audio.pprint()
388        self.assertTrue(res)
389        self.assertTrue(isinstance(res, text_type))
390
391    def test_info(self):
392        self.assertTrue(self.audio.info)
393
394    def test_info_pprint(self):
395        res = self.audio.info.pprint()
396        self.assertTrue(res)
397        self.assertTrue(isinstance(res, text_type))
398
399    def test_mime(self):
400        self.assertTrue(self.audio.mime)
401        self.assertTrue(isinstance(self.audio.mime, list))
402
403    def test_load(self):
404        with warnings.catch_warnings():
405            warnings.simplefilter("ignore")
406            x = self.KIND()
407        x.load(self.filename)
408        x.save()
409
410    def test_delete(self):
411        self.audio.delete(self.filename)
412        self.audio.delete()
413
414    def test_delete_nonexisting(self):
415        # if there are none, add them first
416        if not self.audio.tags:
417            try:
418                self.audio.add_tags()
419            except MutagenError:
420                pass
421            else:
422                self.audio.save()
423
424        os.remove(self.filename)
425        try:
426            self.audio.delete()
427        except MutagenError:
428            pass
429
430    def test_save_nonexisting(self):
431        os.remove(self.filename)
432        tags = self.audio.tags
433        # Metadata creates a new file
434        if not isinstance(tags, Metadata):
435            try:
436                self.audio.save()
437            except MutagenError:
438                pass
439
440    def test_save(self):
441        self.audio.save(self.filename)
442        self.audio.save()
443
444    def test_add_tags(self):
445        had_tags = self.audio.tags is not None
446        try:
447            self.audio.add_tags()
448        except MutagenError:
449            pass
450        else:
451            self.assertFalse(had_tags)
452            self.assertTrue(self.audio.tags is not None)
453        self.assertRaises(MutagenError, self.audio.add_tags)
454
455    def test_score(self):
456        with open(self.filename, "rb") as fileobj:
457            header = fileobj.read(128)
458            self.KIND.score(self.filename, fileobj, header)
459
460    def test_dict(self):
461        self.audio.keys()
462        self.assertRaises(KeyError, self.audio.__delitem__, "nopenopenopenope")
463        for key, value in self.audio.items():
464            del self.audio[key]
465            self.audio[key] = value
466
467
468_FILETYPES = {
469    OggVorbis: [os.path.join(DATA_DIR, "empty.ogg")],
470    OggFLAC: [os.path.join(DATA_DIR, "empty.oggflac")],
471    OggSpeex: [os.path.join(DATA_DIR, "empty.spx")],
472    OggTheora: [os.path.join(DATA_DIR, "sample.oggtheora")],
473    OggOpus: [os.path.join(DATA_DIR, "example.opus")],
474    FLAC: [os.path.join(DATA_DIR, "silence-44-s.flac")],
475    TrueAudio: [os.path.join(DATA_DIR, "empty.tta")],
476    WavPack: [os.path.join(DATA_DIR, "silence-44-s.wv")],
477    MP3: [
478        os.path.join(DATA_DIR, "bad-xing.mp3"),
479        os.path.join(DATA_DIR, "xing.mp3"),
480        os.path.join(DATA_DIR, "silence-44-s.mp3"),
481    ],
482    Musepack: [
483        os.path.join(DATA_DIR, "click.mpc"),
484        os.path.join(DATA_DIR, "sv4_header.mpc"),
485        os.path.join(DATA_DIR, "sv5_header.mpc"),
486        os.path.join(DATA_DIR, "sv8_header.mpc"),
487    ],
488    OptimFROG: [
489        os.path.join(DATA_DIR, "empty.ofr"),
490        os.path.join(DATA_DIR, "empty.ofs"),
491    ],
492    AAC: [
493        os.path.join(DATA_DIR, "empty.aac"),
494        os.path.join(DATA_DIR, "adif.aac"),
495    ],
496    ASF: [
497        os.path.join(DATA_DIR, "silence-1.wma"),
498        os.path.join(DATA_DIR, "silence-2.wma"),
499        os.path.join(DATA_DIR, "silence-3.wma"),
500    ],
501    AIFF: [
502        os.path.join(DATA_DIR, "with-id3.aif"),
503        os.path.join(DATA_DIR, "11k-1ch-2s-silence.aif"),
504        os.path.join(DATA_DIR, "48k-2ch-s16-silence.aif"),
505        os.path.join(DATA_DIR, "8k-1ch-1s-silence.aif"),
506        os.path.join(DATA_DIR, "8k-1ch-3.5s-silence.aif"),
507        os.path.join(DATA_DIR, "8k-4ch-1s-silence.aif")
508    ],
509    MonkeysAudio: [
510        os.path.join(DATA_DIR, "mac-399.ape"),
511        os.path.join(DATA_DIR, "mac-396.ape"),
512    ],
513    MP4: [
514        os.path.join(DATA_DIR, "has-tags.m4a"),
515        os.path.join(DATA_DIR, "no-tags.m4a"),
516        os.path.join(DATA_DIR, "no-tags.3g2"),
517        os.path.join(DATA_DIR, "truncated-64bit.mp4"),
518    ],
519    SMF: [
520        os.path.join(DATA_DIR, "sample.mid"),
521    ],
522    DSF: [
523        os.path.join(DATA_DIR, '2822400-1ch-0s-silence.dsf'),
524        os.path.join(DATA_DIR, '5644800-2ch-s01-silence.dsf'),
525        os.path.join(DATA_DIR, 'with-id3.dsf'),
526        os.path.join(DATA_DIR, 'without-id3.dsf'),
527    ]
528}
529
530_FILETYPES[ID3FileType] = _FILETYPES[MP3]
531_FILETYPES[APEv2File] = _FILETYPES[MonkeysAudio]
532
533
534def create_filetype_tests():
535    for kind, paths in _FILETYPES.items():
536        for i, path in enumerate(paths):
537            suffix = "_" + str(i + 1) if i else ""
538            new_type = type("TFileType" + kind.__name__ + suffix,
539                            (TAbstractFileType, TestCase),
540                            {"PATH": path, "KIND": kind})
541            globals()[new_type.__name__] = new_type
542
543create_filetype_tests()
544
545
546class TFile(TestCase):
547
548    @property
549    def filenames(self):
550        for kind, paths in _FILETYPES.items():
551            for path in paths:
552                yield path
553
554    def test_bad(self):
555        try:
556            self.failUnless(File(devnull) is None)
557        except (OSError, IOError):
558            print("WARNING: Unable to open %s." % devnull)
559        self.failUnless(File(__file__) is None)
560
561    def test_empty(self):
562        filename = os.path.join(DATA_DIR, "empty")
563        open(filename, "wb").close()
564        try:
565            self.failUnless(File(filename) is None)
566        finally:
567            os.unlink(filename)
568
569    def test_not_file(self):
570        self.failUnlessRaises(MutagenError, File, "/dev/doesnotexist")
571
572    def test_no_options(self):
573        for filename in self.filenames:
574            filename = os.path.join(DATA_DIR, filename)
575            self.failIf(File(filename, options=[]))
576
577    def test_fileobj(self):
578        for filename in self.filenames:
579            with open(filename, "rb") as h:
580                self.assertTrue(File(h) is not None)
581            with open(filename, "rb") as h:
582                fileobj = cBytesIO(h.read())
583                self.assertTrue(File(fileobj, filename=filename) is not None)
584
585    def test_mock_fileobj(self):
586        for filename in self.filenames:
587            with open(filename, "rb") as h:
588
589                @given(generate_test_file_objects(h, File))
590                def run(t):
591                    try:
592                        File(t)
593                    except MutagenError:
594                        pass
595
596                run()
597
598    def test_easy_mp3(self):
599        self.failUnless(isinstance(
600            File(os.path.join(DATA_DIR, "silence-44-s.mp3"), easy=True),
601            EasyMP3))
602
603    def test_apev2(self):
604        self.failUnless(isinstance(
605            File(os.path.join(DATA_DIR, "oldtag.apev2")), APEv2File))
606
607    def test_easy_tta(self):
608        self.failUnless(isinstance(
609            File(os.path.join(DATA_DIR, "empty.tta"), easy=True),
610            EasyTrueAudio))
611
612    def test_id3_indicates_mp3_not_tta(self):
613        header = b"ID3 the rest of this is garbage"
614        fileobj = cBytesIO(header)
615        filename = "not-identifiable.ext"
616        self.failUnless(TrueAudio.score(filename, fileobj, header) <
617                        MP3.score(filename, fileobj, header))
618
619    def test_prefer_theora_over_vorbis(self):
620        header = (
621            b"OggS\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\xe1x\x06\x0f"
622            b"\x00\x00\x00\x00)S'\xf4\x01*\x80theora\x03\x02\x01\x006\x00\x1e"
623            b"\x00\x03V\x00\x01\xe0\x00\x00\x00\x00\x00\x18\x00\x00\x00\x01"
624            b"\x00\x00\x00\x00\x00\x00\x00&%\xa0\x00\xc0OggS\x00\x02\x00\x00"
625            b"\x00\x00\x00\x00\x00\x00d#\xa8\x1f\x00\x00\x00\x00]Y\xc0\xc0"
626            b"\x01\x1e\x01vorbis\x00\x00\x00\x00\x02\x80\xbb\x00\x00\x00\x00"
627            b"\x00\x00\x00\xee\x02\x00\x00\x00\x00\x00\xb8\x01")
628        fileobj = cBytesIO(header)
629        filename = "not-identifiable.ext"
630        self.failUnless(OggVorbis.score(filename, fileobj, header) <
631                        OggTheora.score(filename, fileobj, header))
632
633
634class TFileUpperExt(TestCase):
635    FILES = [
636        (os.path.join(DATA_DIR, "empty.ofr"), OptimFROG),
637        (os.path.join(DATA_DIR, "sv5_header.mpc"), Musepack),
638        (os.path.join(DATA_DIR, "silence-3.wma"), ASF),
639        (os.path.join(DATA_DIR, "truncated-64bit.mp4"), MP4),
640        (os.path.join(DATA_DIR, "silence-44-s.flac"), FLAC),
641    ]
642
643    def setUp(self):
644        checks = []
645        for (original, instance) in self.FILES:
646            ext = os.path.splitext(original)[1]
647            fd, filename = mkstemp(suffix=ext.upper())
648            os.close(fd)
649            shutil.copy(original, filename)
650            checks.append((filename, instance))
651        self.checks = checks
652
653    def test_case_insensitive_ext(self):
654        for (path, instance) in self.checks:
655            if isinstance(path, bytes):
656                path = path.decode("ascii")
657            self.failUnless(
658                isinstance(File(path, options=[instance]), instance))
659            path = path.encode("ascii")
660            self.failUnless(
661                isinstance(File(path, options=[instance]), instance))
662
663    def tearDown(self):
664        for (path, instance) in self.checks:
665            os.unlink(path)
666
667
668class TModuleImportAll(TestCase):
669
670    def setUp(self):
671        import mutagen
672        files = os.listdir(mutagen.__path__[0])
673        modules = set(os.path.splitext(f)[0] for f in files)
674        modules = [f for f in modules if not f.startswith("_")]
675
676        self.modules = []
677        for module in modules:
678            mod = getattr(__import__("mutagen." + module), module)
679            self.modules.append(mod)
680
681    def tearDown(self):
682        del self.modules[:]
683
684    def test_all(self):
685        for mod in self.modules:
686            for attr in getattr(mod, "__all__", []):
687                getattr(mod, attr)
688
689    def test_errors(self):
690        for mod in self.modules:
691            self.assertTrue(issubclass(mod.error, MutagenError), msg=mod.error)
692