1import contextlib
2import sys
3import unittest
4from test import support
5from test.support import import_helper
6from test.support import os_helper
7import time
8
9resource = import_helper.import_module('resource')
10
11# This test is checking a few specific problem spots with the resource module.
12
13class ResourceTest(unittest.TestCase):
14
15    def test_args(self):
16        self.assertRaises(TypeError, resource.getrlimit)
17        self.assertRaises(TypeError, resource.getrlimit, 42, 42)
18        self.assertRaises(TypeError, resource.setrlimit)
19        self.assertRaises(TypeError, resource.setrlimit, 42, 42, 42)
20
21    @unittest.skipIf(sys.platform == "vxworks",
22                     "setting RLIMIT_FSIZE is not supported on VxWorks")
23    def test_fsize_ismax(self):
24        try:
25            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
26        except AttributeError:
27            pass
28        else:
29            # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really big
30            # number on a platform with large file support.  On these platforms,
31            # we need to test that the get/setrlimit functions properly convert
32            # the number to a C long long and that the conversion doesn't raise
33            # an error.
34            self.assertEqual(resource.RLIM_INFINITY, max)
35            resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
36
37    def test_fsize_enforced(self):
38        try:
39            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
40        except AttributeError:
41            pass
42        else:
43            # Check to see what happens when the RLIMIT_FSIZE is small.  Some
44            # versions of Python were terminated by an uncaught SIGXFSZ, but
45            # pythonrun.c has been fixed to ignore that exception.  If so, the
46            # write() should return EFBIG when the limit is exceeded.
47
48            # At least one platform has an unlimited RLIMIT_FSIZE and attempts
49            # to change it raise ValueError instead.
50            try:
51                try:
52                    resource.setrlimit(resource.RLIMIT_FSIZE, (1024, max))
53                    limit_set = True
54                except ValueError:
55                    limit_set = False
56                f = open(os_helper.TESTFN, "wb")
57                try:
58                    f.write(b"X" * 1024)
59                    try:
60                        f.write(b"Y")
61                        f.flush()
62                        # On some systems (e.g., Ubuntu on hppa) the flush()
63                        # doesn't always cause the exception, but the close()
64                        # does eventually.  Try flushing several times in
65                        # an attempt to ensure the file is really synced and
66                        # the exception raised.
67                        for i in range(5):
68                            time.sleep(.1)
69                            f.flush()
70                    except OSError:
71                        if not limit_set:
72                            raise
73                    if limit_set:
74                        # Close will attempt to flush the byte we wrote
75                        # Restore limit first to avoid getting a spurious error
76                        resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
77                finally:
78                    f.close()
79            finally:
80                if limit_set:
81                    resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
82                os_helper.unlink(os_helper.TESTFN)
83
84    def test_fsize_toobig(self):
85        # Be sure that setrlimit is checking for really large values
86        too_big = 10**50
87        try:
88            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
89        except AttributeError:
90            pass
91        else:
92            try:
93                resource.setrlimit(resource.RLIMIT_FSIZE, (too_big, max))
94            except (OverflowError, ValueError):
95                pass
96            try:
97                resource.setrlimit(resource.RLIMIT_FSIZE, (max, too_big))
98            except (OverflowError, ValueError):
99                pass
100
101    def test_getrusage(self):
102        self.assertRaises(TypeError, resource.getrusage)
103        self.assertRaises(TypeError, resource.getrusage, 42, 42)
104        usageself = resource.getrusage(resource.RUSAGE_SELF)
105        usagechildren = resource.getrusage(resource.RUSAGE_CHILDREN)
106        # May not be available on all systems.
107        try:
108            usageboth = resource.getrusage(resource.RUSAGE_BOTH)
109        except (ValueError, AttributeError):
110            pass
111        try:
112            usage_thread = resource.getrusage(resource.RUSAGE_THREAD)
113        except (ValueError, AttributeError):
114            pass
115
116    # Issue 6083: Reference counting bug
117    @unittest.skipIf(sys.platform == "vxworks",
118                     "setting RLIMIT_CPU is not supported on VxWorks")
119    def test_setrusage_refcount(self):
120        try:
121            limits = resource.getrlimit(resource.RLIMIT_CPU)
122        except AttributeError:
123            pass
124        else:
125            class BadSequence:
126                def __len__(self):
127                    return 2
128                def __getitem__(self, key):
129                    if key in (0, 1):
130                        return len(tuple(range(1000000)))
131                    raise IndexError
132
133            resource.setrlimit(resource.RLIMIT_CPU, BadSequence())
134
135    def test_pagesize(self):
136        pagesize = resource.getpagesize()
137        self.assertIsInstance(pagesize, int)
138        self.assertGreaterEqual(pagesize, 0)
139
140    @unittest.skipUnless(sys.platform == 'linux', 'test requires Linux')
141    def test_linux_constants(self):
142        for attr in ['MSGQUEUE', 'NICE', 'RTPRIO', 'RTTIME', 'SIGPENDING']:
143            with contextlib.suppress(AttributeError):
144                self.assertIsInstance(getattr(resource, 'RLIMIT_' + attr), int)
145
146    def test_freebsd_contants(self):
147        for attr in ['SWAP', 'SBSIZE', 'NPTS']:
148            with contextlib.suppress(AttributeError):
149                self.assertIsInstance(getattr(resource, 'RLIMIT_' + attr), int)
150
151    @unittest.skipUnless(hasattr(resource, 'prlimit'), 'no prlimit')
152    @support.requires_linux_version(2, 6, 36)
153    def test_prlimit(self):
154        self.assertRaises(TypeError, resource.prlimit)
155        self.assertRaises(ProcessLookupError, resource.prlimit,
156                          -1, resource.RLIMIT_AS)
157        limit = resource.getrlimit(resource.RLIMIT_AS)
158        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS), limit)
159        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS, limit),
160                         limit)
161
162    # Issue 20191: Reference counting bug
163    @unittest.skipUnless(hasattr(resource, 'prlimit'), 'no prlimit')
164    @support.requires_linux_version(2, 6, 36)
165    def test_prlimit_refcount(self):
166        class BadSeq:
167            def __len__(self):
168                return 2
169            def __getitem__(self, key):
170                return limits[key] - 1  # new reference
171
172        limits = resource.getrlimit(resource.RLIMIT_AS)
173        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS, BadSeq()),
174                         limits)
175
176
177if __name__ == "__main__":
178    unittest.main()
179