1# -*- coding: utf-8 -*-
2
3from mutagen._util import DictMixin, cdata, insert_bytes, delete_bytes, \
4    decode_terminated, dict_match, enum, get_size, BitReader, BitReaderError, \
5    resize_bytes, seek_end, mmap_move, verify_fileobj, fileobj_name, \
6    read_full, flags, resize_file, fallback_move, encode_endian, loadfile, \
7    intround, verify_filename
8from mutagen._compat import text_type, itervalues, iterkeys, iteritems, PY2, \
9    cBytesIO, xrange, BytesIO, builtins
10from tests import TestCase, get_temp_empty
11import os
12import random
13import tempfile
14import mmap
15import errno
16
17try:
18    import fcntl
19except ImportError:
20    fcntl = None
21
22import pytest
23
24
25def test_intround():
26    assert intround(2.5) == 2
27    assert intround(2.6) == 3
28    assert intround(2.4) == 2
29
30
31class FDict(DictMixin):
32
33    def __init__(self):
34        self.__d = {}
35        self.keys = self.__d.keys
36
37    def __getitem__(self, *args):
38        return self.__d.__getitem__(*args)
39
40    def __setitem__(self, *args):
41        return self.__d.__setitem__(*args)
42
43    def __delitem__(self, *args):
44        return self.__d.__delitem__(*args)
45
46
47class TDictMixin(TestCase):
48
49    def setUp(self):
50        self.fdict = FDict()
51        self.rdict = {}
52        self.fdict["foo"] = self.rdict["foo"] = "bar"
53
54    def test_getsetitem(self):
55        self.failUnlessEqual(self.fdict["foo"], "bar")
56        self.failUnlessRaises(KeyError, self.fdict.__getitem__, "bar")
57
58    def test_has_key_contains(self):
59        self.failUnless("foo" in self.fdict)
60        self.failIf("bar" in self.fdict)
61        if PY2:
62            self.failUnless(self.fdict.has_key("foo"))
63            self.failIf(self.fdict.has_key("bar"))
64
65    def test_iter(self):
66        self.failUnlessEqual(list(iter(self.fdict)), ["foo"])
67
68    def test_clear(self):
69        self.fdict.clear()
70        self.rdict.clear()
71        self.failIf(self.fdict)
72
73    def test_keys(self):
74        self.failUnlessEqual(list(self.fdict.keys()), list(self.rdict.keys()))
75        self.failUnlessEqual(
76            list(iterkeys(self.fdict)), list(iterkeys(self.rdict)))
77
78    def test_values(self):
79        self.failUnlessEqual(
80            list(self.fdict.values()), list(self.rdict.values()))
81        self.failUnlessEqual(
82            list(itervalues(self.fdict)), list(itervalues(self.rdict)))
83
84    def test_items(self):
85        self.failUnlessEqual(
86            list(self.fdict.items()), list(self.rdict.items()))
87        self.failUnlessEqual(
88            list(iteritems(self.fdict)), list(iteritems(self.rdict)))
89
90    def test_pop(self):
91        self.failUnlessEqual(self.fdict.pop("foo"), self.rdict.pop("foo"))
92        self.failUnlessRaises(KeyError, self.fdict.pop, "woo")
93
94    def test_pop_bad(self):
95        self.failUnlessRaises(TypeError, self.fdict.pop, "foo", 1, 2)
96
97    def test_popitem(self):
98        self.failUnlessEqual(self.fdict.popitem(), self.rdict.popitem())
99        self.failUnlessRaises(KeyError, self.fdict.popitem)
100
101    def test_update_other(self):
102        other = {"a": 1, "b": 2}
103        self.fdict.update(other)
104        self.rdict.update(other)
105
106    def test_update_other_is_list(self):
107        other = [("a", 1), ("b", 2)]
108        self.fdict.update(other)
109        self.rdict.update(dict(other))
110
111    def test_update_kwargs(self):
112        self.fdict.update(a=1, b=2)
113        # Ironically, the *real* dict doesn't support this on Python 2.3
114        other = {"a": 1, "b": 2}
115        self.rdict.update(other)
116
117    def test_setdefault(self):
118        self.fdict.setdefault("foo", "baz")
119        self.rdict.setdefault("foo", "baz")
120        self.fdict.setdefault("bar", "baz")
121        self.rdict.setdefault("bar", "baz")
122
123    def test_get(self):
124        self.failUnlessEqual(self.rdict.get("a"), self.fdict.get("a"))
125        self.failUnlessEqual(
126            self.rdict.get("a", "b"), self.fdict.get("a", "b"))
127        self.failUnlessEqual(self.rdict.get("foo"), self.fdict.get("foo"))
128
129    def test_repr(self):
130        self.failUnlessEqual(repr(self.rdict), repr(self.fdict))
131
132    def test_len(self):
133        self.failUnlessEqual(len(self.rdict), len(self.fdict))
134
135    def tearDown(self):
136        self.failUnlessEqual(self.fdict, self.rdict)
137        self.failUnlessEqual(self.rdict, self.fdict)
138
139
140class Tcdata(TestCase):
141
142    ZERO = staticmethod(lambda s: b"\x00" * s)
143    LEONE = staticmethod(lambda s: b"\x01" + b"\x00" * (s - 1))
144    BEONE = staticmethod(lambda s: b"\x00" * (s - 1) + b"\x01")
145    NEGONE = staticmethod(lambda s: b"\xff" * s)
146
147    def test_char(self):
148        self.failUnlessEqual(cdata.char(self.ZERO(1)), 0)
149        self.failUnlessEqual(cdata.char(self.LEONE(1)), 1)
150        self.failUnlessEqual(cdata.char(self.BEONE(1)), 1)
151        self.failUnlessEqual(cdata.char(self.NEGONE(1)), -1)
152        self.assertTrue(cdata.char is cdata.int8)
153        self.assertTrue(cdata.to_char is cdata.to_int8)
154        self.assertTrue(cdata.char_from is cdata.int8_from)
155        assert cdata.int8_max == 2 ** 7 - 1
156        assert cdata.int8_min == - 2 ** 7
157        assert cdata.int8_max == cdata.char_max
158        assert cdata.int8_min == cdata.char_min
159
160    def test_char_from_to(self):
161        self.assertEqual(cdata.to_char(-2), b"\xfe")
162        self.assertEqual(cdata.char_from(b"\xfe"), (-2, 1))
163        self.assertEqual(cdata.char_from(b"\x00\xfe", 1), (-2, 2))
164        self.assertRaises(cdata.error, cdata.char_from, b"\x00\xfe", 3)
165
166    def test_uchar(self):
167        self.failUnlessEqual(cdata.uchar(self.ZERO(1)), 0)
168        self.failUnlessEqual(cdata.uchar(self.LEONE(1)), 1)
169        self.failUnlessEqual(cdata.uchar(self.BEONE(1)), 1)
170        self.failUnlessEqual(cdata.uchar(self.NEGONE(1)), 255)
171        self.assertTrue(cdata.uchar is cdata.uint8)
172        self.assertTrue(cdata.to_uchar is cdata.to_uint8)
173        self.assertTrue(cdata.uchar_from is cdata.uint8_from)
174        assert cdata.uint8_max == 2 ** 8 - 1
175        assert cdata.uint8_min == 0
176        assert cdata.uint8_max == cdata.uchar_max
177        assert cdata.uint8_min == cdata.uchar_min
178
179    def test_short(self):
180        self.failUnlessEqual(cdata.short_le(self.ZERO(2)), 0)
181        self.failUnlessEqual(cdata.short_le(self.LEONE(2)), 1)
182        self.failUnlessEqual(cdata.short_le(self.BEONE(2)), 256)
183        self.failUnlessEqual(cdata.short_le(self.NEGONE(2)), -1)
184        self.assertTrue(cdata.short_le is cdata.int16_le)
185
186        self.failUnlessEqual(cdata.short_be(self.ZERO(2)), 0)
187        self.failUnlessEqual(cdata.short_be(self.LEONE(2)), 256)
188        self.failUnlessEqual(cdata.short_be(self.BEONE(2)), 1)
189        self.failUnlessEqual(cdata.short_be(self.NEGONE(2)), -1)
190        self.assertTrue(cdata.short_be is cdata.int16_be)
191
192    def test_ushort(self):
193        self.failUnlessEqual(cdata.ushort_le(self.ZERO(2)), 0)
194        self.failUnlessEqual(cdata.ushort_le(self.LEONE(2)), 1)
195        self.failUnlessEqual(cdata.ushort_le(self.BEONE(2)), 2 ** 16 >> 8)
196        self.failUnlessEqual(cdata.ushort_le(self.NEGONE(2)), 65535)
197        self.assertTrue(cdata.ushort_le is cdata.uint16_le)
198
199        self.failUnlessEqual(cdata.ushort_be(self.ZERO(2)), 0)
200        self.failUnlessEqual(cdata.ushort_be(self.LEONE(2)), 2 ** 16 >> 8)
201        self.failUnlessEqual(cdata.ushort_be(self.BEONE(2)), 1)
202        self.failUnlessEqual(cdata.ushort_be(self.NEGONE(2)), 65535)
203        self.assertTrue(cdata.ushort_be is cdata.uint16_be)
204
205    def test_int(self):
206        self.failUnlessEqual(cdata.int_le(self.ZERO(4)), 0)
207        self.failUnlessEqual(cdata.int_le(self.LEONE(4)), 1)
208        self.failUnlessEqual(cdata.int_le(self.BEONE(4)), 2 ** 32 >> 8)
209        self.failUnlessEqual(cdata.int_le(self.NEGONE(4)), -1)
210        self.assertTrue(cdata.int_le is cdata.int32_le)
211
212        self.failUnlessEqual(cdata.int_be(self.ZERO(4)), 0)
213        self.failUnlessEqual(cdata.int_be(self.LEONE(4)), 2 ** 32 >> 8)
214        self.failUnlessEqual(cdata.int_be(self.BEONE(4)), 1)
215        self.failUnlessEqual(cdata.int_be(self.NEGONE(4)), -1)
216        self.assertTrue(cdata.int_be is cdata.int32_be)
217
218    def test_uint(self):
219        self.failUnlessEqual(cdata.uint_le(self.ZERO(4)), 0)
220        self.failUnlessEqual(cdata.uint_le(self.LEONE(4)), 1)
221        self.failUnlessEqual(cdata.uint_le(self.BEONE(4)), 2 ** 32 >> 8)
222        self.failUnlessEqual(cdata.uint_le(self.NEGONE(4)), 2 ** 32 - 1)
223        self.assertTrue(cdata.uint_le is cdata.uint32_le)
224
225        self.failUnlessEqual(cdata.uint_be(self.ZERO(4)), 0)
226        self.failUnlessEqual(cdata.uint_be(self.LEONE(4)), 2 ** 32 >> 8)
227        self.failUnlessEqual(cdata.uint_be(self.BEONE(4)), 1)
228        self.failUnlessEqual(cdata.uint_be(self.NEGONE(4)), 2 ** 32 - 1)
229        self.assertTrue(cdata.uint_be is cdata.uint32_be)
230
231    def test_longlong(self):
232        self.failUnlessEqual(cdata.longlong_le(self.ZERO(8)), 0)
233        self.failUnlessEqual(cdata.longlong_le(self.LEONE(8)), 1)
234        self.failUnlessEqual(cdata.longlong_le(self.BEONE(8)), 2 ** 64 >> 8)
235        self.failUnlessEqual(cdata.longlong_le(self.NEGONE(8)), -1)
236        self.assertTrue(cdata.longlong_le is cdata.int64_le)
237
238        self.failUnlessEqual(cdata.longlong_be(self.ZERO(8)), 0)
239        self.failUnlessEqual(cdata.longlong_be(self.LEONE(8)), 2 ** 64 >> 8)
240        self.failUnlessEqual(cdata.longlong_be(self.BEONE(8)), 1)
241        self.failUnlessEqual(cdata.longlong_be(self.NEGONE(8)), -1)
242        self.assertTrue(cdata.longlong_be is cdata.int64_be)
243
244    def test_ulonglong(self):
245        self.failUnlessEqual(cdata.ulonglong_le(self.ZERO(8)), 0)
246        self.failUnlessEqual(cdata.ulonglong_le(self.LEONE(8)), 1)
247        self.failUnlessEqual(cdata.longlong_le(self.BEONE(8)), 2 ** 64 >> 8)
248        self.failUnlessEqual(cdata.ulonglong_le(self.NEGONE(8)), 2 ** 64 - 1)
249        self.assertTrue(cdata.ulonglong_le is cdata.uint64_le)
250
251        self.failUnlessEqual(cdata.ulonglong_be(self.ZERO(8)), 0)
252        self.failUnlessEqual(cdata.ulonglong_be(self.LEONE(8)), 2 ** 64 >> 8)
253        self.failUnlessEqual(cdata.longlong_be(self.BEONE(8)), 1)
254        self.failUnlessEqual(cdata.ulonglong_be(self.NEGONE(8)), 2 ** 64 - 1)
255        self.assertTrue(cdata.ulonglong_be is cdata.uint64_be)
256
257    def test_invalid_lengths(self):
258        self.failUnlessRaises(cdata.error, cdata.char, b"")
259        self.failUnlessRaises(cdata.error, cdata.uchar, b"")
260        self.failUnlessRaises(cdata.error, cdata.int_le, b"")
261        self.failUnlessRaises(cdata.error, cdata.longlong_le, b"")
262        self.failUnlessRaises(cdata.error, cdata.uint_le, b"")
263        self.failUnlessRaises(cdata.error, cdata.ulonglong_le, b"")
264        self.failUnlessRaises(cdata.error, cdata.int_be, b"")
265        self.failUnlessRaises(cdata.error, cdata.longlong_be, b"")
266        self.failUnlessRaises(cdata.error, cdata.uint_be, b"")
267        self.failUnlessRaises(cdata.error, cdata.ulonglong_be, b"")
268
269    def test_test(self):
270        self.failUnless(cdata.test_bit((1), 0))
271        self.failIf(cdata.test_bit(1, 1))
272
273        self.failUnless(cdata.test_bit(2, 1))
274        self.failIf(cdata.test_bit(2, 0))
275
276        v = (1 << 12) + (1 << 5) + 1
277        self.failUnless(cdata.test_bit(v, 0))
278        self.failUnless(cdata.test_bit(v, 5))
279        self.failUnless(cdata.test_bit(v, 12))
280        self.failIf(cdata.test_bit(v, 3))
281        self.failIf(cdata.test_bit(v, 8))
282        self.failIf(cdata.test_bit(v, 13))
283
284
285class Tresize_file(TestCase):
286
287    def get_named_file(self, content):
288        filename = get_temp_empty()
289        h = open(filename, "wb+")
290        h.write(content)
291        h.seek(0)
292        return h
293
294    def test_resize(self):
295        with self.get_named_file(b"") as h:
296            resize_file(h, 0)
297            self.assertEqual(os.path.getsize(h.name), 0)
298            self.assertRaises(ValueError, resize_file, h, -1)
299            resize_file(h, 1)
300            self.assertEqual(os.path.getsize(h.name), 1)
301            h.seek(0)
302            self.assertEqual(h.read(), b"\x00")
303            resize_file(h, 2 ** 17)
304            self.assertEqual(os.path.getsize(h.name), 2 ** 17 + 1)
305            h.seek(0)
306            self.assertEqual(h.read(), b"\x00" * (2 ** 17 + 1))
307
308    def test_resize_content(self):
309        with self.get_named_file(b"abc") as h:
310            self.assertRaises(ValueError, resize_file, h, -4)
311            resize_file(h, -1)
312            h.seek(0)
313            self.assertEqual(h.read(), b"ab")
314            resize_file(h, 2)
315            h.seek(0)
316            self.assertEqual(h.read(), b"ab\x00\x00")
317
318    def test_resize_dev_full(self):
319
320        def raise_no_space(*args):
321            raise IOError(errno.ENOSPC, os.strerror(errno.ENOSPC))
322
323        # fail on first write
324        h = BytesIO(b"abc")
325        h.write = raise_no_space
326        self.assertRaises(IOError, resize_file, h, 1)
327        h.seek(0, 2)
328        self.assertEqual(h.tell(), 3)
329
330        # fail on flush
331        h = BytesIO(b"abc")
332        h.flush = raise_no_space
333        self.assertRaises(IOError, resize_file, h, 1)
334        h.seek(0, 2)
335        self.assertEqual(h.tell(), 3)
336
337
338class TMoveMixin(object):
339
340    MOVE = None
341
342    def file(self, contents):
343        temp = tempfile.TemporaryFile()
344        temp.write(contents)
345        temp.flush()
346        temp.seek(0)
347        return temp
348
349    def read(self, fobj):
350        fobj.seek(0, 0)
351        return fobj.read()
352
353    def test_basic(self):
354        with self.file(b"abc123") as h:
355            self.MOVE(h, 0, 1, 4)
356            self.assertEqual(self.read(h), b"bc1223")
357
358        with self.file(b"abc123") as h:
359            self.MOVE(h, 1, 0, 4)
360            self.assertEqual(self.read(h), b"aabc13")
361
362    def test_invalid_params(self):
363        with self.file(b"foo") as o:
364            self.assertRaises(ValueError, self.MOVE, o, -1, 0, 0)
365            self.assertRaises(ValueError, self.MOVE, o, 0, -1, 0)
366            self.assertRaises(ValueError, self.MOVE, o, 0, 0, -1)
367
368    def test_outside_file(self):
369        with self.file(b"foo") as o:
370            self.assertRaises(ValueError, self.MOVE, o, 0, 0, 4)
371            self.assertRaises(ValueError, self.MOVE, o, 0, 1, 3)
372            self.assertRaises(ValueError, self.MOVE, o, 1, 0, 3)
373
374    def test_ok(self):
375        with self.file(b"foo") as o:
376            self.MOVE(o, 0, 1, 2)
377            self.MOVE(o, 1, 0, 2)
378
379    def test_larger_than_page_size(self):
380        off = mmap.ALLOCATIONGRANULARITY
381        with self.file(b"f" * off * 2) as o:
382            self.MOVE(o, off, off + 1, off - 1)
383            self.MOVE(o, off + 1, off, off - 1)
384
385        with self.file(b"f" * off * 2 + b"x") as o:
386            self.MOVE(o, off * 2 - 1, off * 2, 1)
387            self.assertEqual(self.read(o)[-3:], b"fxx")
388
389
390class Tfallback_move(TestCase, TMoveMixin):
391
392    MOVE = staticmethod(fallback_move)
393
394
395class MmapMove(TestCase, TMoveMixin):
396
397    MOVE = staticmethod(mmap_move)
398
399    def test_stringio(self):
400        self.assertRaises(mmap.error, mmap_move, cBytesIO(), 0, 0, 0)
401
402    def test_no_fileno(self):
403        self.assertRaises(mmap.error, mmap_move, object(), 0, 0, 0)
404
405
406class FileHandling(TestCase):
407    def file(self, contents):
408        temp = tempfile.TemporaryFile()
409        temp.write(contents)
410        temp.flush()
411        temp.seek(0)
412        return temp
413
414    def read(self, fobj):
415        fobj.seek(0, 0)
416        return fobj.read()
417
418    def test_resize_decrease(self):
419        with self.file(b'abcd') as o:
420            resize_bytes(o, 2, 1, 1)
421            self.assertEqual(self.read(o), b"abd")
422
423    def test_resize_increase(self):
424        with self.file(b'abcd') as o:
425            resize_bytes(o, 2, 4, 1)
426            self.assertEqual(self.read(o), b"abcd\x00d")
427
428    def test_resize_nothing(self):
429        with self.file(b'abcd') as o:
430            resize_bytes(o, 2, 2, 1)
431            self.assertEqual(self.read(o), b"abcd")
432
433    def test_insert_into_empty(self):
434        with self.file(b'') as o:
435            insert_bytes(o, 8, 0)
436            self.assertEqual(b'\x00' * 8, self.read(o))
437
438    def test_insert_before_one(self):
439        with self.file(b'a') as o:
440            insert_bytes(o, 8, 0)
441            self.assertEqual(b'a' + b'\x00' * 7 + b'a', self.read(o))
442
443    def test_insert_after_one(self):
444        with self.file(b'a') as o:
445            insert_bytes(o, 8, 1)
446            self.assertEqual(b'a' + b'\x00' * 8, self.read(o))
447
448    def test_insert_after_file(self):
449        with self.file(b'a') as o:
450            self.assertRaises(ValueError, insert_bytes, o, 1, 2)
451
452    def test_smaller_than_file_middle(self):
453        with self.file(b'abcdefghij') as o:
454            insert_bytes(o, 4, 4)
455            self.assertEqual(b'abcdefghefghij', self.read(o))
456
457    def test_smaller_than_file_to_end(self):
458        with self.file(b'abcdefghij') as o:
459            insert_bytes(o, 4, 6)
460            self.assertEqual(b'abcdefghijghij', self.read(o))
461
462    def test_smaller_than_file_across_end(self):
463        with self.file(b'abcdefghij') as o:
464            insert_bytes(o, 4, 8)
465            self.assertEqual(b'abcdefghij\x00\x00ij', self.read(o))
466
467    def test_smaller_than_file_at_end(self):
468        with self.file(b'abcdefghij') as o:
469            insert_bytes(o, 3, 10)
470            self.assertEqual(b'abcdefghij\x00\x00\x00', self.read(o))
471
472    def test_smaller_than_file_at_beginning(self):
473        with self.file(b'abcdefghij') as o:
474            insert_bytes(o, 3, 0)
475            self.assertEqual(b'abcabcdefghij', self.read(o))
476
477    def test_zero(self):
478        with self.file(b'abcdefghij') as o:
479            insert_bytes(o, 0, 1)
480            self.assertEqual(b'abcdefghij', self.read(o))
481
482    def test_negative(self):
483        with self.file(b'abcdefghij') as o:
484            self.assertRaises(ValueError, insert_bytes, o, 8, -1)
485
486    def test_delete_one(self):
487        with self.file(b'a') as o:
488            delete_bytes(o, 1, 0)
489            self.assertEqual(b'', self.read(o))
490
491    def test_delete_first_of_two(self):
492        with self.file(b'ab') as o:
493            delete_bytes(o, 1, 0)
494            self.assertEqual(b'b', self.read(o))
495
496    def test_delete_second_of_two(self):
497        with self.file(b'ab') as o:
498            delete_bytes(o, 1, 1)
499            self.assertEqual(b'a', self.read(o))
500
501    def test_delete_third_of_two(self):
502        with self.file(b'ab') as o:
503            self.assertRaises(ValueError, delete_bytes, o, 1, 2)
504
505    def test_delete_middle(self):
506        with self.file(b'abcdefg') as o:
507            delete_bytes(o, 3, 2)
508            self.assertEqual(b'abfg', self.read(o))
509
510    def test_delete_across_end(self):
511        with self.file(b'abcdefg') as o:
512            self.assertRaises(ValueError, delete_bytes, o, 4, 8)
513
514    def test_delete_zero(self):
515        with self.file(b'abcdefg') as o:
516            delete_bytes(o, 0, 3)
517            self.assertEqual(b'abcdefg', self.read(o))
518
519    def test_delete_negative(self):
520        with self.file(b'abcdefg') as o:
521            self.assertRaises(ValueError, delete_bytes, o, 4, -8)
522
523    def test_insert_6106_79_51760(self):
524        # This appears to be due to ANSI C limitations in read/write on rb+
525        # files. The problematic behavior only showed up in our mmap fallback
526        # code for transfers of this or similar sizes.
527        data = u''.join(map(text_type, xrange(12574)))  # 51760 bytes
528        data = data.encode("ascii")
529        with self.file(data) as o:
530            insert_bytes(o, 6106, 79)
531            self.failUnless(data[:6106 + 79] + data[79:] == self.read(o))
532
533    def test_delete_6106_79_51760(self):
534        # This appears to be due to ANSI C limitations in read/write on rb+
535        # files. The problematic behavior only showed up in our mmap fallback
536        # code for transfers of this or similar sizes.
537        data = u''.join(map(text_type, xrange(12574)))  # 51760 bytes
538        data = data.encode("ascii")
539        with self.file(data[:6106 + 79] + data[79:]) as o:
540            delete_bytes(o, 6106, 79)
541            self.failUnless(data == self.read(o))
542
543    # Generate a bunch of random insertions, apply them, delete them,
544    # and make sure everything is still correct.
545    #
546    # The num_runs and num_changes values are tuned to take about 10s
547    # on my laptop, or about 30 seconds since we we have 3 variations
548    # on insert/delete_bytes brokenness. If I ever get a faster
549    # laptop, it's probably a good idea to increase them. :)
550    def test_many_changes(self, num_runs=5, num_changes=300,
551                          min_change_size=500, max_change_size=1000,
552                          min_buffer_size=1, max_buffer_size=2000):
553        self.failUnless(min_buffer_size < min_change_size and
554                        max_buffer_size > max_change_size and
555                        min_change_size < max_change_size and
556                        min_buffer_size < max_buffer_size,
557                        "Given testing parameters make this test useless")
558        for j in xrange(num_runs):
559            data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ" * 1024
560            with self.file(data) as fobj:
561                filesize = len(data)
562                # Generate the list of changes to apply
563                changes = []
564                for i in xrange(num_changes):
565                    change_size = random.randrange(
566                        min_change_size, max_change_size)
567                    change_offset = random.randrange(0, filesize)
568                    filesize += change_size
569                    changes.append((change_offset, change_size))
570
571                # Apply the changes, and make sure they all took.
572                for offset, size in changes:
573                    buffer_size = random.randrange(
574                        min_buffer_size, max_buffer_size)
575                    insert_bytes(fobj, size, offset, BUFFER_SIZE=buffer_size)
576                fobj.seek(0)
577                self.failIfEqual(fobj.read(len(data)), data)
578                fobj.seek(0, 2)
579                self.failUnlessEqual(fobj.tell(), filesize)
580
581                # Then, undo them.
582                changes.reverse()
583                for offset, size in changes:
584                    buffer_size = random.randrange(
585                        min_buffer_size, max_buffer_size)
586                    delete_bytes(fobj, size, offset, BUFFER_SIZE=buffer_size)
587                fobj.seek(0)
588                self.failUnless(fobj.read() == data)
589
590
591class FileHandlingMockedMMap(FileHandling):
592
593    def setUp(self):
594        def MockMMap2(*args, **kwargs):
595            raise mmap.error
596
597        self._orig_mmap = mmap.mmap
598        mmap.mmap = MockMMap2
599
600    def tearDown(self):
601        mmap.mmap = self._orig_mmap
602
603
604class FileHandlingNoMMap(FileHandling):
605    """Disables mmap and makes sure it raises if it still gets used somehow"""
606
607    def setUp(self):
608        from mutagen import _util
609
610        def MockMMap2(*args, **kwargs):
611            assert False
612
613        self._orig_mmap = mmap.mmap
614        mmap.mmap = MockMMap2
615
616        _util.mmap = None
617
618    def tearDown(self):
619        from mutagen import _util
620        import mmap
621
622        _util.mmap = mmap
623        mmap.mmap = self._orig_mmap
624
625
626class Tdict_match(TestCase):
627
628    def test_match(self):
629        self.assertEqual(dict_match({"*": 1}, "a"), 1)
630        self.assertEqual(dict_match({"*": 1}, "*"), 1)
631        self.assertEqual(dict_match({"*a": 1}, "ba"), 1)
632        self.assertEqual(dict_match({"?": 1}, "b"), 1)
633        self.assertEqual(dict_match({"[ab]": 1}, "b"), 1)
634
635    def test_nomatch(self):
636        self.assertEqual(dict_match({"*a": 1}, "ab"), None)
637        self.assertEqual(dict_match({"??": 1}, "a"), None)
638        self.assertEqual(dict_match({"[ab]": 1}, "c"), None)
639        self.assertEqual(dict_match({"[ab]": 1}, "[ab]"), None)
640
641
642class Tenum(TestCase):
643
644    def test_enum(self):
645        @enum
646        class Foo(object):
647            FOO = 1
648            BAR = 3
649
650        self.assertEqual(Foo.FOO, 1)
651        self.assertTrue(isinstance(Foo.FOO, Foo))
652        self.assertEqual(repr(Foo.FOO), "<Foo.FOO: 1>")
653        self.assertEqual(repr(Foo(3)), "<Foo.BAR: 3>")
654        self.assertEqual(repr(Foo(42)), "42")
655        self.assertEqual(str(Foo(42)), "42")
656        self.assertEqual(int(Foo(42)), 42)
657        self.assertEqual(str(Foo(1)), "Foo.FOO")
658        self.assertEqual(int(Foo(1)), 1)
659
660        self.assertTrue(isinstance(str(Foo.FOO), str))
661        self.assertTrue(isinstance(repr(Foo.FOO), str))
662
663
664class Tflags(TestCase):
665
666    def test_enum(self):
667        @flags
668        class Foo(object):
669            FOO = 1
670            BAR = 2
671
672        self.assertEqual(Foo.FOO, 1)
673        self.assertTrue(isinstance(Foo.FOO, Foo))
674        self.assertEqual(repr(Foo.FOO), "<Foo.FOO: 1>")
675        self.assertEqual(repr(Foo(3)), "<Foo.FOO | Foo.BAR: 3>")
676        self.assertEqual(repr(Foo(42)), "<Foo.BAR | 40: 42>")
677        self.assertEqual(str(Foo(42)), "Foo.BAR | 40")
678        self.assertEqual(int(Foo(42)), 42)
679        self.assertEqual(str(Foo(1)), "Foo.FOO")
680        self.assertEqual(int(Foo(1)), 1)
681        self.assertEqual(str(Foo(0)), "0")
682
683        self.assertTrue(isinstance(str(Foo.FOO), str))
684        self.assertTrue(isinstance(repr(Foo.FOO), str))
685
686
687class Tverify_fileobj(TestCase):
688
689    def test_verify_fileobj_fail(self):
690        self.assertRaises(ValueError, verify_fileobj, object())
691        with tempfile.TemporaryFile(mode="rb") as h:
692            self.assertRaises(ValueError, verify_fileobj, h, writable=True)
693
694    def test_verify_fileobj(self):
695        with tempfile.TemporaryFile(mode="rb") as h:
696            verify_fileobj(h)
697
698        with tempfile.TemporaryFile(mode="rb+") as h:
699            verify_fileobj(h, writable=True)
700
701
702class Tfileobj_name(TestCase):
703
704    def test_fileobj_name_other_type(self):
705
706        class Foo(object):
707            name = 123
708
709        self.assertEqual(fileobj_name(Foo()), "123")
710
711    def test_fileobj_name(self):
712        with tempfile.TemporaryFile(mode="rb") as h:
713            self.assertEqual(fileobj_name(h), text_type(h.name))
714
715
716class Tseek_end(TestCase):
717
718    def file(self, contents):
719        temp = tempfile.TemporaryFile()
720        temp.write(contents)
721        temp.flush()
722        temp.seek(0)
723        return temp
724
725    def test_seek_end(self):
726        with self.file(b"foo") as f:
727            seek_end(f, 2)
728            self.assertEqual(f.tell(), 1)
729            seek_end(f, 3)
730            self.assertEqual(f.tell(), 0)
731            seek_end(f, 4)
732            self.assertEqual(f.tell(), 0)
733            seek_end(f, 0)
734            self.assertEqual(f.tell(), 3)
735            self.assertRaises(ValueError, seek_end, f, -1)
736
737    def test_seek_end_pos(self):
738        with self.file(b"foo") as f:
739            f.seek(10)
740            seek_end(f, 10)
741            self.assertEqual(f.tell(), 0)
742
743
744class Tloadfile(TestCase):
745
746    def test_handle_readwrite_notsup(self):
747
748        @loadfile(method=False, writable=True)
749        def file_func(filething):
750            fileobj = filething.fileobj
751            assert fileobj.read(3) == b"foo"
752            fileobj.seek(0, 2)
753            fileobj.write(b"bar")
754
755        # first a normal test
756        filename = get_temp_empty()
757        try:
758            with open(filename, "wb") as h:
759                h.write(b"foo")
760            file_func(filename)
761            with open(filename, "rb") as h:
762                assert h.read() == b"foobar"
763        finally:
764            os.unlink(filename)
765
766        # now we mock open to return raise EOPNOTSUPP in case of mixed mode.
767        # things should still work since we edit the file in memory
768        raised = []
769        old_open = open
770
771        def mock_open(name, mode, *args):
772            if "+" in mode:
773                raised.append(True)
774                raise IOError(errno.EOPNOTSUPP, "nope")
775            return old_open(name, mode, *args)
776
777        builtins.open = mock_open
778        try:
779            filename = get_temp_empty()
780            try:
781                with open(filename, "wb") as h:
782                    h.write(b"foo")
783                file_func(filename)
784                with open(filename, "rb") as h:
785                    assert h.read() == b"foobar"
786            finally:
787                os.unlink(filename)
788        finally:
789            builtins.open = old_open
790
791        assert raised
792
793    def test_filename_from_fspath(self):
794
795        class FilePath(object):
796            def __init__(self, filename):
797                self.filename = filename
798
799            def __fspath__(self):
800                return self.filename
801
802        @loadfile(method=False, writable=True)
803        def file_func(filething):
804            fileobj = filething.fileobj
805            assert fileobj.read(3) == b"foo"
806            fileobj.seek(0, 2)
807            fileobj.write(b"bar")
808
809        filename = get_temp_empty()
810        try:
811            with open(filename, "wb") as h:
812                h.write(b"foo")
813            file_func(FilePath(filename))
814            with open(filename, "rb") as h:
815                assert h.read() == b"foobar"
816        finally:
817            os.unlink(filename)
818
819        with pytest.raises(TypeError, match=r'.*__fspath__.*'):
820            file_func(FilePath(42))
821
822
823class Tread_full(TestCase):
824
825    def test_read_full(self):
826        fileobj = cBytesIO()
827        self.assertRaises(ValueError, read_full, fileobj, -3)
828        self.assertRaises(IOError, read_full, fileobj, 3)
829
830
831class Tget_size(TestCase):
832
833    def test_get_size(self):
834        f = cBytesIO(b"foo")
835        f.seek(1, 0)
836        self.assertEqual(f.tell(), 1)
837        self.assertEqual(get_size(f), 3)
838        self.assertEqual(f.tell(), 1)
839
840
841class Tencode_endian(TestCase):
842
843    def test_other(self):
844        assert encode_endian(u"\xe4", "latin-1") == b"\xe4"
845        assert encode_endian(u"\xe4", "utf-8") == b"\xc3\xa4"
846        with self.assertRaises(LookupError):
847            encode_endian(u"", "nopenope")
848        with self.assertRaises(UnicodeEncodeError):
849            assert encode_endian(u"\u2714", "latin-1")
850        assert encode_endian(u"\u2714", "latin-1", "replace") == b"?"
851
852    def test_utf_16(self):
853        assert encode_endian(u"\xe4", "utf-16", le=True) == b"\xff\xfe\xe4\x00"
854        assert encode_endian(u"\xe4", "utf-16-le") == b"\xe4\x00"
855        assert encode_endian(
856            u"\xe4", "utf-16", le=False) == b"\xfe\xff\x00\xe4"
857        assert encode_endian(u"\xe4", "utf-16-be") == b"\x00\xe4"
858
859    def test_utf_32(self):
860        assert encode_endian(u"\xe4", "utf-32", le=True) == \
861            b"\xff\xfe\x00\x00\xe4\x00\x00\x00"
862        assert encode_endian(u"\xe4", "utf-32-le") == b"\xe4\x00\x00\x00"
863        assert encode_endian(
864            u"\xe4", "utf-32", le=False) == b"\x00\x00\xfe\xff\x00\x00\x00\xe4"
865        assert encode_endian(u"\xe4", "utf-32-be") == b"\x00\x00\x00\xe4"
866
867
868class Tdecode_terminated(TestCase):
869
870    def test_all(self):
871        values = [u"", u"", u"\xe4", u"abc", u"", u""]
872
873        for codec in ["utf8", "utf-8", "utf-16", "latin-1", "utf-16be"]:
874            # NULL without the BOM
875            term = u"\x00".encode(codec)[-2:]
876            data = b"".join(v.encode(codec) + term for v in values)
877
878            for v in values:
879                dec, data = decode_terminated(data, codec)
880                self.assertEqual(dec, v)
881            self.assertEqual(data, b"")
882
883    def test_invalid(self):
884        # invalid
885        self.assertRaises(
886            UnicodeDecodeError, decode_terminated, b"\xff", "utf-8")
887        # truncated
888        self.assertRaises(
889            UnicodeDecodeError, decode_terminated, b"\xff\xfe\x00", "utf-16")
890        # not null terminated
891        self.assertRaises(ValueError, decode_terminated, b"abc", "utf-8")
892        self.assertRaises(
893            ValueError, decode_terminated, b"\xff\xfea\x00", "utf-16")
894        # invalid encoding
895        self.assertRaises(LookupError, decode_terminated, b"abc", "foobar")
896
897    def test_lax(self):
898        # missing termination
899        self.assertEqual(
900            decode_terminated(b"abc", "utf-8", strict=False), (u"abc", b""))
901
902        # missing termination and truncated data
903        truncated = u"\xe4\xe4".encode("utf-8")[:-1]
904        self.assertRaises(
905            UnicodeDecodeError, decode_terminated,
906            truncated, "utf-8", strict=False)
907
908
909class TBitReader(TestCase):
910
911    def test_bits(self):
912        data = b"\x12\x34\x56\x78\x89\xAB\xCD\xEF"
913        ref = cdata.uint64_be(data)
914
915        for i in xrange(64):
916            fo = cBytesIO(data)
917            r = BitReader(fo)
918            v = r.bits(i) << (64 - i) | r.bits(64 - i)
919            self.assertEqual(v, ref)
920
921    def test_bits_null(self):
922        r = BitReader(cBytesIO(b""))
923        self.assertEqual(r.bits(0), 0)
924
925    def test_bits_error(self):
926        r = BitReader(cBytesIO(b""))
927        self.assertRaises(ValueError, r.bits, -1)
928
929    def test_bytes_error(self):
930        r = BitReader(cBytesIO(b""))
931        self.assertRaises(ValueError, r.bytes, -1)
932
933    def test_skip_error(self):
934        r = BitReader(cBytesIO(b""))
935        self.assertRaises(ValueError, r.skip, -1)
936
937    def test_read_too_much(self):
938        r = BitReader(cBytesIO(b""))
939        self.assertEqual(r.bits(0), 0)
940        self.assertRaises(BitReaderError, r.bits, 1)
941
942    def test_skip(self):
943        r = BitReader(cBytesIO(b"\xEF"))
944        r.skip(4)
945        self.assertEqual(r.bits(4), 0xf)
946
947    def test_skip_more(self):
948        r = BitReader(cBytesIO(b"\xAB\xCD"))
949        self.assertEqual(r.bits(4), 0xa)
950        r.skip(8)
951        self.assertEqual(r.bits(4), 0xd)
952        self.assertRaises(BitReaderError, r.bits, 1)
953
954    def test_skip_too_much(self):
955        r = BitReader(cBytesIO(b"\xAB\xCD"))
956        # aligned skips don't fail, but the following read will
957        r.skip(32 + 8)
958        self.assertRaises(BitReaderError, r.bits, 1)
959        self.assertRaises(BitReaderError, r.skip, 1)
960
961    def test_bytes(self):
962        r = BitReader(cBytesIO(b"\xAB\xCD\xEF"))
963        self.assertEqual(r.bytes(2), b"\xAB\xCD")
964        self.assertEqual(r.bytes(0), b"")
965
966    def test_bytes_unaligned(self):
967        r = BitReader(cBytesIO(b"\xAB\xCD\xEF"))
968        r.skip(4)
969        self.assertEqual(r.bytes(2), b"\xBC\xDE")
970
971    def test_get_position(self):
972        r = BitReader(cBytesIO(b"\xAB\xCD"))
973        self.assertEqual(r.get_position(), 0)
974        r.bits(3)
975        self.assertEqual(r.get_position(), 3)
976        r.skip(9)
977        self.assertEqual(r.get_position(), 3 + 9)
978        r.align()
979        self.assertEqual(r.get_position(), 16)
980
981    def test_align(self):
982        r = BitReader(cBytesIO(b"\xAB\xCD\xEF"))
983        r.skip(3)
984        self.assertEqual(r.align(), 5)
985        self.assertEqual(r.get_position(), 8)
986
987    def test_is_aligned(self):
988        r = BitReader(cBytesIO(b"\xAB\xCD\xEF"))
989        self.assertTrue(r.is_aligned())
990
991        r.skip(1)
992        self.assertFalse(r.is_aligned())
993        r.skip(7)
994        self.assertTrue(r.is_aligned())
995
996        r.bits(7)
997        self.assertFalse(r.is_aligned())
998        r.bits(1)
999        self.assertTrue(r.is_aligned())
1000
1001
1002class Tverify_filename(TestCase):
1003
1004    def test_verify_filename_fail(self):
1005        self.assertRaises(ValueError, verify_filename, object())
1006
1007    def test_verify_filename(self):
1008
1009        class FilePath(object):
1010            def __init__(self, filename):
1011                self.filename = filename
1012
1013            def __fspath__(self):
1014                return self.filename
1015
1016        verify_filename(FilePath("foo"))
1017        verify_filename("foo")
1018        verify_filename(b"foo")
1019