1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22from __future__ import print_function 23from __future__ import unicode_literals 24 25import os 26import sys 27import time 28import array 29import random 30import unittest 31import weakref 32import warnings 33import abc 34import signal 35import errno 36from itertools import cycle, count 37from collections import deque 38from UserList import UserList 39from test import test_support as support 40import contextlib 41 42import codecs 43import io # C implementation of io 44import _pyio as pyio # Python implementation of io 45try: 46 import threading 47except ImportError: 48 threading = None 49try: 50 import fcntl 51except ImportError: 52 fcntl = None 53 54__metaclass__ = type 55bytes = support.py3k_bytes 56 57def byteslike(*pos, **kw): 58 return memoryview(bytearray(*pos, **kw)) 59 60def _default_chunk_size(): 61 """Get the default TextIOWrapper chunk size""" 62 with io.open(__file__, "r", encoding="latin1") as f: 63 return 
f._CHUNK_SIZE 64 65 66class MockRawIOWithoutRead: 67 """A RawIO implementation without read(), so as to exercise the default 68 RawIO.read() which calls readinto().""" 69 70 def __init__(self, read_stack=()): 71 self._read_stack = list(read_stack) 72 self._write_stack = [] 73 self._reads = 0 74 self._extraneous_reads = 0 75 76 def write(self, b): 77 self._write_stack.append(bytes(b)) 78 return len(b) 79 80 def writable(self): 81 return True 82 83 def fileno(self): 84 return 42 85 86 def readable(self): 87 return True 88 89 def seekable(self): 90 return True 91 92 def seek(self, pos, whence): 93 return 0 # wrong but we gotta return something 94 95 def tell(self): 96 return 0 # same comment as above 97 98 def readinto(self, buf): 99 self._reads += 1 100 max_len = len(buf) 101 try: 102 data = self._read_stack[0] 103 except IndexError: 104 self._extraneous_reads += 1 105 return 0 106 if data is None: 107 del self._read_stack[0] 108 return None 109 n = len(data) 110 if len(data) <= max_len: 111 del self._read_stack[0] 112 buf[:n] = data 113 return n 114 else: 115 buf[:] = data[:max_len] 116 self._read_stack[0] = data[max_len:] 117 return max_len 118 119 def truncate(self, pos=None): 120 return pos 121 122class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 123 pass 124 125class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 126 pass 127 128 129class MockRawIO(MockRawIOWithoutRead): 130 131 def read(self, n=None): 132 self._reads += 1 133 try: 134 return self._read_stack.pop(0) 135 except: 136 self._extraneous_reads += 1 137 return b"" 138 139class CMockRawIO(MockRawIO, io.RawIOBase): 140 pass 141 142class PyMockRawIO(MockRawIO, pyio.RawIOBase): 143 pass 144 145 146class MisbehavedRawIO(MockRawIO): 147 def write(self, b): 148 return MockRawIO.write(self, b) * 2 149 150 def read(self, n=None): 151 return MockRawIO.read(self, n) * 2 152 153 def seek(self, pos, whence): 154 return -123 155 156 def tell(self): 157 return -456 158 159 def 
readinto(self, buf): 160 MockRawIO.readinto(self, buf) 161 return len(buf) * 5 162 163class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 164 pass 165 166class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 167 pass 168 169 170class CloseFailureIO(MockRawIO): 171 closed = 0 172 173 def close(self): 174 if not self.closed: 175 self.closed = 1 176 raise IOError 177 178class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 179 pass 180 181class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 182 pass 183 184 185class MockFileIO: 186 187 def __init__(self, data): 188 self.read_history = [] 189 super(MockFileIO, self).__init__(data) 190 191 def read(self, n=None): 192 res = super(MockFileIO, self).read(n) 193 self.read_history.append(None if res is None else len(res)) 194 return res 195 196 def readinto(self, b): 197 res = super(MockFileIO, self).readinto(b) 198 self.read_history.append(res) 199 return res 200 201class CMockFileIO(MockFileIO, io.BytesIO): 202 pass 203 204class PyMockFileIO(MockFileIO, pyio.BytesIO): 205 pass 206 207 208class MockNonBlockWriterIO: 209 210 def __init__(self): 211 self._write_stack = [] 212 self._blocker_char = None 213 214 def pop_written(self): 215 s = b"".join(self._write_stack) 216 self._write_stack[:] = [] 217 return s 218 219 def block_on(self, char): 220 """Block when a given char is encountered.""" 221 self._blocker_char = char 222 223 def readable(self): 224 return True 225 226 def seekable(self): 227 return True 228 229 def writable(self): 230 return True 231 232 def write(self, b): 233 b = bytes(b) 234 n = -1 235 if self._blocker_char: 236 try: 237 n = b.index(self._blocker_char) 238 except ValueError: 239 pass 240 else: 241 if n > 0: 242 # write data up to the first blocker 243 self._write_stack.append(b[:n]) 244 return n 245 else: 246 # cancel blocker and indicate would block 247 self._blocker_char = None 248 return None 249 self._write_stack.append(b) 250 return len(b) 251 252class 
CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 253 BlockingIOError = io.BlockingIOError 254 255class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 256 BlockingIOError = pyio.BlockingIOError 257 258 259class IOTest(unittest.TestCase): 260 261 def setUp(self): 262 support.unlink(support.TESTFN) 263 264 def tearDown(self): 265 support.unlink(support.TESTFN) 266 267 def write_ops(self, f): 268 self.assertEqual(f.write(b"blah."), 5) 269 f.truncate(0) 270 self.assertEqual(f.tell(), 5) 271 f.seek(0) 272 273 self.assertEqual(f.write(b"blah."), 5) 274 self.assertEqual(f.seek(0), 0) 275 self.assertEqual(f.write(b"Hello."), 6) 276 self.assertEqual(f.tell(), 6) 277 self.assertEqual(f.seek(-1, 1), 5) 278 self.assertEqual(f.tell(), 5) 279 buffer = bytearray(b" world\n\n\n") 280 self.assertEqual(f.write(buffer), 9) 281 buffer[:] = b"*" * 9 # Overwrite our copy of the data 282 self.assertEqual(f.seek(0), 0) 283 self.assertEqual(f.write(b"h"), 1) 284 self.assertEqual(f.seek(-1, 2), 13) 285 self.assertEqual(f.tell(), 13) 286 287 self.assertEqual(f.truncate(12), 12) 288 self.assertEqual(f.tell(), 13) 289 self.assertRaises(TypeError, f.seek, 0.0) 290 291 def read_ops(self, f, buffered=False): 292 data = f.read(5) 293 self.assertEqual(data, b"hello") 294 data = byteslike(data) 295 self.assertEqual(f.readinto(data), 5) 296 self.assertEqual(data.tobytes(), b" worl") 297 data = bytearray(5) 298 self.assertEqual(f.readinto(data), 2) 299 self.assertEqual(len(data), 5) 300 self.assertEqual(data[:2], b"d\n") 301 self.assertEqual(f.seek(0), 0) 302 self.assertEqual(f.read(20), b"hello world\n") 303 self.assertEqual(f.read(1), b"") 304 self.assertEqual(f.readinto(byteslike(b"x")), 0) 305 self.assertEqual(f.seek(-6, 2), 6) 306 self.assertEqual(f.read(5), b"world") 307 self.assertEqual(f.read(0), b"") 308 self.assertEqual(f.readinto(byteslike()), 0) 309 self.assertEqual(f.seek(-6, 1), 5) 310 self.assertEqual(f.read(5), b" worl") 311 self.assertEqual(f.tell(), 10) 312 
self.assertRaises(TypeError, f.seek, 0.0) 313 if buffered: 314 f.seek(0) 315 self.assertEqual(f.read(), b"hello world\n") 316 f.seek(6) 317 self.assertEqual(f.read(), b"world\n") 318 self.assertEqual(f.read(), b"") 319 320 LARGE = 2**31 321 322 def large_file_ops(self, f): 323 assert f.readable() 324 assert f.writable() 325 self.assertEqual(f.seek(self.LARGE), self.LARGE) 326 self.assertEqual(f.tell(), self.LARGE) 327 self.assertEqual(f.write(b"xxx"), 3) 328 self.assertEqual(f.tell(), self.LARGE + 3) 329 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 330 self.assertEqual(f.truncate(), self.LARGE + 2) 331 self.assertEqual(f.tell(), self.LARGE + 2) 332 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 333 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 334 self.assertEqual(f.tell(), self.LARGE + 2) 335 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 336 self.assertEqual(f.seek(-1, 2), self.LARGE) 337 self.assertEqual(f.read(2), b"x") 338 339 def test_invalid_operations(self): 340 # Try writing on a file opened in read mode and vice-versa. 
341 for mode in ("w", "wb"): 342 with self.open(support.TESTFN, mode) as fp: 343 self.assertRaises(IOError, fp.read) 344 self.assertRaises(IOError, fp.readline) 345 with self.open(support.TESTFN, "rb") as fp: 346 self.assertRaises(IOError, fp.write, b"blah") 347 self.assertRaises(IOError, fp.writelines, [b"blah\n"]) 348 with self.open(support.TESTFN, "r") as fp: 349 self.assertRaises(IOError, fp.write, "blah") 350 self.assertRaises(IOError, fp.writelines, ["blah\n"]) 351 352 def test_open_handles_NUL_chars(self): 353 fn_with_NUL = 'foo\0bar' 354 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') 355 356 bytes_fn = fn_with_NUL.encode('ascii') 357 with warnings.catch_warnings(): 358 warnings.simplefilter("ignore", DeprecationWarning) 359 self.assertRaises(TypeError, self.open, bytes_fn, 'w') 360 361 def test_raw_file_io(self): 362 with self.open(support.TESTFN, "wb", buffering=0) as f: 363 self.assertEqual(f.readable(), False) 364 self.assertEqual(f.writable(), True) 365 self.assertEqual(f.seekable(), True) 366 self.write_ops(f) 367 with self.open(support.TESTFN, "rb", buffering=0) as f: 368 self.assertEqual(f.readable(), True) 369 self.assertEqual(f.writable(), False) 370 self.assertEqual(f.seekable(), True) 371 self.read_ops(f) 372 373 def test_buffered_file_io(self): 374 with self.open(support.TESTFN, "wb") as f: 375 self.assertEqual(f.readable(), False) 376 self.assertEqual(f.writable(), True) 377 self.assertEqual(f.seekable(), True) 378 self.write_ops(f) 379 with self.open(support.TESTFN, "rb") as f: 380 self.assertEqual(f.readable(), True) 381 self.assertEqual(f.writable(), False) 382 self.assertEqual(f.seekable(), True) 383 self.read_ops(f, True) 384 385 def test_readline(self): 386 with self.open(support.TESTFN, "wb") as f: 387 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 388 with self.open(support.TESTFN, "rb") as f: 389 self.assertEqual(f.readline(), b"abc\n") 390 self.assertEqual(f.readline(10), b"def\n") 391 self.assertEqual(f.readline(2), 
b"xy") 392 self.assertEqual(f.readline(4), b"zzy\n") 393 self.assertEqual(f.readline(), b"foo\x00bar\n") 394 self.assertEqual(f.readline(None), b"another line") 395 self.assertRaises(TypeError, f.readline, 5.3) 396 with self.open(support.TESTFN, "r") as f: 397 self.assertRaises(TypeError, f.readline, 5.3) 398 399 def test_readline_nonsizeable(self): 400 # Issue #30061 401 # Crash when readline() returns an object without __len__ 402 class R(self.IOBase): 403 def readline(self): 404 return None 405 self.assertRaises((TypeError, StopIteration), next, R()) 406 407 def test_next_nonsizeable(self): 408 # Issue #30061 409 # Crash when next() returns an object without __len__ 410 class R(self.IOBase): 411 def next(self): 412 return None 413 self.assertRaises(TypeError, R().readlines, 1) 414 415 def test_raw_bytes_io(self): 416 f = self.BytesIO() 417 self.write_ops(f) 418 data = f.getvalue() 419 self.assertEqual(data, b"hello world\n") 420 f = self.BytesIO(data) 421 self.read_ops(f, True) 422 423 def test_large_file_ops(self): 424 # On Windows and Mac OSX this test comsumes large resources; It takes 425 # a long time to build the >2GB file and takes >2GB of disk space 426 # therefore the resource must be enabled to run this test. 
427 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 428 support.requires( 429 'largefile', 430 'test requires %s bytes and a long time to run' % self.LARGE) 431 with self.open(support.TESTFN, "w+b", 0) as f: 432 self.large_file_ops(f) 433 with self.open(support.TESTFN, "w+b") as f: 434 self.large_file_ops(f) 435 436 def test_with_open(self): 437 for bufsize in (0, 1, 100): 438 f = None 439 with self.open(support.TESTFN, "wb", bufsize) as f: 440 f.write(b"xxx") 441 self.assertEqual(f.closed, True) 442 f = None 443 try: 444 with self.open(support.TESTFN, "wb", bufsize) as f: 445 1 // 0 446 except ZeroDivisionError: 447 self.assertEqual(f.closed, True) 448 else: 449 self.fail("1 // 0 didn't raise an exception") 450 451 # issue 5008 452 def test_append_mode_tell(self): 453 with self.open(support.TESTFN, "wb") as f: 454 f.write(b"xxx") 455 with self.open(support.TESTFN, "ab", buffering=0) as f: 456 self.assertEqual(f.tell(), 3) 457 with self.open(support.TESTFN, "ab") as f: 458 self.assertEqual(f.tell(), 3) 459 with self.open(support.TESTFN, "a") as f: 460 self.assertGreater(f.tell(), 0) 461 462 def test_destructor(self): 463 record = [] 464 class MyFileIO(self.FileIO): 465 def __del__(self): 466 record.append(1) 467 try: 468 f = super(MyFileIO, self).__del__ 469 except AttributeError: 470 pass 471 else: 472 f() 473 def close(self): 474 record.append(2) 475 super(MyFileIO, self).close() 476 def flush(self): 477 record.append(3) 478 super(MyFileIO, self).flush() 479 f = MyFileIO(support.TESTFN, "wb") 480 f.write(b"xxx") 481 del f 482 support.gc_collect() 483 self.assertEqual(record, [1, 2, 3]) 484 with self.open(support.TESTFN, "rb") as f: 485 self.assertEqual(f.read(), b"xxx") 486 487 def _check_base_destructor(self, base): 488 record = [] 489 class MyIO(base): 490 def __init__(self): 491 # This exercises the availability of attributes on object 492 # destruction. 
493 # (in the C version, close() is called by the tp_dealloc 494 # function, not by __del__) 495 self.on_del = 1 496 self.on_close = 2 497 self.on_flush = 3 498 def __del__(self): 499 record.append(self.on_del) 500 try: 501 f = super(MyIO, self).__del__ 502 except AttributeError: 503 pass 504 else: 505 f() 506 def close(self): 507 record.append(self.on_close) 508 super(MyIO, self).close() 509 def flush(self): 510 record.append(self.on_flush) 511 super(MyIO, self).flush() 512 f = MyIO() 513 del f 514 support.gc_collect() 515 self.assertEqual(record, [1, 2, 3]) 516 517 def test_IOBase_destructor(self): 518 self._check_base_destructor(self.IOBase) 519 520 def test_RawIOBase_destructor(self): 521 self._check_base_destructor(self.RawIOBase) 522 523 def test_BufferedIOBase_destructor(self): 524 self._check_base_destructor(self.BufferedIOBase) 525 526 def test_TextIOBase_destructor(self): 527 self._check_base_destructor(self.TextIOBase) 528 529 def test_close_flushes(self): 530 with self.open(support.TESTFN, "wb") as f: 531 f.write(b"xxx") 532 with self.open(support.TESTFN, "rb") as f: 533 self.assertEqual(f.read(), b"xxx") 534 535 def test_array_writes(self): 536 a = array.array(b'i', range(10)) 537 n = len(a.tostring()) 538 with self.open(support.TESTFN, "wb", 0) as f: 539 self.assertEqual(f.write(a), n) 540 with self.open(support.TESTFN, "wb") as f: 541 self.assertEqual(f.write(a), n) 542 543 def test_closefd(self): 544 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 545 closefd=False) 546 547 def test_read_closed(self): 548 with self.open(support.TESTFN, "w") as f: 549 f.write("egg\n") 550 with self.open(support.TESTFN, "r") as f: 551 file = self.open(f.fileno(), "r", closefd=False) 552 self.assertEqual(file.read(), "egg\n") 553 file.seek(0) 554 file.close() 555 self.assertRaises(ValueError, file.read) 556 557 def test_no_closefd_with_filename(self): 558 # can't use closefd in combination with a file name 559 self.assertRaises(ValueError, self.open, 
support.TESTFN, "r", closefd=False) 560 561 def test_closefd_attr(self): 562 with self.open(support.TESTFN, "wb") as f: 563 f.write(b"egg\n") 564 with self.open(support.TESTFN, "r") as f: 565 self.assertEqual(f.buffer.raw.closefd, True) 566 file = self.open(f.fileno(), "r", closefd=False) 567 self.assertEqual(file.buffer.raw.closefd, False) 568 569 def test_garbage_collection(self): 570 # FileIO objects are collected, and collecting them flushes 571 # all data to disk. 572 f = self.FileIO(support.TESTFN, "wb") 573 f.write(b"abcxxx") 574 f.f = f 575 wr = weakref.ref(f) 576 del f 577 support.gc_collect() 578 self.assertIsNone(wr(), wr) 579 with self.open(support.TESTFN, "rb") as f: 580 self.assertEqual(f.read(), b"abcxxx") 581 582 def test_unbounded_file(self): 583 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 584 zero = "/dev/zero" 585 if not os.path.exists(zero): 586 self.skipTest("{0} does not exist".format(zero)) 587 if sys.maxsize > 0x7FFFFFFF: 588 self.skipTest("test can only run in a 32-bit address space") 589 if support.real_max_memuse < support._2G: 590 self.skipTest("test requires at least 2GB of memory") 591 with self.open(zero, "rb", buffering=0) as f: 592 self.assertRaises(OverflowError, f.read) 593 with self.open(zero, "rb") as f: 594 self.assertRaises(OverflowError, f.read) 595 with self.open(zero, "r") as f: 596 self.assertRaises(OverflowError, f.read) 597 598 def check_flush_error_on_close(self, *args, **kwargs): 599 # Test that the file is closed despite failed flush 600 # and that flush() is called before file closed. 
601 f = self.open(*args, **kwargs) 602 closed = [] 603 def bad_flush(): 604 closed[:] = [f.closed] 605 raise IOError() 606 f.flush = bad_flush 607 self.assertRaises(IOError, f.close) # exception not swallowed 608 self.assertTrue(f.closed) 609 self.assertTrue(closed) # flush() called 610 self.assertFalse(closed[0]) # flush() called before file closed 611 f.flush = lambda: None # break reference loop 612 613 def test_flush_error_on_close(self): 614 # raw file 615 # Issue #5700: io.FileIO calls flush() after file closed 616 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) 617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 618 self.check_flush_error_on_close(fd, 'wb', buffering=0) 619 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 620 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 621 os.close(fd) 622 # buffered io 623 self.check_flush_error_on_close(support.TESTFN, 'wb') 624 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 625 self.check_flush_error_on_close(fd, 'wb') 626 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 627 self.check_flush_error_on_close(fd, 'wb', closefd=False) 628 os.close(fd) 629 # text io 630 self.check_flush_error_on_close(support.TESTFN, 'w') 631 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 632 self.check_flush_error_on_close(fd, 'w') 633 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 634 self.check_flush_error_on_close(fd, 'w', closefd=False) 635 os.close(fd) 636 637 def test_multi_close(self): 638 f = self.open(support.TESTFN, "wb", buffering=0) 639 f.close() 640 f.close() 641 f.close() 642 self.assertRaises(ValueError, f.flush) 643 644 def test_RawIOBase_read(self): 645 # Exercise the default RawIOBase.read() implementation (which calls 646 # readinto() internally). 
647 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 648 self.assertEqual(rawio.read(2), b"ab") 649 self.assertEqual(rawio.read(2), b"c") 650 self.assertEqual(rawio.read(2), b"d") 651 self.assertEqual(rawio.read(2), None) 652 self.assertEqual(rawio.read(2), b"ef") 653 self.assertEqual(rawio.read(2), b"g") 654 self.assertEqual(rawio.read(2), None) 655 self.assertEqual(rawio.read(2), b"") 656 657 def test_fileio_closefd(self): 658 # Issue #4841 659 with self.open(__file__, 'rb') as f1, \ 660 self.open(__file__, 'rb') as f2: 661 fileio = self.FileIO(f1.fileno(), closefd=False) 662 # .__init__() must not close f1 663 fileio.__init__(f2.fileno(), closefd=False) 664 f1.readline() 665 # .close() must not close f2 666 fileio.close() 667 f2.readline() 668 669 def test_nonbuffered_textio(self): 670 with warnings.catch_warnings(record=True) as recorded: 671 with self.assertRaises(ValueError): 672 self.open(support.TESTFN, 'w', buffering=0) 673 support.gc_collect() 674 self.assertEqual(recorded, []) 675 676 def test_invalid_newline(self): 677 with warnings.catch_warnings(record=True) as recorded: 678 with self.assertRaises(ValueError): 679 self.open(support.TESTFN, 'w', newline='invalid') 680 support.gc_collect() 681 self.assertEqual(recorded, []) 682 683 def test_buffered_readinto_mixin(self): 684 # Test the implementation provided by BufferedIOBase 685 class Stream(self.BufferedIOBase): 686 def read(self, size): 687 return b"12345" 688 stream = Stream() 689 buffer = byteslike(5) 690 self.assertEqual(stream.readinto(buffer), 5) 691 self.assertEqual(buffer.tobytes(), b"12345") 692 693 def test_close_assert(self): 694 class R(self.IOBase): 695 def __setattr__(self, name, value): 696 pass 697 def flush(self): 698 raise OSError() 699 f = R() 700 # This would cause an assertion failure. 
701 self.assertRaises(OSError, f.close) 702 703 704class CIOTest(IOTest): 705 706 def test_IOBase_finalize(self): 707 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 708 # class which inherits IOBase and an object of this class are caught 709 # in a reference cycle and close() is already in the method cache. 710 class MyIO(self.IOBase): 711 def close(self): 712 pass 713 714 # create an instance to populate the method cache 715 MyIO() 716 obj = MyIO() 717 obj.obj = obj 718 wr = weakref.ref(obj) 719 del MyIO 720 del obj 721 support.gc_collect() 722 self.assertIsNone(wr(), wr) 723 724class PyIOTest(IOTest): 725 test_array_writes = unittest.skip( 726 "len(array.array) returns number of elements rather than bytelength" 727 )(IOTest.test_array_writes) 728 729 730class CommonBufferedTests: 731 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 732 733 def test_detach(self): 734 raw = self.MockRawIO() 735 buf = self.tp(raw) 736 self.assertIs(buf.detach(), raw) 737 self.assertRaises(ValueError, buf.detach) 738 739 repr(buf) # Should still work 740 741 def test_fileno(self): 742 rawio = self.MockRawIO() 743 bufio = self.tp(rawio) 744 745 self.assertEqual(42, bufio.fileno()) 746 747 def test_invalid_args(self): 748 rawio = self.MockRawIO() 749 bufio = self.tp(rawio) 750 # Invalid whence 751 self.assertRaises(ValueError, bufio.seek, 0, -1) 752 self.assertRaises(ValueError, bufio.seek, 0, 3) 753 754 def test_override_destructor(self): 755 tp = self.tp 756 record = [] 757 class MyBufferedIO(tp): 758 def __del__(self): 759 record.append(1) 760 try: 761 f = super(MyBufferedIO, self).__del__ 762 except AttributeError: 763 pass 764 else: 765 f() 766 def close(self): 767 record.append(2) 768 super(MyBufferedIO, self).close() 769 def flush(self): 770 record.append(3) 771 super(MyBufferedIO, self).flush() 772 rawio = self.MockRawIO() 773 bufio = MyBufferedIO(rawio) 774 writable = bufio.writable() 775 del bufio 776 support.gc_collect() 777 if 
writable: 778 self.assertEqual(record, [1, 2, 3]) 779 else: 780 self.assertEqual(record, [1, 2]) 781 782 def test_context_manager(self): 783 # Test usability as a context manager 784 rawio = self.MockRawIO() 785 bufio = self.tp(rawio) 786 def _with(): 787 with bufio: 788 pass 789 _with() 790 # bufio should now be closed, and using it a second time should raise 791 # a ValueError. 792 self.assertRaises(ValueError, _with) 793 794 def test_error_through_destructor(self): 795 # Test that the exception state is not modified by a destructor, 796 # even if close() fails. 797 rawio = self.CloseFailureIO() 798 def f(): 799 self.tp(rawio).xyzzy 800 with support.captured_output("stderr") as s: 801 self.assertRaises(AttributeError, f) 802 s = s.getvalue().strip() 803 if s: 804 # The destructor *may* have printed an unraisable error, check it 805 self.assertEqual(len(s.splitlines()), 1) 806 self.assertTrue(s.startswith("Exception IOError: "), s) 807 self.assertTrue(s.endswith(" ignored"), s) 808 809 def test_repr(self): 810 raw = self.MockRawIO() 811 b = self.tp(raw) 812 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) 813 self.assertEqual(repr(b), "<%s>" % clsname) 814 raw.name = "dummy" 815 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname) 816 raw.name = b"dummy" 817 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) 818 819 def test_flush_error_on_close(self): 820 # Test that buffered file is closed despite failed flush 821 # and that flush() is called before file closed. 
822 raw = self.MockRawIO() 823 closed = [] 824 def bad_flush(): 825 closed[:] = [b.closed, raw.closed] 826 raise IOError() 827 raw.flush = bad_flush 828 b = self.tp(raw) 829 self.assertRaises(IOError, b.close) # exception not swallowed 830 self.assertTrue(b.closed) 831 self.assertTrue(raw.closed) 832 self.assertTrue(closed) # flush() called 833 self.assertFalse(closed[0]) # flush() called before file closed 834 self.assertFalse(closed[1]) 835 raw.flush = lambda: None # break reference loop 836 837 def test_close_error_on_close(self): 838 raw = self.MockRawIO() 839 def bad_flush(): 840 raise IOError('flush') 841 def bad_close(): 842 raise IOError('close') 843 raw.close = bad_close 844 b = self.tp(raw) 845 b.flush = bad_flush 846 with self.assertRaises(IOError) as err: # exception not swallowed 847 b.close() 848 self.assertEqual(err.exception.args, ('close',)) 849 self.assertFalse(b.closed) 850 851 def test_multi_close(self): 852 raw = self.MockRawIO() 853 b = self.tp(raw) 854 b.close() 855 b.close() 856 b.close() 857 self.assertRaises(ValueError, b.flush) 858 859 def test_readonly_attributes(self): 860 raw = self.MockRawIO() 861 buf = self.tp(raw) 862 x = self.MockRawIO() 863 with self.assertRaises((AttributeError, TypeError)): 864 buf.raw = x 865 866 867class SizeofTest: 868 869 @support.cpython_only 870 def test_sizeof(self): 871 bufsize1 = 4096 872 bufsize2 = 8192 873 rawio = self.MockRawIO() 874 bufio = self.tp(rawio, buffer_size=bufsize1) 875 size = sys.getsizeof(bufio) - bufsize1 876 rawio = self.MockRawIO() 877 bufio = self.tp(rawio, buffer_size=bufsize2) 878 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 879 880 881class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 882 read_mode = "rb" 883 884 def test_constructor(self): 885 rawio = self.MockRawIO([b"abc"]) 886 bufio = self.tp(rawio) 887 bufio.__init__(rawio) 888 bufio.__init__(rawio, buffer_size=1024) 889 bufio.__init__(rawio, buffer_size=16) 890 self.assertEqual(b"abc", 
bufio.read()) 891 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 892 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 893 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 894 rawio = self.MockRawIO([b"abc"]) 895 bufio.__init__(rawio) 896 self.assertEqual(b"abc", bufio.read()) 897 898 def test_uninitialized(self): 899 bufio = self.tp.__new__(self.tp) 900 del bufio 901 bufio = self.tp.__new__(self.tp) 902 self.assertRaisesRegexp((ValueError, AttributeError), 903 'uninitialized|has no attribute', 904 bufio.read, 0) 905 bufio.__init__(self.MockRawIO()) 906 self.assertEqual(bufio.read(0), b'') 907 908 def test_read(self): 909 for arg in (None, 7): 910 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 911 bufio = self.tp(rawio) 912 self.assertEqual(b"abcdefg", bufio.read(arg)) 913 # Invalid args 914 self.assertRaises(ValueError, bufio.read, -2) 915 916 def test_read1(self): 917 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 918 bufio = self.tp(rawio) 919 self.assertEqual(b"a", bufio.read(1)) 920 self.assertEqual(b"b", bufio.read1(1)) 921 self.assertEqual(rawio._reads, 1) 922 self.assertEqual(b"c", bufio.read1(100)) 923 self.assertEqual(rawio._reads, 1) 924 self.assertEqual(b"d", bufio.read1(100)) 925 self.assertEqual(rawio._reads, 2) 926 self.assertEqual(b"efg", bufio.read1(100)) 927 self.assertEqual(rawio._reads, 3) 928 self.assertEqual(b"", bufio.read1(100)) 929 self.assertEqual(rawio._reads, 4) 930 # Invalid args 931 self.assertRaises(ValueError, bufio.read1, -1) 932 933 def test_readinto(self): 934 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 935 bufio = self.tp(rawio) 936 b = bytearray(2) 937 self.assertEqual(bufio.readinto(b), 2) 938 self.assertEqual(b, b"ab") 939 self.assertEqual(bufio.readinto(b), 2) 940 self.assertEqual(b, b"cd") 941 self.assertEqual(bufio.readinto(b), 2) 942 self.assertEqual(b, b"ef") 943 self.assertEqual(bufio.readinto(b), 1) 944 self.assertEqual(b, b"gf") 945 
self.assertEqual(bufio.readinto(b), 0) 946 self.assertEqual(b, b"gf") 947 948 def test_readlines(self): 949 def bufio(): 950 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 951 return self.tp(rawio) 952 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 953 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 954 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 955 956 def test_buffering(self): 957 data = b"abcdefghi" 958 dlen = len(data) 959 960 tests = [ 961 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 962 [ 100, [ 3, 3, 3], [ dlen ] ], 963 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 964 ] 965 966 for bufsize, buf_read_sizes, raw_read_sizes in tests: 967 rawio = self.MockFileIO(data) 968 bufio = self.tp(rawio, buffer_size=bufsize) 969 pos = 0 970 for nbytes in buf_read_sizes: 971 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 972 pos += nbytes 973 # this is mildly implementation-dependent 974 self.assertEqual(rawio.read_history, raw_read_sizes) 975 976 def test_read_non_blocking(self): 977 # Inject some None's in there to simulate EWOULDBLOCK 978 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 979 bufio = self.tp(rawio) 980 self.assertEqual(b"abcd", bufio.read(6)) 981 self.assertEqual(b"e", bufio.read(1)) 982 self.assertEqual(b"fg", bufio.read()) 983 self.assertEqual(b"", bufio.peek(1)) 984 self.assertIsNone(bufio.read()) 985 self.assertEqual(b"", bufio.read()) 986 987 rawio = self.MockRawIO((b"a", None, None)) 988 self.assertEqual(b"a", rawio.readall()) 989 self.assertIsNone(rawio.readall()) 990 991 def test_read_past_eof(self): 992 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 993 bufio = self.tp(rawio) 994 995 self.assertEqual(b"abcdefg", bufio.read(9000)) 996 997 def test_read_all(self): 998 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 999 bufio = self.tp(rawio) 1000 1001 self.assertEqual(b"abcdefg", bufio.read()) 1002 1003 @unittest.skipUnless(threading, 'Threading required for this test.') 
1004 @support.requires_resource('cpu') 1005 def test_threads(self): 1006 try: 1007 # Write out many bytes with exactly the same number of 0's, 1008 # 1's... 255's. This will help us check that concurrent reading 1009 # doesn't duplicate or forget contents. 1010 N = 1000 1011 l = list(range(256)) * N 1012 random.shuffle(l) 1013 s = bytes(bytearray(l)) 1014 with self.open(support.TESTFN, "wb") as f: 1015 f.write(s) 1016 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: 1017 bufio = self.tp(raw, 8) 1018 errors = [] 1019 results = [] 1020 def f(): 1021 try: 1022 # Intra-buffer read then buffer-flushing read 1023 for n in cycle([1, 19]): 1024 s = bufio.read(n) 1025 if not s: 1026 break 1027 # list.append() is atomic 1028 results.append(s) 1029 except Exception as e: 1030 errors.append(e) 1031 raise 1032 threads = [threading.Thread(target=f) for x in range(20)] 1033 with support.start_threads(threads): 1034 time.sleep(0.02) # yield 1035 self.assertFalse(errors, 1036 "the following exceptions were caught: %r" % errors) 1037 s = b''.join(results) 1038 for i in range(256): 1039 c = bytes(bytearray([i])) 1040 self.assertEqual(s.count(c), N) 1041 finally: 1042 support.unlink(support.TESTFN) 1043 1044 def test_misbehaved_io(self): 1045 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1046 bufio = self.tp(rawio) 1047 self.assertRaises(IOError, bufio.seek, 0) 1048 self.assertRaises(IOError, bufio.tell) 1049 1050 def test_no_extraneous_read(self): 1051 # Issue #9550; when the raw IO object has satisfied the read request, 1052 # we should not issue any additional reads, otherwise it may block 1053 # (e.g. socket). 1054 bufsize = 16 1055 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 1056 rawio = self.MockRawIO([b"x" * n]) 1057 bufio = self.tp(rawio, bufsize) 1058 self.assertEqual(bufio.read(n), b"x" * n) 1059 # Simple case: one raw read is enough to satisfy the request. 
1060 self.assertEqual(rawio._extraneous_reads, 0, 1061 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1062 # A more complex case where two raw reads are needed to satisfy 1063 # the request. 1064 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 1065 bufio = self.tp(rawio, bufsize) 1066 self.assertEqual(bufio.read(n), b"x" * n) 1067 self.assertEqual(rawio._extraneous_reads, 0, 1068 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1069 1070 1071class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 1072 tp = io.BufferedReader 1073 1074 def test_constructor(self): 1075 BufferedReaderTest.test_constructor(self) 1076 # The allocation can succeed on 32-bit builds, e.g. with more 1077 # than 2GB RAM and a 64-bit kernel. 1078 if sys.maxsize > 0x7FFFFFFF: 1079 rawio = self.MockRawIO() 1080 bufio = self.tp(rawio) 1081 self.assertRaises((OverflowError, MemoryError, ValueError), 1082 bufio.__init__, rawio, sys.maxsize) 1083 1084 def test_initialization(self): 1085 rawio = self.MockRawIO([b"abc"]) 1086 bufio = self.tp(rawio) 1087 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1088 self.assertRaises(ValueError, bufio.read) 1089 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1090 self.assertRaises(ValueError, bufio.read) 1091 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1092 self.assertRaises(ValueError, bufio.read) 1093 1094 def test_misbehaved_io_read(self): 1095 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1096 bufio = self.tp(rawio) 1097 # _pyio.BufferedReader seems to implement reading different, so that 1098 # checking this is not so easy. 1099 self.assertRaises(IOError, bufio.read, 10) 1100 1101 def test_garbage_collection(self): 1102 # C BufferedReader objects are collected. 
1103 # The Python version has __del__, so it ends into gc.garbage instead 1104 self.addCleanup(support.unlink, support.TESTFN) 1105 rawio = self.FileIO(support.TESTFN, "w+b") 1106 f = self.tp(rawio) 1107 f.f = f 1108 wr = weakref.ref(f) 1109 del f 1110 support.gc_collect() 1111 self.assertIsNone(wr(), wr) 1112 1113 def test_args_error(self): 1114 # Issue #17275 1115 with self.assertRaisesRegexp(TypeError, "BufferedReader"): 1116 self.tp(io.BytesIO(), 1024, 1024, 1024) 1117 1118 1119class PyBufferedReaderTest(BufferedReaderTest): 1120 tp = pyio.BufferedReader 1121 1122 1123class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1124 write_mode = "wb" 1125 1126 def test_constructor(self): 1127 rawio = self.MockRawIO() 1128 bufio = self.tp(rawio) 1129 bufio.__init__(rawio) 1130 bufio.__init__(rawio, buffer_size=1024) 1131 bufio.__init__(rawio, buffer_size=16) 1132 self.assertEqual(3, bufio.write(b"abc")) 1133 bufio.flush() 1134 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1135 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1136 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1137 bufio.__init__(rawio) 1138 self.assertEqual(3, bufio.write(b"ghi")) 1139 bufio.flush() 1140 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1141 1142 def test_uninitialized(self): 1143 bufio = self.tp.__new__(self.tp) 1144 del bufio 1145 bufio = self.tp.__new__(self.tp) 1146 self.assertRaisesRegexp((ValueError, AttributeError), 1147 'uninitialized|has no attribute', 1148 bufio.write, b'') 1149 bufio.__init__(self.MockRawIO()) 1150 self.assertEqual(bufio.write(b''), 0) 1151 1152 def test_detach_flush(self): 1153 raw = self.MockRawIO() 1154 buf = self.tp(raw) 1155 buf.write(b"howdy!") 1156 self.assertFalse(raw._write_stack) 1157 buf.detach() 1158 self.assertEqual(raw._write_stack, [b"howdy!"]) 1159 1160 def test_write(self): 1161 # Write to the buffered IO but don't overflow the buffer. 
1162 writer = self.MockRawIO() 1163 bufio = self.tp(writer, 8) 1164 bufio.write(b"abc") 1165 self.assertFalse(writer._write_stack) 1166 buffer = bytearray(b"def") 1167 bufio.write(buffer) 1168 buffer[:] = b"***" # Overwrite our copy of the data 1169 bufio.flush() 1170 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1171 1172 def test_write_overflow(self): 1173 writer = self.MockRawIO() 1174 bufio = self.tp(writer, 8) 1175 contents = b"abcdefghijklmnop" 1176 for n in range(0, len(contents), 3): 1177 bufio.write(contents[n:n+3]) 1178 flushed = b"".join(writer._write_stack) 1179 # At least (total - 8) bytes were implicitly flushed, perhaps more 1180 # depending on the implementation. 1181 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1182 1183 def check_writes(self, intermediate_func): 1184 # Lots of writes, test the flushed output is as expected. 1185 contents = bytes(range(256)) * 1000 1186 n = 0 1187 writer = self.MockRawIO() 1188 bufio = self.tp(writer, 13) 1189 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1190 def gen_sizes(): 1191 for size in count(1): 1192 for i in range(15): 1193 yield size 1194 sizes = gen_sizes() 1195 while n < len(contents): 1196 size = min(next(sizes), len(contents) - n) 1197 self.assertEqual(bufio.write(contents[n:n+size]), size) 1198 intermediate_func(bufio) 1199 n += size 1200 bufio.flush() 1201 self.assertEqual(contents, 1202 b"".join(writer._write_stack)) 1203 1204 def test_writes(self): 1205 self.check_writes(lambda bufio: None) 1206 1207 def test_writes_and_flushes(self): 1208 self.check_writes(lambda bufio: bufio.flush()) 1209 1210 def test_writes_and_seeks(self): 1211 def _seekabs(bufio): 1212 pos = bufio.tell() 1213 bufio.seek(pos + 1, 0) 1214 bufio.seek(pos - 1, 0) 1215 bufio.seek(pos, 0) 1216 self.check_writes(_seekabs) 1217 def _seekrel(bufio): 1218 pos = bufio.seek(0, 1) 1219 bufio.seek(+1, 1) 1220 bufio.seek(-1, 1) 1221 bufio.seek(pos, 0) 1222 self.check_writes(_seekrel) 
1223 1224 def test_writes_and_truncates(self): 1225 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1226 1227 def test_write_non_blocking(self): 1228 raw = self.MockNonBlockWriterIO() 1229 bufio = self.tp(raw, 8) 1230 1231 self.assertEqual(bufio.write(b"abcd"), 4) 1232 self.assertEqual(bufio.write(b"efghi"), 5) 1233 # 1 byte will be written, the rest will be buffered 1234 raw.block_on(b"k") 1235 self.assertEqual(bufio.write(b"jklmn"), 5) 1236 1237 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1238 raw.block_on(b"0") 1239 try: 1240 bufio.write(b"opqrwxyz0123456789") 1241 except self.BlockingIOError as e: 1242 written = e.characters_written 1243 else: 1244 self.fail("BlockingIOError should have been raised") 1245 self.assertEqual(written, 16) 1246 self.assertEqual(raw.pop_written(), 1247 b"abcdefghijklmnopqrwxyz") 1248 1249 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1250 s = raw.pop_written() 1251 # Previously buffered bytes were flushed 1252 self.assertTrue(s.startswith(b"01234567A"), s) 1253 1254 def test_write_and_rewind(self): 1255 raw = io.BytesIO() 1256 bufio = self.tp(raw, 4) 1257 self.assertEqual(bufio.write(b"abcdef"), 6) 1258 self.assertEqual(bufio.tell(), 6) 1259 bufio.seek(0, 0) 1260 self.assertEqual(bufio.write(b"XY"), 2) 1261 bufio.seek(6, 0) 1262 self.assertEqual(raw.getvalue(), b"XYcdef") 1263 self.assertEqual(bufio.write(b"123456"), 6) 1264 bufio.flush() 1265 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1266 1267 def test_flush(self): 1268 writer = self.MockRawIO() 1269 bufio = self.tp(writer, 8) 1270 bufio.write(b"abc") 1271 bufio.flush() 1272 self.assertEqual(b"abc", writer._write_stack[0]) 1273 1274 def test_writelines(self): 1275 l = [b'ab', b'cd', b'ef'] 1276 writer = self.MockRawIO() 1277 bufio = self.tp(writer, 8) 1278 bufio.writelines(l) 1279 bufio.flush() 1280 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1281 1282 def test_writelines_userlist(self): 1283 l = UserList([b'ab', 
b'cd', b'ef']) 1284 writer = self.MockRawIO() 1285 bufio = self.tp(writer, 8) 1286 bufio.writelines(l) 1287 bufio.flush() 1288 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1289 1290 def test_writelines_error(self): 1291 writer = self.MockRawIO() 1292 bufio = self.tp(writer, 8) 1293 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1294 self.assertRaises(TypeError, bufio.writelines, None) 1295 1296 def test_destructor(self): 1297 writer = self.MockRawIO() 1298 bufio = self.tp(writer, 8) 1299 bufio.write(b"abc") 1300 del bufio 1301 support.gc_collect() 1302 self.assertEqual(b"abc", writer._write_stack[0]) 1303 1304 def test_truncate(self): 1305 # Truncate implicitly flushes the buffer. 1306 self.addCleanup(support.unlink, support.TESTFN) 1307 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1308 bufio = self.tp(raw, 8) 1309 bufio.write(b"abcdef") 1310 self.assertEqual(bufio.truncate(3), 3) 1311 self.assertEqual(bufio.tell(), 6) 1312 with self.open(support.TESTFN, "rb", buffering=0) as f: 1313 self.assertEqual(f.read(), b"abc") 1314 1315 @unittest.skipUnless(threading, 'Threading required for this test.') 1316 @support.requires_resource('cpu') 1317 def test_threads(self): 1318 try: 1319 # Write out many bytes from many threads and test they were 1320 # all flushed. 1321 N = 1000 1322 contents = bytes(range(256)) * N 1323 sizes = cycle([1, 19]) 1324 n = 0 1325 queue = deque() 1326 while n < len(contents): 1327 size = next(sizes) 1328 queue.append(contents[n:n+size]) 1329 n += size 1330 del contents 1331 # We use a real file object because it allows us to 1332 # exercise situations where the GIL is released before 1333 # writing the buffer to the raw streams. This is in addition 1334 # to concurrency issues due to switching threads in the middle 1335 # of Python code. 
1336 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1337 bufio = self.tp(raw, 8) 1338 errors = [] 1339 def f(): 1340 try: 1341 while True: 1342 try: 1343 s = queue.popleft() 1344 except IndexError: 1345 return 1346 bufio.write(s) 1347 except Exception as e: 1348 errors.append(e) 1349 raise 1350 threads = [threading.Thread(target=f) for x in range(20)] 1351 with support.start_threads(threads): 1352 time.sleep(0.02) # yield 1353 self.assertFalse(errors, 1354 "the following exceptions were caught: %r" % errors) 1355 bufio.close() 1356 with self.open(support.TESTFN, "rb") as f: 1357 s = f.read() 1358 for i in range(256): 1359 self.assertEqual(s.count(bytes([i])), N) 1360 finally: 1361 support.unlink(support.TESTFN) 1362 1363 def test_misbehaved_io(self): 1364 rawio = self.MisbehavedRawIO() 1365 bufio = self.tp(rawio, 5) 1366 self.assertRaises(IOError, bufio.seek, 0) 1367 self.assertRaises(IOError, bufio.tell) 1368 self.assertRaises(IOError, bufio.write, b"abcdef") 1369 1370 def test_max_buffer_size_deprecation(self): 1371 with support.check_warnings(("max_buffer_size is deprecated", 1372 DeprecationWarning)): 1373 self.tp(self.MockRawIO(), 8, 12) 1374 1375 def test_write_error_on_close(self): 1376 raw = self.MockRawIO() 1377 def bad_write(b): 1378 raise IOError() 1379 raw.write = bad_write 1380 b = self.tp(raw) 1381 b.write(b'spam') 1382 self.assertRaises(IOError, b.close) # exception not swallowed 1383 self.assertTrue(b.closed) 1384 1385 1386class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1387 tp = io.BufferedWriter 1388 1389 def test_constructor(self): 1390 BufferedWriterTest.test_constructor(self) 1391 # The allocation can succeed on 32-bit builds, e.g. with more 1392 # than 2GB RAM and a 64-bit kernel. 
1393 if sys.maxsize > 0x7FFFFFFF: 1394 rawio = self.MockRawIO() 1395 bufio = self.tp(rawio) 1396 self.assertRaises((OverflowError, MemoryError, ValueError), 1397 bufio.__init__, rawio, sys.maxsize) 1398 1399 def test_initialization(self): 1400 rawio = self.MockRawIO() 1401 bufio = self.tp(rawio) 1402 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1403 self.assertRaises(ValueError, bufio.write, b"def") 1404 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1405 self.assertRaises(ValueError, bufio.write, b"def") 1406 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1407 self.assertRaises(ValueError, bufio.write, b"def") 1408 1409 def test_garbage_collection(self): 1410 # C BufferedWriter objects are collected, and collecting them flushes 1411 # all data to disk. 1412 # The Python version has __del__, so it ends into gc.garbage instead 1413 self.addCleanup(support.unlink, support.TESTFN) 1414 rawio = self.FileIO(support.TESTFN, "w+b") 1415 f = self.tp(rawio) 1416 f.write(b"123xxx") 1417 f.x = f 1418 wr = weakref.ref(f) 1419 del f 1420 support.gc_collect() 1421 self.assertIsNone(wr(), wr) 1422 with self.open(support.TESTFN, "rb") as f: 1423 self.assertEqual(f.read(), b"123xxx") 1424 1425 def test_args_error(self): 1426 # Issue #17275 1427 with self.assertRaisesRegexp(TypeError, "BufferedWriter"): 1428 self.tp(io.BytesIO(), 1024, 1024, 1024) 1429 1430 1431class PyBufferedWriterTest(BufferedWriterTest): 1432 tp = pyio.BufferedWriter 1433 1434class BufferedRWPairTest(unittest.TestCase): 1435 1436 def test_constructor(self): 1437 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1438 self.assertFalse(pair.closed) 1439 1440 def test_uninitialized(self): 1441 pair = self.tp.__new__(self.tp) 1442 del pair 1443 pair = self.tp.__new__(self.tp) 1444 self.assertRaisesRegexp((ValueError, AttributeError), 1445 'uninitialized|has no attribute', 1446 pair.read, 0) 1447 self.assertRaisesRegexp((ValueError, AttributeError), 
1448 'uninitialized|has no attribute', 1449 pair.write, b'') 1450 pair.__init__(self.MockRawIO(), self.MockRawIO()) 1451 self.assertEqual(pair.read(0), b'') 1452 self.assertEqual(pair.write(b''), 0) 1453 1454 def test_detach(self): 1455 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1456 self.assertRaises(self.UnsupportedOperation, pair.detach) 1457 1458 def test_constructor_max_buffer_size_deprecation(self): 1459 with support.check_warnings(("max_buffer_size is deprecated", 1460 DeprecationWarning)): 1461 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1462 1463 def test_constructor_with_not_readable(self): 1464 class NotReadable(MockRawIO): 1465 def readable(self): 1466 return False 1467 1468 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) 1469 1470 def test_constructor_with_not_writeable(self): 1471 class NotWriteable(MockRawIO): 1472 def writable(self): 1473 return False 1474 1475 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) 1476 1477 def test_read(self): 1478 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1479 1480 self.assertEqual(pair.read(3), b"abc") 1481 self.assertEqual(pair.read(1), b"d") 1482 self.assertEqual(pair.read(), b"ef") 1483 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1484 self.assertEqual(pair.read(None), b"abc") 1485 1486 def test_readlines(self): 1487 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1488 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1489 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1490 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1491 1492 def test_read1(self): 1493 # .read1() is delegated to the underlying reader object, so this test 1494 # can be shallow. 
1495 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1496 1497 self.assertEqual(pair.read1(3), b"abc") 1498 1499 def test_readinto(self): 1500 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1501 1502 data = byteslike(5) 1503 self.assertEqual(pair.readinto(data), 5) 1504 self.assertEqual(data.tobytes(), b"abcde") 1505 1506 def test_write(self): 1507 w = self.MockRawIO() 1508 pair = self.tp(self.MockRawIO(), w) 1509 1510 pair.write(b"abc") 1511 pair.flush() 1512 buffer = bytearray(b"def") 1513 pair.write(buffer) 1514 buffer[:] = b"***" # Overwrite our copy of the data 1515 pair.flush() 1516 self.assertEqual(w._write_stack, [b"abc", b"def"]) 1517 1518 def test_peek(self): 1519 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1520 1521 self.assertTrue(pair.peek(3).startswith(b"abc")) 1522 self.assertEqual(pair.read(3), b"abc") 1523 1524 def test_readable(self): 1525 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1526 self.assertTrue(pair.readable()) 1527 1528 def test_writeable(self): 1529 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1530 self.assertTrue(pair.writable()) 1531 1532 def test_seekable(self): 1533 # BufferedRWPairs are never seekable, even if their readers and writers 1534 # are. 1535 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1536 self.assertFalse(pair.seekable()) 1537 1538 # .flush() is delegated to the underlying writer object and has been 1539 # tested in the test_write method. 
1540 1541 def test_close_and_closed(self): 1542 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1543 self.assertFalse(pair.closed) 1544 pair.close() 1545 self.assertTrue(pair.closed) 1546 1547 def test_reader_close_error_on_close(self): 1548 def reader_close(): 1549 reader_non_existing 1550 reader = self.MockRawIO() 1551 reader.close = reader_close 1552 writer = self.MockRawIO() 1553 pair = self.tp(reader, writer) 1554 with self.assertRaises(NameError) as err: 1555 pair.close() 1556 self.assertIn('reader_non_existing', str(err.exception)) 1557 self.assertTrue(pair.closed) 1558 self.assertFalse(reader.closed) 1559 self.assertTrue(writer.closed) 1560 1561 def test_writer_close_error_on_close(self): 1562 def writer_close(): 1563 writer_non_existing 1564 reader = self.MockRawIO() 1565 writer = self.MockRawIO() 1566 writer.close = writer_close 1567 pair = self.tp(reader, writer) 1568 with self.assertRaises(NameError) as err: 1569 pair.close() 1570 self.assertIn('writer_non_existing', str(err.exception)) 1571 self.assertFalse(pair.closed) 1572 self.assertTrue(reader.closed) 1573 self.assertFalse(writer.closed) 1574 1575 def test_reader_writer_close_error_on_close(self): 1576 def reader_close(): 1577 reader_non_existing 1578 def writer_close(): 1579 writer_non_existing 1580 reader = self.MockRawIO() 1581 reader.close = reader_close 1582 writer = self.MockRawIO() 1583 writer.close = writer_close 1584 pair = self.tp(reader, writer) 1585 with self.assertRaises(NameError) as err: 1586 pair.close() 1587 self.assertIn('reader_non_existing', str(err.exception)) 1588 self.assertFalse(pair.closed) 1589 self.assertFalse(reader.closed) 1590 self.assertFalse(writer.closed) 1591 1592 def test_isatty(self): 1593 class SelectableIsAtty(MockRawIO): 1594 def __init__(self, isatty): 1595 MockRawIO.__init__(self) 1596 self._isatty = isatty 1597 1598 def isatty(self): 1599 return self._isatty 1600 1601 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 1602 
self.assertFalse(pair.isatty()) 1603 1604 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 1605 self.assertTrue(pair.isatty()) 1606 1607 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 1608 self.assertTrue(pair.isatty()) 1609 1610 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 1611 self.assertTrue(pair.isatty()) 1612 1613 def test_weakref_clearing(self): 1614 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 1615 ref = weakref.ref(brw) 1616 brw = None 1617 ref = None # Shouldn't segfault. 1618 1619class CBufferedRWPairTest(BufferedRWPairTest): 1620 tp = io.BufferedRWPair 1621 1622class PyBufferedRWPairTest(BufferedRWPairTest): 1623 tp = pyio.BufferedRWPair 1624 1625 1626class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 1627 read_mode = "rb+" 1628 write_mode = "wb+" 1629 1630 def test_constructor(self): 1631 BufferedReaderTest.test_constructor(self) 1632 BufferedWriterTest.test_constructor(self) 1633 1634 def test_uninitialized(self): 1635 BufferedReaderTest.test_uninitialized(self) 1636 BufferedWriterTest.test_uninitialized(self) 1637 1638 def test_read_and_write(self): 1639 raw = self.MockRawIO((b"asdf", b"ghjk")) 1640 rw = self.tp(raw, 8) 1641 1642 self.assertEqual(b"as", rw.read(2)) 1643 rw.write(b"ddd") 1644 rw.write(b"eee") 1645 self.assertFalse(raw._write_stack) # Buffer writes 1646 self.assertEqual(b"ghjk", rw.read()) 1647 self.assertEqual(b"dddeee", raw._write_stack[0]) 1648 1649 def test_seek_and_tell(self): 1650 raw = self.BytesIO(b"asdfghjkl") 1651 rw = self.tp(raw) 1652 1653 self.assertEqual(b"as", rw.read(2)) 1654 self.assertEqual(2, rw.tell()) 1655 rw.seek(0, 0) 1656 self.assertEqual(b"asdf", rw.read(4)) 1657 1658 rw.write(b"123f") 1659 rw.seek(0, 0) 1660 self.assertEqual(b"asdf123fl", rw.read()) 1661 self.assertEqual(9, rw.tell()) 1662 rw.seek(-4, 2) 1663 self.assertEqual(5, rw.tell()) 1664 rw.seek(2, 1) 1665 self.assertEqual(7, rw.tell()) 1666 self.assertEqual(b"fl", rw.read(11)) 1667 
rw.flush() 1668 self.assertEqual(b"asdf123fl", raw.getvalue()) 1669 1670 self.assertRaises(TypeError, rw.seek, 0.0) 1671 1672 def check_flush_and_read(self, read_func): 1673 raw = self.BytesIO(b"abcdefghi") 1674 bufio = self.tp(raw) 1675 1676 self.assertEqual(b"ab", read_func(bufio, 2)) 1677 bufio.write(b"12") 1678 self.assertEqual(b"ef", read_func(bufio, 2)) 1679 self.assertEqual(6, bufio.tell()) 1680 bufio.flush() 1681 self.assertEqual(6, bufio.tell()) 1682 self.assertEqual(b"ghi", read_func(bufio)) 1683 raw.seek(0, 0) 1684 raw.write(b"XYZ") 1685 # flush() resets the read buffer 1686 bufio.flush() 1687 bufio.seek(0, 0) 1688 self.assertEqual(b"XYZ", read_func(bufio, 3)) 1689 1690 def test_flush_and_read(self): 1691 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 1692 1693 def test_flush_and_readinto(self): 1694 def _readinto(bufio, n=-1): 1695 b = bytearray(n if n >= 0 else 9999) 1696 n = bufio.readinto(b) 1697 return bytes(b[:n]) 1698 self.check_flush_and_read(_readinto) 1699 1700 def test_flush_and_peek(self): 1701 def _peek(bufio, n=-1): 1702 # This relies on the fact that the buffer can contain the whole 1703 # raw stream, otherwise peek() can return less. 
1704 b = bufio.peek(n) 1705 if n != -1: 1706 b = b[:n] 1707 bufio.seek(len(b), 1) 1708 return b 1709 self.check_flush_and_read(_peek) 1710 1711 def test_flush_and_write(self): 1712 raw = self.BytesIO(b"abcdefghi") 1713 bufio = self.tp(raw) 1714 1715 bufio.write(b"123") 1716 bufio.flush() 1717 bufio.write(b"45") 1718 bufio.flush() 1719 bufio.seek(0, 0) 1720 self.assertEqual(b"12345fghi", raw.getvalue()) 1721 self.assertEqual(b"12345fghi", bufio.read()) 1722 1723 def test_threads(self): 1724 BufferedReaderTest.test_threads(self) 1725 BufferedWriterTest.test_threads(self) 1726 1727 def test_writes_and_peek(self): 1728 def _peek(bufio): 1729 bufio.peek(1) 1730 self.check_writes(_peek) 1731 def _peek(bufio): 1732 pos = bufio.tell() 1733 bufio.seek(-1, 1) 1734 bufio.peek(1) 1735 bufio.seek(pos, 0) 1736 self.check_writes(_peek) 1737 1738 def test_writes_and_reads(self): 1739 def _read(bufio): 1740 bufio.seek(-1, 1) 1741 bufio.read(1) 1742 self.check_writes(_read) 1743 1744 def test_writes_and_read1s(self): 1745 def _read1(bufio): 1746 bufio.seek(-1, 1) 1747 bufio.read1(1) 1748 self.check_writes(_read1) 1749 1750 def test_writes_and_readintos(self): 1751 def _read(bufio): 1752 bufio.seek(-1, 1) 1753 bufio.readinto(bytearray(1)) 1754 self.check_writes(_read) 1755 1756 def test_write_after_readahead(self): 1757 # Issue #6629: writing after the buffer was filled by readahead should 1758 # first rewind the raw stream. 1759 for overwrite_size in [1, 5]: 1760 raw = self.BytesIO(b"A" * 10) 1761 bufio = self.tp(raw, 4) 1762 # Trigger readahead 1763 self.assertEqual(bufio.read(1), b"A") 1764 self.assertEqual(bufio.tell(), 1) 1765 # Overwriting should rewind the raw stream if it needs so 1766 bufio.write(b"B" * overwrite_size) 1767 self.assertEqual(bufio.tell(), overwrite_size + 1) 1768 # If the write size was smaller than the buffer size, flush() and 1769 # check that rewind happens. 
1770 bufio.flush() 1771 self.assertEqual(bufio.tell(), overwrite_size + 1) 1772 s = raw.getvalue() 1773 self.assertEqual(s, 1774 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 1775 1776 def test_write_rewind_write(self): 1777 # Various combinations of reading / writing / seeking backwards / writing again 1778 def mutate(bufio, pos1, pos2): 1779 assert pos2 >= pos1 1780 # Fill the buffer 1781 bufio.seek(pos1) 1782 bufio.read(pos2 - pos1) 1783 bufio.write(b'\x02') 1784 # This writes earlier than the previous write, but still inside 1785 # the buffer. 1786 bufio.seek(pos1) 1787 bufio.write(b'\x01') 1788 1789 b = b"\x80\x81\x82\x83\x84" 1790 for i in range(0, len(b)): 1791 for j in range(i, len(b)): 1792 raw = self.BytesIO(b) 1793 bufio = self.tp(raw, 100) 1794 mutate(bufio, i, j) 1795 bufio.flush() 1796 expected = bytearray(b) 1797 expected[j] = 2 1798 expected[i] = 1 1799 self.assertEqual(raw.getvalue(), expected, 1800 "failed result for i=%d, j=%d" % (i, j)) 1801 1802 def test_truncate_after_read_or_write(self): 1803 raw = self.BytesIO(b"A" * 10) 1804 bufio = self.tp(raw, 100) 1805 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 1806 self.assertEqual(bufio.truncate(), 2) 1807 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 1808 self.assertEqual(bufio.truncate(), 4) 1809 1810 def test_misbehaved_io(self): 1811 BufferedReaderTest.test_misbehaved_io(self) 1812 BufferedWriterTest.test_misbehaved_io(self) 1813 1814 def test_interleaved_read_write(self): 1815 # Test for issue #12213 1816 with self.BytesIO(b'abcdefgh') as raw: 1817 with self.tp(raw, 100) as f: 1818 f.write(b"1") 1819 self.assertEqual(f.read(1), b'b') 1820 f.write(b'2') 1821 self.assertEqual(f.read1(1), b'd') 1822 f.write(b'3') 1823 buf = bytearray(1) 1824 f.readinto(buf) 1825 self.assertEqual(buf, b'f') 1826 f.write(b'4') 1827 self.assertEqual(f.peek(1), b'h') 1828 f.flush() 1829 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 1830 1831 with 
self.BytesIO(b'abc') as raw: 1832 with self.tp(raw, 100) as f: 1833 self.assertEqual(f.read(1), b'a') 1834 f.write(b"2") 1835 self.assertEqual(f.read(1), b'c') 1836 f.flush() 1837 self.assertEqual(raw.getvalue(), b'a2c') 1838 1839 def test_interleaved_readline_write(self): 1840 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 1841 with self.tp(raw) as f: 1842 f.write(b'1') 1843 self.assertEqual(f.readline(), b'b\n') 1844 f.write(b'2') 1845 self.assertEqual(f.readline(), b'def\n') 1846 f.write(b'3') 1847 self.assertEqual(f.readline(), b'\n') 1848 f.flush() 1849 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 1850 1851 1852class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, 1853 BufferedRandomTest, SizeofTest): 1854 tp = io.BufferedRandom 1855 1856 def test_constructor(self): 1857 BufferedRandomTest.test_constructor(self) 1858 # The allocation can succeed on 32-bit builds, e.g. with more 1859 # than 2GB RAM and a 64-bit kernel. 1860 if sys.maxsize > 0x7FFFFFFF: 1861 rawio = self.MockRawIO() 1862 bufio = self.tp(rawio) 1863 self.assertRaises((OverflowError, MemoryError, ValueError), 1864 bufio.__init__, rawio, sys.maxsize) 1865 1866 def test_garbage_collection(self): 1867 CBufferedReaderTest.test_garbage_collection(self) 1868 CBufferedWriterTest.test_garbage_collection(self) 1869 1870 def test_args_error(self): 1871 # Issue #17275 1872 with self.assertRaisesRegexp(TypeError, "BufferedRandom"): 1873 self.tp(io.BytesIO(), 1024, 1024, 1024) 1874 1875 1876class PyBufferedRandomTest(BufferedRandomTest): 1877 tp = pyio.BufferedRandom 1878 1879 1880# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 1881# properties: 1882# - A single output character can correspond to many bytes of input. 1883# - The number of input bytes to complete the character can be 1884# undetermined until the last input byte is received. 1885# - The number of input bytes can vary depending on previous input. 
1886# - A single input byte can correspond to many characters of output. 1887# - The number of output characters can be undetermined until the 1888# last input byte is received. 1889# - The number of output characters can vary depending on previous input. 1890 1891class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 1892 """ 1893 For testing seek/tell behavior with a stateful, buffering decoder. 1894 1895 Input is a sequence of words. Words may be fixed-length (length set 1896 by input) or variable-length (period-terminated). In variable-length 1897 mode, extra periods are ignored. Possible words are: 1898 - 'i' followed by a number sets the input length, I (maximum 99). 1899 When I is set to 0, words are space-terminated. 1900 - 'o' followed by a number sets the output length, O (maximum 99). 1901 - Any other word is converted into a word followed by a period on 1902 the output. The output word consists of the input word truncated 1903 or padded out with hyphens to make its length equal to O. If O 1904 is 0, the word is output verbatim without truncating or padding. 1905 I and O are initially set to 1. When I changes, any buffered input is 1906 re-scanned according to the new I. EOF also terminates the last word. 
1907 """ 1908 1909 def __init__(self, errors='strict'): 1910 codecs.IncrementalDecoder.__init__(self, errors) 1911 self.reset() 1912 1913 def __repr__(self): 1914 return '<SID %x>' % id(self) 1915 1916 def reset(self): 1917 self.i = 1 1918 self.o = 1 1919 self.buffer = bytearray() 1920 1921 def getstate(self): 1922 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 1923 return bytes(self.buffer), i*100 + o 1924 1925 def setstate(self, state): 1926 buffer, io = state 1927 self.buffer = bytearray(buffer) 1928 i, o = divmod(io, 100) 1929 self.i, self.o = i ^ 1, o ^ 1 1930 1931 def decode(self, input, final=False): 1932 output = '' 1933 for b in input: 1934 if self.i == 0: # variable-length, terminated with period 1935 if b == '.': 1936 if self.buffer: 1937 output += self.process_word() 1938 else: 1939 self.buffer.append(b) 1940 else: # fixed-length, terminate after self.i bytes 1941 self.buffer.append(b) 1942 if len(self.buffer) == self.i: 1943 output += self.process_word() 1944 if final and self.buffer: # EOF terminates the last word 1945 output += self.process_word() 1946 return output 1947 1948 def process_word(self): 1949 output = '' 1950 if self.buffer[0] == ord('i'): 1951 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 1952 elif self.buffer[0] == ord('o'): 1953 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 1954 else: 1955 output = self.buffer.decode('ascii') 1956 if len(output) < self.o: 1957 output += '-'*self.o # pad out with hyphens 1958 if self.o: 1959 output = output[:self.o] # truncate to output length 1960 output += '.' 
1961 self.buffer = bytearray() 1962 return output 1963 1964 codecEnabled = False 1965 1966 @classmethod 1967 def lookupTestDecoder(cls, name): 1968 if cls.codecEnabled and name == 'test_decoder': 1969 latin1 = codecs.lookup('latin-1') 1970 return codecs.CodecInfo( 1971 name='test_decoder', encode=latin1.encode, decode=None, 1972 incrementalencoder=None, 1973 streamreader=None, streamwriter=None, 1974 incrementaldecoder=cls) 1975 1976# Register the previous decoder for testing. 1977# Disabled by default, tests will enable it. 1978codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) 1979 1980 1981class StatefulIncrementalDecoderTest(unittest.TestCase): 1982 """ 1983 Make sure the StatefulIncrementalDecoder actually works. 1984 """ 1985 1986 test_cases = [ 1987 # I=1, O=1 (fixed-length input == fixed-length output) 1988 (b'abcd', False, 'a.b.c.d.'), 1989 # I=0, O=0 (variable-length input, variable-length output) 1990 (b'oiabcd', True, 'abcd.'), 1991 # I=0, O=0 (should ignore extra periods) 1992 (b'oi...abcd...', True, 'abcd.'), 1993 # I=0, O=6 (variable-length input, fixed-length output) 1994 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 1995 # I=2, O=6 (fixed-length input < fixed-length output) 1996 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 1997 # I=6, O=3 (fixed-length input > fixed-length output) 1998 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 1999 # I=0, then 3; O=29, then 15 (with longer output) 2000 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2001 'a----------------------------.' + 2002 'b----------------------------.' + 2003 'cde--------------------------.' + 2004 'abcdefghijabcde.' + 2005 'a.b------------.' + 2006 '.c.------------.' + 2007 'd.e------------.' + 2008 'k--------------.' + 2009 'l--------------.' + 2010 'm--------------.') 2011 ] 2012 2013 def test_decoder(self): 2014 # Try a few one-shot test cases. 
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')

class TextIOWrapperTest(unittest.TestCase):
    # The tests reach the implementation under test through attributes such
    # as self.TextIOWrapper, self.BytesIO and self.open; those attributes are
    # not assigned anywhere in this class.

    def setUp(self):
        # Sample with mixed line endings, its "\n"-normalized text form, and
        # a clean slate for the scratch file.
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        support.unlink(support.TESTFN)

    def tearDown(self):
        # Remove the scratch file a test may have left behind.
        support.unlink(support.TESTFN)

    def test_constructor(self):
        # Calling __init__ again on a live wrapper must take over the new
        # encoding / line_buffering settings; a bad newline argument must
        # raise TypeError (wrong type) or ValueError (wrong value).
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        t.__init__(b, encoding="latin1", newline="\r\n")
        self.assertEqual(t.encoding, "latin1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf8", line_buffering=True)
        self.assertEqual(t.encoding, "utf8")
        self.assertEqual(t.line_buffering, True)
        self.assertEqual("\xe9\n", t.readline())
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')

    def test_uninitialized(self):
        # An object made with __new__ but never initialised must be
        # deletable, must raise from repr() and read(), and must become
        # usable once __init__ has run.
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        del t
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                t.read, 0)
        t.__init__(self.MockRawIO())
        self.assertEqual(t.read(0), u'')

    def test_non_text_encoding_codecs_are_rejected(self):
        # Ensure the constructor complains if passed a codec that isn't
        # marked as a text encoding
        # http://bugs.python.org/issue20404
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        with support.check_py3k_warnings():
            self.TextIOWrapper(b, encoding="hex_codec")

    def test_detach(self):
        # detach() returns the underlying buffer and flushes pending text;
        # a second detach() raises ValueError.
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        t = self.TextIOWrapper(b)
        self.assertIs(t.detach(), b)

        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("howdy")
        self.assertFalse(r.getvalue())
        t.detach()
        self.assertEqual(r.getvalue(), b"howdy")
        self.assertRaises(ValueError, t.detach)

        # Operations independent of the detached stream should still work
        repr(t)
        self.assertEqual(t.encoding, "ascii")
        self.assertEqual(t.errors, "strict")
        self.assertFalse(t.line_buffering)

    def test_repr(self):
        # repr() shows the encoding, plus name=... once the raw stream has a
        # name attribute (unicode names carry the u prefix, bytes names do
        # not).
        raw = self.BytesIO("hello".encode("utf-8"))
        b = self.BufferedReader(raw)
        t = self.TextIOWrapper(b, encoding="utf-8")
        modname = self.TextIOWrapper.__module__
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
        raw.name = "dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
        raw.name = b"dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)

        t.buffer.detach()
        repr(t) # Should not raise an exception

    def test_line_buffering(self):
        # With line_buffering=True nothing reaches the raw stream until a
        # write contains "\n" or "\r".
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"") # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")

    def test_encoding(self):
        # Check the encoding attribute is always set, and valid
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="utf8")
        self.assertEqual(t.encoding, "utf8")
        t = self.TextIOWrapper(b)
        self.assertIsNotNone(t.encoding)
        # lookup() raises if the default encoding name is not a known codec.
        codecs.lookup(t.encoding)

    def test_encoding_errors_reading(self):
        # The byte 0xff is not ASCII; check how each errors= mode reads it.
        # (1) default
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEqual(t.read(), "abc\n\n")
        # (4) replace
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEqual(t.read(), "abc\n\ufffd\n")

    def test_encoding_errors_writing(self):
        # Same four errors= modes as above, for writing a non-ASCII
        # character.
        # (1) default
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")

    def test_newlines(self):
        # For every encoding, newline mode and small buffer size, the lines
        # read back (by iteration, or by read(2) + readline()) must match
        # the expected split.
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        # Each entry is [newline argument, expected list of lines].
        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            # XXX: str.encode() should return bytes
            data = bytes(''.join(input_lines).encode(encoding))
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                    encoding=encoding)
                        if do_reads:
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    def test_newlines_input(self):
        # readlines() and read() after seek(0) must agree with the expected
        # line split for every newline mode.
        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = self.BytesIO(testdata)
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))

    def test_newlines_output(self):
        # Maps the newline argument to the bytes expected on the raw stream;
        # newline=None is checked against the entry for os.linesep.
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = self.BytesIO()
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)

    def test_destructor(self):
        # Dropping the last reference to the wrapper must close the
        # underlying stream, and the written text must have reached it by
        # the time close() runs.
        l = []
        base = self.BytesIO
        class MyBytesIO(base):
            def close(self):
                # Record what the stream holds at close time.
                l.append(self.getvalue())
                base.close(self)
        b = MyBytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("abc")
        del t
        support.gc_collect()
        self.assertEqual([b"abc"], l)

    def test_override_destructor(self):
        # Overridden __del__, close() and flush() must be invoked in exactly
        # that order when the object is collected.
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                # The base class may not define __del__; call it only if it
                # exists.
                try:
                    f = super(MyTextIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyTextIO, self).close()
            def flush(self):
                record.append(3)
                super(MyTextIO, self).flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
2282 rawio = self.CloseFailureIO() 2283 def f(): 2284 self.TextIOWrapper(rawio).xyzzy 2285 with support.captured_output("stderr") as s: 2286 self.assertRaises(AttributeError, f) 2287 s = s.getvalue().strip() 2288 if s: 2289 # The destructor *may* have printed an unraisable error, check it 2290 self.assertEqual(len(s.splitlines()), 1) 2291 self.assertTrue(s.startswith("Exception IOError: "), s) 2292 self.assertTrue(s.endswith(" ignored"), s) 2293 2294 # Systematic tests of the text I/O API 2295 2296 def test_basic_io(self): 2297 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 2298 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": 2299 f = self.open(support.TESTFN, "w+", encoding=enc) 2300 f._CHUNK_SIZE = chunksize 2301 self.assertEqual(f.write("abc"), 3) 2302 f.close() 2303 f = self.open(support.TESTFN, "r+", encoding=enc) 2304 f._CHUNK_SIZE = chunksize 2305 self.assertEqual(f.tell(), 0) 2306 self.assertEqual(f.read(), "abc") 2307 cookie = f.tell() 2308 self.assertEqual(f.seek(0), 0) 2309 self.assertEqual(f.read(None), "abc") 2310 f.seek(0) 2311 self.assertEqual(f.read(2), "ab") 2312 self.assertEqual(f.read(1), "c") 2313 self.assertEqual(f.read(1), "") 2314 self.assertEqual(f.read(), "") 2315 self.assertEqual(f.tell(), cookie) 2316 self.assertEqual(f.seek(0), 0) 2317 self.assertEqual(f.seek(0, 2), cookie) 2318 self.assertEqual(f.write("def"), 3) 2319 self.assertEqual(f.seek(cookie), cookie) 2320 self.assertEqual(f.read(), "def") 2321 if enc.startswith("utf"): 2322 self.multi_line_test(f, enc) 2323 f.close() 2324 2325 def multi_line_test(self, f, enc): 2326 f.seek(0) 2327 f.truncate() 2328 sample = "s\xff\u0fff\uffff" 2329 wlines = [] 2330 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 2331 chars = [] 2332 for i in range(size): 2333 chars.append(sample[i % len(sample)]) 2334 line = "".join(chars) + "\n" 2335 wlines.append((f.tell(), line)) 2336 f.write(line) 2337 f.seek(0) 2338 rlines = [] 2339 while 
True: 2340 pos = f.tell() 2341 line = f.readline() 2342 if not line: 2343 break 2344 rlines.append((pos, line)) 2345 self.assertEqual(rlines, wlines) 2346 2347 def test_telling(self): 2348 f = self.open(support.TESTFN, "w+", encoding="utf8") 2349 p0 = f.tell() 2350 f.write("\xff\n") 2351 p1 = f.tell() 2352 f.write("\xff\n") 2353 p2 = f.tell() 2354 f.seek(0) 2355 self.assertEqual(f.tell(), p0) 2356 self.assertEqual(f.readline(), "\xff\n") 2357 self.assertEqual(f.tell(), p1) 2358 self.assertEqual(f.readline(), "\xff\n") 2359 self.assertEqual(f.tell(), p2) 2360 f.seek(0) 2361 for line in f: 2362 self.assertEqual(line, "\xff\n") 2363 self.assertRaises(IOError, f.tell) 2364 self.assertEqual(f.tell(), p2) 2365 f.close() 2366 2367 def test_seeking(self): 2368 chunk_size = _default_chunk_size() 2369 prefix_size = chunk_size - 2 2370 u_prefix = "a" * prefix_size 2371 prefix = bytes(u_prefix.encode("utf-8")) 2372 self.assertEqual(len(u_prefix), len(prefix)) 2373 u_suffix = "\u8888\n" 2374 suffix = bytes(u_suffix.encode("utf-8")) 2375 line = prefix + suffix 2376 f = self.open(support.TESTFN, "wb") 2377 f.write(line*2) 2378 f.close() 2379 f = self.open(support.TESTFN, "r", encoding="utf-8") 2380 s = f.read(prefix_size) 2381 self.assertEqual(s, prefix.decode("ascii")) 2382 self.assertEqual(f.tell(), prefix_size) 2383 self.assertEqual(f.readline(), u_suffix) 2384 2385 def test_seeking_too(self): 2386 # Regression test for a specific bug 2387 data = b'\xe0\xbf\xbf\n' 2388 f = self.open(support.TESTFN, "wb") 2389 f.write(data) 2390 f.close() 2391 f = self.open(support.TESTFN, "r", encoding="utf-8") 2392 f._CHUNK_SIZE # Just test that it exists 2393 f._CHUNK_SIZE = 2 2394 f.readline() 2395 f.tell() 2396 2397 def test_seek_and_tell(self): 2398 #Test seek/tell using the StatefulIncrementalDecoder. 
2399 # Make test faster by doing smaller seeks 2400 CHUNK_SIZE = 128 2401 2402 def test_seek_and_tell_with_data(data, min_pos=0): 2403 """Tell/seek to various points within a data stream and ensure 2404 that the decoded data returned by read() is consistent.""" 2405 f = self.open(support.TESTFN, 'wb') 2406 f.write(data) 2407 f.close() 2408 f = self.open(support.TESTFN, encoding='test_decoder') 2409 f._CHUNK_SIZE = CHUNK_SIZE 2410 decoded = f.read() 2411 f.close() 2412 2413 for i in range(min_pos, len(decoded) + 1): # seek positions 2414 for j in [1, 5, len(decoded) - i]: # read lengths 2415 f = self.open(support.TESTFN, encoding='test_decoder') 2416 self.assertEqual(f.read(i), decoded[:i]) 2417 cookie = f.tell() 2418 self.assertEqual(f.read(j), decoded[i:i + j]) 2419 f.seek(cookie) 2420 self.assertEqual(f.read(), decoded[i:]) 2421 f.close() 2422 2423 # Enable the test decoder. 2424 StatefulIncrementalDecoder.codecEnabled = 1 2425 2426 # Run the tests. 2427 try: 2428 # Try each test case. 2429 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2430 test_seek_and_tell_with_data(input) 2431 2432 # Position each test case so that it crosses a chunk boundary. 2433 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2434 offset = CHUNK_SIZE - len(input)//2 2435 prefix = b'.'*offset 2436 # Don't bother seeking into the prefix (takes too long). 2437 min_pos = offset*2 2438 test_seek_and_tell_with_data(prefix + input, min_pos) 2439 2440 # Ensure our test decoder won't interfere with subsequent tests. 2441 finally: 2442 StatefulIncrementalDecoder.codecEnabled = 0 2443 2444 def test_encoded_writes(self): 2445 data = "1234567890" 2446 tests = ("utf-16", 2447 "utf-16-le", 2448 "utf-16-be", 2449 "utf-32", 2450 "utf-32-le", 2451 "utf-32-be") 2452 for encoding in tests: 2453 buf = self.BytesIO() 2454 f = self.TextIOWrapper(buf, encoding=encoding) 2455 # Check if the BOM is written only once (see issue1753). 
2456 f.write(data) 2457 f.write(data) 2458 f.seek(0) 2459 self.assertEqual(f.read(), data * 2) 2460 f.seek(0) 2461 self.assertEqual(f.read(), data * 2) 2462 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 2463 2464 def test_unreadable(self): 2465 class UnReadable(self.BytesIO): 2466 def readable(self): 2467 return False 2468 txt = self.TextIOWrapper(UnReadable()) 2469 self.assertRaises(IOError, txt.read) 2470 2471 def test_read_one_by_one(self): 2472 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) 2473 reads = "" 2474 while True: 2475 c = txt.read(1) 2476 if not c: 2477 break 2478 reads += c 2479 self.assertEqual(reads, "AA\nBB") 2480 2481 def test_readlines(self): 2482 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) 2483 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 2484 txt.seek(0) 2485 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 2486 txt.seek(0) 2487 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 2488 2489 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 2490 def test_read_by_chunk(self): 2491 # make sure "\r\n" straddles 128 char boundary. 
2492 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) 2493 reads = "" 2494 while True: 2495 c = txt.read(128) 2496 if not c: 2497 break 2498 reads += c 2499 self.assertEqual(reads, "A"*127+"\nB") 2500 2501 def test_writelines(self): 2502 l = ['ab', 'cd', 'ef'] 2503 buf = self.BytesIO() 2504 txt = self.TextIOWrapper(buf) 2505 txt.writelines(l) 2506 txt.flush() 2507 self.assertEqual(buf.getvalue(), b'abcdef') 2508 2509 def test_writelines_userlist(self): 2510 l = UserList(['ab', 'cd', 'ef']) 2511 buf = self.BytesIO() 2512 txt = self.TextIOWrapper(buf) 2513 txt.writelines(l) 2514 txt.flush() 2515 self.assertEqual(buf.getvalue(), b'abcdef') 2516 2517 def test_writelines_error(self): 2518 txt = self.TextIOWrapper(self.BytesIO()) 2519 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 2520 self.assertRaises(TypeError, txt.writelines, None) 2521 self.assertRaises(TypeError, txt.writelines, b'abc') 2522 2523 def test_issue1395_1(self): 2524 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2525 2526 # read one char at a time 2527 reads = "" 2528 while True: 2529 c = txt.read(1) 2530 if not c: 2531 break 2532 reads += c 2533 self.assertEqual(reads, self.normalized) 2534 2535 def test_issue1395_2(self): 2536 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2537 txt._CHUNK_SIZE = 4 2538 2539 reads = "" 2540 while True: 2541 c = txt.read(4) 2542 if not c: 2543 break 2544 reads += c 2545 self.assertEqual(reads, self.normalized) 2546 2547 def test_issue1395_3(self): 2548 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2549 txt._CHUNK_SIZE = 4 2550 2551 reads = txt.read(4) 2552 reads += txt.read(4) 2553 reads += txt.readline() 2554 reads += txt.readline() 2555 reads += txt.readline() 2556 self.assertEqual(reads, self.normalized) 2557 2558 def test_issue1395_4(self): 2559 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2560 txt._CHUNK_SIZE = 4 2561 2562 reads = 
txt.read(4) 2563 reads += txt.read() 2564 self.assertEqual(reads, self.normalized) 2565 2566 def test_issue1395_5(self): 2567 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2568 txt._CHUNK_SIZE = 4 2569 2570 reads = txt.read(4) 2571 pos = txt.tell() 2572 txt.seek(0) 2573 txt.seek(pos) 2574 self.assertEqual(txt.read(4), "BBB\n") 2575 2576 def test_issue2282(self): 2577 buffer = self.BytesIO(self.testdata) 2578 txt = self.TextIOWrapper(buffer, encoding="ascii") 2579 2580 self.assertEqual(buffer.seekable(), txt.seekable()) 2581 2582 def test_append_bom(self): 2583 # The BOM is not written again when appending to a non-empty file 2584 filename = support.TESTFN 2585 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2586 with self.open(filename, 'w', encoding=charset) as f: 2587 f.write('aaa') 2588 pos = f.tell() 2589 with self.open(filename, 'rb') as f: 2590 self.assertEqual(f.read(), 'aaa'.encode(charset)) 2591 2592 with self.open(filename, 'a', encoding=charset) as f: 2593 f.write('xxx') 2594 with self.open(filename, 'rb') as f: 2595 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 2596 2597 def test_seek_bom(self): 2598 # Same test, but when seeking manually 2599 filename = support.TESTFN 2600 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2601 with self.open(filename, 'w', encoding=charset) as f: 2602 f.write('aaa') 2603 pos = f.tell() 2604 with self.open(filename, 'r+', encoding=charset) as f: 2605 f.seek(pos) 2606 f.write('zzz') 2607 f.seek(0) 2608 f.write('bbb') 2609 with self.open(filename, 'rb') as f: 2610 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 2611 2612 def test_errors_property(self): 2613 with self.open(support.TESTFN, "w") as f: 2614 self.assertEqual(f.errors, "strict") 2615 with self.open(support.TESTFN, "w", errors="replace") as f: 2616 self.assertEqual(f.errors, "replace") 2617 2618 @unittest.skipUnless(threading, 'Threading required for this test.') 2619 def test_threads_write(self): 2620 # Issue6750: 
concurrent writes could duplicate data 2621 event = threading.Event() 2622 with self.open(support.TESTFN, "w", buffering=1) as f: 2623 def run(n): 2624 text = "Thread%03d\n" % n 2625 event.wait() 2626 f.write(text) 2627 threads = [threading.Thread(target=run, args=(x,)) 2628 for x in range(20)] 2629 with support.start_threads(threads, event.set): 2630 time.sleep(0.02) 2631 with self.open(support.TESTFN) as f: 2632 content = f.read() 2633 for n in range(20): 2634 self.assertEqual(content.count("Thread%03d\n" % n), 1) 2635 2636 def test_flush_error_on_close(self): 2637 # Test that text file is closed despite failed flush 2638 # and that flush() is called before file closed. 2639 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2640 closed = [] 2641 def bad_flush(): 2642 closed[:] = [txt.closed, txt.buffer.closed] 2643 raise IOError() 2644 txt.flush = bad_flush 2645 self.assertRaises(IOError, txt.close) # exception not swallowed 2646 self.assertTrue(txt.closed) 2647 self.assertTrue(txt.buffer.closed) 2648 self.assertTrue(closed) # flush() called 2649 self.assertFalse(closed[0]) # flush() called before file closed 2650 self.assertFalse(closed[1]) 2651 txt.flush = lambda: None # break reference loop 2652 2653 def test_multi_close(self): 2654 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2655 txt.close() 2656 txt.close() 2657 txt.close() 2658 self.assertRaises(ValueError, txt.flush) 2659 2660 def test_readonly_attributes(self): 2661 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2662 buf = self.BytesIO(self.testdata) 2663 with self.assertRaises((AttributeError, TypeError)): 2664 txt.buffer = buf 2665 2666 def test_read_nonbytes(self): 2667 # Issue #17106 2668 # Crash when underlying read() returns non-bytes 2669 class NonbytesStream(self.StringIO): 2670 read1 = self.StringIO.read 2671 class NonbytesStream(self.StringIO): 2672 read1 = self.StringIO.read 2673 t = 
self.TextIOWrapper(NonbytesStream('a')) 2674 with self.maybeRaises(TypeError): 2675 t.read(1) 2676 t = self.TextIOWrapper(NonbytesStream('a')) 2677 with self.maybeRaises(TypeError): 2678 t.readline() 2679 t = self.TextIOWrapper(NonbytesStream('a')) 2680 self.assertEqual(t.read(), u'a') 2681 2682 def test_illegal_encoder(self): 2683 # bpo-31271: A TypeError should be raised in case the return value of 2684 # encoder's encode() is invalid. 2685 class BadEncoder: 2686 def encode(self, dummy): 2687 return u'spam' 2688 def get_bad_encoder(dummy): 2689 return BadEncoder() 2690 rot13 = codecs.lookup("rot13") 2691 with support.swap_attr(rot13, '_is_text_encoding', True), \ 2692 support.swap_attr(rot13, 'incrementalencoder', get_bad_encoder): 2693 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13") 2694 with self.assertRaises(TypeError): 2695 t.write('bar') 2696 t.flush() 2697 2698 def test_illegal_decoder(self): 2699 # Issue #17106 2700 # Bypass the early encoding check added in issue 20404 2701 def _make_illegal_wrapper(): 2702 quopri = codecs.lookup("quopri_codec") 2703 quopri._is_text_encoding = True 2704 try: 2705 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), 2706 newline='\n', encoding="quopri_codec") 2707 finally: 2708 quopri._is_text_encoding = False 2709 return t 2710 # Crash when decoder returns non-string 2711 with support.check_py3k_warnings(): 2712 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n', 2713 encoding='quopri_codec') 2714 with self.maybeRaises(TypeError): 2715 t.read(1) 2716 with support.check_py3k_warnings(): 2717 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n', 2718 encoding='quopri_codec') 2719 with self.maybeRaises(TypeError): 2720 t.readline() 2721 with support.check_py3k_warnings(): 2722 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n', 2723 encoding='quopri_codec') 2724 with self.maybeRaises(TypeError): 2725 t.read() 2726 #else: 2727 #t = _make_illegal_wrapper() 2728 #self.assertRaises(TypeError, 
t.read, 1) 2729 #t = _make_illegal_wrapper() 2730 #self.assertRaises(TypeError, t.readline) 2731 #t = _make_illegal_wrapper() 2732 #self.assertRaises(TypeError, t.read) 2733 2734 # Issue 31243: calling read() while the return value of decoder's 2735 # getstate() is invalid should neither crash the interpreter nor 2736 # raise a SystemError. 2737 def _make_very_illegal_wrapper(getstate_ret_val): 2738 class BadDecoder: 2739 def getstate(self): 2740 return getstate_ret_val 2741 def _get_bad_decoder(dummy): 2742 return BadDecoder() 2743 quopri = codecs.lookup("quopri_codec") 2744 with support.swap_attr(quopri, 'incrementaldecoder', 2745 _get_bad_decoder): 2746 return _make_illegal_wrapper() 2747 t = _make_very_illegal_wrapper(42) 2748 with self.maybeRaises(TypeError): 2749 t.read(42) 2750 t = _make_very_illegal_wrapper(()) 2751 with self.maybeRaises(TypeError): 2752 t.read(42) 2753 2754 def test_issue25862(self): 2755 # Assertion failures occurred in tell() after read() and write(). 2756 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 2757 t.read(1) 2758 t.read() 2759 t.tell() 2760 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 2761 t.read(1) 2762 t.write('x') 2763 t.tell() 2764 2765 2766class CTextIOWrapperTest(TextIOWrapperTest): 2767 2768 def test_initialization(self): 2769 r = self.BytesIO(b"\xc3\xa9\n\n") 2770 b = self.BufferedReader(r, 1000) 2771 t = self.TextIOWrapper(b) 2772 self.assertRaises(TypeError, t.__init__, b, newline=42) 2773 self.assertRaises(ValueError, t.read) 2774 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2775 self.assertRaises(ValueError, t.read) 2776 2777 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2778 self.assertRaises(Exception, repr, t) 2779 2780 def test_garbage_collection(self): 2781 # C TextIOWrapper objects are collected, and collecting them flushes 2782 # all data to disk. 2783 # The Python version has __del__, so it ends in gc.garbage instead. 
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Self-reference: the object is now part of a reference cycle.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must raise AttributeError.
        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del t._CHUNK_SIZE

    # For this class "maybeRaises" is a hard requirement: it is simply
    # assertRaises.
    maybeRaises = unittest.TestCase.assertRaises


class PyTextIOWrapperTest(TextIOWrapperTest):
    @contextlib.contextmanager
    def maybeRaises(self, *args, **kwds):
        # Placeholder context manager: accepts any arguments and just runs
        # the with-body, without checking that anything was raised.
        yield


class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character, fed one byte at a time.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated character at end of input must raise.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        # A lone "\r" is held back until the next input (or final=True)
        # shows whether it is part of "\r\n".
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text through *decoder* one unit at a time and check both the
        # translated output and how the `newlines` attribute evolves.  With
        # an encoding the text is encoded first and fed byte-wise; with
        # encoding None the characters are fed directly.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() must also clear the record of newlines seen so far.
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        # Run check_newline_decoding() for every encoding, then the UTF-8
        # specific checks.
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # For enc None no inner decoder is created (decoder stays None).
            decoder = enc and codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not be
            # recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)

    def test_translate(self):
        # issue 35062
        # Non-zero translate values must all pass the translating checks;
        # translate=0 must leave "\r\r\n" untouched.
        for translate in (-2, -1, 1, 2):
            decoder = codecs.getincrementaldecoder("utf-8")()
            decoder = self.IncrementalNewlineDecoder(decoder, translate)
            self.check_newline_decoding_utf8(decoder)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")

class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass

class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    pass


# XXX Tests for open()

class MiscIOTest(unittest.TestCase):

    def tearDown(self):
        # Remove the scratch file a test may have left behind.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every name in io.__all__ must exist; apart from "open" and the
        # SEEK_* constants it must be an exception class (error names) or a
        # subclass of IOBase.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
self.assertIsNotNone(obj, name) 2958 if name == "open": 2959 continue 2960 elif "error" in name.lower() or name == "UnsupportedOperation": 2961 self.assertTrue(issubclass(obj, Exception), name) 2962 elif not name.startswith("SEEK_"): 2963 self.assertTrue(issubclass(obj, self.IOBase)) 2964 2965 def test_attributes(self): 2966 f = self.open(support.TESTFN, "wb", buffering=0) 2967 self.assertEqual(f.mode, "wb") 2968 f.close() 2969 2970 f = self.open(support.TESTFN, "U") 2971 self.assertEqual(f.name, support.TESTFN) 2972 self.assertEqual(f.buffer.name, support.TESTFN) 2973 self.assertEqual(f.buffer.raw.name, support.TESTFN) 2974 self.assertEqual(f.mode, "U") 2975 self.assertEqual(f.buffer.mode, "rb") 2976 self.assertEqual(f.buffer.raw.mode, "rb") 2977 f.close() 2978 2979 f = self.open(support.TESTFN, "w+") 2980 self.assertEqual(f.mode, "w+") 2981 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 2982 self.assertEqual(f.buffer.raw.mode, "rb+") 2983 2984 g = self.open(f.fileno(), "wb", closefd=False) 2985 self.assertEqual(g.mode, "wb") 2986 self.assertEqual(g.raw.mode, "wb") 2987 self.assertEqual(g.name, f.fileno()) 2988 self.assertEqual(g.raw.name, f.fileno()) 2989 f.close() 2990 g.close() 2991 2992 def test_io_after_close(self): 2993 for kwargs in [ 2994 {"mode": "w"}, 2995 {"mode": "wb"}, 2996 {"mode": "w", "buffering": 1}, 2997 {"mode": "w", "buffering": 2}, 2998 {"mode": "wb", "buffering": 0}, 2999 {"mode": "r"}, 3000 {"mode": "rb"}, 3001 {"mode": "r", "buffering": 1}, 3002 {"mode": "r", "buffering": 2}, 3003 {"mode": "rb", "buffering": 0}, 3004 {"mode": "w+"}, 3005 {"mode": "w+b"}, 3006 {"mode": "w+", "buffering": 1}, 3007 {"mode": "w+", "buffering": 2}, 3008 {"mode": "w+b", "buffering": 0}, 3009 ]: 3010 f = self.open(support.TESTFN, **kwargs) 3011 f.close() 3012 self.assertRaises(ValueError, f.flush) 3013 self.assertRaises(ValueError, f.fileno) 3014 self.assertRaises(ValueError, f.isatty) 3015 self.assertRaises(ValueError, f.__iter__) 3016 if 
hasattr(f, "peek"): 3017 self.assertRaises(ValueError, f.peek, 1) 3018 self.assertRaises(ValueError, f.read) 3019 if hasattr(f, "read1"): 3020 self.assertRaises(ValueError, f.read1, 1024) 3021 if hasattr(f, "readall"): 3022 self.assertRaises(ValueError, f.readall) 3023 if hasattr(f, "readinto"): 3024 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 3025 self.assertRaises(ValueError, f.readline) 3026 self.assertRaises(ValueError, f.readlines) 3027 self.assertRaises(ValueError, f.readlines, 1) 3028 self.assertRaises(ValueError, f.seek, 0) 3029 self.assertRaises(ValueError, f.tell) 3030 self.assertRaises(ValueError, f.truncate) 3031 self.assertRaises(ValueError, f.write, 3032 b"" if "b" in kwargs['mode'] else "") 3033 self.assertRaises(ValueError, f.writelines, []) 3034 self.assertRaises(ValueError, next, f) 3035 3036 def test_blockingioerror(self): 3037 # Various BlockingIOError issues 3038 self.assertRaises(TypeError, self.BlockingIOError) 3039 self.assertRaises(TypeError, self.BlockingIOError, 1) 3040 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) 3041 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) 3042 b = self.BlockingIOError(1, "") 3043 self.assertEqual(b.characters_written, 0) 3044 class C(unicode): 3045 pass 3046 c = C("") 3047 b = self.BlockingIOError(1, c) 3048 c.b = b 3049 b.c = c 3050 wr = weakref.ref(c) 3051 del c, b 3052 support.gc_collect() 3053 self.assertIsNone(wr(), wr) 3054 3055 def test_abcs(self): 3056 # Test the visible base classes are ABCs. 
3057 self.assertIsInstance(self.IOBase, abc.ABCMeta) 3058 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 3059 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 3060 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 3061 3062 def _check_abc_inheritance(self, abcmodule): 3063 with self.open(support.TESTFN, "wb", buffering=0) as f: 3064 self.assertIsInstance(f, abcmodule.IOBase) 3065 self.assertIsInstance(f, abcmodule.RawIOBase) 3066 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3067 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3068 with self.open(support.TESTFN, "wb") as f: 3069 self.assertIsInstance(f, abcmodule.IOBase) 3070 self.assertNotIsInstance(f, abcmodule.RawIOBase) 3071 self.assertIsInstance(f, abcmodule.BufferedIOBase) 3072 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3073 with self.open(support.TESTFN, "w") as f: 3074 self.assertIsInstance(f, abcmodule.IOBase) 3075 self.assertNotIsInstance(f, abcmodule.RawIOBase) 3076 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3077 self.assertIsInstance(f, abcmodule.TextIOBase) 3078 3079 def test_abc_inheritance(self): 3080 # Test implementations inherit from their respective ABCs 3081 self._check_abc_inheritance(self) 3082 3083 def test_abc_inheritance_official(self): 3084 # Test implementations inherit from the official ABCs of the 3085 # baseline "io" module. 
3086 self._check_abc_inheritance(io) 3087 3088 @unittest.skipUnless(fcntl, 'fcntl required for this test') 3089 def test_nonblock_pipe_write_bigbuf(self): 3090 self._test_nonblock_pipe_write(16*1024) 3091 3092 @unittest.skipUnless(fcntl, 'fcntl required for this test') 3093 def test_nonblock_pipe_write_smallbuf(self): 3094 self._test_nonblock_pipe_write(1024) 3095 3096 def _set_non_blocking(self, fd): 3097 flags = fcntl.fcntl(fd, fcntl.F_GETFL) 3098 self.assertNotEqual(flags, -1) 3099 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) 3100 self.assertEqual(res, 0) 3101 3102 def _test_nonblock_pipe_write(self, bufsize): 3103 sent = [] 3104 received = [] 3105 r, w = os.pipe() 3106 self._set_non_blocking(r) 3107 self._set_non_blocking(w) 3108 3109 # To exercise all code paths in the C implementation we need 3110 # to play with buffer sizes. For instance, if we choose a 3111 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 3112 # then we will never get a partial write of the buffer. 
3113 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 3114 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 3115 3116 with rf, wf: 3117 for N in 9999, 73, 7574: 3118 try: 3119 i = 0 3120 while True: 3121 msg = bytes([i % 26 + 97]) * N 3122 sent.append(msg) 3123 wf.write(msg) 3124 i += 1 3125 3126 except self.BlockingIOError as e: 3127 self.assertEqual(e.args[0], errno.EAGAIN) 3128 sent[-1] = sent[-1][:e.characters_written] 3129 received.append(rf.read()) 3130 msg = b'BLOCKED' 3131 wf.write(msg) 3132 sent.append(msg) 3133 3134 while True: 3135 try: 3136 wf.flush() 3137 break 3138 except self.BlockingIOError as e: 3139 self.assertEqual(e.args[0], errno.EAGAIN) 3140 self.assertEqual(e.characters_written, 0) 3141 received.append(rf.read()) 3142 3143 received += iter(rf.read, None) 3144 3145 sent, received = b''.join(sent), b''.join(received) 3146 self.assertEqual(sent, received) 3147 self.assertTrue(wf.closed) 3148 self.assertTrue(rf.closed) 3149 3150class CMiscIOTest(MiscIOTest): 3151 io = io 3152 shutdown_error = "RuntimeError: could not find io module state" 3153 3154class PyMiscIOTest(MiscIOTest): 3155 io = pyio 3156 shutdown_error = "LookupError: unknown encoding: ascii" 3157 3158 3159@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 3160class SignalsTest(unittest.TestCase): 3161 3162 def setUp(self): 3163 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 3164 3165 def tearDown(self): 3166 signal.signal(signal.SIGALRM, self.oldalrm) 3167 3168 def alarm_interrupt(self, sig, frame): 3169 1 // 0 3170 3171 @unittest.skipUnless(threading, 'Threading required for this test.') 3172 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'), 3173 'issue #12429: skip test on FreeBSD <= 7') 3174 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 3175 """Check that a partial write, when it gets interrupted, properly 3176 invokes the signal handler, and bubbles up the exception raised 
3177 in the latter.""" 3178 read_results = [] 3179 def _read(): 3180 s = os.read(r, 1) 3181 read_results.append(s) 3182 t = threading.Thread(target=_read) 3183 t.daemon = True 3184 r, w = os.pipe() 3185 try: 3186 wio = self.io.open(w, **fdopen_kwargs) 3187 t.start() 3188 # Fill the pipe enough that the write will be blocking. 3189 # It will be interrupted by the timer armed above. Since the 3190 # other thread has read one byte, the low-level write will 3191 # return with a successful (partial) result rather than an EINTR. 3192 # The buffered IO layer must check for pending signal 3193 # handlers, which in this case will invoke alarm_interrupt(). 3194 try: 3195 signal.alarm(1) 3196 with self.assertRaises(ZeroDivisionError): 3197 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1)) 3198 finally: 3199 signal.alarm(0) 3200 t.join() 3201 3202 # We got one byte, get another one and check that it isn't a 3203 # repeat of the first one. 3204 read_results.append(os.read(r, 1)) 3205 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 3206 finally: 3207 os.close(w) 3208 os.close(r) 3209 # This is deliberate. If we didn't close the file descriptor 3210 # before closing wio, wio would try to flush its internal 3211 # buffer, and block again. 
3212 try: 3213 wio.close() 3214 except IOError as e: 3215 if e.errno != errno.EBADF: 3216 raise 3217 3218 def test_interrupted_write_unbuffered(self): 3219 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 3220 3221 def test_interrupted_write_buffered(self): 3222 self.check_interrupted_write(b"xy", b"xy", mode="wb") 3223 3224 def test_interrupted_write_text(self): 3225 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 3226 3227 def check_reentrant_write(self, data, **fdopen_kwargs): 3228 def on_alarm(*args): 3229 # Will be called reentrantly from the same thread 3230 wio.write(data) 3231 1//0 3232 signal.signal(signal.SIGALRM, on_alarm) 3233 r, w = os.pipe() 3234 wio = self.io.open(w, **fdopen_kwargs) 3235 try: 3236 signal.alarm(1) 3237 # Either the reentrant call to wio.write() fails with RuntimeError, 3238 # or the signal handler raises ZeroDivisionError. 3239 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 3240 while 1: 3241 for i in range(100): 3242 wio.write(data) 3243 wio.flush() 3244 # Make sure the buffer doesn't fill up and block further writes 3245 os.read(r, len(data) * 100) 3246 exc = cm.exception 3247 if isinstance(exc, RuntimeError): 3248 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 3249 finally: 3250 signal.alarm(0) 3251 wio.close() 3252 os.close(r) 3253 3254 def test_reentrant_write_buffered(self): 3255 self.check_reentrant_write(b"xy", mode="wb") 3256 3257 def test_reentrant_write_text(self): 3258 self.check_reentrant_write("xy", mode="w", encoding="ascii") 3259 3260 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 3261 """Check that a buffered read, when it gets interrupted (either 3262 returning a partial result or EINTR), properly invokes the signal 3263 handler and retries if the latter returned successfully.""" 3264 r, w = os.pipe() 3265 fdopen_kwargs["closefd"] = False 3266 def alarm_handler(sig, frame): 3267 os.write(w, b"bar") 3268 
signal.signal(signal.SIGALRM, alarm_handler) 3269 try: 3270 rio = self.io.open(r, **fdopen_kwargs) 3271 os.write(w, b"foo") 3272 signal.alarm(1) 3273 # Expected behaviour: 3274 # - first raw read() returns partial b"foo" 3275 # - second raw read() returns EINTR 3276 # - third raw read() returns b"bar" 3277 self.assertEqual(decode(rio.read(6)), "foobar") 3278 finally: 3279 signal.alarm(0) 3280 rio.close() 3281 os.close(w) 3282 os.close(r) 3283 3284 def test_interrupterd_read_retry_buffered(self): 3285 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 3286 mode="rb") 3287 3288 def test_interrupterd_read_retry_text(self): 3289 self.check_interrupted_read_retry(lambda x: x, 3290 mode="r") 3291 3292 @unittest.skipUnless(threading, 'Threading required for this test.') 3293 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 3294 """Check that a buffered write, when it gets interrupted (either 3295 returning a partial result or EINTR), properly invokes the signal 3296 handler and retries if the latter returned successfully.""" 3297 select = support.import_module("select") 3298 # A quantity that exceeds the buffer size of an anonymous pipe's 3299 # write end. 3300 N = support.PIPE_MAX_SIZE 3301 r, w = os.pipe() 3302 fdopen_kwargs["closefd"] = False 3303 # We need a separate thread to read from the pipe and allow the 3304 # write() to finish. This thread is started after the SIGALRM is 3305 # received (forcing a first EINTR in write()). 
3306 read_results = [] 3307 write_finished = False 3308 error = [None] 3309 def _read(): 3310 try: 3311 while not write_finished: 3312 while r in select.select([r], [], [], 1.0)[0]: 3313 s = os.read(r, 1024) 3314 read_results.append(s) 3315 except BaseException as exc: 3316 error[0] = exc 3317 t = threading.Thread(target=_read) 3318 t.daemon = True 3319 def alarm1(sig, frame): 3320 signal.signal(signal.SIGALRM, alarm2) 3321 signal.alarm(1) 3322 def alarm2(sig, frame): 3323 t.start() 3324 signal.signal(signal.SIGALRM, alarm1) 3325 try: 3326 wio = self.io.open(w, **fdopen_kwargs) 3327 signal.alarm(1) 3328 # Expected behaviour: 3329 # - first raw write() is partial (because of the limited pipe buffer 3330 # and the first alarm) 3331 # - second raw write() returns EINTR (because of the second alarm) 3332 # - subsequent write()s are successful (either partial or complete) 3333 self.assertEqual(N, wio.write(item * N)) 3334 wio.flush() 3335 write_finished = True 3336 t.join() 3337 3338 self.assertIsNone(error[0]) 3339 self.assertEqual(N, sum(len(x) for x in read_results)) 3340 finally: 3341 signal.alarm(0) 3342 write_finished = True 3343 os.close(w) 3344 os.close(r) 3345 # This is deliberate. If we didn't close the file descriptor 3346 # before closing wio, wio would try to flush its internal 3347 # buffer, and could block (in case of failure). 3348 try: 3349 wio.close() 3350 except IOError as e: 3351 if e.errno != errno.EBADF: 3352 raise 3353 3354 def test_interrupterd_write_retry_buffered(self): 3355 self.check_interrupted_write_retry(b"x", mode="wb") 3356 3357 def test_interrupterd_write_retry_text(self): 3358 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 3359 3360 3361class CSignalsTest(SignalsTest): 3362 io = io 3363 3364class PySignalsTest(SignalsTest): 3365 io = pyio 3366 3367 # Handling reentrancy issues would slow down _pyio even more, so the 3368 # tests are disabled. 
3369 test_reentrant_write_buffered = None 3370 test_reentrant_write_text = None 3371 3372 3373def test_main(): 3374 tests = (CIOTest, PyIOTest, 3375 CBufferedReaderTest, PyBufferedReaderTest, 3376 CBufferedWriterTest, PyBufferedWriterTest, 3377 CBufferedRWPairTest, PyBufferedRWPairTest, 3378 CBufferedRandomTest, PyBufferedRandomTest, 3379 StatefulIncrementalDecoderTest, 3380 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 3381 CTextIOWrapperTest, PyTextIOWrapperTest, 3382 CMiscIOTest, PyMiscIOTest, 3383 CSignalsTest, PySignalsTest, 3384 ) 3385 3386 # Put the namespaces of the IO module we are testing and some useful mock 3387 # classes in the __dict__ of each test. 3388 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 3389 MockNonBlockWriterIO, MockRawIOWithoutRead) 3390 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 3391 c_io_ns = dict((name, getattr(io, name)) for name in all_members) 3392 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members) 3393 globs = globals() 3394 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 3395 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 3396 # Avoid turning open into a bound method. 3397 py_io_ns["open"] = pyio.OpenWrapper 3398 for test in tests: 3399 if test.__name__.startswith("C"): 3400 for name, obj in c_io_ns.items(): 3401 setattr(test, name, obj) 3402 elif test.__name__.startswith("Py"): 3403 for name, obj in py_io_ns.items(): 3404 setattr(test, name, obj) 3405 3406 support.run_unittest(*tests) 3407 3408if __name__ == "__main__": 3409 test_main() 3410