1import contextlib
2import sys
3import unittest
4from test import support
5import time
6
7resource = support.import_module('resource')
8
9# This test is checking a few specific problem spots with the resource module.
10
11class ResourceTest(unittest.TestCase):
12
13    def test_args(self):
14        self.assertRaises(TypeError, resource.getrlimit)
15        self.assertRaises(TypeError, resource.getrlimit, 42, 42)
16        self.assertRaises(TypeError, resource.setrlimit)
17        self.assertRaises(TypeError, resource.setrlimit, 42, 42, 42)
18
19    @unittest.skipIf(sys.platform == "vxworks",
20                     "setting RLIMIT_FSIZE is not supported on VxWorks")
21    def test_fsize_ismax(self):
22        try:
23            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
24        except AttributeError:
25            pass
26        else:
27            # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really big
28            # number on a platform with large file support.  On these platforms,
29            # we need to test that the get/setrlimit functions properly convert
30            # the number to a C long long and that the conversion doesn't raise
31            # an error.
32            self.assertEqual(resource.RLIM_INFINITY, max)
33            resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
34
35    def test_fsize_enforced(self):
36        try:
37            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
38        except AttributeError:
39            pass
40        else:
41            # Check to see what happens when the RLIMIT_FSIZE is small.  Some
42            # versions of Python were terminated by an uncaught SIGXFSZ, but
43            # pythonrun.c has been fixed to ignore that exception.  If so, the
44            # write() should return EFBIG when the limit is exceeded.
45
46            # At least one platform has an unlimited RLIMIT_FSIZE and attempts
47            # to change it raise ValueError instead.
48            try:
49                try:
50                    resource.setrlimit(resource.RLIMIT_FSIZE, (1024, max))
51                    limit_set = True
52                except ValueError:
53                    limit_set = False
54                f = open(support.TESTFN, "wb")
55                try:
56                    f.write(b"X" * 1024)
57                    try:
58                        f.write(b"Y")
59                        f.flush()
60                        # On some systems (e.g., Ubuntu on hppa) the flush()
61                        # doesn't always cause the exception, but the close()
62                        # does eventually.  Try flushing several times in
63                        # an attempt to ensure the file is really synced and
64                        # the exception raised.
65                        for i in range(5):
66                            time.sleep(.1)
67                            f.flush()
68                    except OSError:
69                        if not limit_set:
70                            raise
71                    if limit_set:
72                        # Close will attempt to flush the byte we wrote
73                        # Restore limit first to avoid getting a spurious error
74                        resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
75                finally:
76                    f.close()
77            finally:
78                if limit_set:
79                    resource.setrlimit(resource.RLIMIT_FSIZE, (cur, max))
80                support.unlink(support.TESTFN)
81
82    def test_fsize_toobig(self):
83        # Be sure that setrlimit is checking for really large values
84        too_big = 10**50
85        try:
86            (cur, max) = resource.getrlimit(resource.RLIMIT_FSIZE)
87        except AttributeError:
88            pass
89        else:
90            try:
91                resource.setrlimit(resource.RLIMIT_FSIZE, (too_big, max))
92            except (OverflowError, ValueError):
93                pass
94            try:
95                resource.setrlimit(resource.RLIMIT_FSIZE, (max, too_big))
96            except (OverflowError, ValueError):
97                pass
98
99    def test_getrusage(self):
100        self.assertRaises(TypeError, resource.getrusage)
101        self.assertRaises(TypeError, resource.getrusage, 42, 42)
102        usageself = resource.getrusage(resource.RUSAGE_SELF)
103        usagechildren = resource.getrusage(resource.RUSAGE_CHILDREN)
104        # May not be available on all systems.
105        try:
106            usageboth = resource.getrusage(resource.RUSAGE_BOTH)
107        except (ValueError, AttributeError):
108            pass
109        try:
110            usage_thread = resource.getrusage(resource.RUSAGE_THREAD)
111        except (ValueError, AttributeError):
112            pass
113
114    # Issue 6083: Reference counting bug
115    @unittest.skipIf(sys.platform == "vxworks",
116                     "setting RLIMIT_CPU is not supported on VxWorks")
117    def test_setrusage_refcount(self):
118        try:
119            limits = resource.getrlimit(resource.RLIMIT_CPU)
120        except AttributeError:
121            pass
122        else:
123            class BadSequence:
124                def __len__(self):
125                    return 2
126                def __getitem__(self, key):
127                    if key in (0, 1):
128                        return len(tuple(range(1000000)))
129                    raise IndexError
130
131            resource.setrlimit(resource.RLIMIT_CPU, BadSequence())
132
133    def test_pagesize(self):
134        pagesize = resource.getpagesize()
135        self.assertIsInstance(pagesize, int)
136        self.assertGreaterEqual(pagesize, 0)
137
138    @unittest.skipUnless(sys.platform == 'linux', 'test requires Linux')
139    def test_linux_constants(self):
140        for attr in ['MSGQUEUE', 'NICE', 'RTPRIO', 'RTTIME', 'SIGPENDING']:
141            with contextlib.suppress(AttributeError):
142                self.assertIsInstance(getattr(resource, 'RLIMIT_' + attr), int)
143
144    def test_freebsd_contants(self):
145        for attr in ['SWAP', 'SBSIZE', 'NPTS']:
146            with contextlib.suppress(AttributeError):
147                self.assertIsInstance(getattr(resource, 'RLIMIT_' + attr), int)
148
149    @unittest.skipUnless(hasattr(resource, 'prlimit'), 'no prlimit')
150    @support.requires_linux_version(2, 6, 36)
151    def test_prlimit(self):
152        self.assertRaises(TypeError, resource.prlimit)
153        self.assertRaises(ProcessLookupError, resource.prlimit,
154                          -1, resource.RLIMIT_AS)
155        limit = resource.getrlimit(resource.RLIMIT_AS)
156        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS), limit)
157        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS, limit),
158                         limit)
159
160    # Issue 20191: Reference counting bug
161    @unittest.skipUnless(hasattr(resource, 'prlimit'), 'no prlimit')
162    @support.requires_linux_version(2, 6, 36)
163    def test_prlimit_refcount(self):
164        class BadSeq:
165            def __len__(self):
166                return 2
167            def __getitem__(self, key):
168                return limits[key] - 1  # new reference
169
170        limits = resource.getrlimit(resource.RLIMIT_AS)
171        self.assertEqual(resource.prlimit(0, resource.RLIMIT_AS, BadSeq()),
172                         limits)
173
174
175if __name__ == "__main__":
176    unittest.main()
177