1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22import abc
23import array
24import errno
25import locale
26import os
27import pickle
28import random
29import signal
30import sys
31import sysconfig
32import threading
33import time
34import unittest
35import warnings
36import weakref
37from collections import deque, UserList
38from itertools import cycle, count
39from test import support
40from test.support.script_helper import assert_python_ok, run_python_until_end
41from test.support import FakePath
42
43import codecs
44import io  # C implementation of io
45import _pyio as pyio # Python implementation of io
46
47try:
48    import ctypes
49except ImportError:
50    def byteslike(*pos, **kw):
51        return array.array("b", bytes(*pos, **kw))
52else:
53    def byteslike(*pos, **kw):
54        """Create a bytes-like object having no string or sequence methods"""
55        data = bytes(*pos, **kw)
56        obj = EmptyStruct()
57        ctypes.resize(obj, len(data))
58        memoryview(obj).cast("B")[:] = data
59        return obj
60    class EmptyStruct(ctypes.Structure):
61        pass
62
63_cflags = sysconfig.get_config_var('CFLAGS') or ''
64_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
65MEMORY_SANITIZER = (
66    '-fsanitize=memory' in _cflags or
67    '--with-memory-sanitizer' in _config_args
68)
69
70def _default_chunk_size():
71    """Get the default TextIOWrapper chunk size"""
72    with open(__file__, "r", encoding="latin-1") as f:
73        return f._CHUNK_SIZE
74
75
76class MockRawIOWithoutRead:
77    """A RawIO implementation without read(), so as to exercise the default
78    RawIO.read() which calls readinto()."""
79
80    def __init__(self, read_stack=()):
81        self._read_stack = list(read_stack)
82        self._write_stack = []
83        self._reads = 0
84        self._extraneous_reads = 0
85
86    def write(self, b):
87        self._write_stack.append(bytes(b))
88        return len(b)
89
90    def writable(self):
91        return True
92
93    def fileno(self):
94        return 42
95
96    def readable(self):
97        return True
98
99    def seekable(self):
100        return True
101
102    def seek(self, pos, whence):
103        return 0   # wrong but we gotta return something
104
105    def tell(self):
106        return 0   # same comment as above
107
108    def readinto(self, buf):
109        self._reads += 1
110        max_len = len(buf)
111        try:
112            data = self._read_stack[0]
113        except IndexError:
114            self._extraneous_reads += 1
115            return 0
116        if data is None:
117            del self._read_stack[0]
118            return None
119        n = len(data)
120        if len(data) <= max_len:
121            del self._read_stack[0]
122            buf[:n] = data
123            return n
124        else:
125            buf[:] = data[:max_len]
126            self._read_stack[0] = data[max_len:]
127            return max_len
128
129    def truncate(self, pos=None):
130        return pos
131
132class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
133    pass
134
135class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
136    pass
137
138
139class MockRawIO(MockRawIOWithoutRead):
140
141    def read(self, n=None):
142        self._reads += 1
143        try:
144            return self._read_stack.pop(0)
145        except:
146            self._extraneous_reads += 1
147            return b""
148
149class CMockRawIO(MockRawIO, io.RawIOBase):
150    pass
151
152class PyMockRawIO(MockRawIO, pyio.RawIOBase):
153    pass
154
155
156class MisbehavedRawIO(MockRawIO):
157    def write(self, b):
158        return super().write(b) * 2
159
160    def read(self, n=None):
161        return super().read(n) * 2
162
163    def seek(self, pos, whence):
164        return -123
165
166    def tell(self):
167        return -456
168
169    def readinto(self, buf):
170        super().readinto(buf)
171        return len(buf) * 5
172
173class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
174    pass
175
176class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
177    pass
178
179
180class SlowFlushRawIO(MockRawIO):
181    def __init__(self):
182        super().__init__()
183        self.in_flush = threading.Event()
184
185    def flush(self):
186        self.in_flush.set()
187        time.sleep(0.25)
188
189class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
190    pass
191
192class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
193    pass
194
195
196class CloseFailureIO(MockRawIO):
197    closed = 0
198
199    def close(self):
200        if not self.closed:
201            self.closed = 1
202            raise OSError
203
204class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
205    pass
206
207class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
208    pass
209
210
211class MockFileIO:
212
213    def __init__(self, data):
214        self.read_history = []
215        super().__init__(data)
216
217    def read(self, n=None):
218        res = super().read(n)
219        self.read_history.append(None if res is None else len(res))
220        return res
221
222    def readinto(self, b):
223        res = super().readinto(b)
224        self.read_history.append(res)
225        return res
226
227class CMockFileIO(MockFileIO, io.BytesIO):
228    pass
229
230class PyMockFileIO(MockFileIO, pyio.BytesIO):
231    pass
232
233
234class MockUnseekableIO:
235    def seekable(self):
236        return False
237
238    def seek(self, *args):
239        raise self.UnsupportedOperation("not seekable")
240
241    def tell(self, *args):
242        raise self.UnsupportedOperation("not seekable")
243
244    def truncate(self, *args):
245        raise self.UnsupportedOperation("not seekable")
246
247class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
248    UnsupportedOperation = io.UnsupportedOperation
249
250class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
251    UnsupportedOperation = pyio.UnsupportedOperation
252
253
254class MockNonBlockWriterIO:
255
256    def __init__(self):
257        self._write_stack = []
258        self._blocker_char = None
259
260    def pop_written(self):
261        s = b"".join(self._write_stack)
262        self._write_stack[:] = []
263        return s
264
265    def block_on(self, char):
266        """Block when a given char is encountered."""
267        self._blocker_char = char
268
269    def readable(self):
270        return True
271
272    def seekable(self):
273        return True
274
275    def writable(self):
276        return True
277
278    def write(self, b):
279        b = bytes(b)
280        n = -1
281        if self._blocker_char:
282            try:
283                n = b.index(self._blocker_char)
284            except ValueError:
285                pass
286            else:
287                if n > 0:
288                    # write data up to the first blocker
289                    self._write_stack.append(b[:n])
290                    return n
291                else:
292                    # cancel blocker and indicate would block
293                    self._blocker_char = None
294                    return None
295        self._write_stack.append(b)
296        return len(b)
297
298class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
299    BlockingIOError = io.BlockingIOError
300
301class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
302    BlockingIOError = pyio.BlockingIOError
303
304
305class IOTest(unittest.TestCase):
306
307    def setUp(self):
308        support.unlink(support.TESTFN)
309
310    def tearDown(self):
311        support.unlink(support.TESTFN)
312
313    def write_ops(self, f):
314        self.assertEqual(f.write(b"blah."), 5)
315        f.truncate(0)
316        self.assertEqual(f.tell(), 5)
317        f.seek(0)
318
319        self.assertEqual(f.write(b"blah."), 5)
320        self.assertEqual(f.seek(0), 0)
321        self.assertEqual(f.write(b"Hello."), 6)
322        self.assertEqual(f.tell(), 6)
323        self.assertEqual(f.seek(-1, 1), 5)
324        self.assertEqual(f.tell(), 5)
325        buffer = bytearray(b" world\n\n\n")
326        self.assertEqual(f.write(buffer), 9)
327        buffer[:] = b"*" * 9  # Overwrite our copy of the data
328        self.assertEqual(f.seek(0), 0)
329        self.assertEqual(f.write(b"h"), 1)
330        self.assertEqual(f.seek(-1, 2), 13)
331        self.assertEqual(f.tell(), 13)
332
333        self.assertEqual(f.truncate(12), 12)
334        self.assertEqual(f.tell(), 13)
335        self.assertRaises(TypeError, f.seek, 0.0)
336
337    def read_ops(self, f, buffered=False):
338        data = f.read(5)
339        self.assertEqual(data, b"hello")
340        data = byteslike(data)
341        self.assertEqual(f.readinto(data), 5)
342        self.assertEqual(bytes(data), b" worl")
343        data = bytearray(5)
344        self.assertEqual(f.readinto(data), 2)
345        self.assertEqual(len(data), 5)
346        self.assertEqual(data[:2], b"d\n")
347        self.assertEqual(f.seek(0), 0)
348        self.assertEqual(f.read(20), b"hello world\n")
349        self.assertEqual(f.read(1), b"")
350        self.assertEqual(f.readinto(byteslike(b"x")), 0)
351        self.assertEqual(f.seek(-6, 2), 6)
352        self.assertEqual(f.read(5), b"world")
353        self.assertEqual(f.read(0), b"")
354        self.assertEqual(f.readinto(byteslike()), 0)
355        self.assertEqual(f.seek(-6, 1), 5)
356        self.assertEqual(f.read(5), b" worl")
357        self.assertEqual(f.tell(), 10)
358        self.assertRaises(TypeError, f.seek, 0.0)
359        if buffered:
360            f.seek(0)
361            self.assertEqual(f.read(), b"hello world\n")
362            f.seek(6)
363            self.assertEqual(f.read(), b"world\n")
364            self.assertEqual(f.read(), b"")
365            f.seek(0)
366            data = byteslike(5)
367            self.assertEqual(f.readinto1(data), 5)
368            self.assertEqual(bytes(data), b"hello")
369
370    LARGE = 2**31
371
372    def large_file_ops(self, f):
373        assert f.readable()
374        assert f.writable()
375        try:
376            self.assertEqual(f.seek(self.LARGE), self.LARGE)
377        except (OverflowError, ValueError):
378            self.skipTest("no largefile support")
379        self.assertEqual(f.tell(), self.LARGE)
380        self.assertEqual(f.write(b"xxx"), 3)
381        self.assertEqual(f.tell(), self.LARGE + 3)
382        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
383        self.assertEqual(f.truncate(), self.LARGE + 2)
384        self.assertEqual(f.tell(), self.LARGE + 2)
385        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
386        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
387        self.assertEqual(f.tell(), self.LARGE + 2)
388        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
389        self.assertEqual(f.seek(-1, 2), self.LARGE)
390        self.assertEqual(f.read(2), b"x")
391
392    def test_invalid_operations(self):
393        # Try writing on a file opened in read mode and vice-versa.
394        exc = self.UnsupportedOperation
395        for mode in ("w", "wb"):
396            with self.open(support.TESTFN, mode) as fp:
397                self.assertRaises(exc, fp.read)
398                self.assertRaises(exc, fp.readline)
399        with self.open(support.TESTFN, "wb", buffering=0) as fp:
400            self.assertRaises(exc, fp.read)
401            self.assertRaises(exc, fp.readline)
402        with self.open(support.TESTFN, "rb", buffering=0) as fp:
403            self.assertRaises(exc, fp.write, b"blah")
404            self.assertRaises(exc, fp.writelines, [b"blah\n"])
405        with self.open(support.TESTFN, "rb") as fp:
406            self.assertRaises(exc, fp.write, b"blah")
407            self.assertRaises(exc, fp.writelines, [b"blah\n"])
408        with self.open(support.TESTFN, "r") as fp:
409            self.assertRaises(exc, fp.write, "blah")
410            self.assertRaises(exc, fp.writelines, ["blah\n"])
411            # Non-zero seeking from current or end pos
412            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
413            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
414
415    def test_optional_abilities(self):
416        # Test for OSError when optional APIs are not supported
417        # The purpose of this test is to try fileno(), reading, writing and
418        # seeking operations with various objects that indicate they do not
419        # support these operations.
420
421        def pipe_reader():
422            [r, w] = os.pipe()
423            os.close(w)  # So that read() is harmless
424            return self.FileIO(r, "r")
425
426        def pipe_writer():
427            [r, w] = os.pipe()
428            self.addCleanup(os.close, r)
429            # Guarantee that we can write into the pipe without blocking
430            thread = threading.Thread(target=os.read, args=(r, 100))
431            thread.start()
432            self.addCleanup(thread.join)
433            return self.FileIO(w, "w")
434
435        def buffered_reader():
436            return self.BufferedReader(self.MockUnseekableIO())
437
438        def buffered_writer():
439            return self.BufferedWriter(self.MockUnseekableIO())
440
441        def buffered_random():
442            return self.BufferedRandom(self.BytesIO())
443
444        def buffered_rw_pair():
445            return self.BufferedRWPair(self.MockUnseekableIO(),
446                self.MockUnseekableIO())
447
448        def text_reader():
449            class UnseekableReader(self.MockUnseekableIO):
450                writable = self.BufferedIOBase.writable
451                write = self.BufferedIOBase.write
452            return self.TextIOWrapper(UnseekableReader(), "ascii")
453
454        def text_writer():
455            class UnseekableWriter(self.MockUnseekableIO):
456                readable = self.BufferedIOBase.readable
457                read = self.BufferedIOBase.read
458            return self.TextIOWrapper(UnseekableWriter(), "ascii")
459
460        tests = (
461            (pipe_reader, "fr"), (pipe_writer, "fw"),
462            (buffered_reader, "r"), (buffered_writer, "w"),
463            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
464            (text_reader, "r"), (text_writer, "w"),
465            (self.BytesIO, "rws"), (self.StringIO, "rws"),
466        )
467        for [test, abilities] in tests:
468            with self.subTest(test), test() as obj:
469                readable = "r" in abilities
470                self.assertEqual(obj.readable(), readable)
471                writable = "w" in abilities
472                self.assertEqual(obj.writable(), writable)
473
474                if isinstance(obj, self.TextIOBase):
475                    data = "3"
476                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
477                    data = b"3"
478                else:
479                    self.fail("Unknown base class")
480
481                if "f" in abilities:
482                    obj.fileno()
483                else:
484                    self.assertRaises(OSError, obj.fileno)
485
486                if readable:
487                    obj.read(1)
488                    obj.read()
489                else:
490                    self.assertRaises(OSError, obj.read, 1)
491                    self.assertRaises(OSError, obj.read)
492
493                if writable:
494                    obj.write(data)
495                else:
496                    self.assertRaises(OSError, obj.write, data)
497
498                if sys.platform.startswith("win") and test in (
499                        pipe_reader, pipe_writer):
500                    # Pipes seem to appear as seekable on Windows
501                    continue
502                seekable = "s" in abilities
503                self.assertEqual(obj.seekable(), seekable)
504
505                if seekable:
506                    obj.tell()
507                    obj.seek(0)
508                else:
509                    self.assertRaises(OSError, obj.tell)
510                    self.assertRaises(OSError, obj.seek, 0)
511
512                if writable and seekable:
513                    obj.truncate()
514                    obj.truncate(0)
515                else:
516                    self.assertRaises(OSError, obj.truncate)
517                    self.assertRaises(OSError, obj.truncate, 0)
518
519    def test_open_handles_NUL_chars(self):
520        fn_with_NUL = 'foo\0bar'
521        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
522
523        bytes_fn = bytes(fn_with_NUL, 'ascii')
524        with warnings.catch_warnings():
525            warnings.simplefilter("ignore", DeprecationWarning)
526            self.assertRaises(ValueError, self.open, bytes_fn, 'w')
527
528    def test_raw_file_io(self):
529        with self.open(support.TESTFN, "wb", buffering=0) as f:
530            self.assertEqual(f.readable(), False)
531            self.assertEqual(f.writable(), True)
532            self.assertEqual(f.seekable(), True)
533            self.write_ops(f)
534        with self.open(support.TESTFN, "rb", buffering=0) as f:
535            self.assertEqual(f.readable(), True)
536            self.assertEqual(f.writable(), False)
537            self.assertEqual(f.seekable(), True)
538            self.read_ops(f)
539
540    def test_buffered_file_io(self):
541        with self.open(support.TESTFN, "wb") as f:
542            self.assertEqual(f.readable(), False)
543            self.assertEqual(f.writable(), True)
544            self.assertEqual(f.seekable(), True)
545            self.write_ops(f)
546        with self.open(support.TESTFN, "rb") as f:
547            self.assertEqual(f.readable(), True)
548            self.assertEqual(f.writable(), False)
549            self.assertEqual(f.seekable(), True)
550            self.read_ops(f, True)
551
552    def test_readline(self):
553        with self.open(support.TESTFN, "wb") as f:
554            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
555        with self.open(support.TESTFN, "rb") as f:
556            self.assertEqual(f.readline(), b"abc\n")
557            self.assertEqual(f.readline(10), b"def\n")
558            self.assertEqual(f.readline(2), b"xy")
559            self.assertEqual(f.readline(4), b"zzy\n")
560            self.assertEqual(f.readline(), b"foo\x00bar\n")
561            self.assertEqual(f.readline(None), b"another line")
562            self.assertRaises(TypeError, f.readline, 5.3)
563        with self.open(support.TESTFN, "r") as f:
564            self.assertRaises(TypeError, f.readline, 5.3)
565
566    def test_readline_nonsizeable(self):
567        # Issue #30061
568        # Crash when readline() returns an object without __len__
569        class R(self.IOBase):
570            def readline(self):
571                return None
572        self.assertRaises((TypeError, StopIteration), next, R())
573
574    def test_next_nonsizeable(self):
575        # Issue #30061
576        # Crash when __next__() returns an object without __len__
577        class R(self.IOBase):
578            def __next__(self):
579                return None
580        self.assertRaises(TypeError, R().readlines, 1)
581
582    def test_raw_bytes_io(self):
583        f = self.BytesIO()
584        self.write_ops(f)
585        data = f.getvalue()
586        self.assertEqual(data, b"hello world\n")
587        f = self.BytesIO(data)
588        self.read_ops(f, True)
589
590    def test_large_file_ops(self):
591        # On Windows and Mac OSX this test consumes large resources; It takes
592        # a long time to build the >2 GiB file and takes >2 GiB of disk space
593        # therefore the resource must be enabled to run this test.
594        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
595            support.requires(
596                'largefile',
597                'test requires %s bytes and a long time to run' % self.LARGE)
598        with self.open(support.TESTFN, "w+b", 0) as f:
599            self.large_file_ops(f)
600        with self.open(support.TESTFN, "w+b") as f:
601            self.large_file_ops(f)
602
603    def test_with_open(self):
604        for bufsize in (0, 1, 100):
605            f = None
606            with self.open(support.TESTFN, "wb", bufsize) as f:
607                f.write(b"xxx")
608            self.assertEqual(f.closed, True)
609            f = None
610            try:
611                with self.open(support.TESTFN, "wb", bufsize) as f:
612                    1/0
613            except ZeroDivisionError:
614                self.assertEqual(f.closed, True)
615            else:
616                self.fail("1/0 didn't raise an exception")
617
618    # issue 5008
619    def test_append_mode_tell(self):
620        with self.open(support.TESTFN, "wb") as f:
621            f.write(b"xxx")
622        with self.open(support.TESTFN, "ab", buffering=0) as f:
623            self.assertEqual(f.tell(), 3)
624        with self.open(support.TESTFN, "ab") as f:
625            self.assertEqual(f.tell(), 3)
626        with self.open(support.TESTFN, "a") as f:
627            self.assertGreater(f.tell(), 0)
628
629    def test_destructor(self):
630        record = []
631        class MyFileIO(self.FileIO):
632            def __del__(self):
633                record.append(1)
634                try:
635                    f = super().__del__
636                except AttributeError:
637                    pass
638                else:
639                    f()
640            def close(self):
641                record.append(2)
642                super().close()
643            def flush(self):
644                record.append(3)
645                super().flush()
646        with support.check_warnings(('', ResourceWarning)):
647            f = MyFileIO(support.TESTFN, "wb")
648            f.write(b"xxx")
649            del f
650            support.gc_collect()
651            self.assertEqual(record, [1, 2, 3])
652            with self.open(support.TESTFN, "rb") as f:
653                self.assertEqual(f.read(), b"xxx")
654
655    def _check_base_destructor(self, base):
656        record = []
657        class MyIO(base):
658            def __init__(self):
659                # This exercises the availability of attributes on object
660                # destruction.
661                # (in the C version, close() is called by the tp_dealloc
662                # function, not by __del__)
663                self.on_del = 1
664                self.on_close = 2
665                self.on_flush = 3
666            def __del__(self):
667                record.append(self.on_del)
668                try:
669                    f = super().__del__
670                except AttributeError:
671                    pass
672                else:
673                    f()
674            def close(self):
675                record.append(self.on_close)
676                super().close()
677            def flush(self):
678                record.append(self.on_flush)
679                super().flush()
680        f = MyIO()
681        del f
682        support.gc_collect()
683        self.assertEqual(record, [1, 2, 3])
684
685    def test_IOBase_destructor(self):
686        self._check_base_destructor(self.IOBase)
687
688    def test_RawIOBase_destructor(self):
689        self._check_base_destructor(self.RawIOBase)
690
691    def test_BufferedIOBase_destructor(self):
692        self._check_base_destructor(self.BufferedIOBase)
693
694    def test_TextIOBase_destructor(self):
695        self._check_base_destructor(self.TextIOBase)
696
697    def test_close_flushes(self):
698        with self.open(support.TESTFN, "wb") as f:
699            f.write(b"xxx")
700        with self.open(support.TESTFN, "rb") as f:
701            self.assertEqual(f.read(), b"xxx")
702
703    def test_array_writes(self):
704        a = array.array('i', range(10))
705        n = len(a.tobytes())
706        def check(f):
707            with f:
708                self.assertEqual(f.write(a), n)
709                f.writelines((a,))
710        check(self.BytesIO())
711        check(self.FileIO(support.TESTFN, "w"))
712        check(self.BufferedWriter(self.MockRawIO()))
713        check(self.BufferedRandom(self.MockRawIO()))
714        check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
715
716    def test_closefd(self):
717        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
718                          closefd=False)
719
720    def test_read_closed(self):
721        with self.open(support.TESTFN, "w") as f:
722            f.write("egg\n")
723        with self.open(support.TESTFN, "r") as f:
724            file = self.open(f.fileno(), "r", closefd=False)
725            self.assertEqual(file.read(), "egg\n")
726            file.seek(0)
727            file.close()
728            self.assertRaises(ValueError, file.read)
729        with self.open(support.TESTFN, "rb") as f:
730            file = self.open(f.fileno(), "rb", closefd=False)
731            self.assertEqual(file.read()[:3], b"egg")
732            file.close()
733            self.assertRaises(ValueError, file.readinto, bytearray(1))
734
735    def test_no_closefd_with_filename(self):
736        # can't use closefd in combination with a file name
737        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
738
739    def test_closefd_attr(self):
740        with self.open(support.TESTFN, "wb") as f:
741            f.write(b"egg\n")
742        with self.open(support.TESTFN, "r") as f:
743            self.assertEqual(f.buffer.raw.closefd, True)
744            file = self.open(f.fileno(), "r", closefd=False)
745            self.assertEqual(file.buffer.raw.closefd, False)
746
747    def test_garbage_collection(self):
748        # FileIO objects are collected, and collecting them flushes
749        # all data to disk.
750        with support.check_warnings(('', ResourceWarning)):
751            f = self.FileIO(support.TESTFN, "wb")
752            f.write(b"abcxxx")
753            f.f = f
754            wr = weakref.ref(f)
755            del f
756            support.gc_collect()
757        self.assertIsNone(wr(), wr)
758        with self.open(support.TESTFN, "rb") as f:
759            self.assertEqual(f.read(), b"abcxxx")
760
761    def test_unbounded_file(self):
762        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
763        zero = "/dev/zero"
764        if not os.path.exists(zero):
765            self.skipTest("{0} does not exist".format(zero))
766        if sys.maxsize > 0x7FFFFFFF:
767            self.skipTest("test can only run in a 32-bit address space")
768        if support.real_max_memuse < support._2G:
769            self.skipTest("test requires at least 2 GiB of memory")
770        with self.open(zero, "rb", buffering=0) as f:
771            self.assertRaises(OverflowError, f.read)
772        with self.open(zero, "rb") as f:
773            self.assertRaises(OverflowError, f.read)
774        with self.open(zero, "r") as f:
775            self.assertRaises(OverflowError, f.read)
776
777    def check_flush_error_on_close(self, *args, **kwargs):
778        # Test that the file is closed despite failed flush
779        # and that flush() is called before file closed.
780        f = self.open(*args, **kwargs)
781        closed = []
782        def bad_flush():
783            closed[:] = [f.closed]
784            raise OSError()
785        f.flush = bad_flush
786        self.assertRaises(OSError, f.close) # exception not swallowed
787        self.assertTrue(f.closed)
788        self.assertTrue(closed)      # flush() called
789        self.assertFalse(closed[0])  # flush() called before file closed
790        f.flush = lambda: None  # break reference loop
791
792    def test_flush_error_on_close(self):
793        # raw file
794        # Issue #5700: io.FileIO calls flush() after file closed
795        self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
796        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
797        self.check_flush_error_on_close(fd, 'wb', buffering=0)
798        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
799        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
800        os.close(fd)
801        # buffered io
802        self.check_flush_error_on_close(support.TESTFN, 'wb')
803        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
804        self.check_flush_error_on_close(fd, 'wb')
805        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
806        self.check_flush_error_on_close(fd, 'wb', closefd=False)
807        os.close(fd)
808        # text io
809        self.check_flush_error_on_close(support.TESTFN, 'w')
810        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
811        self.check_flush_error_on_close(fd, 'w')
812        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
813        self.check_flush_error_on_close(fd, 'w', closefd=False)
814        os.close(fd)
815
816    def test_multi_close(self):
817        f = self.open(support.TESTFN, "wb", buffering=0)
818        f.close()
819        f.close()
820        f.close()
821        self.assertRaises(ValueError, f.flush)
822
823    def test_RawIOBase_read(self):
824        # Exercise the default limited RawIOBase.read(n) implementation (which
825        # calls readinto() internally).
826        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
827        self.assertEqual(rawio.read(2), b"ab")
828        self.assertEqual(rawio.read(2), b"c")
829        self.assertEqual(rawio.read(2), b"d")
830        self.assertEqual(rawio.read(2), None)
831        self.assertEqual(rawio.read(2), b"ef")
832        self.assertEqual(rawio.read(2), b"g")
833        self.assertEqual(rawio.read(2), None)
834        self.assertEqual(rawio.read(2), b"")
835
836    def test_types_have_dict(self):
837        test = (
838            self.IOBase(),
839            self.RawIOBase(),
840            self.TextIOBase(),
841            self.StringIO(),
842            self.BytesIO()
843        )
844        for obj in test:
845            self.assertTrue(hasattr(obj, "__dict__"))
846
847    def test_opener(self):
848        with self.open(support.TESTFN, "w") as f:
849            f.write("egg\n")
850        fd = os.open(support.TESTFN, os.O_RDONLY)
851        def opener(path, flags):
852            return fd
853        with self.open("non-existent", "r", opener=opener) as f:
854            self.assertEqual(f.read(), "egg\n")
855
856    def test_bad_opener_negative_1(self):
857        # Issue #27066.
858        def badopener(fname, flags):
859            return -1
860        with self.assertRaises(ValueError) as cm:
861            open('non-existent', 'r', opener=badopener)
862        self.assertEqual(str(cm.exception), 'opener returned -1')
863
864    def test_bad_opener_other_negative(self):
865        # Issue #27066.
866        def badopener(fname, flags):
867            return -2
868        with self.assertRaises(ValueError) as cm:
869            open('non-existent', 'r', opener=badopener)
870        self.assertEqual(str(cm.exception), 'opener returned -2')
871
872    def test_fileio_closefd(self):
873        # Issue #4841
874        with self.open(__file__, 'rb') as f1, \
875             self.open(__file__, 'rb') as f2:
876            fileio = self.FileIO(f1.fileno(), closefd=False)
877            # .__init__() must not close f1
878            fileio.__init__(f2.fileno(), closefd=False)
879            f1.readline()
880            # .close() must not close f2
881            fileio.close()
882            f2.readline()
883
884    def test_nonbuffered_textio(self):
885        with support.check_no_resource_warning(self):
886            with self.assertRaises(ValueError):
887                self.open(support.TESTFN, 'w', buffering=0)
888
889    def test_invalid_newline(self):
890        with support.check_no_resource_warning(self):
891            with self.assertRaises(ValueError):
892                self.open(support.TESTFN, 'w', newline='invalid')
893
894    def test_buffered_readinto_mixin(self):
895        # Test the implementation provided by BufferedIOBase
896        class Stream(self.BufferedIOBase):
897            def read(self, size):
898                return b"12345"
899            read1 = read
900        stream = Stream()
901        for method in ("readinto", "readinto1"):
902            with self.subTest(method):
903                buffer = byteslike(5)
904                self.assertEqual(getattr(stream, method)(buffer), 5)
905                self.assertEqual(bytes(buffer), b"12345")
906
907    def test_fspath_support(self):
908        def check_path_succeeds(path):
909            with self.open(path, "w") as f:
910                f.write("egg\n")
911
912            with self.open(path, "r") as f:
913                self.assertEqual(f.read(), "egg\n")
914
915        check_path_succeeds(FakePath(support.TESTFN))
916        check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
917
918        with self.open(support.TESTFN, "w") as f:
919            bad_path = FakePath(f.fileno())
920            with self.assertRaises(TypeError):
921                self.open(bad_path, 'w')
922
923        bad_path = FakePath(None)
924        with self.assertRaises(TypeError):
925            self.open(bad_path, 'w')
926
927        bad_path = FakePath(FloatingPointError)
928        with self.assertRaises(FloatingPointError):
929            self.open(bad_path, 'w')
930
931        # ensure that refcounting is correct with some error conditions
932        with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
933            self.open(FakePath(support.TESTFN), 'rwxa')
934
935    def test_RawIOBase_readall(self):
936        # Exercise the default unlimited RawIOBase.read() and readall()
937        # implementations.
938        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
939        self.assertEqual(rawio.read(), b"abcdefg")
940        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
941        self.assertEqual(rawio.readall(), b"abcdefg")
942
943    def test_BufferedIOBase_readinto(self):
944        # Exercise the default BufferedIOBase.readinto() and readinto1()
945        # implementations (which call read() or read1() internally).
946        class Reader(self.BufferedIOBase):
947            def __init__(self, avail):
948                self.avail = avail
949            def read(self, size):
950                result = self.avail[:size]
951                self.avail = self.avail[size:]
952                return result
953            def read1(self, size):
954                """Returns no more than 5 bytes at once"""
955                return self.read(min(size, 5))
956        tests = (
957            # (test method, total data available, read buffer size, expected
958            #     read size)
959            ("readinto", 10, 5, 5),
960            ("readinto", 10, 6, 6),  # More than read1() can return
961            ("readinto", 5, 6, 5),  # Buffer larger than total available
962            ("readinto", 6, 7, 6),
963            ("readinto", 10, 0, 0),  # Empty buffer
964            ("readinto1", 10, 5, 5),  # Result limited to single read1() call
965            ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
966            ("readinto1", 5, 6, 5),  # Buffer larger than total available
967            ("readinto1", 6, 7, 5),
968            ("readinto1", 10, 0, 0),  # Empty buffer
969        )
970        UNUSED_BYTE = 0x81
971        for test in tests:
972            with self.subTest(test):
973                method, avail, request, result = test
974                reader = Reader(bytes(range(avail)))
975                buffer = bytearray((UNUSED_BYTE,) * request)
976                method = getattr(reader, method)
977                self.assertEqual(method(buffer), result)
978                self.assertEqual(len(buffer), request)
979                self.assertSequenceEqual(buffer[:result], range(result))
980                unused = (UNUSED_BYTE,) * (request - result)
981                self.assertSequenceEqual(buffer[result:], unused)
982                self.assertEqual(len(reader.avail), avail - result)
983
984    def test_close_assert(self):
985        class R(self.IOBase):
986            def __setattr__(self, name, value):
987                pass
988            def flush(self):
989                raise OSError()
990        f = R()
991        # This would cause an assertion failure.
992        self.assertRaises(OSError, f.close)
993
994
995class CIOTest(IOTest):
996
997    def test_IOBase_finalize(self):
998        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
999        # class which inherits IOBase and an object of this class are caught
1000        # in a reference cycle and close() is already in the method cache.
1001        class MyIO(self.IOBase):
1002            def close(self):
1003                pass
1004
1005        # create an instance to populate the method cache
1006        MyIO()
1007        obj = MyIO()
1008        obj.obj = obj
1009        wr = weakref.ref(obj)
1010        del MyIO
1011        del obj
1012        support.gc_collect()
1013        self.assertIsNone(wr(), wr)
1014
1015class PyIOTest(IOTest):
1016    pass
1017
1018
1019@support.cpython_only
1020class APIMismatchTest(unittest.TestCase):
1021
1022    def test_RawIOBase_io_in_pyio_match(self):
1023        """Test that pyio RawIOBase class has all c RawIOBase methods"""
1024        mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1025                                               ignore=('__weakref__',))
1026        self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1027
1028    def test_RawIOBase_pyio_in_io_match(self):
1029        """Test that c RawIOBase class has all pyio RawIOBase methods"""
1030        mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1031        self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1032
1033
1034class CommonBufferedTests:
1035    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1036
1037    def test_detach(self):
1038        raw = self.MockRawIO()
1039        buf = self.tp(raw)
1040        self.assertIs(buf.detach(), raw)
1041        self.assertRaises(ValueError, buf.detach)
1042
1043        repr(buf)  # Should still work
1044
1045    def test_fileno(self):
1046        rawio = self.MockRawIO()
1047        bufio = self.tp(rawio)
1048
1049        self.assertEqual(42, bufio.fileno())
1050
1051    def test_invalid_args(self):
1052        rawio = self.MockRawIO()
1053        bufio = self.tp(rawio)
1054        # Invalid whence
1055        self.assertRaises(ValueError, bufio.seek, 0, -1)
1056        self.assertRaises(ValueError, bufio.seek, 0, 9)
1057
1058    def test_override_destructor(self):
1059        tp = self.tp
1060        record = []
1061        class MyBufferedIO(tp):
1062            def __del__(self):
1063                record.append(1)
1064                try:
1065                    f = super().__del__
1066                except AttributeError:
1067                    pass
1068                else:
1069                    f()
1070            def close(self):
1071                record.append(2)
1072                super().close()
1073            def flush(self):
1074                record.append(3)
1075                super().flush()
1076        rawio = self.MockRawIO()
1077        bufio = MyBufferedIO(rawio)
1078        del bufio
1079        support.gc_collect()
1080        self.assertEqual(record, [1, 2, 3])
1081
1082    def test_context_manager(self):
1083        # Test usability as a context manager
1084        rawio = self.MockRawIO()
1085        bufio = self.tp(rawio)
1086        def _with():
1087            with bufio:
1088                pass
1089        _with()
1090        # bufio should now be closed, and using it a second time should raise
1091        # a ValueError.
1092        self.assertRaises(ValueError, _with)
1093
1094    def test_error_through_destructor(self):
1095        # Test that the exception state is not modified by a destructor,
1096        # even if close() fails.
1097        rawio = self.CloseFailureIO()
1098        def f():
1099            self.tp(rawio).xyzzy
1100        with support.captured_output("stderr") as s:
1101            self.assertRaises(AttributeError, f)
1102        s = s.getvalue().strip()
1103        if s:
1104            # The destructor *may* have printed an unraisable error, check it
1105            self.assertEqual(len(s.splitlines()), 1)
1106            self.assertTrue(s.startswith("Exception OSError: "), s)
1107            self.assertTrue(s.endswith(" ignored"), s)
1108
1109    def test_repr(self):
1110        raw = self.MockRawIO()
1111        b = self.tp(raw)
1112        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1113        self.assertRegex(repr(b), "<%s>" % clsname)
1114        raw.name = "dummy"
1115        self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
1116        raw.name = b"dummy"
1117        self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
1118
1119    def test_recursive_repr(self):
1120        # Issue #25455
1121        raw = self.MockRawIO()
1122        b = self.tp(raw)
1123        with support.swap_attr(raw, 'name', b):
1124            try:
1125                repr(b)  # Should not crash
1126            except RuntimeError:
1127                pass
1128
1129    def test_flush_error_on_close(self):
1130        # Test that buffered file is closed despite failed flush
1131        # and that flush() is called before file closed.
1132        raw = self.MockRawIO()
1133        closed = []
1134        def bad_flush():
1135            closed[:] = [b.closed, raw.closed]
1136            raise OSError()
1137        raw.flush = bad_flush
1138        b = self.tp(raw)
1139        self.assertRaises(OSError, b.close) # exception not swallowed
1140        self.assertTrue(b.closed)
1141        self.assertTrue(raw.closed)
1142        self.assertTrue(closed)      # flush() called
1143        self.assertFalse(closed[0])  # flush() called before file closed
1144        self.assertFalse(closed[1])
1145        raw.flush = lambda: None  # break reference loop
1146
1147    def test_close_error_on_close(self):
1148        raw = self.MockRawIO()
1149        def bad_flush():
1150            raise OSError('flush')
1151        def bad_close():
1152            raise OSError('close')
1153        raw.close = bad_close
1154        b = self.tp(raw)
1155        b.flush = bad_flush
1156        with self.assertRaises(OSError) as err: # exception not swallowed
1157            b.close()
1158        self.assertEqual(err.exception.args, ('close',))
1159        self.assertIsInstance(err.exception.__context__, OSError)
1160        self.assertEqual(err.exception.__context__.args, ('flush',))
1161        self.assertFalse(b.closed)
1162
1163    def test_nonnormalized_close_error_on_close(self):
1164        # Issue #21677
1165        raw = self.MockRawIO()
1166        def bad_flush():
1167            raise non_existing_flush
1168        def bad_close():
1169            raise non_existing_close
1170        raw.close = bad_close
1171        b = self.tp(raw)
1172        b.flush = bad_flush
1173        with self.assertRaises(NameError) as err: # exception not swallowed
1174            b.close()
1175        self.assertIn('non_existing_close', str(err.exception))
1176        self.assertIsInstance(err.exception.__context__, NameError)
1177        self.assertIn('non_existing_flush', str(err.exception.__context__))
1178        self.assertFalse(b.closed)
1179
1180    def test_multi_close(self):
1181        raw = self.MockRawIO()
1182        b = self.tp(raw)
1183        b.close()
1184        b.close()
1185        b.close()
1186        self.assertRaises(ValueError, b.flush)
1187
1188    def test_unseekable(self):
1189        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1190        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1191        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1192
1193    def test_readonly_attributes(self):
1194        raw = self.MockRawIO()
1195        buf = self.tp(raw)
1196        x = self.MockRawIO()
1197        with self.assertRaises(AttributeError):
1198            buf.raw = x
1199
1200
1201class SizeofTest:
1202
1203    @support.cpython_only
1204    def test_sizeof(self):
1205        bufsize1 = 4096
1206        bufsize2 = 8192
1207        rawio = self.MockRawIO()
1208        bufio = self.tp(rawio, buffer_size=bufsize1)
1209        size = sys.getsizeof(bufio) - bufsize1
1210        rawio = self.MockRawIO()
1211        bufio = self.tp(rawio, buffer_size=bufsize2)
1212        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1213
1214    @support.cpython_only
1215    def test_buffer_freeing(self) :
1216        bufsize = 4096
1217        rawio = self.MockRawIO()
1218        bufio = self.tp(rawio, buffer_size=bufsize)
1219        size = sys.getsizeof(bufio) - bufsize
1220        bufio.close()
1221        self.assertEqual(sys.getsizeof(bufio), size)
1222
1223class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1224    read_mode = "rb"
1225
1226    def test_constructor(self):
1227        rawio = self.MockRawIO([b"abc"])
1228        bufio = self.tp(rawio)
1229        bufio.__init__(rawio)
1230        bufio.__init__(rawio, buffer_size=1024)
1231        bufio.__init__(rawio, buffer_size=16)
1232        self.assertEqual(b"abc", bufio.read())
1233        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1234        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1235        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1236        rawio = self.MockRawIO([b"abc"])
1237        bufio.__init__(rawio)
1238        self.assertEqual(b"abc", bufio.read())
1239
1240    def test_uninitialized(self):
1241        bufio = self.tp.__new__(self.tp)
1242        del bufio
1243        bufio = self.tp.__new__(self.tp)
1244        self.assertRaisesRegex((ValueError, AttributeError),
1245                               'uninitialized|has no attribute',
1246                               bufio.read, 0)
1247        bufio.__init__(self.MockRawIO())
1248        self.assertEqual(bufio.read(0), b'')
1249
1250    def test_read(self):
1251        for arg in (None, 7):
1252            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1253            bufio = self.tp(rawio)
1254            self.assertEqual(b"abcdefg", bufio.read(arg))
1255        # Invalid args
1256        self.assertRaises(ValueError, bufio.read, -2)
1257
1258    def test_read1(self):
1259        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1260        bufio = self.tp(rawio)
1261        self.assertEqual(b"a", bufio.read(1))
1262        self.assertEqual(b"b", bufio.read1(1))
1263        self.assertEqual(rawio._reads, 1)
1264        self.assertEqual(b"", bufio.read1(0))
1265        self.assertEqual(b"c", bufio.read1(100))
1266        self.assertEqual(rawio._reads, 1)
1267        self.assertEqual(b"d", bufio.read1(100))
1268        self.assertEqual(rawio._reads, 2)
1269        self.assertEqual(b"efg", bufio.read1(100))
1270        self.assertEqual(rawio._reads, 3)
1271        self.assertEqual(b"", bufio.read1(100))
1272        self.assertEqual(rawio._reads, 4)
1273
1274    def test_read1_arbitrary(self):
1275        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1276        bufio = self.tp(rawio)
1277        self.assertEqual(b"a", bufio.read(1))
1278        self.assertEqual(b"bc", bufio.read1())
1279        self.assertEqual(b"d", bufio.read1())
1280        self.assertEqual(b"efg", bufio.read1(-1))
1281        self.assertEqual(rawio._reads, 3)
1282        self.assertEqual(b"", bufio.read1())
1283        self.assertEqual(rawio._reads, 4)
1284
1285    def test_readinto(self):
1286        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1287        bufio = self.tp(rawio)
1288        b = bytearray(2)
1289        self.assertEqual(bufio.readinto(b), 2)
1290        self.assertEqual(b, b"ab")
1291        self.assertEqual(bufio.readinto(b), 2)
1292        self.assertEqual(b, b"cd")
1293        self.assertEqual(bufio.readinto(b), 2)
1294        self.assertEqual(b, b"ef")
1295        self.assertEqual(bufio.readinto(b), 1)
1296        self.assertEqual(b, b"gf")
1297        self.assertEqual(bufio.readinto(b), 0)
1298        self.assertEqual(b, b"gf")
1299        rawio = self.MockRawIO((b"abc", None))
1300        bufio = self.tp(rawio)
1301        self.assertEqual(bufio.readinto(b), 2)
1302        self.assertEqual(b, b"ab")
1303        self.assertEqual(bufio.readinto(b), 1)
1304        self.assertEqual(b, b"cb")
1305
1306    def test_readinto1(self):
1307        buffer_size = 10
1308        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1309        bufio = self.tp(rawio, buffer_size=buffer_size)
1310        b = bytearray(2)
1311        self.assertEqual(bufio.peek(3), b'abc')
1312        self.assertEqual(rawio._reads, 1)
1313        self.assertEqual(bufio.readinto1(b), 2)
1314        self.assertEqual(b, b"ab")
1315        self.assertEqual(rawio._reads, 1)
1316        self.assertEqual(bufio.readinto1(b), 1)
1317        self.assertEqual(b[:1], b"c")
1318        self.assertEqual(rawio._reads, 1)
1319        self.assertEqual(bufio.readinto1(b), 2)
1320        self.assertEqual(b, b"de")
1321        self.assertEqual(rawio._reads, 2)
1322        b = bytearray(2*buffer_size)
1323        self.assertEqual(bufio.peek(3), b'fgh')
1324        self.assertEqual(rawio._reads, 3)
1325        self.assertEqual(bufio.readinto1(b), 6)
1326        self.assertEqual(b[:6], b"fghjkl")
1327        self.assertEqual(rawio._reads, 4)
1328
1329    def test_readinto_array(self):
1330        buffer_size = 60
1331        data = b"a" * 26
1332        rawio = self.MockRawIO((data,))
1333        bufio = self.tp(rawio, buffer_size=buffer_size)
1334
1335        # Create an array with element size > 1 byte
1336        b = array.array('i', b'x' * 32)
1337        assert len(b) != 16
1338
1339        # Read into it. We should get as many *bytes* as we can fit into b
1340        # (which is more than the number of elements)
1341        n = bufio.readinto(b)
1342        self.assertGreater(n, len(b))
1343
1344        # Check that old contents of b are preserved
1345        bm = memoryview(b).cast('B')
1346        self.assertLess(n, len(bm))
1347        self.assertEqual(bm[:n], data[:n])
1348        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1349
1350    def test_readinto1_array(self):
1351        buffer_size = 60
1352        data = b"a" * 26
1353        rawio = self.MockRawIO((data,))
1354        bufio = self.tp(rawio, buffer_size=buffer_size)
1355
1356        # Create an array with element size > 1 byte
1357        b = array.array('i', b'x' * 32)
1358        assert len(b) != 16
1359
1360        # Read into it. We should get as many *bytes* as we can fit into b
1361        # (which is more than the number of elements)
1362        n = bufio.readinto1(b)
1363        self.assertGreater(n, len(b))
1364
1365        # Check that old contents of b are preserved
1366        bm = memoryview(b).cast('B')
1367        self.assertLess(n, len(bm))
1368        self.assertEqual(bm[:n], data[:n])
1369        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1370
1371    def test_readlines(self):
1372        def bufio():
1373            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1374            return self.tp(rawio)
1375        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1376        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1377        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
1378
1379    def test_buffering(self):
1380        data = b"abcdefghi"
1381        dlen = len(data)
1382
1383        tests = [
1384            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1385            [ 100, [ 3, 3, 3],     [ dlen ]    ],
1386            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1387        ]
1388
1389        for bufsize, buf_read_sizes, raw_read_sizes in tests:
1390            rawio = self.MockFileIO(data)
1391            bufio = self.tp(rawio, buffer_size=bufsize)
1392            pos = 0
1393            for nbytes in buf_read_sizes:
1394                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
1395                pos += nbytes
1396            # this is mildly implementation-dependent
1397            self.assertEqual(rawio.read_history, raw_read_sizes)
1398
1399    def test_read_non_blocking(self):
1400        # Inject some None's in there to simulate EWOULDBLOCK
1401        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1402        bufio = self.tp(rawio)
1403        self.assertEqual(b"abcd", bufio.read(6))
1404        self.assertEqual(b"e", bufio.read(1))
1405        self.assertEqual(b"fg", bufio.read())
1406        self.assertEqual(b"", bufio.peek(1))
1407        self.assertIsNone(bufio.read())
1408        self.assertEqual(b"", bufio.read())
1409
1410        rawio = self.MockRawIO((b"a", None, None))
1411        self.assertEqual(b"a", rawio.readall())
1412        self.assertIsNone(rawio.readall())
1413
1414    def test_read_past_eof(self):
1415        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1416        bufio = self.tp(rawio)
1417
1418        self.assertEqual(b"abcdefg", bufio.read(9000))
1419
1420    def test_read_all(self):
1421        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1422        bufio = self.tp(rawio)
1423
1424        self.assertEqual(b"abcdefg", bufio.read())
1425
1426    @support.requires_resource('cpu')
1427    def test_threads(self):
1428        try:
1429            # Write out many bytes with exactly the same number of 0's,
1430            # 1's... 255's. This will help us check that concurrent reading
1431            # doesn't duplicate or forget contents.
1432            N = 1000
1433            l = list(range(256)) * N
1434            random.shuffle(l)
1435            s = bytes(bytearray(l))
1436            with self.open(support.TESTFN, "wb") as f:
1437                f.write(s)
1438            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
1439                bufio = self.tp(raw, 8)
1440                errors = []
1441                results = []
1442                def f():
1443                    try:
1444                        # Intra-buffer read then buffer-flushing read
1445                        for n in cycle([1, 19]):
1446                            s = bufio.read(n)
1447                            if not s:
1448                                break
1449                            # list.append() is atomic
1450                            results.append(s)
1451                    except Exception as e:
1452                        errors.append(e)
1453                        raise
1454                threads = [threading.Thread(target=f) for x in range(20)]
1455                with support.start_threads(threads):
1456                    time.sleep(0.02) # yield
1457                self.assertFalse(errors,
1458                    "the following exceptions were caught: %r" % errors)
1459                s = b''.join(results)
1460                for i in range(256):
1461                    c = bytes(bytearray([i]))
1462                    self.assertEqual(s.count(c), N)
1463        finally:
1464            support.unlink(support.TESTFN)
1465
1466    def test_unseekable(self):
1467        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1468        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1469        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1470        bufio.read(1)
1471        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1472        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1473
1474    def test_misbehaved_io(self):
1475        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1476        bufio = self.tp(rawio)
1477        self.assertRaises(OSError, bufio.seek, 0)
1478        self.assertRaises(OSError, bufio.tell)
1479
1480    def test_no_extraneous_read(self):
1481        # Issue #9550; when the raw IO object has satisfied the read request,
1482        # we should not issue any additional reads, otherwise it may block
1483        # (e.g. socket).
1484        bufsize = 16
1485        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1486            rawio = self.MockRawIO([b"x" * n])
1487            bufio = self.tp(rawio, bufsize)
1488            self.assertEqual(bufio.read(n), b"x" * n)
1489            # Simple case: one raw read is enough to satisfy the request.
1490            self.assertEqual(rawio._extraneous_reads, 0,
1491                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1492            # A more complex case where two raw reads are needed to satisfy
1493            # the request.
1494            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1495            bufio = self.tp(rawio, bufsize)
1496            self.assertEqual(bufio.read(n), b"x" * n)
1497            self.assertEqual(rawio._extraneous_reads, 0,
1498                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1499
1500    def test_read_on_closed(self):
1501        # Issue #23796
1502        b = io.BufferedReader(io.BytesIO(b"12"))
1503        b.read(1)
1504        b.close()
1505        self.assertRaises(ValueError, b.peek)
1506        self.assertRaises(ValueError, b.read1, 1)
1507
1508
1509class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
1510    tp = io.BufferedReader
1511
1512    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1513                     "instead of returning NULL for malloc failure.")
1514    def test_constructor(self):
1515        BufferedReaderTest.test_constructor(self)
1516        # The allocation can succeed on 32-bit builds, e.g. with more
1517        # than 2 GiB RAM and a 64-bit kernel.
1518        if sys.maxsize > 0x7FFFFFFF:
1519            rawio = self.MockRawIO()
1520            bufio = self.tp(rawio)
1521            self.assertRaises((OverflowError, MemoryError, ValueError),
1522                bufio.__init__, rawio, sys.maxsize)
1523
1524    def test_initialization(self):
1525        rawio = self.MockRawIO([b"abc"])
1526        bufio = self.tp(rawio)
1527        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1528        self.assertRaises(ValueError, bufio.read)
1529        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1530        self.assertRaises(ValueError, bufio.read)
1531        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1532        self.assertRaises(ValueError, bufio.read)
1533
1534    def test_misbehaved_io_read(self):
1535        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1536        bufio = self.tp(rawio)
1537        # _pyio.BufferedReader seems to implement reading different, so that
1538        # checking this is not so easy.
1539        self.assertRaises(OSError, bufio.read, 10)
1540
1541    def test_garbage_collection(self):
1542        # C BufferedReader objects are collected.
1543        # The Python version has __del__, so it ends into gc.garbage instead
1544        self.addCleanup(support.unlink, support.TESTFN)
1545        with support.check_warnings(('', ResourceWarning)):
1546            rawio = self.FileIO(support.TESTFN, "w+b")
1547            f = self.tp(rawio)
1548            f.f = f
1549            wr = weakref.ref(f)
1550            del f
1551            support.gc_collect()
1552        self.assertIsNone(wr(), wr)
1553
1554    def test_args_error(self):
1555        # Issue #17275
1556        with self.assertRaisesRegex(TypeError, "BufferedReader"):
1557            self.tp(io.BytesIO(), 1024, 1024, 1024)
1558
1559
1560class PyBufferedReaderTest(BufferedReaderTest):
1561    tp = pyio.BufferedReader
1562
1563
1564class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1565    write_mode = "wb"
1566
1567    def test_constructor(self):
1568        rawio = self.MockRawIO()
1569        bufio = self.tp(rawio)
1570        bufio.__init__(rawio)
1571        bufio.__init__(rawio, buffer_size=1024)
1572        bufio.__init__(rawio, buffer_size=16)
1573        self.assertEqual(3, bufio.write(b"abc"))
1574        bufio.flush()
1575        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1576        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1577        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1578        bufio.__init__(rawio)
1579        self.assertEqual(3, bufio.write(b"ghi"))
1580        bufio.flush()
1581        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1582
1583    def test_uninitialized(self):
1584        bufio = self.tp.__new__(self.tp)
1585        del bufio
1586        bufio = self.tp.__new__(self.tp)
1587        self.assertRaisesRegex((ValueError, AttributeError),
1588                               'uninitialized|has no attribute',
1589                               bufio.write, b'')
1590        bufio.__init__(self.MockRawIO())
1591        self.assertEqual(bufio.write(b''), 0)
1592
1593    def test_detach_flush(self):
1594        raw = self.MockRawIO()
1595        buf = self.tp(raw)
1596        buf.write(b"howdy!")
1597        self.assertFalse(raw._write_stack)
1598        buf.detach()
1599        self.assertEqual(raw._write_stack, [b"howdy!"])
1600
1601    def test_write(self):
1602        # Write to the buffered IO but don't overflow the buffer.
1603        writer = self.MockRawIO()
1604        bufio = self.tp(writer, 8)
1605        bufio.write(b"abc")
1606        self.assertFalse(writer._write_stack)
1607        buffer = bytearray(b"def")
1608        bufio.write(buffer)
1609        buffer[:] = b"***"  # Overwrite our copy of the data
1610        bufio.flush()
1611        self.assertEqual(b"".join(writer._write_stack), b"abcdef")
1612
1613    def test_write_overflow(self):
1614        writer = self.MockRawIO()
1615        bufio = self.tp(writer, 8)
1616        contents = b"abcdefghijklmnop"
1617        for n in range(0, len(contents), 3):
1618            bufio.write(contents[n:n+3])
1619        flushed = b"".join(writer._write_stack)
1620        # At least (total - 8) bytes were implicitly flushed, perhaps more
1621        # depending on the implementation.
1622        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1623
1624    def check_writes(self, intermediate_func):
1625        # Lots of writes, test the flushed output is as expected.
1626        contents = bytes(range(256)) * 1000
1627        n = 0
1628        writer = self.MockRawIO()
1629        bufio = self.tp(writer, 13)
1630        # Generator of write sizes: repeat each N 15 times then proceed to N+1
1631        def gen_sizes():
1632            for size in count(1):
1633                for i in range(15):
1634                    yield size
1635        sizes = gen_sizes()
1636        while n < len(contents):
1637            size = min(next(sizes), len(contents) - n)
1638            self.assertEqual(bufio.write(contents[n:n+size]), size)
1639            intermediate_func(bufio)
1640            n += size
1641        bufio.flush()
1642        self.assertEqual(contents, b"".join(writer._write_stack))
1643
1644    def test_writes(self):
1645        self.check_writes(lambda bufio: None)
1646
1647    def test_writes_and_flushes(self):
1648        self.check_writes(lambda bufio: bufio.flush())
1649
1650    def test_writes_and_seeks(self):
1651        def _seekabs(bufio):
1652            pos = bufio.tell()
1653            bufio.seek(pos + 1, 0)
1654            bufio.seek(pos - 1, 0)
1655            bufio.seek(pos, 0)
1656        self.check_writes(_seekabs)
1657        def _seekrel(bufio):
1658            pos = bufio.seek(0, 1)
1659            bufio.seek(+1, 1)
1660            bufio.seek(-1, 1)
1661            bufio.seek(pos, 0)
1662        self.check_writes(_seekrel)
1663
1664    def test_writes_and_truncates(self):
1665        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1666
1667    def test_write_non_blocking(self):
1668        raw = self.MockNonBlockWriterIO()
1669        bufio = self.tp(raw, 8)
1670
1671        self.assertEqual(bufio.write(b"abcd"), 4)
1672        self.assertEqual(bufio.write(b"efghi"), 5)
1673        # 1 byte will be written, the rest will be buffered
1674        raw.block_on(b"k")
1675        self.assertEqual(bufio.write(b"jklmn"), 5)
1676
1677        # 8 bytes will be written, 8 will be buffered and the rest will be lost
1678        raw.block_on(b"0")
1679        try:
1680            bufio.write(b"opqrwxyz0123456789")
1681        except self.BlockingIOError as e:
1682            written = e.characters_written
1683        else:
1684            self.fail("BlockingIOError should have been raised")
1685        self.assertEqual(written, 16)
1686        self.assertEqual(raw.pop_written(),
1687            b"abcdefghijklmnopqrwxyz")
1688
1689        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1690        s = raw.pop_written()
1691        # Previously buffered bytes were flushed
1692        self.assertTrue(s.startswith(b"01234567A"), s)
1693
1694    def test_write_and_rewind(self):
1695        raw = io.BytesIO()
1696        bufio = self.tp(raw, 4)
1697        self.assertEqual(bufio.write(b"abcdef"), 6)
1698        self.assertEqual(bufio.tell(), 6)
1699        bufio.seek(0, 0)
1700        self.assertEqual(bufio.write(b"XY"), 2)
1701        bufio.seek(6, 0)
1702        self.assertEqual(raw.getvalue(), b"XYcdef")
1703        self.assertEqual(bufio.write(b"123456"), 6)
1704        bufio.flush()
1705        self.assertEqual(raw.getvalue(), b"XYcdef123456")
1706
1707    def test_flush(self):
1708        writer = self.MockRawIO()
1709        bufio = self.tp(writer, 8)
1710        bufio.write(b"abc")
1711        bufio.flush()
1712        self.assertEqual(b"abc", writer._write_stack[0])
1713
1714    def test_writelines(self):
1715        l = [b'ab', b'cd', b'ef']
1716        writer = self.MockRawIO()
1717        bufio = self.tp(writer, 8)
1718        bufio.writelines(l)
1719        bufio.flush()
1720        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1721
1722    def test_writelines_userlist(self):
1723        l = UserList([b'ab', b'cd', b'ef'])
1724        writer = self.MockRawIO()
1725        bufio = self.tp(writer, 8)
1726        bufio.writelines(l)
1727        bufio.flush()
1728        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1729
1730    def test_writelines_error(self):
1731        writer = self.MockRawIO()
1732        bufio = self.tp(writer, 8)
1733        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1734        self.assertRaises(TypeError, bufio.writelines, None)
1735        self.assertRaises(TypeError, bufio.writelines, 'abc')
1736
1737    def test_destructor(self):
1738        writer = self.MockRawIO()
1739        bufio = self.tp(writer, 8)
1740        bufio.write(b"abc")
1741        del bufio
1742        support.gc_collect()
1743        self.assertEqual(b"abc", writer._write_stack[0])
1744
1745    def test_truncate(self):
1746        # Truncate implicitly flushes the buffer.
1747        self.addCleanup(support.unlink, support.TESTFN)
1748        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1749            bufio = self.tp(raw, 8)
1750            bufio.write(b"abcdef")
1751            self.assertEqual(bufio.truncate(3), 3)
1752            self.assertEqual(bufio.tell(), 6)
1753        with self.open(support.TESTFN, "rb", buffering=0) as f:
1754            self.assertEqual(f.read(), b"abc")
1755
1756    def test_truncate_after_write(self):
1757        # Ensure that truncate preserves the file position after
1758        # writes longer than the buffer size.
1759        # Issue: https://bugs.python.org/issue32228
1760        self.addCleanup(support.unlink, support.TESTFN)
1761        with self.open(support.TESTFN, "wb") as f:
1762            # Fill with some buffer
1763            f.write(b'\x00' * 10000)
1764        buffer_sizes = [8192, 4096, 200]
1765        for buffer_size in buffer_sizes:
1766            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1767                f.write(b'\x00' * (buffer_size + 1))
1768                # After write write_pos and write_end are set to 0
1769                f.read(1)
1770                # read operation makes sure that pos != raw_pos
1771                f.truncate()
1772                self.assertEqual(f.tell(), buffer_size + 2)
1773
1774    @support.requires_resource('cpu')
1775    def test_threads(self):
1776        try:
1777            # Write out many bytes from many threads and test they were
1778            # all flushed.
1779            N = 1000
1780            contents = bytes(range(256)) * N
1781            sizes = cycle([1, 19])
1782            n = 0
1783            queue = deque()
1784            while n < len(contents):
1785                size = next(sizes)
1786                queue.append(contents[n:n+size])
1787                n += size
1788            del contents
1789            # We use a real file object because it allows us to
1790            # exercise situations where the GIL is released before
1791            # writing the buffer to the raw streams. This is in addition
1792            # to concurrency issues due to switching threads in the middle
1793            # of Python code.
1794            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1795                bufio = self.tp(raw, 8)
1796                errors = []
1797                def f():
1798                    try:
1799                        while True:
1800                            try:
1801                                s = queue.popleft()
1802                            except IndexError:
1803                                return
1804                            bufio.write(s)
1805                    except Exception as e:
1806                        errors.append(e)
1807                        raise
1808                threads = [threading.Thread(target=f) for x in range(20)]
1809                with support.start_threads(threads):
1810                    time.sleep(0.02) # yield
1811                self.assertFalse(errors,
1812                    "the following exceptions were caught: %r" % errors)
1813                bufio.close()
1814            with self.open(support.TESTFN, "rb") as f:
1815                s = f.read()
1816            for i in range(256):
1817                self.assertEqual(s.count(bytes([i])), N)
1818        finally:
1819            support.unlink(support.TESTFN)
1820
1821    def test_misbehaved_io(self):
1822        rawio = self.MisbehavedRawIO()
1823        bufio = self.tp(rawio, 5)
1824        self.assertRaises(OSError, bufio.seek, 0)
1825        self.assertRaises(OSError, bufio.tell)
1826        self.assertRaises(OSError, bufio.write, b"abcdef")
1827
1828    def test_max_buffer_size_removal(self):
1829        with self.assertRaises(TypeError):
1830            self.tp(self.MockRawIO(), 8, 12)
1831
1832    def test_write_error_on_close(self):
1833        raw = self.MockRawIO()
1834        def bad_write(b):
1835            raise OSError()
1836        raw.write = bad_write
1837        b = self.tp(raw)
1838        b.write(b'spam')
1839        self.assertRaises(OSError, b.close) # exception not swallowed
1840        self.assertTrue(b.closed)
1841
1842    def test_slow_close_from_thread(self):
1843        # Issue #31976
1844        rawio = self.SlowFlushRawIO()
1845        bufio = self.tp(rawio, 8)
1846        t = threading.Thread(target=bufio.close)
1847        t.start()
1848        rawio.in_flush.wait()
1849        self.assertRaises(ValueError, bufio.write, b'spam')
1850        self.assertTrue(bufio.closed)
1851        t.join()
1852
1853
1854
1855class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
1856    tp = io.BufferedWriter
1857
1858    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1859                     "instead of returning NULL for malloc failure.")
1860    def test_constructor(self):
1861        BufferedWriterTest.test_constructor(self)
1862        # The allocation can succeed on 32-bit builds, e.g. with more
1863        # than 2 GiB RAM and a 64-bit kernel.
1864        if sys.maxsize > 0x7FFFFFFF:
1865            rawio = self.MockRawIO()
1866            bufio = self.tp(rawio)
1867            self.assertRaises((OverflowError, MemoryError, ValueError),
1868                bufio.__init__, rawio, sys.maxsize)
1869
1870    def test_initialization(self):
1871        rawio = self.MockRawIO()
1872        bufio = self.tp(rawio)
1873        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1874        self.assertRaises(ValueError, bufio.write, b"def")
1875        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1876        self.assertRaises(ValueError, bufio.write, b"def")
1877        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1878        self.assertRaises(ValueError, bufio.write, b"def")
1879
1880    def test_garbage_collection(self):
1881        # C BufferedWriter objects are collected, and collecting them flushes
1882        # all data to disk.
1883        # The Python version has __del__, so it ends into gc.garbage instead
1884        self.addCleanup(support.unlink, support.TESTFN)
1885        with support.check_warnings(('', ResourceWarning)):
1886            rawio = self.FileIO(support.TESTFN, "w+b")
1887            f = self.tp(rawio)
1888            f.write(b"123xxx")
1889            f.x = f
1890            wr = weakref.ref(f)
1891            del f
1892            support.gc_collect()
1893        self.assertIsNone(wr(), wr)
1894        with self.open(support.TESTFN, "rb") as f:
1895            self.assertEqual(f.read(), b"123xxx")
1896
1897    def test_args_error(self):
1898        # Issue #17275
1899        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1900            self.tp(io.BytesIO(), 1024, 1024, 1024)
1901
1902
1903class PyBufferedWriterTest(BufferedWriterTest):
1904    tp = pyio.BufferedWriter
1905
1906class BufferedRWPairTest(unittest.TestCase):
1907
1908    def test_constructor(self):
1909        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1910        self.assertFalse(pair.closed)
1911
1912    def test_uninitialized(self):
1913        pair = self.tp.__new__(self.tp)
1914        del pair
1915        pair = self.tp.__new__(self.tp)
1916        self.assertRaisesRegex((ValueError, AttributeError),
1917                               'uninitialized|has no attribute',
1918                               pair.read, 0)
1919        self.assertRaisesRegex((ValueError, AttributeError),
1920                               'uninitialized|has no attribute',
1921                               pair.write, b'')
1922        pair.__init__(self.MockRawIO(), self.MockRawIO())
1923        self.assertEqual(pair.read(0), b'')
1924        self.assertEqual(pair.write(b''), 0)
1925
1926    def test_detach(self):
1927        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1928        self.assertRaises(self.UnsupportedOperation, pair.detach)
1929
1930    def test_constructor_max_buffer_size_removal(self):
1931        with self.assertRaises(TypeError):
1932            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1933
1934    def test_constructor_with_not_readable(self):
1935        class NotReadable(MockRawIO):
1936            def readable(self):
1937                return False
1938
1939        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
1940
1941    def test_constructor_with_not_writeable(self):
1942        class NotWriteable(MockRawIO):
1943            def writable(self):
1944                return False
1945
1946        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
1947
1948    def test_read(self):
1949        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1950
1951        self.assertEqual(pair.read(3), b"abc")
1952        self.assertEqual(pair.read(1), b"d")
1953        self.assertEqual(pair.read(), b"ef")
1954        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1955        self.assertEqual(pair.read(None), b"abc")
1956
1957    def test_readlines(self):
1958        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1959        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1960        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1961        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
1962
1963    def test_read1(self):
1964        # .read1() is delegated to the underlying reader object, so this test
1965        # can be shallow.
1966        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1967
1968        self.assertEqual(pair.read1(3), b"abc")
1969        self.assertEqual(pair.read1(), b"def")
1970
1971    def test_readinto(self):
1972        for method in ("readinto", "readinto1"):
1973            with self.subTest(method):
1974                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1975
1976                data = byteslike(b'\0' * 5)
1977                self.assertEqual(getattr(pair, method)(data), 5)
1978                self.assertEqual(bytes(data), b"abcde")
1979
1980    def test_write(self):
1981        w = self.MockRawIO()
1982        pair = self.tp(self.MockRawIO(), w)
1983
1984        pair.write(b"abc")
1985        pair.flush()
1986        buffer = bytearray(b"def")
1987        pair.write(buffer)
1988        buffer[:] = b"***"  # Overwrite our copy of the data
1989        pair.flush()
1990        self.assertEqual(w._write_stack, [b"abc", b"def"])
1991
1992    def test_peek(self):
1993        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1994
1995        self.assertTrue(pair.peek(3).startswith(b"abc"))
1996        self.assertEqual(pair.read(3), b"abc")
1997
1998    def test_readable(self):
1999        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2000        self.assertTrue(pair.readable())
2001
2002    def test_writeable(self):
2003        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2004        self.assertTrue(pair.writable())
2005
2006    def test_seekable(self):
2007        # BufferedRWPairs are never seekable, even if their readers and writers
2008        # are.
2009        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2010        self.assertFalse(pair.seekable())
2011
2012    # .flush() is delegated to the underlying writer object and has been
2013    # tested in the test_write method.
2014
2015    def test_close_and_closed(self):
2016        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2017        self.assertFalse(pair.closed)
2018        pair.close()
2019        self.assertTrue(pair.closed)
2020
2021    def test_reader_close_error_on_close(self):
2022        def reader_close():
2023            reader_non_existing
2024        reader = self.MockRawIO()
2025        reader.close = reader_close
2026        writer = self.MockRawIO()
2027        pair = self.tp(reader, writer)
2028        with self.assertRaises(NameError) as err:
2029            pair.close()
2030        self.assertIn('reader_non_existing', str(err.exception))
2031        self.assertTrue(pair.closed)
2032        self.assertFalse(reader.closed)
2033        self.assertTrue(writer.closed)
2034
2035    def test_writer_close_error_on_close(self):
2036        def writer_close():
2037            writer_non_existing
2038        reader = self.MockRawIO()
2039        writer = self.MockRawIO()
2040        writer.close = writer_close
2041        pair = self.tp(reader, writer)
2042        with self.assertRaises(NameError) as err:
2043            pair.close()
2044        self.assertIn('writer_non_existing', str(err.exception))
2045        self.assertFalse(pair.closed)
2046        self.assertTrue(reader.closed)
2047        self.assertFalse(writer.closed)
2048
2049    def test_reader_writer_close_error_on_close(self):
2050        def reader_close():
2051            reader_non_existing
2052        def writer_close():
2053            writer_non_existing
2054        reader = self.MockRawIO()
2055        reader.close = reader_close
2056        writer = self.MockRawIO()
2057        writer.close = writer_close
2058        pair = self.tp(reader, writer)
2059        with self.assertRaises(NameError) as err:
2060            pair.close()
2061        self.assertIn('reader_non_existing', str(err.exception))
2062        self.assertIsInstance(err.exception.__context__, NameError)
2063        self.assertIn('writer_non_existing', str(err.exception.__context__))
2064        self.assertFalse(pair.closed)
2065        self.assertFalse(reader.closed)
2066        self.assertFalse(writer.closed)
2067
2068    def test_isatty(self):
2069        class SelectableIsAtty(MockRawIO):
2070            def __init__(self, isatty):
2071                MockRawIO.__init__(self)
2072                self._isatty = isatty
2073
2074            def isatty(self):
2075                return self._isatty
2076
2077        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2078        self.assertFalse(pair.isatty())
2079
2080        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2081        self.assertTrue(pair.isatty())
2082
2083        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2084        self.assertTrue(pair.isatty())
2085
2086        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2087        self.assertTrue(pair.isatty())
2088
2089    def test_weakref_clearing(self):
2090        brw = self.tp(self.MockRawIO(), self.MockRawIO())
2091        ref = weakref.ref(brw)
2092        brw = None
2093        ref = None # Shouldn't segfault.
2094
2095class CBufferedRWPairTest(BufferedRWPairTest):
2096    tp = io.BufferedRWPair
2097
2098class PyBufferedRWPairTest(BufferedRWPairTest):
2099    tp = pyio.BufferedRWPair
2100
2101
2102class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2103    read_mode = "rb+"
2104    write_mode = "wb+"
2105
2106    def test_constructor(self):
2107        BufferedReaderTest.test_constructor(self)
2108        BufferedWriterTest.test_constructor(self)
2109
2110    def test_uninitialized(self):
2111        BufferedReaderTest.test_uninitialized(self)
2112        BufferedWriterTest.test_uninitialized(self)
2113
2114    def test_read_and_write(self):
2115        raw = self.MockRawIO((b"asdf", b"ghjk"))
2116        rw = self.tp(raw, 8)
2117
2118        self.assertEqual(b"as", rw.read(2))
2119        rw.write(b"ddd")
2120        rw.write(b"eee")
2121        self.assertFalse(raw._write_stack) # Buffer writes
2122        self.assertEqual(b"ghjk", rw.read())
2123        self.assertEqual(b"dddeee", raw._write_stack[0])
2124
2125    def test_seek_and_tell(self):
2126        raw = self.BytesIO(b"asdfghjkl")
2127        rw = self.tp(raw)
2128
2129        self.assertEqual(b"as", rw.read(2))
2130        self.assertEqual(2, rw.tell())
2131        rw.seek(0, 0)
2132        self.assertEqual(b"asdf", rw.read(4))
2133
2134        rw.write(b"123f")
2135        rw.seek(0, 0)
2136        self.assertEqual(b"asdf123fl", rw.read())
2137        self.assertEqual(9, rw.tell())
2138        rw.seek(-4, 2)
2139        self.assertEqual(5, rw.tell())
2140        rw.seek(2, 1)
2141        self.assertEqual(7, rw.tell())
2142        self.assertEqual(b"fl", rw.read(11))
2143        rw.flush()
2144        self.assertEqual(b"asdf123fl", raw.getvalue())
2145
2146        self.assertRaises(TypeError, rw.seek, 0.0)
2147
2148    def check_flush_and_read(self, read_func):
2149        raw = self.BytesIO(b"abcdefghi")
2150        bufio = self.tp(raw)
2151
2152        self.assertEqual(b"ab", read_func(bufio, 2))
2153        bufio.write(b"12")
2154        self.assertEqual(b"ef", read_func(bufio, 2))
2155        self.assertEqual(6, bufio.tell())
2156        bufio.flush()
2157        self.assertEqual(6, bufio.tell())
2158        self.assertEqual(b"ghi", read_func(bufio))
2159        raw.seek(0, 0)
2160        raw.write(b"XYZ")
2161        # flush() resets the read buffer
2162        bufio.flush()
2163        bufio.seek(0, 0)
2164        self.assertEqual(b"XYZ", read_func(bufio, 3))
2165
2166    def test_flush_and_read(self):
2167        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2168
2169    def test_flush_and_readinto(self):
2170        def _readinto(bufio, n=-1):
2171            b = bytearray(n if n >= 0 else 9999)
2172            n = bufio.readinto(b)
2173            return bytes(b[:n])
2174        self.check_flush_and_read(_readinto)
2175
2176    def test_flush_and_peek(self):
2177        def _peek(bufio, n=-1):
2178            # This relies on the fact that the buffer can contain the whole
2179            # raw stream, otherwise peek() can return less.
2180            b = bufio.peek(n)
2181            if n != -1:
2182                b = b[:n]
2183            bufio.seek(len(b), 1)
2184            return b
2185        self.check_flush_and_read(_peek)
2186
2187    def test_flush_and_write(self):
2188        raw = self.BytesIO(b"abcdefghi")
2189        bufio = self.tp(raw)
2190
2191        bufio.write(b"123")
2192        bufio.flush()
2193        bufio.write(b"45")
2194        bufio.flush()
2195        bufio.seek(0, 0)
2196        self.assertEqual(b"12345fghi", raw.getvalue())
2197        self.assertEqual(b"12345fghi", bufio.read())
2198
2199    def test_threads(self):
2200        BufferedReaderTest.test_threads(self)
2201        BufferedWriterTest.test_threads(self)
2202
2203    def test_writes_and_peek(self):
2204        def _peek(bufio):
2205            bufio.peek(1)
2206        self.check_writes(_peek)
2207        def _peek(bufio):
2208            pos = bufio.tell()
2209            bufio.seek(-1, 1)
2210            bufio.peek(1)
2211            bufio.seek(pos, 0)
2212        self.check_writes(_peek)
2213
2214    def test_writes_and_reads(self):
2215        def _read(bufio):
2216            bufio.seek(-1, 1)
2217            bufio.read(1)
2218        self.check_writes(_read)
2219
2220    def test_writes_and_read1s(self):
2221        def _read1(bufio):
2222            bufio.seek(-1, 1)
2223            bufio.read1(1)
2224        self.check_writes(_read1)
2225
2226    def test_writes_and_readintos(self):
2227        def _read(bufio):
2228            bufio.seek(-1, 1)
2229            bufio.readinto(bytearray(1))
2230        self.check_writes(_read)
2231
2232    def test_write_after_readahead(self):
2233        # Issue #6629: writing after the buffer was filled by readahead should
2234        # first rewind the raw stream.
2235        for overwrite_size in [1, 5]:
2236            raw = self.BytesIO(b"A" * 10)
2237            bufio = self.tp(raw, 4)
2238            # Trigger readahead
2239            self.assertEqual(bufio.read(1), b"A")
2240            self.assertEqual(bufio.tell(), 1)
2241            # Overwriting should rewind the raw stream if it needs so
2242            bufio.write(b"B" * overwrite_size)
2243            self.assertEqual(bufio.tell(), overwrite_size + 1)
2244            # If the write size was smaller than the buffer size, flush() and
2245            # check that rewind happens.
2246            bufio.flush()
2247            self.assertEqual(bufio.tell(), overwrite_size + 1)
2248            s = raw.getvalue()
2249            self.assertEqual(s,
2250                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2251
2252    def test_write_rewind_write(self):
2253        # Various combinations of reading / writing / seeking backwards / writing again
2254        def mutate(bufio, pos1, pos2):
2255            assert pos2 >= pos1
2256            # Fill the buffer
2257            bufio.seek(pos1)
2258            bufio.read(pos2 - pos1)
2259            bufio.write(b'\x02')
2260            # This writes earlier than the previous write, but still inside
2261            # the buffer.
2262            bufio.seek(pos1)
2263            bufio.write(b'\x01')
2264
2265        b = b"\x80\x81\x82\x83\x84"
2266        for i in range(0, len(b)):
2267            for j in range(i, len(b)):
2268                raw = self.BytesIO(b)
2269                bufio = self.tp(raw, 100)
2270                mutate(bufio, i, j)
2271                bufio.flush()
2272                expected = bytearray(b)
2273                expected[j] = 2
2274                expected[i] = 1
2275                self.assertEqual(raw.getvalue(), expected,
2276                                 "failed result for i=%d, j=%d" % (i, j))
2277
2278    def test_truncate_after_read_or_write(self):
2279        raw = self.BytesIO(b"A" * 10)
2280        bufio = self.tp(raw, 100)
2281        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2282        self.assertEqual(bufio.truncate(), 2)
2283        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2284        self.assertEqual(bufio.truncate(), 4)
2285
2286    def test_misbehaved_io(self):
2287        BufferedReaderTest.test_misbehaved_io(self)
2288        BufferedWriterTest.test_misbehaved_io(self)
2289
2290    def test_interleaved_read_write(self):
2291        # Test for issue #12213
2292        with self.BytesIO(b'abcdefgh') as raw:
2293            with self.tp(raw, 100) as f:
2294                f.write(b"1")
2295                self.assertEqual(f.read(1), b'b')
2296                f.write(b'2')
2297                self.assertEqual(f.read1(1), b'd')
2298                f.write(b'3')
2299                buf = bytearray(1)
2300                f.readinto(buf)
2301                self.assertEqual(buf, b'f')
2302                f.write(b'4')
2303                self.assertEqual(f.peek(1), b'h')
2304                f.flush()
2305                self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2306
2307        with self.BytesIO(b'abc') as raw:
2308            with self.tp(raw, 100) as f:
2309                self.assertEqual(f.read(1), b'a')
2310                f.write(b"2")
2311                self.assertEqual(f.read(1), b'c')
2312                f.flush()
2313                self.assertEqual(raw.getvalue(), b'a2c')
2314
2315    def test_interleaved_readline_write(self):
2316        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2317            with self.tp(raw) as f:
2318                f.write(b'1')
2319                self.assertEqual(f.readline(), b'b\n')
2320                f.write(b'2')
2321                self.assertEqual(f.readline(), b'def\n')
2322                f.write(b'3')
2323                self.assertEqual(f.readline(), b'\n')
2324                f.flush()
2325                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2326
2327    # You can't construct a BufferedRandom over a non-seekable stream.
2328    test_unseekable = None
2329
2330
2331class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
2332    tp = io.BufferedRandom
2333
2334    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
2335                     "instead of returning NULL for malloc failure.")
2336    def test_constructor(self):
2337        BufferedRandomTest.test_constructor(self)
2338        # The allocation can succeed on 32-bit builds, e.g. with more
2339        # than 2 GiB RAM and a 64-bit kernel.
2340        if sys.maxsize > 0x7FFFFFFF:
2341            rawio = self.MockRawIO()
2342            bufio = self.tp(rawio)
2343            self.assertRaises((OverflowError, MemoryError, ValueError),
2344                bufio.__init__, rawio, sys.maxsize)
2345
2346    def test_garbage_collection(self):
2347        CBufferedReaderTest.test_garbage_collection(self)
2348        CBufferedWriterTest.test_garbage_collection(self)
2349
2350    def test_args_error(self):
2351        # Issue #17275
2352        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2353            self.tp(io.BytesIO(), 1024, 1024, 1024)
2354
2355
2356class PyBufferedRandomTest(BufferedRandomTest):
2357    tp = pyio.BufferedRandom
2358
2359
2360# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2361# properties:
2362#   - A single output character can correspond to many bytes of input.
2363#   - The number of input bytes to complete the character can be
2364#     undetermined until the last input byte is received.
2365#   - The number of input bytes can vary depending on previous input.
2366#   - A single input byte can correspond to many characters of output.
2367#   - The number of output characters can be undetermined until the
2368#     last input byte is received.
2369#   - The number of output characters can vary depending on previous input.
2370
2371class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2372    """
2373    For testing seek/tell behavior with a stateful, buffering decoder.
2374
2375    Input is a sequence of words.  Words may be fixed-length (length set
2376    by input) or variable-length (period-terminated).  In variable-length
2377    mode, extra periods are ignored.  Possible words are:
2378      - 'i' followed by a number sets the input length, I (maximum 99).
2379        When I is set to 0, words are space-terminated.
2380      - 'o' followed by a number sets the output length, O (maximum 99).
2381      - Any other word is converted into a word followed by a period on
2382        the output.  The output word consists of the input word truncated
2383        or padded out with hyphens to make its length equal to O.  If O
2384        is 0, the word is output verbatim without truncating or padding.
2385    I and O are initially set to 1.  When I changes, any buffered input is
2386    re-scanned according to the new I.  EOF also terminates the last word.
2387    """
2388
2389    def __init__(self, errors='strict'):
2390        codecs.IncrementalDecoder.__init__(self, errors)
2391        self.reset()
2392
2393    def __repr__(self):
2394        return '<SID %x>' % id(self)
2395
2396    def reset(self):
2397        self.i = 1
2398        self.o = 1
2399        self.buffer = bytearray()
2400
2401    def getstate(self):
2402        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2403        return bytes(self.buffer), i*100 + o
2404
2405    def setstate(self, state):
2406        buffer, io = state
2407        self.buffer = bytearray(buffer)
2408        i, o = divmod(io, 100)
2409        self.i, self.o = i ^ 1, o ^ 1
2410
2411    def decode(self, input, final=False):
2412        output = ''
2413        for b in input:
2414            if self.i == 0: # variable-length, terminated with period
2415                if b == ord('.'):
2416                    if self.buffer:
2417                        output += self.process_word()
2418                else:
2419                    self.buffer.append(b)
2420            else: # fixed-length, terminate after self.i bytes
2421                self.buffer.append(b)
2422                if len(self.buffer) == self.i:
2423                    output += self.process_word()
2424        if final and self.buffer: # EOF terminates the last word
2425            output += self.process_word()
2426        return output
2427
2428    def process_word(self):
2429        output = ''
2430        if self.buffer[0] == ord('i'):
2431            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2432        elif self.buffer[0] == ord('o'):
2433            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2434        else:
2435            output = self.buffer.decode('ascii')
2436            if len(output) < self.o:
2437                output += '-'*self.o # pad out with hyphens
2438            if self.o:
2439                output = output[:self.o] # truncate to output length
2440            output += '.'
2441        self.buffer = bytearray()
2442        return output
2443
2444    codecEnabled = False
2445
2446    @classmethod
2447    def lookupTestDecoder(cls, name):
2448        if cls.codecEnabled and name == 'test_decoder':
2449            latin1 = codecs.lookup('latin-1')
2450            return codecs.CodecInfo(
2451                name='test_decoder', encode=latin1.encode, decode=None,
2452                incrementalencoder=None,
2453                streamreader=None, streamwriter=None,
2454                incrementaldecoder=cls)
2455
2456# Register the previous decoder for testing.
2457# Disabled by default, tests will enable it.
2458codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2459
2460
2461class StatefulIncrementalDecoderTest(unittest.TestCase):
2462    """
2463    Make sure the StatefulIncrementalDecoder actually works.
2464    """
2465
2466    test_cases = [
2467        # I=1, O=1 (fixed-length input == fixed-length output)
2468        (b'abcd', False, 'a.b.c.d.'),
2469        # I=0, O=0 (variable-length input, variable-length output)
2470        (b'oiabcd', True, 'abcd.'),
2471        # I=0, O=0 (should ignore extra periods)
2472        (b'oi...abcd...', True, 'abcd.'),
2473        # I=0, O=6 (variable-length input, fixed-length output)
2474        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2475        # I=2, O=6 (fixed-length input < fixed-length output)
2476        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
2477        # I=6, O=3 (fixed-length input > fixed-length output)
2478        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2479        # I=0, then 3; O=29, then 15 (with longer output)
2480        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2481         'a----------------------------.' +
2482         'b----------------------------.' +
2483         'cde--------------------------.' +
2484         'abcdefghijabcde.' +
2485         'a.b------------.' +
2486         '.c.------------.' +
2487         'd.e------------.' +
2488         'k--------------.' +
2489         'l--------------.' +
2490         'm--------------.')
2491    ]
2492
2493    def test_decoder(self):
2494        # Try a few one-shot test cases.
2495        for input, eof, output in self.test_cases:
2496            d = StatefulIncrementalDecoder()
2497            self.assertEqual(d.decode(input, eof), output)
2498
2499        # Also test an unfinished decode, followed by forcing EOF.
2500        d = StatefulIncrementalDecoder()
2501        self.assertEqual(d.decode(b'oiabcd'), '')
2502        self.assertEqual(d.decode(b'', 1), 'abcd.')
2503
2504class TextIOWrapperTest(unittest.TestCase):
2505
2506    def setUp(self):
2507        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2508        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2509        support.unlink(support.TESTFN)
2510
2511    def tearDown(self):
2512        support.unlink(support.TESTFN)
2513
2514    def test_constructor(self):
2515        r = self.BytesIO(b"\xc3\xa9\n\n")
2516        b = self.BufferedReader(r, 1000)
2517        t = self.TextIOWrapper(b)
2518        t.__init__(b, encoding="latin-1", newline="\r\n")
2519        self.assertEqual(t.encoding, "latin-1")
2520        self.assertEqual(t.line_buffering, False)
2521        t.__init__(b, encoding="utf-8", line_buffering=True)
2522        self.assertEqual(t.encoding, "utf-8")
2523        self.assertEqual(t.line_buffering, True)
2524        self.assertEqual("\xe9\n", t.readline())
2525        self.assertRaises(TypeError, t.__init__, b, newline=42)
2526        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2527
2528    def test_uninitialized(self):
2529        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2530        del t
2531        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2532        self.assertRaises(Exception, repr, t)
2533        self.assertRaisesRegex((ValueError, AttributeError),
2534                               'uninitialized|has no attribute',
2535                               t.read, 0)
2536        t.__init__(self.MockRawIO())
2537        self.assertEqual(t.read(0), '')
2538
2539    def test_non_text_encoding_codecs_are_rejected(self):
2540        # Ensure the constructor complains if passed a codec that isn't
2541        # marked as a text encoding
2542        # http://bugs.python.org/issue20404
2543        r = self.BytesIO()
2544        b = self.BufferedWriter(r)
2545        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2546            self.TextIOWrapper(b, encoding="hex")
2547
2548    def test_detach(self):
2549        r = self.BytesIO()
2550        b = self.BufferedWriter(r)
2551        t = self.TextIOWrapper(b)
2552        self.assertIs(t.detach(), b)
2553
2554        t = self.TextIOWrapper(b, encoding="ascii")
2555        t.write("howdy")
2556        self.assertFalse(r.getvalue())
2557        t.detach()
2558        self.assertEqual(r.getvalue(), b"howdy")
2559        self.assertRaises(ValueError, t.detach)
2560
2561        # Operations independent of the detached stream should still work
2562        repr(t)
2563        self.assertEqual(t.encoding, "ascii")
2564        self.assertEqual(t.errors, "strict")
2565        self.assertFalse(t.line_buffering)
2566        self.assertFalse(t.write_through)
2567
2568    def test_repr(self):
2569        raw = self.BytesIO("hello".encode("utf-8"))
2570        b = self.BufferedReader(raw)
2571        t = self.TextIOWrapper(b, encoding="utf-8")
2572        modname = self.TextIOWrapper.__module__
2573        self.assertRegex(repr(t),
2574                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
2575        raw.name = "dummy"
2576        self.assertRegex(repr(t),
2577                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2578        t.mode = "r"
2579        self.assertRegex(repr(t),
2580                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2581        raw.name = b"dummy"
2582        self.assertRegex(repr(t),
2583                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2584
2585        t.buffer.detach()
2586        repr(t)  # Should not raise an exception
2587
2588    def test_recursive_repr(self):
2589        # Issue #25455
2590        raw = self.BytesIO()
2591        t = self.TextIOWrapper(raw)
2592        with support.swap_attr(raw, 'name', t):
2593            try:
2594                repr(t)  # Should not crash
2595            except RuntimeError:
2596                pass
2597
2598    def test_line_buffering(self):
2599        r = self.BytesIO()
2600        b = self.BufferedWriter(r, 1000)
2601        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2602        t.write("X")
2603        self.assertEqual(r.getvalue(), b"")  # No flush happened
2604        t.write("Y\nZ")
2605        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2606        t.write("A\rB")
2607        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2608
2609    def test_reconfigure_line_buffering(self):
2610        r = self.BytesIO()
2611        b = self.BufferedWriter(r, 1000)
2612        t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2613        t.write("AB\nC")
2614        self.assertEqual(r.getvalue(), b"")
2615
2616        t.reconfigure(line_buffering=True)   # implicit flush
2617        self.assertEqual(r.getvalue(), b"AB\nC")
2618        t.write("DEF\nG")
2619        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2620        t.write("H")
2621        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2622        t.reconfigure(line_buffering=False)   # implicit flush
2623        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2624        t.write("IJ")
2625        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2626
2627        # Keeping default value
2628        t.reconfigure()
2629        t.reconfigure(line_buffering=None)
2630        self.assertEqual(t.line_buffering, False)
2631        t.reconfigure(line_buffering=True)
2632        t.reconfigure()
2633        t.reconfigure(line_buffering=None)
2634        self.assertEqual(t.line_buffering, True)
2635
2636    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2637    def test_default_encoding(self):
2638        old_environ = dict(os.environ)
2639        try:
2640            # try to get a user preferred encoding different than the current
2641            # locale encoding to check that TextIOWrapper() uses the current
2642            # locale encoding and not the user preferred encoding
2643            for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2644                if key in os.environ:
2645                    del os.environ[key]
2646
2647            current_locale_encoding = locale.getpreferredencoding(False)
2648            b = self.BytesIO()
2649            t = self.TextIOWrapper(b)
2650            self.assertEqual(t.encoding, current_locale_encoding)
2651        finally:
2652            os.environ.clear()
2653            os.environ.update(old_environ)
2654
2655    @support.cpython_only
2656    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2657    def test_device_encoding(self):
2658        # Issue 15989
2659        import _testcapi
2660        b = self.BytesIO()
2661        b.fileno = lambda: _testcapi.INT_MAX + 1
2662        self.assertRaises(OverflowError, self.TextIOWrapper, b)
2663        b.fileno = lambda: _testcapi.UINT_MAX + 1
2664        self.assertRaises(OverflowError, self.TextIOWrapper, b)
2665
2666    def test_encoding(self):
2667        # Check the encoding attribute is always set, and valid
2668        b = self.BytesIO()
2669        t = self.TextIOWrapper(b, encoding="utf-8")
2670        self.assertEqual(t.encoding, "utf-8")
2671        t = self.TextIOWrapper(b)
2672        self.assertIsNotNone(t.encoding)
2673        codecs.lookup(t.encoding)
2674
2675    def test_encoding_errors_reading(self):
2676        # (1) default
2677        b = self.BytesIO(b"abc\n\xff\n")
2678        t = self.TextIOWrapper(b, encoding="ascii")
2679        self.assertRaises(UnicodeError, t.read)
2680        # (2) explicit strict
2681        b = self.BytesIO(b"abc\n\xff\n")
2682        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2683        self.assertRaises(UnicodeError, t.read)
2684        # (3) ignore
2685        b = self.BytesIO(b"abc\n\xff\n")
2686        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2687        self.assertEqual(t.read(), "abc\n\n")
2688        # (4) replace
2689        b = self.BytesIO(b"abc\n\xff\n")
2690        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2691        self.assertEqual(t.read(), "abc\n\ufffd\n")
2692
2693    def test_encoding_errors_writing(self):
2694        # (1) default
2695        b = self.BytesIO()
2696        t = self.TextIOWrapper(b, encoding="ascii")
2697        self.assertRaises(UnicodeError, t.write, "\xff")
2698        # (2) explicit strict
2699        b = self.BytesIO()
2700        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2701        self.assertRaises(UnicodeError, t.write, "\xff")
2702        # (3) ignore
2703        b = self.BytesIO()
2704        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2705                             newline="\n")
2706        t.write("abc\xffdef\n")
2707        t.flush()
2708        self.assertEqual(b.getvalue(), b"abcdef\n")
2709        # (4) replace
2710        b = self.BytesIO()
2711        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2712                             newline="\n")
2713        t.write("abc\xffdef\n")
2714        t.flush()
2715        self.assertEqual(b.getvalue(), b"abc?def\n")
2716
2717    def test_newlines(self):
2718        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2719
2720        tests = [
2721            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2722            [ '', input_lines ],
2723            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2724            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2725            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2726        ]
2727        encodings = (
2728            'utf-8', 'latin-1',
2729            'utf-16', 'utf-16-le', 'utf-16-be',
2730            'utf-32', 'utf-32-le', 'utf-32-be',
2731        )
2732
2733        # Try a range of buffer sizes to test the case where \r is the last
2734        # character in TextIOWrapper._pending_line.
2735        for encoding in encodings:
2736            # XXX: str.encode() should return bytes
2737            data = bytes(''.join(input_lines).encode(encoding))
2738            for do_reads in (False, True):
2739                for bufsize in range(1, 10):
2740                    for newline, exp_lines in tests:
2741                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2742                        textio = self.TextIOWrapper(bufio, newline=newline,
2743                                                  encoding=encoding)
2744                        if do_reads:
2745                            got_lines = []
2746                            while True:
2747                                c2 = textio.read(2)
2748                                if c2 == '':
2749                                    break
2750                                self.assertEqual(len(c2), 2)
2751                                got_lines.append(c2 + textio.readline())
2752                        else:
2753                            got_lines = list(textio)
2754
2755                        for got_line, exp_line in zip(got_lines, exp_lines):
2756                            self.assertEqual(got_line, exp_line)
2757                        self.assertEqual(len(got_lines), len(exp_lines))
2758
2759    def test_newlines_input(self):
2760        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2761        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2762        for newline, expected in [
2763            (None, normalized.decode("ascii").splitlines(keepends=True)),
2764            ("", testdata.decode("ascii").splitlines(keepends=True)),
2765            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2766            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2767            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2768            ]:
2769            buf = self.BytesIO(testdata)
2770            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2771            self.assertEqual(txt.readlines(), expected)
2772            txt.seek(0)
2773            self.assertEqual(txt.read(), "".join(expected))
2774
2775    def test_newlines_output(self):
2776        testdict = {
2777            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2778            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2779            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2780            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2781            }
2782        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2783        for newline, expected in tests:
2784            buf = self.BytesIO()
2785            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2786            txt.write("AAA\nB")
2787            txt.write("BB\nCCC\n")
2788            txt.write("X\rY\r\nZ")
2789            txt.flush()
2790            self.assertEqual(buf.closed, False)
2791            self.assertEqual(buf.getvalue(), expected)
2792
2793    def test_destructor(self):
2794        l = []
2795        base = self.BytesIO
2796        class MyBytesIO(base):
2797            def close(self):
2798                l.append(self.getvalue())
2799                base.close(self)
2800        b = MyBytesIO()
2801        t = self.TextIOWrapper(b, encoding="ascii")
2802        t.write("abc")
2803        del t
2804        support.gc_collect()
2805        self.assertEqual([b"abc"], l)
2806
2807    def test_override_destructor(self):
2808        record = []
2809        class MyTextIO(self.TextIOWrapper):
2810            def __del__(self):
2811                record.append(1)
2812                try:
2813                    f = super().__del__
2814                except AttributeError:
2815                    pass
2816                else:
2817                    f()
2818            def close(self):
2819                record.append(2)
2820                super().close()
2821            def flush(self):
2822                record.append(3)
2823                super().flush()
2824        b = self.BytesIO()
2825        t = MyTextIO(b, encoding="ascii")
2826        del t
2827        support.gc_collect()
2828        self.assertEqual(record, [1, 2, 3])
2829
2830    def test_error_through_destructor(self):
2831        # Test that the exception state is not modified by a destructor,
2832        # even if close() fails.
2833        rawio = self.CloseFailureIO()
2834        def f():
2835            self.TextIOWrapper(rawio).xyzzy
2836        with support.captured_output("stderr") as s:
2837            self.assertRaises(AttributeError, f)
2838        s = s.getvalue().strip()
2839        if s:
2840            # The destructor *may* have printed an unraisable error, check it
2841            self.assertEqual(len(s.splitlines()), 1)
2842            self.assertTrue(s.startswith("Exception OSError: "), s)
2843            self.assertTrue(s.endswith(" ignored"), s)
2844
2845    # Systematic tests of the text I/O API
2846
2847    def test_basic_io(self):
2848        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2849            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2850                f = self.open(support.TESTFN, "w+", encoding=enc)
2851                f._CHUNK_SIZE = chunksize
2852                self.assertEqual(f.write("abc"), 3)
2853                f.close()
2854                f = self.open(support.TESTFN, "r+", encoding=enc)
2855                f._CHUNK_SIZE = chunksize
2856                self.assertEqual(f.tell(), 0)
2857                self.assertEqual(f.read(), "abc")
2858                cookie = f.tell()
2859                self.assertEqual(f.seek(0), 0)
2860                self.assertEqual(f.read(None), "abc")
2861                f.seek(0)
2862                self.assertEqual(f.read(2), "ab")
2863                self.assertEqual(f.read(1), "c")
2864                self.assertEqual(f.read(1), "")
2865                self.assertEqual(f.read(), "")
2866                self.assertEqual(f.tell(), cookie)
2867                self.assertEqual(f.seek(0), 0)
2868                self.assertEqual(f.seek(0, 2), cookie)
2869                self.assertEqual(f.write("def"), 3)
2870                self.assertEqual(f.seek(cookie), cookie)
2871                self.assertEqual(f.read(), "def")
2872                if enc.startswith("utf"):
2873                    self.multi_line_test(f, enc)
2874                f.close()
2875
2876    def multi_line_test(self, f, enc):
2877        f.seek(0)
2878        f.truncate()
2879        sample = "s\xff\u0fff\uffff"
2880        wlines = []
2881        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2882            chars = []
2883            for i in range(size):
2884                chars.append(sample[i % len(sample)])
2885            line = "".join(chars) + "\n"
2886            wlines.append((f.tell(), line))
2887            f.write(line)
2888        f.seek(0)
2889        rlines = []
2890        while True:
2891            pos = f.tell()
2892            line = f.readline()
2893            if not line:
2894                break
2895            rlines.append((pos, line))
2896        self.assertEqual(rlines, wlines)
2897
2898    def test_telling(self):
2899        f = self.open(support.TESTFN, "w+", encoding="utf-8")
2900        p0 = f.tell()
2901        f.write("\xff\n")
2902        p1 = f.tell()
2903        f.write("\xff\n")
2904        p2 = f.tell()
2905        f.seek(0)
2906        self.assertEqual(f.tell(), p0)
2907        self.assertEqual(f.readline(), "\xff\n")
2908        self.assertEqual(f.tell(), p1)
2909        self.assertEqual(f.readline(), "\xff\n")
2910        self.assertEqual(f.tell(), p2)
2911        f.seek(0)
2912        for line in f:
2913            self.assertEqual(line, "\xff\n")
2914            self.assertRaises(OSError, f.tell)
2915        self.assertEqual(f.tell(), p2)
2916        f.close()
2917
2918    def test_seeking(self):
2919        chunk_size = _default_chunk_size()
2920        prefix_size = chunk_size - 2
2921        u_prefix = "a" * prefix_size
2922        prefix = bytes(u_prefix.encode("utf-8"))
2923        self.assertEqual(len(u_prefix), len(prefix))
2924        u_suffix = "\u8888\n"
2925        suffix = bytes(u_suffix.encode("utf-8"))
2926        line = prefix + suffix
2927        with self.open(support.TESTFN, "wb") as f:
2928            f.write(line*2)
2929        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2930            s = f.read(prefix_size)
2931            self.assertEqual(s, str(prefix, "ascii"))
2932            self.assertEqual(f.tell(), prefix_size)
2933            self.assertEqual(f.readline(), u_suffix)
2934
2935    def test_seeking_too(self):
2936        # Regression test for a specific bug
2937        data = b'\xe0\xbf\xbf\n'
2938        with self.open(support.TESTFN, "wb") as f:
2939            f.write(data)
2940        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2941            f._CHUNK_SIZE  # Just test that it exists
2942            f._CHUNK_SIZE = 2
2943            f.readline()
2944            f.tell()
2945
2946    def test_seek_and_tell(self):
2947        #Test seek/tell using the StatefulIncrementalDecoder.
2948        # Make test faster by doing smaller seeks
2949        CHUNK_SIZE = 128
2950
2951        def test_seek_and_tell_with_data(data, min_pos=0):
2952            """Tell/seek to various points within a data stream and ensure
2953            that the decoded data returned by read() is consistent."""
2954            f = self.open(support.TESTFN, 'wb')
2955            f.write(data)
2956            f.close()
2957            f = self.open(support.TESTFN, encoding='test_decoder')
2958            f._CHUNK_SIZE = CHUNK_SIZE
2959            decoded = f.read()
2960            f.close()
2961
2962            for i in range(min_pos, len(decoded) + 1): # seek positions
2963                for j in [1, 5, len(decoded) - i]: # read lengths
2964                    f = self.open(support.TESTFN, encoding='test_decoder')
2965                    self.assertEqual(f.read(i), decoded[:i])
2966                    cookie = f.tell()
2967                    self.assertEqual(f.read(j), decoded[i:i + j])
2968                    f.seek(cookie)
2969                    self.assertEqual(f.read(), decoded[i:])
2970                    f.close()
2971
2972        # Enable the test decoder.
2973        StatefulIncrementalDecoder.codecEnabled = 1
2974
2975        # Run the tests.
2976        try:
2977            # Try each test case.
2978            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2979                test_seek_and_tell_with_data(input)
2980
2981            # Position each test case so that it crosses a chunk boundary.
2982            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2983                offset = CHUNK_SIZE - len(input)//2
2984                prefix = b'.'*offset
2985                # Don't bother seeking into the prefix (takes too long).
2986                min_pos = offset*2
2987                test_seek_and_tell_with_data(prefix + input, min_pos)
2988
2989        # Ensure our test decoder won't interfere with subsequent tests.
2990        finally:
2991            StatefulIncrementalDecoder.codecEnabled = 0
2992
2993    def test_encoded_writes(self):
2994        data = "1234567890"
2995        tests = ("utf-16",
2996                 "utf-16-le",
2997                 "utf-16-be",
2998                 "utf-32",
2999                 "utf-32-le",
3000                 "utf-32-be")
3001        for encoding in tests:
3002            buf = self.BytesIO()
3003            f = self.TextIOWrapper(buf, encoding=encoding)
3004            # Check if the BOM is written only once (see issue1753).
3005            f.write(data)
3006            f.write(data)
3007            f.seek(0)
3008            self.assertEqual(f.read(), data * 2)
3009            f.seek(0)
3010            self.assertEqual(f.read(), data * 2)
3011            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
3012
3013    def test_unreadable(self):
3014        class UnReadable(self.BytesIO):
3015            def readable(self):
3016                return False
3017        txt = self.TextIOWrapper(UnReadable())
3018        self.assertRaises(OSError, txt.read)
3019
3020    def test_read_one_by_one(self):
3021        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
3022        reads = ""
3023        while True:
3024            c = txt.read(1)
3025            if not c:
3026                break
3027            reads += c
3028        self.assertEqual(reads, "AA\nBB")
3029
3030    def test_readlines(self):
3031        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3032        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3033        txt.seek(0)
3034        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3035        txt.seek(0)
3036        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3037
3038    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
3039    def test_read_by_chunk(self):
3040        # make sure "\r\n" straddles 128 char boundary.
3041        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
3042        reads = ""
3043        while True:
3044            c = txt.read(128)
3045            if not c:
3046                break
3047            reads += c
3048        self.assertEqual(reads, "A"*127+"\nB")
3049
3050    def test_writelines(self):
3051        l = ['ab', 'cd', 'ef']
3052        buf = self.BytesIO()
3053        txt = self.TextIOWrapper(buf)
3054        txt.writelines(l)
3055        txt.flush()
3056        self.assertEqual(buf.getvalue(), b'abcdef')
3057
3058    def test_writelines_userlist(self):
3059        l = UserList(['ab', 'cd', 'ef'])
3060        buf = self.BytesIO()
3061        txt = self.TextIOWrapper(buf)
3062        txt.writelines(l)
3063        txt.flush()
3064        self.assertEqual(buf.getvalue(), b'abcdef')
3065
3066    def test_writelines_error(self):
3067        txt = self.TextIOWrapper(self.BytesIO())
3068        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3069        self.assertRaises(TypeError, txt.writelines, None)
3070        self.assertRaises(TypeError, txt.writelines, b'abc')
3071
3072    def test_issue1395_1(self):
3073        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3074
3075        # read one char at a time
3076        reads = ""
3077        while True:
3078            c = txt.read(1)
3079            if not c:
3080                break
3081            reads += c
3082        self.assertEqual(reads, self.normalized)
3083
3084    def test_issue1395_2(self):
3085        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3086        txt._CHUNK_SIZE = 4
3087
3088        reads = ""
3089        while True:
3090            c = txt.read(4)
3091            if not c:
3092                break
3093            reads += c
3094        self.assertEqual(reads, self.normalized)
3095
3096    def test_issue1395_3(self):
3097        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3098        txt._CHUNK_SIZE = 4
3099
3100        reads = txt.read(4)
3101        reads += txt.read(4)
3102        reads += txt.readline()
3103        reads += txt.readline()
3104        reads += txt.readline()
3105        self.assertEqual(reads, self.normalized)
3106
3107    def test_issue1395_4(self):
3108        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3109        txt._CHUNK_SIZE = 4
3110
3111        reads = txt.read(4)
3112        reads += txt.read()
3113        self.assertEqual(reads, self.normalized)
3114
3115    def test_issue1395_5(self):
3116        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3117        txt._CHUNK_SIZE = 4
3118
3119        reads = txt.read(4)
3120        pos = txt.tell()
3121        txt.seek(0)
3122        txt.seek(pos)
3123        self.assertEqual(txt.read(4), "BBB\n")
3124
3125    def test_issue2282(self):
3126        buffer = self.BytesIO(self.testdata)
3127        txt = self.TextIOWrapper(buffer, encoding="ascii")
3128
3129        self.assertEqual(buffer.seekable(), txt.seekable())
3130
3131    def test_append_bom(self):
3132        # The BOM is not written again when appending to a non-empty file
3133        filename = support.TESTFN
3134        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3135            with self.open(filename, 'w', encoding=charset) as f:
3136                f.write('aaa')
3137                pos = f.tell()
3138            with self.open(filename, 'rb') as f:
3139                self.assertEqual(f.read(), 'aaa'.encode(charset))
3140
3141            with self.open(filename, 'a', encoding=charset) as f:
3142                f.write('xxx')
3143            with self.open(filename, 'rb') as f:
3144                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3145
3146    def test_seek_bom(self):
3147        # Same test, but when seeking manually
3148        filename = support.TESTFN
3149        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3150            with self.open(filename, 'w', encoding=charset) as f:
3151                f.write('aaa')
3152                pos = f.tell()
3153            with self.open(filename, 'r+', encoding=charset) as f:
3154                f.seek(pos)
3155                f.write('zzz')
3156                f.seek(0)
3157                f.write('bbb')
3158            with self.open(filename, 'rb') as f:
3159                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
3160
3161    def test_seek_append_bom(self):
3162        # Same test, but first seek to the start and then to the end
3163        filename = support.TESTFN
3164        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3165            with self.open(filename, 'w', encoding=charset) as f:
3166                f.write('aaa')
3167            with self.open(filename, 'a', encoding=charset) as f:
3168                f.seek(0)
3169                f.seek(0, self.SEEK_END)
3170                f.write('xxx')
3171            with self.open(filename, 'rb') as f:
3172                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3173
3174    def test_errors_property(self):
3175        with self.open(support.TESTFN, "w") as f:
3176            self.assertEqual(f.errors, "strict")
3177        with self.open(support.TESTFN, "w", errors="replace") as f:
3178            self.assertEqual(f.errors, "replace")
3179
3180    @support.no_tracing
3181    def test_threads_write(self):
3182        # Issue6750: concurrent writes could duplicate data
3183        event = threading.Event()
3184        with self.open(support.TESTFN, "w", buffering=1) as f:
3185            def run(n):
3186                text = "Thread%03d\n" % n
3187                event.wait()
3188                f.write(text)
3189            threads = [threading.Thread(target=run, args=(x,))
3190                       for x in range(20)]
3191            with support.start_threads(threads, event.set):
3192                time.sleep(0.02)
3193        with self.open(support.TESTFN) as f:
3194            content = f.read()
3195            for n in range(20):
3196                self.assertEqual(content.count("Thread%03d\n" % n), 1)
3197
3198    def test_flush_error_on_close(self):
3199        # Test that text file is closed despite failed flush
3200        # and that flush() is called before file closed.
3201        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3202        closed = []
3203        def bad_flush():
3204            closed[:] = [txt.closed, txt.buffer.closed]
3205            raise OSError()
3206        txt.flush = bad_flush
3207        self.assertRaises(OSError, txt.close) # exception not swallowed
3208        self.assertTrue(txt.closed)
3209        self.assertTrue(txt.buffer.closed)
3210        self.assertTrue(closed)      # flush() called
3211        self.assertFalse(closed[0])  # flush() called before file closed
3212        self.assertFalse(closed[1])
3213        txt.flush = lambda: None  # break reference loop
3214
3215    def test_close_error_on_close(self):
3216        buffer = self.BytesIO(self.testdata)
3217        def bad_flush():
3218            raise OSError('flush')
3219        def bad_close():
3220            raise OSError('close')
3221        buffer.close = bad_close
3222        txt = self.TextIOWrapper(buffer, encoding="ascii")
3223        txt.flush = bad_flush
3224        with self.assertRaises(OSError) as err: # exception not swallowed
3225            txt.close()
3226        self.assertEqual(err.exception.args, ('close',))
3227        self.assertIsInstance(err.exception.__context__, OSError)
3228        self.assertEqual(err.exception.__context__.args, ('flush',))
3229        self.assertFalse(txt.closed)
3230
3231    def test_nonnormalized_close_error_on_close(self):
3232        # Issue #21677
3233        buffer = self.BytesIO(self.testdata)
3234        def bad_flush():
3235            raise non_existing_flush
3236        def bad_close():
3237            raise non_existing_close
3238        buffer.close = bad_close
3239        txt = self.TextIOWrapper(buffer, encoding="ascii")
3240        txt.flush = bad_flush
3241        with self.assertRaises(NameError) as err: # exception not swallowed
3242            txt.close()
3243        self.assertIn('non_existing_close', str(err.exception))
3244        self.assertIsInstance(err.exception.__context__, NameError)
3245        self.assertIn('non_existing_flush', str(err.exception.__context__))
3246        self.assertFalse(txt.closed)
3247
3248    def test_multi_close(self):
3249        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3250        txt.close()
3251        txt.close()
3252        txt.close()
3253        self.assertRaises(ValueError, txt.flush)
3254
3255    def test_unseekable(self):
3256        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3257        self.assertRaises(self.UnsupportedOperation, txt.tell)
3258        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3259
3260    def test_readonly_attributes(self):
3261        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3262        buf = self.BytesIO(self.testdata)
3263        with self.assertRaises(AttributeError):
3264            txt.buffer = buf
3265
3266    def test_rawio(self):
3267        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3268        # that subprocess.Popen() can have the required unbuffered
3269        # semantics with universal_newlines=True.
3270        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3271        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3272        # Reads
3273        self.assertEqual(txt.read(4), 'abcd')
3274        self.assertEqual(txt.readline(), 'efghi\n')
3275        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3276
3277    def test_rawio_write_through(self):
3278        # Issue #12591: with write_through=True, writes don't need a flush
3279        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3280        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3281                                 write_through=True)
3282        txt.write('1')
3283        txt.write('23\n4')
3284        txt.write('5')
3285        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3286
3287    def test_bufio_write_through(self):
3288        # Issue #21396: write_through=True doesn't force a flush()
3289        # on the underlying binary buffered object.
3290        flush_called, write_called = [], []
3291        class BufferedWriter(self.BufferedWriter):
3292            def flush(self, *args, **kwargs):
3293                flush_called.append(True)
3294                return super().flush(*args, **kwargs)
3295            def write(self, *args, **kwargs):
3296                write_called.append(True)
3297                return super().write(*args, **kwargs)
3298
3299        rawio = self.BytesIO()
3300        data = b"a"
3301        bufio = BufferedWriter(rawio, len(data)*2)
3302        textio = self.TextIOWrapper(bufio, encoding='ascii',
3303                                    write_through=True)
3304        # write to the buffered io but don't overflow the buffer
3305        text = data.decode('ascii')
3306        textio.write(text)
3307
3308        # buffer.flush is not called with write_through=True
3309        self.assertFalse(flush_called)
3310        # buffer.write *is* called with write_through=True
3311        self.assertTrue(write_called)
3312        self.assertEqual(rawio.getvalue(), b"") # no flush
3313
3314        write_called = [] # reset
3315        textio.write(text * 10) # total content is larger than bufio buffer
3316        self.assertTrue(write_called)
3317        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3318
3319    def test_reconfigure_write_through(self):
3320        raw = self.MockRawIO([])
3321        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3322        t.write('1')
3323        t.reconfigure(write_through=True)  # implied flush
3324        self.assertEqual(t.write_through, True)
3325        self.assertEqual(b''.join(raw._write_stack), b'1')
3326        t.write('23')
3327        self.assertEqual(b''.join(raw._write_stack), b'123')
3328        t.reconfigure(write_through=False)
3329        self.assertEqual(t.write_through, False)
3330        t.write('45')
3331        t.flush()
3332        self.assertEqual(b''.join(raw._write_stack), b'12345')
3333        # Keeping default value
3334        t.reconfigure()
3335        t.reconfigure(write_through=None)
3336        self.assertEqual(t.write_through, False)
3337        t.reconfigure(write_through=True)
3338        t.reconfigure()
3339        t.reconfigure(write_through=None)
3340        self.assertEqual(t.write_through, True)
3341
3342    def test_read_nonbytes(self):
3343        # Issue #17106
3344        # Crash when underlying read() returns non-bytes
3345        t = self.TextIOWrapper(self.StringIO('a'))
3346        self.assertRaises(TypeError, t.read, 1)
3347        t = self.TextIOWrapper(self.StringIO('a'))
3348        self.assertRaises(TypeError, t.readline)
3349        t = self.TextIOWrapper(self.StringIO('a'))
3350        self.assertRaises(TypeError, t.read)
3351
3352    def test_illegal_encoder(self):
3353        # Issue 31271: Calling write() while the return value of encoder's
3354        # encode() is invalid shouldn't cause an assertion failure.
3355        rot13 = codecs.lookup("rot13")
3356        with support.swap_attr(rot13, '_is_text_encoding', True):
3357            t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3358        self.assertRaises(TypeError, t.write, 'bar')
3359
3360    def test_illegal_decoder(self):
3361        # Issue #17106
3362        # Bypass the early encoding check added in issue 20404
3363        def _make_illegal_wrapper():
3364            quopri = codecs.lookup("quopri")
3365            quopri._is_text_encoding = True
3366            try:
3367                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3368                                       newline='\n', encoding="quopri")
3369            finally:
3370                quopri._is_text_encoding = False
3371            return t
3372        # Crash when decoder returns non-string
3373        t = _make_illegal_wrapper()
3374        self.assertRaises(TypeError, t.read, 1)
3375        t = _make_illegal_wrapper()
3376        self.assertRaises(TypeError, t.readline)
3377        t = _make_illegal_wrapper()
3378        self.assertRaises(TypeError, t.read)
3379
3380        # Issue 31243: calling read() while the return value of decoder's
3381        # getstate() is invalid should neither crash the interpreter nor
3382        # raise a SystemError.
3383        def _make_very_illegal_wrapper(getstate_ret_val):
3384            class BadDecoder:
3385                def getstate(self):
3386                    return getstate_ret_val
3387            def _get_bad_decoder(dummy):
3388                return BadDecoder()
3389            quopri = codecs.lookup("quopri")
3390            with support.swap_attr(quopri, 'incrementaldecoder',
3391                                   _get_bad_decoder):
3392                return _make_illegal_wrapper()
3393        t = _make_very_illegal_wrapper(42)
3394        self.assertRaises(TypeError, t.read, 42)
3395        t = _make_very_illegal_wrapper(())
3396        self.assertRaises(TypeError, t.read, 42)
3397        t = _make_very_illegal_wrapper((1, 2))
3398        self.assertRaises(TypeError, t.read, 42)
3399
3400    def _check_create_at_shutdown(self, **kwargs):
3401        # Issue #20037: creating a TextIOWrapper at shutdown
3402        # shouldn't crash the interpreter.
3403        iomod = self.io.__name__
3404        code = """if 1:
3405            import codecs
3406            import {iomod} as io
3407
3408            # Avoid looking up codecs at shutdown
3409            codecs.lookup('utf-8')
3410
3411            class C:
3412                def __init__(self):
3413                    self.buf = io.BytesIO()
3414                def __del__(self):
3415                    io.TextIOWrapper(self.buf, **{kwargs})
3416                    print("ok")
3417            c = C()
3418            """.format(iomod=iomod, kwargs=kwargs)
3419        return assert_python_ok("-c", code)
3420
3421    @support.requires_type_collecting
3422    def test_create_at_shutdown_without_encoding(self):
3423        rc, out, err = self._check_create_at_shutdown()
3424        if err:
3425            # Can error out with a RuntimeError if the module state
3426            # isn't found.
3427            self.assertIn(self.shutdown_error, err.decode())
3428        else:
3429            self.assertEqual("ok", out.decode().strip())
3430
3431    @support.requires_type_collecting
3432    def test_create_at_shutdown_with_encoding(self):
3433        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3434                                                      errors='strict')
3435        self.assertFalse(err)
3436        self.assertEqual("ok", out.decode().strip())
3437
3438    def test_read_byteslike(self):
3439        r = MemviewBytesIO(b'Just some random string\n')
3440        t = self.TextIOWrapper(r, 'utf-8')
3441
3442        # TextIOwrapper will not read the full string, because
3443        # we truncate it to a multiple of the native int size
3444        # so that we can construct a more complex memoryview.
3445        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
3446
3447        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3448
3449    def test_issue22849(self):
3450        class F(object):
3451            def readable(self): return True
3452            def writable(self): return True
3453            def seekable(self): return True
3454
3455        for i in range(10):
3456            try:
3457                self.TextIOWrapper(F(), encoding='utf-8')
3458            except Exception:
3459                pass
3460
3461        F.tell = lambda x: 0
3462        t = self.TextIOWrapper(F(), encoding='utf-8')
3463
3464    def test_reconfigure_encoding_read(self):
3465        # latin1 -> utf8
3466        # (latin1 can decode utf-8 encoded string)
3467        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3468        raw = self.BytesIO(data)
3469        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3470        self.assertEqual(txt.readline(), 'abc\xe9\n')
3471        with self.assertRaises(self.UnsupportedOperation):
3472            txt.reconfigure(encoding='utf-8')
3473        with self.assertRaises(self.UnsupportedOperation):
3474            txt.reconfigure(newline=None)
3475
3476    def test_reconfigure_write_fromascii(self):
3477        # ascii has a specific encodefunc in the C implementation,
3478        # but utf-8-sig has not. Make sure that we get rid of the
3479        # cached encodefunc when we switch encoders.
3480        raw = self.BytesIO()
3481        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3482        txt.write('foo\n')
3483        txt.reconfigure(encoding='utf-8-sig')
3484        txt.write('\xe9\n')
3485        txt.flush()
3486        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3487
3488    def test_reconfigure_write(self):
3489        # latin -> utf8
3490        raw = self.BytesIO()
3491        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3492        txt.write('abc\xe9\n')
3493        txt.reconfigure(encoding='utf-8')
3494        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3495        txt.write('d\xe9f\n')
3496        txt.flush()
3497        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3498
3499        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3500        # the file
3501        raw = self.BytesIO()
3502        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3503        txt.write('abc\n')
3504        txt.reconfigure(encoding='utf-8-sig')
3505        txt.write('d\xe9f\n')
3506        txt.flush()
3507        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3508
3509    def test_reconfigure_write_non_seekable(self):
3510        raw = self.BytesIO()
3511        raw.seekable = lambda: False
3512        raw.seek = None
3513        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3514        txt.write('abc\n')
3515        txt.reconfigure(encoding='utf-8-sig')
3516        txt.write('d\xe9f\n')
3517        txt.flush()
3518
3519        # If the raw stream is not seekable, there'll be a BOM
3520        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3521
3522    def test_reconfigure_defaults(self):
3523        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3524        txt.reconfigure(encoding=None)
3525        self.assertEqual(txt.encoding, 'ascii')
3526        self.assertEqual(txt.errors, 'replace')
3527        txt.write('LF\n')
3528
3529        txt.reconfigure(newline='\r\n')
3530        self.assertEqual(txt.encoding, 'ascii')
3531        self.assertEqual(txt.errors, 'replace')
3532
3533        txt.reconfigure(errors='ignore')
3534        self.assertEqual(txt.encoding, 'ascii')
3535        self.assertEqual(txt.errors, 'ignore')
3536        txt.write('CRLF\n')
3537
3538        txt.reconfigure(encoding='utf-8', newline=None)
3539        self.assertEqual(txt.errors, 'strict')
3540        txt.seek(0)
3541        self.assertEqual(txt.read(), 'LF\nCRLF\n')
3542
3543        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3544
3545    def test_reconfigure_newline(self):
3546        raw = self.BytesIO(b'CR\rEOF')
3547        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3548        txt.reconfigure(newline=None)
3549        self.assertEqual(txt.readline(), 'CR\n')
3550        raw = self.BytesIO(b'CR\rEOF')
3551        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3552        txt.reconfigure(newline='')
3553        self.assertEqual(txt.readline(), 'CR\r')
3554        raw = self.BytesIO(b'CR\rLF\nEOF')
3555        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3556        txt.reconfigure(newline='\n')
3557        self.assertEqual(txt.readline(), 'CR\rLF\n')
3558        raw = self.BytesIO(b'LF\nCR\rEOF')
3559        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3560        txt.reconfigure(newline='\r')
3561        self.assertEqual(txt.readline(), 'LF\nCR\r')
3562        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3563        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3564        txt.reconfigure(newline='\r\n')
3565        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3566
3567        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3568        txt.reconfigure(newline=None)
3569        txt.write('linesep\n')
3570        txt.reconfigure(newline='')
3571        txt.write('LF\n')
3572        txt.reconfigure(newline='\n')
3573        txt.write('LF\n')
3574        txt.reconfigure(newline='\r')
3575        txt.write('CR\n')
3576        txt.reconfigure(newline='\r\n')
3577        txt.write('CRLF\n')
3578        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3579        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3580
3581    def test_issue25862(self):
3582        # Assertion failures occurred in tell() after read() and write().
3583        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3584        t.read(1)
3585        t.read()
3586        t.tell()
3587        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3588        t.read(1)
3589        t.write('x')
3590        t.tell()
3591
3592
3593class MemviewBytesIO(io.BytesIO):
3594    '''A BytesIO object whose read method returns memoryviews
3595       rather than bytes'''
3596
3597    def read1(self, len_):
3598        return _to_memoryview(super().read1(len_))
3599
3600    def read(self, len_):
3601        return _to_memoryview(super().read(len_))
3602
3603def _to_memoryview(buf):
3604    '''Convert bytes-object *buf* to a non-trivial memoryview'''
3605
3606    arr = array.array('i')
3607    idx = len(buf) - len(buf) % arr.itemsize
3608    arr.frombytes(buf[:idx])
3609    return memoryview(arr)
3610
3611
3612class CTextIOWrapperTest(TextIOWrapperTest):
3613    io = io
3614    shutdown_error = "RuntimeError: could not find io module state"
3615
3616    def test_initialization(self):
3617        r = self.BytesIO(b"\xc3\xa9\n\n")
3618        b = self.BufferedReader(r, 1000)
3619        t = self.TextIOWrapper(b)
3620        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3621        self.assertRaises(ValueError, t.read)
3622
3623        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3624        self.assertRaises(Exception, repr, t)
3625
3626    def test_garbage_collection(self):
3627        # C TextIOWrapper objects are collected, and collecting them flushes
3628        # all data to disk.
3629        # The Python version has __del__, so it ends in gc.garbage instead.
3630        with support.check_warnings(('', ResourceWarning)):
3631            rawio = io.FileIO(support.TESTFN, "wb")
3632            b = self.BufferedWriter(rawio)
3633            t = self.TextIOWrapper(b, encoding="ascii")
3634            t.write("456def")
3635            t.x = t
3636            wr = weakref.ref(t)
3637            del t
3638            support.gc_collect()
3639        self.assertIsNone(wr(), wr)
3640        with self.open(support.TESTFN, "rb") as f:
3641            self.assertEqual(f.read(), b"456def")
3642
3643    def test_rwpair_cleared_before_textio(self):
3644        # Issue 13070: TextIOWrapper's finalization would crash when called
3645        # after the reference to the underlying BufferedRWPair's writer got
3646        # cleared by the GC.
3647        for i in range(1000):
3648            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3649            t1 = self.TextIOWrapper(b1, encoding="ascii")
3650            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3651            t2 = self.TextIOWrapper(b2, encoding="ascii")
3652            # circular references
3653            t1.buddy = t2
3654            t2.buddy = t1
3655        support.gc_collect()
3656
3657    def test_del__CHUNK_SIZE_SystemError(self):
3658        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3659        with self.assertRaises(AttributeError):
3660            del t._CHUNK_SIZE
3661
3662
3663class PyTextIOWrapperTest(TextIOWrapperTest):
3664    io = pyio
3665    shutdown_error = "LookupError: unknown encoding: ascii"
3666
3667
3668class IncrementalNewlineDecoderTest(unittest.TestCase):
3669
3670    def check_newline_decoding_utf8(self, decoder):
3671        # UTF-8 specific tests for a newline decoder
3672        def _check_decode(b, s, **kwargs):
3673            # We exercise getstate() / setstate() as well as decode()
3674            state = decoder.getstate()
3675            self.assertEqual(decoder.decode(b, **kwargs), s)
3676            decoder.setstate(state)
3677            self.assertEqual(decoder.decode(b, **kwargs), s)
3678
3679        _check_decode(b'\xe8\xa2\x88', "\u8888")
3680
3681        _check_decode(b'\xe8', "")
3682        _check_decode(b'\xa2', "")
3683        _check_decode(b'\x88', "\u8888")
3684
3685        _check_decode(b'\xe8', "")
3686        _check_decode(b'\xa2', "")
3687        _check_decode(b'\x88', "\u8888")
3688
3689        _check_decode(b'\xe8', "")
3690        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3691
3692        decoder.reset()
3693        _check_decode(b'\n', "\n")
3694        _check_decode(b'\r', "")
3695        _check_decode(b'', "\n", final=True)
3696        _check_decode(b'\r', "\n", final=True)
3697
3698        _check_decode(b'\r', "")
3699        _check_decode(b'a', "\na")
3700
3701        _check_decode(b'\r\r\n', "\n\n")
3702        _check_decode(b'\r', "")
3703        _check_decode(b'\r', "\n")
3704        _check_decode(b'\na', "\na")
3705
3706        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3707        _check_decode(b'\xe8\xa2\x88', "\u8888")
3708        _check_decode(b'\n', "\n")
3709        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3710        _check_decode(b'\n', "\n")
3711
3712    def check_newline_decoding(self, decoder, encoding):
3713        result = []
3714        if encoding is not None:
3715            encoder = codecs.getincrementalencoder(encoding)()
3716            def _decode_bytewise(s):
3717                # Decode one byte at a time
3718                for b in encoder.encode(s):
3719                    result.append(decoder.decode(bytes([b])))
3720        else:
3721            encoder = None
3722            def _decode_bytewise(s):
3723                # Decode one char at a time
3724                for c in s:
3725                    result.append(decoder.decode(c))
3726        self.assertEqual(decoder.newlines, None)
3727        _decode_bytewise("abc\n\r")
3728        self.assertEqual(decoder.newlines, '\n')
3729        _decode_bytewise("\nabc")
3730        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3731        _decode_bytewise("abc\r")
3732        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3733        _decode_bytewise("abc")
3734        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
3735        _decode_bytewise("abc\r")
3736        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
3737        decoder.reset()
3738        input = "abc"
3739        if encoder is not None:
3740            encoder.reset()
3741            input = encoder.encode(input)
3742        self.assertEqual(decoder.decode(input), "abc")
3743        self.assertEqual(decoder.newlines, None)
3744
3745    def test_newline_decoder(self):
3746        encodings = (
3747            # None meaning the IncrementalNewlineDecoder takes unicode input
3748            # rather than bytes input
3749            None, 'utf-8', 'latin-1',
3750            'utf-16', 'utf-16-le', 'utf-16-be',
3751            'utf-32', 'utf-32-le', 'utf-32-be',
3752        )
3753        for enc in encodings:
3754            decoder = enc and codecs.getincrementaldecoder(enc)()
3755            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3756            self.check_newline_decoding(decoder, enc)
3757        decoder = codecs.getincrementaldecoder("utf-8")()
3758        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3759        self.check_newline_decoding_utf8(decoder)
3760        self.assertRaises(TypeError, decoder.setstate, 42)
3761
3762    def test_newline_bytes(self):
3763        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3764        def _check(dec):
3765            self.assertEqual(dec.newlines, None)
3766            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3767            self.assertEqual(dec.newlines, None)
3768            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3769            self.assertEqual(dec.newlines, None)
3770        dec = self.IncrementalNewlineDecoder(None, translate=False)
3771        _check(dec)
3772        dec = self.IncrementalNewlineDecoder(None, translate=True)
3773        _check(dec)
3774
3775    def test_translate(self):
3776        # issue 35062
3777        for translate in (-2, -1, 1, 2):
3778            decoder = codecs.getincrementaldecoder("utf-8")()
3779            decoder = self.IncrementalNewlineDecoder(decoder, translate)
3780            self.check_newline_decoding_utf8(decoder)
3781        decoder = codecs.getincrementaldecoder("utf-8")()
3782        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3783        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3784
3785class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3786    pass
3787
3788class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3789    pass
3790
3791
3792# XXX Tests for open()
3793
3794class MiscIOTest(unittest.TestCase):
3795
3796    def tearDown(self):
3797        support.unlink(support.TESTFN)
3798
3799    def test___all__(self):
3800        for name in self.io.__all__:
3801            obj = getattr(self.io, name, None)
3802            self.assertIsNotNone(obj, name)
3803            if name == "open":
3804                continue
3805            elif "error" in name.lower() or name == "UnsupportedOperation":
3806                self.assertTrue(issubclass(obj, Exception), name)
3807            elif not name.startswith("SEEK_"):
3808                self.assertTrue(issubclass(obj, self.IOBase))
3809
3810    def test_attributes(self):
3811        f = self.open(support.TESTFN, "wb", buffering=0)
3812        self.assertEqual(f.mode, "wb")
3813        f.close()
3814
3815        with support.check_warnings(('', DeprecationWarning)):
3816            f = self.open(support.TESTFN, "U")
3817        self.assertEqual(f.name,            support.TESTFN)
3818        self.assertEqual(f.buffer.name,     support.TESTFN)
3819        self.assertEqual(f.buffer.raw.name, support.TESTFN)
3820        self.assertEqual(f.mode,            "U")
3821        self.assertEqual(f.buffer.mode,     "rb")
3822        self.assertEqual(f.buffer.raw.mode, "rb")
3823        f.close()
3824
3825        f = self.open(support.TESTFN, "w+")
3826        self.assertEqual(f.mode,            "w+")
3827        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
3828        self.assertEqual(f.buffer.raw.mode, "rb+")
3829
3830        g = self.open(f.fileno(), "wb", closefd=False)
3831        self.assertEqual(g.mode,     "wb")
3832        self.assertEqual(g.raw.mode, "wb")
3833        self.assertEqual(g.name,     f.fileno())
3834        self.assertEqual(g.raw.name, f.fileno())
3835        f.close()
3836        g.close()
3837
3838    def test_open_pipe_with_append(self):
3839        # bpo-27805: Ignore ESPIPE from lseek() in open().
3840        r, w = os.pipe()
3841        self.addCleanup(os.close, r)
3842        f = self.open(w, 'a')
3843        self.addCleanup(f.close)
3844        # Check that the file is marked non-seekable. On Windows, however, lseek
3845        # somehow succeeds on pipes.
3846        if sys.platform != 'win32':
3847            self.assertFalse(f.seekable())
3848
3849    def test_io_after_close(self):
3850        for kwargs in [
3851                {"mode": "w"},
3852                {"mode": "wb"},
3853                {"mode": "w", "buffering": 1},
3854                {"mode": "w", "buffering": 2},
3855                {"mode": "wb", "buffering": 0},
3856                {"mode": "r"},
3857                {"mode": "rb"},
3858                {"mode": "r", "buffering": 1},
3859                {"mode": "r", "buffering": 2},
3860                {"mode": "rb", "buffering": 0},
3861                {"mode": "w+"},
3862                {"mode": "w+b"},
3863                {"mode": "w+", "buffering": 1},
3864                {"mode": "w+", "buffering": 2},
3865                {"mode": "w+b", "buffering": 0},
3866            ]:
3867            f = self.open(support.TESTFN, **kwargs)
3868            f.close()
3869            self.assertRaises(ValueError, f.flush)
3870            self.assertRaises(ValueError, f.fileno)
3871            self.assertRaises(ValueError, f.isatty)
3872            self.assertRaises(ValueError, f.__iter__)
3873            if hasattr(f, "peek"):
3874                self.assertRaises(ValueError, f.peek, 1)
3875            self.assertRaises(ValueError, f.read)
3876            if hasattr(f, "read1"):
3877                self.assertRaises(ValueError, f.read1, 1024)
3878                self.assertRaises(ValueError, f.read1)
3879            if hasattr(f, "readall"):
3880                self.assertRaises(ValueError, f.readall)
3881            if hasattr(f, "readinto"):
3882                self.assertRaises(ValueError, f.readinto, bytearray(1024))
3883            if hasattr(f, "readinto1"):
3884                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
3885            self.assertRaises(ValueError, f.readline)
3886            self.assertRaises(ValueError, f.readlines)
3887            self.assertRaises(ValueError, f.readlines, 1)
3888            self.assertRaises(ValueError, f.seek, 0)
3889            self.assertRaises(ValueError, f.tell)
3890            self.assertRaises(ValueError, f.truncate)
3891            self.assertRaises(ValueError, f.write,
3892                              b"" if "b" in kwargs['mode'] else "")
3893            self.assertRaises(ValueError, f.writelines, [])
3894            self.assertRaises(ValueError, next, f)
3895
3896    def test_blockingioerror(self):
3897        # Various BlockingIOError issues
3898        class C(str):
3899            pass
3900        c = C("")
3901        b = self.BlockingIOError(1, c)
3902        c.b = b
3903        b.c = c
3904        wr = weakref.ref(c)
3905        del c, b
3906        support.gc_collect()
3907        self.assertIsNone(wr(), wr)
3908
3909    def test_abcs(self):
3910        # Test the visible base classes are ABCs.
3911        self.assertIsInstance(self.IOBase, abc.ABCMeta)
3912        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3913        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3914        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
3915
3916    def _check_abc_inheritance(self, abcmodule):
3917        with self.open(support.TESTFN, "wb", buffering=0) as f:
3918            self.assertIsInstance(f, abcmodule.IOBase)
3919            self.assertIsInstance(f, abcmodule.RawIOBase)
3920            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3921            self.assertNotIsInstance(f, abcmodule.TextIOBase)
3922        with self.open(support.TESTFN, "wb") as f:
3923            self.assertIsInstance(f, abcmodule.IOBase)
3924            self.assertNotIsInstance(f, abcmodule.RawIOBase)
3925            self.assertIsInstance(f, abcmodule.BufferedIOBase)
3926            self.assertNotIsInstance(f, abcmodule.TextIOBase)
3927        with self.open(support.TESTFN, "w") as f:
3928            self.assertIsInstance(f, abcmodule.IOBase)
3929            self.assertNotIsInstance(f, abcmodule.RawIOBase)
3930            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3931            self.assertIsInstance(f, abcmodule.TextIOBase)
3932
3933    def test_abc_inheritance(self):
3934        # Test implementations inherit from their respective ABCs
3935        self._check_abc_inheritance(self)
3936
3937    def test_abc_inheritance_official(self):
3938        # Test implementations inherit from the official ABCs of the
3939        # baseline "io" module.
3940        self._check_abc_inheritance(io)
3941
3942    def _check_warn_on_dealloc(self, *args, **kwargs):
3943        f = open(*args, **kwargs)
3944        r = repr(f)
3945        with self.assertWarns(ResourceWarning) as cm:
3946            f = None
3947            support.gc_collect()
3948        self.assertIn(r, str(cm.warning.args[0]))
3949
3950    def test_warn_on_dealloc(self):
3951        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3952        self._check_warn_on_dealloc(support.TESTFN, "wb")
3953        self._check_warn_on_dealloc(support.TESTFN, "w")
3954
3955    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3956        fds = []
3957        def cleanup_fds():
3958            for fd in fds:
3959                try:
3960                    os.close(fd)
3961                except OSError as e:
3962                    if e.errno != errno.EBADF:
3963                        raise
3964        self.addCleanup(cleanup_fds)
3965        r, w = os.pipe()
3966        fds += r, w
3967        self._check_warn_on_dealloc(r, *args, **kwargs)
3968        # When using closefd=False, there's no warning
3969        r, w = os.pipe()
3970        fds += r, w
3971        with support.check_no_resource_warning(self):
3972            open(r, *args, closefd=False, **kwargs)
3973
3974    def test_warn_on_dealloc_fd(self):
3975        self._check_warn_on_dealloc_fd("rb", buffering=0)
3976        self._check_warn_on_dealloc_fd("rb")
3977        self._check_warn_on_dealloc_fd("r")
3978
3979
3980    def test_pickling(self):
3981        # Pickling file objects is forbidden
3982        for kwargs in [
3983                {"mode": "w"},
3984                {"mode": "wb"},
3985                {"mode": "wb", "buffering": 0},
3986                {"mode": "r"},
3987                {"mode": "rb"},
3988                {"mode": "rb", "buffering": 0},
3989                {"mode": "w+"},
3990                {"mode": "w+b"},
3991                {"mode": "w+b", "buffering": 0},
3992            ]:
3993            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3994                with self.open(support.TESTFN, **kwargs) as f:
3995                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
3996
3997    def test_nonblock_pipe_write_bigbuf(self):
3998        self._test_nonblock_pipe_write(16*1024)
3999
4000    def test_nonblock_pipe_write_smallbuf(self):
4001        self._test_nonblock_pipe_write(1024)
4002
4003    @unittest.skipUnless(hasattr(os, 'set_blocking'),
4004                         'os.set_blocking() required for this test')
4005    def _test_nonblock_pipe_write(self, bufsize):
4006        sent = []
4007        received = []
4008        r, w = os.pipe()
4009        os.set_blocking(r, False)
4010        os.set_blocking(w, False)
4011
4012        # To exercise all code paths in the C implementation we need
4013        # to play with buffer sizes.  For instance, if we choose a
4014        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4015        # then we will never get a partial write of the buffer.
4016        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4017        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4018
4019        with rf, wf:
4020            for N in 9999, 73, 7574:
4021                try:
4022                    i = 0
4023                    while True:
4024                        msg = bytes([i % 26 + 97]) * N
4025                        sent.append(msg)
4026                        wf.write(msg)
4027                        i += 1
4028
4029                except self.BlockingIOError as e:
4030                    self.assertEqual(e.args[0], errno.EAGAIN)
4031                    self.assertEqual(e.args[2], e.characters_written)
4032                    sent[-1] = sent[-1][:e.characters_written]
4033                    received.append(rf.read())
4034                    msg = b'BLOCKED'
4035                    wf.write(msg)
4036                    sent.append(msg)
4037
4038            while True:
4039                try:
4040                    wf.flush()
4041                    break
4042                except self.BlockingIOError as e:
4043                    self.assertEqual(e.args[0], errno.EAGAIN)
4044                    self.assertEqual(e.args[2], e.characters_written)
4045                    self.assertEqual(e.characters_written, 0)
4046                    received.append(rf.read())
4047
4048            received += iter(rf.read, None)
4049
4050        sent, received = b''.join(sent), b''.join(received)
4051        self.assertEqual(sent, received)
4052        self.assertTrue(wf.closed)
4053        self.assertTrue(rf.closed)
4054
4055    def test_create_fail(self):
4056        # 'x' mode fails if file is existing
4057        with self.open(support.TESTFN, 'w'):
4058            pass
4059        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4060
4061    def test_create_writes(self):
4062        # 'x' mode opens for writing
4063        with self.open(support.TESTFN, 'xb') as f:
4064            f.write(b"spam")
4065        with self.open(support.TESTFN, 'rb') as f:
4066            self.assertEqual(b"spam", f.read())
4067
4068    def test_open_allargs(self):
4069        # there used to be a buffer overflow in the parser for rawmode
4070        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4071
4072
4073class CMiscIOTest(MiscIOTest):
4074    io = io
4075
4076    def test_readinto_buffer_overflow(self):
4077        # Issue #18025
4078        class BadReader(self.io.BufferedIOBase):
4079            def read(self, n=-1):
4080                return b'x' * 10**6
4081        bufio = BadReader()
4082        b = bytearray(2)
4083        self.assertRaises(ValueError, bufio.readinto, b)
4084
4085    def check_daemon_threads_shutdown_deadlock(self, stream_name):
4086        # Issue #23309: deadlocks at shutdown should be avoided when a
4087        # daemon thread and the main thread both write to a file.
4088        code = """if 1:
4089            import sys
4090            import time
4091            import threading
4092            from test.support import SuppressCrashReport
4093
4094            file = sys.{stream_name}
4095
4096            def run():
4097                while True:
4098                    file.write('.')
4099                    file.flush()
4100
4101            crash = SuppressCrashReport()
4102            crash.__enter__()
4103            # don't call __exit__(): the crash occurs at Python shutdown
4104
4105            thread = threading.Thread(target=run)
4106            thread.daemon = True
4107            thread.start()
4108
4109            time.sleep(0.5)
4110            file.write('!')
4111            file.flush()
4112            """.format_map(locals())
4113        res, _ = run_python_until_end("-c", code)
4114        err = res.err.decode()
4115        if res.rc != 0:
4116            # Failure: should be a fatal error
4117            pattern = (r"Fatal Python error: could not acquire lock "
4118                       r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4119                       r"at interpreter shutdown, possibly due to "
4120                       r"daemon threads".format_map(locals()))
4121            self.assertRegex(err, pattern)
4122        else:
4123            self.assertFalse(err.strip('.!'))
4124
4125    def test_daemon_threads_shutdown_stdout_deadlock(self):
4126        self.check_daemon_threads_shutdown_deadlock('stdout')
4127
4128    def test_daemon_threads_shutdown_stderr_deadlock(self):
4129        self.check_daemon_threads_shutdown_deadlock('stderr')
4130
4131
4132class PyMiscIOTest(MiscIOTest):
4133    io = pyio
4134
4135
4136@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4137class SignalsTest(unittest.TestCase):
4138
4139    def setUp(self):
4140        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4141
4142    def tearDown(self):
4143        signal.signal(signal.SIGALRM, self.oldalrm)
4144
4145    def alarm_interrupt(self, sig, frame):
4146        1/0
4147
4148    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4149        """Check that a partial write, when it gets interrupted, properly
4150        invokes the signal handler, and bubbles up the exception raised
4151        in the latter."""
4152        read_results = []
4153        def _read():
4154            s = os.read(r, 1)
4155            read_results.append(s)
4156
4157        t = threading.Thread(target=_read)
4158        t.daemon = True
4159        r, w = os.pipe()
4160        fdopen_kwargs["closefd"] = False
4161        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
4162        try:
4163            wio = self.io.open(w, **fdopen_kwargs)
4164            if hasattr(signal, 'pthread_sigmask'):
4165                # create the thread with SIGALRM signal blocked
4166                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4167                t.start()
4168                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4169            else:
4170                t.start()
4171
4172            # Fill the pipe enough that the write will be blocking.
4173            # It will be interrupted by the timer armed above.  Since the
4174            # other thread has read one byte, the low-level write will
4175            # return with a successful (partial) result rather than an EINTR.
4176            # The buffered IO layer must check for pending signal
4177            # handlers, which in this case will invoke alarm_interrupt().
4178            signal.alarm(1)
4179            try:
4180                self.assertRaises(ZeroDivisionError, wio.write, large_data)
4181            finally:
4182                signal.alarm(0)
4183                t.join()
4184            # We got one byte, get another one and check that it isn't a
4185            # repeat of the first one.
4186            read_results.append(os.read(r, 1))
4187            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4188        finally:
4189            os.close(w)
4190            os.close(r)
4191            # This is deliberate. If we didn't close the file descriptor
4192            # before closing wio, wio would try to flush its internal
4193            # buffer, and block again.
4194            try:
4195                wio.close()
4196            except OSError as e:
4197                if e.errno != errno.EBADF:
4198                    raise
4199
4200    def test_interrupted_write_unbuffered(self):
4201        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4202
4203    def test_interrupted_write_buffered(self):
4204        self.check_interrupted_write(b"xy", b"xy", mode="wb")
4205
4206    def test_interrupted_write_text(self):
4207        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4208
4209    @support.no_tracing
4210    def check_reentrant_write(self, data, **fdopen_kwargs):
4211        def on_alarm(*args):
4212            # Will be called reentrantly from the same thread
4213            wio.write(data)
4214            1/0
4215        signal.signal(signal.SIGALRM, on_alarm)
4216        r, w = os.pipe()
4217        wio = self.io.open(w, **fdopen_kwargs)
4218        try:
4219            signal.alarm(1)
4220            # Either the reentrant call to wio.write() fails with RuntimeError,
4221            # or the signal handler raises ZeroDivisionError.
4222            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4223                while 1:
4224                    for i in range(100):
4225                        wio.write(data)
4226                        wio.flush()
4227                    # Make sure the buffer doesn't fill up and block further writes
4228                    os.read(r, len(data) * 100)
4229            exc = cm.exception
4230            if isinstance(exc, RuntimeError):
4231                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4232        finally:
4233            signal.alarm(0)
4234            wio.close()
4235            os.close(r)
4236
4237    def test_reentrant_write_buffered(self):
4238        self.check_reentrant_write(b"xy", mode="wb")
4239
4240    def test_reentrant_write_text(self):
4241        self.check_reentrant_write("xy", mode="w", encoding="ascii")
4242
4243    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4244        """Check that a buffered read, when it gets interrupted (either
4245        returning a partial result or EINTR), properly invokes the signal
4246        handler and retries if the latter returned successfully."""
4247        r, w = os.pipe()
4248        fdopen_kwargs["closefd"] = False
4249        def alarm_handler(sig, frame):
4250            os.write(w, b"bar")
4251        signal.signal(signal.SIGALRM, alarm_handler)
4252        try:
4253            rio = self.io.open(r, **fdopen_kwargs)
4254            os.write(w, b"foo")
4255            signal.alarm(1)
4256            # Expected behaviour:
4257            # - first raw read() returns partial b"foo"
4258            # - second raw read() returns EINTR
4259            # - third raw read() returns b"bar"
4260            self.assertEqual(decode(rio.read(6)), "foobar")
4261        finally:
4262            signal.alarm(0)
4263            rio.close()
4264            os.close(w)
4265            os.close(r)
4266
4267    def test_interrupted_read_retry_buffered(self):
4268        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4269                                          mode="rb")
4270
4271    def test_interrupted_read_retry_text(self):
4272        self.check_interrupted_read_retry(lambda x: x,
4273                                          mode="r")
4274
4275    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4276        """Check that a buffered write, when it gets interrupted (either
4277        returning a partial result or EINTR), properly invokes the signal
4278        handler and retries if the latter returned successfully."""
4279        select = support.import_module("select")
4280
4281        # A quantity that exceeds the buffer size of an anonymous pipe's
4282        # write end.
4283        N = support.PIPE_MAX_SIZE
4284        r, w = os.pipe()
4285        fdopen_kwargs["closefd"] = False
4286
4287        # We need a separate thread to read from the pipe and allow the
4288        # write() to finish.  This thread is started after the SIGALRM is
4289        # received (forcing a first EINTR in write()).
4290        read_results = []
4291        write_finished = False
4292        error = None
4293        def _read():
4294            try:
4295                while not write_finished:
4296                    while r in select.select([r], [], [], 1.0)[0]:
4297                        s = os.read(r, 1024)
4298                        read_results.append(s)
4299            except BaseException as exc:
4300                nonlocal error
4301                error = exc
4302        t = threading.Thread(target=_read)
4303        t.daemon = True
4304        def alarm1(sig, frame):
4305            signal.signal(signal.SIGALRM, alarm2)
4306            signal.alarm(1)
4307        def alarm2(sig, frame):
4308            t.start()
4309
4310        large_data = item * N
4311        signal.signal(signal.SIGALRM, alarm1)
4312        try:
4313            wio = self.io.open(w, **fdopen_kwargs)
4314            signal.alarm(1)
4315            # Expected behaviour:
4316            # - first raw write() is partial (because of the limited pipe buffer
4317            #   and the first alarm)
4318            # - second raw write() returns EINTR (because of the second alarm)
4319            # - subsequent write()s are successful (either partial or complete)
4320            written = wio.write(large_data)
4321            self.assertEqual(N, written)
4322
4323            wio.flush()
4324            write_finished = True
4325            t.join()
4326
4327            self.assertIsNone(error)
4328            self.assertEqual(N, sum(len(x) for x in read_results))
4329        finally:
4330            signal.alarm(0)
4331            write_finished = True
4332            os.close(w)
4333            os.close(r)
4334            # This is deliberate. If we didn't close the file descriptor
4335            # before closing wio, wio would try to flush its internal
4336            # buffer, and could block (in case of failure).
4337            try:
4338                wio.close()
4339            except OSError as e:
4340                if e.errno != errno.EBADF:
4341                    raise
4342
4343    def test_interrupted_write_retry_buffered(self):
4344        self.check_interrupted_write_retry(b"x", mode="wb")
4345
4346    def test_interrupted_write_retry_text(self):
4347        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4348
4349
4350class CSignalsTest(SignalsTest):
4351    io = io
4352
4353class PySignalsTest(SignalsTest):
4354    io = pyio
4355
4356    # Handling reentrancy issues would slow down _pyio even more, so the
4357    # tests are disabled.
4358    test_reentrant_write_buffered = None
4359    test_reentrant_write_text = None
4360
4361
4362def load_tests(*args):
4363    tests = (CIOTest, PyIOTest, APIMismatchTest,
4364             CBufferedReaderTest, PyBufferedReaderTest,
4365             CBufferedWriterTest, PyBufferedWriterTest,
4366             CBufferedRWPairTest, PyBufferedRWPairTest,
4367             CBufferedRandomTest, PyBufferedRandomTest,
4368             StatefulIncrementalDecoderTest,
4369             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4370             CTextIOWrapperTest, PyTextIOWrapperTest,
4371             CMiscIOTest, PyMiscIOTest,
4372             CSignalsTest, PySignalsTest,
4373             )
4374
4375    # Put the namespaces of the IO module we are testing and some useful mock
4376    # classes in the __dict__ of each test.
4377    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
4378             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4379             SlowFlushRawIO)
4380    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4381    c_io_ns = {name : getattr(io, name) for name in all_members}
4382    py_io_ns = {name : getattr(pyio, name) for name in all_members}
4383    globs = globals()
4384    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4385    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4386    # Avoid turning open into a bound method.
4387    py_io_ns["open"] = pyio.OpenWrapper
4388    for test in tests:
4389        if test.__name__.startswith("C"):
4390            for name, obj in c_io_ns.items():
4391                setattr(test, name, obj)
4392        elif test.__name__.startswith("Py"):
4393            for name, obj in py_io_ns.items():
4394                setattr(test, name, obj)
4395
4396    suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4397    return suite
4398
4399if __name__ == "__main__":
4400    unittest.main()
4401