1import contextlib
2import errno
3import importlib
4import io
5import os
6import shutil
7import socket
8import stat
9import subprocess
10import sys
11import tempfile
12import textwrap
13import time
14import unittest
15from test import support
16from test.support import script_helper
17
18TESTFN = support.TESTFN
19
20
21class TestSupport(unittest.TestCase):
22
23    def test_import_module(self):
24        support.import_module("ftplib")
25        self.assertRaises(unittest.SkipTest, support.import_module, "foo")
26
27    def test_import_fresh_module(self):
28        support.import_fresh_module("ftplib")
29
30    def test_get_attribute(self):
31        self.assertEqual(support.get_attribute(self, "test_get_attribute"),
32                        self.test_get_attribute)
33        self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo")
34
35    @unittest.skip("failing buildbots")
36    def test_get_original_stdout(self):
37        self.assertEqual(support.get_original_stdout(), sys.stdout)
38
39    def test_unload(self):
40        import sched
41        self.assertIn("sched", sys.modules)
42        support.unload("sched")
43        self.assertNotIn("sched", sys.modules)
44
45    def test_unlink(self):
46        with open(TESTFN, "w") as f:
47            pass
48        support.unlink(TESTFN)
49        self.assertFalse(os.path.exists(TESTFN))
50        support.unlink(TESTFN)
51
52    def test_rmtree(self):
53        dirpath = support.TESTFN + 'd'
54        subdirpath = os.path.join(dirpath, 'subdir')
55        os.mkdir(dirpath)
56        os.mkdir(subdirpath)
57        support.rmtree(dirpath)
58        self.assertFalse(os.path.exists(dirpath))
59        with support.swap_attr(support, 'verbose', 0):
60            support.rmtree(dirpath)
61
62        os.mkdir(dirpath)
63        os.mkdir(subdirpath)
64        os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR)
65        with support.swap_attr(support, 'verbose', 0):
66            support.rmtree(dirpath)
67        self.assertFalse(os.path.exists(dirpath))
68
69        os.mkdir(dirpath)
70        os.mkdir(subdirpath)
71        os.chmod(dirpath, 0)
72        with support.swap_attr(support, 'verbose', 0):
73            support.rmtree(dirpath)
74        self.assertFalse(os.path.exists(dirpath))
75
76    def test_forget(self):
77        mod_filename = TESTFN + '.py'
78        with open(mod_filename, 'w') as f:
79            print('foo = 1', file=f)
80        sys.path.insert(0, os.curdir)
81        importlib.invalidate_caches()
82        try:
83            mod = __import__(TESTFN)
84            self.assertIn(TESTFN, sys.modules)
85
86            support.forget(TESTFN)
87            self.assertNotIn(TESTFN, sys.modules)
88        finally:
89            del sys.path[0]
90            support.unlink(mod_filename)
91            support.rmtree('__pycache__')
92
93    def test_HOST(self):
94        s = socket.create_server((support.HOST, 0))
95        s.close()
96
97    def test_find_unused_port(self):
98        port = support.find_unused_port()
99        s = socket.create_server((support.HOST, port))
100        s.close()
101
102    def test_bind_port(self):
103        s = socket.socket()
104        support.bind_port(s)
105        s.listen()
106        s.close()
107
108    # Tests for temp_dir()
109
110    def test_temp_dir(self):
111        """Test that temp_dir() creates and destroys its directory."""
112        parent_dir = tempfile.mkdtemp()
113        parent_dir = os.path.realpath(parent_dir)
114
115        try:
116            path = os.path.join(parent_dir, 'temp')
117            self.assertFalse(os.path.isdir(path))
118            with support.temp_dir(path) as temp_path:
119                self.assertEqual(temp_path, path)
120                self.assertTrue(os.path.isdir(path))
121            self.assertFalse(os.path.isdir(path))
122        finally:
123            support.rmtree(parent_dir)
124
125    def test_temp_dir__path_none(self):
126        """Test passing no path."""
127        with support.temp_dir() as temp_path:
128            self.assertTrue(os.path.isdir(temp_path))
129        self.assertFalse(os.path.isdir(temp_path))
130
131    def test_temp_dir__existing_dir__quiet_default(self):
132        """Test passing a directory that already exists."""
133        def call_temp_dir(path):
134            with support.temp_dir(path) as temp_path:
135                raise Exception("should not get here")
136
137        path = tempfile.mkdtemp()
138        path = os.path.realpath(path)
139        try:
140            self.assertTrue(os.path.isdir(path))
141            self.assertRaises(FileExistsError, call_temp_dir, path)
142            # Make sure temp_dir did not delete the original directory.
143            self.assertTrue(os.path.isdir(path))
144        finally:
145            shutil.rmtree(path)
146
147    def test_temp_dir__existing_dir__quiet_true(self):
148        """Test passing a directory that already exists with quiet=True."""
149        path = tempfile.mkdtemp()
150        path = os.path.realpath(path)
151
152        try:
153            with support.check_warnings() as recorder:
154                with support.temp_dir(path, quiet=True) as temp_path:
155                    self.assertEqual(path, temp_path)
156                warnings = [str(w.message) for w in recorder.warnings]
157            # Make sure temp_dir did not delete the original directory.
158            self.assertTrue(os.path.isdir(path))
159        finally:
160            shutil.rmtree(path)
161
162        self.assertEqual(len(warnings), 1, warnings)
163        warn = warnings[0]
164        self.assertTrue(warn.startswith(f'tests may fail, unable to create '
165                                        f'temporary directory {path!r}: '),
166                        warn)
167
168    @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork")
169    def test_temp_dir__forked_child(self):
170        """Test that a forked child process does not remove the directory."""
171        # See bpo-30028 for details.
172        # Run the test as an external script, because it uses fork.
173        script_helper.assert_python_ok("-c", textwrap.dedent("""
174            import os
175            from test import support
176            with support.temp_cwd() as temp_path:
177                pid = os.fork()
178                if pid != 0:
179                    # parent process (child has pid == 0)
180
181                    # wait for the child to terminate
182                    (pid, status) = os.waitpid(pid, 0)
183                    if status != 0:
184                        raise AssertionError(f"Child process failed with exit "
185                                             f"status indication 0x{status:x}.")
186
187                    # Make sure that temp_path is still present. When the child
188                    # process leaves the 'temp_cwd'-context, the __exit__()-
189                    # method of the context must not remove the temporary
190                    # directory.
191                    if not os.path.isdir(temp_path):
192                        raise AssertionError("Child removed temp_path.")
193        """))
194
195    # Tests for change_cwd()
196
197    def test_change_cwd(self):
198        original_cwd = os.getcwd()
199
200        with support.temp_dir() as temp_path:
201            with support.change_cwd(temp_path) as new_cwd:
202                self.assertEqual(new_cwd, temp_path)
203                self.assertEqual(os.getcwd(), new_cwd)
204
205        self.assertEqual(os.getcwd(), original_cwd)
206
207    def test_change_cwd__non_existent_dir(self):
208        """Test passing a non-existent directory."""
209        original_cwd = os.getcwd()
210
211        def call_change_cwd(path):
212            with support.change_cwd(path) as new_cwd:
213                raise Exception("should not get here")
214
215        with support.temp_dir() as parent_dir:
216            non_existent_dir = os.path.join(parent_dir, 'does_not_exist')
217            self.assertRaises(FileNotFoundError, call_change_cwd,
218                              non_existent_dir)
219
220        self.assertEqual(os.getcwd(), original_cwd)
221
222    def test_change_cwd__non_existent_dir__quiet_true(self):
223        """Test passing a non-existent directory with quiet=True."""
224        original_cwd = os.getcwd()
225
226        with support.temp_dir() as parent_dir:
227            bad_dir = os.path.join(parent_dir, 'does_not_exist')
228            with support.check_warnings() as recorder:
229                with support.change_cwd(bad_dir, quiet=True) as new_cwd:
230                    self.assertEqual(new_cwd, original_cwd)
231                    self.assertEqual(os.getcwd(), new_cwd)
232                warnings = [str(w.message) for w in recorder.warnings]
233
234        self.assertEqual(len(warnings), 1, warnings)
235        warn = warnings[0]
236        self.assertTrue(warn.startswith(f'tests may fail, unable to change '
237                                        f'the current working directory '
238                                        f'to {bad_dir!r}: '),
239                        warn)
240
241    # Tests for change_cwd()
242
243    def test_change_cwd__chdir_warning(self):
244        """Check the warning message when os.chdir() fails."""
245        path = TESTFN + '_does_not_exist'
246        with support.check_warnings() as recorder:
247            with support.change_cwd(path=path, quiet=True):
248                pass
249            messages = [str(w.message) for w in recorder.warnings]
250
251        self.assertEqual(len(messages), 1, messages)
252        msg = messages[0]
253        self.assertTrue(msg.startswith(f'tests may fail, unable to change '
254                                       f'the current working directory '
255                                       f'to {path!r}: '),
256                        msg)
257
258    # Tests for temp_cwd()
259
260    def test_temp_cwd(self):
261        here = os.getcwd()
262        with support.temp_cwd(name=TESTFN):
263            self.assertEqual(os.path.basename(os.getcwd()), TESTFN)
264        self.assertFalse(os.path.exists(TESTFN))
265        self.assertEqual(os.getcwd(), here)
266
267
268    def test_temp_cwd__name_none(self):
269        """Test passing None to temp_cwd()."""
270        original_cwd = os.getcwd()
271        with support.temp_cwd(name=None) as new_cwd:
272            self.assertNotEqual(new_cwd, original_cwd)
273            self.assertTrue(os.path.isdir(new_cwd))
274            self.assertEqual(os.getcwd(), new_cwd)
275        self.assertEqual(os.getcwd(), original_cwd)
276
277    def test_sortdict(self):
278        self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}")
279
280    def test_make_bad_fd(self):
281        fd = support.make_bad_fd()
282        with self.assertRaises(OSError) as cm:
283            os.write(fd, b"foo")
284        self.assertEqual(cm.exception.errno, errno.EBADF)
285
286    def test_check_syntax_error(self):
287        support.check_syntax_error(self, "def class", lineno=1, offset=5)
288        with self.assertRaises(AssertionError):
289            support.check_syntax_error(self, "x=1")
290
291    def test_CleanImport(self):
292        import importlib
293        with support.CleanImport("asyncore"):
294            importlib.import_module("asyncore")
295
296    def test_DirsOnSysPath(self):
297        with support.DirsOnSysPath('foo', 'bar'):
298            self.assertIn("foo", sys.path)
299            self.assertIn("bar", sys.path)
300        self.assertNotIn("foo", sys.path)
301        self.assertNotIn("bar", sys.path)
302
303    def test_captured_stdout(self):
304        with support.captured_stdout() as stdout:
305            print("hello")
306        self.assertEqual(stdout.getvalue(), "hello\n")
307
308    def test_captured_stderr(self):
309        with support.captured_stderr() as stderr:
310            print("hello", file=sys.stderr)
311        self.assertEqual(stderr.getvalue(), "hello\n")
312
313    def test_captured_stdin(self):
314        with support.captured_stdin() as stdin:
315            stdin.write('hello\n')
316            stdin.seek(0)
317            # call test code that consumes from sys.stdin
318            captured = input()
319        self.assertEqual(captured, "hello")
320
321    def test_gc_collect(self):
322        support.gc_collect()
323
324    def test_python_is_optimized(self):
325        self.assertIsInstance(support.python_is_optimized(), bool)
326
327    def test_swap_attr(self):
328        class Obj:
329            pass
330        obj = Obj()
331        obj.x = 1
332        with support.swap_attr(obj, "x", 5) as x:
333            self.assertEqual(obj.x, 5)
334            self.assertEqual(x, 1)
335        self.assertEqual(obj.x, 1)
336        with support.swap_attr(obj, "y", 5) as y:
337            self.assertEqual(obj.y, 5)
338            self.assertIsNone(y)
339        self.assertFalse(hasattr(obj, 'y'))
340        with support.swap_attr(obj, "y", 5):
341            del obj.y
342        self.assertFalse(hasattr(obj, 'y'))
343
344    def test_swap_item(self):
345        D = {"x":1}
346        with support.swap_item(D, "x", 5) as x:
347            self.assertEqual(D["x"], 5)
348            self.assertEqual(x, 1)
349        self.assertEqual(D["x"], 1)
350        with support.swap_item(D, "y", 5) as y:
351            self.assertEqual(D["y"], 5)
352            self.assertIsNone(y)
353        self.assertNotIn("y", D)
354        with support.swap_item(D, "y", 5):
355            del D["y"]
356        self.assertNotIn("y", D)
357
358    class RefClass:
359        attribute1 = None
360        attribute2 = None
361        _hidden_attribute1 = None
362        __magic_1__ = None
363
364    class OtherClass:
365        attribute2 = None
366        attribute3 = None
367        __magic_1__ = None
368        __magic_2__ = None
369
370    def test_detect_api_mismatch(self):
371        missing_items = support.detect_api_mismatch(self.RefClass,
372                                                    self.OtherClass)
373        self.assertEqual({'attribute1'}, missing_items)
374
375        missing_items = support.detect_api_mismatch(self.OtherClass,
376                                                    self.RefClass)
377        self.assertEqual({'attribute3', '__magic_2__'}, missing_items)
378
379    def test_detect_api_mismatch__ignore(self):
380        ignore = ['attribute1', 'attribute3', '__magic_2__', 'not_in_either']
381
382        missing_items = support.detect_api_mismatch(
383                self.RefClass, self.OtherClass, ignore=ignore)
384        self.assertEqual(set(), missing_items)
385
386        missing_items = support.detect_api_mismatch(
387                self.OtherClass, self.RefClass, ignore=ignore)
388        self.assertEqual(set(), missing_items)
389
390    def test_check__all__(self):
391        extra = {'tempdir'}
392        blacklist = {'template'}
393        support.check__all__(self,
394                             tempfile,
395                             extra=extra,
396                             blacklist=blacklist)
397
398        extra = {'TextTestResult', 'installHandler'}
399        blacklist = {'load_tests', "TestProgram", "BaseTestSuite"}
400
401        support.check__all__(self,
402                             unittest,
403                             ("unittest.result", "unittest.case",
404                              "unittest.suite", "unittest.loader",
405                              "unittest.main", "unittest.runner",
406                              "unittest.signals", "unittest.async_case"),
407                             extra=extra,
408                             blacklist=blacklist)
409
410        self.assertRaises(AssertionError, support.check__all__, self, unittest)
411
412    @unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'),
413                         'need os.waitpid() and os.WNOHANG')
414    def test_reap_children(self):
415        # Make sure that there is no other pending child process
416        support.reap_children()
417
418        # Create a child process
419        pid = os.fork()
420        if pid == 0:
421            # child process: do nothing, just exit
422            os._exit(0)
423
424        t0 = time.monotonic()
425        deadline = time.monotonic() + 60.0
426
427        was_altered = support.environment_altered
428        try:
429            support.environment_altered = False
430            stderr = io.StringIO()
431
432            while True:
433                if time.monotonic() > deadline:
434                    self.fail("timeout")
435
436                old_stderr = sys.__stderr__
437                try:
438                    sys.__stderr__ = stderr
439                    support.reap_children()
440                finally:
441                    sys.__stderr__ = old_stderr
442
443                # Use environment_altered to check if reap_children() found
444                # the child process
445                if support.environment_altered:
446                    break
447
448                # loop until the child process completed
449                time.sleep(0.100)
450
451            msg = "Warning -- reap_children() reaped child process %s" % pid
452            self.assertIn(msg, stderr.getvalue())
453            self.assertTrue(support.environment_altered)
454        finally:
455            support.environment_altered = was_altered
456
457        # Just in case, check again that there is no other
458        # pending child process
459        support.reap_children()
460
461    def check_options(self, args, func, expected=None):
462        code = f'from test.support import {func}; print(repr({func}()))'
463        cmd = [sys.executable, *args, '-c', code]
464        env = {key: value for key, value in os.environ.items()
465               if not key.startswith('PYTHON')}
466        proc = subprocess.run(cmd,
467                              stdout=subprocess.PIPE,
468                              stderr=subprocess.DEVNULL,
469                              universal_newlines=True,
470                              env=env)
471        if expected is None:
472            expected = args
473        self.assertEqual(proc.stdout.rstrip(), repr(expected))
474        self.assertEqual(proc.returncode, 0)
475
476    def test_args_from_interpreter_flags(self):
477        # Test test.support.args_from_interpreter_flags()
478        for opts in (
479            # no option
480            [],
481            # single option
482            ['-B'],
483            ['-s'],
484            ['-S'],
485            ['-E'],
486            ['-v'],
487            ['-b'],
488            ['-q'],
489            ['-I'],
490            # same option multiple times
491            ['-bb'],
492            ['-vvv'],
493            # -W options
494            ['-Wignore'],
495            # -X options
496            ['-X', 'dev'],
497            ['-Wignore', '-X', 'dev'],
498            ['-X', 'faulthandler'],
499            ['-X', 'importtime'],
500            ['-X', 'showalloccount'],
501            ['-X', 'showrefcount'],
502            ['-X', 'tracemalloc'],
503            ['-X', 'tracemalloc=3'],
504        ):
505            with self.subTest(opts=opts):
506                self.check_options(opts, 'args_from_interpreter_flags')
507
508        self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags',
509                           ['-I'])
510
511    def test_optim_args_from_interpreter_flags(self):
512        # Test test.support.optim_args_from_interpreter_flags()
513        for opts in (
514            # no option
515            [],
516            ['-O'],
517            ['-OO'],
518            ['-OOOO'],
519        ):
520            with self.subTest(opts=opts):
521                self.check_options(opts, 'optim_args_from_interpreter_flags')
522
523    def test_match_test(self):
524        class Test:
525            def __init__(self, test_id):
526                self.test_id = test_id
527
528            def id(self):
529                return self.test_id
530
531        test_access = Test('test.test_os.FileTests.test_access')
532        test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir')
533
534        # Test acceptance
535        with support.swap_attr(support, '_match_test_func', None):
536            # match all
537            support.set_match_tests([])
538            self.assertTrue(support.match_test(test_access))
539            self.assertTrue(support.match_test(test_chdir))
540
541            # match all using None
542            support.set_match_tests(None, None)
543            self.assertTrue(support.match_test(test_access))
544            self.assertTrue(support.match_test(test_chdir))
545
546            # match the full test identifier
547            support.set_match_tests([test_access.id()], None)
548            self.assertTrue(support.match_test(test_access))
549            self.assertFalse(support.match_test(test_chdir))
550
551            # match the module name
552            support.set_match_tests(['test_os'], None)
553            self.assertTrue(support.match_test(test_access))
554            self.assertTrue(support.match_test(test_chdir))
555
556            # Test '*' pattern
557            support.set_match_tests(['test_*'], None)
558            self.assertTrue(support.match_test(test_access))
559            self.assertTrue(support.match_test(test_chdir))
560
561            # Test case sensitivity
562            support.set_match_tests(['filetests'], None)
563            self.assertFalse(support.match_test(test_access))
564            support.set_match_tests(['FileTests'], None)
565            self.assertTrue(support.match_test(test_access))
566
567            # Test pattern containing '.' and a '*' metacharacter
568            support.set_match_tests(['*test_os.*.test_*'], None)
569            self.assertTrue(support.match_test(test_access))
570            self.assertTrue(support.match_test(test_chdir))
571
572            # Multiple patterns
573            support.set_match_tests([test_access.id(), test_chdir.id()], None)
574            self.assertTrue(support.match_test(test_access))
575            self.assertTrue(support.match_test(test_chdir))
576
577            support.set_match_tests(['test_access', 'DONTMATCH'], None)
578            self.assertTrue(support.match_test(test_access))
579            self.assertFalse(support.match_test(test_chdir))
580
581        # Test rejection
582        with support.swap_attr(support, '_match_test_func', None):
583            # match all
584            support.set_match_tests(ignore_patterns=[])
585            self.assertTrue(support.match_test(test_access))
586            self.assertTrue(support.match_test(test_chdir))
587
588            # match all using None
589            support.set_match_tests(None, None)
590            self.assertTrue(support.match_test(test_access))
591            self.assertTrue(support.match_test(test_chdir))
592
593            # match the full test identifier
594            support.set_match_tests(None, [test_access.id()])
595            self.assertFalse(support.match_test(test_access))
596            self.assertTrue(support.match_test(test_chdir))
597
598            # match the module name
599            support.set_match_tests(None, ['test_os'])
600            self.assertFalse(support.match_test(test_access))
601            self.assertFalse(support.match_test(test_chdir))
602
603            # Test '*' pattern
604            support.set_match_tests(None, ['test_*'])
605            self.assertFalse(support.match_test(test_access))
606            self.assertFalse(support.match_test(test_chdir))
607
608            # Test case sensitivity
609            support.set_match_tests(None, ['filetests'])
610            self.assertTrue(support.match_test(test_access))
611            support.set_match_tests(None, ['FileTests'])
612            self.assertFalse(support.match_test(test_access))
613
614            # Test pattern containing '.' and a '*' metacharacter
615            support.set_match_tests(None, ['*test_os.*.test_*'])
616            self.assertFalse(support.match_test(test_access))
617            self.assertFalse(support.match_test(test_chdir))
618
619            # Multiple patterns
620            support.set_match_tests(None, [test_access.id(), test_chdir.id()])
621            self.assertFalse(support.match_test(test_access))
622            self.assertFalse(support.match_test(test_chdir))
623
624            support.set_match_tests(None, ['test_access', 'DONTMATCH'])
625            self.assertFalse(support.match_test(test_access))
626            self.assertTrue(support.match_test(test_chdir))
627
628    def test_fd_count(self):
629        # We cannot test the absolute value of fd_count(): on old Linux
630        # kernel or glibc versions, os.urandom() keeps a FD open on
631        # /dev/urandom device and Python has 4 FD opens instead of 3.
632        start = support.fd_count()
633        fd = os.open(__file__, os.O_RDONLY)
634        try:
635            more = support.fd_count()
636        finally:
637            os.close(fd)
638        self.assertEqual(more - start, 1)
639
640    def check_print_warning(self, msg, expected):
641        stderr = io.StringIO()
642
643        old_stderr = sys.__stderr__
644        try:
645            sys.__stderr__ = stderr
646            support.print_warning(msg)
647        finally:
648            sys.__stderr__ = old_stderr
649
650        self.assertEqual(stderr.getvalue(), expected)
651
652    def test_print_warning(self):
653        self.check_print_warning("msg",
654                                 "Warning -- msg\n")
655        self.check_print_warning("a\nb",
656                                 'Warning -- a\nWarning -- b\n')
657
658    # XXX -follows a list of untested API
659    # make_legacy_pyc
660    # is_resource_enabled
661    # requires
662    # fcmp
663    # umaks
664    # findfile
665    # check_warnings
666    # EnvironmentVarGuard
667    # TransientResource
668    # transient_internet
669    # run_with_locale
670    # set_memlimit
671    # bigmemtest
672    # precisionbigmemtest
673    # bigaddrspacetest
674    # requires_resource
675    # run_doctest
676    # threading_cleanup
677    # reap_threads
678    # strip_python_stderr
679    # can_symlink
680    # skip_unless_symlink
681    # SuppressCrashReport
682
683
684def test_main():
685    tests = [TestSupport]
686    support.run_unittest(*tests)
687
688if __name__ == '__main__':
689    test_main()
690