# -*- coding: utf-8 -*-

from mutagen._util import DictMixin, cdata, insert_bytes, delete_bytes, \
    decode_terminated, dict_match, enum, get_size, BitReader, BitReaderError, \
    resize_bytes, seek_end, mmap_move, verify_fileobj, fileobj_name, \
    read_full, flags, resize_file, fallback_move, encode_endian, loadfile, \
    intround, verify_filename
from mutagen._compat import text_type, itervalues, iterkeys, iteritems, PY2, \
    cBytesIO, xrange, BytesIO, builtins
from tests import TestCase, get_temp_empty
import os
import random
import tempfile
import mmap
import errno

try:
    import fcntl
except ImportError:
    fcntl = None

import pytest


def test_intround():
    # 2.5 is expected to round down to 2 (not up to 3).
    assert intround(2.5) == 2
    assert intround(2.6) == 3
    assert intround(2.4) == 2


class FDict(DictMixin):
    """Minimal DictMixin subclass backed by a plain dict.

    Only ``keys`` and item get/set/delete are provided here; everything
    else exercised by the tests has to come from DictMixin.
    """

    def __init__(self):
        self.__d = {}
        self.keys = self.__d.keys

    def __getitem__(self, *args):
        return self.__d.__getitem__(*args)

    def __setitem__(self, *args):
        return self.__d.__setitem__(*args)

    def __delitem__(self, *args):
        return self.__d.__delitem__(*args)


class TDictMixin(TestCase):
    """Run the same operations on an FDict and a real dict and compare.

    ``tearDown`` asserts that both containers are equal after every test,
    so tests that only mutate (e.g. ``test_update_other``) are still
    checked.
    """

    def setUp(self):
        self.fdict = FDict()
        self.rdict = {}
        self.fdict["foo"] = self.rdict["foo"] = "bar"

    def test_getsetitem(self):
        self.failUnlessEqual(self.fdict["foo"], "bar")
        self.failUnlessRaises(KeyError, self.fdict.__getitem__, "bar")

    def test_has_key_contains(self):
        self.failUnless("foo" in self.fdict)
        self.failIf("bar" in self.fdict)
        if PY2:
            self.failUnless(self.fdict.has_key("foo"))
            self.failIf(self.fdict.has_key("bar"))

    def test_iter(self):
        self.failUnlessEqual(list(iter(self.fdict)), ["foo"])

    def test_clear(self):
        self.fdict.clear()
        self.rdict.clear()
        self.failIf(self.fdict)

    def test_keys(self):
        self.failUnlessEqual(list(self.fdict.keys()), list(self.rdict.keys()))
        self.failUnlessEqual(
            list(iterkeys(self.fdict)), list(iterkeys(self.rdict)))

    def test_values(self):
        self.failUnlessEqual(
            list(self.fdict.values()), list(self.rdict.values()))
        self.failUnlessEqual(
            list(itervalues(self.fdict)), list(itervalues(self.rdict)))

    def test_items(self):
        self.failUnlessEqual(
            list(self.fdict.items()), list(self.rdict.items()))
        self.failUnlessEqual(
            list(iteritems(self.fdict)), list(iteritems(self.rdict)))

    def test_pop(self):
        self.failUnlessEqual(self.fdict.pop("foo"), self.rdict.pop("foo"))
        self.failUnlessRaises(KeyError, self.fdict.pop, "woo")

    def test_pop_bad(self):
        self.failUnlessRaises(TypeError, self.fdict.pop, "foo", 1, 2)

    def test_popitem(self):
        self.failUnlessEqual(self.fdict.popitem(), self.rdict.popitem())
        self.failUnlessRaises(KeyError, self.fdict.popitem)

    def test_update_other(self):
        other = {"a": 1, "b": 2}
        self.fdict.update(other)
        self.rdict.update(other)

    def test_update_other_is_list(self):
        other = [("a", 1), ("b", 2)]
        self.fdict.update(other)
        self.rdict.update(dict(other))

    def test_update_kwargs(self):
        self.fdict.update(a=1, b=2)
        # Ironically, the *real* dict doesn't support this on Python 2.3
        other = {"a": 1, "b": 2}
        self.rdict.update(other)

    def test_setdefault(self):
        self.fdict.setdefault("foo", "baz")
        self.rdict.setdefault("foo", "baz")
        self.fdict.setdefault("bar", "baz")
        self.rdict.setdefault("bar", "baz")

    def test_get(self):
        self.failUnlessEqual(self.rdict.get("a"), self.fdict.get("a"))
        self.failUnlessEqual(
            self.rdict.get("a", "b"), self.fdict.get("a", "b"))
        self.failUnlessEqual(self.rdict.get("foo"), self.fdict.get("foo"))

    def test_repr(self):
        self.failUnlessEqual(repr(self.rdict), repr(self.fdict))

    def test_len(self):
        self.failUnlessEqual(len(self.rdict), len(self.fdict))

    def tearDown(self):
        # Compare in both directions so both __eq__ implementations run.
        self.failUnlessEqual(self.fdict, self.rdict)
        self.failUnlessEqual(self.rdict, self.fdict)


class Tcdata(TestCase):
    """Tests for the fixed-width integer decoders/encoders on ``cdata``."""

    # Byte-string builders for a value of ``s`` bytes:
    # all zero, 1 in the first byte, 1 in the last byte, all 0xff.
    ZERO = staticmethod(lambda s: b"\x00" * s)
    LEONE = staticmethod(lambda s: b"\x01" + b"\x00" * (s - 1))
    BEONE = staticmethod(lambda s: b"\x00" * (s - 1) + b"\x01")
    NEGONE = staticmethod(lambda s: b"\xff" * s)

    def test_char(self):
        self.failUnlessEqual(cdata.char(self.ZERO(1)), 0)
        self.failUnlessEqual(cdata.char(self.LEONE(1)), 1)
        self.failUnlessEqual(cdata.char(self.BEONE(1)), 1)
        self.failUnlessEqual(cdata.char(self.NEGONE(1)), -1)
        self.assertTrue(cdata.char is cdata.int8)
        self.assertTrue(cdata.to_char is cdata.to_int8)
        self.assertTrue(cdata.char_from is cdata.int8_from)
        assert cdata.int8_max == 2 ** 7 - 1
        assert cdata.int8_min == - 2 ** 7
        assert cdata.int8_max == cdata.char_max
        assert cdata.int8_min == cdata.char_min

    def test_char_from_to(self):
        self.assertEqual(cdata.to_char(-2), b"\xfe")
        self.assertEqual(cdata.char_from(b"\xfe"), (-2, 1))
        self.assertEqual(cdata.char_from(b"\x00\xfe", 1), (-2, 2))
        self.assertRaises(cdata.error, cdata.char_from, b"\x00\xfe", 3)

    def test_uchar(self):
        self.failUnlessEqual(cdata.uchar(self.ZERO(1)), 0)
        self.failUnlessEqual(cdata.uchar(self.LEONE(1)), 1)
        self.failUnlessEqual(cdata.uchar(self.BEONE(1)), 1)
        self.failUnlessEqual(cdata.uchar(self.NEGONE(1)), 255)
        self.assertTrue(cdata.uchar is cdata.uint8)
        self.assertTrue(cdata.to_uchar is cdata.to_uint8)
        self.assertTrue(cdata.uchar_from is cdata.uint8_from)
        assert cdata.uint8_max == 2 ** 8 - 1
        assert cdata.uint8_min == 0
        assert cdata.uint8_max == cdata.uchar_max
        assert cdata.uint8_min == cdata.uchar_min

    def test_short(self):
        self.failUnlessEqual(cdata.short_le(self.ZERO(2)), 0)
        self.failUnlessEqual(cdata.short_le(self.LEONE(2)), 1)
        self.failUnlessEqual(cdata.short_le(self.BEONE(2)), 256)
        self.failUnlessEqual(cdata.short_le(self.NEGONE(2)), -1)
        self.assertTrue(cdata.short_le is cdata.int16_le)

        self.failUnlessEqual(cdata.short_be(self.ZERO(2)), 0)
        self.failUnlessEqual(cdata.short_be(self.LEONE(2)), 256)
        self.failUnlessEqual(cdata.short_be(self.BEONE(2)), 1)
        self.failUnlessEqual(cdata.short_be(self.NEGONE(2)), -1)
        self.assertTrue(cdata.short_be is cdata.int16_be)

    def test_ushort(self):
        self.failUnlessEqual(cdata.ushort_le(self.ZERO(2)), 0)
        self.failUnlessEqual(cdata.ushort_le(self.LEONE(2)), 1)
        self.failUnlessEqual(cdata.ushort_le(self.BEONE(2)), 2 ** 16 >> 8)
        self.failUnlessEqual(cdata.ushort_le(self.NEGONE(2)), 65535)
        self.assertTrue(cdata.ushort_le is cdata.uint16_le)

        self.failUnlessEqual(cdata.ushort_be(self.ZERO(2)), 0)
        self.failUnlessEqual(cdata.ushort_be(self.LEONE(2)), 2 ** 16 >> 8)
        self.failUnlessEqual(cdata.ushort_be(self.BEONE(2)), 1)
        self.failUnlessEqual(cdata.ushort_be(self.NEGONE(2)), 65535)
        self.assertTrue(cdata.ushort_be is cdata.uint16_be)

    def test_int(self):
        self.failUnlessEqual(cdata.int_le(self.ZERO(4)), 0)
        self.failUnlessEqual(cdata.int_le(self.LEONE(4)), 1)
        self.failUnlessEqual(cdata.int_le(self.BEONE(4)), 2 ** 32 >> 8)
        self.failUnlessEqual(cdata.int_le(self.NEGONE(4)), -1)
        self.assertTrue(cdata.int_le is cdata.int32_le)

        self.failUnlessEqual(cdata.int_be(self.ZERO(4)), 0)
        self.failUnlessEqual(cdata.int_be(self.LEONE(4)), 2 ** 32 >> 8)
        self.failUnlessEqual(cdata.int_be(self.BEONE(4)), 1)
        self.failUnlessEqual(cdata.int_be(self.NEGONE(4)), -1)
        self.assertTrue(cdata.int_be is cdata.int32_be)

    def test_uint(self):
        self.failUnlessEqual(cdata.uint_le(self.ZERO(4)), 0)
        self.failUnlessEqual(cdata.uint_le(self.LEONE(4)), 1)
        self.failUnlessEqual(cdata.uint_le(self.BEONE(4)), 2 ** 32 >> 8)
        self.failUnlessEqual(cdata.uint_le(self.NEGONE(4)), 2 ** 32 - 1)
        self.assertTrue(cdata.uint_le is cdata.uint32_le)

        self.failUnlessEqual(cdata.uint_be(self.ZERO(4)), 0)
        self.failUnlessEqual(cdata.uint_be(self.LEONE(4)), 2 ** 32 >> 8)
        self.failUnlessEqual(cdata.uint_be(self.BEONE(4)), 1)
        self.failUnlessEqual(cdata.uint_be(self.NEGONE(4)), 2 ** 32 - 1)
        self.assertTrue(cdata.uint_be is cdata.uint32_be)

    def test_longlong(self):
        self.failUnlessEqual(cdata.longlong_le(self.ZERO(8)), 0)
        self.failUnlessEqual(cdata.longlong_le(self.LEONE(8)), 1)
        self.failUnlessEqual(cdata.longlong_le(self.BEONE(8)), 2 ** 64 >> 8)
        self.failUnlessEqual(cdata.longlong_le(self.NEGONE(8)), -1)
        self.assertTrue(cdata.longlong_le is cdata.int64_le)

        self.failUnlessEqual(cdata.longlong_be(self.ZERO(8)), 0)
        self.failUnlessEqual(cdata.longlong_be(self.LEONE(8)), 2 ** 64 >> 8)
        self.failUnlessEqual(cdata.longlong_be(self.BEONE(8)), 1)
        self.failUnlessEqual(cdata.longlong_be(self.NEGONE(8)), -1)
        self.assertTrue(cdata.longlong_be is cdata.int64_be)

    def test_ulonglong(self):
        # Every assertion here targets the *unsigned* decoders; the BEONE
        # cases previously called the signed longlong_* functions.
        self.failUnlessEqual(cdata.ulonglong_le(self.ZERO(8)), 0)
        self.failUnlessEqual(cdata.ulonglong_le(self.LEONE(8)), 1)
        self.failUnlessEqual(cdata.ulonglong_le(self.BEONE(8)), 2 ** 64 >> 8)
        self.failUnlessEqual(cdata.ulonglong_le(self.NEGONE(8)), 2 ** 64 - 1)
        self.assertTrue(cdata.ulonglong_le is cdata.uint64_le)

        self.failUnlessEqual(cdata.ulonglong_be(self.ZERO(8)), 0)
        self.failUnlessEqual(cdata.ulonglong_be(self.LEONE(8)), 2 ** 64 >> 8)
        self.failUnlessEqual(cdata.ulonglong_be(self.BEONE(8)), 1)
        self.failUnlessEqual(cdata.ulonglong_be(self.NEGONE(8)), 2 ** 64 - 1)
        self.assertTrue(cdata.ulonglong_be is cdata.uint64_be)

    def test_invalid_lengths(self):
        self.failUnlessRaises(cdata.error, cdata.char, b"")
        self.failUnlessRaises(cdata.error, cdata.uchar, b"")
        self.failUnlessRaises(cdata.error, cdata.int_le, b"")
        self.failUnlessRaises(cdata.error, cdata.longlong_le, b"")
        self.failUnlessRaises(cdata.error, cdata.uint_le, b"")
        self.failUnlessRaises(cdata.error, cdata.ulonglong_le, b"")
        self.failUnlessRaises(cdata.error, cdata.int_be, b"")
        self.failUnlessRaises(cdata.error, cdata.longlong_be, b"")
        self.failUnlessRaises(cdata.error, cdata.uint_be, b"")
        self.failUnlessRaises(cdata.error, cdata.ulonglong_be, b"")

    def test_test(self):
        self.failUnless(cdata.test_bit((1), 0))
        self.failIf(cdata.test_bit(1, 1))

        self.failUnless(cdata.test_bit(2, 1))
        self.failIf(cdata.test_bit(2, 0))

        v = (1 << 12) + (1 << 5) + 1
        self.failUnless(cdata.test_bit(v, 0))
        self.failUnless(cdata.test_bit(v, 5))
        self.failUnless(cdata.test_bit(v, 12))
        self.failIf(cdata.test_bit(v, 3))
        self.failIf(cdata.test_bit(v, 8))
        self.failIf(cdata.test_bit(v, 13))


class Tresize_file(TestCase):
    """Tests for ``resize_file`` (grow/shrink a file by a relative amount)."""

    def get_named_file(self, content):
        """Return an open ``wb+`` handle on a new temp file holding *content*.

        The handle is positioned at offset 0. The file itself is removed
        when the test finishes; callers are expected to close the handle
        (the tests use it as a context manager).
        """

        filename = get_temp_empty()
        # Registered before opening so the file is removed even if the
        # open/write below fails; cleanups run after the test body, by
        # which point the ``with`` block has closed the handle.
        self.addCleanup(os.unlink, filename)
        h = open(filename, "wb+")
        h.write(content)
        h.seek(0)
        return h

    def test_resize(self):
        with self.get_named_file(b"") as h:
            resize_file(h, 0)
            self.assertEqual(os.path.getsize(h.name), 0)
            self.assertRaises(ValueError, resize_file, h, -1)
            resize_file(h, 1)
            self.assertEqual(os.path.getsize(h.name), 1)
            h.seek(0)
            self.assertEqual(h.read(), b"\x00")
            resize_file(h, 2 ** 17)
            self.assertEqual(os.path.getsize(h.name), 2 ** 17 + 1)
            h.seek(0)
            self.assertEqual(h.read(), b"\x00" * (2 ** 17 + 1))

    def test_resize_content(self):
        with self.get_named_file(b"abc") as h:
            self.assertRaises(ValueError, resize_file, h, -4)
            resize_file(h, -1)
            h.seek(0)
            self.assertEqual(h.read(), b"ab")
            resize_file(h, 2)
            h.seek(0)
            self.assertEqual(h.read(), b"ab\x00\x00")

    def test_resize_dev_full(self):

        def raise_no_space(*args):
            raise IOError(errno.ENOSPC, os.strerror(errno.ENOSPC))

        # fail on first write
        h = BytesIO(b"abc")
        h.write = raise_no_space
        self.assertRaises(IOError, resize_file, h, 1)
        # the size must be unchanged after the failed resize
        h.seek(0, 2)
        self.assertEqual(h.tell(), 3)

        # fail on flush
        h = BytesIO(b"abc")
        h.flush = raise_no_space
        self.assertRaises(IOError, resize_file, h, 1)
        h.seek(0, 2)
        self.assertEqual(h.tell(), 3)


class TMoveMixin(object):
    """Shared tests for the move implementations.

    Subclasses set ``MOVE`` to the function under test; it is called as
    ``MOVE(fileobj, dest, src, count)``.
    """

    MOVE = None

    def file(self, contents):
        temp = tempfile.TemporaryFile()
        temp.write(contents)
        temp.flush()
        temp.seek(0)
        return temp

    def read(self, fobj):
        fobj.seek(0, 0)
        return fobj.read()

    def test_basic(self):
        with self.file(b"abc123") as h:
            self.MOVE(h, 0, 1, 4)
            self.assertEqual(self.read(h), b"bc1223")

        with self.file(b"abc123") as h:
            self.MOVE(h, 1, 0, 4)
            self.assertEqual(self.read(h), b"aabc13")

    def test_invalid_params(self):
        with self.file(b"foo") as o:
            self.assertRaises(ValueError, self.MOVE, o, -1, 0, 0)
            self.assertRaises(ValueError, self.MOVE, o, 0, -1, 0)
            self.assertRaises(ValueError, self.MOVE, o, 0, 0, -1)

    def test_outside_file(self):
        with self.file(b"foo") as o:
            self.assertRaises(ValueError, self.MOVE, o, 0, 0, 4)
            self.assertRaises(ValueError, self.MOVE, o, 0, 1, 3)
            self.assertRaises(ValueError, self.MOVE, o, 1, 0, 3)

    def test_ok(self):
        with self.file(b"foo") as o:
            self.MOVE(o, 0, 1, 2)
            self.MOVE(o, 1, 0, 2)

    def test_larger_than_page_size(self):
        off = mmap.ALLOCATIONGRANULARITY
        with self.file(b"f" * off * 2) as o:
            self.MOVE(o, off, off + 1, off - 1)
            self.MOVE(o, off + 1, off, off - 1)

        with self.file(b"f" * off * 2 + b"x") as o:
            self.MOVE(o, off * 2 - 1, off * 2, 1)
            self.assertEqual(self.read(o)[-3:], b"fxx")


class Tfallback_move(TestCase, TMoveMixin):
    """Run the shared move tests against ``fallback_move``."""

    MOVE = staticmethod(fallback_move)


class MmapMove(TestCase, TMoveMixin):
    """Run the shared move tests against ``mmap_move``."""

    MOVE = staticmethod(mmap_move)

    def test_stringio(self):
        self.assertRaises(mmap.error, mmap_move, cBytesIO(), 0, 0, 0)

    def test_no_fileno(self):
        self.assertRaises(mmap.error, mmap_move, object(), 0, 0, 0)


class FileHandling(TestCase):
    """Tests for ``resize_bytes``, ``insert_bytes`` and ``delete_bytes``.

    Subclasses re-run the same tests with ``mmap`` failing or disabled.
    """

    def file(self, contents):
        temp = tempfile.TemporaryFile()
        temp.write(contents)
        temp.flush()
        temp.seek(0)
        return temp

    def read(self, fobj):
        fobj.seek(0, 0)
        return fobj.read()

    def test_resize_decrease(self):
        with self.file(b'abcd') as o:
            resize_bytes(o, 2, 1, 1)
            self.assertEqual(self.read(o), b"abd")

    def test_resize_increase(self):
        with self.file(b'abcd') as o:
            resize_bytes(o, 2, 4, 1)
            self.assertEqual(self.read(o), b"abcd\x00d")

    def test_resize_nothing(self):
        with self.file(b'abcd') as o:
            resize_bytes(o, 2, 2, 1)
            self.assertEqual(self.read(o), b"abcd")

    def test_insert_into_empty(self):
        with self.file(b'') as o:
            insert_bytes(o, 8, 0)
            self.assertEqual(b'\x00' * 8, self.read(o))

    def test_insert_before_one(self):
        with self.file(b'a') as o:
            insert_bytes(o, 8, 0)
            self.assertEqual(b'a' + b'\x00' * 7 + b'a', self.read(o))

    def test_insert_after_one(self):
        with self.file(b'a') as o:
            insert_bytes(o, 8, 1)
            self.assertEqual(b'a' + b'\x00' * 8, self.read(o))

    def test_insert_after_file(self):
        with self.file(b'a') as o:
            self.assertRaises(ValueError, insert_bytes, o, 1, 2)

    def test_smaller_than_file_middle(self):
        with self.file(b'abcdefghij') as o:
            insert_bytes(o, 4, 4)
            self.assertEqual(b'abcdefghefghij', self.read(o))

    def test_smaller_than_file_to_end(self):
        with self.file(b'abcdefghij') as o:
            insert_bytes(o, 4, 6)
            self.assertEqual(b'abcdefghijghij', self.read(o))

    def test_smaller_than_file_across_end(self):
        with self.file(b'abcdefghij') as o:
            insert_bytes(o, 4, 8)
            self.assertEqual(b'abcdefghij\x00\x00ij', self.read(o))

    def test_smaller_than_file_at_end(self):
        with self.file(b'abcdefghij') as o:
            insert_bytes(o, 3, 10)
            self.assertEqual(b'abcdefghij\x00\x00\x00', self.read(o))

    def test_smaller_than_file_at_beginning(self):
        with self.file(b'abcdefghij') as o:
            insert_bytes(o, 3, 0)
            self.assertEqual(b'abcabcdefghij', self.read(o))

    def test_zero(self):
        with self.file(b'abcdefghij') as o:
            insert_bytes(o, 0, 1)
            self.assertEqual(b'abcdefghij', self.read(o))

    def test_negative(self):
        with self.file(b'abcdefghij') as o:
            self.assertRaises(ValueError, insert_bytes, o, 8, -1)

    def test_delete_one(self):
        with self.file(b'a') as o:
            delete_bytes(o, 1, 0)
            self.assertEqual(b'', self.read(o))

    def test_delete_first_of_two(self):
        with self.file(b'ab') as o:
            delete_bytes(o, 1, 0)
            self.assertEqual(b'b', self.read(o))

    def test_delete_second_of_two(self):
        with self.file(b'ab') as o:
            delete_bytes(o, 1, 1)
            self.assertEqual(b'a', self.read(o))

    def test_delete_third_of_two(self):
        with self.file(b'ab') as o:
            self.assertRaises(ValueError, delete_bytes, o, 1, 2)

    def test_delete_middle(self):
        with self.file(b'abcdefg') as o:
            delete_bytes(o, 3, 2)
            self.assertEqual(b'abfg', self.read(o))

    def test_delete_across_end(self):
        with self.file(b'abcdefg') as o:
            self.assertRaises(ValueError, delete_bytes, o, 4, 8)

    def test_delete_zero(self):
        with self.file(b'abcdefg') as o:
            delete_bytes(o, 0, 3)
            self.assertEqual(b'abcdefg', self.read(o))

    def test_delete_negative(self):
        with self.file(b'abcdefg') as o:
            self.assertRaises(ValueError, delete_bytes, o, 4, -8)

    def test_insert_6106_79_51760(self):
        # This appears to be due to ANSI C limitations in read/write on rb+
        # files. The problematic behavior only showed up in our mmap fallback
        # code for transfers of this or similar sizes.
        data = u''.join(map(text_type, xrange(12574)))  # 51760 bytes
        data = data.encode("ascii")
        with self.file(data) as o:
            insert_bytes(o, 6106, 79)
            self.failUnless(data[:6106 + 79] + data[79:] == self.read(o))

    def test_delete_6106_79_51760(self):
        # This appears to be due to ANSI C limitations in read/write on rb+
        # files. The problematic behavior only showed up in our mmap fallback
        # code for transfers of this or similar sizes.
        data = u''.join(map(text_type, xrange(12574)))  # 51760 bytes
        data = data.encode("ascii")
        with self.file(data[:6106 + 79] + data[79:]) as o:
            delete_bytes(o, 6106, 79)
            self.failUnless(data == self.read(o))

    # Generate a bunch of random insertions, apply them, delete them,
    # and make sure everything is still correct.
    #
    # The num_runs and num_changes values are tuned to take about 10s
    # on my laptop, or about 30 seconds since we have 3 variations
    # on insert/delete_bytes brokenness. If I ever get a faster
    # laptop, it's probably a good idea to increase them. :)
    def test_many_changes(self, num_runs=5, num_changes=300,
                          min_change_size=500, max_change_size=1000,
                          min_buffer_size=1, max_buffer_size=2000):
        self.failUnless(min_buffer_size < min_change_size and
                        max_buffer_size > max_change_size and
                        min_change_size < max_change_size and
                        min_buffer_size < max_buffer_size,
                        "Given testing parameters make this test useless")
        for j in xrange(num_runs):
            data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ" * 1024
            with self.file(data) as fobj:
                filesize = len(data)
                # Generate the list of changes to apply
                changes = []
                for i in xrange(num_changes):
                    change_size = random.randrange(
                        min_change_size, max_change_size)
                    change_offset = random.randrange(0, filesize)
                    filesize += change_size
                    changes.append((change_offset, change_size))

                # Apply the changes, and make sure they all took.
                for offset, size in changes:
                    buffer_size = random.randrange(
                        min_buffer_size, max_buffer_size)
                    insert_bytes(fobj, size, offset, BUFFER_SIZE=buffer_size)
                fobj.seek(0)
                self.failIfEqual(fobj.read(len(data)), data)
                fobj.seek(0, 2)
                self.failUnlessEqual(fobj.tell(), filesize)

                # Then, undo them.
                changes.reverse()
                for offset, size in changes:
                    buffer_size = random.randrange(
                        min_buffer_size, max_buffer_size)
                    delete_bytes(fobj, size, offset, BUFFER_SIZE=buffer_size)
                fobj.seek(0)
                self.failUnless(fobj.read() == data)


class FileHandlingMockedMMap(FileHandling):
    """Re-run the FileHandling tests with ``mmap.mmap`` raising mmap.error."""

    def setUp(self):
        def MockMMap2(*args, **kwargs):
            raise mmap.error

        self._orig_mmap = mmap.mmap
        mmap.mmap = MockMMap2

    def tearDown(self):
        mmap.mmap = self._orig_mmap


class FileHandlingNoMMap(FileHandling):
    """Disables mmap and makes sure it raises if it still gets used somehow"""

    def setUp(self):
        from mutagen import _util

        def MockMMap2(*args, **kwargs):
            assert False

        self._orig_mmap = mmap.mmap
        mmap.mmap = MockMMap2

        _util.mmap = None

    def tearDown(self):
        from mutagen import _util
        import mmap

        _util.mmap = mmap
        mmap.mmap = self._orig_mmap


class Tdict_match(TestCase):
    """Tests for ``dict_match`` (glob-style pattern keys)."""

    def test_match(self):
        self.assertEqual(dict_match({"*": 1}, "a"), 1)
        self.assertEqual(dict_match({"*": 1}, "*"), 1)
        self.assertEqual(dict_match({"*a": 1}, "ba"), 1)
        self.assertEqual(dict_match({"?": 1}, "b"), 1)
        self.assertEqual(dict_match({"[ab]": 1}, "b"), 1)

    def test_nomatch(self):
        self.assertEqual(dict_match({"*a": 1}, "ab"), None)
        self.assertEqual(dict_match({"??": 1}, "a"), None)
        self.assertEqual(dict_match({"[ab]": 1}, "c"), None)
        self.assertEqual(dict_match({"[ab]": 1}, "[ab]"), None)


class Tenum(TestCase):
    """Tests for the ``enum`` class decorator."""

    def test_enum(self):
        @enum
        class Foo(object):
            FOO = 1
            BAR = 3

        self.assertEqual(Foo.FOO, 1)
        self.assertTrue(isinstance(Foo.FOO, Foo))
        self.assertEqual(repr(Foo.FOO), "<Foo.FOO: 1>")
        self.assertEqual(repr(Foo(3)), "<Foo.BAR: 3>")
        self.assertEqual(repr(Foo(42)), "42")
        self.assertEqual(str(Foo(42)), "42")
        self.assertEqual(int(Foo(42)), 42)
        self.assertEqual(str(Foo(1)), "Foo.FOO")
        self.assertEqual(int(Foo(1)), 1)

        self.assertTrue(isinstance(str(Foo.FOO), str))
        self.assertTrue(isinstance(repr(Foo.FOO), str))


class Tflags(TestCase):
    """Tests for the ``flags`` class decorator."""

    def test_enum(self):
        @flags
        class Foo(object):
            FOO = 1
            BAR = 2

        self.assertEqual(Foo.FOO, 1)
        self.assertTrue(isinstance(Foo.FOO, Foo))
        self.assertEqual(repr(Foo.FOO), "<Foo.FOO: 1>")
        self.assertEqual(repr(Foo(3)), "<Foo.FOO | Foo.BAR: 3>")
        self.assertEqual(repr(Foo(42)), "<Foo.BAR | 40: 42>")
        self.assertEqual(str(Foo(42)), "Foo.BAR | 40")
        self.assertEqual(int(Foo(42)), 42)
        self.assertEqual(str(Foo(1)), "Foo.FOO")
        self.assertEqual(int(Foo(1)), 1)
        self.assertEqual(str(Foo(0)), "0")

        self.assertTrue(isinstance(str(Foo.FOO), str))
        self.assertTrue(isinstance(repr(Foo.FOO), str))


class Tverify_fileobj(TestCase):
    """Tests for ``verify_fileobj``."""

    def test_verify_fileobj_fail(self):
        self.assertRaises(ValueError, verify_fileobj, object())
        with tempfile.TemporaryFile(mode="rb") as h:
            self.assertRaises(ValueError, verify_fileobj, h, writable=True)

    def test_verify_fileobj(self):
        with tempfile.TemporaryFile(mode="rb") as h:
            verify_fileobj(h)

        with tempfile.TemporaryFile(mode="rb+") as h:
            verify_fileobj(h, writable=True)


class Tfileobj_name(TestCase):
    """Tests for ``fileobj_name``."""

    def test_fileobj_name_other_type(self):

        class Foo(object):
            name = 123

        self.assertEqual(fileobj_name(Foo()), "123")

    def test_fileobj_name(self):
        with tempfile.TemporaryFile(mode="rb") as h:
            self.assertEqual(fileobj_name(h), text_type(h.name))


class Tseek_end(TestCase):
    """Tests for ``seek_end`` (seek relative to the end of a file)."""

    def file(self, contents):
        temp = tempfile.TemporaryFile()
        temp.write(contents)
        temp.flush()
        temp.seek(0)
        return temp

    def test_seek_end(self):
        with self.file(b"foo") as f:
            seek_end(f, 2)
            self.assertEqual(f.tell(), 1)
            seek_end(f, 3)
            self.assertEqual(f.tell(), 0)
            # an offset larger than the file ends up at the start
            seek_end(f, 4)
            self.assertEqual(f.tell(), 0)
            seek_end(f, 0)
            self.assertEqual(f.tell(), 3)
            self.assertRaises(ValueError, seek_end, f, -1)

    def test_seek_end_pos(self):
        with self.file(b"foo") as f:
            # start from a position past the end of the file
            f.seek(10)
            seek_end(f, 10)
            self.assertEqual(f.tell(), 0)


class Tloadfile(TestCase):
    """Tests for the ``loadfile`` decorator."""

    def test_handle_readwrite_notsup(self):

        @loadfile(method=False, writable=True)
        def file_func(filething):
            fileobj = filething.fileobj
            assert fileobj.read(3) == b"foo"
            fileobj.seek(0, 2)
            fileobj.write(b"bar")

        # first a normal test
        filename = get_temp_empty()
        try:
            with open(filename, "wb") as h:
                h.write(b"foo")
            file_func(filename)
            with open(filename, "rb") as h:
                assert h.read() == b"foobar"
        finally:
            os.unlink(filename)

        # now we mock open to raise EOPNOTSUPP in case of mixed mode.
        # things should still work since we edit the file in memory
        raised = []
        old_open = open

        # Mirror the real open() signature (optional mode, keyword
        # arguments) so unrelated open() calls made while the mock is
        # installed are forwarded instead of failing with a TypeError.
        def mock_open(name, mode="r", *args, **kwargs):
            if "+" in mode:
                raised.append(True)
                raise IOError(errno.EOPNOTSUPP, "nope")
            return old_open(name, mode, *args, **kwargs)

        builtins.open = mock_open
        try:
            filename = get_temp_empty()
            try:
                with open(filename, "wb") as h:
                    h.write(b"foo")
                file_func(filename)
                with open(filename, "rb") as h:
                    assert h.read() == b"foobar"
            finally:
                os.unlink(filename)
        finally:
            builtins.open = old_open

        # make sure the mixed-mode open was actually attempted
        assert raised

    def test_filename_from_fspath(self):

        class FilePath(object):
            def __init__(self, filename):
                self.filename = filename

            def __fspath__(self):
                return self.filename

        @loadfile(method=False, writable=True)
        def file_func(filething):
            fileobj = filething.fileobj
            assert fileobj.read(3) == b"foo"
            fileobj.seek(0, 2)
            fileobj.write(b"bar")

        filename = get_temp_empty()
        try:
            with open(filename, "wb") as h:
                h.write(b"foo")
            file_func(FilePath(filename))
            with open(filename, "rb") as h:
                assert h.read() == b"foobar"
        finally:
            os.unlink(filename)

        # __fspath__ returning a non-path value must be rejected
        with pytest.raises(TypeError, match=r'.*__fspath__.*'):
            file_func(FilePath(42))


class Tread_full(TestCase):
    """Tests for ``read_full``."""

    def test_read_full(self):
        fileobj = cBytesIO()
        self.assertRaises(ValueError, read_full, fileobj, -3)
        self.assertRaises(IOError, read_full, fileobj, 3)


class Tget_size(TestCase):
    """Tests for ``get_size``."""

    def test_get_size(self):
        f = cBytesIO(b"foo")
        f.seek(1, 0)
        self.assertEqual(f.tell(), 1)
        self.assertEqual(get_size(f), 3)
        # the file position must be left where it was
        self.assertEqual(f.tell(), 1)


class Tencode_endian(TestCase):
    """Tests for ``encode_endian``."""

    def test_other(self):
        assert encode_endian(u"\xe4", "latin-1") == b"\xe4"
        assert encode_endian(u"\xe4", "utf-8") == b"\xc3\xa4"
        with self.assertRaises(LookupError):
            encode_endian(u"", "nopenope")
        with self.assertRaises(UnicodeEncodeError):
            # Called directly rather than inside ``assert``: under
            # ``python -O`` an assert statement is stripped, so the call
            # would never run and nothing would be raised.
            encode_endian(u"\u2714", "latin-1")
        assert encode_endian(u"\u2714", "latin-1", "replace") == b"?"

    def test_utf_16(self):
        assert encode_endian(u"\xe4", "utf-16", le=True) == b"\xff\xfe\xe4\x00"
        assert encode_endian(u"\xe4", "utf-16-le") == b"\xe4\x00"
        assert encode_endian(
            u"\xe4", "utf-16", le=False) == b"\xfe\xff\x00\xe4"
        assert encode_endian(u"\xe4", "utf-16-be") == b"\x00\xe4"

    def test_utf_32(self):
        assert encode_endian(u"\xe4", "utf-32", le=True) == \
            b"\xff\xfe\x00\x00\xe4\x00\x00\x00"
        assert encode_endian(u"\xe4", "utf-32-le") == b"\xe4\x00\x00\x00"
        assert encode_endian(
            u"\xe4", "utf-32", le=False) == b"\x00\x00\xfe\xff\x00\x00\x00\xe4"
        assert encode_endian(u"\xe4", "utf-32-be") == b"\x00\x00\x00\xe4"


class Tdecode_terminated(TestCase):
    """Tests for ``decode_terminated``."""

    def test_all(self):
        values = [u"", u"", u"\xe4", u"abc", u"", u""]

        for codec in ["utf8", "utf-8", "utf-16", "latin-1", "utf-16be"]:
            # NULL without the BOM
            term = u"\x00".encode(codec)[-2:]
            data = b"".join(v.encode(codec) + term for v in values)

            # decode one value at a time, feeding the remainder back in
            for v in values:
                dec, data = decode_terminated(data, codec)
                self.assertEqual(dec, v)
            self.assertEqual(data, b"")

    def test_invalid(self):
        # invalid
        self.assertRaises(
            UnicodeDecodeError, decode_terminated, b"\xff", "utf-8")
        # truncated
        self.assertRaises(
            UnicodeDecodeError, decode_terminated, b"\xff\xfe\x00", "utf-16")
        # not null terminated
        self.assertRaises(ValueError, decode_terminated, b"abc", "utf-8")
        self.assertRaises(
            ValueError, decode_terminated, b"\xff\xfea\x00", "utf-16")
        # invalid encoding
        self.assertRaises(LookupError, decode_terminated, b"abc", "foobar")

    def test_lax(self):
        # missing termination
        self.assertEqual(
            decode_terminated(b"abc", "utf-8", strict=False), (u"abc", b""))

        # missing termination and truncated data
        truncated = u"\xe4\xe4".encode("utf-8")[:-1]
        self.assertRaises(
            UnicodeDecodeError, decode_terminated,
            truncated, "utf-8", strict=False)


class TBitReader(TestCase):
    """Tests for ``BitReader``."""

    def test_bits(self):
        data = b"\x12\x34\x56\x78\x89\xAB\xCD\xEF"
        ref = cdata.uint64_be(data)

        # split the 64 bits at every position and recombine
        for i in xrange(64):
            fo = cBytesIO(data)
            r = BitReader(fo)
            v = r.bits(i) << (64 - i) | r.bits(64 - i)
            self.assertEqual(v, ref)

    def test_bits_null(self):
        r = BitReader(cBytesIO(b""))
        self.assertEqual(r.bits(0), 0)

    def test_bits_error(self):
        r = BitReader(cBytesIO(b""))
        self.assertRaises(ValueError, r.bits, -1)

    def test_bytes_error(self):
        r = BitReader(cBytesIO(b""))
        self.assertRaises(ValueError, r.bytes, -1)

    def test_skip_error(self):
        r = BitReader(cBytesIO(b""))
        self.assertRaises(ValueError, r.skip, -1)

    def test_read_too_much(self):
        r = BitReader(cBytesIO(b""))
        self.assertEqual(r.bits(0), 0)
        self.assertRaises(BitReaderError, r.bits, 1)

    def test_skip(self):
        r = BitReader(cBytesIO(b"\xEF"))
        r.skip(4)
        self.assertEqual(r.bits(4), 0xf)

    def test_skip_more(self):
        r = BitReader(cBytesIO(b"\xAB\xCD"))
        self.assertEqual(r.bits(4), 0xa)
        r.skip(8)
        self.assertEqual(r.bits(4), 0xd)
        self.assertRaises(BitReaderError, r.bits, 1)

    def test_skip_too_much(self):
        r = BitReader(cBytesIO(b"\xAB\xCD"))
        # aligned skips don't fail, but the following read will
        r.skip(32 + 8)
        self.assertRaises(BitReaderError, r.bits, 1)
        self.assertRaises(BitReaderError, r.skip, 1)

    def test_bytes(self):
        r = BitReader(cBytesIO(b"\xAB\xCD\xEF"))
        self.assertEqual(r.bytes(2), b"\xAB\xCD")
        self.assertEqual(r.bytes(0), b"")

    def test_bytes_unaligned(self):
        r = BitReader(cBytesIO(b"\xAB\xCD\xEF"))
        r.skip(4)
        self.assertEqual(r.bytes(2), b"\xBC\xDE")

    def test_get_position(self):
        r = BitReader(cBytesIO(b"\xAB\xCD"))
        self.assertEqual(r.get_position(), 0)
        r.bits(3)
        self.assertEqual(r.get_position(), 3)
        r.skip(9)
        self.assertEqual(r.get_position(), 3 + 9)
        r.align()
        self.assertEqual(r.get_position(), 16)

    def test_align(self):
        r = BitReader(cBytesIO(b"\xAB\xCD\xEF"))
        r.skip(3)
        self.assertEqual(r.align(), 5)
        self.assertEqual(r.get_position(), 8)

    def test_is_aligned(self):
        r = BitReader(cBytesIO(b"\xAB\xCD\xEF"))
        self.assertTrue(r.is_aligned())

        r.skip(1)
        self.assertFalse(r.is_aligned())
        r.skip(7)
        self.assertTrue(r.is_aligned())

        r.bits(7)
        self.assertFalse(r.is_aligned())
        r.bits(1)
        self.assertTrue(r.is_aligned())


class Tverify_filename(TestCase):
    """Tests for ``verify_filename``."""

    def test_verify_filename_fail(self):
        self.assertRaises(ValueError, verify_filename, object())

    def test_verify_filename(self):

        class FilePath(object):
            def __init__(self, filename):
                self.filename = filename

            def __fspath__(self):
                return self.filename

        verify_filename(FilePath("foo"))
        verify_filename("foo")
        verify_filename(b"foo")