1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22from __future__ import print_function
23from __future__ import unicode_literals
24
25import os
26import sys
27import time
28import array
29import random
30import unittest
31import weakref
32import warnings
33import abc
34import signal
35import errno
36from itertools import cycle, count
37from collections import deque
38from UserList import UserList
39from test import test_support as support
40import contextlib
41
42import codecs
43import io  # C implementation of io
44import _pyio as pyio # Python implementation of io
45try:
46    import threading
47except ImportError:
48    threading = None
49try:
50    import fcntl
51except ImportError:
52    fcntl = None
53
54__metaclass__ = type
55bytes = support.py3k_bytes
56
57def byteslike(*pos, **kw):
58    return memoryview(bytearray(*pos, **kw))
59
60def _default_chunk_size():
61    """Get the default TextIOWrapper chunk size"""
62    with io.open(__file__, "r", encoding="latin1") as f:
63        return f._CHUNK_SIZE
64
65
66class MockRawIOWithoutRead:
67    """A RawIO implementation without read(), so as to exercise the default
68    RawIO.read() which calls readinto()."""
69
70    def __init__(self, read_stack=()):
71        self._read_stack = list(read_stack)
72        self._write_stack = []
73        self._reads = 0
74        self._extraneous_reads = 0
75
76    def write(self, b):
77        self._write_stack.append(bytes(b))
78        return len(b)
79
80    def writable(self):
81        return True
82
83    def fileno(self):
84        return 42
85
86    def readable(self):
87        return True
88
89    def seekable(self):
90        return True
91
92    def seek(self, pos, whence):
93        return 0   # wrong but we gotta return something
94
95    def tell(self):
96        return 0   # same comment as above
97
98    def readinto(self, buf):
99        self._reads += 1
100        max_len = len(buf)
101        try:
102            data = self._read_stack[0]
103        except IndexError:
104            self._extraneous_reads += 1
105            return 0
106        if data is None:
107            del self._read_stack[0]
108            return None
109        n = len(data)
110        if len(data) <= max_len:
111            del self._read_stack[0]
112            buf[:n] = data
113            return n
114        else:
115            buf[:] = data[:max_len]
116            self._read_stack[0] = data[max_len:]
117            return max_len
118
119    def truncate(self, pos=None):
120        return pos
121
122class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
123    pass
124
125class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
126    pass
127
128
129class MockRawIO(MockRawIOWithoutRead):
130
131    def read(self, n=None):
132        self._reads += 1
133        try:
134            return self._read_stack.pop(0)
135        except:
136            self._extraneous_reads += 1
137            return b""
138
139class CMockRawIO(MockRawIO, io.RawIOBase):
140    pass
141
142class PyMockRawIO(MockRawIO, pyio.RawIOBase):
143    pass
144
145
146class MisbehavedRawIO(MockRawIO):
147    def write(self, b):
148        return MockRawIO.write(self, b) * 2
149
150    def read(self, n=None):
151        return MockRawIO.read(self, n) * 2
152
153    def seek(self, pos, whence):
154        return -123
155
156    def tell(self):
157        return -456
158
159    def readinto(self, buf):
160        MockRawIO.readinto(self, buf)
161        return len(buf) * 5
162
163class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
164    pass
165
166class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
167    pass
168
169
170class CloseFailureIO(MockRawIO):
171    closed = 0
172
173    def close(self):
174        if not self.closed:
175            self.closed = 1
176            raise IOError
177
178class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
179    pass
180
181class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
182    pass
183
184
185class MockFileIO:
186
187    def __init__(self, data):
188        self.read_history = []
189        super(MockFileIO, self).__init__(data)
190
191    def read(self, n=None):
192        res = super(MockFileIO, self).read(n)
193        self.read_history.append(None if res is None else len(res))
194        return res
195
196    def readinto(self, b):
197        res = super(MockFileIO, self).readinto(b)
198        self.read_history.append(res)
199        return res
200
201class CMockFileIO(MockFileIO, io.BytesIO):
202    pass
203
204class PyMockFileIO(MockFileIO, pyio.BytesIO):
205    pass
206
207
208class MockNonBlockWriterIO:
209
210    def __init__(self):
211        self._write_stack = []
212        self._blocker_char = None
213
214    def pop_written(self):
215        s = b"".join(self._write_stack)
216        self._write_stack[:] = []
217        return s
218
219    def block_on(self, char):
220        """Block when a given char is encountered."""
221        self._blocker_char = char
222
223    def readable(self):
224        return True
225
226    def seekable(self):
227        return True
228
229    def writable(self):
230        return True
231
232    def write(self, b):
233        b = bytes(b)
234        n = -1
235        if self._blocker_char:
236            try:
237                n = b.index(self._blocker_char)
238            except ValueError:
239                pass
240            else:
241                if n > 0:
242                    # write data up to the first blocker
243                    self._write_stack.append(b[:n])
244                    return n
245                else:
246                    # cancel blocker and indicate would block
247                    self._blocker_char = None
248                    return None
249        self._write_stack.append(b)
250        return len(b)
251
252class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
253    BlockingIOError = io.BlockingIOError
254
255class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
256    BlockingIOError = pyio.BlockingIOError
257
258
259class IOTest(unittest.TestCase):
260
261    def setUp(self):
262        support.unlink(support.TESTFN)
263
264    def tearDown(self):
265        support.unlink(support.TESTFN)
266
267    def write_ops(self, f):
268        self.assertEqual(f.write(b"blah."), 5)
269        f.truncate(0)
270        self.assertEqual(f.tell(), 5)
271        f.seek(0)
272
273        self.assertEqual(f.write(b"blah."), 5)
274        self.assertEqual(f.seek(0), 0)
275        self.assertEqual(f.write(b"Hello."), 6)
276        self.assertEqual(f.tell(), 6)
277        self.assertEqual(f.seek(-1, 1), 5)
278        self.assertEqual(f.tell(), 5)
279        buffer = bytearray(b" world\n\n\n")
280        self.assertEqual(f.write(buffer), 9)
281        buffer[:] = b"*" * 9  # Overwrite our copy of the data
282        self.assertEqual(f.seek(0), 0)
283        self.assertEqual(f.write(b"h"), 1)
284        self.assertEqual(f.seek(-1, 2), 13)
285        self.assertEqual(f.tell(), 13)
286
287        self.assertEqual(f.truncate(12), 12)
288        self.assertEqual(f.tell(), 13)
289        self.assertRaises(TypeError, f.seek, 0.0)
290
291    def read_ops(self, f, buffered=False):
292        data = f.read(5)
293        self.assertEqual(data, b"hello")
294        data = byteslike(data)
295        self.assertEqual(f.readinto(data), 5)
296        self.assertEqual(data.tobytes(), b" worl")
297        data = bytearray(5)
298        self.assertEqual(f.readinto(data), 2)
299        self.assertEqual(len(data), 5)
300        self.assertEqual(data[:2], b"d\n")
301        self.assertEqual(f.seek(0), 0)
302        self.assertEqual(f.read(20), b"hello world\n")
303        self.assertEqual(f.read(1), b"")
304        self.assertEqual(f.readinto(byteslike(b"x")), 0)
305        self.assertEqual(f.seek(-6, 2), 6)
306        self.assertEqual(f.read(5), b"world")
307        self.assertEqual(f.read(0), b"")
308        self.assertEqual(f.readinto(byteslike()), 0)
309        self.assertEqual(f.seek(-6, 1), 5)
310        self.assertEqual(f.read(5), b" worl")
311        self.assertEqual(f.tell(), 10)
312        self.assertRaises(TypeError, f.seek, 0.0)
313        if buffered:
314            f.seek(0)
315            self.assertEqual(f.read(), b"hello world\n")
316            f.seek(6)
317            self.assertEqual(f.read(), b"world\n")
318            self.assertEqual(f.read(), b"")
319
320    LARGE = 2**31
321
322    def large_file_ops(self, f):
323        assert f.readable()
324        assert f.writable()
325        self.assertEqual(f.seek(self.LARGE), self.LARGE)
326        self.assertEqual(f.tell(), self.LARGE)
327        self.assertEqual(f.write(b"xxx"), 3)
328        self.assertEqual(f.tell(), self.LARGE + 3)
329        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
330        self.assertEqual(f.truncate(), self.LARGE + 2)
331        self.assertEqual(f.tell(), self.LARGE + 2)
332        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
333        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
334        self.assertEqual(f.tell(), self.LARGE + 2)
335        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
336        self.assertEqual(f.seek(-1, 2), self.LARGE)
337        self.assertEqual(f.read(2), b"x")
338
339    def test_invalid_operations(self):
340        # Try writing on a file opened in read mode and vice-versa.
341        for mode in ("w", "wb"):
342            with self.open(support.TESTFN, mode) as fp:
343                self.assertRaises(IOError, fp.read)
344                self.assertRaises(IOError, fp.readline)
345        with self.open(support.TESTFN, "rb") as fp:
346            self.assertRaises(IOError, fp.write, b"blah")
347            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
348        with self.open(support.TESTFN, "r") as fp:
349            self.assertRaises(IOError, fp.write, "blah")
350            self.assertRaises(IOError, fp.writelines, ["blah\n"])
351
352    def test_open_handles_NUL_chars(self):
353        fn_with_NUL = 'foo\0bar'
354        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
355
356        bytes_fn = fn_with_NUL.encode('ascii')
357        with warnings.catch_warnings():
358            warnings.simplefilter("ignore", DeprecationWarning)
359            self.assertRaises(TypeError, self.open, bytes_fn, 'w')
360
361    def test_raw_file_io(self):
362        with self.open(support.TESTFN, "wb", buffering=0) as f:
363            self.assertEqual(f.readable(), False)
364            self.assertEqual(f.writable(), True)
365            self.assertEqual(f.seekable(), True)
366            self.write_ops(f)
367        with self.open(support.TESTFN, "rb", buffering=0) as f:
368            self.assertEqual(f.readable(), True)
369            self.assertEqual(f.writable(), False)
370            self.assertEqual(f.seekable(), True)
371            self.read_ops(f)
372
373    def test_buffered_file_io(self):
374        with self.open(support.TESTFN, "wb") as f:
375            self.assertEqual(f.readable(), False)
376            self.assertEqual(f.writable(), True)
377            self.assertEqual(f.seekable(), True)
378            self.write_ops(f)
379        with self.open(support.TESTFN, "rb") as f:
380            self.assertEqual(f.readable(), True)
381            self.assertEqual(f.writable(), False)
382            self.assertEqual(f.seekable(), True)
383            self.read_ops(f, True)
384
385    def test_readline(self):
386        with self.open(support.TESTFN, "wb") as f:
387            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
388        with self.open(support.TESTFN, "rb") as f:
389            self.assertEqual(f.readline(), b"abc\n")
390            self.assertEqual(f.readline(10), b"def\n")
391            self.assertEqual(f.readline(2), b"xy")
392            self.assertEqual(f.readline(4), b"zzy\n")
393            self.assertEqual(f.readline(), b"foo\x00bar\n")
394            self.assertEqual(f.readline(None), b"another line")
395            self.assertRaises(TypeError, f.readline, 5.3)
396        with self.open(support.TESTFN, "r") as f:
397            self.assertRaises(TypeError, f.readline, 5.3)
398
399    def test_readline_nonsizeable(self):
400        # Issue #30061
401        # Crash when readline() returns an object without __len__
402        class R(self.IOBase):
403            def readline(self):
404                return None
405        self.assertRaises((TypeError, StopIteration), next, R())
406
407    def test_next_nonsizeable(self):
408        # Issue #30061
409        # Crash when next() returns an object without __len__
410        class R(self.IOBase):
411            def next(self):
412                return None
413        self.assertRaises(TypeError, R().readlines, 1)
414
415    def test_raw_bytes_io(self):
416        f = self.BytesIO()
417        self.write_ops(f)
418        data = f.getvalue()
419        self.assertEqual(data, b"hello world\n")
420        f = self.BytesIO(data)
421        self.read_ops(f, True)
422
423    def test_large_file_ops(self):
424        # On Windows and Mac OSX this test comsumes large resources; It takes
425        # a long time to build the >2GB file and takes >2GB of disk space
426        # therefore the resource must be enabled to run this test.
427        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
428            support.requires(
429                'largefile',
430                'test requires %s bytes and a long time to run' % self.LARGE)
431        with self.open(support.TESTFN, "w+b", 0) as f:
432            self.large_file_ops(f)
433        with self.open(support.TESTFN, "w+b") as f:
434            self.large_file_ops(f)
435
436    def test_with_open(self):
437        for bufsize in (0, 1, 100):
438            f = None
439            with self.open(support.TESTFN, "wb", bufsize) as f:
440                f.write(b"xxx")
441            self.assertEqual(f.closed, True)
442            f = None
443            try:
444                with self.open(support.TESTFN, "wb", bufsize) as f:
445                    1 // 0
446            except ZeroDivisionError:
447                self.assertEqual(f.closed, True)
448            else:
449                self.fail("1 // 0 didn't raise an exception")
450
451    # issue 5008
452    def test_append_mode_tell(self):
453        with self.open(support.TESTFN, "wb") as f:
454            f.write(b"xxx")
455        with self.open(support.TESTFN, "ab", buffering=0) as f:
456            self.assertEqual(f.tell(), 3)
457        with self.open(support.TESTFN, "ab") as f:
458            self.assertEqual(f.tell(), 3)
459        with self.open(support.TESTFN, "a") as f:
460            self.assertGreater(f.tell(), 0)
461
462    def test_destructor(self):
463        record = []
464        class MyFileIO(self.FileIO):
465            def __del__(self):
466                record.append(1)
467                try:
468                    f = super(MyFileIO, self).__del__
469                except AttributeError:
470                    pass
471                else:
472                    f()
473            def close(self):
474                record.append(2)
475                super(MyFileIO, self).close()
476            def flush(self):
477                record.append(3)
478                super(MyFileIO, self).flush()
479        f = MyFileIO(support.TESTFN, "wb")
480        f.write(b"xxx")
481        del f
482        support.gc_collect()
483        self.assertEqual(record, [1, 2, 3])
484        with self.open(support.TESTFN, "rb") as f:
485            self.assertEqual(f.read(), b"xxx")
486
487    def _check_base_destructor(self, base):
488        record = []
489        class MyIO(base):
490            def __init__(self):
491                # This exercises the availability of attributes on object
492                # destruction.
493                # (in the C version, close() is called by the tp_dealloc
494                # function, not by __del__)
495                self.on_del = 1
496                self.on_close = 2
497                self.on_flush = 3
498            def __del__(self):
499                record.append(self.on_del)
500                try:
501                    f = super(MyIO, self).__del__
502                except AttributeError:
503                    pass
504                else:
505                    f()
506            def close(self):
507                record.append(self.on_close)
508                super(MyIO, self).close()
509            def flush(self):
510                record.append(self.on_flush)
511                super(MyIO, self).flush()
512        f = MyIO()
513        del f
514        support.gc_collect()
515        self.assertEqual(record, [1, 2, 3])
516
517    def test_IOBase_destructor(self):
518        self._check_base_destructor(self.IOBase)
519
520    def test_RawIOBase_destructor(self):
521        self._check_base_destructor(self.RawIOBase)
522
523    def test_BufferedIOBase_destructor(self):
524        self._check_base_destructor(self.BufferedIOBase)
525
526    def test_TextIOBase_destructor(self):
527        self._check_base_destructor(self.TextIOBase)
528
529    def test_close_flushes(self):
530        with self.open(support.TESTFN, "wb") as f:
531            f.write(b"xxx")
532        with self.open(support.TESTFN, "rb") as f:
533            self.assertEqual(f.read(), b"xxx")
534
535    def test_array_writes(self):
536        a = array.array(b'i', range(10))
537        n = len(a.tostring())
538        with self.open(support.TESTFN, "wb", 0) as f:
539            self.assertEqual(f.write(a), n)
540        with self.open(support.TESTFN, "wb") as f:
541            self.assertEqual(f.write(a), n)
542
543    def test_closefd(self):
544        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
545                          closefd=False)
546
547    def test_read_closed(self):
548        with self.open(support.TESTFN, "w") as f:
549            f.write("egg\n")
550        with self.open(support.TESTFN, "r") as f:
551            file = self.open(f.fileno(), "r", closefd=False)
552            self.assertEqual(file.read(), "egg\n")
553            file.seek(0)
554            file.close()
555            self.assertRaises(ValueError, file.read)
556
557    def test_no_closefd_with_filename(self):
558        # can't use closefd in combination with a file name
559        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
560
561    def test_closefd_attr(self):
562        with self.open(support.TESTFN, "wb") as f:
563            f.write(b"egg\n")
564        with self.open(support.TESTFN, "r") as f:
565            self.assertEqual(f.buffer.raw.closefd, True)
566            file = self.open(f.fileno(), "r", closefd=False)
567            self.assertEqual(file.buffer.raw.closefd, False)
568
569    def test_garbage_collection(self):
570        # FileIO objects are collected, and collecting them flushes
571        # all data to disk.
572        f = self.FileIO(support.TESTFN, "wb")
573        f.write(b"abcxxx")
574        f.f = f
575        wr = weakref.ref(f)
576        del f
577        support.gc_collect()
578        self.assertIsNone(wr(), wr)
579        with self.open(support.TESTFN, "rb") as f:
580            self.assertEqual(f.read(), b"abcxxx")
581
582    def test_unbounded_file(self):
583        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
584        zero = "/dev/zero"
585        if not os.path.exists(zero):
586            self.skipTest("{0} does not exist".format(zero))
587        if sys.maxsize > 0x7FFFFFFF:
588            self.skipTest("test can only run in a 32-bit address space")
589        if support.real_max_memuse < support._2G:
590            self.skipTest("test requires at least 2GB of memory")
591        with self.open(zero, "rb", buffering=0) as f:
592            self.assertRaises(OverflowError, f.read)
593        with self.open(zero, "rb") as f:
594            self.assertRaises(OverflowError, f.read)
595        with self.open(zero, "r") as f:
596            self.assertRaises(OverflowError, f.read)
597
598    def check_flush_error_on_close(self, *args, **kwargs):
599        # Test that the file is closed despite failed flush
600        # and that flush() is called before file closed.
601        f = self.open(*args, **kwargs)
602        closed = []
603        def bad_flush():
604            closed[:] = [f.closed]
605            raise IOError()
606        f.flush = bad_flush
607        self.assertRaises(IOError, f.close) # exception not swallowed
608        self.assertTrue(f.closed)
609        self.assertTrue(closed)      # flush() called
610        self.assertFalse(closed[0])  # flush() called before file closed
611        f.flush = lambda: None  # break reference loop
612
613    def test_flush_error_on_close(self):
614        # raw file
615        # Issue #5700: io.FileIO calls flush() after file closed
616        self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
617        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
618        self.check_flush_error_on_close(fd, 'wb', buffering=0)
619        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
620        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
621        os.close(fd)
622        # buffered io
623        self.check_flush_error_on_close(support.TESTFN, 'wb')
624        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
625        self.check_flush_error_on_close(fd, 'wb')
626        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
627        self.check_flush_error_on_close(fd, 'wb', closefd=False)
628        os.close(fd)
629        # text io
630        self.check_flush_error_on_close(support.TESTFN, 'w')
631        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
632        self.check_flush_error_on_close(fd, 'w')
633        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
634        self.check_flush_error_on_close(fd, 'w', closefd=False)
635        os.close(fd)
636
637    def test_multi_close(self):
638        f = self.open(support.TESTFN, "wb", buffering=0)
639        f.close()
640        f.close()
641        f.close()
642        self.assertRaises(ValueError, f.flush)
643
644    def test_RawIOBase_read(self):
645        # Exercise the default RawIOBase.read() implementation (which calls
646        # readinto() internally).
647        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
648        self.assertEqual(rawio.read(2), b"ab")
649        self.assertEqual(rawio.read(2), b"c")
650        self.assertEqual(rawio.read(2), b"d")
651        self.assertEqual(rawio.read(2), None)
652        self.assertEqual(rawio.read(2), b"ef")
653        self.assertEqual(rawio.read(2), b"g")
654        self.assertEqual(rawio.read(2), None)
655        self.assertEqual(rawio.read(2), b"")
656
657    def test_fileio_closefd(self):
658        # Issue #4841
659        with self.open(__file__, 'rb') as f1, \
660             self.open(__file__, 'rb') as f2:
661            fileio = self.FileIO(f1.fileno(), closefd=False)
662            # .__init__() must not close f1
663            fileio.__init__(f2.fileno(), closefd=False)
664            f1.readline()
665            # .close() must not close f2
666            fileio.close()
667            f2.readline()
668
669    def test_nonbuffered_textio(self):
670        with warnings.catch_warnings(record=True) as recorded:
671            with self.assertRaises(ValueError):
672                self.open(support.TESTFN, 'w', buffering=0)
673            support.gc_collect()
674        self.assertEqual(recorded, [])
675
676    def test_invalid_newline(self):
677        with warnings.catch_warnings(record=True) as recorded:
678            with self.assertRaises(ValueError):
679                self.open(support.TESTFN, 'w', newline='invalid')
680            support.gc_collect()
681        self.assertEqual(recorded, [])
682
683    def test_buffered_readinto_mixin(self):
684        # Test the implementation provided by BufferedIOBase
685        class Stream(self.BufferedIOBase):
686            def read(self, size):
687                return b"12345"
688        stream = Stream()
689        buffer = byteslike(5)
690        self.assertEqual(stream.readinto(buffer), 5)
691        self.assertEqual(buffer.tobytes(), b"12345")
692
693    def test_close_assert(self):
694        class R(self.IOBase):
695            def __setattr__(self, name, value):
696                pass
697            def flush(self):
698                raise OSError()
699        f = R()
700        # This would cause an assertion failure.
701        self.assertRaises(OSError, f.close)
702
703
704class CIOTest(IOTest):
705
706    def test_IOBase_finalize(self):
707        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
708        # class which inherits IOBase and an object of this class are caught
709        # in a reference cycle and close() is already in the method cache.
710        class MyIO(self.IOBase):
711            def close(self):
712                pass
713
714        # create an instance to populate the method cache
715        MyIO()
716        obj = MyIO()
717        obj.obj = obj
718        wr = weakref.ref(obj)
719        del MyIO
720        del obj
721        support.gc_collect()
722        self.assertIsNone(wr(), wr)
723
724class PyIOTest(IOTest):
725    test_array_writes = unittest.skip(
726        "len(array.array) returns number of elements rather than bytelength"
727    )(IOTest.test_array_writes)
728
729
730class CommonBufferedTests:
731    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
732
733    def test_detach(self):
734        raw = self.MockRawIO()
735        buf = self.tp(raw)
736        self.assertIs(buf.detach(), raw)
737        self.assertRaises(ValueError, buf.detach)
738
739        repr(buf)  # Should still work
740
741    def test_fileno(self):
742        rawio = self.MockRawIO()
743        bufio = self.tp(rawio)
744
745        self.assertEqual(42, bufio.fileno())
746
747    def test_invalid_args(self):
748        rawio = self.MockRawIO()
749        bufio = self.tp(rawio)
750        # Invalid whence
751        self.assertRaises(ValueError, bufio.seek, 0, -1)
752        self.assertRaises(ValueError, bufio.seek, 0, 3)
753
754    def test_override_destructor(self):
755        tp = self.tp
756        record = []
757        class MyBufferedIO(tp):
758            def __del__(self):
759                record.append(1)
760                try:
761                    f = super(MyBufferedIO, self).__del__
762                except AttributeError:
763                    pass
764                else:
765                    f()
766            def close(self):
767                record.append(2)
768                super(MyBufferedIO, self).close()
769            def flush(self):
770                record.append(3)
771                super(MyBufferedIO, self).flush()
772        rawio = self.MockRawIO()
773        bufio = MyBufferedIO(rawio)
774        writable = bufio.writable()
775        del bufio
776        support.gc_collect()
777        if writable:
778            self.assertEqual(record, [1, 2, 3])
779        else:
780            self.assertEqual(record, [1, 2])
781
782    def test_context_manager(self):
783        # Test usability as a context manager
784        rawio = self.MockRawIO()
785        bufio = self.tp(rawio)
786        def _with():
787            with bufio:
788                pass
789        _with()
790        # bufio should now be closed, and using it a second time should raise
791        # a ValueError.
792        self.assertRaises(ValueError, _with)
793
794    def test_error_through_destructor(self):
795        # Test that the exception state is not modified by a destructor,
796        # even if close() fails.
797        rawio = self.CloseFailureIO()
798        def f():
799            self.tp(rawio).xyzzy
800        with support.captured_output("stderr") as s:
801            self.assertRaises(AttributeError, f)
802        s = s.getvalue().strip()
803        if s:
804            # The destructor *may* have printed an unraisable error, check it
805            self.assertEqual(len(s.splitlines()), 1)
806            self.assertTrue(s.startswith("Exception IOError: "), s)
807            self.assertTrue(s.endswith(" ignored"), s)
808
809    def test_repr(self):
810        raw = self.MockRawIO()
811        b = self.tp(raw)
812        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
813        self.assertEqual(repr(b), "<%s>" % clsname)
814        raw.name = "dummy"
815        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
816        raw.name = b"dummy"
817        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
818
819    def test_flush_error_on_close(self):
820        # Test that buffered file is closed despite failed flush
821        # and that flush() is called before file closed.
822        raw = self.MockRawIO()
823        closed = []
824        def bad_flush():
825            closed[:] = [b.closed, raw.closed]
826            raise IOError()
827        raw.flush = bad_flush
828        b = self.tp(raw)
829        self.assertRaises(IOError, b.close) # exception not swallowed
830        self.assertTrue(b.closed)
831        self.assertTrue(raw.closed)
832        self.assertTrue(closed)      # flush() called
833        self.assertFalse(closed[0])  # flush() called before file closed
834        self.assertFalse(closed[1])
835        raw.flush = lambda: None  # break reference loop
836
837    def test_close_error_on_close(self):
838        raw = self.MockRawIO()
839        def bad_flush():
840            raise IOError('flush')
841        def bad_close():
842            raise IOError('close')
843        raw.close = bad_close
844        b = self.tp(raw)
845        b.flush = bad_flush
846        with self.assertRaises(IOError) as err: # exception not swallowed
847            b.close()
848        self.assertEqual(err.exception.args, ('close',))
849        self.assertFalse(b.closed)
850
851    def test_multi_close(self):
852        raw = self.MockRawIO()
853        b = self.tp(raw)
854        b.close()
855        b.close()
856        b.close()
857        self.assertRaises(ValueError, b.flush)
858
859    def test_readonly_attributes(self):
860        raw = self.MockRawIO()
861        buf = self.tp(raw)
862        x = self.MockRawIO()
863        with self.assertRaises((AttributeError, TypeError)):
864            buf.raw = x
865
866
867class SizeofTest:
868
869    @support.cpython_only
870    def test_sizeof(self):
871        bufsize1 = 4096
872        bufsize2 = 8192
873        rawio = self.MockRawIO()
874        bufio = self.tp(rawio, buffer_size=bufsize1)
875        size = sys.getsizeof(bufio) - bufsize1
876        rawio = self.MockRawIO()
877        bufio = self.tp(rawio, buffer_size=bufsize2)
878        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
879
880
881class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
882    read_mode = "rb"
883
884    def test_constructor(self):
885        rawio = self.MockRawIO([b"abc"])
886        bufio = self.tp(rawio)
887        bufio.__init__(rawio)
888        bufio.__init__(rawio, buffer_size=1024)
889        bufio.__init__(rawio, buffer_size=16)
890        self.assertEqual(b"abc", bufio.read())
891        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
892        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
893        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
894        rawio = self.MockRawIO([b"abc"])
895        bufio.__init__(rawio)
896        self.assertEqual(b"abc", bufio.read())
897
898    def test_uninitialized(self):
899        bufio = self.tp.__new__(self.tp)
900        del bufio
901        bufio = self.tp.__new__(self.tp)
902        self.assertRaisesRegexp((ValueError, AttributeError),
903                                'uninitialized|has no attribute',
904                                bufio.read, 0)
905        bufio.__init__(self.MockRawIO())
906        self.assertEqual(bufio.read(0), b'')
907
908    def test_read(self):
909        for arg in (None, 7):
910            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
911            bufio = self.tp(rawio)
912            self.assertEqual(b"abcdefg", bufio.read(arg))
913        # Invalid args
914        self.assertRaises(ValueError, bufio.read, -2)
915
916    def test_read1(self):
917        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
918        bufio = self.tp(rawio)
919        self.assertEqual(b"a", bufio.read(1))
920        self.assertEqual(b"b", bufio.read1(1))
921        self.assertEqual(rawio._reads, 1)
922        self.assertEqual(b"c", bufio.read1(100))
923        self.assertEqual(rawio._reads, 1)
924        self.assertEqual(b"d", bufio.read1(100))
925        self.assertEqual(rawio._reads, 2)
926        self.assertEqual(b"efg", bufio.read1(100))
927        self.assertEqual(rawio._reads, 3)
928        self.assertEqual(b"", bufio.read1(100))
929        self.assertEqual(rawio._reads, 4)
930        # Invalid args
931        self.assertRaises(ValueError, bufio.read1, -1)
932
933    def test_readinto(self):
934        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
935        bufio = self.tp(rawio)
936        b = bytearray(2)
937        self.assertEqual(bufio.readinto(b), 2)
938        self.assertEqual(b, b"ab")
939        self.assertEqual(bufio.readinto(b), 2)
940        self.assertEqual(b, b"cd")
941        self.assertEqual(bufio.readinto(b), 2)
942        self.assertEqual(b, b"ef")
943        self.assertEqual(bufio.readinto(b), 1)
944        self.assertEqual(b, b"gf")
945        self.assertEqual(bufio.readinto(b), 0)
946        self.assertEqual(b, b"gf")
947
948    def test_readlines(self):
949        def bufio():
950            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
951            return self.tp(rawio)
952        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
953        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
954        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
955
956    def test_buffering(self):
957        data = b"abcdefghi"
958        dlen = len(data)
959
960        tests = [
961            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
962            [ 100, [ 3, 3, 3],     [ dlen ]    ],
963            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
964        ]
965
966        for bufsize, buf_read_sizes, raw_read_sizes in tests:
967            rawio = self.MockFileIO(data)
968            bufio = self.tp(rawio, buffer_size=bufsize)
969            pos = 0
970            for nbytes in buf_read_sizes:
971                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
972                pos += nbytes
973            # this is mildly implementation-dependent
974            self.assertEqual(rawio.read_history, raw_read_sizes)
975
976    def test_read_non_blocking(self):
977        # Inject some None's in there to simulate EWOULDBLOCK
978        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
979        bufio = self.tp(rawio)
980        self.assertEqual(b"abcd", bufio.read(6))
981        self.assertEqual(b"e", bufio.read(1))
982        self.assertEqual(b"fg", bufio.read())
983        self.assertEqual(b"", bufio.peek(1))
984        self.assertIsNone(bufio.read())
985        self.assertEqual(b"", bufio.read())
986
987        rawio = self.MockRawIO((b"a", None, None))
988        self.assertEqual(b"a", rawio.readall())
989        self.assertIsNone(rawio.readall())
990
991    def test_read_past_eof(self):
992        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
993        bufio = self.tp(rawio)
994
995        self.assertEqual(b"abcdefg", bufio.read(9000))
996
997    def test_read_all(self):
998        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
999        bufio = self.tp(rawio)
1000
1001        self.assertEqual(b"abcdefg", bufio.read())
1002
1003    @unittest.skipUnless(threading, 'Threading required for this test.')
1004    @support.requires_resource('cpu')
1005    def test_threads(self):
1006        try:
1007            # Write out many bytes with exactly the same number of 0's,
1008            # 1's... 255's. This will help us check that concurrent reading
1009            # doesn't duplicate or forget contents.
1010            N = 1000
1011            l = list(range(256)) * N
1012            random.shuffle(l)
1013            s = bytes(bytearray(l))
1014            with self.open(support.TESTFN, "wb") as f:
1015                f.write(s)
1016            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
1017                bufio = self.tp(raw, 8)
1018                errors = []
1019                results = []
1020                def f():
1021                    try:
1022                        # Intra-buffer read then buffer-flushing read
1023                        for n in cycle([1, 19]):
1024                            s = bufio.read(n)
1025                            if not s:
1026                                break
1027                            # list.append() is atomic
1028                            results.append(s)
1029                    except Exception as e:
1030                        errors.append(e)
1031                        raise
1032                threads = [threading.Thread(target=f) for x in range(20)]
1033                with support.start_threads(threads):
1034                    time.sleep(0.02) # yield
1035                self.assertFalse(errors,
1036                    "the following exceptions were caught: %r" % errors)
1037                s = b''.join(results)
1038                for i in range(256):
1039                    c = bytes(bytearray([i]))
1040                    self.assertEqual(s.count(c), N)
1041        finally:
1042            support.unlink(support.TESTFN)
1043
1044    def test_misbehaved_io(self):
1045        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1046        bufio = self.tp(rawio)
1047        self.assertRaises(IOError, bufio.seek, 0)
1048        self.assertRaises(IOError, bufio.tell)
1049
1050    def test_no_extraneous_read(self):
1051        # Issue #9550; when the raw IO object has satisfied the read request,
1052        # we should not issue any additional reads, otherwise it may block
1053        # (e.g. socket).
1054        bufsize = 16
1055        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1056            rawio = self.MockRawIO([b"x" * n])
1057            bufio = self.tp(rawio, bufsize)
1058            self.assertEqual(bufio.read(n), b"x" * n)
1059            # Simple case: one raw read is enough to satisfy the request.
1060            self.assertEqual(rawio._extraneous_reads, 0,
1061                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1062            # A more complex case where two raw reads are needed to satisfy
1063            # the request.
1064            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1065            bufio = self.tp(rawio, bufsize)
1066            self.assertEqual(bufio.read(n), b"x" * n)
1067            self.assertEqual(rawio._extraneous_reads, 0,
1068                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1069
1070
1071class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
1072    tp = io.BufferedReader
1073
1074    def test_constructor(self):
1075        BufferedReaderTest.test_constructor(self)
1076        # The allocation can succeed on 32-bit builds, e.g. with more
1077        # than 2GB RAM and a 64-bit kernel.
1078        if sys.maxsize > 0x7FFFFFFF:
1079            rawio = self.MockRawIO()
1080            bufio = self.tp(rawio)
1081            self.assertRaises((OverflowError, MemoryError, ValueError),
1082                bufio.__init__, rawio, sys.maxsize)
1083
1084    def test_initialization(self):
1085        rawio = self.MockRawIO([b"abc"])
1086        bufio = self.tp(rawio)
1087        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1088        self.assertRaises(ValueError, bufio.read)
1089        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1090        self.assertRaises(ValueError, bufio.read)
1091        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1092        self.assertRaises(ValueError, bufio.read)
1093
1094    def test_misbehaved_io_read(self):
1095        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1096        bufio = self.tp(rawio)
1097        # _pyio.BufferedReader seems to implement reading different, so that
1098        # checking this is not so easy.
1099        self.assertRaises(IOError, bufio.read, 10)
1100
1101    def test_garbage_collection(self):
1102        # C BufferedReader objects are collected.
1103        # The Python version has __del__, so it ends into gc.garbage instead
1104        self.addCleanup(support.unlink, support.TESTFN)
1105        rawio = self.FileIO(support.TESTFN, "w+b")
1106        f = self.tp(rawio)
1107        f.f = f
1108        wr = weakref.ref(f)
1109        del f
1110        support.gc_collect()
1111        self.assertIsNone(wr(), wr)
1112
1113    def test_args_error(self):
1114        # Issue #17275
1115        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1116            self.tp(io.BytesIO(), 1024, 1024, 1024)
1117
1118
1119class PyBufferedReaderTest(BufferedReaderTest):
1120    tp = pyio.BufferedReader
1121
1122
1123class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1124    write_mode = "wb"
1125
1126    def test_constructor(self):
1127        rawio = self.MockRawIO()
1128        bufio = self.tp(rawio)
1129        bufio.__init__(rawio)
1130        bufio.__init__(rawio, buffer_size=1024)
1131        bufio.__init__(rawio, buffer_size=16)
1132        self.assertEqual(3, bufio.write(b"abc"))
1133        bufio.flush()
1134        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1135        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1136        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1137        bufio.__init__(rawio)
1138        self.assertEqual(3, bufio.write(b"ghi"))
1139        bufio.flush()
1140        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1141
1142    def test_uninitialized(self):
1143        bufio = self.tp.__new__(self.tp)
1144        del bufio
1145        bufio = self.tp.__new__(self.tp)
1146        self.assertRaisesRegexp((ValueError, AttributeError),
1147                                'uninitialized|has no attribute',
1148                                bufio.write, b'')
1149        bufio.__init__(self.MockRawIO())
1150        self.assertEqual(bufio.write(b''), 0)
1151
1152    def test_detach_flush(self):
1153        raw = self.MockRawIO()
1154        buf = self.tp(raw)
1155        buf.write(b"howdy!")
1156        self.assertFalse(raw._write_stack)
1157        buf.detach()
1158        self.assertEqual(raw._write_stack, [b"howdy!"])
1159
1160    def test_write(self):
1161        # Write to the buffered IO but don't overflow the buffer.
1162        writer = self.MockRawIO()
1163        bufio = self.tp(writer, 8)
1164        bufio.write(b"abc")
1165        self.assertFalse(writer._write_stack)
1166        buffer = bytearray(b"def")
1167        bufio.write(buffer)
1168        buffer[:] = b"***"  # Overwrite our copy of the data
1169        bufio.flush()
1170        self.assertEqual(b"".join(writer._write_stack), b"abcdef")
1171
1172    def test_write_overflow(self):
1173        writer = self.MockRawIO()
1174        bufio = self.tp(writer, 8)
1175        contents = b"abcdefghijklmnop"
1176        for n in range(0, len(contents), 3):
1177            bufio.write(contents[n:n+3])
1178        flushed = b"".join(writer._write_stack)
1179        # At least (total - 8) bytes were implicitly flushed, perhaps more
1180        # depending on the implementation.
1181        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1182
1183    def check_writes(self, intermediate_func):
1184        # Lots of writes, test the flushed output is as expected.
1185        contents = bytes(range(256)) * 1000
1186        n = 0
1187        writer = self.MockRawIO()
1188        bufio = self.tp(writer, 13)
1189        # Generator of write sizes: repeat each N 15 times then proceed to N+1
1190        def gen_sizes():
1191            for size in count(1):
1192                for i in range(15):
1193                    yield size
1194        sizes = gen_sizes()
1195        while n < len(contents):
1196            size = min(next(sizes), len(contents) - n)
1197            self.assertEqual(bufio.write(contents[n:n+size]), size)
1198            intermediate_func(bufio)
1199            n += size
1200        bufio.flush()
1201        self.assertEqual(contents,
1202            b"".join(writer._write_stack))
1203
1204    def test_writes(self):
1205        self.check_writes(lambda bufio: None)
1206
1207    def test_writes_and_flushes(self):
1208        self.check_writes(lambda bufio: bufio.flush())
1209
1210    def test_writes_and_seeks(self):
1211        def _seekabs(bufio):
1212            pos = bufio.tell()
1213            bufio.seek(pos + 1, 0)
1214            bufio.seek(pos - 1, 0)
1215            bufio.seek(pos, 0)
1216        self.check_writes(_seekabs)
1217        def _seekrel(bufio):
1218            pos = bufio.seek(0, 1)
1219            bufio.seek(+1, 1)
1220            bufio.seek(-1, 1)
1221            bufio.seek(pos, 0)
1222        self.check_writes(_seekrel)
1223
1224    def test_writes_and_truncates(self):
1225        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1226
1227    def test_write_non_blocking(self):
1228        raw = self.MockNonBlockWriterIO()
1229        bufio = self.tp(raw, 8)
1230
1231        self.assertEqual(bufio.write(b"abcd"), 4)
1232        self.assertEqual(bufio.write(b"efghi"), 5)
1233        # 1 byte will be written, the rest will be buffered
1234        raw.block_on(b"k")
1235        self.assertEqual(bufio.write(b"jklmn"), 5)
1236
1237        # 8 bytes will be written, 8 will be buffered and the rest will be lost
1238        raw.block_on(b"0")
1239        try:
1240            bufio.write(b"opqrwxyz0123456789")
1241        except self.BlockingIOError as e:
1242            written = e.characters_written
1243        else:
1244            self.fail("BlockingIOError should have been raised")
1245        self.assertEqual(written, 16)
1246        self.assertEqual(raw.pop_written(),
1247            b"abcdefghijklmnopqrwxyz")
1248
1249        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1250        s = raw.pop_written()
1251        # Previously buffered bytes were flushed
1252        self.assertTrue(s.startswith(b"01234567A"), s)
1253
1254    def test_write_and_rewind(self):
1255        raw = io.BytesIO()
1256        bufio = self.tp(raw, 4)
1257        self.assertEqual(bufio.write(b"abcdef"), 6)
1258        self.assertEqual(bufio.tell(), 6)
1259        bufio.seek(0, 0)
1260        self.assertEqual(bufio.write(b"XY"), 2)
1261        bufio.seek(6, 0)
1262        self.assertEqual(raw.getvalue(), b"XYcdef")
1263        self.assertEqual(bufio.write(b"123456"), 6)
1264        bufio.flush()
1265        self.assertEqual(raw.getvalue(), b"XYcdef123456")
1266
1267    def test_flush(self):
1268        writer = self.MockRawIO()
1269        bufio = self.tp(writer, 8)
1270        bufio.write(b"abc")
1271        bufio.flush()
1272        self.assertEqual(b"abc", writer._write_stack[0])
1273
1274    def test_writelines(self):
1275        l = [b'ab', b'cd', b'ef']
1276        writer = self.MockRawIO()
1277        bufio = self.tp(writer, 8)
1278        bufio.writelines(l)
1279        bufio.flush()
1280        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1281
1282    def test_writelines_userlist(self):
1283        l = UserList([b'ab', b'cd', b'ef'])
1284        writer = self.MockRawIO()
1285        bufio = self.tp(writer, 8)
1286        bufio.writelines(l)
1287        bufio.flush()
1288        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1289
1290    def test_writelines_error(self):
1291        writer = self.MockRawIO()
1292        bufio = self.tp(writer, 8)
1293        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1294        self.assertRaises(TypeError, bufio.writelines, None)
1295
1296    def test_destructor(self):
1297        writer = self.MockRawIO()
1298        bufio = self.tp(writer, 8)
1299        bufio.write(b"abc")
1300        del bufio
1301        support.gc_collect()
1302        self.assertEqual(b"abc", writer._write_stack[0])
1303
1304    def test_truncate(self):
1305        # Truncate implicitly flushes the buffer.
1306        self.addCleanup(support.unlink, support.TESTFN)
1307        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1308            bufio = self.tp(raw, 8)
1309            bufio.write(b"abcdef")
1310            self.assertEqual(bufio.truncate(3), 3)
1311            self.assertEqual(bufio.tell(), 6)
1312        with self.open(support.TESTFN, "rb", buffering=0) as f:
1313            self.assertEqual(f.read(), b"abc")
1314
1315    @unittest.skipUnless(threading, 'Threading required for this test.')
1316    @support.requires_resource('cpu')
1317    def test_threads(self):
1318        try:
1319            # Write out many bytes from many threads and test they were
1320            # all flushed.
1321            N = 1000
1322            contents = bytes(range(256)) * N
1323            sizes = cycle([1, 19])
1324            n = 0
1325            queue = deque()
1326            while n < len(contents):
1327                size = next(sizes)
1328                queue.append(contents[n:n+size])
1329                n += size
1330            del contents
1331            # We use a real file object because it allows us to
1332            # exercise situations where the GIL is released before
1333            # writing the buffer to the raw streams. This is in addition
1334            # to concurrency issues due to switching threads in the middle
1335            # of Python code.
1336            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1337                bufio = self.tp(raw, 8)
1338                errors = []
1339                def f():
1340                    try:
1341                        while True:
1342                            try:
1343                                s = queue.popleft()
1344                            except IndexError:
1345                                return
1346                            bufio.write(s)
1347                    except Exception as e:
1348                        errors.append(e)
1349                        raise
1350                threads = [threading.Thread(target=f) for x in range(20)]
1351                with support.start_threads(threads):
1352                    time.sleep(0.02) # yield
1353                self.assertFalse(errors,
1354                    "the following exceptions were caught: %r" % errors)
1355                bufio.close()
1356            with self.open(support.TESTFN, "rb") as f:
1357                s = f.read()
1358            for i in range(256):
1359                self.assertEqual(s.count(bytes([i])), N)
1360        finally:
1361            support.unlink(support.TESTFN)
1362
1363    def test_misbehaved_io(self):
1364        rawio = self.MisbehavedRawIO()
1365        bufio = self.tp(rawio, 5)
1366        self.assertRaises(IOError, bufio.seek, 0)
1367        self.assertRaises(IOError, bufio.tell)
1368        self.assertRaises(IOError, bufio.write, b"abcdef")
1369
1370    def test_max_buffer_size_deprecation(self):
1371        with support.check_warnings(("max_buffer_size is deprecated",
1372                                     DeprecationWarning)):
1373            self.tp(self.MockRawIO(), 8, 12)
1374
1375    def test_write_error_on_close(self):
1376        raw = self.MockRawIO()
1377        def bad_write(b):
1378            raise IOError()
1379        raw.write = bad_write
1380        b = self.tp(raw)
1381        b.write(b'spam')
1382        self.assertRaises(IOError, b.close) # exception not swallowed
1383        self.assertTrue(b.closed)
1384
1385
1386class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
1387    tp = io.BufferedWriter
1388
1389    def test_constructor(self):
1390        BufferedWriterTest.test_constructor(self)
1391        # The allocation can succeed on 32-bit builds, e.g. with more
1392        # than 2GB RAM and a 64-bit kernel.
1393        if sys.maxsize > 0x7FFFFFFF:
1394            rawio = self.MockRawIO()
1395            bufio = self.tp(rawio)
1396            self.assertRaises((OverflowError, MemoryError, ValueError),
1397                bufio.__init__, rawio, sys.maxsize)
1398
1399    def test_initialization(self):
1400        rawio = self.MockRawIO()
1401        bufio = self.tp(rawio)
1402        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1403        self.assertRaises(ValueError, bufio.write, b"def")
1404        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1405        self.assertRaises(ValueError, bufio.write, b"def")
1406        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1407        self.assertRaises(ValueError, bufio.write, b"def")
1408
1409    def test_garbage_collection(self):
1410        # C BufferedWriter objects are collected, and collecting them flushes
1411        # all data to disk.
1412        # The Python version has __del__, so it ends into gc.garbage instead
1413        self.addCleanup(support.unlink, support.TESTFN)
1414        rawio = self.FileIO(support.TESTFN, "w+b")
1415        f = self.tp(rawio)
1416        f.write(b"123xxx")
1417        f.x = f
1418        wr = weakref.ref(f)
1419        del f
1420        support.gc_collect()
1421        self.assertIsNone(wr(), wr)
1422        with self.open(support.TESTFN, "rb") as f:
1423            self.assertEqual(f.read(), b"123xxx")
1424
1425    def test_args_error(self):
1426        # Issue #17275
1427        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1428            self.tp(io.BytesIO(), 1024, 1024, 1024)
1429
1430
1431class PyBufferedWriterTest(BufferedWriterTest):
1432    tp = pyio.BufferedWriter
1433
1434class BufferedRWPairTest(unittest.TestCase):
1435
1436    def test_constructor(self):
1437        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1438        self.assertFalse(pair.closed)
1439
1440    def test_uninitialized(self):
1441        pair = self.tp.__new__(self.tp)
1442        del pair
1443        pair = self.tp.__new__(self.tp)
1444        self.assertRaisesRegexp((ValueError, AttributeError),
1445                                'uninitialized|has no attribute',
1446                                pair.read, 0)
1447        self.assertRaisesRegexp((ValueError, AttributeError),
1448                                'uninitialized|has no attribute',
1449                                pair.write, b'')
1450        pair.__init__(self.MockRawIO(), self.MockRawIO())
1451        self.assertEqual(pair.read(0), b'')
1452        self.assertEqual(pair.write(b''), 0)
1453
1454    def test_detach(self):
1455        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1456        self.assertRaises(self.UnsupportedOperation, pair.detach)
1457
1458    def test_constructor_max_buffer_size_deprecation(self):
1459        with support.check_warnings(("max_buffer_size is deprecated",
1460                                     DeprecationWarning)):
1461            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1462
1463    def test_constructor_with_not_readable(self):
1464        class NotReadable(MockRawIO):
1465            def readable(self):
1466                return False
1467
1468        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1469
1470    def test_constructor_with_not_writeable(self):
1471        class NotWriteable(MockRawIO):
1472            def writable(self):
1473                return False
1474
1475        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1476
1477    def test_read(self):
1478        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1479
1480        self.assertEqual(pair.read(3), b"abc")
1481        self.assertEqual(pair.read(1), b"d")
1482        self.assertEqual(pair.read(), b"ef")
1483        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1484        self.assertEqual(pair.read(None), b"abc")
1485
1486    def test_readlines(self):
1487        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1488        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1489        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1490        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
1491
1492    def test_read1(self):
1493        # .read1() is delegated to the underlying reader object, so this test
1494        # can be shallow.
1495        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1496
1497        self.assertEqual(pair.read1(3), b"abc")
1498
1499    def test_readinto(self):
1500        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1501
1502        data = byteslike(5)
1503        self.assertEqual(pair.readinto(data), 5)
1504        self.assertEqual(data.tobytes(), b"abcde")
1505
1506    def test_write(self):
1507        w = self.MockRawIO()
1508        pair = self.tp(self.MockRawIO(), w)
1509
1510        pair.write(b"abc")
1511        pair.flush()
1512        buffer = bytearray(b"def")
1513        pair.write(buffer)
1514        buffer[:] = b"***"  # Overwrite our copy of the data
1515        pair.flush()
1516        self.assertEqual(w._write_stack, [b"abc", b"def"])
1517
1518    def test_peek(self):
1519        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1520
1521        self.assertTrue(pair.peek(3).startswith(b"abc"))
1522        self.assertEqual(pair.read(3), b"abc")
1523
1524    def test_readable(self):
1525        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1526        self.assertTrue(pair.readable())
1527
1528    def test_writeable(self):
1529        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1530        self.assertTrue(pair.writable())
1531
1532    def test_seekable(self):
1533        # BufferedRWPairs are never seekable, even if their readers and writers
1534        # are.
1535        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1536        self.assertFalse(pair.seekable())
1537
1538    # .flush() is delegated to the underlying writer object and has been
1539    # tested in the test_write method.
1540
1541    def test_close_and_closed(self):
1542        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1543        self.assertFalse(pair.closed)
1544        pair.close()
1545        self.assertTrue(pair.closed)
1546
1547    def test_reader_close_error_on_close(self):
1548        def reader_close():
1549            reader_non_existing
1550        reader = self.MockRawIO()
1551        reader.close = reader_close
1552        writer = self.MockRawIO()
1553        pair = self.tp(reader, writer)
1554        with self.assertRaises(NameError) as err:
1555            pair.close()
1556        self.assertIn('reader_non_existing', str(err.exception))
1557        self.assertTrue(pair.closed)
1558        self.assertFalse(reader.closed)
1559        self.assertTrue(writer.closed)
1560
1561    def test_writer_close_error_on_close(self):
1562        def writer_close():
1563            writer_non_existing
1564        reader = self.MockRawIO()
1565        writer = self.MockRawIO()
1566        writer.close = writer_close
1567        pair = self.tp(reader, writer)
1568        with self.assertRaises(NameError) as err:
1569            pair.close()
1570        self.assertIn('writer_non_existing', str(err.exception))
1571        self.assertFalse(pair.closed)
1572        self.assertTrue(reader.closed)
1573        self.assertFalse(writer.closed)
1574
1575    def test_reader_writer_close_error_on_close(self):
1576        def reader_close():
1577            reader_non_existing
1578        def writer_close():
1579            writer_non_existing
1580        reader = self.MockRawIO()
1581        reader.close = reader_close
1582        writer = self.MockRawIO()
1583        writer.close = writer_close
1584        pair = self.tp(reader, writer)
1585        with self.assertRaises(NameError) as err:
1586            pair.close()
1587        self.assertIn('reader_non_existing', str(err.exception))
1588        self.assertFalse(pair.closed)
1589        self.assertFalse(reader.closed)
1590        self.assertFalse(writer.closed)
1591
1592    def test_isatty(self):
1593        class SelectableIsAtty(MockRawIO):
1594            def __init__(self, isatty):
1595                MockRawIO.__init__(self)
1596                self._isatty = isatty
1597
1598            def isatty(self):
1599                return self._isatty
1600
1601        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1602        self.assertFalse(pair.isatty())
1603
1604        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1605        self.assertTrue(pair.isatty())
1606
1607        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1608        self.assertTrue(pair.isatty())
1609
1610        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1611        self.assertTrue(pair.isatty())
1612
1613    def test_weakref_clearing(self):
1614        brw = self.tp(self.MockRawIO(), self.MockRawIO())
1615        ref = weakref.ref(brw)
1616        brw = None
1617        ref = None # Shouldn't segfault.
1618
1619class CBufferedRWPairTest(BufferedRWPairTest):
1620    tp = io.BufferedRWPair
1621
1622class PyBufferedRWPairTest(BufferedRWPairTest):
1623    tp = pyio.BufferedRWPair
1624
1625
1626class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1627    read_mode = "rb+"
1628    write_mode = "wb+"
1629
1630    def test_constructor(self):
1631        BufferedReaderTest.test_constructor(self)
1632        BufferedWriterTest.test_constructor(self)
1633
1634    def test_uninitialized(self):
1635        BufferedReaderTest.test_uninitialized(self)
1636        BufferedWriterTest.test_uninitialized(self)
1637
1638    def test_read_and_write(self):
1639        raw = self.MockRawIO((b"asdf", b"ghjk"))
1640        rw = self.tp(raw, 8)
1641
1642        self.assertEqual(b"as", rw.read(2))
1643        rw.write(b"ddd")
1644        rw.write(b"eee")
1645        self.assertFalse(raw._write_stack) # Buffer writes
1646        self.assertEqual(b"ghjk", rw.read())
1647        self.assertEqual(b"dddeee", raw._write_stack[0])
1648
1649    def test_seek_and_tell(self):
1650        raw = self.BytesIO(b"asdfghjkl")
1651        rw = self.tp(raw)
1652
1653        self.assertEqual(b"as", rw.read(2))
1654        self.assertEqual(2, rw.tell())
1655        rw.seek(0, 0)
1656        self.assertEqual(b"asdf", rw.read(4))
1657
1658        rw.write(b"123f")
1659        rw.seek(0, 0)
1660        self.assertEqual(b"asdf123fl", rw.read())
1661        self.assertEqual(9, rw.tell())
1662        rw.seek(-4, 2)
1663        self.assertEqual(5, rw.tell())
1664        rw.seek(2, 1)
1665        self.assertEqual(7, rw.tell())
1666        self.assertEqual(b"fl", rw.read(11))
1667        rw.flush()
1668        self.assertEqual(b"asdf123fl", raw.getvalue())
1669
1670        self.assertRaises(TypeError, rw.seek, 0.0)
1671
1672    def check_flush_and_read(self, read_func):
1673        raw = self.BytesIO(b"abcdefghi")
1674        bufio = self.tp(raw)
1675
1676        self.assertEqual(b"ab", read_func(bufio, 2))
1677        bufio.write(b"12")
1678        self.assertEqual(b"ef", read_func(bufio, 2))
1679        self.assertEqual(6, bufio.tell())
1680        bufio.flush()
1681        self.assertEqual(6, bufio.tell())
1682        self.assertEqual(b"ghi", read_func(bufio))
1683        raw.seek(0, 0)
1684        raw.write(b"XYZ")
1685        # flush() resets the read buffer
1686        bufio.flush()
1687        bufio.seek(0, 0)
1688        self.assertEqual(b"XYZ", read_func(bufio, 3))
1689
1690    def test_flush_and_read(self):
1691        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1692
1693    def test_flush_and_readinto(self):
1694        def _readinto(bufio, n=-1):
1695            b = bytearray(n if n >= 0 else 9999)
1696            n = bufio.readinto(b)
1697            return bytes(b[:n])
1698        self.check_flush_and_read(_readinto)
1699
1700    def test_flush_and_peek(self):
1701        def _peek(bufio, n=-1):
1702            # This relies on the fact that the buffer can contain the whole
1703            # raw stream, otherwise peek() can return less.
1704            b = bufio.peek(n)
1705            if n != -1:
1706                b = b[:n]
1707            bufio.seek(len(b), 1)
1708            return b
1709        self.check_flush_and_read(_peek)
1710
1711    def test_flush_and_write(self):
1712        raw = self.BytesIO(b"abcdefghi")
1713        bufio = self.tp(raw)
1714
1715        bufio.write(b"123")
1716        bufio.flush()
1717        bufio.write(b"45")
1718        bufio.flush()
1719        bufio.seek(0, 0)
1720        self.assertEqual(b"12345fghi", raw.getvalue())
1721        self.assertEqual(b"12345fghi", bufio.read())
1722
1723    def test_threads(self):
1724        BufferedReaderTest.test_threads(self)
1725        BufferedWriterTest.test_threads(self)
1726
1727    def test_writes_and_peek(self):
1728        def _peek(bufio):
1729            bufio.peek(1)
1730        self.check_writes(_peek)
1731        def _peek(bufio):
1732            pos = bufio.tell()
1733            bufio.seek(-1, 1)
1734            bufio.peek(1)
1735            bufio.seek(pos, 0)
1736        self.check_writes(_peek)
1737
1738    def test_writes_and_reads(self):
1739        def _read(bufio):
1740            bufio.seek(-1, 1)
1741            bufio.read(1)
1742        self.check_writes(_read)
1743
1744    def test_writes_and_read1s(self):
1745        def _read1(bufio):
1746            bufio.seek(-1, 1)
1747            bufio.read1(1)
1748        self.check_writes(_read1)
1749
1750    def test_writes_and_readintos(self):
1751        def _read(bufio):
1752            bufio.seek(-1, 1)
1753            bufio.readinto(bytearray(1))
1754        self.check_writes(_read)
1755
1756    def test_write_after_readahead(self):
1757        # Issue #6629: writing after the buffer was filled by readahead should
1758        # first rewind the raw stream.
1759        for overwrite_size in [1, 5]:
1760            raw = self.BytesIO(b"A" * 10)
1761            bufio = self.tp(raw, 4)
1762            # Trigger readahead
1763            self.assertEqual(bufio.read(1), b"A")
1764            self.assertEqual(bufio.tell(), 1)
1765            # Overwriting should rewind the raw stream if it needs so
1766            bufio.write(b"B" * overwrite_size)
1767            self.assertEqual(bufio.tell(), overwrite_size + 1)
1768            # If the write size was smaller than the buffer size, flush() and
1769            # check that rewind happens.
1770            bufio.flush()
1771            self.assertEqual(bufio.tell(), overwrite_size + 1)
1772            s = raw.getvalue()
1773            self.assertEqual(s,
1774                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1775
1776    def test_write_rewind_write(self):
1777        # Various combinations of reading / writing / seeking backwards / writing again
1778        def mutate(bufio, pos1, pos2):
1779            assert pos2 >= pos1
1780            # Fill the buffer
1781            bufio.seek(pos1)
1782            bufio.read(pos2 - pos1)
1783            bufio.write(b'\x02')
1784            # This writes earlier than the previous write, but still inside
1785            # the buffer.
1786            bufio.seek(pos1)
1787            bufio.write(b'\x01')
1788
1789        b = b"\x80\x81\x82\x83\x84"
1790        for i in range(0, len(b)):
1791            for j in range(i, len(b)):
1792                raw = self.BytesIO(b)
1793                bufio = self.tp(raw, 100)
1794                mutate(bufio, i, j)
1795                bufio.flush()
1796                expected = bytearray(b)
1797                expected[j] = 2
1798                expected[i] = 1
1799                self.assertEqual(raw.getvalue(), expected,
1800                                 "failed result for i=%d, j=%d" % (i, j))
1801
1802    def test_truncate_after_read_or_write(self):
1803        raw = self.BytesIO(b"A" * 10)
1804        bufio = self.tp(raw, 100)
1805        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1806        self.assertEqual(bufio.truncate(), 2)
1807        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1808        self.assertEqual(bufio.truncate(), 4)
1809
1810    def test_misbehaved_io(self):
1811        BufferedReaderTest.test_misbehaved_io(self)
1812        BufferedWriterTest.test_misbehaved_io(self)
1813
1814    def test_interleaved_read_write(self):
1815        # Test for issue #12213
1816        with self.BytesIO(b'abcdefgh') as raw:
1817            with self.tp(raw, 100) as f:
1818                f.write(b"1")
1819                self.assertEqual(f.read(1), b'b')
1820                f.write(b'2')
1821                self.assertEqual(f.read1(1), b'd')
1822                f.write(b'3')
1823                buf = bytearray(1)
1824                f.readinto(buf)
1825                self.assertEqual(buf, b'f')
1826                f.write(b'4')
1827                self.assertEqual(f.peek(1), b'h')
1828                f.flush()
1829                self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1830
1831        with self.BytesIO(b'abc') as raw:
1832            with self.tp(raw, 100) as f:
1833                self.assertEqual(f.read(1), b'a')
1834                f.write(b"2")
1835                self.assertEqual(f.read(1), b'c')
1836                f.flush()
1837                self.assertEqual(raw.getvalue(), b'a2c')
1838
1839    def test_interleaved_readline_write(self):
1840        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1841            with self.tp(raw) as f:
1842                f.write(b'1')
1843                self.assertEqual(f.readline(), b'b\n')
1844                f.write(b'2')
1845                self.assertEqual(f.readline(), b'def\n')
1846                f.write(b'3')
1847                self.assertEqual(f.readline(), b'\n')
1848                f.flush()
1849                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1850
1851
1852class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1853                          BufferedRandomTest, SizeofTest):
1854    tp = io.BufferedRandom
1855
1856    def test_constructor(self):
1857        BufferedRandomTest.test_constructor(self)
1858        # The allocation can succeed on 32-bit builds, e.g. with more
1859        # than 2GB RAM and a 64-bit kernel.
1860        if sys.maxsize > 0x7FFFFFFF:
1861            rawio = self.MockRawIO()
1862            bufio = self.tp(rawio)
1863            self.assertRaises((OverflowError, MemoryError, ValueError),
1864                bufio.__init__, rawio, sys.maxsize)
1865
1866    def test_garbage_collection(self):
1867        CBufferedReaderTest.test_garbage_collection(self)
1868        CBufferedWriterTest.test_garbage_collection(self)
1869
1870    def test_args_error(self):
1871        # Issue #17275
1872        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1873            self.tp(io.BytesIO(), 1024, 1024, 1024)
1874
1875
1876class PyBufferedRandomTest(BufferedRandomTest):
1877    tp = pyio.BufferedRandom
1878
1879
1880# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1881# properties:
1882#   - A single output character can correspond to many bytes of input.
1883#   - The number of input bytes to complete the character can be
1884#     undetermined until the last input byte is received.
1885#   - The number of input bytes can vary depending on previous input.
1886#   - A single input byte can correspond to many characters of output.
1887#   - The number of output characters can be undetermined until the
1888#     last input byte is received.
1889#   - The number of output characters can vary depending on previous input.
1890
1891class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1892    """
1893    For testing seek/tell behavior with a stateful, buffering decoder.
1894
1895    Input is a sequence of words.  Words may be fixed-length (length set
1896    by input) or variable-length (period-terminated).  In variable-length
1897    mode, extra periods are ignored.  Possible words are:
1898      - 'i' followed by a number sets the input length, I (maximum 99).
1899        When I is set to 0, words are space-terminated.
1900      - 'o' followed by a number sets the output length, O (maximum 99).
1901      - Any other word is converted into a word followed by a period on
1902        the output.  The output word consists of the input word truncated
1903        or padded out with hyphens to make its length equal to O.  If O
1904        is 0, the word is output verbatim without truncating or padding.
1905    I and O are initially set to 1.  When I changes, any buffered input is
1906    re-scanned according to the new I.  EOF also terminates the last word.
1907    """
1908
1909    def __init__(self, errors='strict'):
1910        codecs.IncrementalDecoder.__init__(self, errors)
1911        self.reset()
1912
1913    def __repr__(self):
1914        return '<SID %x>' % id(self)
1915
1916    def reset(self):
1917        self.i = 1
1918        self.o = 1
1919        self.buffer = bytearray()
1920
1921    def getstate(self):
1922        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1923        return bytes(self.buffer), i*100 + o
1924
1925    def setstate(self, state):
1926        buffer, io = state
1927        self.buffer = bytearray(buffer)
1928        i, o = divmod(io, 100)
1929        self.i, self.o = i ^ 1, o ^ 1
1930
1931    def decode(self, input, final=False):
1932        output = ''
1933        for b in input:
1934            if self.i == 0: # variable-length, terminated with period
1935                if b == '.':
1936                    if self.buffer:
1937                        output += self.process_word()
1938                else:
1939                    self.buffer.append(b)
1940            else: # fixed-length, terminate after self.i bytes
1941                self.buffer.append(b)
1942                if len(self.buffer) == self.i:
1943                    output += self.process_word()
1944        if final and self.buffer: # EOF terminates the last word
1945            output += self.process_word()
1946        return output
1947
1948    def process_word(self):
1949        output = ''
1950        if self.buffer[0] == ord('i'):
1951            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1952        elif self.buffer[0] == ord('o'):
1953            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1954        else:
1955            output = self.buffer.decode('ascii')
1956            if len(output) < self.o:
1957                output += '-'*self.o # pad out with hyphens
1958            if self.o:
1959                output = output[:self.o] # truncate to output length
1960            output += '.'
1961        self.buffer = bytearray()
1962        return output
1963
1964    codecEnabled = False
1965
1966    @classmethod
1967    def lookupTestDecoder(cls, name):
1968        if cls.codecEnabled and name == 'test_decoder':
1969            latin1 = codecs.lookup('latin-1')
1970            return codecs.CodecInfo(
1971                name='test_decoder', encode=latin1.encode, decode=None,
1972                incrementalencoder=None,
1973                streamreader=None, streamwriter=None,
1974                incrementaldecoder=cls)
1975
1976# Register the previous decoder for testing.
1977# Disabled by default, tests will enable it.
1978codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1979
1980
1981class StatefulIncrementalDecoderTest(unittest.TestCase):
1982    """
1983    Make sure the StatefulIncrementalDecoder actually works.
1984    """
1985
1986    test_cases = [
1987        # I=1, O=1 (fixed-length input == fixed-length output)
1988        (b'abcd', False, 'a.b.c.d.'),
1989        # I=0, O=0 (variable-length input, variable-length output)
1990        (b'oiabcd', True, 'abcd.'),
1991        # I=0, O=0 (should ignore extra periods)
1992        (b'oi...abcd...', True, 'abcd.'),
1993        # I=0, O=6 (variable-length input, fixed-length output)
1994        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1995        # I=2, O=6 (fixed-length input < fixed-length output)
1996        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1997        # I=6, O=3 (fixed-length input > fixed-length output)
1998        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1999        # I=0, then 3; O=29, then 15 (with longer output)
2000        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2001         'a----------------------------.' +
2002         'b----------------------------.' +
2003         'cde--------------------------.' +
2004         'abcdefghijabcde.' +
2005         'a.b------------.' +
2006         '.c.------------.' +
2007         'd.e------------.' +
2008         'k--------------.' +
2009         'l--------------.' +
2010         'm--------------.')
2011    ]
2012
2013    def test_decoder(self):
2014        # Try a few one-shot test cases.
2015        for input, eof, output in self.test_cases:
2016            d = StatefulIncrementalDecoder()
2017            self.assertEqual(d.decode(input, eof), output)
2018
2019        # Also test an unfinished decode, followed by forcing EOF.
2020        d = StatefulIncrementalDecoder()
2021        self.assertEqual(d.decode(b'oiabcd'), '')
2022        self.assertEqual(d.decode(b'', 1), 'abcd.')
2023
2024class TextIOWrapperTest(unittest.TestCase):
2025
2026    def setUp(self):
2027        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2028        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2029        support.unlink(support.TESTFN)
2030
2031    def tearDown(self):
2032        support.unlink(support.TESTFN)
2033
2034    def test_constructor(self):
2035        r = self.BytesIO(b"\xc3\xa9\n\n")
2036        b = self.BufferedReader(r, 1000)
2037        t = self.TextIOWrapper(b)
2038        t.__init__(b, encoding="latin1", newline="\r\n")
2039        self.assertEqual(t.encoding, "latin1")
2040        self.assertEqual(t.line_buffering, False)
2041        t.__init__(b, encoding="utf8", line_buffering=True)
2042        self.assertEqual(t.encoding, "utf8")
2043        self.assertEqual(t.line_buffering, True)
2044        self.assertEqual("\xe9\n", t.readline())
2045        self.assertRaises(TypeError, t.__init__, b, newline=42)
2046        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2047
2048    def test_uninitialized(self):
2049        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2050        del t
2051        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2052        self.assertRaises(Exception, repr, t)
2053        self.assertRaisesRegexp((ValueError, AttributeError),
2054                                'uninitialized|has no attribute',
2055                                t.read, 0)
2056        t.__init__(self.MockRawIO())
2057        self.assertEqual(t.read(0), u'')
2058
2059    def test_non_text_encoding_codecs_are_rejected(self):
2060        # Ensure the constructor complains if passed a codec that isn't
2061        # marked as a text encoding
2062        # http://bugs.python.org/issue20404
2063        r = self.BytesIO()
2064        b = self.BufferedWriter(r)
2065        with support.check_py3k_warnings():
2066            self.TextIOWrapper(b, encoding="hex_codec")
2067
2068    def test_detach(self):
2069        r = self.BytesIO()
2070        b = self.BufferedWriter(r)
2071        t = self.TextIOWrapper(b)
2072        self.assertIs(t.detach(), b)
2073
2074        t = self.TextIOWrapper(b, encoding="ascii")
2075        t.write("howdy")
2076        self.assertFalse(r.getvalue())
2077        t.detach()
2078        self.assertEqual(r.getvalue(), b"howdy")
2079        self.assertRaises(ValueError, t.detach)
2080
2081        # Operations independent of the detached stream should still work
2082        repr(t)
2083        self.assertEqual(t.encoding, "ascii")
2084        self.assertEqual(t.errors, "strict")
2085        self.assertFalse(t.line_buffering)
2086
2087    def test_repr(self):
2088        raw = self.BytesIO("hello".encode("utf-8"))
2089        b = self.BufferedReader(raw)
2090        t = self.TextIOWrapper(b, encoding="utf-8")
2091        modname = self.TextIOWrapper.__module__
2092        self.assertEqual(repr(t),
2093                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2094        raw.name = "dummy"
2095        self.assertEqual(repr(t),
2096                         "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
2097        raw.name = b"dummy"
2098        self.assertEqual(repr(t),
2099                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2100
2101        t.buffer.detach()
2102        repr(t)  # Should not raise an exception
2103
2104    def test_line_buffering(self):
2105        r = self.BytesIO()
2106        b = self.BufferedWriter(r, 1000)
2107        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2108        t.write("X")
2109        self.assertEqual(r.getvalue(), b"")  # No flush happened
2110        t.write("Y\nZ")
2111        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2112        t.write("A\rB")
2113        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2114
2115    def test_encoding(self):
2116        # Check the encoding attribute is always set, and valid
2117        b = self.BytesIO()
2118        t = self.TextIOWrapper(b, encoding="utf8")
2119        self.assertEqual(t.encoding, "utf8")
2120        t = self.TextIOWrapper(b)
2121        self.assertIsNotNone(t.encoding)
2122        codecs.lookup(t.encoding)
2123
2124    def test_encoding_errors_reading(self):
2125        # (1) default
2126        b = self.BytesIO(b"abc\n\xff\n")
2127        t = self.TextIOWrapper(b, encoding="ascii")
2128        self.assertRaises(UnicodeError, t.read)
2129        # (2) explicit strict
2130        b = self.BytesIO(b"abc\n\xff\n")
2131        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2132        self.assertRaises(UnicodeError, t.read)
2133        # (3) ignore
2134        b = self.BytesIO(b"abc\n\xff\n")
2135        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2136        self.assertEqual(t.read(), "abc\n\n")
2137        # (4) replace
2138        b = self.BytesIO(b"abc\n\xff\n")
2139        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2140        self.assertEqual(t.read(), "abc\n\ufffd\n")
2141
2142    def test_encoding_errors_writing(self):
2143        # (1) default
2144        b = self.BytesIO()
2145        t = self.TextIOWrapper(b, encoding="ascii")
2146        self.assertRaises(UnicodeError, t.write, "\xff")
2147        # (2) explicit strict
2148        b = self.BytesIO()
2149        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2150        self.assertRaises(UnicodeError, t.write, "\xff")
2151        # (3) ignore
2152        b = self.BytesIO()
2153        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2154                             newline="\n")
2155        t.write("abc\xffdef\n")
2156        t.flush()
2157        self.assertEqual(b.getvalue(), b"abcdef\n")
2158        # (4) replace
2159        b = self.BytesIO()
2160        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2161                             newline="\n")
2162        t.write("abc\xffdef\n")
2163        t.flush()
2164        self.assertEqual(b.getvalue(), b"abc?def\n")
2165
2166    def test_newlines(self):
2167        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2168
2169        tests = [
2170            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2171            [ '', input_lines ],
2172            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2173            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2174            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2175        ]
2176        encodings = (
2177            'utf-8', 'latin-1',
2178            'utf-16', 'utf-16-le', 'utf-16-be',
2179            'utf-32', 'utf-32-le', 'utf-32-be',
2180        )
2181
2182        # Try a range of buffer sizes to test the case where \r is the last
2183        # character in TextIOWrapper._pending_line.
2184        for encoding in encodings:
2185            # XXX: str.encode() should return bytes
2186            data = bytes(''.join(input_lines).encode(encoding))
2187            for do_reads in (False, True):
2188                for bufsize in range(1, 10):
2189                    for newline, exp_lines in tests:
2190                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2191                        textio = self.TextIOWrapper(bufio, newline=newline,
2192                                                  encoding=encoding)
2193                        if do_reads:
2194                            got_lines = []
2195                            while True:
2196                                c2 = textio.read(2)
2197                                if c2 == '':
2198                                    break
2199                                self.assertEqual(len(c2), 2)
2200                                got_lines.append(c2 + textio.readline())
2201                        else:
2202                            got_lines = list(textio)
2203
2204                        for got_line, exp_line in zip(got_lines, exp_lines):
2205                            self.assertEqual(got_line, exp_line)
2206                        self.assertEqual(len(got_lines), len(exp_lines))
2207
2208    def test_newlines_input(self):
2209        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2210        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2211        for newline, expected in [
2212            (None, normalized.decode("ascii").splitlines(True)),
2213            ("", testdata.decode("ascii").splitlines(True)),
2214            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2215            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2216            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2217            ]:
2218            buf = self.BytesIO(testdata)
2219            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2220            self.assertEqual(txt.readlines(), expected)
2221            txt.seek(0)
2222            self.assertEqual(txt.read(), "".join(expected))
2223
2224    def test_newlines_output(self):
2225        testdict = {
2226            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2227            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2228            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2229            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2230            }
2231        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2232        for newline, expected in tests:
2233            buf = self.BytesIO()
2234            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2235            txt.write("AAA\nB")
2236            txt.write("BB\nCCC\n")
2237            txt.write("X\rY\r\nZ")
2238            txt.flush()
2239            self.assertEqual(buf.closed, False)
2240            self.assertEqual(buf.getvalue(), expected)
2241
2242    def test_destructor(self):
2243        l = []
2244        base = self.BytesIO
2245        class MyBytesIO(base):
2246            def close(self):
2247                l.append(self.getvalue())
2248                base.close(self)
2249        b = MyBytesIO()
2250        t = self.TextIOWrapper(b, encoding="ascii")
2251        t.write("abc")
2252        del t
2253        support.gc_collect()
2254        self.assertEqual([b"abc"], l)
2255
2256    def test_override_destructor(self):
2257        record = []
2258        class MyTextIO(self.TextIOWrapper):
2259            def __del__(self):
2260                record.append(1)
2261                try:
2262                    f = super(MyTextIO, self).__del__
2263                except AttributeError:
2264                    pass
2265                else:
2266                    f()
2267            def close(self):
2268                record.append(2)
2269                super(MyTextIO, self).close()
2270            def flush(self):
2271                record.append(3)
2272                super(MyTextIO, self).flush()
2273        b = self.BytesIO()
2274        t = MyTextIO(b, encoding="ascii")
2275        del t
2276        support.gc_collect()
2277        self.assertEqual(record, [1, 2, 3])
2278
2279    def test_error_through_destructor(self):
2280        # Test that the exception state is not modified by a destructor,
2281        # even if close() fails.
2282        rawio = self.CloseFailureIO()
2283        def f():
2284            self.TextIOWrapper(rawio).xyzzy
2285        with support.captured_output("stderr") as s:
2286            self.assertRaises(AttributeError, f)
2287        s = s.getvalue().strip()
2288        if s:
2289            # The destructor *may* have printed an unraisable error, check it
2290            self.assertEqual(len(s.splitlines()), 1)
2291            self.assertTrue(s.startswith("Exception IOError: "), s)
2292            self.assertTrue(s.endswith(" ignored"), s)
2293
2294    # Systematic tests of the text I/O API
2295
2296    def test_basic_io(self):
2297        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2298            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
2299                f = self.open(support.TESTFN, "w+", encoding=enc)
2300                f._CHUNK_SIZE = chunksize
2301                self.assertEqual(f.write("abc"), 3)
2302                f.close()
2303                f = self.open(support.TESTFN, "r+", encoding=enc)
2304                f._CHUNK_SIZE = chunksize
2305                self.assertEqual(f.tell(), 0)
2306                self.assertEqual(f.read(), "abc")
2307                cookie = f.tell()
2308                self.assertEqual(f.seek(0), 0)
2309                self.assertEqual(f.read(None), "abc")
2310                f.seek(0)
2311                self.assertEqual(f.read(2), "ab")
2312                self.assertEqual(f.read(1), "c")
2313                self.assertEqual(f.read(1), "")
2314                self.assertEqual(f.read(), "")
2315                self.assertEqual(f.tell(), cookie)
2316                self.assertEqual(f.seek(0), 0)
2317                self.assertEqual(f.seek(0, 2), cookie)
2318                self.assertEqual(f.write("def"), 3)
2319                self.assertEqual(f.seek(cookie), cookie)
2320                self.assertEqual(f.read(), "def")
2321                if enc.startswith("utf"):
2322                    self.multi_line_test(f, enc)
2323                f.close()
2324
2325    def multi_line_test(self, f, enc):
2326        f.seek(0)
2327        f.truncate()
2328        sample = "s\xff\u0fff\uffff"
2329        wlines = []
2330        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2331            chars = []
2332            for i in range(size):
2333                chars.append(sample[i % len(sample)])
2334            line = "".join(chars) + "\n"
2335            wlines.append((f.tell(), line))
2336            f.write(line)
2337        f.seek(0)
2338        rlines = []
2339        while True:
2340            pos = f.tell()
2341            line = f.readline()
2342            if not line:
2343                break
2344            rlines.append((pos, line))
2345        self.assertEqual(rlines, wlines)
2346
2347    def test_telling(self):
2348        f = self.open(support.TESTFN, "w+", encoding="utf8")
2349        p0 = f.tell()
2350        f.write("\xff\n")
2351        p1 = f.tell()
2352        f.write("\xff\n")
2353        p2 = f.tell()
2354        f.seek(0)
2355        self.assertEqual(f.tell(), p0)
2356        self.assertEqual(f.readline(), "\xff\n")
2357        self.assertEqual(f.tell(), p1)
2358        self.assertEqual(f.readline(), "\xff\n")
2359        self.assertEqual(f.tell(), p2)
2360        f.seek(0)
2361        for line in f:
2362            self.assertEqual(line, "\xff\n")
2363            self.assertRaises(IOError, f.tell)
2364        self.assertEqual(f.tell(), p2)
2365        f.close()
2366
2367    def test_seeking(self):
2368        chunk_size = _default_chunk_size()
2369        prefix_size = chunk_size - 2
2370        u_prefix = "a" * prefix_size
2371        prefix = bytes(u_prefix.encode("utf-8"))
2372        self.assertEqual(len(u_prefix), len(prefix))
2373        u_suffix = "\u8888\n"
2374        suffix = bytes(u_suffix.encode("utf-8"))
2375        line = prefix + suffix
2376        f = self.open(support.TESTFN, "wb")
2377        f.write(line*2)
2378        f.close()
2379        f = self.open(support.TESTFN, "r", encoding="utf-8")
2380        s = f.read(prefix_size)
2381        self.assertEqual(s, prefix.decode("ascii"))
2382        self.assertEqual(f.tell(), prefix_size)
2383        self.assertEqual(f.readline(), u_suffix)
2384
2385    def test_seeking_too(self):
2386        # Regression test for a specific bug
2387        data = b'\xe0\xbf\xbf\n'
2388        f = self.open(support.TESTFN, "wb")
2389        f.write(data)
2390        f.close()
2391        f = self.open(support.TESTFN, "r", encoding="utf-8")
2392        f._CHUNK_SIZE  # Just test that it exists
2393        f._CHUNK_SIZE = 2
2394        f.readline()
2395        f.tell()
2396
2397    def test_seek_and_tell(self):
2398        #Test seek/tell using the StatefulIncrementalDecoder.
2399        # Make test faster by doing smaller seeks
2400        CHUNK_SIZE = 128
2401
2402        def test_seek_and_tell_with_data(data, min_pos=0):
2403            """Tell/seek to various points within a data stream and ensure
2404            that the decoded data returned by read() is consistent."""
2405            f = self.open(support.TESTFN, 'wb')
2406            f.write(data)
2407            f.close()
2408            f = self.open(support.TESTFN, encoding='test_decoder')
2409            f._CHUNK_SIZE = CHUNK_SIZE
2410            decoded = f.read()
2411            f.close()
2412
2413            for i in range(min_pos, len(decoded) + 1): # seek positions
2414                for j in [1, 5, len(decoded) - i]: # read lengths
2415                    f = self.open(support.TESTFN, encoding='test_decoder')
2416                    self.assertEqual(f.read(i), decoded[:i])
2417                    cookie = f.tell()
2418                    self.assertEqual(f.read(j), decoded[i:i + j])
2419                    f.seek(cookie)
2420                    self.assertEqual(f.read(), decoded[i:])
2421                    f.close()
2422
2423        # Enable the test decoder.
2424        StatefulIncrementalDecoder.codecEnabled = 1
2425
2426        # Run the tests.
2427        try:
2428            # Try each test case.
2429            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2430                test_seek_and_tell_with_data(input)
2431
2432            # Position each test case so that it crosses a chunk boundary.
2433            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2434                offset = CHUNK_SIZE - len(input)//2
2435                prefix = b'.'*offset
2436                # Don't bother seeking into the prefix (takes too long).
2437                min_pos = offset*2
2438                test_seek_and_tell_with_data(prefix + input, min_pos)
2439
2440        # Ensure our test decoder won't interfere with subsequent tests.
2441        finally:
2442            StatefulIncrementalDecoder.codecEnabled = 0
2443
2444    def test_encoded_writes(self):
2445        data = "1234567890"
2446        tests = ("utf-16",
2447                 "utf-16-le",
2448                 "utf-16-be",
2449                 "utf-32",
2450                 "utf-32-le",
2451                 "utf-32-be")
2452        for encoding in tests:
2453            buf = self.BytesIO()
2454            f = self.TextIOWrapper(buf, encoding=encoding)
2455            # Check if the BOM is written only once (see issue1753).
2456            f.write(data)
2457            f.write(data)
2458            f.seek(0)
2459            self.assertEqual(f.read(), data * 2)
2460            f.seek(0)
2461            self.assertEqual(f.read(), data * 2)
2462            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2463
2464    def test_unreadable(self):
2465        class UnReadable(self.BytesIO):
2466            def readable(self):
2467                return False
2468        txt = self.TextIOWrapper(UnReadable())
2469        self.assertRaises(IOError, txt.read)
2470
2471    def test_read_one_by_one(self):
2472        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
2473        reads = ""
2474        while True:
2475            c = txt.read(1)
2476            if not c:
2477                break
2478            reads += c
2479        self.assertEqual(reads, "AA\nBB")
2480
2481    def test_readlines(self):
2482        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2483        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2484        txt.seek(0)
2485        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2486        txt.seek(0)
2487        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2488
2489    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2490    def test_read_by_chunk(self):
2491        # make sure "\r\n" straddles 128 char boundary.
2492        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
2493        reads = ""
2494        while True:
2495            c = txt.read(128)
2496            if not c:
2497                break
2498            reads += c
2499        self.assertEqual(reads, "A"*127+"\nB")
2500
2501    def test_writelines(self):
2502        l = ['ab', 'cd', 'ef']
2503        buf = self.BytesIO()
2504        txt = self.TextIOWrapper(buf)
2505        txt.writelines(l)
2506        txt.flush()
2507        self.assertEqual(buf.getvalue(), b'abcdef')
2508
2509    def test_writelines_userlist(self):
2510        l = UserList(['ab', 'cd', 'ef'])
2511        buf = self.BytesIO()
2512        txt = self.TextIOWrapper(buf)
2513        txt.writelines(l)
2514        txt.flush()
2515        self.assertEqual(buf.getvalue(), b'abcdef')
2516
2517    def test_writelines_error(self):
2518        txt = self.TextIOWrapper(self.BytesIO())
2519        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2520        self.assertRaises(TypeError, txt.writelines, None)
2521        self.assertRaises(TypeError, txt.writelines, b'abc')
2522
2523    def test_issue1395_1(self):
2524        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2525
2526        # read one char at a time
2527        reads = ""
2528        while True:
2529            c = txt.read(1)
2530            if not c:
2531                break
2532            reads += c
2533        self.assertEqual(reads, self.normalized)
2534
2535    def test_issue1395_2(self):
2536        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2537        txt._CHUNK_SIZE = 4
2538
2539        reads = ""
2540        while True:
2541            c = txt.read(4)
2542            if not c:
2543                break
2544            reads += c
2545        self.assertEqual(reads, self.normalized)
2546
2547    def test_issue1395_3(self):
2548        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2549        txt._CHUNK_SIZE = 4
2550
2551        reads = txt.read(4)
2552        reads += txt.read(4)
2553        reads += txt.readline()
2554        reads += txt.readline()
2555        reads += txt.readline()
2556        self.assertEqual(reads, self.normalized)
2557
2558    def test_issue1395_4(self):
2559        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2560        txt._CHUNK_SIZE = 4
2561
2562        reads = txt.read(4)
2563        reads += txt.read()
2564        self.assertEqual(reads, self.normalized)
2565
2566    def test_issue1395_5(self):
2567        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2568        txt._CHUNK_SIZE = 4
2569
2570        reads = txt.read(4)
2571        pos = txt.tell()
2572        txt.seek(0)
2573        txt.seek(pos)
2574        self.assertEqual(txt.read(4), "BBB\n")
2575
2576    def test_issue2282(self):
2577        buffer = self.BytesIO(self.testdata)
2578        txt = self.TextIOWrapper(buffer, encoding="ascii")
2579
2580        self.assertEqual(buffer.seekable(), txt.seekable())
2581
2582    def test_append_bom(self):
2583        # The BOM is not written again when appending to a non-empty file
2584        filename = support.TESTFN
2585        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2586            with self.open(filename, 'w', encoding=charset) as f:
2587                f.write('aaa')
2588                pos = f.tell()
2589            with self.open(filename, 'rb') as f:
2590                self.assertEqual(f.read(), 'aaa'.encode(charset))
2591
2592            with self.open(filename, 'a', encoding=charset) as f:
2593                f.write('xxx')
2594            with self.open(filename, 'rb') as f:
2595                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2596
2597    def test_seek_bom(self):
2598        # Same test, but when seeking manually
2599        filename = support.TESTFN
2600        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2601            with self.open(filename, 'w', encoding=charset) as f:
2602                f.write('aaa')
2603                pos = f.tell()
2604            with self.open(filename, 'r+', encoding=charset) as f:
2605                f.seek(pos)
2606                f.write('zzz')
2607                f.seek(0)
2608                f.write('bbb')
2609            with self.open(filename, 'rb') as f:
2610                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2611
2612    def test_errors_property(self):
2613        with self.open(support.TESTFN, "w") as f:
2614            self.assertEqual(f.errors, "strict")
2615        with self.open(support.TESTFN, "w", errors="replace") as f:
2616            self.assertEqual(f.errors, "replace")
2617
2618    @unittest.skipUnless(threading, 'Threading required for this test.')
2619    def test_threads_write(self):
2620        # Issue6750: concurrent writes could duplicate data
2621        event = threading.Event()
2622        with self.open(support.TESTFN, "w", buffering=1) as f:
2623            def run(n):
2624                text = "Thread%03d\n" % n
2625                event.wait()
2626                f.write(text)
2627            threads = [threading.Thread(target=run, args=(x,))
2628                       for x in range(20)]
2629            with support.start_threads(threads, event.set):
2630                time.sleep(0.02)
2631        with self.open(support.TESTFN) as f:
2632            content = f.read()
2633            for n in range(20):
2634                self.assertEqual(content.count("Thread%03d\n" % n), 1)
2635
2636    def test_flush_error_on_close(self):
2637        # Test that text file is closed despite failed flush
2638        # and that flush() is called before file closed.
2639        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2640        closed = []
2641        def bad_flush():
2642            closed[:] = [txt.closed, txt.buffer.closed]
2643            raise IOError()
2644        txt.flush = bad_flush
2645        self.assertRaises(IOError, txt.close) # exception not swallowed
2646        self.assertTrue(txt.closed)
2647        self.assertTrue(txt.buffer.closed)
2648        self.assertTrue(closed)      # flush() called
2649        self.assertFalse(closed[0])  # flush() called before file closed
2650        self.assertFalse(closed[1])
2651        txt.flush = lambda: None  # break reference loop
2652
2653    def test_multi_close(self):
2654        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2655        txt.close()
2656        txt.close()
2657        txt.close()
2658        self.assertRaises(ValueError, txt.flush)
2659
2660    def test_readonly_attributes(self):
2661        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2662        buf = self.BytesIO(self.testdata)
2663        with self.assertRaises((AttributeError, TypeError)):
2664            txt.buffer = buf
2665
2666    def test_read_nonbytes(self):
2667        # Issue #17106
2668        # Crash when underlying read() returns non-bytes
2669        class NonbytesStream(self.StringIO):
2670            read1 = self.StringIO.read
2671        class NonbytesStream(self.StringIO):
2672            read1 = self.StringIO.read
2673        t = self.TextIOWrapper(NonbytesStream('a'))
2674        with self.maybeRaises(TypeError):
2675            t.read(1)
2676        t = self.TextIOWrapper(NonbytesStream('a'))
2677        with self.maybeRaises(TypeError):
2678            t.readline()
2679        t = self.TextIOWrapper(NonbytesStream('a'))
2680        self.assertEqual(t.read(), u'a')
2681
2682    def test_illegal_encoder(self):
2683        # bpo-31271: A TypeError should be raised in case the return value of
2684        # encoder's encode() is invalid.
2685        class BadEncoder:
2686            def encode(self, dummy):
2687                return u'spam'
2688        def get_bad_encoder(dummy):
2689            return BadEncoder()
2690        rot13 = codecs.lookup("rot13")
2691        with support.swap_attr(rot13, '_is_text_encoding', True), \
2692             support.swap_attr(rot13, 'incrementalencoder', get_bad_encoder):
2693            t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
2694        with self.assertRaises(TypeError):
2695            t.write('bar')
2696            t.flush()
2697
2698    def test_illegal_decoder(self):
2699        # Issue #17106
2700        # Bypass the early encoding check added in issue 20404
2701        def _make_illegal_wrapper():
2702            quopri = codecs.lookup("quopri_codec")
2703            quopri._is_text_encoding = True
2704            try:
2705                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2706                                       newline='\n', encoding="quopri_codec")
2707            finally:
2708                quopri._is_text_encoding = False
2709            return t
2710        # Crash when decoder returns non-string
2711        with support.check_py3k_warnings():
2712            t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2713                                   encoding='quopri_codec')
2714        with self.maybeRaises(TypeError):
2715            t.read(1)
2716        with support.check_py3k_warnings():
2717            t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2718                                   encoding='quopri_codec')
2719        with self.maybeRaises(TypeError):
2720            t.readline()
2721        with support.check_py3k_warnings():
2722            t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2723                                   encoding='quopri_codec')
2724        with self.maybeRaises(TypeError):
2725            t.read()
2726        #else:
2727            #t = _make_illegal_wrapper()
2728            #self.assertRaises(TypeError, t.read, 1)
2729            #t = _make_illegal_wrapper()
2730            #self.assertRaises(TypeError, t.readline)
2731            #t = _make_illegal_wrapper()
2732            #self.assertRaises(TypeError, t.read)
2733
2734        # Issue 31243: calling read() while the return value of decoder's
2735        # getstate() is invalid should neither crash the interpreter nor
2736        # raise a SystemError.
2737        def _make_very_illegal_wrapper(getstate_ret_val):
2738            class BadDecoder:
2739                def getstate(self):
2740                    return getstate_ret_val
2741            def _get_bad_decoder(dummy):
2742                return BadDecoder()
2743            quopri = codecs.lookup("quopri_codec")
2744            with support.swap_attr(quopri, 'incrementaldecoder',
2745                                   _get_bad_decoder):
2746                return _make_illegal_wrapper()
2747        t = _make_very_illegal_wrapper(42)
2748        with self.maybeRaises(TypeError):
2749            t.read(42)
2750        t = _make_very_illegal_wrapper(())
2751        with self.maybeRaises(TypeError):
2752            t.read(42)
2753
2754    def test_issue25862(self):
2755        # Assertion failures occurred in tell() after read() and write().
2756        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
2757        t.read(1)
2758        t.read()
2759        t.tell()
2760        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
2761        t.read(1)
2762        t.write('x')
2763        t.tell()
2764
2765
2766class CTextIOWrapperTest(TextIOWrapperTest):
2767
2768    def test_initialization(self):
2769        r = self.BytesIO(b"\xc3\xa9\n\n")
2770        b = self.BufferedReader(r, 1000)
2771        t = self.TextIOWrapper(b)
2772        self.assertRaises(TypeError, t.__init__, b, newline=42)
2773        self.assertRaises(ValueError, t.read)
2774        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2775        self.assertRaises(ValueError, t.read)
2776
2777        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2778        self.assertRaises(Exception, repr, t)
2779
2780    def test_garbage_collection(self):
2781        # C TextIOWrapper objects are collected, and collecting them flushes
2782        # all data to disk.
2783        # The Python version has __del__, so it ends in gc.garbage instead.
2784        rawio = io.FileIO(support.TESTFN, "wb")
2785        b = self.BufferedWriter(rawio)
2786        t = self.TextIOWrapper(b, encoding="ascii")
2787        t.write("456def")
2788        t.x = t
2789        wr = weakref.ref(t)
2790        del t
2791        support.gc_collect()
2792        self.assertIsNone(wr(), wr)
2793        with self.open(support.TESTFN, "rb") as f:
2794            self.assertEqual(f.read(), b"456def")
2795
2796    def test_rwpair_cleared_before_textio(self):
2797        # Issue 13070: TextIOWrapper's finalization would crash when called
2798        # after the reference to the underlying BufferedRWPair's writer got
2799        # cleared by the GC.
2800        for i in range(1000):
2801            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2802            t1 = self.TextIOWrapper(b1, encoding="ascii")
2803            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2804            t2 = self.TextIOWrapper(b2, encoding="ascii")
2805            # circular references
2806            t1.buddy = t2
2807            t2.buddy = t1
2808        support.gc_collect()
2809
2810    def test_del__CHUNK_SIZE_SystemError(self):
2811        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
2812        with self.assertRaises(AttributeError):
2813            del t._CHUNK_SIZE
2814
2815    maybeRaises = unittest.TestCase.assertRaises
2816
2817
2818class PyTextIOWrapperTest(TextIOWrapperTest):
2819    @contextlib.contextmanager
2820    def maybeRaises(self, *args, **kwds):
2821        yield
2822
2823
2824class IncrementalNewlineDecoderTest(unittest.TestCase):
2825
2826    def check_newline_decoding_utf8(self, decoder):
2827        # UTF-8 specific tests for a newline decoder
2828        def _check_decode(b, s, **kwargs):
2829            # We exercise getstate() / setstate() as well as decode()
2830            state = decoder.getstate()
2831            self.assertEqual(decoder.decode(b, **kwargs), s)
2832            decoder.setstate(state)
2833            self.assertEqual(decoder.decode(b, **kwargs), s)
2834
2835        _check_decode(b'\xe8\xa2\x88', "\u8888")
2836
2837        _check_decode(b'\xe8', "")
2838        _check_decode(b'\xa2', "")
2839        _check_decode(b'\x88', "\u8888")
2840
2841        _check_decode(b'\xe8', "")
2842        _check_decode(b'\xa2', "")
2843        _check_decode(b'\x88', "\u8888")
2844
2845        _check_decode(b'\xe8', "")
2846        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2847
2848        decoder.reset()
2849        _check_decode(b'\n', "\n")
2850        _check_decode(b'\r', "")
2851        _check_decode(b'', "\n", final=True)
2852        _check_decode(b'\r', "\n", final=True)
2853
2854        _check_decode(b'\r', "")
2855        _check_decode(b'a', "\na")
2856
2857        _check_decode(b'\r\r\n', "\n\n")
2858        _check_decode(b'\r', "")
2859        _check_decode(b'\r', "\n")
2860        _check_decode(b'\na', "\na")
2861
2862        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2863        _check_decode(b'\xe8\xa2\x88', "\u8888")
2864        _check_decode(b'\n', "\n")
2865        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2866        _check_decode(b'\n', "\n")
2867
2868    def check_newline_decoding(self, decoder, encoding):
2869        result = []
2870        if encoding is not None:
2871            encoder = codecs.getincrementalencoder(encoding)()
2872            def _decode_bytewise(s):
2873                # Decode one byte at a time
2874                for b in encoder.encode(s):
2875                    result.append(decoder.decode(b))
2876        else:
2877            encoder = None
2878            def _decode_bytewise(s):
2879                # Decode one char at a time
2880                for c in s:
2881                    result.append(decoder.decode(c))
2882        self.assertEqual(decoder.newlines, None)
2883        _decode_bytewise("abc\n\r")
2884        self.assertEqual(decoder.newlines, '\n')
2885        _decode_bytewise("\nabc")
2886        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2887        _decode_bytewise("abc\r")
2888        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2889        _decode_bytewise("abc")
2890        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
2891        _decode_bytewise("abc\r")
2892        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
2893        decoder.reset()
2894        input = "abc"
2895        if encoder is not None:
2896            encoder.reset()
2897            input = encoder.encode(input)
2898        self.assertEqual(decoder.decode(input), "abc")
2899        self.assertEqual(decoder.newlines, None)
2900
2901    def test_newline_decoder(self):
2902        encodings = (
2903            # None meaning the IncrementalNewlineDecoder takes unicode input
2904            # rather than bytes input
2905            None, 'utf-8', 'latin-1',
2906            'utf-16', 'utf-16-le', 'utf-16-be',
2907            'utf-32', 'utf-32-le', 'utf-32-be',
2908        )
2909        for enc in encodings:
2910            decoder = enc and codecs.getincrementaldecoder(enc)()
2911            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2912            self.check_newline_decoding(decoder, enc)
2913        decoder = codecs.getincrementaldecoder("utf-8")()
2914        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2915        self.check_newline_decoding_utf8(decoder)
2916
2917    def test_newline_bytes(self):
2918        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2919        def _check(dec):
2920            self.assertEqual(dec.newlines, None)
2921            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2922            self.assertEqual(dec.newlines, None)
2923            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2924            self.assertEqual(dec.newlines, None)
2925        dec = self.IncrementalNewlineDecoder(None, translate=False)
2926        _check(dec)
2927        dec = self.IncrementalNewlineDecoder(None, translate=True)
2928        _check(dec)
2929
2930    def test_translate(self):
2931        # issue 35062
2932        for translate in (-2, -1, 1, 2):
2933            decoder = codecs.getincrementaldecoder("utf-8")()
2934            decoder = self.IncrementalNewlineDecoder(decoder, translate)
2935            self.check_newline_decoding_utf8(decoder)
2936        decoder = codecs.getincrementaldecoder("utf-8")()
2937        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
2938        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
2939
2940class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2941    pass
2942
2943class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2944    pass
2945
2946
2947# XXX Tests for open()
2948
2949class MiscIOTest(unittest.TestCase):
2950
2951    def tearDown(self):
2952        support.unlink(support.TESTFN)
2953
2954    def test___all__(self):
2955        for name in self.io.__all__:
2956            obj = getattr(self.io, name, None)
2957            self.assertIsNotNone(obj, name)
2958            if name == "open":
2959                continue
2960            elif "error" in name.lower() or name == "UnsupportedOperation":
2961                self.assertTrue(issubclass(obj, Exception), name)
2962            elif not name.startswith("SEEK_"):
2963                self.assertTrue(issubclass(obj, self.IOBase))
2964
2965    def test_attributes(self):
2966        f = self.open(support.TESTFN, "wb", buffering=0)
2967        self.assertEqual(f.mode, "wb")
2968        f.close()
2969
2970        f = self.open(support.TESTFN, "U")
2971        self.assertEqual(f.name,            support.TESTFN)
2972        self.assertEqual(f.buffer.name,     support.TESTFN)
2973        self.assertEqual(f.buffer.raw.name, support.TESTFN)
2974        self.assertEqual(f.mode,            "U")
2975        self.assertEqual(f.buffer.mode,     "rb")
2976        self.assertEqual(f.buffer.raw.mode, "rb")
2977        f.close()
2978
2979        f = self.open(support.TESTFN, "w+")
2980        self.assertEqual(f.mode,            "w+")
2981        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
2982        self.assertEqual(f.buffer.raw.mode, "rb+")
2983
2984        g = self.open(f.fileno(), "wb", closefd=False)
2985        self.assertEqual(g.mode,     "wb")
2986        self.assertEqual(g.raw.mode, "wb")
2987        self.assertEqual(g.name,     f.fileno())
2988        self.assertEqual(g.raw.name, f.fileno())
2989        f.close()
2990        g.close()
2991
2992    def test_io_after_close(self):
2993        for kwargs in [
2994                {"mode": "w"},
2995                {"mode": "wb"},
2996                {"mode": "w", "buffering": 1},
2997                {"mode": "w", "buffering": 2},
2998                {"mode": "wb", "buffering": 0},
2999                {"mode": "r"},
3000                {"mode": "rb"},
3001                {"mode": "r", "buffering": 1},
3002                {"mode": "r", "buffering": 2},
3003                {"mode": "rb", "buffering": 0},
3004                {"mode": "w+"},
3005                {"mode": "w+b"},
3006                {"mode": "w+", "buffering": 1},
3007                {"mode": "w+", "buffering": 2},
3008                {"mode": "w+b", "buffering": 0},
3009            ]:
3010            f = self.open(support.TESTFN, **kwargs)
3011            f.close()
3012            self.assertRaises(ValueError, f.flush)
3013            self.assertRaises(ValueError, f.fileno)
3014            self.assertRaises(ValueError, f.isatty)
3015            self.assertRaises(ValueError, f.__iter__)
3016            if hasattr(f, "peek"):
3017                self.assertRaises(ValueError, f.peek, 1)
3018            self.assertRaises(ValueError, f.read)
3019            if hasattr(f, "read1"):
3020                self.assertRaises(ValueError, f.read1, 1024)
3021            if hasattr(f, "readall"):
3022                self.assertRaises(ValueError, f.readall)
3023            if hasattr(f, "readinto"):
3024                self.assertRaises(ValueError, f.readinto, bytearray(1024))
3025            self.assertRaises(ValueError, f.readline)
3026            self.assertRaises(ValueError, f.readlines)
3027            self.assertRaises(ValueError, f.readlines, 1)
3028            self.assertRaises(ValueError, f.seek, 0)
3029            self.assertRaises(ValueError, f.tell)
3030            self.assertRaises(ValueError, f.truncate)
3031            self.assertRaises(ValueError, f.write,
3032                              b"" if "b" in kwargs['mode'] else "")
3033            self.assertRaises(ValueError, f.writelines, [])
3034            self.assertRaises(ValueError, next, f)
3035
3036    def test_blockingioerror(self):
3037        # Various BlockingIOError issues
3038        self.assertRaises(TypeError, self.BlockingIOError)
3039        self.assertRaises(TypeError, self.BlockingIOError, 1)
3040        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
3041        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
3042        b = self.BlockingIOError(1, "")
3043        self.assertEqual(b.characters_written, 0)
3044        class C(unicode):
3045            pass
3046        c = C("")
3047        b = self.BlockingIOError(1, c)
3048        c.b = b
3049        b.c = c
3050        wr = weakref.ref(c)
3051        del c, b
3052        support.gc_collect()
3053        self.assertIsNone(wr(), wr)
3054
3055    def test_abcs(self):
3056        # Test the visible base classes are ABCs.
3057        self.assertIsInstance(self.IOBase, abc.ABCMeta)
3058        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3059        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3060        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
3061
3062    def _check_abc_inheritance(self, abcmodule):
3063        with self.open(support.TESTFN, "wb", buffering=0) as f:
3064            self.assertIsInstance(f, abcmodule.IOBase)
3065            self.assertIsInstance(f, abcmodule.RawIOBase)
3066            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3067            self.assertNotIsInstance(f, abcmodule.TextIOBase)
3068        with self.open(support.TESTFN, "wb") as f:
3069            self.assertIsInstance(f, abcmodule.IOBase)
3070            self.assertNotIsInstance(f, abcmodule.RawIOBase)
3071            self.assertIsInstance(f, abcmodule.BufferedIOBase)
3072            self.assertNotIsInstance(f, abcmodule.TextIOBase)
3073        with self.open(support.TESTFN, "w") as f:
3074            self.assertIsInstance(f, abcmodule.IOBase)
3075            self.assertNotIsInstance(f, abcmodule.RawIOBase)
3076            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3077            self.assertIsInstance(f, abcmodule.TextIOBase)
3078
3079    def test_abc_inheritance(self):
3080        # Test implementations inherit from their respective ABCs
3081        self._check_abc_inheritance(self)
3082
3083    def test_abc_inheritance_official(self):
3084        # Test implementations inherit from the official ABCs of the
3085        # baseline "io" module.
3086        self._check_abc_inheritance(io)
3087
3088    @unittest.skipUnless(fcntl, 'fcntl required for this test')
3089    def test_nonblock_pipe_write_bigbuf(self):
3090        self._test_nonblock_pipe_write(16*1024)
3091
3092    @unittest.skipUnless(fcntl, 'fcntl required for this test')
3093    def test_nonblock_pipe_write_smallbuf(self):
3094        self._test_nonblock_pipe_write(1024)
3095
3096    def _set_non_blocking(self, fd):
3097        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3098        self.assertNotEqual(flags, -1)
3099        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3100        self.assertEqual(res, 0)
3101
3102    def _test_nonblock_pipe_write(self, bufsize):
3103        sent = []
3104        received = []
3105        r, w = os.pipe()
3106        self._set_non_blocking(r)
3107        self._set_non_blocking(w)
3108
3109        # To exercise all code paths in the C implementation we need
3110        # to play with buffer sizes.  For instance, if we choose a
3111        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3112        # then we will never get a partial write of the buffer.
3113        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3114        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3115
3116        with rf, wf:
3117            for N in 9999, 73, 7574:
3118                try:
3119                    i = 0
3120                    while True:
3121                        msg = bytes([i % 26 + 97]) * N
3122                        sent.append(msg)
3123                        wf.write(msg)
3124                        i += 1
3125
3126                except self.BlockingIOError as e:
3127                    self.assertEqual(e.args[0], errno.EAGAIN)
3128                    sent[-1] = sent[-1][:e.characters_written]
3129                    received.append(rf.read())
3130                    msg = b'BLOCKED'
3131                    wf.write(msg)
3132                    sent.append(msg)
3133
3134            while True:
3135                try:
3136                    wf.flush()
3137                    break
3138                except self.BlockingIOError as e:
3139                    self.assertEqual(e.args[0], errno.EAGAIN)
3140                    self.assertEqual(e.characters_written, 0)
3141                    received.append(rf.read())
3142
3143            received += iter(rf.read, None)
3144
3145        sent, received = b''.join(sent), b''.join(received)
3146        self.assertEqual(sent, received)
3147        self.assertTrue(wf.closed)
3148        self.assertTrue(rf.closed)
3149
3150class CMiscIOTest(MiscIOTest):
3151    io = io
3152    shutdown_error = "RuntimeError: could not find io module state"
3153
3154class PyMiscIOTest(MiscIOTest):
3155    io = pyio
3156    shutdown_error = "LookupError: unknown encoding: ascii"
3157
3158
3159@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3160class SignalsTest(unittest.TestCase):
3161
3162    def setUp(self):
3163        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3164
3165    def tearDown(self):
3166        signal.signal(signal.SIGALRM, self.oldalrm)
3167
3168    def alarm_interrupt(self, sig, frame):
3169        1 // 0
3170
3171    @unittest.skipUnless(threading, 'Threading required for this test.')
3172    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3173                     'issue #12429: skip test on FreeBSD <= 7')
3174    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3175        """Check that a partial write, when it gets interrupted, properly
3176        invokes the signal handler, and bubbles up the exception raised
3177        in the latter."""
3178        read_results = []
3179        def _read():
3180            s = os.read(r, 1)
3181            read_results.append(s)
3182        t = threading.Thread(target=_read)
3183        t.daemon = True
3184        r, w = os.pipe()
3185        try:
3186            wio = self.io.open(w, **fdopen_kwargs)
3187            t.start()
3188            # Fill the pipe enough that the write will be blocking.
3189            # It will be interrupted by the timer armed above.  Since the
3190            # other thread has read one byte, the low-level write will
3191            # return with a successful (partial) result rather than an EINTR.
3192            # The buffered IO layer must check for pending signal
3193            # handlers, which in this case will invoke alarm_interrupt().
3194            try:
3195                signal.alarm(1)
3196                with self.assertRaises(ZeroDivisionError):
3197                    wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3198            finally:
3199                signal.alarm(0)
3200                t.join()
3201
3202            # We got one byte, get another one and check that it isn't a
3203            # repeat of the first one.
3204            read_results.append(os.read(r, 1))
3205            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3206        finally:
3207            os.close(w)
3208            os.close(r)
3209            # This is deliberate. If we didn't close the file descriptor
3210            # before closing wio, wio would try to flush its internal
3211            # buffer, and block again.
3212            try:
3213                wio.close()
3214            except IOError as e:
3215                if e.errno != errno.EBADF:
3216                    raise
3217
3218    def test_interrupted_write_unbuffered(self):
3219        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3220
3221    def test_interrupted_write_buffered(self):
3222        self.check_interrupted_write(b"xy", b"xy", mode="wb")
3223
3224    def test_interrupted_write_text(self):
3225        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3226
3227    def check_reentrant_write(self, data, **fdopen_kwargs):
3228        def on_alarm(*args):
3229            # Will be called reentrantly from the same thread
3230            wio.write(data)
3231            1//0
3232        signal.signal(signal.SIGALRM, on_alarm)
3233        r, w = os.pipe()
3234        wio = self.io.open(w, **fdopen_kwargs)
3235        try:
3236            signal.alarm(1)
3237            # Either the reentrant call to wio.write() fails with RuntimeError,
3238            # or the signal handler raises ZeroDivisionError.
3239            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3240                while 1:
3241                    for i in range(100):
3242                        wio.write(data)
3243                        wio.flush()
3244                    # Make sure the buffer doesn't fill up and block further writes
3245                    os.read(r, len(data) * 100)
3246            exc = cm.exception
3247            if isinstance(exc, RuntimeError):
3248                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3249        finally:
3250            signal.alarm(0)
3251            wio.close()
3252            os.close(r)
3253
3254    def test_reentrant_write_buffered(self):
3255        self.check_reentrant_write(b"xy", mode="wb")
3256
3257    def test_reentrant_write_text(self):
3258        self.check_reentrant_write("xy", mode="w", encoding="ascii")
3259
3260    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3261        """Check that a buffered read, when it gets interrupted (either
3262        returning a partial result or EINTR), properly invokes the signal
3263        handler and retries if the latter returned successfully."""
3264        r, w = os.pipe()
3265        fdopen_kwargs["closefd"] = False
3266        def alarm_handler(sig, frame):
3267            os.write(w, b"bar")
3268        signal.signal(signal.SIGALRM, alarm_handler)
3269        try:
3270            rio = self.io.open(r, **fdopen_kwargs)
3271            os.write(w, b"foo")
3272            signal.alarm(1)
3273            # Expected behaviour:
3274            # - first raw read() returns partial b"foo"
3275            # - second raw read() returns EINTR
3276            # - third raw read() returns b"bar"
3277            self.assertEqual(decode(rio.read(6)), "foobar")
3278        finally:
3279            signal.alarm(0)
3280            rio.close()
3281            os.close(w)
3282            os.close(r)
3283
3284    def test_interrupterd_read_retry_buffered(self):
3285        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3286                                          mode="rb")
3287
3288    def test_interrupterd_read_retry_text(self):
3289        self.check_interrupted_read_retry(lambda x: x,
3290                                          mode="r")
3291
3292    @unittest.skipUnless(threading, 'Threading required for this test.')
3293    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3294        """Check that a buffered write, when it gets interrupted (either
3295        returning a partial result or EINTR), properly invokes the signal
3296        handler and retries if the latter returned successfully."""
3297        select = support.import_module("select")
3298        # A quantity that exceeds the buffer size of an anonymous pipe's
3299        # write end.
3300        N = support.PIPE_MAX_SIZE
3301        r, w = os.pipe()
3302        fdopen_kwargs["closefd"] = False
3303        # We need a separate thread to read from the pipe and allow the
3304        # write() to finish.  This thread is started after the SIGALRM is
3305        # received (forcing a first EINTR in write()).
3306        read_results = []
3307        write_finished = False
3308        error = [None]
3309        def _read():
3310            try:
3311                while not write_finished:
3312                    while r in select.select([r], [], [], 1.0)[0]:
3313                        s = os.read(r, 1024)
3314                        read_results.append(s)
3315            except BaseException as exc:
3316                error[0] = exc
3317        t = threading.Thread(target=_read)
3318        t.daemon = True
3319        def alarm1(sig, frame):
3320            signal.signal(signal.SIGALRM, alarm2)
3321            signal.alarm(1)
3322        def alarm2(sig, frame):
3323            t.start()
3324        signal.signal(signal.SIGALRM, alarm1)
3325        try:
3326            wio = self.io.open(w, **fdopen_kwargs)
3327            signal.alarm(1)
3328            # Expected behaviour:
3329            # - first raw write() is partial (because of the limited pipe buffer
3330            #   and the first alarm)
3331            # - second raw write() returns EINTR (because of the second alarm)
3332            # - subsequent write()s are successful (either partial or complete)
3333            self.assertEqual(N, wio.write(item * N))
3334            wio.flush()
3335            write_finished = True
3336            t.join()
3337
3338            self.assertIsNone(error[0])
3339            self.assertEqual(N, sum(len(x) for x in read_results))
3340        finally:
3341            signal.alarm(0)
3342            write_finished = True
3343            os.close(w)
3344            os.close(r)
3345            # This is deliberate. If we didn't close the file descriptor
3346            # before closing wio, wio would try to flush its internal
3347            # buffer, and could block (in case of failure).
3348            try:
3349                wio.close()
3350            except IOError as e:
3351                if e.errno != errno.EBADF:
3352                    raise
3353
3354    def test_interrupterd_write_retry_buffered(self):
3355        self.check_interrupted_write_retry(b"x", mode="wb")
3356
3357    def test_interrupterd_write_retry_text(self):
3358        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3359
3360
3361class CSignalsTest(SignalsTest):
3362    io = io
3363
3364class PySignalsTest(SignalsTest):
3365    io = pyio
3366
3367    # Handling reentrancy issues would slow down _pyio even more, so the
3368    # tests are disabled.
3369    test_reentrant_write_buffered = None
3370    test_reentrant_write_text = None
3371
3372
3373def test_main():
3374    tests = (CIOTest, PyIOTest,
3375             CBufferedReaderTest, PyBufferedReaderTest,
3376             CBufferedWriterTest, PyBufferedWriterTest,
3377             CBufferedRWPairTest, PyBufferedRWPairTest,
3378             CBufferedRandomTest, PyBufferedRandomTest,
3379             StatefulIncrementalDecoderTest,
3380             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3381             CTextIOWrapperTest, PyTextIOWrapperTest,
3382             CMiscIOTest, PyMiscIOTest,
3383             CSignalsTest, PySignalsTest,
3384             )
3385
3386    # Put the namespaces of the IO module we are testing and some useful mock
3387    # classes in the __dict__ of each test.
3388    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
3389             MockNonBlockWriterIO, MockRawIOWithoutRead)
3390    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3391    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3392    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3393    globs = globals()
3394    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3395    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3396    # Avoid turning open into a bound method.
3397    py_io_ns["open"] = pyio.OpenWrapper
3398    for test in tests:
3399        if test.__name__.startswith("C"):
3400            for name, obj in c_io_ns.items():
3401                setattr(test, name, obj)
3402        elif test.__name__.startswith("Py"):
3403            for name, obj in py_io_ns.items():
3404                setattr(test, name, obj)
3405
3406    support.run_unittest(*tests)
3407
3408if __name__ == "__main__":
3409    test_main()
3410