"""Test largefile support on systems where this makes sense."""
4import os
5import stat
6import sys
7import unittest
8import socket
9import shutil
10import threading
11from test.support import requires, bigmemtest
12from test.support import SHORT_TIMEOUT
13from test.support import socket_helper
14from test.support.os_helper import TESTFN, unlink
15import io  # C implementation of io
16import _pyio as pyio # Python implementation of io
# Offset, in bytes, at which the last byte of the test file is written; the
# finished file is therefore size + 1 bytes long.  The value must exceed
# 2 GiB (2 GiB == 2,147,483,648 bytes) so that offsets do not fit in a
# signed 32-bit integer.
size = 2_500_000_000
# Second scratch file: destination of the copyfile and sendfile tests.
TESTFN2 = TESTFN + '2'
class LargeFileTest:
    """Mixin that makes sure TESTFN is a file of ``size + 1`` bytes.

    The classes in this file that use the mixin also derive from
    unittest.TestCase and define ``open`` as a staticmethod, which is the
    function used here to open the file.
    """

    def setUp(self):
        # Open an existing file for update; 'w+b' would truncate it.
        mode = 'r+b' if os.path.exists(TESTFN) else 'w+b'
        full_size = size + 1

        with self.open(TESTFN, mode) as f:
            fd = f.fileno()
            existing = os.fstat(fd)[stat.ST_SIZE]
            if existing == full_size:
                # A previous test left the file at full size: reuse it.
                return

            if not existing:
                # Empty file: mark the very first byte.
                f.write(b'z')
            f.seek(0)
            # One byte written at offset `size` makes the file size + 1
            # bytes long.
            f.seek(size)
            f.write(b'a')
            f.flush()
            self.assertEqual(os.fstat(fd)[stat.ST_SIZE], full_size)

    @classmethod
    def tearDownClass(cls):
        # Opening with mode 'wb' is expected to truncate the file; this is
        # checked below.
        with cls.open(TESTFN, 'wb'):
            pass
        remaining = os.stat(TESTFN)[stat.ST_SIZE]
        if remaining != 0:
            raise cls.failureException('File was not truncated by opening '
                                       'with mode "wb"')
        unlink(TESTFN2)
class TestFileMethods(LargeFileTest):
    """Check read, seek, tell, lseek, truncate and seekable on a file
    larger than 2 GiB.

    The concrete test classes in this file combine this mixin with
    unittest.TestCase and an ``open`` implementation.
    """

    # _pyio.FileIO.readall() fills a temporary bytearray which is then
    # converted to bytes, hence memuse=2.
    @bigmemtest(size=size, memuse=2, dry_run=False)
    def test_large_read(self, _size):
        # bpo-24658: a single read of more than 2 GB must not fail.
        total = size + 1
        with self.open(TESTFN, "rb") as f:
            self.assertEqual(len(f.read()), total)
            self.assertEqual(f.tell(), total)

    def test_osstat(self):
        reported = os.stat(TESTFN)[stat.ST_SIZE]
        self.assertEqual(reported, size + 1)

    def test_seek_read(self):
        end = size + 1
        # Each entry: (arguments passed to seek(), position expected after).
        # Both the one-argument and the explicit-whence forms are used.
        steps = (
            ((0,), 0),
            ((0, 0), 0),
            ((42,), 42),
            ((42, 0), 42),
            ((42, 1), 84),
            ((0, 1), 84),
            ((0, 2), end + 0),       # seek from the end
            ((-10, 2), end - 10),
            ((-size-1, 2), 0),
            ((size,), size),
        )
        with self.open(TESTFN, 'rb') as f:
            self.assertEqual(f.tell(), 0)
            self.assertEqual(f.read(1), b'z')
            self.assertEqual(f.tell(), 1)
            for seek_args, position in steps:
                f.seek(*seek_args)
                self.assertEqual(f.tell(), position)
            # the 'a' that setUp wrote at offset `size`
            self.assertEqual(f.read(1), b'a')
            # Relative seek from the end of the file back to its start.
            f.seek(-size-1, 1)
            self.assertEqual(f.read(1), b'z')
            self.assertEqual(f.tell(), 1)

    def test_lseek(self):
        end = size + 1
        # Each entry: (offset, whence, offset that os.lseek() must return).
        steps = (
            (0, 0, 0),
            (42, 0, 42),
            (42, 1, 84),
            (0, 1, 84),
            (0, 2, end + 0),
            (-10, 2, end - 10),
            (-size-1, 2, 0),
            (size, 0, size),
        )
        with self.open(TESTFN, 'rb') as f:
            fd = f.fileno()
            for offset, whence, landed in steps:
                self.assertEqual(os.lseek(fd, offset, whence), landed)
            # the 'a' that setUp wrote at offset `size`
            self.assertEqual(f.read(1), b'a')

    def test_truncate(self):
        with self.open(TESTFN, 'r+b') as f:
            if not hasattr(f, 'truncate'):
                raise unittest.SkipTest("open().truncate() not available "
                                        "on this system")

            def check_end(expected):
                # The position after seeking to the end is the file size.
                f.seek(0, 2)
                self.assertEqual(f.tell(), expected)

            # else we've lost track of the true size
            check_end(size+1)
            # Shrink via seek + truncate() with no argument.
            shorter = size - 10
            f.seek(shorter)
            f.truncate()
            self.assertEqual(f.tell(), shorter)  # else pointer moved
            check_end(shorter)                   # else wasn't truncated
            # truncate(smaller than true size) must shrink the file and
            # leave the position alone.
            shorter -= 1
            f.seek(42)
            f.truncate(shorter)
            self.assertEqual(f.tell(), 42)
            check_end(shorter)
            # XXX truncate(larger than true size) is ill-defined
            # across platforms; cut it way back instead.
            f.seek(0)
            f.truncate(1)
            self.assertEqual(f.tell(), 0)       # else pointer moved
            f.seek(0)
            self.assertEqual(len(f.read()), 1)  # else wasn't truncated

    def test_seekable(self):
        # Issue #5016; seekable() can return False when the current position
        # is negative when truncated to an int.
        boundary = 2**31
        for pos in (boundary - 1, boundary, boundary + 1):
            with self.open(TESTFN, 'rb') as f:
                f.seek(pos)
                self.assertTrue(f.seekable())
def skip_no_disk_space(path, required):
    """Return a decorator that skips a call when free disk space is short.

    *path* is a path on the filesystem to inspect and *required* is the
    minimum number of free bytes.  The check runs on every call of the
    decorated function, not at decoration time: if the free space reported
    by shutil.disk_usage() for the resolved *path* is below *required*,
    unittest.SkipTest is raised and the function is not called.  Otherwise
    the function's return value is passed through.
    """
    import functools

    def decorator(fun):
        # functools.wraps keeps __name__, __doc__ and friends of the
        # decorated function, so reports do not show it as "wrapper".
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            if shutil.disk_usage(os.path.realpath(path)).free < required:
                # Report the requirement in whole MiB.
                hsize = int(required / 1024 / 1024)
                raise unittest.SkipTest(
                    f"required {hsize} MiB of free disk space")
            return fun(*args, **kwargs)
        return wrapper
    return decorator
class TestCopyfile(LargeFileTest, unittest.TestCase):
    """Copy the large file with shutil.copyfile() and inspect the copy."""
    open = staticmethod(io.open)

    # Strictly (size * 2) bytes of disk are needed; ask for some headroom.
    @skip_no_disk_space(TESTFN, size * 2.5)
    def test_it(self):
        # Internally shutil.copyfile() can use "fast copy" methods like
        # os.sendfile().
        source_size = os.path.getsize(TESTFN)
        shutil.copyfile(TESTFN, TESTFN2)
        self.assertEqual(os.path.getsize(TESTFN2), source_size)
        with open(TESTFN2, 'rb') as copy:
            # First byte is the 'z' marker, followed by zero bytes.
            head = copy.read(5)
            self.assertEqual(head, b'z\x00\x00\x00\x00')
            # The last byte is the 'a' marker, preceded by zero bytes.
            copy.seek(source_size - 5)
            tail = copy.read()
            self.assertEqual(tail, b'\x00\x00\x00\x00a')
@unittest.skipUnless(hasattr(os, 'sendfile'), 'sendfile not supported')
class TestSocketSendfile(LargeFileTest, unittest.TestCase):
    """Send the large file through a TCP socket with socket.sendfile() and
    compare what the receiving thread stored in TESTFN2.
    """
    open = staticmethod(io.open)
    timeout = SHORT_TIMEOUT

    def setUp(self):
        super().setUp()
        # Set by tcp_server(); joined in tearDown().
        self.thread = None

    def tearDown(self):
        super().tearDown()
        worker = self.thread
        if worker is None:
            return
        worker.join(self.timeout)
        self.thread = None

    def tcp_server(self, sock):
        """Start a thread that accepts one connection on *sock* and writes
        everything it receives to TESTFN2.
        """
        event = threading.Event()

        def run(sock):
            with sock:
                conn, _ = sock.accept()
                conn.settimeout(self.timeout)
                with conn, open(TESTFN2, 'wb') as out:
                    event.wait(self.timeout)
                    # An empty chunk means the peer closed the connection.
                    while chunk := conn.recv(65536):
                        out.write(chunk)

        sock.settimeout(self.timeout)
        self.thread = threading.Thread(target=run, args=(sock, ))
        self.thread.start()
        event.set()

    # Strictly (size * 2) bytes of disk are needed; ask for some headroom.
    @skip_no_disk_space(TESTFN, size * 2.5)
    def test_it(self):
        port = socket_helper.find_unused_port()
        with socket.create_server(("", port)) as sock:
            self.tcp_server(sock)
            with socket.create_connection(("", port)) as client, \
                    open(TESTFN, 'rb') as source:
                client.sendfile(source)
        # Join the receiving thread before looking at its output file.
        self.tearDown()

        source_size = os.path.getsize(TESTFN)
        self.assertEqual(os.path.getsize(TESTFN2), source_size)
        with open(TESTFN2, 'rb') as received:
            head = received.read(5)
            self.assertEqual(head, b'z\x00\x00\x00\x00')
            received.seek(source_size - 5)
            tail = received.read()
            self.assertEqual(tail, b'\x00\x00\x00\x00a')
def setUpModule():
    """Prepare the process for the tests, or skip the whole module.

    SIGXFSZ is ignored where the signal exists.  On Windows and macOS the
    'largefile' resource is then required; elsewhere a probe write past
    2 GiB decides whether the filesystem supports large files, and
    unittest.SkipTest is raised when it does not.
    """
    try:
        import signal
        # The default action for SIGXFSZ aborts the process.  With the
        # signal ignored, system calls that exceed the file size resource
        # limit raise OSError instead of crashing the interpreter.
        signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
    except (ImportError, AttributeError):
        pass

    # On Windows and macOS this test is resource hungry: building the
    # >2 GiB file takes a long time and >2 GiB of disk space, so the
    # resource must be enabled explicitly.  If it is not, nothing after
    # the requires() call is executed.
    if sys.platform.startswith('win') or sys.platform == 'darwin':
        requires('largefile',
                 'test requires %s bytes and a long time to run' % size)
        return

    # Other platforms: only run if the current filesystem supports large
    # files.
    probe = open(TESTFN, 'wb', buffering=0)
    try:
        # One byte past 2**31 == 2147483648.
        probe.seek(2**31 + 1)
        # Seeking is not enough of a test: you must write and flush, too!
        probe.write(b'x')
        probe.flush()
    except (OSError, OverflowError):
        raise unittest.SkipTest("filesystem does not have "
                                "largefile support")
    finally:
        probe.close()
        unlink(TESTFN)
class CLargeFileTest(TestFileMethods, unittest.TestCase):
    # Runs the TestFileMethods tests with the C implementation of io.
    # staticmethod keeps io.open from being bound to the test instance
    # when it is called as self.open(...).
    open = staticmethod(io.open)
class PyLargeFileTest(TestFileMethods, unittest.TestCase):
    # Runs the TestFileMethods tests with the pure-Python implementation
    # of io (_pyio).  staticmethod keeps pyio.open from being bound to the
    # test instance when it is called as self.open(...).
    open = staticmethod(pyio.open)
def tearDownModule():
    """Module-level cleanup hook: delete both scratch files."""
    for scratch in (TESTFN, TESTFN2):
        unlink(scratch)
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()