1"""This test checks for correct fork() behavior.
2"""
3
4import _imp as imp
5import os
6import signal
7import sys
8import threading
9import time
10import unittest
11
12from test.fork_wait import ForkWait
13from test import support
14
15
16# Skip test if fork does not exist.
17support.get_attribute(os, 'fork')
18
19class ForkTest(ForkWait):
20    def test_threaded_import_lock_fork(self):
21        """Check fork() in main thread works while a subthread is doing an import"""
22        import_started = threading.Event()
23        fake_module_name = "fake test module"
24        partial_module = "partial"
25        complete_module = "complete"
26        def importer():
27            imp.acquire_lock()
28            sys.modules[fake_module_name] = partial_module
29            import_started.set()
30            time.sleep(0.01) # Give the other thread time to try and acquire.
31            sys.modules[fake_module_name] = complete_module
32            imp.release_lock()
33        t = threading.Thread(target=importer)
34        t.start()
35        import_started.wait()
36        exitcode = 42
37        pid = os.fork()
38        try:
39            # PyOS_BeforeFork should have waited for the import to complete
40            # before forking, so the child can recreate the import lock
41            # correctly, but also won't see a partially initialised module
42            if not pid:
43                m = __import__(fake_module_name)
44                if m == complete_module:
45                    os._exit(exitcode)
46                else:
47                    if support.verbose > 1:
48                        print("Child encountered partial module")
49                    os._exit(1)
50            else:
51                t.join()
52                # Exitcode 1 means the child got a partial module (bad.) No
53                # exitcode (but a hang, which manifests as 'got pid 0')
54                # means the child deadlocked (also bad.)
55                self.wait_impl(pid, exitcode=exitcode)
56        finally:
57            try:
58                os.kill(pid, signal.SIGKILL)
59            except OSError:
60                pass
61
62
63    def test_nested_import_lock_fork(self):
64        """Check fork() in main thread works while the main thread is doing an import"""
65        exitcode = 42
66        # Issue 9573: this used to trigger RuntimeError in the child process
67        def fork_with_import_lock(level):
68            release = 0
69            in_child = False
70            try:
71                try:
72                    for i in range(level):
73                        imp.acquire_lock()
74                        release += 1
75                    pid = os.fork()
76                    in_child = not pid
77                finally:
78                    for i in range(release):
79                        imp.release_lock()
80            except RuntimeError:
81                if in_child:
82                    if support.verbose > 1:
83                        print("RuntimeError in child")
84                    os._exit(1)
85                raise
86            if in_child:
87                os._exit(exitcode)
88            self.wait_impl(pid, exitcode=exitcode)
89
90        # Check this works with various levels of nested
91        # import in the main thread
92        for level in range(5):
93            fork_with_import_lock(level)
94
95
96def tearDownModule():
97    support.reap_children()
98
99if __name__ == "__main__":
100    unittest.main()
101