1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22import abc 23import array 24import errno 25import locale 26import os 27import pickle 28import random 29import signal 30import sys 31import sysconfig 32import threading 33import time 34import unittest 35import warnings 36import weakref 37from collections import deque, UserList 38from itertools import cycle, count 39from test import support 40from test.support.script_helper import assert_python_ok, run_python_until_end 41from test.support import FakePath 42 43import codecs 44import io # C implementation of io 45import _pyio as pyio # Python implementation of io 46 47try: 48 import ctypes 49except ImportError: 50 def byteslike(*pos, **kw): 51 return array.array("b", bytes(*pos, **kw)) 52else: 53 def byteslike(*pos, **kw): 54 """Create a bytes-like object having no string or sequence methods""" 55 data = bytes(*pos, **kw) 56 obj = EmptyStruct() 57 ctypes.resize(obj, len(data)) 58 memoryview(obj).cast("B")[:] = data 59 return obj 
60 class EmptyStruct(ctypes.Structure): 61 pass 62 63_cflags = sysconfig.get_config_var('CFLAGS') or '' 64_config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' 65MEMORY_SANITIZER = ( 66 '-fsanitize=memory' in _cflags or 67 '--with-memory-sanitizer' in _config_args 68) 69 70def _default_chunk_size(): 71 """Get the default TextIOWrapper chunk size""" 72 with open(__file__, "r", encoding="latin-1") as f: 73 return f._CHUNK_SIZE 74 75 76class MockRawIOWithoutRead: 77 """A RawIO implementation without read(), so as to exercise the default 78 RawIO.read() which calls readinto().""" 79 80 def __init__(self, read_stack=()): 81 self._read_stack = list(read_stack) 82 self._write_stack = [] 83 self._reads = 0 84 self._extraneous_reads = 0 85 86 def write(self, b): 87 self._write_stack.append(bytes(b)) 88 return len(b) 89 90 def writable(self): 91 return True 92 93 def fileno(self): 94 return 42 95 96 def readable(self): 97 return True 98 99 def seekable(self): 100 return True 101 102 def seek(self, pos, whence): 103 return 0 # wrong but we gotta return something 104 105 def tell(self): 106 return 0 # same comment as above 107 108 def readinto(self, buf): 109 self._reads += 1 110 max_len = len(buf) 111 try: 112 data = self._read_stack[0] 113 except IndexError: 114 self._extraneous_reads += 1 115 return 0 116 if data is None: 117 del self._read_stack[0] 118 return None 119 n = len(data) 120 if len(data) <= max_len: 121 del self._read_stack[0] 122 buf[:n] = data 123 return n 124 else: 125 buf[:] = data[:max_len] 126 self._read_stack[0] = data[max_len:] 127 return max_len 128 129 def truncate(self, pos=None): 130 return pos 131 132class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 133 pass 134 135class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 136 pass 137 138 139class MockRawIO(MockRawIOWithoutRead): 140 141 def read(self, n=None): 142 self._reads += 1 143 try: 144 return self._read_stack.pop(0) 145 except: 146 self._extraneous_reads += 1 
147 return b"" 148 149class CMockRawIO(MockRawIO, io.RawIOBase): 150 pass 151 152class PyMockRawIO(MockRawIO, pyio.RawIOBase): 153 pass 154 155 156class MisbehavedRawIO(MockRawIO): 157 def write(self, b): 158 return super().write(b) * 2 159 160 def read(self, n=None): 161 return super().read(n) * 2 162 163 def seek(self, pos, whence): 164 return -123 165 166 def tell(self): 167 return -456 168 169 def readinto(self, buf): 170 super().readinto(buf) 171 return len(buf) * 5 172 173class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 174 pass 175 176class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 177 pass 178 179 180class SlowFlushRawIO(MockRawIO): 181 def __init__(self): 182 super().__init__() 183 self.in_flush = threading.Event() 184 185 def flush(self): 186 self.in_flush.set() 187 time.sleep(0.25) 188 189class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase): 190 pass 191 192class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase): 193 pass 194 195 196class CloseFailureIO(MockRawIO): 197 closed = 0 198 199 def close(self): 200 if not self.closed: 201 self.closed = 1 202 raise OSError 203 204class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 205 pass 206 207class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 208 pass 209 210 211class MockFileIO: 212 213 def __init__(self, data): 214 self.read_history = [] 215 super().__init__(data) 216 217 def read(self, n=None): 218 res = super().read(n) 219 self.read_history.append(None if res is None else len(res)) 220 return res 221 222 def readinto(self, b): 223 res = super().readinto(b) 224 self.read_history.append(res) 225 return res 226 227class CMockFileIO(MockFileIO, io.BytesIO): 228 pass 229 230class PyMockFileIO(MockFileIO, pyio.BytesIO): 231 pass 232 233 234class MockUnseekableIO: 235 def seekable(self): 236 return False 237 238 def seek(self, *args): 239 raise self.UnsupportedOperation("not seekable") 240 241 def tell(self, *args): 242 raise self.UnsupportedOperation("not seekable") 243 244 def 
truncate(self, *args): 245 raise self.UnsupportedOperation("not seekable") 246 247class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): 248 UnsupportedOperation = io.UnsupportedOperation 249 250class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): 251 UnsupportedOperation = pyio.UnsupportedOperation 252 253 254class MockNonBlockWriterIO: 255 256 def __init__(self): 257 self._write_stack = [] 258 self._blocker_char = None 259 260 def pop_written(self): 261 s = b"".join(self._write_stack) 262 self._write_stack[:] = [] 263 return s 264 265 def block_on(self, char): 266 """Block when a given char is encountered.""" 267 self._blocker_char = char 268 269 def readable(self): 270 return True 271 272 def seekable(self): 273 return True 274 275 def writable(self): 276 return True 277 278 def write(self, b): 279 b = bytes(b) 280 n = -1 281 if self._blocker_char: 282 try: 283 n = b.index(self._blocker_char) 284 except ValueError: 285 pass 286 else: 287 if n > 0: 288 # write data up to the first blocker 289 self._write_stack.append(b[:n]) 290 return n 291 else: 292 # cancel blocker and indicate would block 293 self._blocker_char = None 294 return None 295 self._write_stack.append(b) 296 return len(b) 297 298class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 299 BlockingIOError = io.BlockingIOError 300 301class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 302 BlockingIOError = pyio.BlockingIOError 303 304 305class IOTest(unittest.TestCase): 306 307 def setUp(self): 308 support.unlink(support.TESTFN) 309 310 def tearDown(self): 311 support.unlink(support.TESTFN) 312 313 def write_ops(self, f): 314 self.assertEqual(f.write(b"blah."), 5) 315 f.truncate(0) 316 self.assertEqual(f.tell(), 5) 317 f.seek(0) 318 319 self.assertEqual(f.write(b"blah."), 5) 320 self.assertEqual(f.seek(0), 0) 321 self.assertEqual(f.write(b"Hello."), 6) 322 self.assertEqual(f.tell(), 6) 323 self.assertEqual(f.seek(-1, 1), 5) 324 self.assertEqual(f.tell(), 5) 325 buffer = 
bytearray(b" world\n\n\n") 326 self.assertEqual(f.write(buffer), 9) 327 buffer[:] = b"*" * 9 # Overwrite our copy of the data 328 self.assertEqual(f.seek(0), 0) 329 self.assertEqual(f.write(b"h"), 1) 330 self.assertEqual(f.seek(-1, 2), 13) 331 self.assertEqual(f.tell(), 13) 332 333 self.assertEqual(f.truncate(12), 12) 334 self.assertEqual(f.tell(), 13) 335 self.assertRaises(TypeError, f.seek, 0.0) 336 337 def read_ops(self, f, buffered=False): 338 data = f.read(5) 339 self.assertEqual(data, b"hello") 340 data = byteslike(data) 341 self.assertEqual(f.readinto(data), 5) 342 self.assertEqual(bytes(data), b" worl") 343 data = bytearray(5) 344 self.assertEqual(f.readinto(data), 2) 345 self.assertEqual(len(data), 5) 346 self.assertEqual(data[:2], b"d\n") 347 self.assertEqual(f.seek(0), 0) 348 self.assertEqual(f.read(20), b"hello world\n") 349 self.assertEqual(f.read(1), b"") 350 self.assertEqual(f.readinto(byteslike(b"x")), 0) 351 self.assertEqual(f.seek(-6, 2), 6) 352 self.assertEqual(f.read(5), b"world") 353 self.assertEqual(f.read(0), b"") 354 self.assertEqual(f.readinto(byteslike()), 0) 355 self.assertEqual(f.seek(-6, 1), 5) 356 self.assertEqual(f.read(5), b" worl") 357 self.assertEqual(f.tell(), 10) 358 self.assertRaises(TypeError, f.seek, 0.0) 359 if buffered: 360 f.seek(0) 361 self.assertEqual(f.read(), b"hello world\n") 362 f.seek(6) 363 self.assertEqual(f.read(), b"world\n") 364 self.assertEqual(f.read(), b"") 365 f.seek(0) 366 data = byteslike(5) 367 self.assertEqual(f.readinto1(data), 5) 368 self.assertEqual(bytes(data), b"hello") 369 370 LARGE = 2**31 371 372 def large_file_ops(self, f): 373 assert f.readable() 374 assert f.writable() 375 try: 376 self.assertEqual(f.seek(self.LARGE), self.LARGE) 377 except (OverflowError, ValueError): 378 self.skipTest("no largefile support") 379 self.assertEqual(f.tell(), self.LARGE) 380 self.assertEqual(f.write(b"xxx"), 3) 381 self.assertEqual(f.tell(), self.LARGE + 3) 382 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 383 
self.assertEqual(f.truncate(), self.LARGE + 2) 384 self.assertEqual(f.tell(), self.LARGE + 2) 385 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 386 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 387 self.assertEqual(f.tell(), self.LARGE + 2) 388 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 389 self.assertEqual(f.seek(-1, 2), self.LARGE) 390 self.assertEqual(f.read(2), b"x") 391 392 def test_invalid_operations(self): 393 # Try writing on a file opened in read mode and vice-versa. 394 exc = self.UnsupportedOperation 395 for mode in ("w", "wb"): 396 with self.open(support.TESTFN, mode) as fp: 397 self.assertRaises(exc, fp.read) 398 self.assertRaises(exc, fp.readline) 399 with self.open(support.TESTFN, "wb", buffering=0) as fp: 400 self.assertRaises(exc, fp.read) 401 self.assertRaises(exc, fp.readline) 402 with self.open(support.TESTFN, "rb", buffering=0) as fp: 403 self.assertRaises(exc, fp.write, b"blah") 404 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 405 with self.open(support.TESTFN, "rb") as fp: 406 self.assertRaises(exc, fp.write, b"blah") 407 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 408 with self.open(support.TESTFN, "r") as fp: 409 self.assertRaises(exc, fp.write, "blah") 410 self.assertRaises(exc, fp.writelines, ["blah\n"]) 411 # Non-zero seeking from current or end pos 412 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) 413 self.assertRaises(exc, fp.seek, -1, self.SEEK_END) 414 415 def test_optional_abilities(self): 416 # Test for OSError when optional APIs are not supported 417 # The purpose of this test is to try fileno(), reading, writing and 418 # seeking operations with various objects that indicate they do not 419 # support these operations. 
420 421 def pipe_reader(): 422 [r, w] = os.pipe() 423 os.close(w) # So that read() is harmless 424 return self.FileIO(r, "r") 425 426 def pipe_writer(): 427 [r, w] = os.pipe() 428 self.addCleanup(os.close, r) 429 # Guarantee that we can write into the pipe without blocking 430 thread = threading.Thread(target=os.read, args=(r, 100)) 431 thread.start() 432 self.addCleanup(thread.join) 433 return self.FileIO(w, "w") 434 435 def buffered_reader(): 436 return self.BufferedReader(self.MockUnseekableIO()) 437 438 def buffered_writer(): 439 return self.BufferedWriter(self.MockUnseekableIO()) 440 441 def buffered_random(): 442 return self.BufferedRandom(self.BytesIO()) 443 444 def buffered_rw_pair(): 445 return self.BufferedRWPair(self.MockUnseekableIO(), 446 self.MockUnseekableIO()) 447 448 def text_reader(): 449 class UnseekableReader(self.MockUnseekableIO): 450 writable = self.BufferedIOBase.writable 451 write = self.BufferedIOBase.write 452 return self.TextIOWrapper(UnseekableReader(), "ascii") 453 454 def text_writer(): 455 class UnseekableWriter(self.MockUnseekableIO): 456 readable = self.BufferedIOBase.readable 457 read = self.BufferedIOBase.read 458 return self.TextIOWrapper(UnseekableWriter(), "ascii") 459 460 tests = ( 461 (pipe_reader, "fr"), (pipe_writer, "fw"), 462 (buffered_reader, "r"), (buffered_writer, "w"), 463 (buffered_random, "rws"), (buffered_rw_pair, "rw"), 464 (text_reader, "r"), (text_writer, "w"), 465 (self.BytesIO, "rws"), (self.StringIO, "rws"), 466 ) 467 for [test, abilities] in tests: 468 with self.subTest(test), test() as obj: 469 readable = "r" in abilities 470 self.assertEqual(obj.readable(), readable) 471 writable = "w" in abilities 472 self.assertEqual(obj.writable(), writable) 473 474 if isinstance(obj, self.TextIOBase): 475 data = "3" 476 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): 477 data = b"3" 478 else: 479 self.fail("Unknown base class") 480 481 if "f" in abilities: 482 obj.fileno() 483 else: 484 
self.assertRaises(OSError, obj.fileno) 485 486 if readable: 487 obj.read(1) 488 obj.read() 489 else: 490 self.assertRaises(OSError, obj.read, 1) 491 self.assertRaises(OSError, obj.read) 492 493 if writable: 494 obj.write(data) 495 else: 496 self.assertRaises(OSError, obj.write, data) 497 498 if sys.platform.startswith("win") and test in ( 499 pipe_reader, pipe_writer): 500 # Pipes seem to appear as seekable on Windows 501 continue 502 seekable = "s" in abilities 503 self.assertEqual(obj.seekable(), seekable) 504 505 if seekable: 506 obj.tell() 507 obj.seek(0) 508 else: 509 self.assertRaises(OSError, obj.tell) 510 self.assertRaises(OSError, obj.seek, 0) 511 512 if writable and seekable: 513 obj.truncate() 514 obj.truncate(0) 515 else: 516 self.assertRaises(OSError, obj.truncate) 517 self.assertRaises(OSError, obj.truncate, 0) 518 519 def test_open_handles_NUL_chars(self): 520 fn_with_NUL = 'foo\0bar' 521 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') 522 523 bytes_fn = bytes(fn_with_NUL, 'ascii') 524 with warnings.catch_warnings(): 525 warnings.simplefilter("ignore", DeprecationWarning) 526 self.assertRaises(ValueError, self.open, bytes_fn, 'w') 527 528 def test_raw_file_io(self): 529 with self.open(support.TESTFN, "wb", buffering=0) as f: 530 self.assertEqual(f.readable(), False) 531 self.assertEqual(f.writable(), True) 532 self.assertEqual(f.seekable(), True) 533 self.write_ops(f) 534 with self.open(support.TESTFN, "rb", buffering=0) as f: 535 self.assertEqual(f.readable(), True) 536 self.assertEqual(f.writable(), False) 537 self.assertEqual(f.seekable(), True) 538 self.read_ops(f) 539 540 def test_buffered_file_io(self): 541 with self.open(support.TESTFN, "wb") as f: 542 self.assertEqual(f.readable(), False) 543 self.assertEqual(f.writable(), True) 544 self.assertEqual(f.seekable(), True) 545 self.write_ops(f) 546 with self.open(support.TESTFN, "rb") as f: 547 self.assertEqual(f.readable(), True) 548 self.assertEqual(f.writable(), False) 549 
self.assertEqual(f.seekable(), True) 550 self.read_ops(f, True) 551 552 def test_readline(self): 553 with self.open(support.TESTFN, "wb") as f: 554 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 555 with self.open(support.TESTFN, "rb") as f: 556 self.assertEqual(f.readline(), b"abc\n") 557 self.assertEqual(f.readline(10), b"def\n") 558 self.assertEqual(f.readline(2), b"xy") 559 self.assertEqual(f.readline(4), b"zzy\n") 560 self.assertEqual(f.readline(), b"foo\x00bar\n") 561 self.assertEqual(f.readline(None), b"another line") 562 self.assertRaises(TypeError, f.readline, 5.3) 563 with self.open(support.TESTFN, "r") as f: 564 self.assertRaises(TypeError, f.readline, 5.3) 565 566 def test_readline_nonsizeable(self): 567 # Issue #30061 568 # Crash when readline() returns an object without __len__ 569 class R(self.IOBase): 570 def readline(self): 571 return None 572 self.assertRaises((TypeError, StopIteration), next, R()) 573 574 def test_next_nonsizeable(self): 575 # Issue #30061 576 # Crash when __next__() returns an object without __len__ 577 class R(self.IOBase): 578 def __next__(self): 579 return None 580 self.assertRaises(TypeError, R().readlines, 1) 581 582 def test_raw_bytes_io(self): 583 f = self.BytesIO() 584 self.write_ops(f) 585 data = f.getvalue() 586 self.assertEqual(data, b"hello world\n") 587 f = self.BytesIO(data) 588 self.read_ops(f, True) 589 590 def test_large_file_ops(self): 591 # On Windows and Mac OSX this test consumes large resources; It takes 592 # a long time to build the >2 GiB file and takes >2 GiB of disk space 593 # therefore the resource must be enabled to run this test. 
594 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 595 support.requires( 596 'largefile', 597 'test requires %s bytes and a long time to run' % self.LARGE) 598 with self.open(support.TESTFN, "w+b", 0) as f: 599 self.large_file_ops(f) 600 with self.open(support.TESTFN, "w+b") as f: 601 self.large_file_ops(f) 602 603 def test_with_open(self): 604 for bufsize in (0, 1, 100): 605 f = None 606 with self.open(support.TESTFN, "wb", bufsize) as f: 607 f.write(b"xxx") 608 self.assertEqual(f.closed, True) 609 f = None 610 try: 611 with self.open(support.TESTFN, "wb", bufsize) as f: 612 1/0 613 except ZeroDivisionError: 614 self.assertEqual(f.closed, True) 615 else: 616 self.fail("1/0 didn't raise an exception") 617 618 # issue 5008 619 def test_append_mode_tell(self): 620 with self.open(support.TESTFN, "wb") as f: 621 f.write(b"xxx") 622 with self.open(support.TESTFN, "ab", buffering=0) as f: 623 self.assertEqual(f.tell(), 3) 624 with self.open(support.TESTFN, "ab") as f: 625 self.assertEqual(f.tell(), 3) 626 with self.open(support.TESTFN, "a") as f: 627 self.assertGreater(f.tell(), 0) 628 629 def test_destructor(self): 630 record = [] 631 class MyFileIO(self.FileIO): 632 def __del__(self): 633 record.append(1) 634 try: 635 f = super().__del__ 636 except AttributeError: 637 pass 638 else: 639 f() 640 def close(self): 641 record.append(2) 642 super().close() 643 def flush(self): 644 record.append(3) 645 super().flush() 646 with support.check_warnings(('', ResourceWarning)): 647 f = MyFileIO(support.TESTFN, "wb") 648 f.write(b"xxx") 649 del f 650 support.gc_collect() 651 self.assertEqual(record, [1, 2, 3]) 652 with self.open(support.TESTFN, "rb") as f: 653 self.assertEqual(f.read(), b"xxx") 654 655 def _check_base_destructor(self, base): 656 record = [] 657 class MyIO(base): 658 def __init__(self): 659 # This exercises the availability of attributes on object 660 # destruction. 
661 # (in the C version, close() is called by the tp_dealloc 662 # function, not by __del__) 663 self.on_del = 1 664 self.on_close = 2 665 self.on_flush = 3 666 def __del__(self): 667 record.append(self.on_del) 668 try: 669 f = super().__del__ 670 except AttributeError: 671 pass 672 else: 673 f() 674 def close(self): 675 record.append(self.on_close) 676 super().close() 677 def flush(self): 678 record.append(self.on_flush) 679 super().flush() 680 f = MyIO() 681 del f 682 support.gc_collect() 683 self.assertEqual(record, [1, 2, 3]) 684 685 def test_IOBase_destructor(self): 686 self._check_base_destructor(self.IOBase) 687 688 def test_RawIOBase_destructor(self): 689 self._check_base_destructor(self.RawIOBase) 690 691 def test_BufferedIOBase_destructor(self): 692 self._check_base_destructor(self.BufferedIOBase) 693 694 def test_TextIOBase_destructor(self): 695 self._check_base_destructor(self.TextIOBase) 696 697 def test_close_flushes(self): 698 with self.open(support.TESTFN, "wb") as f: 699 f.write(b"xxx") 700 with self.open(support.TESTFN, "rb") as f: 701 self.assertEqual(f.read(), b"xxx") 702 703 def test_array_writes(self): 704 a = array.array('i', range(10)) 705 n = len(a.tobytes()) 706 def check(f): 707 with f: 708 self.assertEqual(f.write(a), n) 709 f.writelines((a,)) 710 check(self.BytesIO()) 711 check(self.FileIO(support.TESTFN, "w")) 712 check(self.BufferedWriter(self.MockRawIO())) 713 check(self.BufferedRandom(self.MockRawIO())) 714 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) 715 716 def test_closefd(self): 717 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 718 closefd=False) 719 720 def test_read_closed(self): 721 with self.open(support.TESTFN, "w") as f: 722 f.write("egg\n") 723 with self.open(support.TESTFN, "r") as f: 724 file = self.open(f.fileno(), "r", closefd=False) 725 self.assertEqual(file.read(), "egg\n") 726 file.seek(0) 727 file.close() 728 self.assertRaises(ValueError, file.read) 729 with 
self.open(support.TESTFN, "rb") as f: 730 file = self.open(f.fileno(), "rb", closefd=False) 731 self.assertEqual(file.read()[:3], b"egg") 732 file.close() 733 self.assertRaises(ValueError, file.readinto, bytearray(1)) 734 735 def test_no_closefd_with_filename(self): 736 # can't use closefd in combination with a file name 737 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) 738 739 def test_closefd_attr(self): 740 with self.open(support.TESTFN, "wb") as f: 741 f.write(b"egg\n") 742 with self.open(support.TESTFN, "r") as f: 743 self.assertEqual(f.buffer.raw.closefd, True) 744 file = self.open(f.fileno(), "r", closefd=False) 745 self.assertEqual(file.buffer.raw.closefd, False) 746 747 def test_garbage_collection(self): 748 # FileIO objects are collected, and collecting them flushes 749 # all data to disk. 750 with support.check_warnings(('', ResourceWarning)): 751 f = self.FileIO(support.TESTFN, "wb") 752 f.write(b"abcxxx") 753 f.f = f 754 wr = weakref.ref(f) 755 del f 756 support.gc_collect() 757 self.assertIsNone(wr(), wr) 758 with self.open(support.TESTFN, "rb") as f: 759 self.assertEqual(f.read(), b"abcxxx") 760 761 def test_unbounded_file(self): 762 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 
763 zero = "/dev/zero" 764 if not os.path.exists(zero): 765 self.skipTest("{0} does not exist".format(zero)) 766 if sys.maxsize > 0x7FFFFFFF: 767 self.skipTest("test can only run in a 32-bit address space") 768 if support.real_max_memuse < support._2G: 769 self.skipTest("test requires at least 2 GiB of memory") 770 with self.open(zero, "rb", buffering=0) as f: 771 self.assertRaises(OverflowError, f.read) 772 with self.open(zero, "rb") as f: 773 self.assertRaises(OverflowError, f.read) 774 with self.open(zero, "r") as f: 775 self.assertRaises(OverflowError, f.read) 776 777 def check_flush_error_on_close(self, *args, **kwargs): 778 # Test that the file is closed despite failed flush 779 # and that flush() is called before file closed. 780 f = self.open(*args, **kwargs) 781 closed = [] 782 def bad_flush(): 783 closed[:] = [f.closed] 784 raise OSError() 785 f.flush = bad_flush 786 self.assertRaises(OSError, f.close) # exception not swallowed 787 self.assertTrue(f.closed) 788 self.assertTrue(closed) # flush() called 789 self.assertFalse(closed[0]) # flush() called before file closed 790 f.flush = lambda: None # break reference loop 791 792 def test_flush_error_on_close(self): 793 # raw file 794 # Issue #5700: io.FileIO calls flush() after file closed 795 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) 796 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 797 self.check_flush_error_on_close(fd, 'wb', buffering=0) 798 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 799 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 800 os.close(fd) 801 # buffered io 802 self.check_flush_error_on_close(support.TESTFN, 'wb') 803 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 804 self.check_flush_error_on_close(fd, 'wb') 805 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 806 self.check_flush_error_on_close(fd, 'wb', closefd=False) 807 os.close(fd) 808 # text io 809 self.check_flush_error_on_close(support.TESTFN, 'w') 810 fd = 
os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 811 self.check_flush_error_on_close(fd, 'w') 812 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 813 self.check_flush_error_on_close(fd, 'w', closefd=False) 814 os.close(fd) 815 816 def test_multi_close(self): 817 f = self.open(support.TESTFN, "wb", buffering=0) 818 f.close() 819 f.close() 820 f.close() 821 self.assertRaises(ValueError, f.flush) 822 823 def test_RawIOBase_read(self): 824 # Exercise the default limited RawIOBase.read(n) implementation (which 825 # calls readinto() internally). 826 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 827 self.assertEqual(rawio.read(2), b"ab") 828 self.assertEqual(rawio.read(2), b"c") 829 self.assertEqual(rawio.read(2), b"d") 830 self.assertEqual(rawio.read(2), None) 831 self.assertEqual(rawio.read(2), b"ef") 832 self.assertEqual(rawio.read(2), b"g") 833 self.assertEqual(rawio.read(2), None) 834 self.assertEqual(rawio.read(2), b"") 835 836 def test_types_have_dict(self): 837 test = ( 838 self.IOBase(), 839 self.RawIOBase(), 840 self.TextIOBase(), 841 self.StringIO(), 842 self.BytesIO() 843 ) 844 for obj in test: 845 self.assertTrue(hasattr(obj, "__dict__")) 846 847 def test_opener(self): 848 with self.open(support.TESTFN, "w") as f: 849 f.write("egg\n") 850 fd = os.open(support.TESTFN, os.O_RDONLY) 851 def opener(path, flags): 852 return fd 853 with self.open("non-existent", "r", opener=opener) as f: 854 self.assertEqual(f.read(), "egg\n") 855 856 def test_bad_opener_negative_1(self): 857 # Issue #27066. 858 def badopener(fname, flags): 859 return -1 860 with self.assertRaises(ValueError) as cm: 861 open('non-existent', 'r', opener=badopener) 862 self.assertEqual(str(cm.exception), 'opener returned -1') 863 864 def test_bad_opener_other_negative(self): 865 # Issue #27066. 
866 def badopener(fname, flags): 867 return -2 868 with self.assertRaises(ValueError) as cm: 869 open('non-existent', 'r', opener=badopener) 870 self.assertEqual(str(cm.exception), 'opener returned -2') 871 872 def test_fileio_closefd(self): 873 # Issue #4841 874 with self.open(__file__, 'rb') as f1, \ 875 self.open(__file__, 'rb') as f2: 876 fileio = self.FileIO(f1.fileno(), closefd=False) 877 # .__init__() must not close f1 878 fileio.__init__(f2.fileno(), closefd=False) 879 f1.readline() 880 # .close() must not close f2 881 fileio.close() 882 f2.readline() 883 884 def test_nonbuffered_textio(self): 885 with support.check_no_resource_warning(self): 886 with self.assertRaises(ValueError): 887 self.open(support.TESTFN, 'w', buffering=0) 888 889 def test_invalid_newline(self): 890 with support.check_no_resource_warning(self): 891 with self.assertRaises(ValueError): 892 self.open(support.TESTFN, 'w', newline='invalid') 893 894 def test_buffered_readinto_mixin(self): 895 # Test the implementation provided by BufferedIOBase 896 class Stream(self.BufferedIOBase): 897 def read(self, size): 898 return b"12345" 899 read1 = read 900 stream = Stream() 901 for method in ("readinto", "readinto1"): 902 with self.subTest(method): 903 buffer = byteslike(5) 904 self.assertEqual(getattr(stream, method)(buffer), 5) 905 self.assertEqual(bytes(buffer), b"12345") 906 907 def test_fspath_support(self): 908 def check_path_succeeds(path): 909 with self.open(path, "w") as f: 910 f.write("egg\n") 911 912 with self.open(path, "r") as f: 913 self.assertEqual(f.read(), "egg\n") 914 915 check_path_succeeds(FakePath(support.TESTFN)) 916 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8'))) 917 918 with self.open(support.TESTFN, "w") as f: 919 bad_path = FakePath(f.fileno()) 920 with self.assertRaises(TypeError): 921 self.open(bad_path, 'w') 922 923 bad_path = FakePath(None) 924 with self.assertRaises(TypeError): 925 self.open(bad_path, 'w') 926 927 bad_path = FakePath(FloatingPointError) 
928 with self.assertRaises(FloatingPointError): 929 self.open(bad_path, 'w') 930 931 # ensure that refcounting is correct with some error conditions 932 with self.assertRaisesRegex(ValueError, 'read/write/append mode'): 933 self.open(FakePath(support.TESTFN), 'rwxa') 934 935 def test_RawIOBase_readall(self): 936 # Exercise the default unlimited RawIOBase.read() and readall() 937 # implementations. 938 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 939 self.assertEqual(rawio.read(), b"abcdefg") 940 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 941 self.assertEqual(rawio.readall(), b"abcdefg") 942 943 def test_BufferedIOBase_readinto(self): 944 # Exercise the default BufferedIOBase.readinto() and readinto1() 945 # implementations (which call read() or read1() internally). 946 class Reader(self.BufferedIOBase): 947 def __init__(self, avail): 948 self.avail = avail 949 def read(self, size): 950 result = self.avail[:size] 951 self.avail = self.avail[size:] 952 return result 953 def read1(self, size): 954 """Returns no more than 5 bytes at once""" 955 return self.read(min(size, 5)) 956 tests = ( 957 # (test method, total data available, read buffer size, expected 958 # read size) 959 ("readinto", 10, 5, 5), 960 ("readinto", 10, 6, 6), # More than read1() can return 961 ("readinto", 5, 6, 5), # Buffer larger than total available 962 ("readinto", 6, 7, 6), 963 ("readinto", 10, 0, 0), # Empty buffer 964 ("readinto1", 10, 5, 5), # Result limited to single read1() call 965 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return 966 ("readinto1", 5, 6, 5), # Buffer larger than total available 967 ("readinto1", 6, 7, 5), 968 ("readinto1", 10, 0, 0), # Empty buffer 969 ) 970 UNUSED_BYTE = 0x81 971 for test in tests: 972 with self.subTest(test): 973 method, avail, request, result = test 974 reader = Reader(bytes(range(avail))) 975 buffer = bytearray((UNUSED_BYTE,) * request) 976 method = getattr(reader, method) 977 self.assertEqual(method(buffer), 
result) 978 self.assertEqual(len(buffer), request) 979 self.assertSequenceEqual(buffer[:result], range(result)) 980 unused = (UNUSED_BYTE,) * (request - result) 981 self.assertSequenceEqual(buffer[result:], unused) 982 self.assertEqual(len(reader.avail), avail - result) 983 984 def test_close_assert(self): 985 class R(self.IOBase): 986 def __setattr__(self, name, value): 987 pass 988 def flush(self): 989 raise OSError() 990 f = R() 991 # This would cause an assertion failure. 992 self.assertRaises(OSError, f.close) 993 994 995class CIOTest(IOTest): 996 997 def test_IOBase_finalize(self): 998 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 999 # class which inherits IOBase and an object of this class are caught 1000 # in a reference cycle and close() is already in the method cache. 1001 class MyIO(self.IOBase): 1002 def close(self): 1003 pass 1004 1005 # create an instance to populate the method cache 1006 MyIO() 1007 obj = MyIO() 1008 obj.obj = obj 1009 wr = weakref.ref(obj) 1010 del MyIO 1011 del obj 1012 support.gc_collect() 1013 self.assertIsNone(wr(), wr) 1014 1015class PyIOTest(IOTest): 1016 pass 1017 1018 1019@support.cpython_only 1020class APIMismatchTest(unittest.TestCase): 1021 1022 def test_RawIOBase_io_in_pyio_match(self): 1023 """Test that pyio RawIOBase class has all c RawIOBase methods""" 1024 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase, 1025 ignore=('__weakref__',)) 1026 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods') 1027 1028 def test_RawIOBase_pyio_in_io_match(self): 1029 """Test that c RawIOBase class has all pyio RawIOBase methods""" 1030 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase) 1031 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods') 1032 1033 1034class CommonBufferedTests: 1035 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 1036 1037 def test_detach(self): 1038 
raw = self.MockRawIO() 1039 buf = self.tp(raw) 1040 self.assertIs(buf.detach(), raw) 1041 self.assertRaises(ValueError, buf.detach) 1042 1043 repr(buf) # Should still work 1044 1045 def test_fileno(self): 1046 rawio = self.MockRawIO() 1047 bufio = self.tp(rawio) 1048 1049 self.assertEqual(42, bufio.fileno()) 1050 1051 def test_invalid_args(self): 1052 rawio = self.MockRawIO() 1053 bufio = self.tp(rawio) 1054 # Invalid whence 1055 self.assertRaises(ValueError, bufio.seek, 0, -1) 1056 self.assertRaises(ValueError, bufio.seek, 0, 9) 1057 1058 def test_override_destructor(self): 1059 tp = self.tp 1060 record = [] 1061 class MyBufferedIO(tp): 1062 def __del__(self): 1063 record.append(1) 1064 try: 1065 f = super().__del__ 1066 except AttributeError: 1067 pass 1068 else: 1069 f() 1070 def close(self): 1071 record.append(2) 1072 super().close() 1073 def flush(self): 1074 record.append(3) 1075 super().flush() 1076 rawio = self.MockRawIO() 1077 bufio = MyBufferedIO(rawio) 1078 del bufio 1079 support.gc_collect() 1080 self.assertEqual(record, [1, 2, 3]) 1081 1082 def test_context_manager(self): 1083 # Test usability as a context manager 1084 rawio = self.MockRawIO() 1085 bufio = self.tp(rawio) 1086 def _with(): 1087 with bufio: 1088 pass 1089 _with() 1090 # bufio should now be closed, and using it a second time should raise 1091 # a ValueError. 1092 self.assertRaises(ValueError, _with) 1093 1094 def test_error_through_destructor(self): 1095 # Test that the exception state is not modified by a destructor, 1096 # even if close() fails. 
1097 rawio = self.CloseFailureIO() 1098 def f(): 1099 self.tp(rawio).xyzzy 1100 with support.captured_output("stderr") as s: 1101 self.assertRaises(AttributeError, f) 1102 s = s.getvalue().strip() 1103 if s: 1104 # The destructor *may* have printed an unraisable error, check it 1105 self.assertEqual(len(s.splitlines()), 1) 1106 self.assertTrue(s.startswith("Exception OSError: "), s) 1107 self.assertTrue(s.endswith(" ignored"), s) 1108 1109 def test_repr(self): 1110 raw = self.MockRawIO() 1111 b = self.tp(raw) 1112 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__) 1113 self.assertRegex(repr(b), "<%s>" % clsname) 1114 raw.name = "dummy" 1115 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname) 1116 raw.name = b"dummy" 1117 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname) 1118 1119 def test_recursive_repr(self): 1120 # Issue #25455 1121 raw = self.MockRawIO() 1122 b = self.tp(raw) 1123 with support.swap_attr(raw, 'name', b): 1124 try: 1125 repr(b) # Should not crash 1126 except RuntimeError: 1127 pass 1128 1129 def test_flush_error_on_close(self): 1130 # Test that buffered file is closed despite failed flush 1131 # and that flush() is called before file closed. 
1132 raw = self.MockRawIO() 1133 closed = [] 1134 def bad_flush(): 1135 closed[:] = [b.closed, raw.closed] 1136 raise OSError() 1137 raw.flush = bad_flush 1138 b = self.tp(raw) 1139 self.assertRaises(OSError, b.close) # exception not swallowed 1140 self.assertTrue(b.closed) 1141 self.assertTrue(raw.closed) 1142 self.assertTrue(closed) # flush() called 1143 self.assertFalse(closed[0]) # flush() called before file closed 1144 self.assertFalse(closed[1]) 1145 raw.flush = lambda: None # break reference loop 1146 1147 def test_close_error_on_close(self): 1148 raw = self.MockRawIO() 1149 def bad_flush(): 1150 raise OSError('flush') 1151 def bad_close(): 1152 raise OSError('close') 1153 raw.close = bad_close 1154 b = self.tp(raw) 1155 b.flush = bad_flush 1156 with self.assertRaises(OSError) as err: # exception not swallowed 1157 b.close() 1158 self.assertEqual(err.exception.args, ('close',)) 1159 self.assertIsInstance(err.exception.__context__, OSError) 1160 self.assertEqual(err.exception.__context__.args, ('flush',)) 1161 self.assertFalse(b.closed) 1162 1163 def test_nonnormalized_close_error_on_close(self): 1164 # Issue #21677 1165 raw = self.MockRawIO() 1166 def bad_flush(): 1167 raise non_existing_flush 1168 def bad_close(): 1169 raise non_existing_close 1170 raw.close = bad_close 1171 b = self.tp(raw) 1172 b.flush = bad_flush 1173 with self.assertRaises(NameError) as err: # exception not swallowed 1174 b.close() 1175 self.assertIn('non_existing_close', str(err.exception)) 1176 self.assertIsInstance(err.exception.__context__, NameError) 1177 self.assertIn('non_existing_flush', str(err.exception.__context__)) 1178 self.assertFalse(b.closed) 1179 1180 def test_multi_close(self): 1181 raw = self.MockRawIO() 1182 b = self.tp(raw) 1183 b.close() 1184 b.close() 1185 b.close() 1186 self.assertRaises(ValueError, b.flush) 1187 1188 def test_unseekable(self): 1189 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1190 self.assertRaises(self.UnsupportedOperation, bufio.tell) 
1191 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1192 1193 def test_readonly_attributes(self): 1194 raw = self.MockRawIO() 1195 buf = self.tp(raw) 1196 x = self.MockRawIO() 1197 with self.assertRaises(AttributeError): 1198 buf.raw = x 1199 1200 1201class SizeofTest: 1202 1203 @support.cpython_only 1204 def test_sizeof(self): 1205 bufsize1 = 4096 1206 bufsize2 = 8192 1207 rawio = self.MockRawIO() 1208 bufio = self.tp(rawio, buffer_size=bufsize1) 1209 size = sys.getsizeof(bufio) - bufsize1 1210 rawio = self.MockRawIO() 1211 bufio = self.tp(rawio, buffer_size=bufsize2) 1212 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 1213 1214 @support.cpython_only 1215 def test_buffer_freeing(self) : 1216 bufsize = 4096 1217 rawio = self.MockRawIO() 1218 bufio = self.tp(rawio, buffer_size=bufsize) 1219 size = sys.getsizeof(bufio) - bufsize 1220 bufio.close() 1221 self.assertEqual(sys.getsizeof(bufio), size) 1222 1223class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 1224 read_mode = "rb" 1225 1226 def test_constructor(self): 1227 rawio = self.MockRawIO([b"abc"]) 1228 bufio = self.tp(rawio) 1229 bufio.__init__(rawio) 1230 bufio.__init__(rawio, buffer_size=1024) 1231 bufio.__init__(rawio, buffer_size=16) 1232 self.assertEqual(b"abc", bufio.read()) 1233 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1234 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1235 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1236 rawio = self.MockRawIO([b"abc"]) 1237 bufio.__init__(rawio) 1238 self.assertEqual(b"abc", bufio.read()) 1239 1240 def test_uninitialized(self): 1241 bufio = self.tp.__new__(self.tp) 1242 del bufio 1243 bufio = self.tp.__new__(self.tp) 1244 self.assertRaisesRegex((ValueError, AttributeError), 1245 'uninitialized|has no attribute', 1246 bufio.read, 0) 1247 bufio.__init__(self.MockRawIO()) 1248 self.assertEqual(bufio.read(0), b'') 1249 1250 def test_read(self): 1251 for arg in 
(None, 7): 1252 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1253 bufio = self.tp(rawio) 1254 self.assertEqual(b"abcdefg", bufio.read(arg)) 1255 # Invalid args 1256 self.assertRaises(ValueError, bufio.read, -2) 1257 1258 def test_read1(self): 1259 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1260 bufio = self.tp(rawio) 1261 self.assertEqual(b"a", bufio.read(1)) 1262 self.assertEqual(b"b", bufio.read1(1)) 1263 self.assertEqual(rawio._reads, 1) 1264 self.assertEqual(b"", bufio.read1(0)) 1265 self.assertEqual(b"c", bufio.read1(100)) 1266 self.assertEqual(rawio._reads, 1) 1267 self.assertEqual(b"d", bufio.read1(100)) 1268 self.assertEqual(rawio._reads, 2) 1269 self.assertEqual(b"efg", bufio.read1(100)) 1270 self.assertEqual(rawio._reads, 3) 1271 self.assertEqual(b"", bufio.read1(100)) 1272 self.assertEqual(rawio._reads, 4) 1273 1274 def test_read1_arbitrary(self): 1275 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1276 bufio = self.tp(rawio) 1277 self.assertEqual(b"a", bufio.read(1)) 1278 self.assertEqual(b"bc", bufio.read1()) 1279 self.assertEqual(b"d", bufio.read1()) 1280 self.assertEqual(b"efg", bufio.read1(-1)) 1281 self.assertEqual(rawio._reads, 3) 1282 self.assertEqual(b"", bufio.read1()) 1283 self.assertEqual(rawio._reads, 4) 1284 1285 def test_readinto(self): 1286 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1287 bufio = self.tp(rawio) 1288 b = bytearray(2) 1289 self.assertEqual(bufio.readinto(b), 2) 1290 self.assertEqual(b, b"ab") 1291 self.assertEqual(bufio.readinto(b), 2) 1292 self.assertEqual(b, b"cd") 1293 self.assertEqual(bufio.readinto(b), 2) 1294 self.assertEqual(b, b"ef") 1295 self.assertEqual(bufio.readinto(b), 1) 1296 self.assertEqual(b, b"gf") 1297 self.assertEqual(bufio.readinto(b), 0) 1298 self.assertEqual(b, b"gf") 1299 rawio = self.MockRawIO((b"abc", None)) 1300 bufio = self.tp(rawio) 1301 self.assertEqual(bufio.readinto(b), 2) 1302 self.assertEqual(b, b"ab") 1303 self.assertEqual(bufio.readinto(b), 1) 1304 self.assertEqual(b, b"cb") 1305 
def test_readinto1(self):
    # Exercise readinto1() on a buffered reader built over a mock raw
    # stream constructed with the chunks b"abc", b"de", b"fgh", b"jkl".
    # After every step the test checks both the bytes delivered and
    # rawio._reads (taken to be the mock's count of raw read calls --
    # MockRawIO is defined elsewhere in this file).
    buffer_size = 10
    rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    bufio = self.tp(rawio, buffer_size=buffer_size)
    b = bytearray(2)

    # peek() is expected to expose the first chunk after one raw read.
    self.assertEqual(bufio.peek(3), b'abc')
    self.assertEqual(rawio._reads, 1)

    # The next two calls are expected to be served without a new raw read:
    # first two bytes, then the single byte that is left over.
    self.assertEqual(bufio.readinto1(b), 2)
    self.assertEqual(b, b"ab")
    self.assertEqual(rawio._reads, 1)
    self.assertEqual(bufio.readinto1(b), 1)
    self.assertEqual(b[:1], b"c")
    self.assertEqual(rawio._reads, 1)

    # Now a second raw read is expected, delivering b"de".
    self.assertEqual(bufio.readinto1(b), 2)
    self.assertEqual(b, b"de")
    self.assertEqual(rawio._reads, 2)

    # Switch to a target twice as large as buffer_size.
    b = bytearray(2*buffer_size)
    self.assertEqual(bufio.peek(3), b'fgh')
    self.assertEqual(rawio._reads, 3)
    # One more raw read is expected; the call returns the peeked b"fgh"
    # together with b"jkl".
    self.assertEqual(bufio.readinto1(b), 6)
    self.assertEqual(b[:6], b"fghjkl")
    self.assertEqual(rawio._reads, 4)

def test_readinto_array(self):
    # readinto() into a target whose items are wider than one byte: the
    # return value must count bytes rather than array elements, and the
    # part of the target that was not written must keep its old contents.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # A unittest assertion is used instead of a bare ``assert`` so that
    # the fixture check is not compiled away under ``python -O``.
    # NOTE(review): with 32 bytes of data a length of 16 would mean 2-byte
    # items; confirm that is the case this guard is meant to exclude.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm) - n))

def test_readinto1_array(self):
    # Same fixture as test_readinto_array, used to exercise readinto1().
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # unittest assertion instead of a bare ``assert`` (survives ``-O``).
    self.assertNotEqual(len(b), 16)

    # Read into it.
We should get as many *bytes* as we can fit into b 1361 # (which is more than the number of elements) 1362 n = bufio.readinto1(b) 1363 self.assertGreater(n, len(b)) 1364 1365 # Check that old contents of b are preserved 1366 bm = memoryview(b).cast('B') 1367 self.assertLess(n, len(bm)) 1368 self.assertEqual(bm[:n], data[:n]) 1369 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1370 1371 def test_readlines(self): 1372 def bufio(): 1373 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 1374 return self.tp(rawio) 1375 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 1376 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 1377 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 1378 1379 def test_buffering(self): 1380 data = b"abcdefghi" 1381 dlen = len(data) 1382 1383 tests = [ 1384 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 1385 [ 100, [ 3, 3, 3], [ dlen ] ], 1386 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 1387 ] 1388 1389 for bufsize, buf_read_sizes, raw_read_sizes in tests: 1390 rawio = self.MockFileIO(data) 1391 bufio = self.tp(rawio, buffer_size=bufsize) 1392 pos = 0 1393 for nbytes in buf_read_sizes: 1394 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 1395 pos += nbytes 1396 # this is mildly implementation-dependent 1397 self.assertEqual(rawio.read_history, raw_read_sizes) 1398 1399 def test_read_non_blocking(self): 1400 # Inject some None's in there to simulate EWOULDBLOCK 1401 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 1402 bufio = self.tp(rawio) 1403 self.assertEqual(b"abcd", bufio.read(6)) 1404 self.assertEqual(b"e", bufio.read(1)) 1405 self.assertEqual(b"fg", bufio.read()) 1406 self.assertEqual(b"", bufio.peek(1)) 1407 self.assertIsNone(bufio.read()) 1408 self.assertEqual(b"", bufio.read()) 1409 1410 rawio = self.MockRawIO((b"a", None, None)) 1411 self.assertEqual(b"a", rawio.readall()) 1412 self.assertIsNone(rawio.readall()) 1413 1414 def test_read_past_eof(self): 1415 rawio = 
self.MockRawIO((b"abc", b"d", b"efg")) 1416 bufio = self.tp(rawio) 1417 1418 self.assertEqual(b"abcdefg", bufio.read(9000)) 1419 1420 def test_read_all(self): 1421 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1422 bufio = self.tp(rawio) 1423 1424 self.assertEqual(b"abcdefg", bufio.read()) 1425 1426 @support.requires_resource('cpu') 1427 def test_threads(self): 1428 try: 1429 # Write out many bytes with exactly the same number of 0's, 1430 # 1's... 255's. This will help us check that concurrent reading 1431 # doesn't duplicate or forget contents. 1432 N = 1000 1433 l = list(range(256)) * N 1434 random.shuffle(l) 1435 s = bytes(bytearray(l)) 1436 with self.open(support.TESTFN, "wb") as f: 1437 f.write(s) 1438 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: 1439 bufio = self.tp(raw, 8) 1440 errors = [] 1441 results = [] 1442 def f(): 1443 try: 1444 # Intra-buffer read then buffer-flushing read 1445 for n in cycle([1, 19]): 1446 s = bufio.read(n) 1447 if not s: 1448 break 1449 # list.append() is atomic 1450 results.append(s) 1451 except Exception as e: 1452 errors.append(e) 1453 raise 1454 threads = [threading.Thread(target=f) for x in range(20)] 1455 with support.start_threads(threads): 1456 time.sleep(0.02) # yield 1457 self.assertFalse(errors, 1458 "the following exceptions were caught: %r" % errors) 1459 s = b''.join(results) 1460 for i in range(256): 1461 c = bytes(bytearray([i])) 1462 self.assertEqual(s.count(c), N) 1463 finally: 1464 support.unlink(support.TESTFN) 1465 1466 def test_unseekable(self): 1467 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1468 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1469 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1470 bufio.read(1) 1471 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1472 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1473 1474 def test_misbehaved_io(self): 1475 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1476 bufio = 
self.tp(rawio) 1477 self.assertRaises(OSError, bufio.seek, 0) 1478 self.assertRaises(OSError, bufio.tell) 1479 1480 def test_no_extraneous_read(self): 1481 # Issue #9550; when the raw IO object has satisfied the read request, 1482 # we should not issue any additional reads, otherwise it may block 1483 # (e.g. socket). 1484 bufsize = 16 1485 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 1486 rawio = self.MockRawIO([b"x" * n]) 1487 bufio = self.tp(rawio, bufsize) 1488 self.assertEqual(bufio.read(n), b"x" * n) 1489 # Simple case: one raw read is enough to satisfy the request. 1490 self.assertEqual(rawio._extraneous_reads, 0, 1491 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1492 # A more complex case where two raw reads are needed to satisfy 1493 # the request. 1494 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 1495 bufio = self.tp(rawio, bufsize) 1496 self.assertEqual(bufio.read(n), b"x" * n) 1497 self.assertEqual(rawio._extraneous_reads, 0, 1498 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1499 1500 def test_read_on_closed(self): 1501 # Issue #23796 1502 b = io.BufferedReader(io.BytesIO(b"12")) 1503 b.read(1) 1504 b.close() 1505 self.assertRaises(ValueError, b.peek) 1506 self.assertRaises(ValueError, b.read1, 1) 1507 1508 1509class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 1510 tp = io.BufferedReader 1511 1512 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 1513 "instead of returning NULL for malloc failure.") 1514 def test_constructor(self): 1515 BufferedReaderTest.test_constructor(self) 1516 # The allocation can succeed on 32-bit builds, e.g. with more 1517 # than 2 GiB RAM and a 64-bit kernel. 
1518 if sys.maxsize > 0x7FFFFFFF: 1519 rawio = self.MockRawIO() 1520 bufio = self.tp(rawio) 1521 self.assertRaises((OverflowError, MemoryError, ValueError), 1522 bufio.__init__, rawio, sys.maxsize) 1523 1524 def test_initialization(self): 1525 rawio = self.MockRawIO([b"abc"]) 1526 bufio = self.tp(rawio) 1527 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1528 self.assertRaises(ValueError, bufio.read) 1529 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1530 self.assertRaises(ValueError, bufio.read) 1531 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1532 self.assertRaises(ValueError, bufio.read) 1533 1534 def test_misbehaved_io_read(self): 1535 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1536 bufio = self.tp(rawio) 1537 # _pyio.BufferedReader seems to implement reading different, so that 1538 # checking this is not so easy. 1539 self.assertRaises(OSError, bufio.read, 10) 1540 1541 def test_garbage_collection(self): 1542 # C BufferedReader objects are collected. 
1543 # The Python version has __del__, so it ends into gc.garbage instead 1544 self.addCleanup(support.unlink, support.TESTFN) 1545 with support.check_warnings(('', ResourceWarning)): 1546 rawio = self.FileIO(support.TESTFN, "w+b") 1547 f = self.tp(rawio) 1548 f.f = f 1549 wr = weakref.ref(f) 1550 del f 1551 support.gc_collect() 1552 self.assertIsNone(wr(), wr) 1553 1554 def test_args_error(self): 1555 # Issue #17275 1556 with self.assertRaisesRegex(TypeError, "BufferedReader"): 1557 self.tp(io.BytesIO(), 1024, 1024, 1024) 1558 1559 1560class PyBufferedReaderTest(BufferedReaderTest): 1561 tp = pyio.BufferedReader 1562 1563 1564class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1565 write_mode = "wb" 1566 1567 def test_constructor(self): 1568 rawio = self.MockRawIO() 1569 bufio = self.tp(rawio) 1570 bufio.__init__(rawio) 1571 bufio.__init__(rawio, buffer_size=1024) 1572 bufio.__init__(rawio, buffer_size=16) 1573 self.assertEqual(3, bufio.write(b"abc")) 1574 bufio.flush() 1575 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1576 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1577 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1578 bufio.__init__(rawio) 1579 self.assertEqual(3, bufio.write(b"ghi")) 1580 bufio.flush() 1581 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1582 1583 def test_uninitialized(self): 1584 bufio = self.tp.__new__(self.tp) 1585 del bufio 1586 bufio = self.tp.__new__(self.tp) 1587 self.assertRaisesRegex((ValueError, AttributeError), 1588 'uninitialized|has no attribute', 1589 bufio.write, b'') 1590 bufio.__init__(self.MockRawIO()) 1591 self.assertEqual(bufio.write(b''), 0) 1592 1593 def test_detach_flush(self): 1594 raw = self.MockRawIO() 1595 buf = self.tp(raw) 1596 buf.write(b"howdy!") 1597 self.assertFalse(raw._write_stack) 1598 buf.detach() 1599 self.assertEqual(raw._write_stack, [b"howdy!"]) 1600 1601 def test_write(self): 1602 # Write to the buffered 
IO but don't overflow the buffer. 1603 writer = self.MockRawIO() 1604 bufio = self.tp(writer, 8) 1605 bufio.write(b"abc") 1606 self.assertFalse(writer._write_stack) 1607 buffer = bytearray(b"def") 1608 bufio.write(buffer) 1609 buffer[:] = b"***" # Overwrite our copy of the data 1610 bufio.flush() 1611 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1612 1613 def test_write_overflow(self): 1614 writer = self.MockRawIO() 1615 bufio = self.tp(writer, 8) 1616 contents = b"abcdefghijklmnop" 1617 for n in range(0, len(contents), 3): 1618 bufio.write(contents[n:n+3]) 1619 flushed = b"".join(writer._write_stack) 1620 # At least (total - 8) bytes were implicitly flushed, perhaps more 1621 # depending on the implementation. 1622 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1623 1624 def check_writes(self, intermediate_func): 1625 # Lots of writes, test the flushed output is as expected. 1626 contents = bytes(range(256)) * 1000 1627 n = 0 1628 writer = self.MockRawIO() 1629 bufio = self.tp(writer, 13) 1630 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1631 def gen_sizes(): 1632 for size in count(1): 1633 for i in range(15): 1634 yield size 1635 sizes = gen_sizes() 1636 while n < len(contents): 1637 size = min(next(sizes), len(contents) - n) 1638 self.assertEqual(bufio.write(contents[n:n+size]), size) 1639 intermediate_func(bufio) 1640 n += size 1641 bufio.flush() 1642 self.assertEqual(contents, b"".join(writer._write_stack)) 1643 1644 def test_writes(self): 1645 self.check_writes(lambda bufio: None) 1646 1647 def test_writes_and_flushes(self): 1648 self.check_writes(lambda bufio: bufio.flush()) 1649 1650 def test_writes_and_seeks(self): 1651 def _seekabs(bufio): 1652 pos = bufio.tell() 1653 bufio.seek(pos + 1, 0) 1654 bufio.seek(pos - 1, 0) 1655 bufio.seek(pos, 0) 1656 self.check_writes(_seekabs) 1657 def _seekrel(bufio): 1658 pos = bufio.seek(0, 1) 1659 bufio.seek(+1, 1) 1660 bufio.seek(-1, 1) 1661 bufio.seek(pos, 0) 1662 
self.check_writes(_seekrel) 1663 1664 def test_writes_and_truncates(self): 1665 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1666 1667 def test_write_non_blocking(self): 1668 raw = self.MockNonBlockWriterIO() 1669 bufio = self.tp(raw, 8) 1670 1671 self.assertEqual(bufio.write(b"abcd"), 4) 1672 self.assertEqual(bufio.write(b"efghi"), 5) 1673 # 1 byte will be written, the rest will be buffered 1674 raw.block_on(b"k") 1675 self.assertEqual(bufio.write(b"jklmn"), 5) 1676 1677 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1678 raw.block_on(b"0") 1679 try: 1680 bufio.write(b"opqrwxyz0123456789") 1681 except self.BlockingIOError as e: 1682 written = e.characters_written 1683 else: 1684 self.fail("BlockingIOError should have been raised") 1685 self.assertEqual(written, 16) 1686 self.assertEqual(raw.pop_written(), 1687 b"abcdefghijklmnopqrwxyz") 1688 1689 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1690 s = raw.pop_written() 1691 # Previously buffered bytes were flushed 1692 self.assertTrue(s.startswith(b"01234567A"), s) 1693 1694 def test_write_and_rewind(self): 1695 raw = io.BytesIO() 1696 bufio = self.tp(raw, 4) 1697 self.assertEqual(bufio.write(b"abcdef"), 6) 1698 self.assertEqual(bufio.tell(), 6) 1699 bufio.seek(0, 0) 1700 self.assertEqual(bufio.write(b"XY"), 2) 1701 bufio.seek(6, 0) 1702 self.assertEqual(raw.getvalue(), b"XYcdef") 1703 self.assertEqual(bufio.write(b"123456"), 6) 1704 bufio.flush() 1705 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1706 1707 def test_flush(self): 1708 writer = self.MockRawIO() 1709 bufio = self.tp(writer, 8) 1710 bufio.write(b"abc") 1711 bufio.flush() 1712 self.assertEqual(b"abc", writer._write_stack[0]) 1713 1714 def test_writelines(self): 1715 l = [b'ab', b'cd', b'ef'] 1716 writer = self.MockRawIO() 1717 bufio = self.tp(writer, 8) 1718 bufio.writelines(l) 1719 bufio.flush() 1720 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1721 1722 def test_writelines_userlist(self): 
1723 l = UserList([b'ab', b'cd', b'ef']) 1724 writer = self.MockRawIO() 1725 bufio = self.tp(writer, 8) 1726 bufio.writelines(l) 1727 bufio.flush() 1728 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1729 1730 def test_writelines_error(self): 1731 writer = self.MockRawIO() 1732 bufio = self.tp(writer, 8) 1733 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1734 self.assertRaises(TypeError, bufio.writelines, None) 1735 self.assertRaises(TypeError, bufio.writelines, 'abc') 1736 1737 def test_destructor(self): 1738 writer = self.MockRawIO() 1739 bufio = self.tp(writer, 8) 1740 bufio.write(b"abc") 1741 del bufio 1742 support.gc_collect() 1743 self.assertEqual(b"abc", writer._write_stack[0]) 1744 1745 def test_truncate(self): 1746 # Truncate implicitly flushes the buffer. 1747 self.addCleanup(support.unlink, support.TESTFN) 1748 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1749 bufio = self.tp(raw, 8) 1750 bufio.write(b"abcdef") 1751 self.assertEqual(bufio.truncate(3), 3) 1752 self.assertEqual(bufio.tell(), 6) 1753 with self.open(support.TESTFN, "rb", buffering=0) as f: 1754 self.assertEqual(f.read(), b"abc") 1755 1756 def test_truncate_after_write(self): 1757 # Ensure that truncate preserves the file position after 1758 # writes longer than the buffer size. 
1759 # Issue: https://bugs.python.org/issue32228 1760 self.addCleanup(support.unlink, support.TESTFN) 1761 with self.open(support.TESTFN, "wb") as f: 1762 # Fill with some buffer 1763 f.write(b'\x00' * 10000) 1764 buffer_sizes = [8192, 4096, 200] 1765 for buffer_size in buffer_sizes: 1766 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f: 1767 f.write(b'\x00' * (buffer_size + 1)) 1768 # After write write_pos and write_end are set to 0 1769 f.read(1) 1770 # read operation makes sure that pos != raw_pos 1771 f.truncate() 1772 self.assertEqual(f.tell(), buffer_size + 2) 1773 1774 @support.requires_resource('cpu') 1775 def test_threads(self): 1776 try: 1777 # Write out many bytes from many threads and test they were 1778 # all flushed. 1779 N = 1000 1780 contents = bytes(range(256)) * N 1781 sizes = cycle([1, 19]) 1782 n = 0 1783 queue = deque() 1784 while n < len(contents): 1785 size = next(sizes) 1786 queue.append(contents[n:n+size]) 1787 n += size 1788 del contents 1789 # We use a real file object because it allows us to 1790 # exercise situations where the GIL is released before 1791 # writing the buffer to the raw streams. This is in addition 1792 # to concurrency issues due to switching threads in the middle 1793 # of Python code. 
1794 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1795 bufio = self.tp(raw, 8) 1796 errors = [] 1797 def f(): 1798 try: 1799 while True: 1800 try: 1801 s = queue.popleft() 1802 except IndexError: 1803 return 1804 bufio.write(s) 1805 except Exception as e: 1806 errors.append(e) 1807 raise 1808 threads = [threading.Thread(target=f) for x in range(20)] 1809 with support.start_threads(threads): 1810 time.sleep(0.02) # yield 1811 self.assertFalse(errors, 1812 "the following exceptions were caught: %r" % errors) 1813 bufio.close() 1814 with self.open(support.TESTFN, "rb") as f: 1815 s = f.read() 1816 for i in range(256): 1817 self.assertEqual(s.count(bytes([i])), N) 1818 finally: 1819 support.unlink(support.TESTFN) 1820 1821 def test_misbehaved_io(self): 1822 rawio = self.MisbehavedRawIO() 1823 bufio = self.tp(rawio, 5) 1824 self.assertRaises(OSError, bufio.seek, 0) 1825 self.assertRaises(OSError, bufio.tell) 1826 self.assertRaises(OSError, bufio.write, b"abcdef") 1827 1828 def test_max_buffer_size_removal(self): 1829 with self.assertRaises(TypeError): 1830 self.tp(self.MockRawIO(), 8, 12) 1831 1832 def test_write_error_on_close(self): 1833 raw = self.MockRawIO() 1834 def bad_write(b): 1835 raise OSError() 1836 raw.write = bad_write 1837 b = self.tp(raw) 1838 b.write(b'spam') 1839 self.assertRaises(OSError, b.close) # exception not swallowed 1840 self.assertTrue(b.closed) 1841 1842 def test_slow_close_from_thread(self): 1843 # Issue #31976 1844 rawio = self.SlowFlushRawIO() 1845 bufio = self.tp(rawio, 8) 1846 t = threading.Thread(target=bufio.close) 1847 t.start() 1848 rawio.in_flush.wait() 1849 self.assertRaises(ValueError, bufio.write, b'spam') 1850 self.assertTrue(bufio.closed) 1851 t.join() 1852 1853 1854 1855class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1856 tp = io.BufferedWriter 1857 1858 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 1859 "instead of returning NULL for malloc failure.") 1860 def 
test_constructor(self): 1861 BufferedWriterTest.test_constructor(self) 1862 # The allocation can succeed on 32-bit builds, e.g. with more 1863 # than 2 GiB RAM and a 64-bit kernel. 1864 if sys.maxsize > 0x7FFFFFFF: 1865 rawio = self.MockRawIO() 1866 bufio = self.tp(rawio) 1867 self.assertRaises((OverflowError, MemoryError, ValueError), 1868 bufio.__init__, rawio, sys.maxsize) 1869 1870 def test_initialization(self): 1871 rawio = self.MockRawIO() 1872 bufio = self.tp(rawio) 1873 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1874 self.assertRaises(ValueError, bufio.write, b"def") 1875 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1876 self.assertRaises(ValueError, bufio.write, b"def") 1877 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1878 self.assertRaises(ValueError, bufio.write, b"def") 1879 1880 def test_garbage_collection(self): 1881 # C BufferedWriter objects are collected, and collecting them flushes 1882 # all data to disk. 
1883 # The Python version has __del__, so it ends into gc.garbage instead 1884 self.addCleanup(support.unlink, support.TESTFN) 1885 with support.check_warnings(('', ResourceWarning)): 1886 rawio = self.FileIO(support.TESTFN, "w+b") 1887 f = self.tp(rawio) 1888 f.write(b"123xxx") 1889 f.x = f 1890 wr = weakref.ref(f) 1891 del f 1892 support.gc_collect() 1893 self.assertIsNone(wr(), wr) 1894 with self.open(support.TESTFN, "rb") as f: 1895 self.assertEqual(f.read(), b"123xxx") 1896 1897 def test_args_error(self): 1898 # Issue #17275 1899 with self.assertRaisesRegex(TypeError, "BufferedWriter"): 1900 self.tp(io.BytesIO(), 1024, 1024, 1024) 1901 1902 1903class PyBufferedWriterTest(BufferedWriterTest): 1904 tp = pyio.BufferedWriter 1905 1906class BufferedRWPairTest(unittest.TestCase): 1907 1908 def test_constructor(self): 1909 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1910 self.assertFalse(pair.closed) 1911 1912 def test_uninitialized(self): 1913 pair = self.tp.__new__(self.tp) 1914 del pair 1915 pair = self.tp.__new__(self.tp) 1916 self.assertRaisesRegex((ValueError, AttributeError), 1917 'uninitialized|has no attribute', 1918 pair.read, 0) 1919 self.assertRaisesRegex((ValueError, AttributeError), 1920 'uninitialized|has no attribute', 1921 pair.write, b'') 1922 pair.__init__(self.MockRawIO(), self.MockRawIO()) 1923 self.assertEqual(pair.read(0), b'') 1924 self.assertEqual(pair.write(b''), 0) 1925 1926 def test_detach(self): 1927 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1928 self.assertRaises(self.UnsupportedOperation, pair.detach) 1929 1930 def test_constructor_max_buffer_size_removal(self): 1931 with self.assertRaises(TypeError): 1932 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1933 1934 def test_constructor_with_not_readable(self): 1935 class NotReadable(MockRawIO): 1936 def readable(self): 1937 return False 1938 1939 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO()) 1940 1941 def test_constructor_with_not_writeable(self): 
1942 class NotWriteable(MockRawIO): 1943 def writable(self): 1944 return False 1945 1946 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable()) 1947 1948 def test_read(self): 1949 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1950 1951 self.assertEqual(pair.read(3), b"abc") 1952 self.assertEqual(pair.read(1), b"d") 1953 self.assertEqual(pair.read(), b"ef") 1954 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1955 self.assertEqual(pair.read(None), b"abc") 1956 1957 def test_readlines(self): 1958 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1959 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1960 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1961 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1962 1963 def test_read1(self): 1964 # .read1() is delegated to the underlying reader object, so this test 1965 # can be shallow. 1966 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1967 1968 self.assertEqual(pair.read1(3), b"abc") 1969 self.assertEqual(pair.read1(), b"def") 1970 1971 def test_readinto(self): 1972 for method in ("readinto", "readinto1"): 1973 with self.subTest(method): 1974 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1975 1976 data = byteslike(b'\0' * 5) 1977 self.assertEqual(getattr(pair, method)(data), 5) 1978 self.assertEqual(bytes(data), b"abcde") 1979 1980 def test_write(self): 1981 w = self.MockRawIO() 1982 pair = self.tp(self.MockRawIO(), w) 1983 1984 pair.write(b"abc") 1985 pair.flush() 1986 buffer = bytearray(b"def") 1987 pair.write(buffer) 1988 buffer[:] = b"***" # Overwrite our copy of the data 1989 pair.flush() 1990 self.assertEqual(w._write_stack, [b"abc", b"def"]) 1991 1992 def test_peek(self): 1993 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1994 1995 self.assertTrue(pair.peek(3).startswith(b"abc")) 1996 self.assertEqual(pair.read(3), b"abc") 1997 1998 def test_readable(self): 1999 pair = 
self.tp(self.MockRawIO(), self.MockRawIO()) 2000 self.assertTrue(pair.readable()) 2001 2002 def test_writeable(self): 2003 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2004 self.assertTrue(pair.writable()) 2005 2006 def test_seekable(self): 2007 # BufferedRWPairs are never seekable, even if their readers and writers 2008 # are. 2009 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2010 self.assertFalse(pair.seekable()) 2011 2012 # .flush() is delegated to the underlying writer object and has been 2013 # tested in the test_write method. 2014 2015 def test_close_and_closed(self): 2016 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2017 self.assertFalse(pair.closed) 2018 pair.close() 2019 self.assertTrue(pair.closed) 2020 2021 def test_reader_close_error_on_close(self): 2022 def reader_close(): 2023 reader_non_existing 2024 reader = self.MockRawIO() 2025 reader.close = reader_close 2026 writer = self.MockRawIO() 2027 pair = self.tp(reader, writer) 2028 with self.assertRaises(NameError) as err: 2029 pair.close() 2030 self.assertIn('reader_non_existing', str(err.exception)) 2031 self.assertTrue(pair.closed) 2032 self.assertFalse(reader.closed) 2033 self.assertTrue(writer.closed) 2034 2035 def test_writer_close_error_on_close(self): 2036 def writer_close(): 2037 writer_non_existing 2038 reader = self.MockRawIO() 2039 writer = self.MockRawIO() 2040 writer.close = writer_close 2041 pair = self.tp(reader, writer) 2042 with self.assertRaises(NameError) as err: 2043 pair.close() 2044 self.assertIn('writer_non_existing', str(err.exception)) 2045 self.assertFalse(pair.closed) 2046 self.assertTrue(reader.closed) 2047 self.assertFalse(writer.closed) 2048 2049 def test_reader_writer_close_error_on_close(self): 2050 def reader_close(): 2051 reader_non_existing 2052 def writer_close(): 2053 writer_non_existing 2054 reader = self.MockRawIO() 2055 reader.close = reader_close 2056 writer = self.MockRawIO() 2057 writer.close = writer_close 2058 pair = self.tp(reader, writer) 
2059 with self.assertRaises(NameError) as err: 2060 pair.close() 2061 self.assertIn('reader_non_existing', str(err.exception)) 2062 self.assertIsInstance(err.exception.__context__, NameError) 2063 self.assertIn('writer_non_existing', str(err.exception.__context__)) 2064 self.assertFalse(pair.closed) 2065 self.assertFalse(reader.closed) 2066 self.assertFalse(writer.closed) 2067 2068 def test_isatty(self): 2069 class SelectableIsAtty(MockRawIO): 2070 def __init__(self, isatty): 2071 MockRawIO.__init__(self) 2072 self._isatty = isatty 2073 2074 def isatty(self): 2075 return self._isatty 2076 2077 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 2078 self.assertFalse(pair.isatty()) 2079 2080 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 2081 self.assertTrue(pair.isatty()) 2082 2083 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 2084 self.assertTrue(pair.isatty()) 2085 2086 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 2087 self.assertTrue(pair.isatty()) 2088 2089 def test_weakref_clearing(self): 2090 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 2091 ref = weakref.ref(brw) 2092 brw = None 2093 ref = None # Shouldn't segfault. 
2094 2095class CBufferedRWPairTest(BufferedRWPairTest): 2096 tp = io.BufferedRWPair 2097 2098class PyBufferedRWPairTest(BufferedRWPairTest): 2099 tp = pyio.BufferedRWPair 2100 2101 2102class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 2103 read_mode = "rb+" 2104 write_mode = "wb+" 2105 2106 def test_constructor(self): 2107 BufferedReaderTest.test_constructor(self) 2108 BufferedWriterTest.test_constructor(self) 2109 2110 def test_uninitialized(self): 2111 BufferedReaderTest.test_uninitialized(self) 2112 BufferedWriterTest.test_uninitialized(self) 2113 2114 def test_read_and_write(self): 2115 raw = self.MockRawIO((b"asdf", b"ghjk")) 2116 rw = self.tp(raw, 8) 2117 2118 self.assertEqual(b"as", rw.read(2)) 2119 rw.write(b"ddd") 2120 rw.write(b"eee") 2121 self.assertFalse(raw._write_stack) # Buffer writes 2122 self.assertEqual(b"ghjk", rw.read()) 2123 self.assertEqual(b"dddeee", raw._write_stack[0]) 2124 2125 def test_seek_and_tell(self): 2126 raw = self.BytesIO(b"asdfghjkl") 2127 rw = self.tp(raw) 2128 2129 self.assertEqual(b"as", rw.read(2)) 2130 self.assertEqual(2, rw.tell()) 2131 rw.seek(0, 0) 2132 self.assertEqual(b"asdf", rw.read(4)) 2133 2134 rw.write(b"123f") 2135 rw.seek(0, 0) 2136 self.assertEqual(b"asdf123fl", rw.read()) 2137 self.assertEqual(9, rw.tell()) 2138 rw.seek(-4, 2) 2139 self.assertEqual(5, rw.tell()) 2140 rw.seek(2, 1) 2141 self.assertEqual(7, rw.tell()) 2142 self.assertEqual(b"fl", rw.read(11)) 2143 rw.flush() 2144 self.assertEqual(b"asdf123fl", raw.getvalue()) 2145 2146 self.assertRaises(TypeError, rw.seek, 0.0) 2147 2148 def check_flush_and_read(self, read_func): 2149 raw = self.BytesIO(b"abcdefghi") 2150 bufio = self.tp(raw) 2151 2152 self.assertEqual(b"ab", read_func(bufio, 2)) 2153 bufio.write(b"12") 2154 self.assertEqual(b"ef", read_func(bufio, 2)) 2155 self.assertEqual(6, bufio.tell()) 2156 bufio.flush() 2157 self.assertEqual(6, bufio.tell()) 2158 self.assertEqual(b"ghi", read_func(bufio)) 2159 raw.seek(0, 0) 2160 
raw.write(b"XYZ") 2161 # flush() resets the read buffer 2162 bufio.flush() 2163 bufio.seek(0, 0) 2164 self.assertEqual(b"XYZ", read_func(bufio, 3)) 2165 2166 def test_flush_and_read(self): 2167 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 2168 2169 def test_flush_and_readinto(self): 2170 def _readinto(bufio, n=-1): 2171 b = bytearray(n if n >= 0 else 9999) 2172 n = bufio.readinto(b) 2173 return bytes(b[:n]) 2174 self.check_flush_and_read(_readinto) 2175 2176 def test_flush_and_peek(self): 2177 def _peek(bufio, n=-1): 2178 # This relies on the fact that the buffer can contain the whole 2179 # raw stream, otherwise peek() can return less. 2180 b = bufio.peek(n) 2181 if n != -1: 2182 b = b[:n] 2183 bufio.seek(len(b), 1) 2184 return b 2185 self.check_flush_and_read(_peek) 2186 2187 def test_flush_and_write(self): 2188 raw = self.BytesIO(b"abcdefghi") 2189 bufio = self.tp(raw) 2190 2191 bufio.write(b"123") 2192 bufio.flush() 2193 bufio.write(b"45") 2194 bufio.flush() 2195 bufio.seek(0, 0) 2196 self.assertEqual(b"12345fghi", raw.getvalue()) 2197 self.assertEqual(b"12345fghi", bufio.read()) 2198 2199 def test_threads(self): 2200 BufferedReaderTest.test_threads(self) 2201 BufferedWriterTest.test_threads(self) 2202 2203 def test_writes_and_peek(self): 2204 def _peek(bufio): 2205 bufio.peek(1) 2206 self.check_writes(_peek) 2207 def _peek(bufio): 2208 pos = bufio.tell() 2209 bufio.seek(-1, 1) 2210 bufio.peek(1) 2211 bufio.seek(pos, 0) 2212 self.check_writes(_peek) 2213 2214 def test_writes_and_reads(self): 2215 def _read(bufio): 2216 bufio.seek(-1, 1) 2217 bufio.read(1) 2218 self.check_writes(_read) 2219 2220 def test_writes_and_read1s(self): 2221 def _read1(bufio): 2222 bufio.seek(-1, 1) 2223 bufio.read1(1) 2224 self.check_writes(_read1) 2225 2226 def test_writes_and_readintos(self): 2227 def _read(bufio): 2228 bufio.seek(-1, 1) 2229 bufio.readinto(bytearray(1)) 2230 self.check_writes(_read) 2231 2232 def test_write_after_readahead(self): 2233 # Issue 
#6629: writing after the buffer was filled by readahead should 2234 # first rewind the raw stream. 2235 for overwrite_size in [1, 5]: 2236 raw = self.BytesIO(b"A" * 10) 2237 bufio = self.tp(raw, 4) 2238 # Trigger readahead 2239 self.assertEqual(bufio.read(1), b"A") 2240 self.assertEqual(bufio.tell(), 1) 2241 # Overwriting should rewind the raw stream if it needs so 2242 bufio.write(b"B" * overwrite_size) 2243 self.assertEqual(bufio.tell(), overwrite_size + 1) 2244 # If the write size was smaller than the buffer size, flush() and 2245 # check that rewind happens. 2246 bufio.flush() 2247 self.assertEqual(bufio.tell(), overwrite_size + 1) 2248 s = raw.getvalue() 2249 self.assertEqual(s, 2250 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 2251 2252 def test_write_rewind_write(self): 2253 # Various combinations of reading / writing / seeking backwards / writing again 2254 def mutate(bufio, pos1, pos2): 2255 assert pos2 >= pos1 2256 # Fill the buffer 2257 bufio.seek(pos1) 2258 bufio.read(pos2 - pos1) 2259 bufio.write(b'\x02') 2260 # This writes earlier than the previous write, but still inside 2261 # the buffer. 
2262 bufio.seek(pos1) 2263 bufio.write(b'\x01') 2264 2265 b = b"\x80\x81\x82\x83\x84" 2266 for i in range(0, len(b)): 2267 for j in range(i, len(b)): 2268 raw = self.BytesIO(b) 2269 bufio = self.tp(raw, 100) 2270 mutate(bufio, i, j) 2271 bufio.flush() 2272 expected = bytearray(b) 2273 expected[j] = 2 2274 expected[i] = 1 2275 self.assertEqual(raw.getvalue(), expected, 2276 "failed result for i=%d, j=%d" % (i, j)) 2277 2278 def test_truncate_after_read_or_write(self): 2279 raw = self.BytesIO(b"A" * 10) 2280 bufio = self.tp(raw, 100) 2281 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 2282 self.assertEqual(bufio.truncate(), 2) 2283 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 2284 self.assertEqual(bufio.truncate(), 4) 2285 2286 def test_misbehaved_io(self): 2287 BufferedReaderTest.test_misbehaved_io(self) 2288 BufferedWriterTest.test_misbehaved_io(self) 2289 2290 def test_interleaved_read_write(self): 2291 # Test for issue #12213 2292 with self.BytesIO(b'abcdefgh') as raw: 2293 with self.tp(raw, 100) as f: 2294 f.write(b"1") 2295 self.assertEqual(f.read(1), b'b') 2296 f.write(b'2') 2297 self.assertEqual(f.read1(1), b'd') 2298 f.write(b'3') 2299 buf = bytearray(1) 2300 f.readinto(buf) 2301 self.assertEqual(buf, b'f') 2302 f.write(b'4') 2303 self.assertEqual(f.peek(1), b'h') 2304 f.flush() 2305 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 2306 2307 with self.BytesIO(b'abc') as raw: 2308 with self.tp(raw, 100) as f: 2309 self.assertEqual(f.read(1), b'a') 2310 f.write(b"2") 2311 self.assertEqual(f.read(1), b'c') 2312 f.flush() 2313 self.assertEqual(raw.getvalue(), b'a2c') 2314 2315 def test_interleaved_readline_write(self): 2316 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 2317 with self.tp(raw) as f: 2318 f.write(b'1') 2319 self.assertEqual(f.readline(), b'b\n') 2320 f.write(b'2') 2321 self.assertEqual(f.readline(), b'def\n') 2322 f.write(b'3') 2323 self.assertEqual(f.readline(), b'\n') 2324 f.flush() 2325 
self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 2326 2327 # You can't construct a BufferedRandom over a non-seekable stream. 2328 test_unseekable = None 2329 2330 2331class CBufferedRandomTest(BufferedRandomTest, SizeofTest): 2332 tp = io.BufferedRandom 2333 2334 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 2335 "instead of returning NULL for malloc failure.") 2336 def test_constructor(self): 2337 BufferedRandomTest.test_constructor(self) 2338 # The allocation can succeed on 32-bit builds, e.g. with more 2339 # than 2 GiB RAM and a 64-bit kernel. 2340 if sys.maxsize > 0x7FFFFFFF: 2341 rawio = self.MockRawIO() 2342 bufio = self.tp(rawio) 2343 self.assertRaises((OverflowError, MemoryError, ValueError), 2344 bufio.__init__, rawio, sys.maxsize) 2345 2346 def test_garbage_collection(self): 2347 CBufferedReaderTest.test_garbage_collection(self) 2348 CBufferedWriterTest.test_garbage_collection(self) 2349 2350 def test_args_error(self): 2351 # Issue #17275 2352 with self.assertRaisesRegex(TypeError, "BufferedRandom"): 2353 self.tp(io.BytesIO(), 1024, 1024, 1024) 2354 2355 2356class PyBufferedRandomTest(BufferedRandomTest): 2357 tp = pyio.BufferedRandom 2358 2359 2360# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 2361# properties: 2362# - A single output character can correspond to many bytes of input. 2363# - The number of input bytes to complete the character can be 2364# undetermined until the last input byte is received. 2365# - The number of input bytes can vary depending on previous input. 2366# - A single input byte can correspond to many characters of output. 2367# - The number of output characters can be undetermined until the 2368# last input byte is received. 2369# - The number of output characters can vary depending on previous input. 2370 2371class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 2372 """ 2373 For testing seek/tell behavior with a stateful, buffering decoder. 
2374 2375 Input is a sequence of words. Words may be fixed-length (length set 2376 by input) or variable-length (period-terminated). In variable-length 2377 mode, extra periods are ignored. Possible words are: 2378 - 'i' followed by a number sets the input length, I (maximum 99). 2379 When I is set to 0, words are space-terminated. 2380 - 'o' followed by a number sets the output length, O (maximum 99). 2381 - Any other word is converted into a word followed by a period on 2382 the output. The output word consists of the input word truncated 2383 or padded out with hyphens to make its length equal to O. If O 2384 is 0, the word is output verbatim without truncating or padding. 2385 I and O are initially set to 1. When I changes, any buffered input is 2386 re-scanned according to the new I. EOF also terminates the last word. 2387 """ 2388 2389 def __init__(self, errors='strict'): 2390 codecs.IncrementalDecoder.__init__(self, errors) 2391 self.reset() 2392 2393 def __repr__(self): 2394 return '<SID %x>' % id(self) 2395 2396 def reset(self): 2397 self.i = 1 2398 self.o = 1 2399 self.buffer = bytearray() 2400 2401 def getstate(self): 2402 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 2403 return bytes(self.buffer), i*100 + o 2404 2405 def setstate(self, state): 2406 buffer, io = state 2407 self.buffer = bytearray(buffer) 2408 i, o = divmod(io, 100) 2409 self.i, self.o = i ^ 1, o ^ 1 2410 2411 def decode(self, input, final=False): 2412 output = '' 2413 for b in input: 2414 if self.i == 0: # variable-length, terminated with period 2415 if b == ord('.'): 2416 if self.buffer: 2417 output += self.process_word() 2418 else: 2419 self.buffer.append(b) 2420 else: # fixed-length, terminate after self.i bytes 2421 self.buffer.append(b) 2422 if len(self.buffer) == self.i: 2423 output += self.process_word() 2424 if final and self.buffer: # EOF terminates the last word 2425 output += self.process_word() 2426 return output 2427 2428 def process_word(self): 2429 
output = '' 2430 if self.buffer[0] == ord('i'): 2431 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 2432 elif self.buffer[0] == ord('o'): 2433 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 2434 else: 2435 output = self.buffer.decode('ascii') 2436 if len(output) < self.o: 2437 output += '-'*self.o # pad out with hyphens 2438 if self.o: 2439 output = output[:self.o] # truncate to output length 2440 output += '.' 2441 self.buffer = bytearray() 2442 return output 2443 2444 codecEnabled = False 2445 2446 @classmethod 2447 def lookupTestDecoder(cls, name): 2448 if cls.codecEnabled and name == 'test_decoder': 2449 latin1 = codecs.lookup('latin-1') 2450 return codecs.CodecInfo( 2451 name='test_decoder', encode=latin1.encode, decode=None, 2452 incrementalencoder=None, 2453 streamreader=None, streamwriter=None, 2454 incrementaldecoder=cls) 2455 2456# Register the previous decoder for testing. 2457# Disabled by default, tests will enable it. 2458codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) 2459 2460 2461class StatefulIncrementalDecoderTest(unittest.TestCase): 2462 """ 2463 Make sure the StatefulIncrementalDecoder actually works. 
2464 """ 2465 2466 test_cases = [ 2467 # I=1, O=1 (fixed-length input == fixed-length output) 2468 (b'abcd', False, 'a.b.c.d.'), 2469 # I=0, O=0 (variable-length input, variable-length output) 2470 (b'oiabcd', True, 'abcd.'), 2471 # I=0, O=0 (should ignore extra periods) 2472 (b'oi...abcd...', True, 'abcd.'), 2473 # I=0, O=6 (variable-length input, fixed-length output) 2474 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 2475 # I=2, O=6 (fixed-length input < fixed-length output) 2476 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 2477 # I=6, O=3 (fixed-length input > fixed-length output) 2478 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 2479 # I=0, then 3; O=29, then 15 (with longer output) 2480 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2481 'a----------------------------.' + 2482 'b----------------------------.' + 2483 'cde--------------------------.' + 2484 'abcdefghijabcde.' + 2485 'a.b------------.' + 2486 '.c.------------.' + 2487 'd.e------------.' + 2488 'k--------------.' + 2489 'l--------------.' + 2490 'm--------------.') 2491 ] 2492 2493 def test_decoder(self): 2494 # Try a few one-shot test cases. 2495 for input, eof, output in self.test_cases: 2496 d = StatefulIncrementalDecoder() 2497 self.assertEqual(d.decode(input, eof), output) 2498 2499 # Also test an unfinished decode, followed by forcing EOF. 
2500 d = StatefulIncrementalDecoder() 2501 self.assertEqual(d.decode(b'oiabcd'), '') 2502 self.assertEqual(d.decode(b'', 1), 'abcd.') 2503 2504class TextIOWrapperTest(unittest.TestCase): 2505 2506 def setUp(self): 2507 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 2508 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2509 support.unlink(support.TESTFN) 2510 2511 def tearDown(self): 2512 support.unlink(support.TESTFN) 2513 2514 def test_constructor(self): 2515 r = self.BytesIO(b"\xc3\xa9\n\n") 2516 b = self.BufferedReader(r, 1000) 2517 t = self.TextIOWrapper(b) 2518 t.__init__(b, encoding="latin-1", newline="\r\n") 2519 self.assertEqual(t.encoding, "latin-1") 2520 self.assertEqual(t.line_buffering, False) 2521 t.__init__(b, encoding="utf-8", line_buffering=True) 2522 self.assertEqual(t.encoding, "utf-8") 2523 self.assertEqual(t.line_buffering, True) 2524 self.assertEqual("\xe9\n", t.readline()) 2525 self.assertRaises(TypeError, t.__init__, b, newline=42) 2526 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2527 2528 def test_uninitialized(self): 2529 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2530 del t 2531 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2532 self.assertRaises(Exception, repr, t) 2533 self.assertRaisesRegex((ValueError, AttributeError), 2534 'uninitialized|has no attribute', 2535 t.read, 0) 2536 t.__init__(self.MockRawIO()) 2537 self.assertEqual(t.read(0), '') 2538 2539 def test_non_text_encoding_codecs_are_rejected(self): 2540 # Ensure the constructor complains if passed a codec that isn't 2541 # marked as a text encoding 2542 # http://bugs.python.org/issue20404 2543 r = self.BytesIO() 2544 b = self.BufferedWriter(r) 2545 with self.assertRaisesRegex(LookupError, "is not a text encoding"): 2546 self.TextIOWrapper(b, encoding="hex") 2547 2548 def test_detach(self): 2549 r = self.BytesIO() 2550 b = self.BufferedWriter(r) 2551 t = self.TextIOWrapper(b) 2552 self.assertIs(t.detach(), b) 2553 2554 t = 
self.TextIOWrapper(b, encoding="ascii") 2555 t.write("howdy") 2556 self.assertFalse(r.getvalue()) 2557 t.detach() 2558 self.assertEqual(r.getvalue(), b"howdy") 2559 self.assertRaises(ValueError, t.detach) 2560 2561 # Operations independent of the detached stream should still work 2562 repr(t) 2563 self.assertEqual(t.encoding, "ascii") 2564 self.assertEqual(t.errors, "strict") 2565 self.assertFalse(t.line_buffering) 2566 self.assertFalse(t.write_through) 2567 2568 def test_repr(self): 2569 raw = self.BytesIO("hello".encode("utf-8")) 2570 b = self.BufferedReader(raw) 2571 t = self.TextIOWrapper(b, encoding="utf-8") 2572 modname = self.TextIOWrapper.__module__ 2573 self.assertRegex(repr(t), 2574 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) 2575 raw.name = "dummy" 2576 self.assertRegex(repr(t), 2577 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 2578 t.mode = "r" 2579 self.assertRegex(repr(t), 2580 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) 2581 raw.name = b"dummy" 2582 self.assertRegex(repr(t), 2583 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) 2584 2585 t.buffer.detach() 2586 repr(t) # Should not raise an exception 2587 2588 def test_recursive_repr(self): 2589 # Issue #25455 2590 raw = self.BytesIO() 2591 t = self.TextIOWrapper(raw) 2592 with support.swap_attr(raw, 'name', t): 2593 try: 2594 repr(t) # Should not crash 2595 except RuntimeError: 2596 pass 2597 2598 def test_line_buffering(self): 2599 r = self.BytesIO() 2600 b = self.BufferedWriter(r, 1000) 2601 t = self.TextIOWrapper(b, newline="\n", line_buffering=True) 2602 t.write("X") 2603 self.assertEqual(r.getvalue(), b"") # No flush happened 2604 t.write("Y\nZ") 2605 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 2606 t.write("A\rB") 2607 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 2608 2609 def test_reconfigure_line_buffering(self): 2610 r = self.BytesIO() 2611 b = self.BufferedWriter(r, 1000) 2612 t = 
self.TextIOWrapper(b, newline="\n", line_buffering=False) 2613 t.write("AB\nC") 2614 self.assertEqual(r.getvalue(), b"") 2615 2616 t.reconfigure(line_buffering=True) # implicit flush 2617 self.assertEqual(r.getvalue(), b"AB\nC") 2618 t.write("DEF\nG") 2619 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2620 t.write("H") 2621 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2622 t.reconfigure(line_buffering=False) # implicit flush 2623 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2624 t.write("IJ") 2625 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2626 2627 # Keeping default value 2628 t.reconfigure() 2629 t.reconfigure(line_buffering=None) 2630 self.assertEqual(t.line_buffering, False) 2631 t.reconfigure(line_buffering=True) 2632 t.reconfigure() 2633 t.reconfigure(line_buffering=None) 2634 self.assertEqual(t.line_buffering, True) 2635 2636 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2637 def test_default_encoding(self): 2638 old_environ = dict(os.environ) 2639 try: 2640 # try to get a user preferred encoding different than the current 2641 # locale encoding to check that TextIOWrapper() uses the current 2642 # locale encoding and not the user preferred encoding 2643 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): 2644 if key in os.environ: 2645 del os.environ[key] 2646 2647 current_locale_encoding = locale.getpreferredencoding(False) 2648 b = self.BytesIO() 2649 t = self.TextIOWrapper(b) 2650 self.assertEqual(t.encoding, current_locale_encoding) 2651 finally: 2652 os.environ.clear() 2653 os.environ.update(old_environ) 2654 2655 @support.cpython_only 2656 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2657 def test_device_encoding(self): 2658 # Issue 15989 2659 import _testcapi 2660 b = self.BytesIO() 2661 b.fileno = lambda: _testcapi.INT_MAX + 1 2662 self.assertRaises(OverflowError, self.TextIOWrapper, b) 2663 b.fileno = lambda: _testcapi.UINT_MAX + 1 2664 self.assertRaises(OverflowError, self.TextIOWrapper, b) 2665 2666 def 
test_encoding(self): 2667 # Check the encoding attribute is always set, and valid 2668 b = self.BytesIO() 2669 t = self.TextIOWrapper(b, encoding="utf-8") 2670 self.assertEqual(t.encoding, "utf-8") 2671 t = self.TextIOWrapper(b) 2672 self.assertIsNotNone(t.encoding) 2673 codecs.lookup(t.encoding) 2674 2675 def test_encoding_errors_reading(self): 2676 # (1) default 2677 b = self.BytesIO(b"abc\n\xff\n") 2678 t = self.TextIOWrapper(b, encoding="ascii") 2679 self.assertRaises(UnicodeError, t.read) 2680 # (2) explicit strict 2681 b = self.BytesIO(b"abc\n\xff\n") 2682 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2683 self.assertRaises(UnicodeError, t.read) 2684 # (3) ignore 2685 b = self.BytesIO(b"abc\n\xff\n") 2686 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 2687 self.assertEqual(t.read(), "abc\n\n") 2688 # (4) replace 2689 b = self.BytesIO(b"abc\n\xff\n") 2690 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 2691 self.assertEqual(t.read(), "abc\n\ufffd\n") 2692 2693 def test_encoding_errors_writing(self): 2694 # (1) default 2695 b = self.BytesIO() 2696 t = self.TextIOWrapper(b, encoding="ascii") 2697 self.assertRaises(UnicodeError, t.write, "\xff") 2698 # (2) explicit strict 2699 b = self.BytesIO() 2700 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2701 self.assertRaises(UnicodeError, t.write, "\xff") 2702 # (3) ignore 2703 b = self.BytesIO() 2704 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2705 newline="\n") 2706 t.write("abc\xffdef\n") 2707 t.flush() 2708 self.assertEqual(b.getvalue(), b"abcdef\n") 2709 # (4) replace 2710 b = self.BytesIO() 2711 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2712 newline="\n") 2713 t.write("abc\xffdef\n") 2714 t.flush() 2715 self.assertEqual(b.getvalue(), b"abc?def\n") 2716 2717 def test_newlines(self): 2718 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2719 2720 tests = [ 2721 [ None, [ 'unix\n', 'windows\n', 'os9\n', 
'last\n', 'nonl' ] ], 2722 [ '', input_lines ], 2723 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2724 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2725 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2726 ] 2727 encodings = ( 2728 'utf-8', 'latin-1', 2729 'utf-16', 'utf-16-le', 'utf-16-be', 2730 'utf-32', 'utf-32-le', 'utf-32-be', 2731 ) 2732 2733 # Try a range of buffer sizes to test the case where \r is the last 2734 # character in TextIOWrapper._pending_line. 2735 for encoding in encodings: 2736 # XXX: str.encode() should return bytes 2737 data = bytes(''.join(input_lines).encode(encoding)) 2738 for do_reads in (False, True): 2739 for bufsize in range(1, 10): 2740 for newline, exp_lines in tests: 2741 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 2742 textio = self.TextIOWrapper(bufio, newline=newline, 2743 encoding=encoding) 2744 if do_reads: 2745 got_lines = [] 2746 while True: 2747 c2 = textio.read(2) 2748 if c2 == '': 2749 break 2750 self.assertEqual(len(c2), 2) 2751 got_lines.append(c2 + textio.readline()) 2752 else: 2753 got_lines = list(textio) 2754 2755 for got_line, exp_line in zip(got_lines, exp_lines): 2756 self.assertEqual(got_line, exp_line) 2757 self.assertEqual(len(got_lines), len(exp_lines)) 2758 2759 def test_newlines_input(self): 2760 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 2761 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2762 for newline, expected in [ 2763 (None, normalized.decode("ascii").splitlines(keepends=True)), 2764 ("", testdata.decode("ascii").splitlines(keepends=True)), 2765 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2766 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2767 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 2768 ]: 2769 buf = self.BytesIO(testdata) 2770 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2771 self.assertEqual(txt.readlines(), expected) 
2772 txt.seek(0) 2773 self.assertEqual(txt.read(), "".join(expected)) 2774 2775 def test_newlines_output(self): 2776 testdict = { 2777 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2778 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2779 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2780 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2781 } 2782 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2783 for newline, expected in tests: 2784 buf = self.BytesIO() 2785 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2786 txt.write("AAA\nB") 2787 txt.write("BB\nCCC\n") 2788 txt.write("X\rY\r\nZ") 2789 txt.flush() 2790 self.assertEqual(buf.closed, False) 2791 self.assertEqual(buf.getvalue(), expected) 2792 2793 def test_destructor(self): 2794 l = [] 2795 base = self.BytesIO 2796 class MyBytesIO(base): 2797 def close(self): 2798 l.append(self.getvalue()) 2799 base.close(self) 2800 b = MyBytesIO() 2801 t = self.TextIOWrapper(b, encoding="ascii") 2802 t.write("abc") 2803 del t 2804 support.gc_collect() 2805 self.assertEqual([b"abc"], l) 2806 2807 def test_override_destructor(self): 2808 record = [] 2809 class MyTextIO(self.TextIOWrapper): 2810 def __del__(self): 2811 record.append(1) 2812 try: 2813 f = super().__del__ 2814 except AttributeError: 2815 pass 2816 else: 2817 f() 2818 def close(self): 2819 record.append(2) 2820 super().close() 2821 def flush(self): 2822 record.append(3) 2823 super().flush() 2824 b = self.BytesIO() 2825 t = MyTextIO(b, encoding="ascii") 2826 del t 2827 support.gc_collect() 2828 self.assertEqual(record, [1, 2, 3]) 2829 2830 def test_error_through_destructor(self): 2831 # Test that the exception state is not modified by a destructor, 2832 # even if close() fails. 
2833 rawio = self.CloseFailureIO() 2834 def f(): 2835 self.TextIOWrapper(rawio).xyzzy 2836 with support.captured_output("stderr") as s: 2837 self.assertRaises(AttributeError, f) 2838 s = s.getvalue().strip() 2839 if s: 2840 # The destructor *may* have printed an unraisable error, check it 2841 self.assertEqual(len(s.splitlines()), 1) 2842 self.assertTrue(s.startswith("Exception OSError: "), s) 2843 self.assertTrue(s.endswith(" ignored"), s) 2844 2845 # Systematic tests of the text I/O API 2846 2847 def test_basic_io(self): 2848 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 2849 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": 2850 f = self.open(support.TESTFN, "w+", encoding=enc) 2851 f._CHUNK_SIZE = chunksize 2852 self.assertEqual(f.write("abc"), 3) 2853 f.close() 2854 f = self.open(support.TESTFN, "r+", encoding=enc) 2855 f._CHUNK_SIZE = chunksize 2856 self.assertEqual(f.tell(), 0) 2857 self.assertEqual(f.read(), "abc") 2858 cookie = f.tell() 2859 self.assertEqual(f.seek(0), 0) 2860 self.assertEqual(f.read(None), "abc") 2861 f.seek(0) 2862 self.assertEqual(f.read(2), "ab") 2863 self.assertEqual(f.read(1), "c") 2864 self.assertEqual(f.read(1), "") 2865 self.assertEqual(f.read(), "") 2866 self.assertEqual(f.tell(), cookie) 2867 self.assertEqual(f.seek(0), 0) 2868 self.assertEqual(f.seek(0, 2), cookie) 2869 self.assertEqual(f.write("def"), 3) 2870 self.assertEqual(f.seek(cookie), cookie) 2871 self.assertEqual(f.read(), "def") 2872 if enc.startswith("utf"): 2873 self.multi_line_test(f, enc) 2874 f.close() 2875 2876 def multi_line_test(self, f, enc): 2877 f.seek(0) 2878 f.truncate() 2879 sample = "s\xff\u0fff\uffff" 2880 wlines = [] 2881 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 2882 chars = [] 2883 for i in range(size): 2884 chars.append(sample[i % len(sample)]) 2885 line = "".join(chars) + "\n" 2886 wlines.append((f.tell(), line)) 2887 f.write(line) 2888 f.seek(0) 2889 rlines = [] 2890 while 
True: 2891 pos = f.tell() 2892 line = f.readline() 2893 if not line: 2894 break 2895 rlines.append((pos, line)) 2896 self.assertEqual(rlines, wlines) 2897 2898 def test_telling(self): 2899 f = self.open(support.TESTFN, "w+", encoding="utf-8") 2900 p0 = f.tell() 2901 f.write("\xff\n") 2902 p1 = f.tell() 2903 f.write("\xff\n") 2904 p2 = f.tell() 2905 f.seek(0) 2906 self.assertEqual(f.tell(), p0) 2907 self.assertEqual(f.readline(), "\xff\n") 2908 self.assertEqual(f.tell(), p1) 2909 self.assertEqual(f.readline(), "\xff\n") 2910 self.assertEqual(f.tell(), p2) 2911 f.seek(0) 2912 for line in f: 2913 self.assertEqual(line, "\xff\n") 2914 self.assertRaises(OSError, f.tell) 2915 self.assertEqual(f.tell(), p2) 2916 f.close() 2917 2918 def test_seeking(self): 2919 chunk_size = _default_chunk_size() 2920 prefix_size = chunk_size - 2 2921 u_prefix = "a" * prefix_size 2922 prefix = bytes(u_prefix.encode("utf-8")) 2923 self.assertEqual(len(u_prefix), len(prefix)) 2924 u_suffix = "\u8888\n" 2925 suffix = bytes(u_suffix.encode("utf-8")) 2926 line = prefix + suffix 2927 with self.open(support.TESTFN, "wb") as f: 2928 f.write(line*2) 2929 with self.open(support.TESTFN, "r", encoding="utf-8") as f: 2930 s = f.read(prefix_size) 2931 self.assertEqual(s, str(prefix, "ascii")) 2932 self.assertEqual(f.tell(), prefix_size) 2933 self.assertEqual(f.readline(), u_suffix) 2934 2935 def test_seeking_too(self): 2936 # Regression test for a specific bug 2937 data = b'\xe0\xbf\xbf\n' 2938 with self.open(support.TESTFN, "wb") as f: 2939 f.write(data) 2940 with self.open(support.TESTFN, "r", encoding="utf-8") as f: 2941 f._CHUNK_SIZE # Just test that it exists 2942 f._CHUNK_SIZE = 2 2943 f.readline() 2944 f.tell() 2945 2946 def test_seek_and_tell(self): 2947 #Test seek/tell using the StatefulIncrementalDecoder. 
2948 # Make test faster by doing smaller seeks 2949 CHUNK_SIZE = 128 2950 2951 def test_seek_and_tell_with_data(data, min_pos=0): 2952 """Tell/seek to various points within a data stream and ensure 2953 that the decoded data returned by read() is consistent.""" 2954 f = self.open(support.TESTFN, 'wb') 2955 f.write(data) 2956 f.close() 2957 f = self.open(support.TESTFN, encoding='test_decoder') 2958 f._CHUNK_SIZE = CHUNK_SIZE 2959 decoded = f.read() 2960 f.close() 2961 2962 for i in range(min_pos, len(decoded) + 1): # seek positions 2963 for j in [1, 5, len(decoded) - i]: # read lengths 2964 f = self.open(support.TESTFN, encoding='test_decoder') 2965 self.assertEqual(f.read(i), decoded[:i]) 2966 cookie = f.tell() 2967 self.assertEqual(f.read(j), decoded[i:i + j]) 2968 f.seek(cookie) 2969 self.assertEqual(f.read(), decoded[i:]) 2970 f.close() 2971 2972 # Enable the test decoder. 2973 StatefulIncrementalDecoder.codecEnabled = 1 2974 2975 # Run the tests. 2976 try: 2977 # Try each test case. 2978 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2979 test_seek_and_tell_with_data(input) 2980 2981 # Position each test case so that it crosses a chunk boundary. 2982 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2983 offset = CHUNK_SIZE - len(input)//2 2984 prefix = b'.'*offset 2985 # Don't bother seeking into the prefix (takes too long). 2986 min_pos = offset*2 2987 test_seek_and_tell_with_data(prefix + input, min_pos) 2988 2989 # Ensure our test decoder won't interfere with subsequent tests. 2990 finally: 2991 StatefulIncrementalDecoder.codecEnabled = 0 2992 2993 def test_encoded_writes(self): 2994 data = "1234567890" 2995 tests = ("utf-16", 2996 "utf-16-le", 2997 "utf-16-be", 2998 "utf-32", 2999 "utf-32-le", 3000 "utf-32-be") 3001 for encoding in tests: 3002 buf = self.BytesIO() 3003 f = self.TextIOWrapper(buf, encoding=encoding) 3004 # Check if the BOM is written only once (see issue1753). 
3005 f.write(data) 3006 f.write(data) 3007 f.seek(0) 3008 self.assertEqual(f.read(), data * 2) 3009 f.seek(0) 3010 self.assertEqual(f.read(), data * 2) 3011 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 3012 3013 def test_unreadable(self): 3014 class UnReadable(self.BytesIO): 3015 def readable(self): 3016 return False 3017 txt = self.TextIOWrapper(UnReadable()) 3018 self.assertRaises(OSError, txt.read) 3019 3020 def test_read_one_by_one(self): 3021 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) 3022 reads = "" 3023 while True: 3024 c = txt.read(1) 3025 if not c: 3026 break 3027 reads += c 3028 self.assertEqual(reads, "AA\nBB") 3029 3030 def test_readlines(self): 3031 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) 3032 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 3033 txt.seek(0) 3034 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 3035 txt.seek(0) 3036 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 3037 3038 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 3039 def test_read_by_chunk(self): 3040 # make sure "\r\n" straddles 128 char boundary. 
3041 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) 3042 reads = "" 3043 while True: 3044 c = txt.read(128) 3045 if not c: 3046 break 3047 reads += c 3048 self.assertEqual(reads, "A"*127+"\nB") 3049 3050 def test_writelines(self): 3051 l = ['ab', 'cd', 'ef'] 3052 buf = self.BytesIO() 3053 txt = self.TextIOWrapper(buf) 3054 txt.writelines(l) 3055 txt.flush() 3056 self.assertEqual(buf.getvalue(), b'abcdef') 3057 3058 def test_writelines_userlist(self): 3059 l = UserList(['ab', 'cd', 'ef']) 3060 buf = self.BytesIO() 3061 txt = self.TextIOWrapper(buf) 3062 txt.writelines(l) 3063 txt.flush() 3064 self.assertEqual(buf.getvalue(), b'abcdef') 3065 3066 def test_writelines_error(self): 3067 txt = self.TextIOWrapper(self.BytesIO()) 3068 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 3069 self.assertRaises(TypeError, txt.writelines, None) 3070 self.assertRaises(TypeError, txt.writelines, b'abc') 3071 3072 def test_issue1395_1(self): 3073 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3074 3075 # read one char at a time 3076 reads = "" 3077 while True: 3078 c = txt.read(1) 3079 if not c: 3080 break 3081 reads += c 3082 self.assertEqual(reads, self.normalized) 3083 3084 def test_issue1395_2(self): 3085 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3086 txt._CHUNK_SIZE = 4 3087 3088 reads = "" 3089 while True: 3090 c = txt.read(4) 3091 if not c: 3092 break 3093 reads += c 3094 self.assertEqual(reads, self.normalized) 3095 3096 def test_issue1395_3(self): 3097 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3098 txt._CHUNK_SIZE = 4 3099 3100 reads = txt.read(4) 3101 reads += txt.read(4) 3102 reads += txt.readline() 3103 reads += txt.readline() 3104 reads += txt.readline() 3105 self.assertEqual(reads, self.normalized) 3106 3107 def test_issue1395_4(self): 3108 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3109 txt._CHUNK_SIZE = 4 3110 3111 reads = 
txt.read(4) 3112 reads += txt.read() 3113 self.assertEqual(reads, self.normalized) 3114 3115 def test_issue1395_5(self): 3116 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3117 txt._CHUNK_SIZE = 4 3118 3119 reads = txt.read(4) 3120 pos = txt.tell() 3121 txt.seek(0) 3122 txt.seek(pos) 3123 self.assertEqual(txt.read(4), "BBB\n") 3124 3125 def test_issue2282(self): 3126 buffer = self.BytesIO(self.testdata) 3127 txt = self.TextIOWrapper(buffer, encoding="ascii") 3128 3129 self.assertEqual(buffer.seekable(), txt.seekable()) 3130 3131 def test_append_bom(self): 3132 # The BOM is not written again when appending to a non-empty file 3133 filename = support.TESTFN 3134 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3135 with self.open(filename, 'w', encoding=charset) as f: 3136 f.write('aaa') 3137 pos = f.tell() 3138 with self.open(filename, 'rb') as f: 3139 self.assertEqual(f.read(), 'aaa'.encode(charset)) 3140 3141 with self.open(filename, 'a', encoding=charset) as f: 3142 f.write('xxx') 3143 with self.open(filename, 'rb') as f: 3144 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3145 3146 def test_seek_bom(self): 3147 # Same test, but when seeking manually 3148 filename = support.TESTFN 3149 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3150 with self.open(filename, 'w', encoding=charset) as f: 3151 f.write('aaa') 3152 pos = f.tell() 3153 with self.open(filename, 'r+', encoding=charset) as f: 3154 f.seek(pos) 3155 f.write('zzz') 3156 f.seek(0) 3157 f.write('bbb') 3158 with self.open(filename, 'rb') as f: 3159 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 3160 3161 def test_seek_append_bom(self): 3162 # Same test, but first seek to the start and then to the end 3163 filename = support.TESTFN 3164 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3165 with self.open(filename, 'w', encoding=charset) as f: 3166 f.write('aaa') 3167 with self.open(filename, 'a', encoding=charset) as f: 3168 f.seek(0) 3169 f.seek(0, self.SEEK_END) 
3170 f.write('xxx') 3171 with self.open(filename, 'rb') as f: 3172 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3173 3174 def test_errors_property(self): 3175 with self.open(support.TESTFN, "w") as f: 3176 self.assertEqual(f.errors, "strict") 3177 with self.open(support.TESTFN, "w", errors="replace") as f: 3178 self.assertEqual(f.errors, "replace") 3179 3180 @support.no_tracing 3181 def test_threads_write(self): 3182 # Issue6750: concurrent writes could duplicate data 3183 event = threading.Event() 3184 with self.open(support.TESTFN, "w", buffering=1) as f: 3185 def run(n): 3186 text = "Thread%03d\n" % n 3187 event.wait() 3188 f.write(text) 3189 threads = [threading.Thread(target=run, args=(x,)) 3190 for x in range(20)] 3191 with support.start_threads(threads, event.set): 3192 time.sleep(0.02) 3193 with self.open(support.TESTFN) as f: 3194 content = f.read() 3195 for n in range(20): 3196 self.assertEqual(content.count("Thread%03d\n" % n), 1) 3197 3198 def test_flush_error_on_close(self): 3199 # Test that text file is closed despite failed flush 3200 # and that flush() is called before file closed. 
3201 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3202 closed = [] 3203 def bad_flush(): 3204 closed[:] = [txt.closed, txt.buffer.closed] 3205 raise OSError() 3206 txt.flush = bad_flush 3207 self.assertRaises(OSError, txt.close) # exception not swallowed 3208 self.assertTrue(txt.closed) 3209 self.assertTrue(txt.buffer.closed) 3210 self.assertTrue(closed) # flush() called 3211 self.assertFalse(closed[0]) # flush() called before file closed 3212 self.assertFalse(closed[1]) 3213 txt.flush = lambda: None # break reference loop 3214 3215 def test_close_error_on_close(self): 3216 buffer = self.BytesIO(self.testdata) 3217 def bad_flush(): 3218 raise OSError('flush') 3219 def bad_close(): 3220 raise OSError('close') 3221 buffer.close = bad_close 3222 txt = self.TextIOWrapper(buffer, encoding="ascii") 3223 txt.flush = bad_flush 3224 with self.assertRaises(OSError) as err: # exception not swallowed 3225 txt.close() 3226 self.assertEqual(err.exception.args, ('close',)) 3227 self.assertIsInstance(err.exception.__context__, OSError) 3228 self.assertEqual(err.exception.__context__.args, ('flush',)) 3229 self.assertFalse(txt.closed) 3230 3231 def test_nonnormalized_close_error_on_close(self): 3232 # Issue #21677 3233 buffer = self.BytesIO(self.testdata) 3234 def bad_flush(): 3235 raise non_existing_flush 3236 def bad_close(): 3237 raise non_existing_close 3238 buffer.close = bad_close 3239 txt = self.TextIOWrapper(buffer, encoding="ascii") 3240 txt.flush = bad_flush 3241 with self.assertRaises(NameError) as err: # exception not swallowed 3242 txt.close() 3243 self.assertIn('non_existing_close', str(err.exception)) 3244 self.assertIsInstance(err.exception.__context__, NameError) 3245 self.assertIn('non_existing_flush', str(err.exception.__context__)) 3246 self.assertFalse(txt.closed) 3247 3248 def test_multi_close(self): 3249 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3250 txt.close() 3251 txt.close() 3252 txt.close() 3253 
self.assertRaises(ValueError, txt.flush) 3254 3255 def test_unseekable(self): 3256 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata)) 3257 self.assertRaises(self.UnsupportedOperation, txt.tell) 3258 self.assertRaises(self.UnsupportedOperation, txt.seek, 0) 3259 3260 def test_readonly_attributes(self): 3261 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3262 buf = self.BytesIO(self.testdata) 3263 with self.assertRaises(AttributeError): 3264 txt.buffer = buf 3265 3266 def test_rawio(self): 3267 # Issue #12591: TextIOWrapper must work with raw I/O objects, so 3268 # that subprocess.Popen() can have the required unbuffered 3269 # semantics with universal_newlines=True. 3270 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3271 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3272 # Reads 3273 self.assertEqual(txt.read(4), 'abcd') 3274 self.assertEqual(txt.readline(), 'efghi\n') 3275 self.assertEqual(list(txt), ['jkl\n', 'opq\n']) 3276 3277 def test_rawio_write_through(self): 3278 # Issue #12591: with write_through=True, writes don't need a flush 3279 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3280 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', 3281 write_through=True) 3282 txt.write('1') 3283 txt.write('23\n4') 3284 txt.write('5') 3285 self.assertEqual(b''.join(raw._write_stack), b'123\n45') 3286 3287 def test_bufio_write_through(self): 3288 # Issue #21396: write_through=True doesn't force a flush() 3289 # on the underlying binary buffered object. 
3290 flush_called, write_called = [], [] 3291 class BufferedWriter(self.BufferedWriter): 3292 def flush(self, *args, **kwargs): 3293 flush_called.append(True) 3294 return super().flush(*args, **kwargs) 3295 def write(self, *args, **kwargs): 3296 write_called.append(True) 3297 return super().write(*args, **kwargs) 3298 3299 rawio = self.BytesIO() 3300 data = b"a" 3301 bufio = BufferedWriter(rawio, len(data)*2) 3302 textio = self.TextIOWrapper(bufio, encoding='ascii', 3303 write_through=True) 3304 # write to the buffered io but don't overflow the buffer 3305 text = data.decode('ascii') 3306 textio.write(text) 3307 3308 # buffer.flush is not called with write_through=True 3309 self.assertFalse(flush_called) 3310 # buffer.write *is* called with write_through=True 3311 self.assertTrue(write_called) 3312 self.assertEqual(rawio.getvalue(), b"") # no flush 3313 3314 write_called = [] # reset 3315 textio.write(text * 10) # total content is larger than bufio buffer 3316 self.assertTrue(write_called) 3317 self.assertEqual(rawio.getvalue(), data * 11) # all flushed 3318 3319 def test_reconfigure_write_through(self): 3320 raw = self.MockRawIO([]) 3321 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3322 t.write('1') 3323 t.reconfigure(write_through=True) # implied flush 3324 self.assertEqual(t.write_through, True) 3325 self.assertEqual(b''.join(raw._write_stack), b'1') 3326 t.write('23') 3327 self.assertEqual(b''.join(raw._write_stack), b'123') 3328 t.reconfigure(write_through=False) 3329 self.assertEqual(t.write_through, False) 3330 t.write('45') 3331 t.flush() 3332 self.assertEqual(b''.join(raw._write_stack), b'12345') 3333 # Keeping default value 3334 t.reconfigure() 3335 t.reconfigure(write_through=None) 3336 self.assertEqual(t.write_through, False) 3337 t.reconfigure(write_through=True) 3338 t.reconfigure() 3339 t.reconfigure(write_through=None) 3340 self.assertEqual(t.write_through, True) 3341 3342 def test_read_nonbytes(self): 3343 # Issue #17106 3344 # Crash 
when underlying read() returns non-bytes 3345 t = self.TextIOWrapper(self.StringIO('a')) 3346 self.assertRaises(TypeError, t.read, 1) 3347 t = self.TextIOWrapper(self.StringIO('a')) 3348 self.assertRaises(TypeError, t.readline) 3349 t = self.TextIOWrapper(self.StringIO('a')) 3350 self.assertRaises(TypeError, t.read) 3351 3352 def test_illegal_encoder(self): 3353 # Issue 31271: Calling write() while the return value of encoder's 3354 # encode() is invalid shouldn't cause an assertion failure. 3355 rot13 = codecs.lookup("rot13") 3356 with support.swap_attr(rot13, '_is_text_encoding', True): 3357 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13") 3358 self.assertRaises(TypeError, t.write, 'bar') 3359 3360 def test_illegal_decoder(self): 3361 # Issue #17106 3362 # Bypass the early encoding check added in issue 20404 3363 def _make_illegal_wrapper(): 3364 quopri = codecs.lookup("quopri") 3365 quopri._is_text_encoding = True 3366 try: 3367 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), 3368 newline='\n', encoding="quopri") 3369 finally: 3370 quopri._is_text_encoding = False 3371 return t 3372 # Crash when decoder returns non-string 3373 t = _make_illegal_wrapper() 3374 self.assertRaises(TypeError, t.read, 1) 3375 t = _make_illegal_wrapper() 3376 self.assertRaises(TypeError, t.readline) 3377 t = _make_illegal_wrapper() 3378 self.assertRaises(TypeError, t.read) 3379 3380 # Issue 31243: calling read() while the return value of decoder's 3381 # getstate() is invalid should neither crash the interpreter nor 3382 # raise a SystemError. 
3383 def _make_very_illegal_wrapper(getstate_ret_val): 3384 class BadDecoder: 3385 def getstate(self): 3386 return getstate_ret_val 3387 def _get_bad_decoder(dummy): 3388 return BadDecoder() 3389 quopri = codecs.lookup("quopri") 3390 with support.swap_attr(quopri, 'incrementaldecoder', 3391 _get_bad_decoder): 3392 return _make_illegal_wrapper() 3393 t = _make_very_illegal_wrapper(42) 3394 self.assertRaises(TypeError, t.read, 42) 3395 t = _make_very_illegal_wrapper(()) 3396 self.assertRaises(TypeError, t.read, 42) 3397 t = _make_very_illegal_wrapper((1, 2)) 3398 self.assertRaises(TypeError, t.read, 42) 3399 3400 def _check_create_at_shutdown(self, **kwargs): 3401 # Issue #20037: creating a TextIOWrapper at shutdown 3402 # shouldn't crash the interpreter. 3403 iomod = self.io.__name__ 3404 code = """if 1: 3405 import codecs 3406 import {iomod} as io 3407 3408 # Avoid looking up codecs at shutdown 3409 codecs.lookup('utf-8') 3410 3411 class C: 3412 def __init__(self): 3413 self.buf = io.BytesIO() 3414 def __del__(self): 3415 io.TextIOWrapper(self.buf, **{kwargs}) 3416 print("ok") 3417 c = C() 3418 """.format(iomod=iomod, kwargs=kwargs) 3419 return assert_python_ok("-c", code) 3420 3421 @support.requires_type_collecting 3422 def test_create_at_shutdown_without_encoding(self): 3423 rc, out, err = self._check_create_at_shutdown() 3424 if err: 3425 # Can error out with a RuntimeError if the module state 3426 # isn't found. 
3427 self.assertIn(self.shutdown_error, err.decode()) 3428 else: 3429 self.assertEqual("ok", out.decode().strip()) 3430 3431 @support.requires_type_collecting 3432 def test_create_at_shutdown_with_encoding(self): 3433 rc, out, err = self._check_create_at_shutdown(encoding='utf-8', 3434 errors='strict') 3435 self.assertFalse(err) 3436 self.assertEqual("ok", out.decode().strip()) 3437 3438 def test_read_byteslike(self): 3439 r = MemviewBytesIO(b'Just some random string\n') 3440 t = self.TextIOWrapper(r, 'utf-8') 3441 3442 # TextIOwrapper will not read the full string, because 3443 # we truncate it to a multiple of the native int size 3444 # so that we can construct a more complex memoryview. 3445 bytes_val = _to_memoryview(r.getvalue()).tobytes() 3446 3447 self.assertEqual(t.read(200), bytes_val.decode('utf-8')) 3448 3449 def test_issue22849(self): 3450 class F(object): 3451 def readable(self): return True 3452 def writable(self): return True 3453 def seekable(self): return True 3454 3455 for i in range(10): 3456 try: 3457 self.TextIOWrapper(F(), encoding='utf-8') 3458 except Exception: 3459 pass 3460 3461 F.tell = lambda x: 0 3462 t = self.TextIOWrapper(F(), encoding='utf-8') 3463 3464 def test_reconfigure_encoding_read(self): 3465 # latin1 -> utf8 3466 # (latin1 can decode utf-8 encoded string) 3467 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') 3468 raw = self.BytesIO(data) 3469 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3470 self.assertEqual(txt.readline(), 'abc\xe9\n') 3471 with self.assertRaises(self.UnsupportedOperation): 3472 txt.reconfigure(encoding='utf-8') 3473 with self.assertRaises(self.UnsupportedOperation): 3474 txt.reconfigure(newline=None) 3475 3476 def test_reconfigure_write_fromascii(self): 3477 # ascii has a specific encodefunc in the C implementation, 3478 # but utf-8-sig has not. Make sure that we get rid of the 3479 # cached encodefunc when we switch encoders. 
3480 raw = self.BytesIO() 3481 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3482 txt.write('foo\n') 3483 txt.reconfigure(encoding='utf-8-sig') 3484 txt.write('\xe9\n') 3485 txt.flush() 3486 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') 3487 3488 def test_reconfigure_write(self): 3489 # latin -> utf8 3490 raw = self.BytesIO() 3491 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3492 txt.write('abc\xe9\n') 3493 txt.reconfigure(encoding='utf-8') 3494 self.assertEqual(raw.getvalue(), b'abc\xe9\n') 3495 txt.write('d\xe9f\n') 3496 txt.flush() 3497 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') 3498 3499 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of 3500 # the file 3501 raw = self.BytesIO() 3502 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3503 txt.write('abc\n') 3504 txt.reconfigure(encoding='utf-8-sig') 3505 txt.write('d\xe9f\n') 3506 txt.flush() 3507 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') 3508 3509 def test_reconfigure_write_non_seekable(self): 3510 raw = self.BytesIO() 3511 raw.seekable = lambda: False 3512 raw.seek = None 3513 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3514 txt.write('abc\n') 3515 txt.reconfigure(encoding='utf-8-sig') 3516 txt.write('d\xe9f\n') 3517 txt.flush() 3518 3519 # If the raw stream is not seekable, there'll be a BOM 3520 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') 3521 3522 def test_reconfigure_defaults(self): 3523 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') 3524 txt.reconfigure(encoding=None) 3525 self.assertEqual(txt.encoding, 'ascii') 3526 self.assertEqual(txt.errors, 'replace') 3527 txt.write('LF\n') 3528 3529 txt.reconfigure(newline='\r\n') 3530 self.assertEqual(txt.encoding, 'ascii') 3531 self.assertEqual(txt.errors, 'replace') 3532 3533 txt.reconfigure(errors='ignore') 3534 self.assertEqual(txt.encoding, 'ascii') 3535 self.assertEqual(txt.errors, 'ignore') 3536 
txt.write('CRLF\n') 3537 3538 txt.reconfigure(encoding='utf-8', newline=None) 3539 self.assertEqual(txt.errors, 'strict') 3540 txt.seek(0) 3541 self.assertEqual(txt.read(), 'LF\nCRLF\n') 3542 3543 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') 3544 3545 def test_reconfigure_newline(self): 3546 raw = self.BytesIO(b'CR\rEOF') 3547 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3548 txt.reconfigure(newline=None) 3549 self.assertEqual(txt.readline(), 'CR\n') 3550 raw = self.BytesIO(b'CR\rEOF') 3551 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3552 txt.reconfigure(newline='') 3553 self.assertEqual(txt.readline(), 'CR\r') 3554 raw = self.BytesIO(b'CR\rLF\nEOF') 3555 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3556 txt.reconfigure(newline='\n') 3557 self.assertEqual(txt.readline(), 'CR\rLF\n') 3558 raw = self.BytesIO(b'LF\nCR\rEOF') 3559 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3560 txt.reconfigure(newline='\r') 3561 self.assertEqual(txt.readline(), 'LF\nCR\r') 3562 raw = self.BytesIO(b'CR\rCRLF\r\nEOF') 3563 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3564 txt.reconfigure(newline='\r\n') 3565 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') 3566 3567 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') 3568 txt.reconfigure(newline=None) 3569 txt.write('linesep\n') 3570 txt.reconfigure(newline='') 3571 txt.write('LF\n') 3572 txt.reconfigure(newline='\n') 3573 txt.write('LF\n') 3574 txt.reconfigure(newline='\r') 3575 txt.write('CR\n') 3576 txt.reconfigure(newline='\r\n') 3577 txt.write('CRLF\n') 3578 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' 3579 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) 3580 3581 def test_issue25862(self): 3582 # Assertion failures occurred in tell() after read() and write(). 
3583 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3584 t.read(1) 3585 t.read() 3586 t.tell() 3587 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3588 t.read(1) 3589 t.write('x') 3590 t.tell() 3591 3592 3593class MemviewBytesIO(io.BytesIO): 3594 '''A BytesIO object whose read method returns memoryviews 3595 rather than bytes''' 3596 3597 def read1(self, len_): 3598 return _to_memoryview(super().read1(len_)) 3599 3600 def read(self, len_): 3601 return _to_memoryview(super().read(len_)) 3602 3603def _to_memoryview(buf): 3604 '''Convert bytes-object *buf* to a non-trivial memoryview''' 3605 3606 arr = array.array('i') 3607 idx = len(buf) - len(buf) % arr.itemsize 3608 arr.frombytes(buf[:idx]) 3609 return memoryview(arr) 3610 3611 3612class CTextIOWrapperTest(TextIOWrapperTest): 3613 io = io 3614 shutdown_error = "RuntimeError: could not find io module state" 3615 3616 def test_initialization(self): 3617 r = self.BytesIO(b"\xc3\xa9\n\n") 3618 b = self.BufferedReader(r, 1000) 3619 t = self.TextIOWrapper(b) 3620 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 3621 self.assertRaises(ValueError, t.read) 3622 3623 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 3624 self.assertRaises(Exception, repr, t) 3625 3626 def test_garbage_collection(self): 3627 # C TextIOWrapper objects are collected, and collecting them flushes 3628 # all data to disk. 3629 # The Python version has __del__, so it ends in gc.garbage instead. 
3630 with support.check_warnings(('', ResourceWarning)): 3631 rawio = io.FileIO(support.TESTFN, "wb") 3632 b = self.BufferedWriter(rawio) 3633 t = self.TextIOWrapper(b, encoding="ascii") 3634 t.write("456def") 3635 t.x = t 3636 wr = weakref.ref(t) 3637 del t 3638 support.gc_collect() 3639 self.assertIsNone(wr(), wr) 3640 with self.open(support.TESTFN, "rb") as f: 3641 self.assertEqual(f.read(), b"456def") 3642 3643 def test_rwpair_cleared_before_textio(self): 3644 # Issue 13070: TextIOWrapper's finalization would crash when called 3645 # after the reference to the underlying BufferedRWPair's writer got 3646 # cleared by the GC. 3647 for i in range(1000): 3648 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3649 t1 = self.TextIOWrapper(b1, encoding="ascii") 3650 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3651 t2 = self.TextIOWrapper(b2, encoding="ascii") 3652 # circular references 3653 t1.buddy = t2 3654 t2.buddy = t1 3655 support.gc_collect() 3656 3657 def test_del__CHUNK_SIZE_SystemError(self): 3658 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') 3659 with self.assertRaises(AttributeError): 3660 del t._CHUNK_SIZE 3661 3662 3663class PyTextIOWrapperTest(TextIOWrapperTest): 3664 io = pyio 3665 shutdown_error = "LookupError: unknown encoding: ascii" 3666 3667 3668class IncrementalNewlineDecoderTest(unittest.TestCase): 3669 3670 def check_newline_decoding_utf8(self, decoder): 3671 # UTF-8 specific tests for a newline decoder 3672 def _check_decode(b, s, **kwargs): 3673 # We exercise getstate() / setstate() as well as decode() 3674 state = decoder.getstate() 3675 self.assertEqual(decoder.decode(b, **kwargs), s) 3676 decoder.setstate(state) 3677 self.assertEqual(decoder.decode(b, **kwargs), s) 3678 3679 _check_decode(b'\xe8\xa2\x88', "\u8888") 3680 3681 _check_decode(b'\xe8', "") 3682 _check_decode(b'\xa2', "") 3683 _check_decode(b'\x88', "\u8888") 3684 3685 _check_decode(b'\xe8', "") 3686 _check_decode(b'\xa2', "") 3687 
_check_decode(b'\x88', "\u8888") 3688 3689 _check_decode(b'\xe8', "") 3690 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) 3691 3692 decoder.reset() 3693 _check_decode(b'\n', "\n") 3694 _check_decode(b'\r', "") 3695 _check_decode(b'', "\n", final=True) 3696 _check_decode(b'\r', "\n", final=True) 3697 3698 _check_decode(b'\r', "") 3699 _check_decode(b'a', "\na") 3700 3701 _check_decode(b'\r\r\n', "\n\n") 3702 _check_decode(b'\r', "") 3703 _check_decode(b'\r', "\n") 3704 _check_decode(b'\na', "\na") 3705 3706 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") 3707 _check_decode(b'\xe8\xa2\x88', "\u8888") 3708 _check_decode(b'\n', "\n") 3709 _check_decode(b'\xe8\xa2\x88\r', "\u8888") 3710 _check_decode(b'\n', "\n") 3711 3712 def check_newline_decoding(self, decoder, encoding): 3713 result = [] 3714 if encoding is not None: 3715 encoder = codecs.getincrementalencoder(encoding)() 3716 def _decode_bytewise(s): 3717 # Decode one byte at a time 3718 for b in encoder.encode(s): 3719 result.append(decoder.decode(bytes([b]))) 3720 else: 3721 encoder = None 3722 def _decode_bytewise(s): 3723 # Decode one char at a time 3724 for c in s: 3725 result.append(decoder.decode(c)) 3726 self.assertEqual(decoder.newlines, None) 3727 _decode_bytewise("abc\n\r") 3728 self.assertEqual(decoder.newlines, '\n') 3729 _decode_bytewise("\nabc") 3730 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3731 _decode_bytewise("abc\r") 3732 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3733 _decode_bytewise("abc") 3734 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 3735 _decode_bytewise("abc\r") 3736 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 3737 decoder.reset() 3738 input = "abc" 3739 if encoder is not None: 3740 encoder.reset() 3741 input = encoder.encode(input) 3742 self.assertEqual(decoder.decode(input), "abc") 3743 self.assertEqual(decoder.newlines, None) 3744 3745 def test_newline_decoder(self): 3746 encodings = ( 3747 # None meaning the 
IncrementalNewlineDecoder takes unicode input 3748 # rather than bytes input 3749 None, 'utf-8', 'latin-1', 3750 'utf-16', 'utf-16-le', 'utf-16-be', 3751 'utf-32', 'utf-32-le', 'utf-32-be', 3752 ) 3753 for enc in encodings: 3754 decoder = enc and codecs.getincrementaldecoder(enc)() 3755 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3756 self.check_newline_decoding(decoder, enc) 3757 decoder = codecs.getincrementaldecoder("utf-8")() 3758 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3759 self.check_newline_decoding_utf8(decoder) 3760 self.assertRaises(TypeError, decoder.setstate, 42) 3761 3762 def test_newline_bytes(self): 3763 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 3764 def _check(dec): 3765 self.assertEqual(dec.newlines, None) 3766 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 3767 self.assertEqual(dec.newlines, None) 3768 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 3769 self.assertEqual(dec.newlines, None) 3770 dec = self.IncrementalNewlineDecoder(None, translate=False) 3771 _check(dec) 3772 dec = self.IncrementalNewlineDecoder(None, translate=True) 3773 _check(dec) 3774 3775 def test_translate(self): 3776 # issue 35062 3777 for translate in (-2, -1, 1, 2): 3778 decoder = codecs.getincrementaldecoder("utf-8")() 3779 decoder = self.IncrementalNewlineDecoder(decoder, translate) 3780 self.check_newline_decoding_utf8(decoder) 3781 decoder = codecs.getincrementaldecoder("utf-8")() 3782 decoder = self.IncrementalNewlineDecoder(decoder, translate=0) 3783 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") 3784 3785class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3786 pass 3787 3788class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3789 pass 3790 3791 3792# XXX Tests for open() 3793 3794class MiscIOTest(unittest.TestCase): 3795 3796 def tearDown(self): 3797 support.unlink(support.TESTFN) 3798 3799 def test___all__(self): 3800 for name in self.io.__all__: 
3801 obj = getattr(self.io, name, None) 3802 self.assertIsNotNone(obj, name) 3803 if name == "open": 3804 continue 3805 elif "error" in name.lower() or name == "UnsupportedOperation": 3806 self.assertTrue(issubclass(obj, Exception), name) 3807 elif not name.startswith("SEEK_"): 3808 self.assertTrue(issubclass(obj, self.IOBase)) 3809 3810 def test_attributes(self): 3811 f = self.open(support.TESTFN, "wb", buffering=0) 3812 self.assertEqual(f.mode, "wb") 3813 f.close() 3814 3815 with support.check_warnings(('', DeprecationWarning)): 3816 f = self.open(support.TESTFN, "U") 3817 self.assertEqual(f.name, support.TESTFN) 3818 self.assertEqual(f.buffer.name, support.TESTFN) 3819 self.assertEqual(f.buffer.raw.name, support.TESTFN) 3820 self.assertEqual(f.mode, "U") 3821 self.assertEqual(f.buffer.mode, "rb") 3822 self.assertEqual(f.buffer.raw.mode, "rb") 3823 f.close() 3824 3825 f = self.open(support.TESTFN, "w+") 3826 self.assertEqual(f.mode, "w+") 3827 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 3828 self.assertEqual(f.buffer.raw.mode, "rb+") 3829 3830 g = self.open(f.fileno(), "wb", closefd=False) 3831 self.assertEqual(g.mode, "wb") 3832 self.assertEqual(g.raw.mode, "wb") 3833 self.assertEqual(g.name, f.fileno()) 3834 self.assertEqual(g.raw.name, f.fileno()) 3835 f.close() 3836 g.close() 3837 3838 def test_open_pipe_with_append(self): 3839 # bpo-27805: Ignore ESPIPE from lseek() in open(). 3840 r, w = os.pipe() 3841 self.addCleanup(os.close, r) 3842 f = self.open(w, 'a') 3843 self.addCleanup(f.close) 3844 # Check that the file is marked non-seekable. On Windows, however, lseek 3845 # somehow succeeds on pipes. 
3846 if sys.platform != 'win32': 3847 self.assertFalse(f.seekable()) 3848 3849 def test_io_after_close(self): 3850 for kwargs in [ 3851 {"mode": "w"}, 3852 {"mode": "wb"}, 3853 {"mode": "w", "buffering": 1}, 3854 {"mode": "w", "buffering": 2}, 3855 {"mode": "wb", "buffering": 0}, 3856 {"mode": "r"}, 3857 {"mode": "rb"}, 3858 {"mode": "r", "buffering": 1}, 3859 {"mode": "r", "buffering": 2}, 3860 {"mode": "rb", "buffering": 0}, 3861 {"mode": "w+"}, 3862 {"mode": "w+b"}, 3863 {"mode": "w+", "buffering": 1}, 3864 {"mode": "w+", "buffering": 2}, 3865 {"mode": "w+b", "buffering": 0}, 3866 ]: 3867 f = self.open(support.TESTFN, **kwargs) 3868 f.close() 3869 self.assertRaises(ValueError, f.flush) 3870 self.assertRaises(ValueError, f.fileno) 3871 self.assertRaises(ValueError, f.isatty) 3872 self.assertRaises(ValueError, f.__iter__) 3873 if hasattr(f, "peek"): 3874 self.assertRaises(ValueError, f.peek, 1) 3875 self.assertRaises(ValueError, f.read) 3876 if hasattr(f, "read1"): 3877 self.assertRaises(ValueError, f.read1, 1024) 3878 self.assertRaises(ValueError, f.read1) 3879 if hasattr(f, "readall"): 3880 self.assertRaises(ValueError, f.readall) 3881 if hasattr(f, "readinto"): 3882 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 3883 if hasattr(f, "readinto1"): 3884 self.assertRaises(ValueError, f.readinto1, bytearray(1024)) 3885 self.assertRaises(ValueError, f.readline) 3886 self.assertRaises(ValueError, f.readlines) 3887 self.assertRaises(ValueError, f.readlines, 1) 3888 self.assertRaises(ValueError, f.seek, 0) 3889 self.assertRaises(ValueError, f.tell) 3890 self.assertRaises(ValueError, f.truncate) 3891 self.assertRaises(ValueError, f.write, 3892 b"" if "b" in kwargs['mode'] else "") 3893 self.assertRaises(ValueError, f.writelines, []) 3894 self.assertRaises(ValueError, next, f) 3895 3896 def test_blockingioerror(self): 3897 # Various BlockingIOError issues 3898 class C(str): 3899 pass 3900 c = C("") 3901 b = self.BlockingIOError(1, c) 3902 c.b = b 3903 b.c = c 
3904 wr = weakref.ref(c) 3905 del c, b 3906 support.gc_collect() 3907 self.assertIsNone(wr(), wr) 3908 3909 def test_abcs(self): 3910 # Test the visible base classes are ABCs. 3911 self.assertIsInstance(self.IOBase, abc.ABCMeta) 3912 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 3913 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 3914 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 3915 3916 def _check_abc_inheritance(self, abcmodule): 3917 with self.open(support.TESTFN, "wb", buffering=0) as f: 3918 self.assertIsInstance(f, abcmodule.IOBase) 3919 self.assertIsInstance(f, abcmodule.RawIOBase) 3920 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3921 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3922 with self.open(support.TESTFN, "wb") as f: 3923 self.assertIsInstance(f, abcmodule.IOBase) 3924 self.assertNotIsInstance(f, abcmodule.RawIOBase) 3925 self.assertIsInstance(f, abcmodule.BufferedIOBase) 3926 self.assertNotIsInstance(f, abcmodule.TextIOBase) 3927 with self.open(support.TESTFN, "w") as f: 3928 self.assertIsInstance(f, abcmodule.IOBase) 3929 self.assertNotIsInstance(f, abcmodule.RawIOBase) 3930 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 3931 self.assertIsInstance(f, abcmodule.TextIOBase) 3932 3933 def test_abc_inheritance(self): 3934 # Test implementations inherit from their respective ABCs 3935 self._check_abc_inheritance(self) 3936 3937 def test_abc_inheritance_official(self): 3938 # Test implementations inherit from the official ABCs of the 3939 # baseline "io" module. 
3940 self._check_abc_inheritance(io) 3941 3942 def _check_warn_on_dealloc(self, *args, **kwargs): 3943 f = open(*args, **kwargs) 3944 r = repr(f) 3945 with self.assertWarns(ResourceWarning) as cm: 3946 f = None 3947 support.gc_collect() 3948 self.assertIn(r, str(cm.warning.args[0])) 3949 3950 def test_warn_on_dealloc(self): 3951 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0) 3952 self._check_warn_on_dealloc(support.TESTFN, "wb") 3953 self._check_warn_on_dealloc(support.TESTFN, "w") 3954 3955 def _check_warn_on_dealloc_fd(self, *args, **kwargs): 3956 fds = [] 3957 def cleanup_fds(): 3958 for fd in fds: 3959 try: 3960 os.close(fd) 3961 except OSError as e: 3962 if e.errno != errno.EBADF: 3963 raise 3964 self.addCleanup(cleanup_fds) 3965 r, w = os.pipe() 3966 fds += r, w 3967 self._check_warn_on_dealloc(r, *args, **kwargs) 3968 # When using closefd=False, there's no warning 3969 r, w = os.pipe() 3970 fds += r, w 3971 with support.check_no_resource_warning(self): 3972 open(r, *args, closefd=False, **kwargs) 3973 3974 def test_warn_on_dealloc_fd(self): 3975 self._check_warn_on_dealloc_fd("rb", buffering=0) 3976 self._check_warn_on_dealloc_fd("rb") 3977 self._check_warn_on_dealloc_fd("r") 3978 3979 3980 def test_pickling(self): 3981 # Pickling file objects is forbidden 3982 for kwargs in [ 3983 {"mode": "w"}, 3984 {"mode": "wb"}, 3985 {"mode": "wb", "buffering": 0}, 3986 {"mode": "r"}, 3987 {"mode": "rb"}, 3988 {"mode": "rb", "buffering": 0}, 3989 {"mode": "w+"}, 3990 {"mode": "w+b"}, 3991 {"mode": "w+b", "buffering": 0}, 3992 ]: 3993 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 3994 with self.open(support.TESTFN, **kwargs) as f: 3995 self.assertRaises(TypeError, pickle.dumps, f, protocol) 3996 3997 def test_nonblock_pipe_write_bigbuf(self): 3998 self._test_nonblock_pipe_write(16*1024) 3999 4000 def test_nonblock_pipe_write_smallbuf(self): 4001 self._test_nonblock_pipe_write(1024) 4002 4003 @unittest.skipUnless(hasattr(os, 'set_blocking'), 4004 
'os.set_blocking() required for this test') 4005 def _test_nonblock_pipe_write(self, bufsize): 4006 sent = [] 4007 received = [] 4008 r, w = os.pipe() 4009 os.set_blocking(r, False) 4010 os.set_blocking(w, False) 4011 4012 # To exercise all code paths in the C implementation we need 4013 # to play with buffer sizes. For instance, if we choose a 4014 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 4015 # then we will never get a partial write of the buffer. 4016 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 4017 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 4018 4019 with rf, wf: 4020 for N in 9999, 73, 7574: 4021 try: 4022 i = 0 4023 while True: 4024 msg = bytes([i % 26 + 97]) * N 4025 sent.append(msg) 4026 wf.write(msg) 4027 i += 1 4028 4029 except self.BlockingIOError as e: 4030 self.assertEqual(e.args[0], errno.EAGAIN) 4031 self.assertEqual(e.args[2], e.characters_written) 4032 sent[-1] = sent[-1][:e.characters_written] 4033 received.append(rf.read()) 4034 msg = b'BLOCKED' 4035 wf.write(msg) 4036 sent.append(msg) 4037 4038 while True: 4039 try: 4040 wf.flush() 4041 break 4042 except self.BlockingIOError as e: 4043 self.assertEqual(e.args[0], errno.EAGAIN) 4044 self.assertEqual(e.args[2], e.characters_written) 4045 self.assertEqual(e.characters_written, 0) 4046 received.append(rf.read()) 4047 4048 received += iter(rf.read, None) 4049 4050 sent, received = b''.join(sent), b''.join(received) 4051 self.assertEqual(sent, received) 4052 self.assertTrue(wf.closed) 4053 self.assertTrue(rf.closed) 4054 4055 def test_create_fail(self): 4056 # 'x' mode fails if file is existing 4057 with self.open(support.TESTFN, 'w'): 4058 pass 4059 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x') 4060 4061 def test_create_writes(self): 4062 # 'x' mode opens for writing 4063 with self.open(support.TESTFN, 'xb') as f: 4064 f.write(b"spam") 4065 with self.open(support.TESTFN, 'rb') as f: 4066 self.assertEqual(b"spam", f.read()) 
4067 4068 def test_open_allargs(self): 4069 # there used to be a buffer overflow in the parser for rawmode 4070 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+') 4071 4072 4073class CMiscIOTest(MiscIOTest): 4074 io = io 4075 4076 def test_readinto_buffer_overflow(self): 4077 # Issue #18025 4078 class BadReader(self.io.BufferedIOBase): 4079 def read(self, n=-1): 4080 return b'x' * 10**6 4081 bufio = BadReader() 4082 b = bytearray(2) 4083 self.assertRaises(ValueError, bufio.readinto, b) 4084 4085 def check_daemon_threads_shutdown_deadlock(self, stream_name): 4086 # Issue #23309: deadlocks at shutdown should be avoided when a 4087 # daemon thread and the main thread both write to a file. 4088 code = """if 1: 4089 import sys 4090 import time 4091 import threading 4092 from test.support import SuppressCrashReport 4093 4094 file = sys.{stream_name} 4095 4096 def run(): 4097 while True: 4098 file.write('.') 4099 file.flush() 4100 4101 crash = SuppressCrashReport() 4102 crash.__enter__() 4103 # don't call __exit__(): the crash occurs at Python shutdown 4104 4105 thread = threading.Thread(target=run) 4106 thread.daemon = True 4107 thread.start() 4108 4109 time.sleep(0.5) 4110 file.write('!') 4111 file.flush() 4112 """.format_map(locals()) 4113 res, _ = run_python_until_end("-c", code) 4114 err = res.err.decode() 4115 if res.rc != 0: 4116 # Failure: should be a fatal error 4117 pattern = (r"Fatal Python error: could not acquire lock " 4118 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> " 4119 r"at interpreter shutdown, possibly due to " 4120 r"daemon threads".format_map(locals())) 4121 self.assertRegex(err, pattern) 4122 else: 4123 self.assertFalse(err.strip('.!')) 4124 4125 def test_daemon_threads_shutdown_stdout_deadlock(self): 4126 self.check_daemon_threads_shutdown_deadlock('stdout') 4127 4128 def test_daemon_threads_shutdown_stderr_deadlock(self): 4129 self.check_daemon_threads_shutdown_deadlock('stderr') 4130 4131 4132class 
PyMiscIOTest(MiscIOTest): 4133 io = pyio 4134 4135 4136@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 4137class SignalsTest(unittest.TestCase): 4138 4139 def setUp(self): 4140 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 4141 4142 def tearDown(self): 4143 signal.signal(signal.SIGALRM, self.oldalrm) 4144 4145 def alarm_interrupt(self, sig, frame): 4146 1/0 4147 4148 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 4149 """Check that a partial write, when it gets interrupted, properly 4150 invokes the signal handler, and bubbles up the exception raised 4151 in the latter.""" 4152 read_results = [] 4153 def _read(): 4154 s = os.read(r, 1) 4155 read_results.append(s) 4156 4157 t = threading.Thread(target=_read) 4158 t.daemon = True 4159 r, w = os.pipe() 4160 fdopen_kwargs["closefd"] = False 4161 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1) 4162 try: 4163 wio = self.io.open(w, **fdopen_kwargs) 4164 if hasattr(signal, 'pthread_sigmask'): 4165 # create the thread with SIGALRM signal blocked 4166 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) 4167 t.start() 4168 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM]) 4169 else: 4170 t.start() 4171 4172 # Fill the pipe enough that the write will be blocking. 4173 # It will be interrupted by the timer armed above. Since the 4174 # other thread has read one byte, the low-level write will 4175 # return with a successful (partial) result rather than an EINTR. 4176 # The buffered IO layer must check for pending signal 4177 # handlers, which in this case will invoke alarm_interrupt(). 4178 signal.alarm(1) 4179 try: 4180 self.assertRaises(ZeroDivisionError, wio.write, large_data) 4181 finally: 4182 signal.alarm(0) 4183 t.join() 4184 # We got one byte, get another one and check that it isn't a 4185 # repeat of the first one. 
4186 read_results.append(os.read(r, 1)) 4187 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 4188 finally: 4189 os.close(w) 4190 os.close(r) 4191 # This is deliberate. If we didn't close the file descriptor 4192 # before closing wio, wio would try to flush its internal 4193 # buffer, and block again. 4194 try: 4195 wio.close() 4196 except OSError as e: 4197 if e.errno != errno.EBADF: 4198 raise 4199 4200 def test_interrupted_write_unbuffered(self): 4201 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 4202 4203 def test_interrupted_write_buffered(self): 4204 self.check_interrupted_write(b"xy", b"xy", mode="wb") 4205 4206 def test_interrupted_write_text(self): 4207 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 4208 4209 @support.no_tracing 4210 def check_reentrant_write(self, data, **fdopen_kwargs): 4211 def on_alarm(*args): 4212 # Will be called reentrantly from the same thread 4213 wio.write(data) 4214 1/0 4215 signal.signal(signal.SIGALRM, on_alarm) 4216 r, w = os.pipe() 4217 wio = self.io.open(w, **fdopen_kwargs) 4218 try: 4219 signal.alarm(1) 4220 # Either the reentrant call to wio.write() fails with RuntimeError, 4221 # or the signal handler raises ZeroDivisionError. 
4222 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 4223 while 1: 4224 for i in range(100): 4225 wio.write(data) 4226 wio.flush() 4227 # Make sure the buffer doesn't fill up and block further writes 4228 os.read(r, len(data) * 100) 4229 exc = cm.exception 4230 if isinstance(exc, RuntimeError): 4231 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 4232 finally: 4233 signal.alarm(0) 4234 wio.close() 4235 os.close(r) 4236 4237 def test_reentrant_write_buffered(self): 4238 self.check_reentrant_write(b"xy", mode="wb") 4239 4240 def test_reentrant_write_text(self): 4241 self.check_reentrant_write("xy", mode="w", encoding="ascii") 4242 4243 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 4244 """Check that a buffered read, when it gets interrupted (either 4245 returning a partial result or EINTR), properly invokes the signal 4246 handler and retries if the latter returned successfully.""" 4247 r, w = os.pipe() 4248 fdopen_kwargs["closefd"] = False 4249 def alarm_handler(sig, frame): 4250 os.write(w, b"bar") 4251 signal.signal(signal.SIGALRM, alarm_handler) 4252 try: 4253 rio = self.io.open(r, **fdopen_kwargs) 4254 os.write(w, b"foo") 4255 signal.alarm(1) 4256 # Expected behaviour: 4257 # - first raw read() returns partial b"foo" 4258 # - second raw read() returns EINTR 4259 # - third raw read() returns b"bar" 4260 self.assertEqual(decode(rio.read(6)), "foobar") 4261 finally: 4262 signal.alarm(0) 4263 rio.close() 4264 os.close(w) 4265 os.close(r) 4266 4267 def test_interrupted_read_retry_buffered(self): 4268 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 4269 mode="rb") 4270 4271 def test_interrupted_read_retry_text(self): 4272 self.check_interrupted_read_retry(lambda x: x, 4273 mode="r") 4274 4275 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 4276 """Check that a buffered write, when it gets interrupted (either 4277 returning a partial result or EINTR), properly invokes the signal 4278 
handler and retries if the latter returned successfully.""" 4279 select = support.import_module("select") 4280 4281 # A quantity that exceeds the buffer size of an anonymous pipe's 4282 # write end. 4283 N = support.PIPE_MAX_SIZE 4284 r, w = os.pipe() 4285 fdopen_kwargs["closefd"] = False 4286 4287 # We need a separate thread to read from the pipe and allow the 4288 # write() to finish. This thread is started after the SIGALRM is 4289 # received (forcing a first EINTR in write()). 4290 read_results = [] 4291 write_finished = False 4292 error = None 4293 def _read(): 4294 try: 4295 while not write_finished: 4296 while r in select.select([r], [], [], 1.0)[0]: 4297 s = os.read(r, 1024) 4298 read_results.append(s) 4299 except BaseException as exc: 4300 nonlocal error 4301 error = exc 4302 t = threading.Thread(target=_read) 4303 t.daemon = True 4304 def alarm1(sig, frame): 4305 signal.signal(signal.SIGALRM, alarm2) 4306 signal.alarm(1) 4307 def alarm2(sig, frame): 4308 t.start() 4309 4310 large_data = item * N 4311 signal.signal(signal.SIGALRM, alarm1) 4312 try: 4313 wio = self.io.open(w, **fdopen_kwargs) 4314 signal.alarm(1) 4315 # Expected behaviour: 4316 # - first raw write() is partial (because of the limited pipe buffer 4317 # and the first alarm) 4318 # - second raw write() returns EINTR (because of the second alarm) 4319 # - subsequent write()s are successful (either partial or complete) 4320 written = wio.write(large_data) 4321 self.assertEqual(N, written) 4322 4323 wio.flush() 4324 write_finished = True 4325 t.join() 4326 4327 self.assertIsNone(error) 4328 self.assertEqual(N, sum(len(x) for x in read_results)) 4329 finally: 4330 signal.alarm(0) 4331 write_finished = True 4332 os.close(w) 4333 os.close(r) 4334 # This is deliberate. If we didn't close the file descriptor 4335 # before closing wio, wio would try to flush its internal 4336 # buffer, and could block (in case of failure). 
4337 try: 4338 wio.close() 4339 except OSError as e: 4340 if e.errno != errno.EBADF: 4341 raise 4342 4343 def test_interrupted_write_retry_buffered(self): 4344 self.check_interrupted_write_retry(b"x", mode="wb") 4345 4346 def test_interrupted_write_retry_text(self): 4347 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 4348 4349 4350class CSignalsTest(SignalsTest): 4351 io = io 4352 4353class PySignalsTest(SignalsTest): 4354 io = pyio 4355 4356 # Handling reentrancy issues would slow down _pyio even more, so the 4357 # tests are disabled. 4358 test_reentrant_write_buffered = None 4359 test_reentrant_write_text = None 4360 4361 4362def load_tests(*args): 4363 tests = (CIOTest, PyIOTest, APIMismatchTest, 4364 CBufferedReaderTest, PyBufferedReaderTest, 4365 CBufferedWriterTest, PyBufferedWriterTest, 4366 CBufferedRWPairTest, PyBufferedRWPairTest, 4367 CBufferedRandomTest, PyBufferedRandomTest, 4368 StatefulIncrementalDecoderTest, 4369 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 4370 CTextIOWrapperTest, PyTextIOWrapperTest, 4371 CMiscIOTest, PyMiscIOTest, 4372 CSignalsTest, PySignalsTest, 4373 ) 4374 4375 # Put the namespaces of the IO module we are testing and some useful mock 4376 # classes in the __dict__ of each test. 4377 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 4378 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead, 4379 SlowFlushRawIO) 4380 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 4381 c_io_ns = {name : getattr(io, name) for name in all_members} 4382 py_io_ns = {name : getattr(pyio, name) for name in all_members} 4383 globs = globals() 4384 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 4385 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 4386 # Avoid turning open into a bound method. 
4387 py_io_ns["open"] = pyio.OpenWrapper 4388 for test in tests: 4389 if test.__name__.startswith("C"): 4390 for name, obj in c_io_ns.items(): 4391 setattr(test, name, obj) 4392 elif test.__name__.startswith("Py"): 4393 for name, obj in py_io_ns.items(): 4394 setattr(test, name, obj) 4395 4396 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests]) 4397 return suite 4398 4399if __name__ == "__main__": 4400 unittest.main() 4401