1import contextlib
2import errno
3import importlib
4import io
5import os
6import shutil
7import socket
8import stat
9import subprocess
10import sys
11import tempfile
12import textwrap
13import time
14import unittest
15from test import support
16from test.support import script_helper
17
18TESTFN = support.TESTFN
19
20
21class TestSupport(unittest.TestCase):
22
23    def test_import_module(self):
24        support.import_module("ftplib")
25        self.assertRaises(unittest.SkipTest, support.import_module, "foo")
26
27    def test_import_fresh_module(self):
28        support.import_fresh_module("ftplib")
29
30    def test_get_attribute(self):
31        self.assertEqual(support.get_attribute(self, "test_get_attribute"),
32                        self.test_get_attribute)
33        self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo")
34
35    @unittest.skip("failing buildbots")
36    def test_get_original_stdout(self):
37        self.assertEqual(support.get_original_stdout(), sys.stdout)
38
39    def test_unload(self):
40        import sched
41        self.assertIn("sched", sys.modules)
42        support.unload("sched")
43        self.assertNotIn("sched", sys.modules)
44
45    def test_unlink(self):
46        with open(TESTFN, "w") as f:
47            pass
48        support.unlink(TESTFN)
49        self.assertFalse(os.path.exists(TESTFN))
50        support.unlink(TESTFN)
51
52    def test_rmtree(self):
53        dirpath = support.TESTFN + 'd'
54        subdirpath = os.path.join(dirpath, 'subdir')
55        os.mkdir(dirpath)
56        os.mkdir(subdirpath)
57        support.rmtree(dirpath)
58        self.assertFalse(os.path.exists(dirpath))
59        with support.swap_attr(support, 'verbose', 0):
60            support.rmtree(dirpath)
61
62        os.mkdir(dirpath)
63        os.mkdir(subdirpath)
64        os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR)
65        with support.swap_attr(support, 'verbose', 0):
66            support.rmtree(dirpath)
67        self.assertFalse(os.path.exists(dirpath))
68
69        os.mkdir(dirpath)
70        os.mkdir(subdirpath)
71        os.chmod(dirpath, 0)
72        with support.swap_attr(support, 'verbose', 0):
73            support.rmtree(dirpath)
74        self.assertFalse(os.path.exists(dirpath))
75
76    def test_forget(self):
77        mod_filename = TESTFN + '.py'
78        with open(mod_filename, 'w') as f:
79            print('foo = 1', file=f)
80        sys.path.insert(0, os.curdir)
81        importlib.invalidate_caches()
82        try:
83            mod = __import__(TESTFN)
84            self.assertIn(TESTFN, sys.modules)
85
86            support.forget(TESTFN)
87            self.assertNotIn(TESTFN, sys.modules)
88        finally:
89            del sys.path[0]
90            support.unlink(mod_filename)
91            support.rmtree('__pycache__')
92
93    def test_HOST(self):
94        s = socket.socket()
95        s.bind((support.HOST, 0))
96        s.close()
97
98    def test_find_unused_port(self):
99        port = support.find_unused_port()
100        s = socket.socket()
101        s.bind((support.HOST, port))
102        s.close()
103
104    def test_bind_port(self):
105        s = socket.socket()
106        support.bind_port(s)
107        s.listen()
108        s.close()
109
110    # Tests for temp_dir()
111
112    def test_temp_dir(self):
113        """Test that temp_dir() creates and destroys its directory."""
114        parent_dir = tempfile.mkdtemp()
115        parent_dir = os.path.realpath(parent_dir)
116
117        try:
118            path = os.path.join(parent_dir, 'temp')
119            self.assertFalse(os.path.isdir(path))
120            with support.temp_dir(path) as temp_path:
121                self.assertEqual(temp_path, path)
122                self.assertTrue(os.path.isdir(path))
123            self.assertFalse(os.path.isdir(path))
124        finally:
125            support.rmtree(parent_dir)
126
127    def test_temp_dir__path_none(self):
128        """Test passing no path."""
129        with support.temp_dir() as temp_path:
130            self.assertTrue(os.path.isdir(temp_path))
131        self.assertFalse(os.path.isdir(temp_path))
132
133    def test_temp_dir__existing_dir__quiet_default(self):
134        """Test passing a directory that already exists."""
135        def call_temp_dir(path):
136            with support.temp_dir(path) as temp_path:
137                raise Exception("should not get here")
138
139        path = tempfile.mkdtemp()
140        path = os.path.realpath(path)
141        try:
142            self.assertTrue(os.path.isdir(path))
143            self.assertRaises(FileExistsError, call_temp_dir, path)
144            # Make sure temp_dir did not delete the original directory.
145            self.assertTrue(os.path.isdir(path))
146        finally:
147            shutil.rmtree(path)
148
149    def test_temp_dir__existing_dir__quiet_true(self):
150        """Test passing a directory that already exists with quiet=True."""
151        path = tempfile.mkdtemp()
152        path = os.path.realpath(path)
153
154        try:
155            with support.check_warnings() as recorder:
156                with support.temp_dir(path, quiet=True) as temp_path:
157                    self.assertEqual(path, temp_path)
158                warnings = [str(w.message) for w in recorder.warnings]
159            # Make sure temp_dir did not delete the original directory.
160            self.assertTrue(os.path.isdir(path))
161        finally:
162            shutil.rmtree(path)
163
164        self.assertEqual(len(warnings), 1, warnings)
165        warn = warnings[0]
166        self.assertTrue(warn.startswith(f'tests may fail, unable to create '
167                                        f'temporary directory {path!r}: '),
168                        warn)
169
170    @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork")
171    def test_temp_dir__forked_child(self):
172        """Test that a forked child process does not remove the directory."""
173        # See bpo-30028 for details.
174        # Run the test as an external script, because it uses fork.
175        script_helper.assert_python_ok("-c", textwrap.dedent("""
176            import os
177            from test import support
178            with support.temp_cwd() as temp_path:
179                pid = os.fork()
180                if pid != 0:
181                    # parent process (child has pid == 0)
182
183                    # wait for the child to terminate
184                    (pid, status) = os.waitpid(pid, 0)
185                    if status != 0:
186                        raise AssertionError(f"Child process failed with exit "
187                                             f"status indication 0x{status:x}.")
188
189                    # Make sure that temp_path is still present. When the child
190                    # process leaves the 'temp_cwd'-context, the __exit__()-
191                    # method of the context must not remove the temporary
192                    # directory.
193                    if not os.path.isdir(temp_path):
194                        raise AssertionError("Child removed temp_path.")
195        """))
196
197    # Tests for change_cwd()
198
199    def test_change_cwd(self):
200        original_cwd = os.getcwd()
201
202        with support.temp_dir() as temp_path:
203            with support.change_cwd(temp_path) as new_cwd:
204                self.assertEqual(new_cwd, temp_path)
205                self.assertEqual(os.getcwd(), new_cwd)
206
207        self.assertEqual(os.getcwd(), original_cwd)
208
209    def test_change_cwd__non_existent_dir(self):
210        """Test passing a non-existent directory."""
211        original_cwd = os.getcwd()
212
213        def call_change_cwd(path):
214            with support.change_cwd(path) as new_cwd:
215                raise Exception("should not get here")
216
217        with support.temp_dir() as parent_dir:
218            non_existent_dir = os.path.join(parent_dir, 'does_not_exist')
219            self.assertRaises(FileNotFoundError, call_change_cwd,
220                              non_existent_dir)
221
222        self.assertEqual(os.getcwd(), original_cwd)
223
224    def test_change_cwd__non_existent_dir__quiet_true(self):
225        """Test passing a non-existent directory with quiet=True."""
226        original_cwd = os.getcwd()
227
228        with support.temp_dir() as parent_dir:
229            bad_dir = os.path.join(parent_dir, 'does_not_exist')
230            with support.check_warnings() as recorder:
231                with support.change_cwd(bad_dir, quiet=True) as new_cwd:
232                    self.assertEqual(new_cwd, original_cwd)
233                    self.assertEqual(os.getcwd(), new_cwd)
234                warnings = [str(w.message) for w in recorder.warnings]
235
236        self.assertEqual(len(warnings), 1, warnings)
237        warn = warnings[0]
238        self.assertTrue(warn.startswith(f'tests may fail, unable to change '
239                                        f'the current working directory '
240                                        f'to {bad_dir!r}: '),
241                        warn)
242
243    # Tests for change_cwd()
244
245    def test_change_cwd__chdir_warning(self):
246        """Check the warning message when os.chdir() fails."""
247        path = TESTFN + '_does_not_exist'
248        with support.check_warnings() as recorder:
249            with support.change_cwd(path=path, quiet=True):
250                pass
251            messages = [str(w.message) for w in recorder.warnings]
252
253        self.assertEqual(len(messages), 1, messages)
254        msg = messages[0]
255        self.assertTrue(msg.startswith(f'tests may fail, unable to change '
256                                       f'the current working directory '
257                                       f'to {path!r}: '),
258                        msg)
259
260    # Tests for temp_cwd()
261
262    def test_temp_cwd(self):
263        here = os.getcwd()
264        with support.temp_cwd(name=TESTFN):
265            self.assertEqual(os.path.basename(os.getcwd()), TESTFN)
266        self.assertFalse(os.path.exists(TESTFN))
267        self.assertEqual(os.getcwd(), here)
268
269
270    def test_temp_cwd__name_none(self):
271        """Test passing None to temp_cwd()."""
272        original_cwd = os.getcwd()
273        with support.temp_cwd(name=None) as new_cwd:
274            self.assertNotEqual(new_cwd, original_cwd)
275            self.assertTrue(os.path.isdir(new_cwd))
276            self.assertEqual(os.getcwd(), new_cwd)
277        self.assertEqual(os.getcwd(), original_cwd)
278
279    def test_sortdict(self):
280        self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}")
281
282    def test_make_bad_fd(self):
283        fd = support.make_bad_fd()
284        with self.assertRaises(OSError) as cm:
285            os.write(fd, b"foo")
286        self.assertEqual(cm.exception.errno, errno.EBADF)
287
288    def test_check_syntax_error(self):
289        support.check_syntax_error(self, "def class", lineno=1, offset=9)
290        with self.assertRaises(AssertionError):
291            support.check_syntax_error(self, "x=1")
292
293    def test_CleanImport(self):
294        import importlib
295        with support.CleanImport("asyncore"):
296            importlib.import_module("asyncore")
297
298    def test_DirsOnSysPath(self):
299        with support.DirsOnSysPath('foo', 'bar'):
300            self.assertIn("foo", sys.path)
301            self.assertIn("bar", sys.path)
302        self.assertNotIn("foo", sys.path)
303        self.assertNotIn("bar", sys.path)
304
305    def test_captured_stdout(self):
306        with support.captured_stdout() as stdout:
307            print("hello")
308        self.assertEqual(stdout.getvalue(), "hello\n")
309
310    def test_captured_stderr(self):
311        with support.captured_stderr() as stderr:
312            print("hello", file=sys.stderr)
313        self.assertEqual(stderr.getvalue(), "hello\n")
314
315    def test_captured_stdin(self):
316        with support.captured_stdin() as stdin:
317            stdin.write('hello\n')
318            stdin.seek(0)
319            # call test code that consumes from sys.stdin
320            captured = input()
321        self.assertEqual(captured, "hello")
322
323    def test_gc_collect(self):
324        support.gc_collect()
325
326    def test_python_is_optimized(self):
327        self.assertIsInstance(support.python_is_optimized(), bool)
328
329    def test_swap_attr(self):
330        class Obj:
331            pass
332        obj = Obj()
333        obj.x = 1
334        with support.swap_attr(obj, "x", 5) as x:
335            self.assertEqual(obj.x, 5)
336            self.assertEqual(x, 1)
337        self.assertEqual(obj.x, 1)
338        with support.swap_attr(obj, "y", 5) as y:
339            self.assertEqual(obj.y, 5)
340            self.assertIsNone(y)
341        self.assertFalse(hasattr(obj, 'y'))
342        with support.swap_attr(obj, "y", 5):
343            del obj.y
344        self.assertFalse(hasattr(obj, 'y'))
345
346    def test_swap_item(self):
347        D = {"x":1}
348        with support.swap_item(D, "x", 5) as x:
349            self.assertEqual(D["x"], 5)
350            self.assertEqual(x, 1)
351        self.assertEqual(D["x"], 1)
352        with support.swap_item(D, "y", 5) as y:
353            self.assertEqual(D["y"], 5)
354            self.assertIsNone(y)
355        self.assertNotIn("y", D)
356        with support.swap_item(D, "y", 5):
357            del D["y"]
358        self.assertNotIn("y", D)
359
360    class RefClass:
361        attribute1 = None
362        attribute2 = None
363        _hidden_attribute1 = None
364        __magic_1__ = None
365
366    class OtherClass:
367        attribute2 = None
368        attribute3 = None
369        __magic_1__ = None
370        __magic_2__ = None
371
372    def test_detect_api_mismatch(self):
373        missing_items = support.detect_api_mismatch(self.RefClass,
374                                                    self.OtherClass)
375        self.assertEqual({'attribute1'}, missing_items)
376
377        missing_items = support.detect_api_mismatch(self.OtherClass,
378                                                    self.RefClass)
379        self.assertEqual({'attribute3', '__magic_2__'}, missing_items)
380
381    def test_detect_api_mismatch__ignore(self):
382        ignore = ['attribute1', 'attribute3', '__magic_2__', 'not_in_either']
383
384        missing_items = support.detect_api_mismatch(
385                self.RefClass, self.OtherClass, ignore=ignore)
386        self.assertEqual(set(), missing_items)
387
388        missing_items = support.detect_api_mismatch(
389                self.OtherClass, self.RefClass, ignore=ignore)
390        self.assertEqual(set(), missing_items)
391
392    def test_check__all__(self):
393        extra = {'tempdir'}
394        blacklist = {'template'}
395        support.check__all__(self,
396                             tempfile,
397                             extra=extra,
398                             blacklist=blacklist)
399
400        extra = {'TextTestResult', 'installHandler'}
401        blacklist = {'load_tests', "TestProgram", "BaseTestSuite"}
402
403        support.check__all__(self,
404                             unittest,
405                             ("unittest.result", "unittest.case",
406                              "unittest.suite", "unittest.loader",
407                              "unittest.main", "unittest.runner",
408                              "unittest.signals"),
409                             extra=extra,
410                             blacklist=blacklist)
411
412        self.assertRaises(AssertionError, support.check__all__, self, unittest)
413
414    @unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'),
415                         'need os.waitpid() and os.WNOHANG')
416    def test_reap_children(self):
417        # Make sure that there is no other pending child process
418        support.reap_children()
419
420        # Create a child process
421        pid = os.fork()
422        if pid == 0:
423            # child process: do nothing, just exit
424            os._exit(0)
425
426        t0 = time.monotonic()
427        deadline = time.monotonic() + 60.0
428
429        was_altered = support.environment_altered
430        try:
431            support.environment_altered = False
432            stderr = io.StringIO()
433
434            while True:
435                if time.monotonic() > deadline:
436                    self.fail("timeout")
437
438                with contextlib.redirect_stderr(stderr):
439                    support.reap_children()
440
441                # Use environment_altered to check if reap_children() found
442                # the child process
443                if support.environment_altered:
444                    break
445
446                # loop until the child process completed
447                time.sleep(0.100)
448
449            msg = "Warning -- reap_children() reaped child process %s" % pid
450            self.assertIn(msg, stderr.getvalue())
451            self.assertTrue(support.environment_altered)
452        finally:
453            support.environment_altered = was_altered
454
455        # Just in case, check again that there is no other
456        # pending child process
457        support.reap_children()
458
459    def check_options(self, args, func, expected=None):
460        code = f'from test.support import {func}; print(repr({func}()))'
461        cmd = [sys.executable, *args, '-c', code]
462        env = {key: value for key, value in os.environ.items()
463               if not key.startswith('PYTHON')}
464        proc = subprocess.run(cmd,
465                              stdout=subprocess.PIPE,
466                              stderr=subprocess.DEVNULL,
467                              universal_newlines=True,
468                              env=env)
469        if expected is None:
470            expected = args
471        self.assertEqual(proc.stdout.rstrip(), repr(expected))
472        self.assertEqual(proc.returncode, 0)
473
474    def test_args_from_interpreter_flags(self):
475        # Test test.support.args_from_interpreter_flags()
476        for opts in (
477            # no option
478            [],
479            # single option
480            ['-B'],
481            ['-s'],
482            ['-S'],
483            ['-E'],
484            ['-v'],
485            ['-b'],
486            ['-q'],
487            ['-I'],
488            # same option multiple times
489            ['-bb'],
490            ['-vvv'],
491            # -W options
492            ['-Wignore'],
493            # -X options
494            ['-X', 'dev'],
495            ['-Wignore', '-X', 'dev'],
496            ['-X', 'faulthandler'],
497            ['-X', 'importtime'],
498            ['-X', 'showalloccount'],
499            ['-X', 'showrefcount'],
500            ['-X', 'tracemalloc'],
501            ['-X', 'tracemalloc=3'],
502        ):
503            with self.subTest(opts=opts):
504                self.check_options(opts, 'args_from_interpreter_flags')
505
506        self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags',
507                           ['-I'])
508
509    def test_optim_args_from_interpreter_flags(self):
510        # Test test.support.optim_args_from_interpreter_flags()
511        for opts in (
512            # no option
513            [],
514            ['-O'],
515            ['-OO'],
516            ['-OOOO'],
517        ):
518            with self.subTest(opts=opts):
519                self.check_options(opts, 'optim_args_from_interpreter_flags')
520
521    def test_match_test(self):
522        class Test:
523            def __init__(self, test_id):
524                self.test_id = test_id
525
526            def id(self):
527                return self.test_id
528
529        test_access = Test('test.test_os.FileTests.test_access')
530        test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir')
531
532        # Test acceptance
533        with support.swap_attr(support, '_match_test_func', None):
534            # match all
535            support.set_match_tests([])
536            self.assertTrue(support.match_test(test_access))
537            self.assertTrue(support.match_test(test_chdir))
538
539            # match all using None
540            support.set_match_tests(None, None)
541            self.assertTrue(support.match_test(test_access))
542            self.assertTrue(support.match_test(test_chdir))
543
544            # match the full test identifier
545            support.set_match_tests([test_access.id()], None)
546            self.assertTrue(support.match_test(test_access))
547            self.assertFalse(support.match_test(test_chdir))
548
549            # match the module name
550            support.set_match_tests(['test_os'], None)
551            self.assertTrue(support.match_test(test_access))
552            self.assertTrue(support.match_test(test_chdir))
553
554            # Test '*' pattern
555            support.set_match_tests(['test_*'], None)
556            self.assertTrue(support.match_test(test_access))
557            self.assertTrue(support.match_test(test_chdir))
558
559            # Test case sensitivity
560            support.set_match_tests(['filetests'], None)
561            self.assertFalse(support.match_test(test_access))
562            support.set_match_tests(['FileTests'], None)
563            self.assertTrue(support.match_test(test_access))
564
565            # Test pattern containing '.' and a '*' metacharacter
566            support.set_match_tests(['*test_os.*.test_*'], None)
567            self.assertTrue(support.match_test(test_access))
568            self.assertTrue(support.match_test(test_chdir))
569
570            # Multiple patterns
571            support.set_match_tests([test_access.id(), test_chdir.id()], None)
572            self.assertTrue(support.match_test(test_access))
573            self.assertTrue(support.match_test(test_chdir))
574
575            support.set_match_tests(['test_access', 'DONTMATCH'], None)
576            self.assertTrue(support.match_test(test_access))
577            self.assertFalse(support.match_test(test_chdir))
578
579        # Test rejection
580        with support.swap_attr(support, '_match_test_func', None):
581            # match all
582            support.set_match_tests(ignore_patterns=[])
583            self.assertTrue(support.match_test(test_access))
584            self.assertTrue(support.match_test(test_chdir))
585
586            # match all using None
587            support.set_match_tests(None, None)
588            self.assertTrue(support.match_test(test_access))
589            self.assertTrue(support.match_test(test_chdir))
590
591            # match the full test identifier
592            support.set_match_tests(None, [test_access.id()])
593            self.assertFalse(support.match_test(test_access))
594            self.assertTrue(support.match_test(test_chdir))
595
596            # match the module name
597            support.set_match_tests(None, ['test_os'])
598            self.assertFalse(support.match_test(test_access))
599            self.assertFalse(support.match_test(test_chdir))
600
601            # Test '*' pattern
602            support.set_match_tests(None, ['test_*'])
603            self.assertFalse(support.match_test(test_access))
604            self.assertFalse(support.match_test(test_chdir))
605
606            # Test case sensitivity
607            support.set_match_tests(None, ['filetests'])
608            self.assertTrue(support.match_test(test_access))
609            support.set_match_tests(None, ['FileTests'])
610            self.assertFalse(support.match_test(test_access))
611
612            # Test pattern containing '.' and a '*' metacharacter
613            support.set_match_tests(None, ['*test_os.*.test_*'])
614            self.assertFalse(support.match_test(test_access))
615            self.assertFalse(support.match_test(test_chdir))
616
617            # Multiple patterns
618            support.set_match_tests(None, [test_access.id(), test_chdir.id()])
619            self.assertFalse(support.match_test(test_access))
620            self.assertFalse(support.match_test(test_chdir))
621
622            support.set_match_tests(None, ['test_access', 'DONTMATCH'])
623            self.assertFalse(support.match_test(test_access))
624            self.assertTrue(support.match_test(test_chdir))
625
626    def test_fd_count(self):
627        # We cannot test the absolute value of fd_count(): on old Linux
628        # kernel or glibc versions, os.urandom() keeps a FD open on
629        # /dev/urandom device and Python has 4 FD opens instead of 3.
630        start = support.fd_count()
631        fd = os.open(__file__, os.O_RDONLY)
632        try:
633            more = support.fd_count()
634        finally:
635            os.close(fd)
636        self.assertEqual(more - start, 1)
637
638    # XXX -follows a list of untested API
639    # make_legacy_pyc
640    # is_resource_enabled
641    # requires
642    # fcmp
643    # umaks
644    # findfile
645    # check_warnings
646    # EnvironmentVarGuard
647    # TransientResource
648    # transient_internet
649    # run_with_locale
650    # set_memlimit
651    # bigmemtest
652    # precisionbigmemtest
653    # bigaddrspacetest
654    # requires_resource
655    # run_doctest
656    # threading_cleanup
657    # reap_threads
658    # strip_python_stderr
659    # can_symlink
660    # skip_unless_symlink
661    # SuppressCrashReport
662
663
664def test_main():
665    tests = [TestSupport]
666    support.run_unittest(*tests)
667
668if __name__ == '__main__':
669    test_main()
670