1# -*- coding: utf-8 -*-
2
3import os
4import random
5import subprocess
6
7from mutagen._compat import BytesIO, xrange
8from tests import TestCase, DATA_DIR, get_temp_copy
9from mutagen.ogg import OggPage, error as OggError
10from mutagen._util import cdata
11from mutagen import _util
12
13
class TOggPage(TestCase):
    """Tests for reading, writing and rewriting individual Ogg pages."""

    def setUp(self):
        # The handle stays open for the whole test so that tests can keep
        # reading the pages following the first one.
        self.fileobj = open(os.path.join(DATA_DIR, "empty.ogg"), "rb")
        try:
            self.page = OggPage(self.fileobj)
        except Exception:
            # unittest does not call tearDown() when setUp() fails, so the
            # handle has to be released here.
            self.fileobj.close()
            raise

        # Three consecutive single-packet pages sharing serial 1.
        pages = [OggPage(), OggPage(), OggPage()]
        pages[0].packets = [b"foo"]
        pages[1].packets = [b"bar"]
        pages[2].packets = [b"baz"]
        for sequence, page in enumerate(pages):
            page.sequence = sequence
            page.serial = 1
        self.pages = pages

    def test_flags(self):
        # Flags as read from the first page of the reference file.
        self.failUnless(self.page.first)
        self.failIf(self.page.continued)
        self.failIf(self.page.last)
        self.failUnless(self.page.complete)

        # Every combination of flags must read back exactly as it was set.
        for first in [True, False]:
            self.page.first = first
            for last in [True, False]:
                self.page.last = last
                for continued in [True, False]:
                    self.page.continued = continued
                    self.failUnlessEqual(self.page.first, first)
                    self.failUnlessEqual(self.page.last, last)
                    self.failUnlessEqual(self.page.continued, continued)

    def test_flags_next_page(self):
        # setUp() consumed the first page, so this reads the second one.
        page = OggPage(self.fileobj)
        self.failIf(page.first)
        self.failIf(page.continued)
        self.failIf(page.last)

    def test_length(self):
        # Always true for Ogg Vorbis files
        self.failUnlessEqual(self.page.size, 58)
        self.failUnlessEqual(len(self.page.write()), 58)

    def test_first_metadata_page_is_separate(self):
        self.failIf(OggPage(self.fileobj).continued)

    def test_single_page_roundtrip(self):
        # Serializing a page and parsing it again must give an equal page.
        self.failUnlessEqual(
            self.page, OggPage(BytesIO(self.page.write())))

    def test_at_least_one_audio_page(self):
        # Read pages until one is flagged as last; running out of pages
        # first would raise and fail the test.
        page = OggPage(self.fileobj)
        while not page.last:
            page = OggPage(self.fileobj)
        self.failUnless(page.last)

    def test_crappy_fragmentation(self):
        # Packets one byte larger than the page size, no wiggle room:
        # more pages than packets are needed.
        packets = [b"1" * 511, b"2" * 511, b"3" * 511]
        pages = OggPage.from_packets(packets, default_size=510, wiggle_room=0)
        self.failUnless(len(pages) > 3)
        self.failUnlessEqual(OggPage.to_packets(pages), packets)

    def test_wiggle_room(self):
        # With enough wiggle room each packet fits on a page of its own.
        packets = [b"1" * 511, b"2" * 511, b"3" * 511]
        pages = OggPage.from_packets(
            packets, default_size=510, wiggle_room=100)
        self.failUnlessEqual(len(pages), 3)
        self.failUnlessEqual(OggPage.to_packets(pages), packets)

    def test_one_packet_per_wiggle(self):
        packets = [b"1" * 511, b"2" * 511, b"3" * 511]
        pages = OggPage.from_packets(
            packets, default_size=1000, wiggle_room=1000000)
        self.failUnlessEqual(len(pages), 2)
        self.failUnlessEqual(OggPage.to_packets(pages), packets)

    def test_replace(self):
        # create interleaved pages
        fileobj = BytesIO()
        pages = [OggPage(), OggPage(), OggPage()]
        pages[0].serial = 42
        pages[0].sequence = 0
        pages[0].packets = [b"foo"]
        pages[1].serial = 24
        pages[1].sequence = 0
        pages[1].packets = [b"bar"]
        pages[2].serial = 42
        pages[2].sequence = 1
        pages[2].packets = [b"baz"]
        for page in pages:
            fileobj.write(page.write())

        fileobj.seek(0, 0)
        pages_from_file = [OggPage(fileobj), OggPage(fileobj),
                           OggPage(fileobj)]

        # Replace both pages of stream 42 with a single new page.
        old_pages = [pages_from_file[0], pages_from_file[2]]
        packets = OggPage.to_packets(old_pages, strict=True)
        self.assertEqual(packets, [b"foo", b"baz"])
        new_packets = [b"1111", b"2222"]
        new_pages = OggPage.from_packets(new_packets,
                                         sequence=old_pages[0].sequence)
        self.assertEqual(len(new_pages), 1)
        OggPage.replace(fileobj, old_pages, new_pages)

        # The new page comes first; the page of stream 24 is unchanged.
        fileobj.seek(0, 0)
        first = OggPage(fileobj)
        self.assertEqual(first.serial, 42)
        self.assertEqual(OggPage.to_packets([first], strict=True),
                         [b"1111", b"2222"])
        second = OggPage(fileobj)
        self.assertEqual(second.serial, 24)
        self.assertEqual(OggPage.to_packets([second], strict=True), [b"bar"])

    def test_replace_fast_path(self):
        # create interleaved pages
        fileobj = BytesIO()
        pages = [OggPage(), OggPage(), OggPage()]
        pages[0].serial = 42
        pages[0].sequence = 0
        pages[0].packets = [b"foo"]
        pages[1].serial = 24
        pages[1].sequence = 0
        pages[1].packets = [b"bar"]
        pages[2].serial = 42
        pages[2].sequence = 1
        pages[2].packets = [b"baz"]
        for page in pages:
            fileobj.write(page.write())

        fileobj.seek(0, 0)
        pages_from_file = [OggPage(fileobj), OggPage(fileobj),
                           OggPage(fileobj)]

        old_pages = [pages_from_file[0], pages_from_file[2]]
        packets = OggPage.to_packets(old_pages, strict=True)
        self.assertEqual(packets, [b"foo", b"baz"])
        new_packets = [b"111", b"222"]
        new_pages = OggPage._from_packets_try_preserve(new_packets, old_pages)
        self.assertEqual(len(new_pages), 2)

        # remove insert_bytes, so we can be sure the fast path was taken
        old_insert_bytes = _util.insert_bytes
        _util.insert_bytes = None
        try:
            OggPage.replace(fileobj, old_pages, new_pages)
        finally:
            _util.insert_bytes = old_insert_bytes

        # validate that the new data was written and the other pages
        # are untouched
        fileobj.seek(0, 0)
        pages_from_file = [OggPage(fileobj), OggPage(fileobj),
                           OggPage(fileobj)]
        packets = OggPage.to_packets(
            [pages_from_file[0], pages_from_file[2]], strict=True)
        self.assertEqual(packets, [b"111", b"222"])
        packets = OggPage.to_packets([pages_from_file[1]], strict=True)
        self.assertEqual(packets, [b"bar"])

    def test_replace_continued(self):
        # take a partial packet and replace it with a new page
        # replace() should make it spanning again
        pages = [OggPage(), OggPage()]
        pages[0].serial = 1
        pages[0].sequence = 0
        pages[0].complete = False
        pages[0].packets = [b"foo"]
        pages[1].serial = 1
        pages[1].sequence = 1
        pages[1].continued = True
        pages[1].packets = [b"bar"]
        fileobj = BytesIO()
        for page in pages:
            fileobj.write(page.write())

        fileobj.seek(0, 0)
        pages_from_file = [OggPage(fileobj), OggPage(fileobj)]
        self.assertEqual(OggPage.to_packets(pages_from_file), [b"foobar"])
        packets_part = OggPage.to_packets([pages_from_file[0]])
        self.assertEqual(packets_part, [b"foo"])
        new_pages = OggPage.from_packets([b"quuux"])
        OggPage.replace(fileobj, [pages_from_file[0]], new_pages)

        fileobj.seek(0, 0)
        written = OggPage.to_packets([OggPage(fileobj), OggPage(fileobj)])
        self.assertEqual(written, [b"quuuxbar"])

    def test_renumber(self):
        self.failUnlessEqual(
            [page.sequence for page in self.pages], [0, 1, 2])
        fileobj = BytesIO()
        for page in self.pages:
            fileobj.write(page.write())
        fileobj.seek(0)
        OggPage.renumber(fileobj, 1, 10)
        fileobj.seek(0)
        pages = [OggPage(fileobj) for i in xrange(3)]
        self.failUnlessEqual([page.sequence for page in pages], [10, 11, 12])

        # Renumbering an already renumbered stream must work as well.
        fileobj.seek(0)
        OggPage.renumber(fileobj, 1, 20)
        fileobj.seek(0)
        pages = [OggPage(fileobj) for i in xrange(3)]
        self.failUnlessEqual([page.sequence for page in pages], [20, 21, 22])

    def test_renumber_extradata(self):
        fileobj = BytesIO()
        for page in self.pages:
            fileobj.write(page.write())
        fileobj.write(b"left over data")
        fileobj.seek(0)
        # Trying to rewrite should raise an error...
        self.failUnlessRaises(Exception, OggPage.renumber, fileobj, 1, 10)
        fileobj.seek(0)
        # But the already written data should remain valid,
        pages = [OggPage(fileobj) for i in xrange(3)]
        self.failUnlessEqual([page.sequence for page in pages], [10, 11, 12])
        # And the garbage that caused the error should be okay too.
        self.failUnlessEqual(fileobj.read(), b"left over data")

    def test_renumber_reread(self):
        # Create the copy before entering the try block: if copying fails
        # there is nothing to unlink, and the cleanup must not mask the
        # real error with one about an unbound ``filename``.
        filename = get_temp_copy(
            os.path.join(DATA_DIR, "multipagecomment.ogg"))
        try:
            with open(filename, "rb+") as fileobj:
                OggPage.renumber(fileobj, 1002429366, 20)
            with open(filename, "rb+") as fileobj:
                OggPage.renumber(fileobj, 1002429366, 0)
        finally:
            os.unlink(filename)

    def test_renumber_muxed(self):
        # Ten pages; the one at index 1 belongs to a different stream
        # (serial 2) and must be left alone.
        pages = [OggPage() for i in xrange(10)]
        for seq, page in enumerate(pages[0:1] + pages[2:]):
            page.serial = 0
            page.sequence = seq
        pages[1].serial = 2
        pages[1].sequence = 100
        data = BytesIO(b"".join([page.write() for page in pages]))
        OggPage.renumber(data, 0, 20)
        data.seek(0)
        pages = [OggPage(data) for i in xrange(10)]
        self.failUnlessEqual(pages[1].serial, 2)
        self.failUnlessEqual(pages[1].sequence, 100)
        pages.pop(1)
        self.failUnlessEqual(
            [page.sequence for page in pages], list(xrange(20, 29)))

    def test_to_packets(self):
        self.failUnlessEqual(
            [b"foo", b"bar", b"baz"], OggPage.to_packets(self.pages))
        # An incomplete page followed by a continued one joins the packets.
        self.pages[0].complete = False
        self.pages[1].continued = True
        self.failUnlessEqual(
            [b"foobar", b"baz"], OggPage.to_packets(self.pages))

    def test_to_packets_mixed_stream(self):
        self.pages[0].serial = 3
        self.failUnlessRaises(ValueError, OggPage.to_packets, self.pages)

    def test_to_packets_missing_sequence(self):
        self.pages[0].sequence = 3
        self.failUnlessRaises(ValueError, OggPage.to_packets, self.pages)

    def test_to_packets_continued(self):
        self.pages[0].continued = True
        self.failUnlessEqual(
            OggPage.to_packets(self.pages), [b"foo", b"bar", b"baz"])

    def test_to_packets_continued_strict(self):
        self.pages[0].continued = True
        self.failUnlessRaises(
            ValueError, OggPage.to_packets, self.pages, strict=True)

    def test_to_packets_strict(self):
        for page in self.pages:
            page.complete = False
        self.failUnlessRaises(
            ValueError, OggPage.to_packets, self.pages, strict=True)

    def test_from_packets_short_enough(self):
        packets = [b"1" * 200, b"2" * 200, b"3" * 200]
        pages = OggPage.from_packets(packets)
        self.failUnlessEqual(OggPage.to_packets(pages), packets)

    def test_from_packets_position(self):
        # One packet spanning several pages: only the final page carries
        # position 0, all earlier ones carry -1.
        packets = [b"1" * 100000]
        pages = OggPage.from_packets(packets)
        self.failUnless(len(pages) > 1)
        for page in pages[:-1]:
            self.failUnlessEqual(-1, page.position)
        self.failUnlessEqual(0, pages[-1].position)

    def test_from_packets_long(self):
        packets = [b"1" * 100000, b"2" * 100000, b"3" * 100000]
        pages = OggPage.from_packets(packets)
        self.failIf(pages[0].complete)
        self.failUnless(pages[1].continued)
        self.failUnlessEqual(OggPage.to_packets(pages), packets)

    def test__from_packets_try_preserve(self):
        # if the packet layout matches, just create pages with
        # the same layout and copy things over
        packets = [b"1" * 100000, b"2" * 100000, b"3" * 100000]
        pages = OggPage.from_packets(packets, sequence=42, default_size=977)
        new_pages = OggPage._from_packets_try_preserve(packets, pages)
        self.assertEqual(pages, new_pages)

        # zero case
        new_pages = OggPage._from_packets_try_preserve([], pages)
        self.assertEqual(new_pages, [])

        # if the layout doesn't match we should fall back to creating new
        # pages starting with the sequence of the first given page
        other_packets = list(packets)
        other_packets[1] += b"\xff"
        other_pages = OggPage.from_packets(other_packets, 42)
        new_pages = OggPage._from_packets_try_preserve(other_packets, pages)
        self.assertEqual(new_pages, other_pages)

    def test_random_data_roundtrip(self):
        try:
            random_file = open("/dev/urandom", "rb")
        except (IOError, OSError):
            print("WARNING: Random data round trip test disabled.")
            return
        try:
            # Distinct throwaway names: on Python 2 a list comprehension
            # leaks its loop variable into the enclosing scope.
            for unused_round in xrange(10):
                num_packets = random.randrange(2, 100)
                lengths = [random.randrange(10, 10000)
                           for unused_index in xrange(num_packets)]
                packets = list(map(random_file.read, lengths))
                self.failUnlessEqual(
                    packets, OggPage.to_packets(OggPage.from_packets(packets)))
        finally:
            random_file.close()

    def test_packet_exactly_255(self):
        page = OggPage()
        page.packets = [b"1" * 255]
        page.complete = False
        page2 = OggPage()
        page2.packets = [b""]
        page2.sequence = 1
        page2.continued = True
        self.failUnlessEqual(
            [b"1" * 255], OggPage.to_packets([page, page2]))

    def test_page_max_size_alone_too_big(self):
        page = OggPage()
        page.packets = [b"1" * 255 * 255]
        page.complete = True
        self.failUnlessRaises(ValueError, page.write)

    def test_page_max_size(self):
        page = OggPage()
        page.packets = [b"1" * 255 * 255]
        page.complete = False
        page2 = OggPage()
        page2.packets = [b""]
        page2.sequence = 1
        page2.continued = True
        self.failUnlessEqual(
            [b"1" * 255 * 255], OggPage.to_packets([page, page2]))

    def test_complete_zero_length(self):
        packets = [b""] * 20
        page = OggPage.from_packets(packets)[0]
        new_page = OggPage(BytesIO(page.write()))
        self.failUnlessEqual(new_page, page)
        self.failUnlessEqual(OggPage.to_packets([new_page]), packets)

    def test_too_many_packets(self):
        packets = [b"1"] * 3000
        pages = OggPage.from_packets(packets)
        # Serialize every page explicitly: on Python 3 a bare map() is lazy
        # and would never call write() at all.
        for page in pages:
            page.write()
        self.failUnless(len(pages) > 3000 // 255)

    def test_read_max_size(self):
        page = OggPage()
        page.packets = [b"1" * 255 * 255]
        page.complete = False
        page2 = OggPage()
        page2.packets = [b"", b"foo"]
        page2.sequence = 1
        page2.continued = True
        data = page.write() + page2.write()
        fileobj = BytesIO(data)
        self.failUnlessEqual(OggPage(fileobj), page)
        self.failUnlessEqual(OggPage(fileobj), page2)
        # Nothing is left after the two pages.
        self.failUnlessRaises(EOFError, OggPage, fileobj)

    def test_invalid_version(self):
        page = OggPage()
        # The untouched default page must parse before the version change.
        OggPage(BytesIO(page.write()))
        page.version = 1
        self.failUnlessRaises(OggError, OggPage, BytesIO(page.write()))

    def test_not_enough_lacing(self):
        data = OggPage().write()[:-1] + b"\x10"
        self.failUnlessRaises(OggError, OggPage, BytesIO(data))

    def test_not_enough_data(self):
        data = OggPage().write()[:-1] + b"\x01\x10"
        self.failUnlessRaises(OggError, OggPage, BytesIO(data))

    def test_not_equal(self):
        self.failIfEqual(OggPage(), 12)

    def test_find_last(self):
        pages = [OggPage() for i in xrange(10)]
        for i, page in enumerate(pages):
            page.sequence = i
        data = BytesIO(b"".join([page.write() for page in pages]))
        self.failUnlessEqual(
            OggPage.find_last(data, pages[0].serial), pages[-1])

    def test_find_last_none_finishing(self):
        page = OggPage()
        page.position = -1
        data = BytesIO(page.write())
        assert OggPage.find_last(data, page.serial, finishing=True) is None

    def test_find_last_none_finishing_mux(self):
        page1 = OggPage()
        page1.last = True
        page1.position = -1
        page2 = OggPage()
        page2.serial = page1.serial + 1
        pages = [page1, page2]
        data = BytesIO(b"".join([page.write() for page in pages]))

        assert OggPage.find_last(data, page1.serial, finishing=True) is None
        assert OggPage.find_last(data, page2.serial, finishing=True) == page2

    def test_find_last_last_empty(self):
        # https://github.com/quodlibet/mutagen/issues/308
        pages = [OggPage() for i in xrange(10)]
        for i, page in enumerate(pages):
            page.sequence = i
            page.position = i
        pages[-1].last = True
        pages[-1].position = -1
        data = BytesIO(b"".join([page.write() for page in pages]))
        page = OggPage.find_last(data, pages[-1].serial, finishing=True)
        assert page is not None
        assert page.position == 8
        page = OggPage.find_last(data, pages[-1].serial, finishing=False)
        assert page is not None
        assert page.position == -1

    def test_find_last_single_muxed(self):
        page1 = OggPage()
        page1.last = True
        page2 = OggPage()
        page2.serial = page1.serial + 1
        pages = [page1, page2]
        data = BytesIO(b"".join([page.write() for page in pages]))
        assert OggPage.find_last(data, page2.serial).serial == page2.serial

    def test_find_last_really_last(self):
        pages = [OggPage() for i in xrange(10)]
        pages[-1].last = True
        for i, page in enumerate(pages):
            page.sequence = i
        data = BytesIO(b"".join([page.write() for page in pages]))
        self.failUnlessEqual(
            OggPage.find_last(data, pages[0].serial), pages[-1])

    def test_find_last_muxed(self):
        pages = [OggPage() for i in xrange(10)]
        for i, page in enumerate(pages):
            page.sequence = i
        pages[-2].last = True
        pages[-1].serial = pages[0].serial + 1
        data = BytesIO(b"".join([page.write() for page in pages]))
        self.failUnlessEqual(
            OggPage.find_last(data, pages[0].serial), pages[-2])

    def test_find_last_no_serial(self):
        pages = [OggPage() for i in xrange(10)]
        for i, page in enumerate(pages):
            page.sequence = i
        data = BytesIO(b"".join([page.write() for page in pages]))
        self.failUnless(OggPage.find_last(data, pages[0].serial + 1) is None)

    def test_find_last_invalid(self):
        data = BytesIO(b"if you think this is an Ogg, you're crazy")
        self.failUnlessRaises(OggError, OggPage.find_last, data, 0)

    # Disabled because GStreamer will write Oggs with bad data,
    # which we need to make a best guess for.
    #
    # def test_find_last_invalid_sync(self):
    #     data = BytesIO("if you think this is an OggS, you're crazy")
    #     self.failUnlessRaises(OggError, OggPage.find_last, data, 0)

    def test_find_last_invalid_sync(self):
        data = BytesIO(b"if you think this is an OggS, you're crazy")
        page = OggPage.find_last(data, 0)
        self.failIf(page)

    def test_crc_py25(self):
        # Make sure page.write can handle both signed/unsigned int
        # return values of crc32.
        # https://github.com/quodlibet/mutagen/issues/63
        # http://docs.python.org/library/zlib.html#zlib.crc32

        import zlib
        old_crc = zlib.crc32

        def zlib_uint(*args):
            return (old_crc(*args) & 0xffffffff)

        def zlib_int(*args):
            return cdata.int_be(cdata.to_uint_be(old_crc(*args) & 0xffffffff))

        # Patch zlib.crc32 only for the duration of the two writes.
        try:
            page = OggPage()
            page.packets = [b"abc"]
            zlib.crc32 = zlib_uint
            uint_data = page.write()
            zlib.crc32 = zlib_int
            int_data = page.write()
        finally:
            zlib.crc32 = old_crc

        self.failUnlessEqual(uint_data, int_data)

    def tearDown(self):
        self.fileobj.close()
547
548
class TOggFileTypeMixin(object):
    """Tests shared by the test cases of Ogg based file types.

    The methods read ``self.audio``, ``self.Kind`` and ``self.filename``
    and call the usual test case assertion methods, so the class using
    this mixin has to provide all of those.
    """

    # Set to False in a subclass to skip the padding related tests.
    PADDING_SUPPORT = True

    def scan_file(self):
        # Parse page after page until the end of the file is reached; any
        # error other than EOFError propagates and fails the test.
        with open(self.filename, "rb") as fileobj:
            try:
                while True:
                    OggPage(fileobj)
            except EOFError:
                pass

    def test_pprint_empty(self):
        self.audio.pprint()

    def test_pprint_stuff(self):
        self.test_set_two_tags()
        self.audio.pprint()

    def test_length(self):
        self.failUnlessAlmostEqual(3.7, self.audio.info.length, 1)

    def test_no_tags(self):
        # The tags object exists, but it is empty.
        self.failIf(self.audio.tags)
        self.failIf(self.audio.tags is None)

    def test_vendor_safe(self):
        self.audio["vendor"] = "a vendor"
        self.audio.save()
        audio = self.Kind(self.filename)
        self.failUnlessEqual(audio["vendor"], ["a vendor"])

    def test_set_two_tags(self):
        self.audio["foo"] = ["a"]
        self.audio["bar"] = ["b"]
        self.audio.save()
        audio = self.Kind(self.filename)
        self.failUnlessEqual(len(audio.tags.keys()), 2)
        self.failUnlessEqual(audio["foo"], ["a"])
        self.failUnlessEqual(audio["bar"], ["b"])
        self.scan_file()

    def test_save_twice(self):
        self.audio.save()
        self.audio.save()
        self.failUnlessEqual(self.Kind(self.filename).tags, self.audio.tags)
        self.scan_file()

    def test_set_delete(self):
        self.test_set_two_tags()
        self.audio.tags.clear()
        self.audio.save()
        audio = self.Kind(self.filename)
        self.failIf(audio.tags)
        self.scan_file()

    def test_delete(self):
        self.test_set_two_tags()
        self.audio.delete()
        self.failIf(self.audio.tags)
        audio = self.Kind(self.filename)
        self.failIf(audio.tags)

        # Tags can be added again after they were deleted.
        self.audio["foobar"] = "foobar" * 1000
        self.audio.save()
        audio = self.Kind(self.filename)
        self.failUnlessEqual(self.audio["foobar"], audio["foobar"])

        self.scan_file()

    def test_delete_remove_padding(self):
        if not self.PADDING_SUPPORT:
            return
        self.audio.clear()
        self.audio.save(padding=lambda x: 0)
        filesize = os.path.getsize(self.audio.filename)
        self.audio.delete()
        # deleting shouldn't add padding
        # (assertLessEqual reports both sizes when the check fails)
        self.assertLessEqual(os.path.getsize(self.audio.filename), filesize)

    def test_really_big(self):
        self.audio["foo"] = "foo" * (2 ** 16)
        self.audio["bar"] = "bar" * (2 ** 16)
        self.audio["baz"] = "quux" * (2 ** 16)
        self.audio.save()
        audio = self.Kind(self.filename)
        self.failUnlessEqual(audio["foo"], ["foo" * 2 ** 16])
        self.failUnlessEqual(audio["bar"], ["bar" * 2 ** 16])
        self.failUnlessEqual(audio["baz"], ["quux" * 2 ** 16])
        self.scan_file()

    def test_delete_really_big(self):
        self.audio["foo"] = "foo" * (2 ** 16)
        self.audio["bar"] = "bar" * (2 ** 16)
        self.audio["baz"] = "quux" * (2 ** 16)
        self.audio.save()

        self.audio.delete()
        audio = self.Kind(self.filename)
        self.failIf(audio.tags)
        self.scan_file()

    def test_invalid_open(self):
        self.failUnlessRaises(OggError, self.Kind,
                              os.path.join(DATA_DIR, 'xing.mp3'))

    def test_invalid_delete(self):
        self.failUnlessRaises(OggError, self.audio.delete,
                              os.path.join(DATA_DIR, 'xing.mp3'))

    def test_invalid_save(self):
        self.failUnlessRaises(OggError, self.audio.save,
                              os.path.join(DATA_DIR, 'xing.mp3'))

    def ogg_reference(self, filename):
        # Validate the file with our own page parser and, where they are
        # installed, with the external ogginfo / oggz-validate tools.
        self.scan_file()
        if have_ogginfo:
            self.assertEqual(call_ogginfo(filename), 0,
                             msg="ogginfo failed on %s" % filename)

        if have_oggz_validate:
            if filename.endswith(".opus") and not have_oggz_validate_opus:
                return
            self.assertEqual(call_oggz_validate(filename), 0,
                             msg="oggz-validate failed on %s" % filename)

    def test_ogg_reference_simple_save(self):
        self.audio.save()
        self.ogg_reference(self.filename)

    def test_ogg_reference_really_big(self):
        self.test_really_big()
        self.audio.save()
        self.ogg_reference(self.filename)

    def test_ogg_reference_delete(self):
        self.audio.delete()
        self.ogg_reference(self.filename)

    def test_ogg_reference_medium_sized(self):
        self.audio["foobar"] = "foobar" * 1000
        self.audio.save()
        self.ogg_reference(self.filename)

    def test_ogg_reference_delete_readd(self):
        self.audio.delete()
        self.audio.tags.clear()
        self.audio["foobar"] = "foobar" * 1000
        self.audio.save()
        self.ogg_reference(self.filename)

    def test_mime_secondary(self):
        self.failUnless('application/ogg' in self.audio.mime)

    def test_padding(self):
        if not self.PADDING_SUPPORT:
            return

        self.audio.clear()
        self.audio["foo"] = ["bar"]

        for i in [0, 1, 2, 42, 5000, 4999]:
            # Bind the current value as a default argument so the callback
            # keeps returning it even if it is invoked after the loop has
            # moved on (a plain closure would see the latest ``i``).
            self.audio.save(padding=lambda x, i=i: i)
            new = self.Kind(self.filename)
            self.assertEqual(new.tags._padding, i)
            self.assertEqual(new["foo"], ["bar"])
            self.ogg_reference(self.filename)
716
717
def call_ogginfo(*args):
    """Run ``ogginfo`` with the given arguments and return its exit status.

    Everything the tool prints (stdout and stderr) is discarded.
    """

    command = ["ogginfo"]
    command.extend(args)
    with open(os.devnull, 'wb') as sink:
        status = subprocess.call(
            command, stdout=sink, stderr=subprocess.STDOUT)
    return status
722
723
def call_oggz_validate(*args):
    """Run ``oggz-validate`` with the given arguments and return its exit
    status.

    Everything the tool prints (stdout and stderr) is discarded.
    """

    command = ["oggz-validate"]
    command.extend(args)
    with open(os.devnull, 'wb') as sink:
        status = subprocess.call(
            command, stdout=sink, stderr=subprocess.STDOUT)
    return status
729
730
def get_oggz_validate_version():
    """Return the version of ``oggz-validate`` as a tuple of ints.

    ``(0,)`` is returned when the tool exits with a non-zero status or its
    output cannot be parsed. The OSError raised by subprocess when the
    executable cannot be started is not caught here.
    """

    unknown = (0,)

    process = subprocess.Popen(["oggz-validate", "--version"],
                               stdout=subprocess.PIPE)
    output = process.communicate()[0]
    if process.poll() != 0:
        return unknown

    # The version is the last whitespace separated word of the first line.
    lines = output.splitlines()
    words = lines[0].split() if lines else []
    if not words:
        return unknown

    try:
        return tuple([int(field) for field in words[-1].split(b".")])
    except ValueError:
        return unknown
750
751
# Probe for the ogginfo executable once at import time; the reference
# tests that depend on it are skipped when it cannot be started.
try:
    call_ogginfo()
except OSError:
    have_ogginfo = False
    print("WARNING: Skipping ogginfo reference tests.")
else:
    have_ogginfo = True
758
759
# Probe for the oggz-validate executable once at import time. Opus files
# are only validated when the installed version is newer than 0.9.9.
have_oggz_validate_opus = True
try:
    call_oggz_validate()
except OSError:
    have_oggz_validate = False
    print("WARNING: Skipping oggz-validate reference tests.")
else:
    have_oggz_validate = True
    if not (get_oggz_validate_version() > (0, 9, 9)):
        have_oggz_validate_opus = False
        print("WARNING: Skipping oggz-validate reference tests for opus")
771