1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22import abc
23import array
24import errno
25import locale
26import os
27import pickle
28import random
29import signal
30import sys
31import sysconfig
32import textwrap
33import threading
34import time
35import unittest
36import warnings
37import weakref
38from collections import deque, UserList
39from itertools import cycle, count
40from test import support
41from test.support.script_helper import (
42    assert_python_ok, assert_python_failure, run_python_until_end)
43from test.support import import_helper
44from test.support import os_helper
45from test.support import threading_helper
46from test.support import warnings_helper
47from test.support.os_helper import FakePath
48
49import codecs
50import io  # C implementation of io
51import _pyio as pyio # Python implementation of io
52
53try:
54    import ctypes
55except ImportError:
56    def byteslike(*pos, **kw):
57        return array.array("b", bytes(*pos, **kw))
58else:
59    def byteslike(*pos, **kw):
60        """Create a bytes-like object having no string or sequence methods"""
61        data = bytes(*pos, **kw)
62        obj = EmptyStruct()
63        ctypes.resize(obj, len(data))
64        memoryview(obj).cast("B")[:] = data
65        return obj
66    class EmptyStruct(ctypes.Structure):
67        pass
68
69_cflags = sysconfig.get_config_var('CFLAGS') or ''
70_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
71MEMORY_SANITIZER = (
72    '-fsanitize=memory' in _cflags or
73    '--with-memory-sanitizer' in _config_args
74)
75
76ADDRESS_SANITIZER = (
77    '-fsanitize=address' in _cflags
78)
79
80# Does io.IOBase finalizer log the exception if the close() method fails?
81# The exception is ignored silently by default in release build.
82IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
83
84
85def _default_chunk_size():
86    """Get the default TextIOWrapper chunk size"""
87    with open(__file__, "r", encoding="latin-1") as f:
88        return f._CHUNK_SIZE
89
90
91class MockRawIOWithoutRead:
92    """A RawIO implementation without read(), so as to exercise the default
93    RawIO.read() which calls readinto()."""
94
95    def __init__(self, read_stack=()):
96        self._read_stack = list(read_stack)
97        self._write_stack = []
98        self._reads = 0
99        self._extraneous_reads = 0
100
101    def write(self, b):
102        self._write_stack.append(bytes(b))
103        return len(b)
104
105    def writable(self):
106        return True
107
108    def fileno(self):
109        return 42
110
111    def readable(self):
112        return True
113
114    def seekable(self):
115        return True
116
117    def seek(self, pos, whence):
118        return 0   # wrong but we gotta return something
119
120    def tell(self):
121        return 0   # same comment as above
122
123    def readinto(self, buf):
124        self._reads += 1
125        max_len = len(buf)
126        try:
127            data = self._read_stack[0]
128        except IndexError:
129            self._extraneous_reads += 1
130            return 0
131        if data is None:
132            del self._read_stack[0]
133            return None
134        n = len(data)
135        if len(data) <= max_len:
136            del self._read_stack[0]
137            buf[:n] = data
138            return n
139        else:
140            buf[:] = data[:max_len]
141            self._read_stack[0] = data[max_len:]
142            return max_len
143
144    def truncate(self, pos=None):
145        return pos
146
147class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
148    pass
149
150class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
151    pass
152
153
154class MockRawIO(MockRawIOWithoutRead):
155
156    def read(self, n=None):
157        self._reads += 1
158        try:
159            return self._read_stack.pop(0)
160        except:
161            self._extraneous_reads += 1
162            return b""
163
164class CMockRawIO(MockRawIO, io.RawIOBase):
165    pass
166
167class PyMockRawIO(MockRawIO, pyio.RawIOBase):
168    pass
169
170
171class MisbehavedRawIO(MockRawIO):
172    def write(self, b):
173        return super().write(b) * 2
174
175    def read(self, n=None):
176        return super().read(n) * 2
177
178    def seek(self, pos, whence):
179        return -123
180
181    def tell(self):
182        return -456
183
184    def readinto(self, buf):
185        super().readinto(buf)
186        return len(buf) * 5
187
188class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
189    pass
190
191class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
192    pass
193
194
195class SlowFlushRawIO(MockRawIO):
196    def __init__(self):
197        super().__init__()
198        self.in_flush = threading.Event()
199
200    def flush(self):
201        self.in_flush.set()
202        time.sleep(0.25)
203
204class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
205    pass
206
207class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
208    pass
209
210
211class CloseFailureIO(MockRawIO):
212    closed = 0
213
214    def close(self):
215        if not self.closed:
216            self.closed = 1
217            raise OSError
218
219class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
220    pass
221
222class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
223    pass
224
225
226class MockFileIO:
227
228    def __init__(self, data):
229        self.read_history = []
230        super().__init__(data)
231
232    def read(self, n=None):
233        res = super().read(n)
234        self.read_history.append(None if res is None else len(res))
235        return res
236
237    def readinto(self, b):
238        res = super().readinto(b)
239        self.read_history.append(res)
240        return res
241
242class CMockFileIO(MockFileIO, io.BytesIO):
243    pass
244
245class PyMockFileIO(MockFileIO, pyio.BytesIO):
246    pass
247
248
249class MockUnseekableIO:
250    def seekable(self):
251        return False
252
253    def seek(self, *args):
254        raise self.UnsupportedOperation("not seekable")
255
256    def tell(self, *args):
257        raise self.UnsupportedOperation("not seekable")
258
259    def truncate(self, *args):
260        raise self.UnsupportedOperation("not seekable")
261
262class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
263    UnsupportedOperation = io.UnsupportedOperation
264
265class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
266    UnsupportedOperation = pyio.UnsupportedOperation
267
268
269class MockNonBlockWriterIO:
270
271    def __init__(self):
272        self._write_stack = []
273        self._blocker_char = None
274
275    def pop_written(self):
276        s = b"".join(self._write_stack)
277        self._write_stack[:] = []
278        return s
279
280    def block_on(self, char):
281        """Block when a given char is encountered."""
282        self._blocker_char = char
283
284    def readable(self):
285        return True
286
287    def seekable(self):
288        return True
289
290    def seek(self, pos, whence=0):
291        # naive implementation, enough for tests
292        return 0
293
294    def writable(self):
295        return True
296
297    def write(self, b):
298        b = bytes(b)
299        n = -1
300        if self._blocker_char:
301            try:
302                n = b.index(self._blocker_char)
303            except ValueError:
304                pass
305            else:
306                if n > 0:
307                    # write data up to the first blocker
308                    self._write_stack.append(b[:n])
309                    return n
310                else:
311                    # cancel blocker and indicate would block
312                    self._blocker_char = None
313                    return None
314        self._write_stack.append(b)
315        return len(b)
316
317class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
318    BlockingIOError = io.BlockingIOError
319
320class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
321    BlockingIOError = pyio.BlockingIOError
322
323
324class IOTest(unittest.TestCase):
325
326    def setUp(self):
327        os_helper.unlink(os_helper.TESTFN)
328
329    def tearDown(self):
330        os_helper.unlink(os_helper.TESTFN)
331
332    def write_ops(self, f):
333        self.assertEqual(f.write(b"blah."), 5)
334        f.truncate(0)
335        self.assertEqual(f.tell(), 5)
336        f.seek(0)
337
338        self.assertEqual(f.write(b"blah."), 5)
339        self.assertEqual(f.seek(0), 0)
340        self.assertEqual(f.write(b"Hello."), 6)
341        self.assertEqual(f.tell(), 6)
342        self.assertEqual(f.seek(-1, 1), 5)
343        self.assertEqual(f.tell(), 5)
344        buffer = bytearray(b" world\n\n\n")
345        self.assertEqual(f.write(buffer), 9)
346        buffer[:] = b"*" * 9  # Overwrite our copy of the data
347        self.assertEqual(f.seek(0), 0)
348        self.assertEqual(f.write(b"h"), 1)
349        self.assertEqual(f.seek(-1, 2), 13)
350        self.assertEqual(f.tell(), 13)
351
352        self.assertEqual(f.truncate(12), 12)
353        self.assertEqual(f.tell(), 13)
354        self.assertRaises(TypeError, f.seek, 0.0)
355
356    def read_ops(self, f, buffered=False):
357        data = f.read(5)
358        self.assertEqual(data, b"hello")
359        data = byteslike(data)
360        self.assertEqual(f.readinto(data), 5)
361        self.assertEqual(bytes(data), b" worl")
362        data = bytearray(5)
363        self.assertEqual(f.readinto(data), 2)
364        self.assertEqual(len(data), 5)
365        self.assertEqual(data[:2], b"d\n")
366        self.assertEqual(f.seek(0), 0)
367        self.assertEqual(f.read(20), b"hello world\n")
368        self.assertEqual(f.read(1), b"")
369        self.assertEqual(f.readinto(byteslike(b"x")), 0)
370        self.assertEqual(f.seek(-6, 2), 6)
371        self.assertEqual(f.read(5), b"world")
372        self.assertEqual(f.read(0), b"")
373        self.assertEqual(f.readinto(byteslike()), 0)
374        self.assertEqual(f.seek(-6, 1), 5)
375        self.assertEqual(f.read(5), b" worl")
376        self.assertEqual(f.tell(), 10)
377        self.assertRaises(TypeError, f.seek, 0.0)
378        if buffered:
379            f.seek(0)
380            self.assertEqual(f.read(), b"hello world\n")
381            f.seek(6)
382            self.assertEqual(f.read(), b"world\n")
383            self.assertEqual(f.read(), b"")
384            f.seek(0)
385            data = byteslike(5)
386            self.assertEqual(f.readinto1(data), 5)
387            self.assertEqual(bytes(data), b"hello")
388
389    LARGE = 2**31
390
391    def large_file_ops(self, f):
392        assert f.readable()
393        assert f.writable()
394        try:
395            self.assertEqual(f.seek(self.LARGE), self.LARGE)
396        except (OverflowError, ValueError):
397            self.skipTest("no largefile support")
398        self.assertEqual(f.tell(), self.LARGE)
399        self.assertEqual(f.write(b"xxx"), 3)
400        self.assertEqual(f.tell(), self.LARGE + 3)
401        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
402        self.assertEqual(f.truncate(), self.LARGE + 2)
403        self.assertEqual(f.tell(), self.LARGE + 2)
404        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
405        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
406        self.assertEqual(f.tell(), self.LARGE + 2)
407        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
408        self.assertEqual(f.seek(-1, 2), self.LARGE)
409        self.assertEqual(f.read(2), b"x")
410
411    def test_invalid_operations(self):
412        # Try writing on a file opened in read mode and vice-versa.
413        exc = self.UnsupportedOperation
414        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp:
415            self.assertRaises(exc, fp.read)
416            self.assertRaises(exc, fp.readline)
417        with self.open(os_helper.TESTFN, "wb") as fp:
418            self.assertRaises(exc, fp.read)
419            self.assertRaises(exc, fp.readline)
420        with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
421            self.assertRaises(exc, fp.read)
422            self.assertRaises(exc, fp.readline)
423        with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
424            self.assertRaises(exc, fp.write, b"blah")
425            self.assertRaises(exc, fp.writelines, [b"blah\n"])
426        with self.open(os_helper.TESTFN, "rb") as fp:
427            self.assertRaises(exc, fp.write, b"blah")
428            self.assertRaises(exc, fp.writelines, [b"blah\n"])
429        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp:
430            self.assertRaises(exc, fp.write, "blah")
431            self.assertRaises(exc, fp.writelines, ["blah\n"])
432            # Non-zero seeking from current or end pos
433            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
434            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
435
436    def test_optional_abilities(self):
437        # Test for OSError when optional APIs are not supported
438        # The purpose of this test is to try fileno(), reading, writing and
439        # seeking operations with various objects that indicate they do not
440        # support these operations.
441
442        def pipe_reader():
443            [r, w] = os.pipe()
444            os.close(w)  # So that read() is harmless
445            return self.FileIO(r, "r")
446
447        def pipe_writer():
448            [r, w] = os.pipe()
449            self.addCleanup(os.close, r)
450            # Guarantee that we can write into the pipe without blocking
451            thread = threading.Thread(target=os.read, args=(r, 100))
452            thread.start()
453            self.addCleanup(thread.join)
454            return self.FileIO(w, "w")
455
456        def buffered_reader():
457            return self.BufferedReader(self.MockUnseekableIO())
458
459        def buffered_writer():
460            return self.BufferedWriter(self.MockUnseekableIO())
461
462        def buffered_random():
463            return self.BufferedRandom(self.BytesIO())
464
465        def buffered_rw_pair():
466            return self.BufferedRWPair(self.MockUnseekableIO(),
467                self.MockUnseekableIO())
468
469        def text_reader():
470            class UnseekableReader(self.MockUnseekableIO):
471                writable = self.BufferedIOBase.writable
472                write = self.BufferedIOBase.write
473            return self.TextIOWrapper(UnseekableReader(), "ascii")
474
475        def text_writer():
476            class UnseekableWriter(self.MockUnseekableIO):
477                readable = self.BufferedIOBase.readable
478                read = self.BufferedIOBase.read
479            return self.TextIOWrapper(UnseekableWriter(), "ascii")
480
481        tests = (
482            (pipe_reader, "fr"), (pipe_writer, "fw"),
483            (buffered_reader, "r"), (buffered_writer, "w"),
484            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
485            (text_reader, "r"), (text_writer, "w"),
486            (self.BytesIO, "rws"), (self.StringIO, "rws"),
487        )
488        for [test, abilities] in tests:
489            with self.subTest(test), test() as obj:
490                readable = "r" in abilities
491                self.assertEqual(obj.readable(), readable)
492                writable = "w" in abilities
493                self.assertEqual(obj.writable(), writable)
494
495                if isinstance(obj, self.TextIOBase):
496                    data = "3"
497                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
498                    data = b"3"
499                else:
500                    self.fail("Unknown base class")
501
502                if "f" in abilities:
503                    obj.fileno()
504                else:
505                    self.assertRaises(OSError, obj.fileno)
506
507                if readable:
508                    obj.read(1)
509                    obj.read()
510                else:
511                    self.assertRaises(OSError, obj.read, 1)
512                    self.assertRaises(OSError, obj.read)
513
514                if writable:
515                    obj.write(data)
516                else:
517                    self.assertRaises(OSError, obj.write, data)
518
519                if sys.platform.startswith("win") and test in (
520                        pipe_reader, pipe_writer):
521                    # Pipes seem to appear as seekable on Windows
522                    continue
523                seekable = "s" in abilities
524                self.assertEqual(obj.seekable(), seekable)
525
526                if seekable:
527                    obj.tell()
528                    obj.seek(0)
529                else:
530                    self.assertRaises(OSError, obj.tell)
531                    self.assertRaises(OSError, obj.seek, 0)
532
533                if writable and seekable:
534                    obj.truncate()
535                    obj.truncate(0)
536                else:
537                    self.assertRaises(OSError, obj.truncate)
538                    self.assertRaises(OSError, obj.truncate, 0)
539
540    def test_open_handles_NUL_chars(self):
541        fn_with_NUL = 'foo\0bar'
542        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8")
543
544        bytes_fn = bytes(fn_with_NUL, 'ascii')
545        with warnings.catch_warnings():
546            warnings.simplefilter("ignore", DeprecationWarning)
547            self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8")
548
549    def test_raw_file_io(self):
550        with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
551            self.assertEqual(f.readable(), False)
552            self.assertEqual(f.writable(), True)
553            self.assertEqual(f.seekable(), True)
554            self.write_ops(f)
555        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
556            self.assertEqual(f.readable(), True)
557            self.assertEqual(f.writable(), False)
558            self.assertEqual(f.seekable(), True)
559            self.read_ops(f)
560
561    def test_buffered_file_io(self):
562        with self.open(os_helper.TESTFN, "wb") as f:
563            self.assertEqual(f.readable(), False)
564            self.assertEqual(f.writable(), True)
565            self.assertEqual(f.seekable(), True)
566            self.write_ops(f)
567        with self.open(os_helper.TESTFN, "rb") as f:
568            self.assertEqual(f.readable(), True)
569            self.assertEqual(f.writable(), False)
570            self.assertEqual(f.seekable(), True)
571            self.read_ops(f, True)
572
573    def test_readline(self):
574        with self.open(os_helper.TESTFN, "wb") as f:
575            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
576        with self.open(os_helper.TESTFN, "rb") as f:
577            self.assertEqual(f.readline(), b"abc\n")
578            self.assertEqual(f.readline(10), b"def\n")
579            self.assertEqual(f.readline(2), b"xy")
580            self.assertEqual(f.readline(4), b"zzy\n")
581            self.assertEqual(f.readline(), b"foo\x00bar\n")
582            self.assertEqual(f.readline(None), b"another line")
583            self.assertRaises(TypeError, f.readline, 5.3)
584        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
585            self.assertRaises(TypeError, f.readline, 5.3)
586
587    def test_readline_nonsizeable(self):
588        # Issue #30061
589        # Crash when readline() returns an object without __len__
590        class R(self.IOBase):
591            def readline(self):
592                return None
593        self.assertRaises((TypeError, StopIteration), next, R())
594
595    def test_next_nonsizeable(self):
596        # Issue #30061
597        # Crash when __next__() returns an object without __len__
598        class R(self.IOBase):
599            def __next__(self):
600                return None
601        self.assertRaises(TypeError, R().readlines, 1)
602
603    def test_raw_bytes_io(self):
604        f = self.BytesIO()
605        self.write_ops(f)
606        data = f.getvalue()
607        self.assertEqual(data, b"hello world\n")
608        f = self.BytesIO(data)
609        self.read_ops(f, True)
610
611    def test_large_file_ops(self):
612        # On Windows and Mac OSX this test consumes large resources; It takes
613        # a long time to build the >2 GiB file and takes >2 GiB of disk space
614        # therefore the resource must be enabled to run this test.
615        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
616            support.requires(
617                'largefile',
618                'test requires %s bytes and a long time to run' % self.LARGE)
619        with self.open(os_helper.TESTFN, "w+b", 0) as f:
620            self.large_file_ops(f)
621        with self.open(os_helper.TESTFN, "w+b") as f:
622            self.large_file_ops(f)
623
624    def test_with_open(self):
625        for bufsize in (0, 100):
626            f = None
627            with self.open(os_helper.TESTFN, "wb", bufsize) as f:
628                f.write(b"xxx")
629            self.assertEqual(f.closed, True)
630            f = None
631            try:
632                with self.open(os_helper.TESTFN, "wb", bufsize) as f:
633                    1/0
634            except ZeroDivisionError:
635                self.assertEqual(f.closed, True)
636            else:
637                self.fail("1/0 didn't raise an exception")
638
639    # issue 5008
640    def test_append_mode_tell(self):
641        with self.open(os_helper.TESTFN, "wb") as f:
642            f.write(b"xxx")
643        with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
644            self.assertEqual(f.tell(), 3)
645        with self.open(os_helper.TESTFN, "ab") as f:
646            self.assertEqual(f.tell(), 3)
647        with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f:
648            self.assertGreater(f.tell(), 0)
649
650    def test_destructor(self):
651        record = []
652        class MyFileIO(self.FileIO):
653            def __del__(self):
654                record.append(1)
655                try:
656                    f = super().__del__
657                except AttributeError:
658                    pass
659                else:
660                    f()
661            def close(self):
662                record.append(2)
663                super().close()
664            def flush(self):
665                record.append(3)
666                super().flush()
667        with warnings_helper.check_warnings(('', ResourceWarning)):
668            f = MyFileIO(os_helper.TESTFN, "wb")
669            f.write(b"xxx")
670            del f
671            support.gc_collect()
672            self.assertEqual(record, [1, 2, 3])
673            with self.open(os_helper.TESTFN, "rb") as f:
674                self.assertEqual(f.read(), b"xxx")
675
676    def _check_base_destructor(self, base):
677        record = []
678        class MyIO(base):
679            def __init__(self):
680                # This exercises the availability of attributes on object
681                # destruction.
682                # (in the C version, close() is called by the tp_dealloc
683                # function, not by __del__)
684                self.on_del = 1
685                self.on_close = 2
686                self.on_flush = 3
687            def __del__(self):
688                record.append(self.on_del)
689                try:
690                    f = super().__del__
691                except AttributeError:
692                    pass
693                else:
694                    f()
695            def close(self):
696                record.append(self.on_close)
697                super().close()
698            def flush(self):
699                record.append(self.on_flush)
700                super().flush()
701        f = MyIO()
702        del f
703        support.gc_collect()
704        self.assertEqual(record, [1, 2, 3])
705
706    def test_IOBase_destructor(self):
707        self._check_base_destructor(self.IOBase)
708
709    def test_RawIOBase_destructor(self):
710        self._check_base_destructor(self.RawIOBase)
711
712    def test_BufferedIOBase_destructor(self):
713        self._check_base_destructor(self.BufferedIOBase)
714
715    def test_TextIOBase_destructor(self):
716        self._check_base_destructor(self.TextIOBase)
717
718    def test_close_flushes(self):
719        with self.open(os_helper.TESTFN, "wb") as f:
720            f.write(b"xxx")
721        with self.open(os_helper.TESTFN, "rb") as f:
722            self.assertEqual(f.read(), b"xxx")
723
724    def test_array_writes(self):
725        a = array.array('i', range(10))
726        n = len(a.tobytes())
727        def check(f):
728            with f:
729                self.assertEqual(f.write(a), n)
730                f.writelines((a,))
731        check(self.BytesIO())
732        check(self.FileIO(os_helper.TESTFN, "w"))
733        check(self.BufferedWriter(self.MockRawIO()))
734        check(self.BufferedRandom(self.MockRawIO()))
735        check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
736
737    def test_closefd(self):
738        self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
739                          encoding="utf-8", closefd=False)
740
741    def test_read_closed(self):
742        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
743            f.write("egg\n")
744        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
745            file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
746            self.assertEqual(file.read(), "egg\n")
747            file.seek(0)
748            file.close()
749            self.assertRaises(ValueError, file.read)
750        with self.open(os_helper.TESTFN, "rb") as f:
751            file = self.open(f.fileno(), "rb", closefd=False)
752            self.assertEqual(file.read()[:3], b"egg")
753            file.close()
754            self.assertRaises(ValueError, file.readinto, bytearray(1))
755
756    def test_no_closefd_with_filename(self):
757        # can't use closefd in combination with a file name
758        self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r",
759                          encoding="utf-8", closefd=False)
760
761    def test_closefd_attr(self):
762        with self.open(os_helper.TESTFN, "wb") as f:
763            f.write(b"egg\n")
764        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
765            self.assertEqual(f.buffer.raw.closefd, True)
766            file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
767            self.assertEqual(file.buffer.raw.closefd, False)
768
769    def test_garbage_collection(self):
770        # FileIO objects are collected, and collecting them flushes
771        # all data to disk.
772        with warnings_helper.check_warnings(('', ResourceWarning)):
773            f = self.FileIO(os_helper.TESTFN, "wb")
774            f.write(b"abcxxx")
775            f.f = f
776            wr = weakref.ref(f)
777            del f
778            support.gc_collect()
779        self.assertIsNone(wr(), wr)
780        with self.open(os_helper.TESTFN, "rb") as f:
781            self.assertEqual(f.read(), b"abcxxx")
782
783    def test_unbounded_file(self):
784        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
785        zero = "/dev/zero"
786        if not os.path.exists(zero):
787            self.skipTest("{0} does not exist".format(zero))
788        if sys.maxsize > 0x7FFFFFFF:
789            self.skipTest("test can only run in a 32-bit address space")
790        if support.real_max_memuse < support._2G:
791            self.skipTest("test requires at least 2 GiB of memory")
792        with self.open(zero, "rb", buffering=0) as f:
793            self.assertRaises(OverflowError, f.read)
794        with self.open(zero, "rb") as f:
795            self.assertRaises(OverflowError, f.read)
796        with self.open(zero, "r") as f:
797            self.assertRaises(OverflowError, f.read)
798
799    def check_flush_error_on_close(self, *args, **kwargs):
800        # Test that the file is closed despite failed flush
801        # and that flush() is called before file closed.
802        f = self.open(*args, **kwargs)
803        closed = []
804        def bad_flush():
805            closed[:] = [f.closed]
806            raise OSError()
807        f.flush = bad_flush
808        self.assertRaises(OSError, f.close) # exception not swallowed
809        self.assertTrue(f.closed)
810        self.assertTrue(closed)      # flush() called
811        self.assertFalse(closed[0])  # flush() called before file closed
812        f.flush = lambda: None  # break reference loop
813
814    def test_flush_error_on_close(self):
815        # raw file
816        # Issue #5700: io.FileIO calls flush() after file closed
817        self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
818        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
819        self.check_flush_error_on_close(fd, 'wb', buffering=0)
820        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
821        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
822        os.close(fd)
823        # buffered io
824        self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
825        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
826        self.check_flush_error_on_close(fd, 'wb')
827        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
828        self.check_flush_error_on_close(fd, 'wb', closefd=False)
829        os.close(fd)
830        # text io
831        self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8")
832        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
833        self.check_flush_error_on_close(fd, 'w', encoding="utf-8")
834        fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
835        self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False)
836        os.close(fd)
837
838    def test_multi_close(self):
839        f = self.open(os_helper.TESTFN, "wb", buffering=0)
840        f.close()
841        f.close()
842        f.close()
843        self.assertRaises(ValueError, f.flush)
844
845    def test_RawIOBase_read(self):
846        # Exercise the default limited RawIOBase.read(n) implementation (which
847        # calls readinto() internally).
848        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
849        self.assertEqual(rawio.read(2), b"ab")
850        self.assertEqual(rawio.read(2), b"c")
851        self.assertEqual(rawio.read(2), b"d")
852        self.assertEqual(rawio.read(2), None)
853        self.assertEqual(rawio.read(2), b"ef")
854        self.assertEqual(rawio.read(2), b"g")
855        self.assertEqual(rawio.read(2), None)
856        self.assertEqual(rawio.read(2), b"")
857
858    def test_types_have_dict(self):
859        test = (
860            self.IOBase(),
861            self.RawIOBase(),
862            self.TextIOBase(),
863            self.StringIO(),
864            self.BytesIO()
865        )
866        for obj in test:
867            self.assertTrue(hasattr(obj, "__dict__"))
868
869    def test_opener(self):
870        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
871            f.write("egg\n")
872        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
873        def opener(path, flags):
874            return fd
875        with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f:
876            self.assertEqual(f.read(), "egg\n")
877
878    def test_bad_opener_negative_1(self):
879        # Issue #27066.
880        def badopener(fname, flags):
881            return -1
882        with self.assertRaises(ValueError) as cm:
883            open('non-existent', 'r', opener=badopener)
884        self.assertEqual(str(cm.exception), 'opener returned -1')
885
886    def test_bad_opener_other_negative(self):
887        # Issue #27066.
888        def badopener(fname, flags):
889            return -2
890        with self.assertRaises(ValueError) as cm:
891            open('non-existent', 'r', opener=badopener)
892        self.assertEqual(str(cm.exception), 'opener returned -2')
893
894    def test_fileio_closefd(self):
895        # Issue #4841
896        with self.open(__file__, 'rb') as f1, \
897             self.open(__file__, 'rb') as f2:
898            fileio = self.FileIO(f1.fileno(), closefd=False)
899            # .__init__() must not close f1
900            fileio.__init__(f2.fileno(), closefd=False)
901            f1.readline()
902            # .close() must not close f2
903            fileio.close()
904            f2.readline()
905
906    def test_nonbuffered_textio(self):
907        with warnings_helper.check_no_resource_warning(self):
908            with self.assertRaises(ValueError):
909                self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0)
910
911    def test_invalid_newline(self):
912        with warnings_helper.check_no_resource_warning(self):
913            with self.assertRaises(ValueError):
914                self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid')
915
916    def test_buffered_readinto_mixin(self):
917        # Test the implementation provided by BufferedIOBase
918        class Stream(self.BufferedIOBase):
919            def read(self, size):
920                return b"12345"
921            read1 = read
922        stream = Stream()
923        for method in ("readinto", "readinto1"):
924            with self.subTest(method):
925                buffer = byteslike(5)
926                self.assertEqual(getattr(stream, method)(buffer), 5)
927                self.assertEqual(bytes(buffer), b"12345")
928
929    def test_fspath_support(self):
930        def check_path_succeeds(path):
931            with self.open(path, "w", encoding="utf-8") as f:
932                f.write("egg\n")
933
934            with self.open(path, "r", encoding="utf-8") as f:
935                self.assertEqual(f.read(), "egg\n")
936
937        check_path_succeeds(FakePath(os_helper.TESTFN))
938        check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
939
940        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
941            bad_path = FakePath(f.fileno())
942            with self.assertRaises(TypeError):
943                self.open(bad_path, 'w', encoding="utf-8")
944
945        bad_path = FakePath(None)
946        with self.assertRaises(TypeError):
947            self.open(bad_path, 'w', encoding="utf-8")
948
949        bad_path = FakePath(FloatingPointError)
950        with self.assertRaises(FloatingPointError):
951            self.open(bad_path, 'w', encoding="utf-8")
952
953        # ensure that refcounting is correct with some error conditions
954        with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
955            self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8")
956
957    def test_RawIOBase_readall(self):
958        # Exercise the default unlimited RawIOBase.read() and readall()
959        # implementations.
960        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
961        self.assertEqual(rawio.read(), b"abcdefg")
962        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
963        self.assertEqual(rawio.readall(), b"abcdefg")
964
965    def test_BufferedIOBase_readinto(self):
966        # Exercise the default BufferedIOBase.readinto() and readinto1()
967        # implementations (which call read() or read1() internally).
968        class Reader(self.BufferedIOBase):
969            def __init__(self, avail):
970                self.avail = avail
971            def read(self, size):
972                result = self.avail[:size]
973                self.avail = self.avail[size:]
974                return result
975            def read1(self, size):
976                """Returns no more than 5 bytes at once"""
977                return self.read(min(size, 5))
978        tests = (
979            # (test method, total data available, read buffer size, expected
980            #     read size)
981            ("readinto", 10, 5, 5),
982            ("readinto", 10, 6, 6),  # More than read1() can return
983            ("readinto", 5, 6, 5),  # Buffer larger than total available
984            ("readinto", 6, 7, 6),
985            ("readinto", 10, 0, 0),  # Empty buffer
986            ("readinto1", 10, 5, 5),  # Result limited to single read1() call
987            ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
988            ("readinto1", 5, 6, 5),  # Buffer larger than total available
989            ("readinto1", 6, 7, 5),
990            ("readinto1", 10, 0, 0),  # Empty buffer
991        )
992        UNUSED_BYTE = 0x81
993        for test in tests:
994            with self.subTest(test):
995                method, avail, request, result = test
996                reader = Reader(bytes(range(avail)))
997                buffer = bytearray((UNUSED_BYTE,) * request)
998                method = getattr(reader, method)
999                self.assertEqual(method(buffer), result)
1000                self.assertEqual(len(buffer), request)
1001                self.assertSequenceEqual(buffer[:result], range(result))
1002                unused = (UNUSED_BYTE,) * (request - result)
1003                self.assertSequenceEqual(buffer[result:], unused)
1004                self.assertEqual(len(reader.avail), avail - result)
1005
1006    def test_close_assert(self):
1007        class R(self.IOBase):
1008            def __setattr__(self, name, value):
1009                pass
1010            def flush(self):
1011                raise OSError()
1012        f = R()
1013        # This would cause an assertion failure.
1014        self.assertRaises(OSError, f.close)
1015
1016        # Silence destructor error
1017        R.flush = lambda self: None
1018
1019
1020class CIOTest(IOTest):
1021
1022    def test_IOBase_finalize(self):
1023        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1024        # class which inherits IOBase and an object of this class are caught
1025        # in a reference cycle and close() is already in the method cache.
1026        class MyIO(self.IOBase):
1027            def close(self):
1028                pass
1029
1030        # create an instance to populate the method cache
1031        MyIO()
1032        obj = MyIO()
1033        obj.obj = obj
1034        wr = weakref.ref(obj)
1035        del MyIO
1036        del obj
1037        support.gc_collect()
1038        self.assertIsNone(wr(), wr)
1039
1040class PyIOTest(IOTest):
1041    pass
1042
1043
1044@support.cpython_only
1045class APIMismatchTest(unittest.TestCase):
1046
1047    def test_RawIOBase_io_in_pyio_match(self):
1048        """Test that pyio RawIOBase class has all c RawIOBase methods"""
1049        mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1050                                               ignore=('__weakref__',))
1051        self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1052
1053    def test_RawIOBase_pyio_in_io_match(self):
1054        """Test that c RawIOBase class has all pyio RawIOBase methods"""
1055        mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1056        self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1057
1058
1059class CommonBufferedTests:
1060    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1061
1062    def test_detach(self):
1063        raw = self.MockRawIO()
1064        buf = self.tp(raw)
1065        self.assertIs(buf.detach(), raw)
1066        self.assertRaises(ValueError, buf.detach)
1067
1068        repr(buf)  # Should still work
1069
1070    def test_fileno(self):
1071        rawio = self.MockRawIO()
1072        bufio = self.tp(rawio)
1073
1074        self.assertEqual(42, bufio.fileno())
1075
1076    def test_invalid_args(self):
1077        rawio = self.MockRawIO()
1078        bufio = self.tp(rawio)
1079        # Invalid whence
1080        self.assertRaises(ValueError, bufio.seek, 0, -1)
1081        self.assertRaises(ValueError, bufio.seek, 0, 9)
1082
1083    def test_override_destructor(self):
1084        tp = self.tp
1085        record = []
1086        class MyBufferedIO(tp):
1087            def __del__(self):
1088                record.append(1)
1089                try:
1090                    f = super().__del__
1091                except AttributeError:
1092                    pass
1093                else:
1094                    f()
1095            def close(self):
1096                record.append(2)
1097                super().close()
1098            def flush(self):
1099                record.append(3)
1100                super().flush()
1101        rawio = self.MockRawIO()
1102        bufio = MyBufferedIO(rawio)
1103        del bufio
1104        support.gc_collect()
1105        self.assertEqual(record, [1, 2, 3])
1106
1107    def test_context_manager(self):
1108        # Test usability as a context manager
1109        rawio = self.MockRawIO()
1110        bufio = self.tp(rawio)
1111        def _with():
1112            with bufio:
1113                pass
1114        _with()
1115        # bufio should now be closed, and using it a second time should raise
1116        # a ValueError.
1117        self.assertRaises(ValueError, _with)
1118
1119    def test_error_through_destructor(self):
1120        # Test that the exception state is not modified by a destructor,
1121        # even if close() fails.
1122        rawio = self.CloseFailureIO()
1123        with support.catch_unraisable_exception() as cm:
1124            with self.assertRaises(AttributeError):
1125                self.tp(rawio).xyzzy
1126
1127            if not IOBASE_EMITS_UNRAISABLE:
1128                self.assertIsNone(cm.unraisable)
1129            elif cm.unraisable is not None:
1130                self.assertEqual(cm.unraisable.exc_type, OSError)
1131
1132    def test_repr(self):
1133        raw = self.MockRawIO()
1134        b = self.tp(raw)
1135        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1136        self.assertRegex(repr(b), "<%s>" % clsname)
1137        raw.name = "dummy"
1138        self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
1139        raw.name = b"dummy"
1140        self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
1141
1142    def test_recursive_repr(self):
1143        # Issue #25455
1144        raw = self.MockRawIO()
1145        b = self.tp(raw)
1146        with support.swap_attr(raw, 'name', b):
1147            try:
1148                repr(b)  # Should not crash
1149            except RuntimeError:
1150                pass
1151
1152    def test_flush_error_on_close(self):
1153        # Test that buffered file is closed despite failed flush
1154        # and that flush() is called before file closed.
1155        raw = self.MockRawIO()
1156        closed = []
1157        def bad_flush():
1158            closed[:] = [b.closed, raw.closed]
1159            raise OSError()
1160        raw.flush = bad_flush
1161        b = self.tp(raw)
1162        self.assertRaises(OSError, b.close) # exception not swallowed
1163        self.assertTrue(b.closed)
1164        self.assertTrue(raw.closed)
1165        self.assertTrue(closed)      # flush() called
1166        self.assertFalse(closed[0])  # flush() called before file closed
1167        self.assertFalse(closed[1])
1168        raw.flush = lambda: None  # break reference loop
1169
1170    def test_close_error_on_close(self):
1171        raw = self.MockRawIO()
1172        def bad_flush():
1173            raise OSError('flush')
1174        def bad_close():
1175            raise OSError('close')
1176        raw.close = bad_close
1177        b = self.tp(raw)
1178        b.flush = bad_flush
1179        with self.assertRaises(OSError) as err: # exception not swallowed
1180            b.close()
1181        self.assertEqual(err.exception.args, ('close',))
1182        self.assertIsInstance(err.exception.__context__, OSError)
1183        self.assertEqual(err.exception.__context__.args, ('flush',))
1184        self.assertFalse(b.closed)
1185
1186        # Silence destructor error
1187        raw.close = lambda: None
1188        b.flush = lambda: None
1189
1190    def test_nonnormalized_close_error_on_close(self):
1191        # Issue #21677
1192        raw = self.MockRawIO()
1193        def bad_flush():
1194            raise non_existing_flush
1195        def bad_close():
1196            raise non_existing_close
1197        raw.close = bad_close
1198        b = self.tp(raw)
1199        b.flush = bad_flush
1200        with self.assertRaises(NameError) as err: # exception not swallowed
1201            b.close()
1202        self.assertIn('non_existing_close', str(err.exception))
1203        self.assertIsInstance(err.exception.__context__, NameError)
1204        self.assertIn('non_existing_flush', str(err.exception.__context__))
1205        self.assertFalse(b.closed)
1206
1207        # Silence destructor error
1208        b.flush = lambda: None
1209        raw.close = lambda: None
1210
1211    def test_multi_close(self):
1212        raw = self.MockRawIO()
1213        b = self.tp(raw)
1214        b.close()
1215        b.close()
1216        b.close()
1217        self.assertRaises(ValueError, b.flush)
1218
1219    def test_unseekable(self):
1220        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1221        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1222        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1223
1224    def test_readonly_attributes(self):
1225        raw = self.MockRawIO()
1226        buf = self.tp(raw)
1227        x = self.MockRawIO()
1228        with self.assertRaises(AttributeError):
1229            buf.raw = x
1230
1231
1232class SizeofTest:
1233
1234    @support.cpython_only
1235    def test_sizeof(self):
1236        bufsize1 = 4096
1237        bufsize2 = 8192
1238        rawio = self.MockRawIO()
1239        bufio = self.tp(rawio, buffer_size=bufsize1)
1240        size = sys.getsizeof(bufio) - bufsize1
1241        rawio = self.MockRawIO()
1242        bufio = self.tp(rawio, buffer_size=bufsize2)
1243        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1244
1245    @support.cpython_only
1246    def test_buffer_freeing(self) :
1247        bufsize = 4096
1248        rawio = self.MockRawIO()
1249        bufio = self.tp(rawio, buffer_size=bufsize)
1250        size = sys.getsizeof(bufio) - bufsize
1251        bufio.close()
1252        self.assertEqual(sys.getsizeof(bufio), size)
1253
1254class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1255    read_mode = "rb"
1256
1257    def test_constructor(self):
1258        rawio = self.MockRawIO([b"abc"])
1259        bufio = self.tp(rawio)
1260        bufio.__init__(rawio)
1261        bufio.__init__(rawio, buffer_size=1024)
1262        bufio.__init__(rawio, buffer_size=16)
1263        self.assertEqual(b"abc", bufio.read())
1264        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1265        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1266        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1267        rawio = self.MockRawIO([b"abc"])
1268        bufio.__init__(rawio)
1269        self.assertEqual(b"abc", bufio.read())
1270
1271    def test_uninitialized(self):
1272        bufio = self.tp.__new__(self.tp)
1273        del bufio
1274        bufio = self.tp.__new__(self.tp)
1275        self.assertRaisesRegex((ValueError, AttributeError),
1276                               'uninitialized|has no attribute',
1277                               bufio.read, 0)
1278        bufio.__init__(self.MockRawIO())
1279        self.assertEqual(bufio.read(0), b'')
1280
1281    def test_read(self):
1282        for arg in (None, 7):
1283            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1284            bufio = self.tp(rawio)
1285            self.assertEqual(b"abcdefg", bufio.read(arg))
1286        # Invalid args
1287        self.assertRaises(ValueError, bufio.read, -2)
1288
1289    def test_read1(self):
1290        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1291        bufio = self.tp(rawio)
1292        self.assertEqual(b"a", bufio.read(1))
1293        self.assertEqual(b"b", bufio.read1(1))
1294        self.assertEqual(rawio._reads, 1)
1295        self.assertEqual(b"", bufio.read1(0))
1296        self.assertEqual(b"c", bufio.read1(100))
1297        self.assertEqual(rawio._reads, 1)
1298        self.assertEqual(b"d", bufio.read1(100))
1299        self.assertEqual(rawio._reads, 2)
1300        self.assertEqual(b"efg", bufio.read1(100))
1301        self.assertEqual(rawio._reads, 3)
1302        self.assertEqual(b"", bufio.read1(100))
1303        self.assertEqual(rawio._reads, 4)
1304
1305    def test_read1_arbitrary(self):
1306        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1307        bufio = self.tp(rawio)
1308        self.assertEqual(b"a", bufio.read(1))
1309        self.assertEqual(b"bc", bufio.read1())
1310        self.assertEqual(b"d", bufio.read1())
1311        self.assertEqual(b"efg", bufio.read1(-1))
1312        self.assertEqual(rawio._reads, 3)
1313        self.assertEqual(b"", bufio.read1())
1314        self.assertEqual(rawio._reads, 4)
1315
1316    def test_readinto(self):
1317        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1318        bufio = self.tp(rawio)
1319        b = bytearray(2)
1320        self.assertEqual(bufio.readinto(b), 2)
1321        self.assertEqual(b, b"ab")
1322        self.assertEqual(bufio.readinto(b), 2)
1323        self.assertEqual(b, b"cd")
1324        self.assertEqual(bufio.readinto(b), 2)
1325        self.assertEqual(b, b"ef")
1326        self.assertEqual(bufio.readinto(b), 1)
1327        self.assertEqual(b, b"gf")
1328        self.assertEqual(bufio.readinto(b), 0)
1329        self.assertEqual(b, b"gf")
1330        rawio = self.MockRawIO((b"abc", None))
1331        bufio = self.tp(rawio)
1332        self.assertEqual(bufio.readinto(b), 2)
1333        self.assertEqual(b, b"ab")
1334        self.assertEqual(bufio.readinto(b), 1)
1335        self.assertEqual(b, b"cb")
1336
1337    def test_readinto1(self):
1338        buffer_size = 10
1339        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1340        bufio = self.tp(rawio, buffer_size=buffer_size)
1341        b = bytearray(2)
1342        self.assertEqual(bufio.peek(3), b'abc')
1343        self.assertEqual(rawio._reads, 1)
1344        self.assertEqual(bufio.readinto1(b), 2)
1345        self.assertEqual(b, b"ab")
1346        self.assertEqual(rawio._reads, 1)
1347        self.assertEqual(bufio.readinto1(b), 1)
1348        self.assertEqual(b[:1], b"c")
1349        self.assertEqual(rawio._reads, 1)
1350        self.assertEqual(bufio.readinto1(b), 2)
1351        self.assertEqual(b, b"de")
1352        self.assertEqual(rawio._reads, 2)
1353        b = bytearray(2*buffer_size)
1354        self.assertEqual(bufio.peek(3), b'fgh')
1355        self.assertEqual(rawio._reads, 3)
1356        self.assertEqual(bufio.readinto1(b), 6)
1357        self.assertEqual(b[:6], b"fghjkl")
1358        self.assertEqual(rawio._reads, 4)
1359
1360    def test_readinto_array(self):
1361        buffer_size = 60
1362        data = b"a" * 26
1363        rawio = self.MockRawIO((data,))
1364        bufio = self.tp(rawio, buffer_size=buffer_size)
1365
1366        # Create an array with element size > 1 byte
1367        b = array.array('i', b'x' * 32)
1368        assert len(b) != 16
1369
1370        # Read into it. We should get as many *bytes* as we can fit into b
1371        # (which is more than the number of elements)
1372        n = bufio.readinto(b)
1373        self.assertGreater(n, len(b))
1374
1375        # Check that old contents of b are preserved
1376        bm = memoryview(b).cast('B')
1377        self.assertLess(n, len(bm))
1378        self.assertEqual(bm[:n], data[:n])
1379        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1380
1381    def test_readinto1_array(self):
1382        buffer_size = 60
1383        data = b"a" * 26
1384        rawio = self.MockRawIO((data,))
1385        bufio = self.tp(rawio, buffer_size=buffer_size)
1386
1387        # Create an array with element size > 1 byte
1388        b = array.array('i', b'x' * 32)
1389        assert len(b) != 16
1390
1391        # Read into it. We should get as many *bytes* as we can fit into b
1392        # (which is more than the number of elements)
1393        n = bufio.readinto1(b)
1394        self.assertGreater(n, len(b))
1395
1396        # Check that old contents of b are preserved
1397        bm = memoryview(b).cast('B')
1398        self.assertLess(n, len(bm))
1399        self.assertEqual(bm[:n], data[:n])
1400        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1401
1402    def test_readlines(self):
1403        def bufio():
1404            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1405            return self.tp(rawio)
1406        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1407        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1408        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
1409
1410    def test_buffering(self):
1411        data = b"abcdefghi"
1412        dlen = len(data)
1413
1414        tests = [
1415            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1416            [ 100, [ 3, 3, 3],     [ dlen ]    ],
1417            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1418        ]
1419
1420        for bufsize, buf_read_sizes, raw_read_sizes in tests:
1421            rawio = self.MockFileIO(data)
1422            bufio = self.tp(rawio, buffer_size=bufsize)
1423            pos = 0
1424            for nbytes in buf_read_sizes:
1425                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
1426                pos += nbytes
1427            # this is mildly implementation-dependent
1428            self.assertEqual(rawio.read_history, raw_read_sizes)
1429
1430    def test_read_non_blocking(self):
1431        # Inject some None's in there to simulate EWOULDBLOCK
1432        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1433        bufio = self.tp(rawio)
1434        self.assertEqual(b"abcd", bufio.read(6))
1435        self.assertEqual(b"e", bufio.read(1))
1436        self.assertEqual(b"fg", bufio.read())
1437        self.assertEqual(b"", bufio.peek(1))
1438        self.assertIsNone(bufio.read())
1439        self.assertEqual(b"", bufio.read())
1440
1441        rawio = self.MockRawIO((b"a", None, None))
1442        self.assertEqual(b"a", rawio.readall())
1443        self.assertIsNone(rawio.readall())
1444
1445    def test_read_past_eof(self):
1446        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1447        bufio = self.tp(rawio)
1448
1449        self.assertEqual(b"abcdefg", bufio.read(9000))
1450
1451    def test_read_all(self):
1452        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1453        bufio = self.tp(rawio)
1454
1455        self.assertEqual(b"abcdefg", bufio.read())
1456
1457    @support.requires_resource('cpu')
1458    def test_threads(self):
1459        try:
1460            # Write out many bytes with exactly the same number of 0's,
1461            # 1's... 255's. This will help us check that concurrent reading
1462            # doesn't duplicate or forget contents.
1463            N = 1000
1464            l = list(range(256)) * N
1465            random.shuffle(l)
1466            s = bytes(bytearray(l))
1467            with self.open(os_helper.TESTFN, "wb") as f:
1468                f.write(s)
1469            with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
1470                bufio = self.tp(raw, 8)
1471                errors = []
1472                results = []
1473                def f():
1474                    try:
1475                        # Intra-buffer read then buffer-flushing read
1476                        for n in cycle([1, 19]):
1477                            s = bufio.read(n)
1478                            if not s:
1479                                break
1480                            # list.append() is atomic
1481                            results.append(s)
1482                    except Exception as e:
1483                        errors.append(e)
1484                        raise
1485                threads = [threading.Thread(target=f) for x in range(20)]
1486                with threading_helper.start_threads(threads):
1487                    time.sleep(0.02) # yield
1488                self.assertFalse(errors,
1489                    "the following exceptions were caught: %r" % errors)
1490                s = b''.join(results)
1491                for i in range(256):
1492                    c = bytes(bytearray([i]))
1493                    self.assertEqual(s.count(c), N)
1494        finally:
1495            os_helper.unlink(os_helper.TESTFN)
1496
1497    def test_unseekable(self):
1498        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1499        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1500        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1501        bufio.read(1)
1502        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1503        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1504
1505    def test_misbehaved_io(self):
1506        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1507        bufio = self.tp(rawio)
1508        self.assertRaises(OSError, bufio.seek, 0)
1509        self.assertRaises(OSError, bufio.tell)
1510
1511        # Silence destructor error
1512        bufio.close = lambda: None
1513
1514    def test_no_extraneous_read(self):
1515        # Issue #9550; when the raw IO object has satisfied the read request,
1516        # we should not issue any additional reads, otherwise it may block
1517        # (e.g. socket).
1518        bufsize = 16
1519        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1520            rawio = self.MockRawIO([b"x" * n])
1521            bufio = self.tp(rawio, bufsize)
1522            self.assertEqual(bufio.read(n), b"x" * n)
1523            # Simple case: one raw read is enough to satisfy the request.
1524            self.assertEqual(rawio._extraneous_reads, 0,
1525                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1526            # A more complex case where two raw reads are needed to satisfy
1527            # the request.
1528            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1529            bufio = self.tp(rawio, bufsize)
1530            self.assertEqual(bufio.read(n), b"x" * n)
1531            self.assertEqual(rawio._extraneous_reads, 0,
1532                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1533
1534    def test_read_on_closed(self):
1535        # Issue #23796
1536        b = io.BufferedReader(io.BytesIO(b"12"))
1537        b.read(1)
1538        b.close()
1539        self.assertRaises(ValueError, b.peek)
1540        self.assertRaises(ValueError, b.read1, 1)
1541
1542    def test_truncate_on_read_only(self):
1543        rawio = self.MockFileIO(b"abc")
1544        bufio = self.tp(rawio)
1545        self.assertFalse(bufio.writable())
1546        self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1547        self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1548
1549
1550class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
1551    tp = io.BufferedReader
1552
1553    @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
1554                     "instead of returning NULL for malloc failure.")
1555    def test_constructor(self):
1556        BufferedReaderTest.test_constructor(self)
1557        # The allocation can succeed on 32-bit builds, e.g. with more
1558        # than 2 GiB RAM and a 64-bit kernel.
1559        if sys.maxsize > 0x7FFFFFFF:
1560            rawio = self.MockRawIO()
1561            bufio = self.tp(rawio)
1562            self.assertRaises((OverflowError, MemoryError, ValueError),
1563                bufio.__init__, rawio, sys.maxsize)
1564
1565    def test_initialization(self):
1566        rawio = self.MockRawIO([b"abc"])
1567        bufio = self.tp(rawio)
1568        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1569        self.assertRaises(ValueError, bufio.read)
1570        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1571        self.assertRaises(ValueError, bufio.read)
1572        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1573        self.assertRaises(ValueError, bufio.read)
1574
1575    def test_misbehaved_io_read(self):
1576        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1577        bufio = self.tp(rawio)
1578        # _pyio.BufferedReader seems to implement reading different, so that
1579        # checking this is not so easy.
1580        self.assertRaises(OSError, bufio.read, 10)
1581
1582    def test_garbage_collection(self):
1583        # C BufferedReader objects are collected.
1584        # The Python version has __del__, so it ends into gc.garbage instead
1585        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1586        with warnings_helper.check_warnings(('', ResourceWarning)):
1587            rawio = self.FileIO(os_helper.TESTFN, "w+b")
1588            f = self.tp(rawio)
1589            f.f = f
1590            wr = weakref.ref(f)
1591            del f
1592            support.gc_collect()
1593        self.assertIsNone(wr(), wr)
1594
1595    def test_args_error(self):
1596        # Issue #17275
1597        with self.assertRaisesRegex(TypeError, "BufferedReader"):
1598            self.tp(io.BytesIO(), 1024, 1024, 1024)
1599
1600    def test_bad_readinto_value(self):
1601        rawio = io.BufferedReader(io.BytesIO(b"12"))
1602        rawio.readinto = lambda buf: -1
1603        bufio = self.tp(rawio)
1604        with self.assertRaises(OSError) as cm:
1605            bufio.readline()
1606        self.assertIsNone(cm.exception.__cause__)
1607
1608    def test_bad_readinto_type(self):
1609        rawio = io.BufferedReader(io.BytesIO(b"12"))
1610        rawio.readinto = lambda buf: b''
1611        bufio = self.tp(rawio)
1612        with self.assertRaises(OSError) as cm:
1613            bufio.readline()
1614        self.assertIsInstance(cm.exception.__cause__, TypeError)
1615
1616
1617class PyBufferedReaderTest(BufferedReaderTest):
1618    tp = pyio.BufferedReader
1619
1620
1621class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1622    write_mode = "wb"
1623
1624    def test_constructor(self):
1625        rawio = self.MockRawIO()
1626        bufio = self.tp(rawio)
1627        bufio.__init__(rawio)
1628        bufio.__init__(rawio, buffer_size=1024)
1629        bufio.__init__(rawio, buffer_size=16)
1630        self.assertEqual(3, bufio.write(b"abc"))
1631        bufio.flush()
1632        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1633        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1634        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1635        bufio.__init__(rawio)
1636        self.assertEqual(3, bufio.write(b"ghi"))
1637        bufio.flush()
1638        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1639
1640    def test_uninitialized(self):
1641        bufio = self.tp.__new__(self.tp)
1642        del bufio
1643        bufio = self.tp.__new__(self.tp)
1644        self.assertRaisesRegex((ValueError, AttributeError),
1645                               'uninitialized|has no attribute',
1646                               bufio.write, b'')
1647        bufio.__init__(self.MockRawIO())
1648        self.assertEqual(bufio.write(b''), 0)
1649
1650    def test_detach_flush(self):
1651        raw = self.MockRawIO()
1652        buf = self.tp(raw)
1653        buf.write(b"howdy!")
1654        self.assertFalse(raw._write_stack)
1655        buf.detach()
1656        self.assertEqual(raw._write_stack, [b"howdy!"])
1657
1658    def test_write(self):
1659        # Write to the buffered IO but don't overflow the buffer.
1660        writer = self.MockRawIO()
1661        bufio = self.tp(writer, 8)
1662        bufio.write(b"abc")
1663        self.assertFalse(writer._write_stack)
1664        buffer = bytearray(b"def")
1665        bufio.write(buffer)
1666        buffer[:] = b"***"  # Overwrite our copy of the data
1667        bufio.flush()
1668        self.assertEqual(b"".join(writer._write_stack), b"abcdef")
1669
1670    def test_write_overflow(self):
1671        writer = self.MockRawIO()
1672        bufio = self.tp(writer, 8)
1673        contents = b"abcdefghijklmnop"
1674        for n in range(0, len(contents), 3):
1675            bufio.write(contents[n:n+3])
1676        flushed = b"".join(writer._write_stack)
1677        # At least (total - 8) bytes were implicitly flushed, perhaps more
1678        # depending on the implementation.
1679        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1680
1681    def check_writes(self, intermediate_func):
1682        # Lots of writes, test the flushed output is as expected.
1683        contents = bytes(range(256)) * 1000
1684        n = 0
1685        writer = self.MockRawIO()
1686        bufio = self.tp(writer, 13)
1687        # Generator of write sizes: repeat each N 15 times then proceed to N+1
1688        def gen_sizes():
1689            for size in count(1):
1690                for i in range(15):
1691                    yield size
1692        sizes = gen_sizes()
1693        while n < len(contents):
1694            size = min(next(sizes), len(contents) - n)
1695            self.assertEqual(bufio.write(contents[n:n+size]), size)
1696            intermediate_func(bufio)
1697            n += size
1698        bufio.flush()
1699        self.assertEqual(contents, b"".join(writer._write_stack))
1700
1701    def test_writes(self):
1702        self.check_writes(lambda bufio: None)
1703
1704    def test_writes_and_flushes(self):
1705        self.check_writes(lambda bufio: bufio.flush())
1706
1707    def test_writes_and_seeks(self):
1708        def _seekabs(bufio):
1709            pos = bufio.tell()
1710            bufio.seek(pos + 1, 0)
1711            bufio.seek(pos - 1, 0)
1712            bufio.seek(pos, 0)
1713        self.check_writes(_seekabs)
1714        def _seekrel(bufio):
1715            pos = bufio.seek(0, 1)
1716            bufio.seek(+1, 1)
1717            bufio.seek(-1, 1)
1718            bufio.seek(pos, 0)
1719        self.check_writes(_seekrel)
1720
1721    def test_writes_and_truncates(self):
1722        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1723
1724    def test_write_non_blocking(self):
1725        raw = self.MockNonBlockWriterIO()
1726        bufio = self.tp(raw, 8)
1727
1728        self.assertEqual(bufio.write(b"abcd"), 4)
1729        self.assertEqual(bufio.write(b"efghi"), 5)
1730        # 1 byte will be written, the rest will be buffered
1731        raw.block_on(b"k")
1732        self.assertEqual(bufio.write(b"jklmn"), 5)
1733
1734        # 8 bytes will be written, 8 will be buffered and the rest will be lost
1735        raw.block_on(b"0")
1736        try:
1737            bufio.write(b"opqrwxyz0123456789")
1738        except self.BlockingIOError as e:
1739            written = e.characters_written
1740        else:
1741            self.fail("BlockingIOError should have been raised")
1742        self.assertEqual(written, 16)
1743        self.assertEqual(raw.pop_written(),
1744            b"abcdefghijklmnopqrwxyz")
1745
1746        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1747        s = raw.pop_written()
1748        # Previously buffered bytes were flushed
1749        self.assertTrue(s.startswith(b"01234567A"), s)
1750
1751    def test_write_and_rewind(self):
1752        raw = io.BytesIO()
1753        bufio = self.tp(raw, 4)
1754        self.assertEqual(bufio.write(b"abcdef"), 6)
1755        self.assertEqual(bufio.tell(), 6)
1756        bufio.seek(0, 0)
1757        self.assertEqual(bufio.write(b"XY"), 2)
1758        bufio.seek(6, 0)
1759        self.assertEqual(raw.getvalue(), b"XYcdef")
1760        self.assertEqual(bufio.write(b"123456"), 6)
1761        bufio.flush()
1762        self.assertEqual(raw.getvalue(), b"XYcdef123456")
1763
1764    def test_flush(self):
1765        writer = self.MockRawIO()
1766        bufio = self.tp(writer, 8)
1767        bufio.write(b"abc")
1768        bufio.flush()
1769        self.assertEqual(b"abc", writer._write_stack[0])
1770
1771    def test_writelines(self):
1772        l = [b'ab', b'cd', b'ef']
1773        writer = self.MockRawIO()
1774        bufio = self.tp(writer, 8)
1775        bufio.writelines(l)
1776        bufio.flush()
1777        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1778
1779    def test_writelines_userlist(self):
1780        l = UserList([b'ab', b'cd', b'ef'])
1781        writer = self.MockRawIO()
1782        bufio = self.tp(writer, 8)
1783        bufio.writelines(l)
1784        bufio.flush()
1785        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1786
1787    def test_writelines_error(self):
1788        writer = self.MockRawIO()
1789        bufio = self.tp(writer, 8)
1790        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1791        self.assertRaises(TypeError, bufio.writelines, None)
1792        self.assertRaises(TypeError, bufio.writelines, 'abc')
1793
1794    def test_destructor(self):
1795        writer = self.MockRawIO()
1796        bufio = self.tp(writer, 8)
1797        bufio.write(b"abc")
1798        del bufio
1799        support.gc_collect()
1800        self.assertEqual(b"abc", writer._write_stack[0])
1801
1802    def test_truncate(self):
1803        # Truncate implicitly flushes the buffer.
1804        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1805        with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
1806            bufio = self.tp(raw, 8)
1807            bufio.write(b"abcdef")
1808            self.assertEqual(bufio.truncate(3), 3)
1809            self.assertEqual(bufio.tell(), 6)
1810        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
1811            self.assertEqual(f.read(), b"abc")
1812
1813    def test_truncate_after_write(self):
1814        # Ensure that truncate preserves the file position after
1815        # writes longer than the buffer size.
1816        # Issue: https://bugs.python.org/issue32228
1817        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1818        with self.open(os_helper.TESTFN, "wb") as f:
1819            # Fill with some buffer
1820            f.write(b'\x00' * 10000)
1821        buffer_sizes = [8192, 4096, 200]
1822        for buffer_size in buffer_sizes:
1823            with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
1824                f.write(b'\x00' * (buffer_size + 1))
1825                # After write write_pos and write_end are set to 0
1826                f.read(1)
1827                # read operation makes sure that pos != raw_pos
1828                f.truncate()
1829                self.assertEqual(f.tell(), buffer_size + 2)
1830
1831    @support.requires_resource('cpu')
1832    def test_threads(self):
1833        try:
1834            # Write out many bytes from many threads and test they were
1835            # all flushed.
1836            N = 1000
1837            contents = bytes(range(256)) * N
1838            sizes = cycle([1, 19])
1839            n = 0
1840            queue = deque()
1841            while n < len(contents):
1842                size = next(sizes)
1843                queue.append(contents[n:n+size])
1844                n += size
1845            del contents
1846            # We use a real file object because it allows us to
1847            # exercise situations where the GIL is released before
1848            # writing the buffer to the raw streams. This is in addition
1849            # to concurrency issues due to switching threads in the middle
1850            # of Python code.
1851            with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
1852                bufio = self.tp(raw, 8)
1853                errors = []
1854                def f():
1855                    try:
1856                        while True:
1857                            try:
1858                                s = queue.popleft()
1859                            except IndexError:
1860                                return
1861                            bufio.write(s)
1862                    except Exception as e:
1863                        errors.append(e)
1864                        raise
1865                threads = [threading.Thread(target=f) for x in range(20)]
1866                with threading_helper.start_threads(threads):
1867                    time.sleep(0.02) # yield
1868                self.assertFalse(errors,
1869                    "the following exceptions were caught: %r" % errors)
1870                bufio.close()
1871            with self.open(os_helper.TESTFN, "rb") as f:
1872                s = f.read()
1873            for i in range(256):
1874                self.assertEqual(s.count(bytes([i])), N)
1875        finally:
1876            os_helper.unlink(os_helper.TESTFN)
1877
1878    def test_misbehaved_io(self):
1879        rawio = self.MisbehavedRawIO()
1880        bufio = self.tp(rawio, 5)
1881        self.assertRaises(OSError, bufio.seek, 0)
1882        self.assertRaises(OSError, bufio.tell)
1883        self.assertRaises(OSError, bufio.write, b"abcdef")
1884
1885        # Silence destructor error
1886        bufio.close = lambda: None
1887
1888    def test_max_buffer_size_removal(self):
1889        with self.assertRaises(TypeError):
1890            self.tp(self.MockRawIO(), 8, 12)
1891
1892    def test_write_error_on_close(self):
1893        raw = self.MockRawIO()
1894        def bad_write(b):
1895            raise OSError()
1896        raw.write = bad_write
1897        b = self.tp(raw)
1898        b.write(b'spam')
1899        self.assertRaises(OSError, b.close) # exception not swallowed
1900        self.assertTrue(b.closed)
1901
1902    def test_slow_close_from_thread(self):
1903        # Issue #31976
1904        rawio = self.SlowFlushRawIO()
1905        bufio = self.tp(rawio, 8)
1906        t = threading.Thread(target=bufio.close)
1907        t.start()
1908        rawio.in_flush.wait()
1909        self.assertRaises(ValueError, bufio.write, b'spam')
1910        self.assertTrue(bufio.closed)
1911        t.join()
1912
1913
1914
1915class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
1916    tp = io.BufferedWriter
1917
1918    @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
1919                     "instead of returning NULL for malloc failure.")
1920    def test_constructor(self):
1921        BufferedWriterTest.test_constructor(self)
1922        # The allocation can succeed on 32-bit builds, e.g. with more
1923        # than 2 GiB RAM and a 64-bit kernel.
1924        if sys.maxsize > 0x7FFFFFFF:
1925            rawio = self.MockRawIO()
1926            bufio = self.tp(rawio)
1927            self.assertRaises((OverflowError, MemoryError, ValueError),
1928                bufio.__init__, rawio, sys.maxsize)
1929
1930    def test_initialization(self):
1931        rawio = self.MockRawIO()
1932        bufio = self.tp(rawio)
1933        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1934        self.assertRaises(ValueError, bufio.write, b"def")
1935        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1936        self.assertRaises(ValueError, bufio.write, b"def")
1937        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1938        self.assertRaises(ValueError, bufio.write, b"def")
1939
1940    def test_garbage_collection(self):
1941        # C BufferedWriter objects are collected, and collecting them flushes
1942        # all data to disk.
1943        # The Python version has __del__, so it ends into gc.garbage instead
1944        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1945        with warnings_helper.check_warnings(('', ResourceWarning)):
1946            rawio = self.FileIO(os_helper.TESTFN, "w+b")
1947            f = self.tp(rawio)
1948            f.write(b"123xxx")
1949            f.x = f
1950            wr = weakref.ref(f)
1951            del f
1952            support.gc_collect()
1953        self.assertIsNone(wr(), wr)
1954        with self.open(os_helper.TESTFN, "rb") as f:
1955            self.assertEqual(f.read(), b"123xxx")
1956
1957    def test_args_error(self):
1958        # Issue #17275
1959        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1960            self.tp(io.BytesIO(), 1024, 1024, 1024)
1961
1962
1963class PyBufferedWriterTest(BufferedWriterTest):
1964    tp = pyio.BufferedWriter
1965
1966class BufferedRWPairTest(unittest.TestCase):
1967
1968    def test_constructor(self):
1969        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1970        self.assertFalse(pair.closed)
1971
1972    def test_uninitialized(self):
1973        pair = self.tp.__new__(self.tp)
1974        del pair
1975        pair = self.tp.__new__(self.tp)
1976        self.assertRaisesRegex((ValueError, AttributeError),
1977                               'uninitialized|has no attribute',
1978                               pair.read, 0)
1979        self.assertRaisesRegex((ValueError, AttributeError),
1980                               'uninitialized|has no attribute',
1981                               pair.write, b'')
1982        pair.__init__(self.MockRawIO(), self.MockRawIO())
1983        self.assertEqual(pair.read(0), b'')
1984        self.assertEqual(pair.write(b''), 0)
1985
1986    def test_detach(self):
1987        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1988        self.assertRaises(self.UnsupportedOperation, pair.detach)
1989
1990    def test_constructor_max_buffer_size_removal(self):
1991        with self.assertRaises(TypeError):
1992            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1993
1994    def test_constructor_with_not_readable(self):
1995        class NotReadable(MockRawIO):
1996            def readable(self):
1997                return False
1998
1999        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
2000
2001    def test_constructor_with_not_writeable(self):
2002        class NotWriteable(MockRawIO):
2003            def writable(self):
2004                return False
2005
2006        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
2007
2008    def test_read(self):
2009        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2010
2011        self.assertEqual(pair.read(3), b"abc")
2012        self.assertEqual(pair.read(1), b"d")
2013        self.assertEqual(pair.read(), b"ef")
2014        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
2015        self.assertEqual(pair.read(None), b"abc")
2016
2017    def test_readlines(self):
2018        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
2019        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2020        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2021        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
2022
2023    def test_read1(self):
2024        # .read1() is delegated to the underlying reader object, so this test
2025        # can be shallow.
2026        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2027
2028        self.assertEqual(pair.read1(3), b"abc")
2029        self.assertEqual(pair.read1(), b"def")
2030
2031    def test_readinto(self):
2032        for method in ("readinto", "readinto1"):
2033            with self.subTest(method):
2034                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2035
2036                data = byteslike(b'\0' * 5)
2037                self.assertEqual(getattr(pair, method)(data), 5)
2038                self.assertEqual(bytes(data), b"abcde")
2039
2040    def test_write(self):
2041        w = self.MockRawIO()
2042        pair = self.tp(self.MockRawIO(), w)
2043
2044        pair.write(b"abc")
2045        pair.flush()
2046        buffer = bytearray(b"def")
2047        pair.write(buffer)
2048        buffer[:] = b"***"  # Overwrite our copy of the data
2049        pair.flush()
2050        self.assertEqual(w._write_stack, [b"abc", b"def"])
2051
2052    def test_peek(self):
2053        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2054
2055        self.assertTrue(pair.peek(3).startswith(b"abc"))
2056        self.assertEqual(pair.read(3), b"abc")
2057
2058    def test_readable(self):
2059        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2060        self.assertTrue(pair.readable())
2061
2062    def test_writeable(self):
2063        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2064        self.assertTrue(pair.writable())
2065
2066    def test_seekable(self):
2067        # BufferedRWPairs are never seekable, even if their readers and writers
2068        # are.
2069        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2070        self.assertFalse(pair.seekable())
2071
2072    # .flush() is delegated to the underlying writer object and has been
2073    # tested in the test_write method.
2074
2075    def test_close_and_closed(self):
2076        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2077        self.assertFalse(pair.closed)
2078        pair.close()
2079        self.assertTrue(pair.closed)
2080
2081    def test_reader_close_error_on_close(self):
2082        def reader_close():
2083            reader_non_existing
2084        reader = self.MockRawIO()
2085        reader.close = reader_close
2086        writer = self.MockRawIO()
2087        pair = self.tp(reader, writer)
2088        with self.assertRaises(NameError) as err:
2089            pair.close()
2090        self.assertIn('reader_non_existing', str(err.exception))
2091        self.assertTrue(pair.closed)
2092        self.assertFalse(reader.closed)
2093        self.assertTrue(writer.closed)
2094
2095        # Silence destructor error
2096        reader.close = lambda: None
2097
2098    def test_writer_close_error_on_close(self):
2099        def writer_close():
2100            writer_non_existing
2101        reader = self.MockRawIO()
2102        writer = self.MockRawIO()
2103        writer.close = writer_close
2104        pair = self.tp(reader, writer)
2105        with self.assertRaises(NameError) as err:
2106            pair.close()
2107        self.assertIn('writer_non_existing', str(err.exception))
2108        self.assertFalse(pair.closed)
2109        self.assertTrue(reader.closed)
2110        self.assertFalse(writer.closed)
2111
2112        # Silence destructor error
2113        writer.close = lambda: None
2114        writer = None
2115
2116        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
2117        with support.catch_unraisable_exception():
2118            # Ignore BufferedRWPair unraisable exception
2119            with support.catch_unraisable_exception():
2120                pair = None
2121                support.gc_collect()
2122            support.gc_collect()
2123
2124    def test_reader_writer_close_error_on_close(self):
2125        def reader_close():
2126            reader_non_existing
2127        def writer_close():
2128            writer_non_existing
2129        reader = self.MockRawIO()
2130        reader.close = reader_close
2131        writer = self.MockRawIO()
2132        writer.close = writer_close
2133        pair = self.tp(reader, writer)
2134        with self.assertRaises(NameError) as err:
2135            pair.close()
2136        self.assertIn('reader_non_existing', str(err.exception))
2137        self.assertIsInstance(err.exception.__context__, NameError)
2138        self.assertIn('writer_non_existing', str(err.exception.__context__))
2139        self.assertFalse(pair.closed)
2140        self.assertFalse(reader.closed)
2141        self.assertFalse(writer.closed)
2142
2143        # Silence destructor error
2144        reader.close = lambda: None
2145        writer.close = lambda: None
2146
2147    def test_isatty(self):
2148        class SelectableIsAtty(MockRawIO):
2149            def __init__(self, isatty):
2150                MockRawIO.__init__(self)
2151                self._isatty = isatty
2152
2153            def isatty(self):
2154                return self._isatty
2155
2156        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2157        self.assertFalse(pair.isatty())
2158
2159        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2160        self.assertTrue(pair.isatty())
2161
2162        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2163        self.assertTrue(pair.isatty())
2164
2165        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2166        self.assertTrue(pair.isatty())
2167
2168    def test_weakref_clearing(self):
2169        brw = self.tp(self.MockRawIO(), self.MockRawIO())
2170        ref = weakref.ref(brw)
2171        brw = None
2172        ref = None # Shouldn't segfault.
2173
2174class CBufferedRWPairTest(BufferedRWPairTest):
2175    tp = io.BufferedRWPair
2176
2177class PyBufferedRWPairTest(BufferedRWPairTest):
2178    tp = pyio.BufferedRWPair
2179
2180
2181class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2182    read_mode = "rb+"
2183    write_mode = "wb+"
2184
2185    def test_constructor(self):
2186        BufferedReaderTest.test_constructor(self)
2187        BufferedWriterTest.test_constructor(self)
2188
2189    def test_uninitialized(self):
2190        BufferedReaderTest.test_uninitialized(self)
2191        BufferedWriterTest.test_uninitialized(self)
2192
2193    def test_read_and_write(self):
2194        raw = self.MockRawIO((b"asdf", b"ghjk"))
2195        rw = self.tp(raw, 8)
2196
2197        self.assertEqual(b"as", rw.read(2))
2198        rw.write(b"ddd")
2199        rw.write(b"eee")
2200        self.assertFalse(raw._write_stack) # Buffer writes
2201        self.assertEqual(b"ghjk", rw.read())
2202        self.assertEqual(b"dddeee", raw._write_stack[0])
2203
2204    def test_seek_and_tell(self):
2205        raw = self.BytesIO(b"asdfghjkl")
2206        rw = self.tp(raw)
2207
2208        self.assertEqual(b"as", rw.read(2))
2209        self.assertEqual(2, rw.tell())
2210        rw.seek(0, 0)
2211        self.assertEqual(b"asdf", rw.read(4))
2212
2213        rw.write(b"123f")
2214        rw.seek(0, 0)
2215        self.assertEqual(b"asdf123fl", rw.read())
2216        self.assertEqual(9, rw.tell())
2217        rw.seek(-4, 2)
2218        self.assertEqual(5, rw.tell())
2219        rw.seek(2, 1)
2220        self.assertEqual(7, rw.tell())
2221        self.assertEqual(b"fl", rw.read(11))
2222        rw.flush()
2223        self.assertEqual(b"asdf123fl", raw.getvalue())
2224
2225        self.assertRaises(TypeError, rw.seek, 0.0)
2226
2227    def check_flush_and_read(self, read_func):
2228        raw = self.BytesIO(b"abcdefghi")
2229        bufio = self.tp(raw)
2230
2231        self.assertEqual(b"ab", read_func(bufio, 2))
2232        bufio.write(b"12")
2233        self.assertEqual(b"ef", read_func(bufio, 2))
2234        self.assertEqual(6, bufio.tell())
2235        bufio.flush()
2236        self.assertEqual(6, bufio.tell())
2237        self.assertEqual(b"ghi", read_func(bufio))
2238        raw.seek(0, 0)
2239        raw.write(b"XYZ")
2240        # flush() resets the read buffer
2241        bufio.flush()
2242        bufio.seek(0, 0)
2243        self.assertEqual(b"XYZ", read_func(bufio, 3))
2244
2245    def test_flush_and_read(self):
2246        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2247
2248    def test_flush_and_readinto(self):
2249        def _readinto(bufio, n=-1):
2250            b = bytearray(n if n >= 0 else 9999)
2251            n = bufio.readinto(b)
2252            return bytes(b[:n])
2253        self.check_flush_and_read(_readinto)
2254
2255    def test_flush_and_peek(self):
2256        def _peek(bufio, n=-1):
2257            # This relies on the fact that the buffer can contain the whole
2258            # raw stream, otherwise peek() can return less.
2259            b = bufio.peek(n)
2260            if n != -1:
2261                b = b[:n]
2262            bufio.seek(len(b), 1)
2263            return b
2264        self.check_flush_and_read(_peek)
2265
2266    def test_flush_and_write(self):
2267        raw = self.BytesIO(b"abcdefghi")
2268        bufio = self.tp(raw)
2269
2270        bufio.write(b"123")
2271        bufio.flush()
2272        bufio.write(b"45")
2273        bufio.flush()
2274        bufio.seek(0, 0)
2275        self.assertEqual(b"12345fghi", raw.getvalue())
2276        self.assertEqual(b"12345fghi", bufio.read())
2277
2278    def test_threads(self):
2279        BufferedReaderTest.test_threads(self)
2280        BufferedWriterTest.test_threads(self)
2281
2282    def test_writes_and_peek(self):
2283        def _peek(bufio):
2284            bufio.peek(1)
2285        self.check_writes(_peek)
2286        def _peek(bufio):
2287            pos = bufio.tell()
2288            bufio.seek(-1, 1)
2289            bufio.peek(1)
2290            bufio.seek(pos, 0)
2291        self.check_writes(_peek)
2292
2293    def test_writes_and_reads(self):
2294        def _read(bufio):
2295            bufio.seek(-1, 1)
2296            bufio.read(1)
2297        self.check_writes(_read)
2298
2299    def test_writes_and_read1s(self):
2300        def _read1(bufio):
2301            bufio.seek(-1, 1)
2302            bufio.read1(1)
2303        self.check_writes(_read1)
2304
2305    def test_writes_and_readintos(self):
2306        def _read(bufio):
2307            bufio.seek(-1, 1)
2308            bufio.readinto(bytearray(1))
2309        self.check_writes(_read)
2310
2311    def test_write_after_readahead(self):
2312        # Issue #6629: writing after the buffer was filled by readahead should
2313        # first rewind the raw stream.
2314        for overwrite_size in [1, 5]:
2315            raw = self.BytesIO(b"A" * 10)
2316            bufio = self.tp(raw, 4)
2317            # Trigger readahead
2318            self.assertEqual(bufio.read(1), b"A")
2319            self.assertEqual(bufio.tell(), 1)
2320            # Overwriting should rewind the raw stream if it needs so
2321            bufio.write(b"B" * overwrite_size)
2322            self.assertEqual(bufio.tell(), overwrite_size + 1)
2323            # If the write size was smaller than the buffer size, flush() and
2324            # check that rewind happens.
2325            bufio.flush()
2326            self.assertEqual(bufio.tell(), overwrite_size + 1)
2327            s = raw.getvalue()
2328            self.assertEqual(s,
2329                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2330
2331    def test_write_rewind_write(self):
2332        # Various combinations of reading / writing / seeking backwards / writing again
2333        def mutate(bufio, pos1, pos2):
2334            assert pos2 >= pos1
2335            # Fill the buffer
2336            bufio.seek(pos1)
2337            bufio.read(pos2 - pos1)
2338            bufio.write(b'\x02')
2339            # This writes earlier than the previous write, but still inside
2340            # the buffer.
2341            bufio.seek(pos1)
2342            bufio.write(b'\x01')
2343
2344        b = b"\x80\x81\x82\x83\x84"
2345        for i in range(0, len(b)):
2346            for j in range(i, len(b)):
2347                raw = self.BytesIO(b)
2348                bufio = self.tp(raw, 100)
2349                mutate(bufio, i, j)
2350                bufio.flush()
2351                expected = bytearray(b)
2352                expected[j] = 2
2353                expected[i] = 1
2354                self.assertEqual(raw.getvalue(), expected,
2355                                 "failed result for i=%d, j=%d" % (i, j))
2356
2357    def test_truncate_after_read_or_write(self):
2358        raw = self.BytesIO(b"A" * 10)
2359        bufio = self.tp(raw, 100)
2360        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2361        self.assertEqual(bufio.truncate(), 2)
2362        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2363        self.assertEqual(bufio.truncate(), 4)
2364
2365    def test_misbehaved_io(self):
2366        BufferedReaderTest.test_misbehaved_io(self)
2367        BufferedWriterTest.test_misbehaved_io(self)
2368
2369    def test_interleaved_read_write(self):
2370        # Test for issue #12213
2371        with self.BytesIO(b'abcdefgh') as raw:
2372            with self.tp(raw, 100) as f:
2373                f.write(b"1")
2374                self.assertEqual(f.read(1), b'b')
2375                f.write(b'2')
2376                self.assertEqual(f.read1(1), b'd')
2377                f.write(b'3')
2378                buf = bytearray(1)
2379                f.readinto(buf)
2380                self.assertEqual(buf, b'f')
2381                f.write(b'4')
2382                self.assertEqual(f.peek(1), b'h')
2383                f.flush()
2384                self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2385
2386        with self.BytesIO(b'abc') as raw:
2387            with self.tp(raw, 100) as f:
2388                self.assertEqual(f.read(1), b'a')
2389                f.write(b"2")
2390                self.assertEqual(f.read(1), b'c')
2391                f.flush()
2392                self.assertEqual(raw.getvalue(), b'a2c')
2393
2394    def test_interleaved_readline_write(self):
2395        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2396            with self.tp(raw) as f:
2397                f.write(b'1')
2398                self.assertEqual(f.readline(), b'b\n')
2399                f.write(b'2')
2400                self.assertEqual(f.readline(), b'def\n')
2401                f.write(b'3')
2402                self.assertEqual(f.readline(), b'\n')
2403                f.flush()
2404                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2405
2406    # You can't construct a BufferedRandom over a non-seekable stream.
2407    test_unseekable = None
2408
2409    # writable() returns True, so there's no point to test it over
2410    # a writable stream.
2411    test_truncate_on_read_only = None
2412
2413
2414class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
2415    tp = io.BufferedRandom
2416
2417    @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
2418                     "instead of returning NULL for malloc failure.")
2419    def test_constructor(self):
2420        BufferedRandomTest.test_constructor(self)
2421        # The allocation can succeed on 32-bit builds, e.g. with more
2422        # than 2 GiB RAM and a 64-bit kernel.
2423        if sys.maxsize > 0x7FFFFFFF:
2424            rawio = self.MockRawIO()
2425            bufio = self.tp(rawio)
2426            self.assertRaises((OverflowError, MemoryError, ValueError),
2427                bufio.__init__, rawio, sys.maxsize)
2428
2429    def test_garbage_collection(self):
2430        CBufferedReaderTest.test_garbage_collection(self)
2431        CBufferedWriterTest.test_garbage_collection(self)
2432
2433    def test_args_error(self):
2434        # Issue #17275
2435        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2436            self.tp(io.BytesIO(), 1024, 1024, 1024)
2437
2438
2439class PyBufferedRandomTest(BufferedRandomTest):
2440    tp = pyio.BufferedRandom
2441
2442
2443# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2444# properties:
2445#   - A single output character can correspond to many bytes of input.
2446#   - The number of input bytes to complete the character can be
2447#     undetermined until the last input byte is received.
2448#   - The number of input bytes can vary depending on previous input.
2449#   - A single input byte can correspond to many characters of output.
2450#   - The number of output characters can be undetermined until the
2451#     last input byte is received.
2452#   - The number of output characters can vary depending on previous input.
2453
2454class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2455    """
2456    For testing seek/tell behavior with a stateful, buffering decoder.
2457
2458    Input is a sequence of words.  Words may be fixed-length (length set
2459    by input) or variable-length (period-terminated).  In variable-length
2460    mode, extra periods are ignored.  Possible words are:
2461      - 'i' followed by a number sets the input length, I (maximum 99).
2462        When I is set to 0, words are space-terminated.
2463      - 'o' followed by a number sets the output length, O (maximum 99).
2464      - Any other word is converted into a word followed by a period on
2465        the output.  The output word consists of the input word truncated
2466        or padded out with hyphens to make its length equal to O.  If O
2467        is 0, the word is output verbatim without truncating or padding.
2468    I and O are initially set to 1.  When I changes, any buffered input is
2469    re-scanned according to the new I.  EOF also terminates the last word.
2470    """
2471
2472    def __init__(self, errors='strict'):
2473        codecs.IncrementalDecoder.__init__(self, errors)
2474        self.reset()
2475
2476    def __repr__(self):
2477        return '<SID %x>' % id(self)
2478
2479    def reset(self):
2480        self.i = 1
2481        self.o = 1
2482        self.buffer = bytearray()
2483
2484    def getstate(self):
2485        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2486        return bytes(self.buffer), i*100 + o
2487
2488    def setstate(self, state):
2489        buffer, io = state
2490        self.buffer = bytearray(buffer)
2491        i, o = divmod(io, 100)
2492        self.i, self.o = i ^ 1, o ^ 1
2493
2494    def decode(self, input, final=False):
2495        output = ''
2496        for b in input:
2497            if self.i == 0: # variable-length, terminated with period
2498                if b == ord('.'):
2499                    if self.buffer:
2500                        output += self.process_word()
2501                else:
2502                    self.buffer.append(b)
2503            else: # fixed-length, terminate after self.i bytes
2504                self.buffer.append(b)
2505                if len(self.buffer) == self.i:
2506                    output += self.process_word()
2507        if final and self.buffer: # EOF terminates the last word
2508            output += self.process_word()
2509        return output
2510
2511    def process_word(self):
2512        output = ''
2513        if self.buffer[0] == ord('i'):
2514            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2515        elif self.buffer[0] == ord('o'):
2516            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2517        else:
2518            output = self.buffer.decode('ascii')
2519            if len(output) < self.o:
2520                output += '-'*self.o # pad out with hyphens
2521            if self.o:
2522                output = output[:self.o] # truncate to output length
2523            output += '.'
2524        self.buffer = bytearray()
2525        return output
2526
2527    codecEnabled = False
2528
2529
2530# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
2531# when registering codecs and cleanup functions.
2532def lookupTestDecoder(name):
2533    if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
2534        latin1 = codecs.lookup('latin-1')
2535        return codecs.CodecInfo(
2536            name='test_decoder', encode=latin1.encode, decode=None,
2537            incrementalencoder=None,
2538            streamreader=None, streamwriter=None,
2539            incrementaldecoder=StatefulIncrementalDecoder)
2540
2541
2542class StatefulIncrementalDecoderTest(unittest.TestCase):
2543    """
2544    Make sure the StatefulIncrementalDecoder actually works.
2545    """
2546
2547    test_cases = [
2548        # I=1, O=1 (fixed-length input == fixed-length output)
2549        (b'abcd', False, 'a.b.c.d.'),
2550        # I=0, O=0 (variable-length input, variable-length output)
2551        (b'oiabcd', True, 'abcd.'),
2552        # I=0, O=0 (should ignore extra periods)
2553        (b'oi...abcd...', True, 'abcd.'),
2554        # I=0, O=6 (variable-length input, fixed-length output)
2555        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2556        # I=2, O=6 (fixed-length input < fixed-length output)
2557        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
2558        # I=6, O=3 (fixed-length input > fixed-length output)
2559        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2560        # I=0, then 3; O=29, then 15 (with longer output)
2561        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2562         'a----------------------------.' +
2563         'b----------------------------.' +
2564         'cde--------------------------.' +
2565         'abcdefghijabcde.' +
2566         'a.b------------.' +
2567         '.c.------------.' +
2568         'd.e------------.' +
2569         'k--------------.' +
2570         'l--------------.' +
2571         'm--------------.')
2572    ]
2573
2574    def test_decoder(self):
2575        # Try a few one-shot test cases.
2576        for input, eof, output in self.test_cases:
2577            d = StatefulIncrementalDecoder()
2578            self.assertEqual(d.decode(input, eof), output)
2579
2580        # Also test an unfinished decode, followed by forcing EOF.
2581        d = StatefulIncrementalDecoder()
2582        self.assertEqual(d.decode(b'oiabcd'), '')
2583        self.assertEqual(d.decode(b'', 1), 'abcd.')
2584
2585class TextIOWrapperTest(unittest.TestCase):
2586
2587    def setUp(self):
2588        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2589        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2590        os_helper.unlink(os_helper.TESTFN)
2591        codecs.register(lookupTestDecoder)
2592        self.addCleanup(codecs.unregister, lookupTestDecoder)
2593
2594    def tearDown(self):
2595        os_helper.unlink(os_helper.TESTFN)
2596
2597    def test_constructor(self):
2598        r = self.BytesIO(b"\xc3\xa9\n\n")
2599        b = self.BufferedReader(r, 1000)
2600        t = self.TextIOWrapper(b, encoding="utf-8")
2601        t.__init__(b, encoding="latin-1", newline="\r\n")
2602        self.assertEqual(t.encoding, "latin-1")
2603        self.assertEqual(t.line_buffering, False)
2604        t.__init__(b, encoding="utf-8", line_buffering=True)
2605        self.assertEqual(t.encoding, "utf-8")
2606        self.assertEqual(t.line_buffering, True)
2607        self.assertEqual("\xe9\n", t.readline())
2608        self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
2609        self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
2610
2611    def test_uninitialized(self):
2612        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2613        del t
2614        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2615        self.assertRaises(Exception, repr, t)
2616        self.assertRaisesRegex((ValueError, AttributeError),
2617                               'uninitialized|has no attribute',
2618                               t.read, 0)
2619        t.__init__(self.MockRawIO(), encoding="utf-8")
2620        self.assertEqual(t.read(0), '')
2621
2622    def test_non_text_encoding_codecs_are_rejected(self):
2623        # Ensure the constructor complains if passed a codec that isn't
2624        # marked as a text encoding
2625        # http://bugs.python.org/issue20404
2626        r = self.BytesIO()
2627        b = self.BufferedWriter(r)
2628        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2629            self.TextIOWrapper(b, encoding="hex")
2630
2631    def test_detach(self):
2632        r = self.BytesIO()
2633        b = self.BufferedWriter(r)
2634        t = self.TextIOWrapper(b, encoding="ascii")
2635        self.assertIs(t.detach(), b)
2636
2637        t = self.TextIOWrapper(b, encoding="ascii")
2638        t.write("howdy")
2639        self.assertFalse(r.getvalue())
2640        t.detach()
2641        self.assertEqual(r.getvalue(), b"howdy")
2642        self.assertRaises(ValueError, t.detach)
2643
2644        # Operations independent of the detached stream should still work
2645        repr(t)
2646        self.assertEqual(t.encoding, "ascii")
2647        self.assertEqual(t.errors, "strict")
2648        self.assertFalse(t.line_buffering)
2649        self.assertFalse(t.write_through)
2650
2651    def test_repr(self):
2652        raw = self.BytesIO("hello".encode("utf-8"))
2653        b = self.BufferedReader(raw)
2654        t = self.TextIOWrapper(b, encoding="utf-8")
2655        modname = self.TextIOWrapper.__module__
2656        self.assertRegex(repr(t),
2657                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
2658        raw.name = "dummy"
2659        self.assertRegex(repr(t),
2660                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2661        t.mode = "r"
2662        self.assertRegex(repr(t),
2663                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2664        raw.name = b"dummy"
2665        self.assertRegex(repr(t),
2666                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2667
2668        t.buffer.detach()
2669        repr(t)  # Should not raise an exception
2670
2671    def test_recursive_repr(self):
2672        # Issue #25455
2673        raw = self.BytesIO()
2674        t = self.TextIOWrapper(raw, encoding="utf-8")
2675        with support.swap_attr(raw, 'name', t):
2676            try:
2677                repr(t)  # Should not crash
2678            except RuntimeError:
2679                pass
2680
2681    def test_line_buffering(self):
2682        r = self.BytesIO()
2683        b = self.BufferedWriter(r, 1000)
2684        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
2685        t.write("X")
2686        self.assertEqual(r.getvalue(), b"")  # No flush happened
2687        t.write("Y\nZ")
2688        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2689        t.write("A\rB")
2690        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2691
2692    def test_reconfigure_line_buffering(self):
2693        r = self.BytesIO()
2694        b = self.BufferedWriter(r, 1000)
2695        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
2696        t.write("AB\nC")
2697        self.assertEqual(r.getvalue(), b"")
2698
2699        t.reconfigure(line_buffering=True)   # implicit flush
2700        self.assertEqual(r.getvalue(), b"AB\nC")
2701        t.write("DEF\nG")
2702        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2703        t.write("H")
2704        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2705        t.reconfigure(line_buffering=False)   # implicit flush
2706        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2707        t.write("IJ")
2708        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2709
2710        # Keeping default value
2711        t.reconfigure()
2712        t.reconfigure(line_buffering=None)
2713        self.assertEqual(t.line_buffering, False)
2714        t.reconfigure(line_buffering=True)
2715        t.reconfigure()
2716        t.reconfigure(line_buffering=None)
2717        self.assertEqual(t.line_buffering, True)
2718
2719    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2720    def test_default_encoding(self):
2721        old_environ = dict(os.environ)
2722        try:
2723            # try to get a user preferred encoding different than the current
2724            # locale encoding to check that TextIOWrapper() uses the current
2725            # locale encoding and not the user preferred encoding
2726            for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2727                if key in os.environ:
2728                    del os.environ[key]
2729
2730            current_locale_encoding = locale.getpreferredencoding(False)
2731            b = self.BytesIO()
2732            with warnings.catch_warnings():
2733                warnings.simplefilter("ignore", EncodingWarning)
2734                t = self.TextIOWrapper(b)
2735            self.assertEqual(t.encoding, current_locale_encoding)
2736        finally:
2737            os.environ.clear()
2738            os.environ.update(old_environ)
2739
2740    @support.cpython_only
2741    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2742    def test_device_encoding(self):
2743        # Issue 15989
2744        import _testcapi
2745        b = self.BytesIO()
2746        b.fileno = lambda: _testcapi.INT_MAX + 1
2747        self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
2748        b.fileno = lambda: _testcapi.UINT_MAX + 1
2749        self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
2750
2751    def test_encoding(self):
2752        # Check the encoding attribute is always set, and valid
2753        b = self.BytesIO()
2754        t = self.TextIOWrapper(b, encoding="utf-8")
2755        self.assertEqual(t.encoding, "utf-8")
2756        with warnings.catch_warnings():
2757            warnings.simplefilter("ignore", EncodingWarning)
2758            t = self.TextIOWrapper(b)
2759        self.assertIsNotNone(t.encoding)
2760        codecs.lookup(t.encoding)
2761
2762    def test_encoding_errors_reading(self):
2763        # (1) default
2764        b = self.BytesIO(b"abc\n\xff\n")
2765        t = self.TextIOWrapper(b, encoding="ascii")
2766        self.assertRaises(UnicodeError, t.read)
2767        # (2) explicit strict
2768        b = self.BytesIO(b"abc\n\xff\n")
2769        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2770        self.assertRaises(UnicodeError, t.read)
2771        # (3) ignore
2772        b = self.BytesIO(b"abc\n\xff\n")
2773        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2774        self.assertEqual(t.read(), "abc\n\n")
2775        # (4) replace
2776        b = self.BytesIO(b"abc\n\xff\n")
2777        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2778        self.assertEqual(t.read(), "abc\n\ufffd\n")
2779
2780    def test_encoding_errors_writing(self):
2781        # (1) default
2782        b = self.BytesIO()
2783        t = self.TextIOWrapper(b, encoding="ascii")
2784        self.assertRaises(UnicodeError, t.write, "\xff")
2785        # (2) explicit strict
2786        b = self.BytesIO()
2787        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2788        self.assertRaises(UnicodeError, t.write, "\xff")
2789        # (3) ignore
2790        b = self.BytesIO()
2791        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2792                             newline="\n")
2793        t.write("abc\xffdef\n")
2794        t.flush()
2795        self.assertEqual(b.getvalue(), b"abcdef\n")
2796        # (4) replace
2797        b = self.BytesIO()
2798        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2799                             newline="\n")
2800        t.write("abc\xffdef\n")
2801        t.flush()
2802        self.assertEqual(b.getvalue(), b"abc?def\n")
2803
2804    def test_newlines(self):
2805        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2806
2807        tests = [
2808            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2809            [ '', input_lines ],
2810            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2811            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2812            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2813        ]
2814        encodings = (
2815            'utf-8', 'latin-1',
2816            'utf-16', 'utf-16-le', 'utf-16-be',
2817            'utf-32', 'utf-32-le', 'utf-32-be',
2818        )
2819
2820        # Try a range of buffer sizes to test the case where \r is the last
2821        # character in TextIOWrapper._pending_line.
2822        for encoding in encodings:
2823            # XXX: str.encode() should return bytes
2824            data = bytes(''.join(input_lines).encode(encoding))
2825            for do_reads in (False, True):
2826                for bufsize in range(1, 10):
2827                    for newline, exp_lines in tests:
2828                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2829                        textio = self.TextIOWrapper(bufio, newline=newline,
2830                                                  encoding=encoding)
2831                        if do_reads:
2832                            got_lines = []
2833                            while True:
2834                                c2 = textio.read(2)
2835                                if c2 == '':
2836                                    break
2837                                self.assertEqual(len(c2), 2)
2838                                got_lines.append(c2 + textio.readline())
2839                        else:
2840                            got_lines = list(textio)
2841
2842                        for got_line, exp_line in zip(got_lines, exp_lines):
2843                            self.assertEqual(got_line, exp_line)
2844                        self.assertEqual(len(got_lines), len(exp_lines))
2845
2846    def test_newlines_input(self):
2847        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2848        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2849        for newline, expected in [
2850            (None, normalized.decode("ascii").splitlines(keepends=True)),
2851            ("", testdata.decode("ascii").splitlines(keepends=True)),
2852            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2853            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2854            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2855            ]:
2856            buf = self.BytesIO(testdata)
2857            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2858            self.assertEqual(txt.readlines(), expected)
2859            txt.seek(0)
2860            self.assertEqual(txt.read(), "".join(expected))
2861
2862    def test_newlines_output(self):
2863        testdict = {
2864            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2865            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2866            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2867            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2868            }
2869        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2870        for newline, expected in tests:
2871            buf = self.BytesIO()
2872            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2873            txt.write("AAA\nB")
2874            txt.write("BB\nCCC\n")
2875            txt.write("X\rY\r\nZ")
2876            txt.flush()
2877            self.assertEqual(buf.closed, False)
2878            self.assertEqual(buf.getvalue(), expected)
2879
2880    def test_destructor(self):
2881        l = []
2882        base = self.BytesIO
2883        class MyBytesIO(base):
2884            def close(self):
2885                l.append(self.getvalue())
2886                base.close(self)
2887        b = MyBytesIO()
2888        t = self.TextIOWrapper(b, encoding="ascii")
2889        t.write("abc")
2890        del t
2891        support.gc_collect()
2892        self.assertEqual([b"abc"], l)
2893
2894    def test_override_destructor(self):
2895        record = []
2896        class MyTextIO(self.TextIOWrapper):
2897            def __del__(self):
2898                record.append(1)
2899                try:
2900                    f = super().__del__
2901                except AttributeError:
2902                    pass
2903                else:
2904                    f()
2905            def close(self):
2906                record.append(2)
2907                super().close()
2908            def flush(self):
2909                record.append(3)
2910                super().flush()
2911        b = self.BytesIO()
2912        t = MyTextIO(b, encoding="ascii")
2913        del t
2914        support.gc_collect()
2915        self.assertEqual(record, [1, 2, 3])
2916
2917    def test_error_through_destructor(self):
2918        # Test that the exception state is not modified by a destructor,
2919        # even if close() fails.
2920        rawio = self.CloseFailureIO()
2921        with support.catch_unraisable_exception() as cm:
2922            with self.assertRaises(AttributeError):
2923                self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
2924
2925            if not IOBASE_EMITS_UNRAISABLE:
2926                self.assertIsNone(cm.unraisable)
2927            elif cm.unraisable is not None:
2928                self.assertEqual(cm.unraisable.exc_type, OSError)
2929
2930    # Systematic tests of the text I/O API
2931
2932    def test_basic_io(self):
2933        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2934            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2935                f = self.open(os_helper.TESTFN, "w+", encoding=enc)
2936                f._CHUNK_SIZE = chunksize
2937                self.assertEqual(f.write("abc"), 3)
2938                f.close()
2939                f = self.open(os_helper.TESTFN, "r+", encoding=enc)
2940                f._CHUNK_SIZE = chunksize
2941                self.assertEqual(f.tell(), 0)
2942                self.assertEqual(f.read(), "abc")
2943                cookie = f.tell()
2944                self.assertEqual(f.seek(0), 0)
2945                self.assertEqual(f.read(None), "abc")
2946                f.seek(0)
2947                self.assertEqual(f.read(2), "ab")
2948                self.assertEqual(f.read(1), "c")
2949                self.assertEqual(f.read(1), "")
2950                self.assertEqual(f.read(), "")
2951                self.assertEqual(f.tell(), cookie)
2952                self.assertEqual(f.seek(0), 0)
2953                self.assertEqual(f.seek(0, 2), cookie)
2954                self.assertEqual(f.write("def"), 3)
2955                self.assertEqual(f.seek(cookie), cookie)
2956                self.assertEqual(f.read(), "def")
2957                if enc.startswith("utf"):
2958                    self.multi_line_test(f, enc)
2959                f.close()
2960
2961    def multi_line_test(self, f, enc):
2962        f.seek(0)
2963        f.truncate()
2964        sample = "s\xff\u0fff\uffff"
2965        wlines = []
2966        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2967            chars = []
2968            for i in range(size):
2969                chars.append(sample[i % len(sample)])
2970            line = "".join(chars) + "\n"
2971            wlines.append((f.tell(), line))
2972            f.write(line)
2973        f.seek(0)
2974        rlines = []
2975        while True:
2976            pos = f.tell()
2977            line = f.readline()
2978            if not line:
2979                break
2980            rlines.append((pos, line))
2981        self.assertEqual(rlines, wlines)
2982
2983    def test_telling(self):
2984        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
2985        p0 = f.tell()
2986        f.write("\xff\n")
2987        p1 = f.tell()
2988        f.write("\xff\n")
2989        p2 = f.tell()
2990        f.seek(0)
2991        self.assertEqual(f.tell(), p0)
2992        self.assertEqual(f.readline(), "\xff\n")
2993        self.assertEqual(f.tell(), p1)
2994        self.assertEqual(f.readline(), "\xff\n")
2995        self.assertEqual(f.tell(), p2)
2996        f.seek(0)
2997        for line in f:
2998            self.assertEqual(line, "\xff\n")
2999            self.assertRaises(OSError, f.tell)
3000        self.assertEqual(f.tell(), p2)
3001        f.close()
3002
3003    def test_seeking(self):
3004        chunk_size = _default_chunk_size()
3005        prefix_size = chunk_size - 2
3006        u_prefix = "a" * prefix_size
3007        prefix = bytes(u_prefix.encode("utf-8"))
3008        self.assertEqual(len(u_prefix), len(prefix))
3009        u_suffix = "\u8888\n"
3010        suffix = bytes(u_suffix.encode("utf-8"))
3011        line = prefix + suffix
3012        with self.open(os_helper.TESTFN, "wb") as f:
3013            f.write(line*2)
3014        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
3015            s = f.read(prefix_size)
3016            self.assertEqual(s, str(prefix, "ascii"))
3017            self.assertEqual(f.tell(), prefix_size)
3018            self.assertEqual(f.readline(), u_suffix)
3019
3020    def test_seeking_too(self):
3021        # Regression test for a specific bug
3022        data = b'\xe0\xbf\xbf\n'
3023        with self.open(os_helper.TESTFN, "wb") as f:
3024            f.write(data)
3025        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
3026            f._CHUNK_SIZE  # Just test that it exists
3027            f._CHUNK_SIZE = 2
3028            f.readline()
3029            f.tell()
3030
3031    def test_seek_and_tell(self):
3032        #Test seek/tell using the StatefulIncrementalDecoder.
3033        # Make test faster by doing smaller seeks
3034        CHUNK_SIZE = 128
3035
3036        def test_seek_and_tell_with_data(data, min_pos=0):
3037            """Tell/seek to various points within a data stream and ensure
3038            that the decoded data returned by read() is consistent."""
3039            f = self.open(os_helper.TESTFN, 'wb')
3040            f.write(data)
3041            f.close()
3042            f = self.open(os_helper.TESTFN, encoding='test_decoder')
3043            f._CHUNK_SIZE = CHUNK_SIZE
3044            decoded = f.read()
3045            f.close()
3046
3047            for i in range(min_pos, len(decoded) + 1): # seek positions
3048                for j in [1, 5, len(decoded) - i]: # read lengths
3049                    f = self.open(os_helper.TESTFN, encoding='test_decoder')
3050                    self.assertEqual(f.read(i), decoded[:i])
3051                    cookie = f.tell()
3052                    self.assertEqual(f.read(j), decoded[i:i + j])
3053                    f.seek(cookie)
3054                    self.assertEqual(f.read(), decoded[i:])
3055                    f.close()
3056
3057        # Enable the test decoder.
3058        StatefulIncrementalDecoder.codecEnabled = 1
3059
3060        # Run the tests.
3061        try:
3062            # Try each test case.
3063            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3064                test_seek_and_tell_with_data(input)
3065
3066            # Position each test case so that it crosses a chunk boundary.
3067            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3068                offset = CHUNK_SIZE - len(input)//2
3069                prefix = b'.'*offset
3070                # Don't bother seeking into the prefix (takes too long).
3071                min_pos = offset*2
3072                test_seek_and_tell_with_data(prefix + input, min_pos)
3073
3074        # Ensure our test decoder won't interfere with subsequent tests.
3075        finally:
3076            StatefulIncrementalDecoder.codecEnabled = 0
3077
3078    def test_multibyte_seek_and_tell(self):
3079        f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
3080        f.write("AB\n\u3046\u3048\n")
3081        f.close()
3082
3083        f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
3084        self.assertEqual(f.readline(), "AB\n")
3085        p0 = f.tell()
3086        self.assertEqual(f.readline(), "\u3046\u3048\n")
3087        p1 = f.tell()
3088        f.seek(p0)
3089        self.assertEqual(f.readline(), "\u3046\u3048\n")
3090        self.assertEqual(f.tell(), p1)
3091        f.close()
3092
3093    def test_seek_with_encoder_state(self):
3094        f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
3095        f.write("\u00e6\u0300")
3096        p0 = f.tell()
3097        f.write("\u00e6")
3098        f.seek(p0)
3099        f.write("\u0300")
3100        f.close()
3101
3102        f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
3103        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3104        f.close()
3105
3106    def test_encoded_writes(self):
3107        data = "1234567890"
3108        tests = ("utf-16",
3109                 "utf-16-le",
3110                 "utf-16-be",
3111                 "utf-32",
3112                 "utf-32-le",
3113                 "utf-32-be")
3114        for encoding in tests:
3115            buf = self.BytesIO()
3116            f = self.TextIOWrapper(buf, encoding=encoding)
3117            # Check if the BOM is written only once (see issue1753).
3118            f.write(data)
3119            f.write(data)
3120            f.seek(0)
3121            self.assertEqual(f.read(), data * 2)
3122            f.seek(0)
3123            self.assertEqual(f.read(), data * 2)
3124            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
3125
3126    def test_unreadable(self):
3127        class UnReadable(self.BytesIO):
3128            def readable(self):
3129                return False
3130        txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
3131        self.assertRaises(OSError, txt.read)
3132
3133    def test_read_one_by_one(self):
3134        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
3135        reads = ""
3136        while True:
3137            c = txt.read(1)
3138            if not c:
3139                break
3140            reads += c
3141        self.assertEqual(reads, "AA\nBB")
3142
3143    def test_readlines(self):
3144        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
3145        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3146        txt.seek(0)
3147        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3148        txt.seek(0)
3149        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3150
3151    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
3152    def test_read_by_chunk(self):
3153        # make sure "\r\n" straddles 128 char boundary.
3154        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
3155        reads = ""
3156        while True:
3157            c = txt.read(128)
3158            if not c:
3159                break
3160            reads += c
3161        self.assertEqual(reads, "A"*127+"\nB")
3162
3163    def test_writelines(self):
3164        l = ['ab', 'cd', 'ef']
3165        buf = self.BytesIO()
3166        txt = self.TextIOWrapper(buf, encoding="utf-8")
3167        txt.writelines(l)
3168        txt.flush()
3169        self.assertEqual(buf.getvalue(), b'abcdef')
3170
3171    def test_writelines_userlist(self):
3172        l = UserList(['ab', 'cd', 'ef'])
3173        buf = self.BytesIO()
3174        txt = self.TextIOWrapper(buf, encoding="utf-8")
3175        txt.writelines(l)
3176        txt.flush()
3177        self.assertEqual(buf.getvalue(), b'abcdef')
3178
3179    def test_writelines_error(self):
3180        txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
3181        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3182        self.assertRaises(TypeError, txt.writelines, None)
3183        self.assertRaises(TypeError, txt.writelines, b'abc')
3184
3185    def test_issue1395_1(self):
3186        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3187
3188        # read one char at a time
3189        reads = ""
3190        while True:
3191            c = txt.read(1)
3192            if not c:
3193                break
3194            reads += c
3195        self.assertEqual(reads, self.normalized)
3196
3197    def test_issue1395_2(self):
3198        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3199        txt._CHUNK_SIZE = 4
3200
3201        reads = ""
3202        while True:
3203            c = txt.read(4)
3204            if not c:
3205                break
3206            reads += c
3207        self.assertEqual(reads, self.normalized)
3208
3209    def test_issue1395_3(self):
3210        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3211        txt._CHUNK_SIZE = 4
3212
3213        reads = txt.read(4)
3214        reads += txt.read(4)
3215        reads += txt.readline()
3216        reads += txt.readline()
3217        reads += txt.readline()
3218        self.assertEqual(reads, self.normalized)
3219
3220    def test_issue1395_4(self):
3221        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3222        txt._CHUNK_SIZE = 4
3223
3224        reads = txt.read(4)
3225        reads += txt.read()
3226        self.assertEqual(reads, self.normalized)
3227
3228    def test_issue1395_5(self):
3229        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3230        txt._CHUNK_SIZE = 4
3231
3232        reads = txt.read(4)
3233        pos = txt.tell()
3234        txt.seek(0)
3235        txt.seek(pos)
3236        self.assertEqual(txt.read(4), "BBB\n")
3237
3238    def test_issue2282(self):
3239        buffer = self.BytesIO(self.testdata)
3240        txt = self.TextIOWrapper(buffer, encoding="ascii")
3241
3242        self.assertEqual(buffer.seekable(), txt.seekable())
3243
3244    def test_append_bom(self):
3245        # The BOM is not written again when appending to a non-empty file
3246        filename = os_helper.TESTFN
3247        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3248            with self.open(filename, 'w', encoding=charset) as f:
3249                f.write('aaa')
3250                pos = f.tell()
3251            with self.open(filename, 'rb') as f:
3252                self.assertEqual(f.read(), 'aaa'.encode(charset))
3253
3254            with self.open(filename, 'a', encoding=charset) as f:
3255                f.write('xxx')
3256            with self.open(filename, 'rb') as f:
3257                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3258
3259    def test_seek_bom(self):
3260        # Same test, but when seeking manually
3261        filename = os_helper.TESTFN
3262        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3263            with self.open(filename, 'w', encoding=charset) as f:
3264                f.write('aaa')
3265                pos = f.tell()
3266            with self.open(filename, 'r+', encoding=charset) as f:
3267                f.seek(pos)
3268                f.write('zzz')
3269                f.seek(0)
3270                f.write('bbb')
3271            with self.open(filename, 'rb') as f:
3272                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
3273
3274    def test_seek_append_bom(self):
3275        # Same test, but first seek to the start and then to the end
3276        filename = os_helper.TESTFN
3277        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3278            with self.open(filename, 'w', encoding=charset) as f:
3279                f.write('aaa')
3280            with self.open(filename, 'a', encoding=charset) as f:
3281                f.seek(0)
3282                f.seek(0, self.SEEK_END)
3283                f.write('xxx')
3284            with self.open(filename, 'rb') as f:
3285                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3286
3287    def test_errors_property(self):
3288        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
3289            self.assertEqual(f.errors, "strict")
3290        with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
3291            self.assertEqual(f.errors, "replace")
3292
3293    @support.no_tracing
3294    def test_threads_write(self):
3295        # Issue6750: concurrent writes could duplicate data
3296        event = threading.Event()
3297        with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
3298            def run(n):
3299                text = "Thread%03d\n" % n
3300                event.wait()
3301                f.write(text)
3302            threads = [threading.Thread(target=run, args=(x,))
3303                       for x in range(20)]
3304            with threading_helper.start_threads(threads, event.set):
3305                time.sleep(0.02)
3306        with self.open(os_helper.TESTFN, encoding="utf-8") as f:
3307            content = f.read()
3308            for n in range(20):
3309                self.assertEqual(content.count("Thread%03d\n" % n), 1)
3310
3311    def test_flush_error_on_close(self):
3312        # Test that text file is closed despite failed flush
3313        # and that flush() is called before file closed.
3314        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3315        closed = []
3316        def bad_flush():
3317            closed[:] = [txt.closed, txt.buffer.closed]
3318            raise OSError()
3319        txt.flush = bad_flush
3320        self.assertRaises(OSError, txt.close) # exception not swallowed
3321        self.assertTrue(txt.closed)
3322        self.assertTrue(txt.buffer.closed)
3323        self.assertTrue(closed)      # flush() called
3324        self.assertFalse(closed[0])  # flush() called before file closed
3325        self.assertFalse(closed[1])
3326        txt.flush = lambda: None  # break reference loop
3327
3328    def test_close_error_on_close(self):
3329        buffer = self.BytesIO(self.testdata)
3330        def bad_flush():
3331            raise OSError('flush')
3332        def bad_close():
3333            raise OSError('close')
3334        buffer.close = bad_close
3335        txt = self.TextIOWrapper(buffer, encoding="ascii")
3336        txt.flush = bad_flush
3337        with self.assertRaises(OSError) as err: # exception not swallowed
3338            txt.close()
3339        self.assertEqual(err.exception.args, ('close',))
3340        self.assertIsInstance(err.exception.__context__, OSError)
3341        self.assertEqual(err.exception.__context__.args, ('flush',))
3342        self.assertFalse(txt.closed)
3343
3344        # Silence destructor error
3345        buffer.close = lambda: None
3346        txt.flush = lambda: None
3347
3348    def test_nonnormalized_close_error_on_close(self):
3349        # Issue #21677
3350        buffer = self.BytesIO(self.testdata)
3351        def bad_flush():
3352            raise non_existing_flush
3353        def bad_close():
3354            raise non_existing_close
3355        buffer.close = bad_close
3356        txt = self.TextIOWrapper(buffer, encoding="ascii")
3357        txt.flush = bad_flush
3358        with self.assertRaises(NameError) as err: # exception not swallowed
3359            txt.close()
3360        self.assertIn('non_existing_close', str(err.exception))
3361        self.assertIsInstance(err.exception.__context__, NameError)
3362        self.assertIn('non_existing_flush', str(err.exception.__context__))
3363        self.assertFalse(txt.closed)
3364
3365        # Silence destructor error
3366        buffer.close = lambda: None
3367        txt.flush = lambda: None
3368
3369    def test_multi_close(self):
3370        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3371        txt.close()
3372        txt.close()
3373        txt.close()
3374        self.assertRaises(ValueError, txt.flush)
3375
3376    def test_unseekable(self):
3377        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
3378        self.assertRaises(self.UnsupportedOperation, txt.tell)
3379        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3380
3381    def test_readonly_attributes(self):
3382        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3383        buf = self.BytesIO(self.testdata)
3384        with self.assertRaises(AttributeError):
3385            txt.buffer = buf
3386
3387    def test_rawio(self):
3388        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3389        # that subprocess.Popen() can have the required unbuffered
3390        # semantics with universal_newlines=True.
3391        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3392        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3393        # Reads
3394        self.assertEqual(txt.read(4), 'abcd')
3395        self.assertEqual(txt.readline(), 'efghi\n')
3396        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3397
3398    def test_rawio_write_through(self):
3399        # Issue #12591: with write_through=True, writes don't need a flush
3400        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3401        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3402                                 write_through=True)
3403        txt.write('1')
3404        txt.write('23\n4')
3405        txt.write('5')
3406        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3407
3408    def test_bufio_write_through(self):
3409        # Issue #21396: write_through=True doesn't force a flush()
3410        # on the underlying binary buffered object.
3411        flush_called, write_called = [], []
3412        class BufferedWriter(self.BufferedWriter):
3413            def flush(self, *args, **kwargs):
3414                flush_called.append(True)
3415                return super().flush(*args, **kwargs)
3416            def write(self, *args, **kwargs):
3417                write_called.append(True)
3418                return super().write(*args, **kwargs)
3419
3420        rawio = self.BytesIO()
3421        data = b"a"
3422        bufio = BufferedWriter(rawio, len(data)*2)
3423        textio = self.TextIOWrapper(bufio, encoding='ascii',
3424                                    write_through=True)
3425        # write to the buffered io but don't overflow the buffer
3426        text = data.decode('ascii')
3427        textio.write(text)
3428
3429        # buffer.flush is not called with write_through=True
3430        self.assertFalse(flush_called)
3431        # buffer.write *is* called with write_through=True
3432        self.assertTrue(write_called)
3433        self.assertEqual(rawio.getvalue(), b"") # no flush
3434
3435        write_called = [] # reset
3436        textio.write(text * 10) # total content is larger than bufio buffer
3437        self.assertTrue(write_called)
3438        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3439
3440    def test_reconfigure_write_through(self):
3441        raw = self.MockRawIO([])
3442        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3443        t.write('1')
3444        t.reconfigure(write_through=True)  # implied flush
3445        self.assertEqual(t.write_through, True)
3446        self.assertEqual(b''.join(raw._write_stack), b'1')
3447        t.write('23')
3448        self.assertEqual(b''.join(raw._write_stack), b'123')
3449        t.reconfigure(write_through=False)
3450        self.assertEqual(t.write_through, False)
3451        t.write('45')
3452        t.flush()
3453        self.assertEqual(b''.join(raw._write_stack), b'12345')
3454        # Keeping default value
3455        t.reconfigure()
3456        t.reconfigure(write_through=None)
3457        self.assertEqual(t.write_through, False)
3458        t.reconfigure(write_through=True)
3459        t.reconfigure()
3460        t.reconfigure(write_through=None)
3461        self.assertEqual(t.write_through, True)
3462
3463    def test_read_nonbytes(self):
3464        # Issue #17106
3465        # Crash when underlying read() returns non-bytes
3466        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3467        self.assertRaises(TypeError, t.read, 1)
3468        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3469        self.assertRaises(TypeError, t.readline)
3470        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
3471        self.assertRaises(TypeError, t.read)
3472
3473    def test_illegal_encoder(self):
3474        # Issue 31271: Calling write() while the return value of encoder's
3475        # encode() is invalid shouldn't cause an assertion failure.
3476        rot13 = codecs.lookup("rot13")
3477        with support.swap_attr(rot13, '_is_text_encoding', True):
3478            t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3479        self.assertRaises(TypeError, t.write, 'bar')
3480
3481    def test_illegal_decoder(self):
3482        # Issue #17106
3483        # Bypass the early encoding check added in issue 20404
3484        def _make_illegal_wrapper():
3485            quopri = codecs.lookup("quopri")
3486            quopri._is_text_encoding = True
3487            try:
3488                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3489                                       newline='\n', encoding="quopri")
3490            finally:
3491                quopri._is_text_encoding = False
3492            return t
3493        # Crash when decoder returns non-string
3494        t = _make_illegal_wrapper()
3495        self.assertRaises(TypeError, t.read, 1)
3496        t = _make_illegal_wrapper()
3497        self.assertRaises(TypeError, t.readline)
3498        t = _make_illegal_wrapper()
3499        self.assertRaises(TypeError, t.read)
3500
3501        # Issue 31243: calling read() while the return value of decoder's
3502        # getstate() is invalid should neither crash the interpreter nor
3503        # raise a SystemError.
3504        def _make_very_illegal_wrapper(getstate_ret_val):
3505            class BadDecoder:
3506                def getstate(self):
3507                    return getstate_ret_val
3508            def _get_bad_decoder(dummy):
3509                return BadDecoder()
3510            quopri = codecs.lookup("quopri")
3511            with support.swap_attr(quopri, 'incrementaldecoder',
3512                                   _get_bad_decoder):
3513                return _make_illegal_wrapper()
3514        t = _make_very_illegal_wrapper(42)
3515        self.assertRaises(TypeError, t.read, 42)
3516        t = _make_very_illegal_wrapper(())
3517        self.assertRaises(TypeError, t.read, 42)
3518        t = _make_very_illegal_wrapper((1, 2))
3519        self.assertRaises(TypeError, t.read, 42)
3520
3521    def _check_create_at_shutdown(self, **kwargs):
3522        # Issue #20037: creating a TextIOWrapper at shutdown
3523        # shouldn't crash the interpreter.
3524        iomod = self.io.__name__
3525        code = """if 1:
3526            import codecs
3527            import {iomod} as io
3528
3529            # Avoid looking up codecs at shutdown
3530            codecs.lookup('utf-8')
3531
3532            class C:
3533                def __init__(self):
3534                    self.buf = io.BytesIO()
3535                def __del__(self):
3536                    io.TextIOWrapper(self.buf, **{kwargs})
3537                    print("ok")
3538            c = C()
3539            """.format(iomod=iomod, kwargs=kwargs)
3540        return assert_python_ok("-c", code)
3541
3542    def test_create_at_shutdown_without_encoding(self):
3543        rc, out, err = self._check_create_at_shutdown()
3544        if err:
3545            # Can error out with a RuntimeError if the module state
3546            # isn't found.
3547            self.assertIn(self.shutdown_error, err.decode())
3548        else:
3549            self.assertEqual("ok", out.decode().strip())
3550
3551    def test_create_at_shutdown_with_encoding(self):
3552        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3553                                                      errors='strict')
3554        self.assertFalse(err)
3555        self.assertEqual("ok", out.decode().strip())
3556
3557    def test_read_byteslike(self):
3558        r = MemviewBytesIO(b'Just some random string\n')
3559        t = self.TextIOWrapper(r, 'utf-8')
3560
3561        # TextIOwrapper will not read the full string, because
3562        # we truncate it to a multiple of the native int size
3563        # so that we can construct a more complex memoryview.
3564        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
3565
3566        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3567
3568    def test_issue22849(self):
3569        class F(object):
3570            def readable(self): return True
3571            def writable(self): return True
3572            def seekable(self): return True
3573
3574        for i in range(10):
3575            try:
3576                self.TextIOWrapper(F(), encoding='utf-8')
3577            except Exception:
3578                pass
3579
3580        F.tell = lambda x: 0
3581        t = self.TextIOWrapper(F(), encoding='utf-8')
3582
3583    def test_reconfigure_encoding_read(self):
3584        # latin1 -> utf8
3585        # (latin1 can decode utf-8 encoded string)
3586        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3587        raw = self.BytesIO(data)
3588        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3589        self.assertEqual(txt.readline(), 'abc\xe9\n')
3590        with self.assertRaises(self.UnsupportedOperation):
3591            txt.reconfigure(encoding='utf-8')
3592        with self.assertRaises(self.UnsupportedOperation):
3593            txt.reconfigure(newline=None)
3594
3595    def test_reconfigure_write_fromascii(self):
3596        # ascii has a specific encodefunc in the C implementation,
3597        # but utf-8-sig has not. Make sure that we get rid of the
3598        # cached encodefunc when we switch encoders.
3599        raw = self.BytesIO()
3600        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3601        txt.write('foo\n')
3602        txt.reconfigure(encoding='utf-8-sig')
3603        txt.write('\xe9\n')
3604        txt.flush()
3605        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3606
3607    def test_reconfigure_write(self):
3608        # latin -> utf8
3609        raw = self.BytesIO()
3610        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3611        txt.write('abc\xe9\n')
3612        txt.reconfigure(encoding='utf-8')
3613        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3614        txt.write('d\xe9f\n')
3615        txt.flush()
3616        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3617
3618        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3619        # the file
3620        raw = self.BytesIO()
3621        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3622        txt.write('abc\n')
3623        txt.reconfigure(encoding='utf-8-sig')
3624        txt.write('d\xe9f\n')
3625        txt.flush()
3626        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3627
3628    def test_reconfigure_write_non_seekable(self):
3629        raw = self.BytesIO()
3630        raw.seekable = lambda: False
3631        raw.seek = None
3632        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3633        txt.write('abc\n')
3634        txt.reconfigure(encoding='utf-8-sig')
3635        txt.write('d\xe9f\n')
3636        txt.flush()
3637
3638        # If the raw stream is not seekable, there'll be a BOM
3639        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3640
3641    def test_reconfigure_defaults(self):
3642        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3643        txt.reconfigure(encoding=None)
3644        self.assertEqual(txt.encoding, 'ascii')
3645        self.assertEqual(txt.errors, 'replace')
3646        txt.write('LF\n')
3647
3648        txt.reconfigure(newline='\r\n')
3649        self.assertEqual(txt.encoding, 'ascii')
3650        self.assertEqual(txt.errors, 'replace')
3651
3652        txt.reconfigure(errors='ignore')
3653        self.assertEqual(txt.encoding, 'ascii')
3654        self.assertEqual(txt.errors, 'ignore')
3655        txt.write('CRLF\n')
3656
3657        txt.reconfigure(encoding='utf-8', newline=None)
3658        self.assertEqual(txt.errors, 'strict')
3659        txt.seek(0)
3660        self.assertEqual(txt.read(), 'LF\nCRLF\n')
3661
3662        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3663
3664    def test_reconfigure_newline(self):
3665        raw = self.BytesIO(b'CR\rEOF')
3666        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3667        txt.reconfigure(newline=None)
3668        self.assertEqual(txt.readline(), 'CR\n')
3669        raw = self.BytesIO(b'CR\rEOF')
3670        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3671        txt.reconfigure(newline='')
3672        self.assertEqual(txt.readline(), 'CR\r')
3673        raw = self.BytesIO(b'CR\rLF\nEOF')
3674        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3675        txt.reconfigure(newline='\n')
3676        self.assertEqual(txt.readline(), 'CR\rLF\n')
3677        raw = self.BytesIO(b'LF\nCR\rEOF')
3678        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3679        txt.reconfigure(newline='\r')
3680        self.assertEqual(txt.readline(), 'LF\nCR\r')
3681        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3682        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3683        txt.reconfigure(newline='\r\n')
3684        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3685
3686        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3687        txt.reconfigure(newline=None)
3688        txt.write('linesep\n')
3689        txt.reconfigure(newline='')
3690        txt.write('LF\n')
3691        txt.reconfigure(newline='\n')
3692        txt.write('LF\n')
3693        txt.reconfigure(newline='\r')
3694        txt.write('CR\n')
3695        txt.reconfigure(newline='\r\n')
3696        txt.write('CRLF\n')
3697        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3698        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3699
3700    def test_issue25862(self):
3701        # Assertion failures occurred in tell() after read() and write().
3702        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3703        t.read(1)
3704        t.read()
3705        t.tell()
3706        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3707        t.read(1)
3708        t.write('x')
3709        t.tell()
3710
3711
3712class MemviewBytesIO(io.BytesIO):
3713    '''A BytesIO object whose read method returns memoryviews
3714       rather than bytes'''
3715
3716    def read1(self, len_):
3717        return _to_memoryview(super().read1(len_))
3718
3719    def read(self, len_):
3720        return _to_memoryview(super().read(len_))
3721
3722def _to_memoryview(buf):
3723    '''Convert bytes-object *buf* to a non-trivial memoryview'''
3724
3725    arr = array.array('i')
3726    idx = len(buf) - len(buf) % arr.itemsize
3727    arr.frombytes(buf[:idx])
3728    return memoryview(arr)
3729
3730
3731class CTextIOWrapperTest(TextIOWrapperTest):
3732    io = io
3733    shutdown_error = "LookupError: unknown encoding: ascii"
3734
3735    def test_initialization(self):
3736        r = self.BytesIO(b"\xc3\xa9\n\n")
3737        b = self.BufferedReader(r, 1000)
3738        t = self.TextIOWrapper(b, encoding="utf-8")
3739        self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
3740        self.assertRaises(ValueError, t.read)
3741
3742        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3743        self.assertRaises(Exception, repr, t)
3744
3745    def test_garbage_collection(self):
3746        # C TextIOWrapper objects are collected, and collecting them flushes
3747        # all data to disk.
3748        # The Python version has __del__, so it ends in gc.garbage instead.
3749        with warnings_helper.check_warnings(('', ResourceWarning)):
3750            rawio = io.FileIO(os_helper.TESTFN, "wb")
3751            b = self.BufferedWriter(rawio)
3752            t = self.TextIOWrapper(b, encoding="ascii")
3753            t.write("456def")
3754            t.x = t
3755            wr = weakref.ref(t)
3756            del t
3757            support.gc_collect()
3758        self.assertIsNone(wr(), wr)
3759        with self.open(os_helper.TESTFN, "rb") as f:
3760            self.assertEqual(f.read(), b"456def")
3761
3762    def test_rwpair_cleared_before_textio(self):
3763        # Issue 13070: TextIOWrapper's finalization would crash when called
3764        # after the reference to the underlying BufferedRWPair's writer got
3765        # cleared by the GC.
3766        for i in range(1000):
3767            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3768            t1 = self.TextIOWrapper(b1, encoding="ascii")
3769            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3770            t2 = self.TextIOWrapper(b2, encoding="ascii")
3771            # circular references
3772            t1.buddy = t2
3773            t2.buddy = t1
3774        support.gc_collect()
3775
3776    def test_del__CHUNK_SIZE_SystemError(self):
3777        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3778        with self.assertRaises(AttributeError):
3779            del t._CHUNK_SIZE
3780
3781    def test_internal_buffer_size(self):
3782        # bpo-43260: TextIOWrapper's internal buffer should not store
3783        # data larger than chunk size.
3784        chunk_size = 8192  # default chunk size, updated later
3785
3786        class MockIO(self.MockRawIO):
3787            def write(self, data):
3788                if len(data) > chunk_size:
3789                    raise RuntimeError
3790                return super().write(data)
3791
3792        buf = MockIO()
3793        t = self.TextIOWrapper(buf, encoding="ascii")
3794        chunk_size = t._CHUNK_SIZE
3795        t.write("abc")
3796        t.write("def")
3797        # default chunk size is 8192 bytes so t don't write data to buf.
3798        self.assertEqual([], buf._write_stack)
3799
3800        with self.assertRaises(RuntimeError):
3801            t.write("x"*(chunk_size+1))
3802
3803        self.assertEqual([b"abcdef"], buf._write_stack)
3804        t.write("ghi")
3805        t.write("x"*chunk_size)
3806        self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
3807
3808
3809class PyTextIOWrapperTest(TextIOWrapperTest):
3810    io = pyio
3811    shutdown_error = "LookupError: unknown encoding: ascii"
3812
3813
3814class IncrementalNewlineDecoderTest(unittest.TestCase):
3815
3816    def check_newline_decoding_utf8(self, decoder):
3817        # UTF-8 specific tests for a newline decoder
3818        def _check_decode(b, s, **kwargs):
3819            # We exercise getstate() / setstate() as well as decode()
3820            state = decoder.getstate()
3821            self.assertEqual(decoder.decode(b, **kwargs), s)
3822            decoder.setstate(state)
3823            self.assertEqual(decoder.decode(b, **kwargs), s)
3824
3825        _check_decode(b'\xe8\xa2\x88', "\u8888")
3826
3827        _check_decode(b'\xe8', "")
3828        _check_decode(b'\xa2', "")
3829        _check_decode(b'\x88', "\u8888")
3830
3831        _check_decode(b'\xe8', "")
3832        _check_decode(b'\xa2', "")
3833        _check_decode(b'\x88', "\u8888")
3834
3835        _check_decode(b'\xe8', "")
3836        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3837
3838        decoder.reset()
3839        _check_decode(b'\n', "\n")
3840        _check_decode(b'\r', "")
3841        _check_decode(b'', "\n", final=True)
3842        _check_decode(b'\r', "\n", final=True)
3843
3844        _check_decode(b'\r', "")
3845        _check_decode(b'a', "\na")
3846
3847        _check_decode(b'\r\r\n', "\n\n")
3848        _check_decode(b'\r', "")
3849        _check_decode(b'\r', "\n")
3850        _check_decode(b'\na', "\na")
3851
3852        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3853        _check_decode(b'\xe8\xa2\x88', "\u8888")
3854        _check_decode(b'\n', "\n")
3855        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3856        _check_decode(b'\n', "\n")
3857
3858    def check_newline_decoding(self, decoder, encoding):
3859        result = []
3860        if encoding is not None:
3861            encoder = codecs.getincrementalencoder(encoding)()
3862            def _decode_bytewise(s):
3863                # Decode one byte at a time
3864                for b in encoder.encode(s):
3865                    result.append(decoder.decode(bytes([b])))
3866        else:
3867            encoder = None
3868            def _decode_bytewise(s):
3869                # Decode one char at a time
3870                for c in s:
3871                    result.append(decoder.decode(c))
3872        self.assertEqual(decoder.newlines, None)
3873        _decode_bytewise("abc\n\r")
3874        self.assertEqual(decoder.newlines, '\n')
3875        _decode_bytewise("\nabc")
3876        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3877        _decode_bytewise("abc\r")
3878        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
3879        _decode_bytewise("abc")
3880        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
3881        _decode_bytewise("abc\r")
3882        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
3883        decoder.reset()
3884        input = "abc"
3885        if encoder is not None:
3886            encoder.reset()
3887            input = encoder.encode(input)
3888        self.assertEqual(decoder.decode(input), "abc")
3889        self.assertEqual(decoder.newlines, None)
3890
3891    def test_newline_decoder(self):
3892        encodings = (
3893            # None meaning the IncrementalNewlineDecoder takes unicode input
3894            # rather than bytes input
3895            None, 'utf-8', 'latin-1',
3896            'utf-16', 'utf-16-le', 'utf-16-be',
3897            'utf-32', 'utf-32-le', 'utf-32-be',
3898        )
3899        for enc in encodings:
3900            decoder = enc and codecs.getincrementaldecoder(enc)()
3901            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3902            self.check_newline_decoding(decoder, enc)
3903        decoder = codecs.getincrementaldecoder("utf-8")()
3904        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3905        self.check_newline_decoding_utf8(decoder)
3906        self.assertRaises(TypeError, decoder.setstate, 42)
3907
3908    def test_newline_bytes(self):
3909        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3910        def _check(dec):
3911            self.assertEqual(dec.newlines, None)
3912            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3913            self.assertEqual(dec.newlines, None)
3914            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3915            self.assertEqual(dec.newlines, None)
3916        dec = self.IncrementalNewlineDecoder(None, translate=False)
3917        _check(dec)
3918        dec = self.IncrementalNewlineDecoder(None, translate=True)
3919        _check(dec)
3920
3921    def test_translate(self):
3922        # issue 35062
3923        for translate in (-2, -1, 1, 2):
3924            decoder = codecs.getincrementaldecoder("utf-8")()
3925            decoder = self.IncrementalNewlineDecoder(decoder, translate)
3926            self.check_newline_decoding_utf8(decoder)
3927        decoder = codecs.getincrementaldecoder("utf-8")()
3928        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3929        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3930
3931class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3932    pass
3933
3934class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3935    pass
3936
3937
3938# XXX Tests for open()
3939
3940class MiscIOTest(unittest.TestCase):
3941
3942    def tearDown(self):
3943        os_helper.unlink(os_helper.TESTFN)
3944
3945    def test___all__(self):
3946        for name in self.io.__all__:
3947            obj = getattr(self.io, name, None)
3948            self.assertIsNotNone(obj, name)
3949            if name in ("open", "open_code"):
3950                continue
3951            elif "error" in name.lower() or name == "UnsupportedOperation":
3952                self.assertTrue(issubclass(obj, Exception), name)
3953            elif not name.startswith("SEEK_"):
3954                self.assertTrue(issubclass(obj, self.IOBase))
3955
3956    def test_attributes(self):
3957        f = self.open(os_helper.TESTFN, "wb", buffering=0)
3958        self.assertEqual(f.mode, "wb")
3959        f.close()
3960
3961        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
3962        self.assertEqual(f.mode,            "w+")
3963        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
3964        self.assertEqual(f.buffer.raw.mode, "rb+")
3965
3966        g = self.open(f.fileno(), "wb", closefd=False)
3967        self.assertEqual(g.mode,     "wb")
3968        self.assertEqual(g.raw.mode, "wb")
3969        self.assertEqual(g.name,     f.fileno())
3970        self.assertEqual(g.raw.name, f.fileno())
3971        f.close()
3972        g.close()
3973
3974    def test_removed_u_mode(self):
3975        # bpo-37330: The "U" mode has been removed in Python 3.11
3976        for mode in ("U", "rU", "r+U"):
3977            with self.assertRaises(ValueError) as cm:
3978                self.open(os_helper.TESTFN, mode)
3979            self.assertIn('invalid mode', str(cm.exception))
3980
3981    def test_open_pipe_with_append(self):
3982        # bpo-27805: Ignore ESPIPE from lseek() in open().
3983        r, w = os.pipe()
3984        self.addCleanup(os.close, r)
3985        f = self.open(w, 'a', encoding="utf-8")
3986        self.addCleanup(f.close)
3987        # Check that the file is marked non-seekable. On Windows, however, lseek
3988        # somehow succeeds on pipes.
3989        if sys.platform != 'win32':
3990            self.assertFalse(f.seekable())
3991
3992    def test_io_after_close(self):
3993        for kwargs in [
3994                {"mode": "w"},
3995                {"mode": "wb"},
3996                {"mode": "w", "buffering": 1},
3997                {"mode": "w", "buffering": 2},
3998                {"mode": "wb", "buffering": 0},
3999                {"mode": "r"},
4000                {"mode": "rb"},
4001                {"mode": "r", "buffering": 1},
4002                {"mode": "r", "buffering": 2},
4003                {"mode": "rb", "buffering": 0},
4004                {"mode": "w+"},
4005                {"mode": "w+b"},
4006                {"mode": "w+", "buffering": 1},
4007                {"mode": "w+", "buffering": 2},
4008                {"mode": "w+b", "buffering": 0},
4009            ]:
4010            if "b" not in kwargs["mode"]:
4011                kwargs["encoding"] = "utf-8"
4012            f = self.open(os_helper.TESTFN, **kwargs)
4013            f.close()
4014            self.assertRaises(ValueError, f.flush)
4015            self.assertRaises(ValueError, f.fileno)
4016            self.assertRaises(ValueError, f.isatty)
4017            self.assertRaises(ValueError, f.__iter__)
4018            if hasattr(f, "peek"):
4019                self.assertRaises(ValueError, f.peek, 1)
4020            self.assertRaises(ValueError, f.read)
4021            if hasattr(f, "read1"):
4022                self.assertRaises(ValueError, f.read1, 1024)
4023                self.assertRaises(ValueError, f.read1)
4024            if hasattr(f, "readall"):
4025                self.assertRaises(ValueError, f.readall)
4026            if hasattr(f, "readinto"):
4027                self.assertRaises(ValueError, f.readinto, bytearray(1024))
4028            if hasattr(f, "readinto1"):
4029                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
4030            self.assertRaises(ValueError, f.readline)
4031            self.assertRaises(ValueError, f.readlines)
4032            self.assertRaises(ValueError, f.readlines, 1)
4033            self.assertRaises(ValueError, f.seek, 0)
4034            self.assertRaises(ValueError, f.tell)
4035            self.assertRaises(ValueError, f.truncate)
4036            self.assertRaises(ValueError, f.write,
4037                              b"" if "b" in kwargs['mode'] else "")
4038            self.assertRaises(ValueError, f.writelines, [])
4039            self.assertRaises(ValueError, next, f)
4040
4041    def test_blockingioerror(self):
4042        # Various BlockingIOError issues
4043        class C(str):
4044            pass
4045        c = C("")
4046        b = self.BlockingIOError(1, c)
4047        c.b = b
4048        b.c = c
4049        wr = weakref.ref(c)
4050        del c, b
4051        support.gc_collect()
4052        self.assertIsNone(wr(), wr)
4053
4054    def test_abcs(self):
4055        # Test the visible base classes are ABCs.
4056        self.assertIsInstance(self.IOBase, abc.ABCMeta)
4057        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4058        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4059        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
4060
4061    def _check_abc_inheritance(self, abcmodule):
4062        with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
4063            self.assertIsInstance(f, abcmodule.IOBase)
4064            self.assertIsInstance(f, abcmodule.RawIOBase)
4065            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4066            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4067        with self.open(os_helper.TESTFN, "wb") as f:
4068            self.assertIsInstance(f, abcmodule.IOBase)
4069            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4070            self.assertIsInstance(f, abcmodule.BufferedIOBase)
4071            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4072        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
4073            self.assertIsInstance(f, abcmodule.IOBase)
4074            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4075            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4076            self.assertIsInstance(f, abcmodule.TextIOBase)
4077
4078    def test_abc_inheritance(self):
4079        # Test implementations inherit from their respective ABCs
4080        self._check_abc_inheritance(self)
4081
4082    def test_abc_inheritance_official(self):
4083        # Test implementations inherit from the official ABCs of the
4084        # baseline "io" module.
4085        self._check_abc_inheritance(io)
4086
4087    def _check_warn_on_dealloc(self, *args, **kwargs):
4088        f = open(*args, **kwargs)
4089        r = repr(f)
4090        with self.assertWarns(ResourceWarning) as cm:
4091            f = None
4092            support.gc_collect()
4093        self.assertIn(r, str(cm.warning.args[0]))
4094
4095    def test_warn_on_dealloc(self):
4096        self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
4097        self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
4098        self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8")
4099
4100    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4101        fds = []
4102        def cleanup_fds():
4103            for fd in fds:
4104                try:
4105                    os.close(fd)
4106                except OSError as e:
4107                    if e.errno != errno.EBADF:
4108                        raise
4109        self.addCleanup(cleanup_fds)
4110        r, w = os.pipe()
4111        fds += r, w
4112        self._check_warn_on_dealloc(r, *args, **kwargs)
4113        # When using closefd=False, there's no warning
4114        r, w = os.pipe()
4115        fds += r, w
4116        with warnings_helper.check_no_resource_warning(self):
4117            open(r, *args, closefd=False, **kwargs)
4118
4119    def test_warn_on_dealloc_fd(self):
4120        self._check_warn_on_dealloc_fd("rb", buffering=0)
4121        self._check_warn_on_dealloc_fd("rb")
4122        self._check_warn_on_dealloc_fd("r", encoding="utf-8")
4123
4124
4125    def test_pickling(self):
4126        # Pickling file objects is forbidden
4127        for kwargs in [
4128                {"mode": "w"},
4129                {"mode": "wb"},
4130                {"mode": "wb", "buffering": 0},
4131                {"mode": "r"},
4132                {"mode": "rb"},
4133                {"mode": "rb", "buffering": 0},
4134                {"mode": "w+"},
4135                {"mode": "w+b"},
4136                {"mode": "w+b", "buffering": 0},
4137            ]:
4138            if "b" not in kwargs["mode"]:
4139                kwargs["encoding"] = "utf-8"
4140            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4141                with self.open(os_helper.TESTFN, **kwargs) as f:
4142                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
4143
4144    def test_nonblock_pipe_write_bigbuf(self):
4145        self._test_nonblock_pipe_write(16*1024)
4146
4147    def test_nonblock_pipe_write_smallbuf(self):
4148        self._test_nonblock_pipe_write(1024)
4149
4150    @unittest.skipUnless(hasattr(os, 'set_blocking'),
4151                         'os.set_blocking() required for this test')
4152    def _test_nonblock_pipe_write(self, bufsize):
4153        sent = []
4154        received = []
4155        r, w = os.pipe()
4156        os.set_blocking(r, False)
4157        os.set_blocking(w, False)
4158
4159        # To exercise all code paths in the C implementation we need
4160        # to play with buffer sizes.  For instance, if we choose a
4161        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4162        # then we will never get a partial write of the buffer.
4163        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4164        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4165
4166        with rf, wf:
4167            for N in 9999, 73, 7574:
4168                try:
4169                    i = 0
4170                    while True:
4171                        msg = bytes([i % 26 + 97]) * N
4172                        sent.append(msg)
4173                        wf.write(msg)
4174                        i += 1
4175
4176                except self.BlockingIOError as e:
4177                    self.assertEqual(e.args[0], errno.EAGAIN)
4178                    self.assertEqual(e.args[2], e.characters_written)
4179                    sent[-1] = sent[-1][:e.characters_written]
4180                    received.append(rf.read())
4181                    msg = b'BLOCKED'
4182                    wf.write(msg)
4183                    sent.append(msg)
4184
4185            while True:
4186                try:
4187                    wf.flush()
4188                    break
4189                except self.BlockingIOError as e:
4190                    self.assertEqual(e.args[0], errno.EAGAIN)
4191                    self.assertEqual(e.args[2], e.characters_written)
4192                    self.assertEqual(e.characters_written, 0)
4193                    received.append(rf.read())
4194
4195            received += iter(rf.read, None)
4196
4197        sent, received = b''.join(sent), b''.join(received)
4198        self.assertEqual(sent, received)
4199        self.assertTrue(wf.closed)
4200        self.assertTrue(rf.closed)
4201
4202    def test_create_fail(self):
4203        # 'x' mode fails if file is existing
4204        with self.open(os_helper.TESTFN, 'w', encoding="utf-8"):
4205            pass
4206        self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8")
4207
4208    def test_create_writes(self):
4209        # 'x' mode opens for writing
4210        with self.open(os_helper.TESTFN, 'xb') as f:
4211            f.write(b"spam")
4212        with self.open(os_helper.TESTFN, 'rb') as f:
4213            self.assertEqual(b"spam", f.read())
4214
4215    def test_open_allargs(self):
4216        # there used to be a buffer overflow in the parser for rawmode
4217        self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8")
4218
4219    def test_check_encoding_errors(self):
4220        # bpo-37388: open() and TextIOWrapper must check encoding and errors
4221        # arguments in dev mode
4222        mod = self.io.__name__
4223        filename = __file__
4224        invalid = 'Boom, Shaka Laka, Boom!'
4225        code = textwrap.dedent(f'''
4226            import sys
4227            from {mod} import open, TextIOWrapper
4228
4229            try:
4230                open({filename!r}, encoding={invalid!r})
4231            except LookupError:
4232                pass
4233            else:
4234                sys.exit(21)
4235
4236            try:
4237                open({filename!r}, errors={invalid!r})
4238            except LookupError:
4239                pass
4240            else:
4241                sys.exit(22)
4242
4243            fp = open({filename!r}, "rb")
4244            with fp:
4245                try:
4246                    TextIOWrapper(fp, encoding={invalid!r})
4247                except LookupError:
4248                    pass
4249                else:
4250                    sys.exit(23)
4251
4252                try:
4253                    TextIOWrapper(fp, errors={invalid!r})
4254                except LookupError:
4255                    pass
4256                else:
4257                    sys.exit(24)
4258
4259            sys.exit(10)
4260        ''')
4261        proc = assert_python_failure('-X', 'dev', '-c', code)
4262        self.assertEqual(proc.rc, 10, proc)
4263
4264    def test_check_encoding_warning(self):
4265        # PEP 597: Raise warning when encoding is not specified
4266        # and sys.flags.warn_default_encoding is set.
4267        mod = self.io.__name__
4268        filename = __file__
4269        code = textwrap.dedent(f'''\
4270            import sys
4271            from {mod} import open, TextIOWrapper
4272            import pathlib
4273
4274            with open({filename!r}) as f:           # line 5
4275                pass
4276
4277            pathlib.Path({filename!r}).read_text()  # line 8
4278        ''')
4279        proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
4280        warnings = proc.err.splitlines()
4281        self.assertEqual(len(warnings), 2)
4282        self.assertTrue(
4283            warnings[0].startswith(b"<string>:5: EncodingWarning: "))
4284        self.assertTrue(
4285            warnings[1].startswith(b"<string>:8: EncodingWarning: "))
4286
4287    @support.cpython_only
4288    # Depending if OpenWrapper was already created or not, the warning is
4289    # emitted or not. For example, the attribute is already created when this
4290    # test is run multiple times.
4291    @warnings_helper.ignore_warnings(category=DeprecationWarning)
4292    def test_openwrapper(self):
4293        self.assertIs(self.io.OpenWrapper, self.io.open)
4294
4295
4296class CMiscIOTest(MiscIOTest):
4297    io = io
4298
4299    def test_readinto_buffer_overflow(self):
4300        # Issue #18025
4301        class BadReader(self.io.BufferedIOBase):
4302            def read(self, n=-1):
4303                return b'x' * 10**6
4304        bufio = BadReader()
4305        b = bytearray(2)
4306        self.assertRaises(ValueError, bufio.readinto, b)
4307
4308    def check_daemon_threads_shutdown_deadlock(self, stream_name):
4309        # Issue #23309: deadlocks at shutdown should be avoided when a
4310        # daemon thread and the main thread both write to a file.
4311        code = """if 1:
4312            import sys
4313            import time
4314            import threading
4315            from test.support import SuppressCrashReport
4316
4317            file = sys.{stream_name}
4318
4319            def run():
4320                while True:
4321                    file.write('.')
4322                    file.flush()
4323
4324            crash = SuppressCrashReport()
4325            crash.__enter__()
4326            # don't call __exit__(): the crash occurs at Python shutdown
4327
4328            thread = threading.Thread(target=run)
4329            thread.daemon = True
4330            thread.start()
4331
4332            time.sleep(0.5)
4333            file.write('!')
4334            file.flush()
4335            """.format_map(locals())
4336        res, _ = run_python_until_end("-c", code)
4337        err = res.err.decode()
4338        if res.rc != 0:
4339            # Failure: should be a fatal error
4340            pattern = (r"Fatal Python error: _enter_buffered_busy: "
4341                       r"could not acquire lock "
4342                       r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4343                       r"at interpreter shutdown, possibly due to "
4344                       r"daemon threads".format_map(locals()))
4345            self.assertRegex(err, pattern)
4346        else:
4347            self.assertFalse(err.strip('.!'))
4348
4349    def test_daemon_threads_shutdown_stdout_deadlock(self):
4350        self.check_daemon_threads_shutdown_deadlock('stdout')
4351
4352    def test_daemon_threads_shutdown_stderr_deadlock(self):
4353        self.check_daemon_threads_shutdown_deadlock('stderr')
4354
4355
4356class PyMiscIOTest(MiscIOTest):
4357    io = pyio
4358
4359
4360@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4361class SignalsTest(unittest.TestCase):
4362
4363    def setUp(self):
4364        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4365
4366    def tearDown(self):
4367        signal.signal(signal.SIGALRM, self.oldalrm)
4368
4369    def alarm_interrupt(self, sig, frame):
4370        1/0
4371
4372    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4373        """Check that a partial write, when it gets interrupted, properly
4374        invokes the signal handler, and bubbles up the exception raised
4375        in the latter."""
4376
4377        # XXX This test has three flaws that appear when objects are
4378        # XXX not reference counted.
4379
4380        # - if wio.write() happens to trigger a garbage collection,
4381        #   the signal exception may be raised when some __del__
4382        #   method is running; it will not reach the assertRaises()
4383        #   call.
4384
4385        # - more subtle, if the wio object is not destroyed at once
4386        #   and survives this function, the next opened file is likely
4387        #   to have the same fileno (since the file descriptor was
4388        #   actively closed).  When wio.__del__ is finally called, it
4389        #   will close the other's test file...  To trigger this with
4390        #   CPython, try adding "global wio" in this function.
4391
4392        # - This happens only for streams created by the _pyio module,
4393        #   because a wio.close() that fails still consider that the
4394        #   file needs to be closed again.  You can try adding an
4395        #   "assert wio.closed" at the end of the function.
4396
4397        # Fortunately, a little gc.collect() seems to be enough to
4398        # work around all these issues.
4399        support.gc_collect()  # For PyPy or other GCs.
4400
4401        read_results = []
4402        def _read():
4403            s = os.read(r, 1)
4404            read_results.append(s)
4405
4406        t = threading.Thread(target=_read)
4407        t.daemon = True
4408        r, w = os.pipe()
4409        fdopen_kwargs["closefd"] = False
4410        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
4411        try:
4412            wio = self.io.open(w, **fdopen_kwargs)
4413            if hasattr(signal, 'pthread_sigmask'):
4414                # create the thread with SIGALRM signal blocked
4415                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4416                t.start()
4417                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4418            else:
4419                t.start()
4420
4421            # Fill the pipe enough that the write will be blocking.
4422            # It will be interrupted by the timer armed above.  Since the
4423            # other thread has read one byte, the low-level write will
4424            # return with a successful (partial) result rather than an EINTR.
4425            # The buffered IO layer must check for pending signal
4426            # handlers, which in this case will invoke alarm_interrupt().
4427            signal.alarm(1)
4428            try:
4429                self.assertRaises(ZeroDivisionError, wio.write, large_data)
4430            finally:
4431                signal.alarm(0)
4432                t.join()
4433            # We got one byte, get another one and check that it isn't a
4434            # repeat of the first one.
4435            read_results.append(os.read(r, 1))
4436            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4437        finally:
4438            os.close(w)
4439            os.close(r)
4440            # This is deliberate. If we didn't close the file descriptor
4441            # before closing wio, wio would try to flush its internal
4442            # buffer, and block again.
4443            try:
4444                wio.close()
4445            except OSError as e:
4446                if e.errno != errno.EBADF:
4447                    raise
4448
4449    def test_interrupted_write_unbuffered(self):
4450        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4451
4452    def test_interrupted_write_buffered(self):
4453        self.check_interrupted_write(b"xy", b"xy", mode="wb")
4454
4455    def test_interrupted_write_text(self):
4456        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4457
4458    @support.no_tracing
4459    def check_reentrant_write(self, data, **fdopen_kwargs):
4460        def on_alarm(*args):
4461            # Will be called reentrantly from the same thread
4462            wio.write(data)
4463            1/0
4464        signal.signal(signal.SIGALRM, on_alarm)
4465        r, w = os.pipe()
4466        wio = self.io.open(w, **fdopen_kwargs)
4467        try:
4468            signal.alarm(1)
4469            # Either the reentrant call to wio.write() fails with RuntimeError,
4470            # or the signal handler raises ZeroDivisionError.
4471            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4472                while 1:
4473                    for i in range(100):
4474                        wio.write(data)
4475                        wio.flush()
4476                    # Make sure the buffer doesn't fill up and block further writes
4477                    os.read(r, len(data) * 100)
4478            exc = cm.exception
4479            if isinstance(exc, RuntimeError):
4480                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4481        finally:
4482            signal.alarm(0)
4483            wio.close()
4484            os.close(r)
4485
4486    def test_reentrant_write_buffered(self):
4487        self.check_reentrant_write(b"xy", mode="wb")
4488
4489    def test_reentrant_write_text(self):
4490        self.check_reentrant_write("xy", mode="w", encoding="ascii")
4491
4492    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4493        """Check that a buffered read, when it gets interrupted (either
4494        returning a partial result or EINTR), properly invokes the signal
4495        handler and retries if the latter returned successfully."""
4496        r, w = os.pipe()
4497        fdopen_kwargs["closefd"] = False
4498        def alarm_handler(sig, frame):
4499            os.write(w, b"bar")
4500        signal.signal(signal.SIGALRM, alarm_handler)
4501        try:
4502            rio = self.io.open(r, **fdopen_kwargs)
4503            os.write(w, b"foo")
4504            signal.alarm(1)
4505            # Expected behaviour:
4506            # - first raw read() returns partial b"foo"
4507            # - second raw read() returns EINTR
4508            # - third raw read() returns b"bar"
4509            self.assertEqual(decode(rio.read(6)), "foobar")
4510        finally:
4511            signal.alarm(0)
4512            rio.close()
4513            os.close(w)
4514            os.close(r)
4515
4516    def test_interrupted_read_retry_buffered(self):
4517        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4518                                          mode="rb")
4519
4520    def test_interrupted_read_retry_text(self):
4521        self.check_interrupted_read_retry(lambda x: x,
4522                                          mode="r", encoding="latin1")
4523
4524    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4525        """Check that a buffered write, when it gets interrupted (either
4526        returning a partial result or EINTR), properly invokes the signal
4527        handler and retries if the latter returned successfully."""
4528        select = import_helper.import_module("select")
4529
4530        # A quantity that exceeds the buffer size of an anonymous pipe's
4531        # write end.
4532        N = support.PIPE_MAX_SIZE
4533        r, w = os.pipe()
4534        fdopen_kwargs["closefd"] = False
4535
4536        # We need a separate thread to read from the pipe and allow the
4537        # write() to finish.  This thread is started after the SIGALRM is
4538        # received (forcing a first EINTR in write()).
4539        read_results = []
4540        write_finished = False
4541        error = None
4542        def _read():
4543            try:
4544                while not write_finished:
4545                    while r in select.select([r], [], [], 1.0)[0]:
4546                        s = os.read(r, 1024)
4547                        read_results.append(s)
4548            except BaseException as exc:
4549                nonlocal error
4550                error = exc
4551        t = threading.Thread(target=_read)
4552        t.daemon = True
4553        def alarm1(sig, frame):
4554            signal.signal(signal.SIGALRM, alarm2)
4555            signal.alarm(1)
4556        def alarm2(sig, frame):
4557            t.start()
4558
4559        large_data = item * N
4560        signal.signal(signal.SIGALRM, alarm1)
4561        try:
4562            wio = self.io.open(w, **fdopen_kwargs)
4563            signal.alarm(1)
4564            # Expected behaviour:
4565            # - first raw write() is partial (because of the limited pipe buffer
4566            #   and the first alarm)
4567            # - second raw write() returns EINTR (because of the second alarm)
4568            # - subsequent write()s are successful (either partial or complete)
4569            written = wio.write(large_data)
4570            self.assertEqual(N, written)
4571
4572            wio.flush()
4573            write_finished = True
4574            t.join()
4575
4576            self.assertIsNone(error)
4577            self.assertEqual(N, sum(len(x) for x in read_results))
4578        finally:
4579            signal.alarm(0)
4580            write_finished = True
4581            os.close(w)
4582            os.close(r)
4583            # This is deliberate. If we didn't close the file descriptor
4584            # before closing wio, wio would try to flush its internal
4585            # buffer, and could block (in case of failure).
4586            try:
4587                wio.close()
4588            except OSError as e:
4589                if e.errno != errno.EBADF:
4590                    raise
4591
4592    def test_interrupted_write_retry_buffered(self):
4593        self.check_interrupted_write_retry(b"x", mode="wb")
4594
4595    def test_interrupted_write_retry_text(self):
4596        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4597
4598
4599class CSignalsTest(SignalsTest):
4600    io = io
4601
4602class PySignalsTest(SignalsTest):
4603    io = pyio
4604
4605    # Handling reentrancy issues would slow down _pyio even more, so the
4606    # tests are disabled.
4607    test_reentrant_write_buffered = None
4608    test_reentrant_write_text = None
4609
4610
4611def load_tests(loader, tests, pattern):
4612    tests = (CIOTest, PyIOTest, APIMismatchTest,
4613             CBufferedReaderTest, PyBufferedReaderTest,
4614             CBufferedWriterTest, PyBufferedWriterTest,
4615             CBufferedRWPairTest, PyBufferedRWPairTest,
4616             CBufferedRandomTest, PyBufferedRandomTest,
4617             StatefulIncrementalDecoderTest,
4618             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4619             CTextIOWrapperTest, PyTextIOWrapperTest,
4620             CMiscIOTest, PyMiscIOTest,
4621             CSignalsTest, PySignalsTest,
4622             )
4623
4624    # Put the namespaces of the IO module we are testing and some useful mock
4625    # classes in the __dict__ of each test.
4626    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
4627             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4628             SlowFlushRawIO)
4629    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4630    c_io_ns = {name : getattr(io, name) for name in all_members}
4631    py_io_ns = {name : getattr(pyio, name) for name in all_members}
4632    globs = globals()
4633    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4634    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4635    for test in tests:
4636        if test.__name__.startswith("C"):
4637            for name, obj in c_io_ns.items():
4638                setattr(test, name, obj)
4639        elif test.__name__.startswith("Py"):
4640            for name, obj in py_io_ns.items():
4641                setattr(test, name, obj)
4642
4643    suite = loader.suiteClass()
4644    for test in tests:
4645        suite.addTest(loader.loadTestsFromTestCase(test))
4646    return suite
4647
4648if __name__ == "__main__":
4649    unittest.main()
4650