1# tests __main__ module handling in multiprocessing
2from test import support
3# Skip tests if _multiprocessing wasn't built.
4support.import_module('_multiprocessing')
5
6import importlib
7import importlib.machinery
8import unittest
9import sys
10import os
11import os.path
12import py_compile
13
14from test.support.script_helper import (
15    make_pkg, make_script, make_zip_pkg, make_zip_script,
16    assert_python_ok)
17
18if support.PGO:
19    raise unittest.SkipTest("test is not helpful for PGO")
20
21# Look up which start methods are available to test
22import multiprocess as multiprocessing
23AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods())
24
25# Issue #22332: Skip tests if sem_open implementation is broken.
26support.import_module('multiprocess.synchronize')
27
28verbose = support.verbose
29
# Source of the child script used by most tests below.  It defines f() at
# the top level of __main__ and, under the __main__ guard, selects the start
# method named by sys.argv[1], maps f over [1, 2, 3] with a 5-worker Pool,
# waits up to 60 seconds for the callback to deliver the results, and prints
# "<start_method> -> <sorted results>".
test_source = """\
# multiprocessing includes all sorts of shenanigans to make __main__
# attributes accessible in the subprocess in a pickle compatible way.

# We run the "doesn't work in the interactive interpreter" example from
# the docs to make sure it *does* work from an executed __main__,
# regardless of the invocation mechanism

import sys
import time
from multiprocess import Pool, set_start_method

# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
# code in child processes and then making it available as __main__
def f(x):
    return x*x

# Check explicit relative imports
if "check_sibling" in __file__:
    # We're inside a package and not in a __main__.py file
    # so make sure explicit relative imports work correctly
    from . import sibling

if __name__ == '__main__':
    start_method = sys.argv[1]
    set_start_method(start_method)
    results = []
    with Pool(5) as pool:
        pool.map_async(f, [1, 2, 3], callback=results.extend)
        start_time = getattr(time,'monotonic',time.time)()
        while not results:
            time.sleep(0.05)
            # up to 1 min to report the results
            dt = getattr(time,'monotonic',time.time)() - start_time
            if dt > 60.0:
                raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)

    results.sort()
    print(start_method, "->", results)

    pool.join()
"""
73
# Variant of the child script that has no __main__ guard around its Pool
# code.  It raises RuntimeError if it is ever executed under a name other
# than "__main__", and maps the builtin int over [1, 4, 9] so that no
# __main__-defined function is needed; the line it prints has the same
# "<start_method> -> [1, 4, 9]" form as the one produced by test_source.
test_source_main_skipped_in_children = """\
# __main__.py files have an implied "if __name__ == '__main__'" so
# multiprocessing should always skip running them in child processes

# This means we can't use __main__ defined functions in child processes,
# so we just use "int" as a passthrough operation below

if __name__ != "__main__":
    raise RuntimeError("Should only be called as __main__!")

import sys
import time
from multiprocess import Pool, set_start_method

start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
    pool.map_async(int, [1, 4, 9], callback=results.extend)
    start_time = getattr(time,'monotonic',time.time)()
    while not results:
        time.sleep(0.05)
        # up to 1 min to report the results
        dt = getattr(time,'monotonic',time.time)() - start_time
        if dt > 60.0:
            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)

results.sort()
print(start_method, "->", results)

pool.join()
"""
106
107# These helpers were copied from test_cmd_line_script & tweaked a bit...
108
def _make_test_script(script_dir, script_basename,
                      source=test_source, omit_suffix=False):
    """Create a test script in *script_dir* and return make_script()'s result.

    The arguments are forwarded unchanged to make_script().  When the
    script is named "check_sibling", an empty "sibling" module is written
    next to it as well, so that the ``from . import sibling`` statement in
    the default test source has something to import.  The import system
    caches are invalidated before returning so the new files are visible.
    """
    script_path = make_script(script_dir, script_basename, source, omit_suffix)
    needs_sibling = (script_basename == "check_sibling")
    if needs_sibling:
        # Hack to check explicit relative imports
        make_script(script_dir, "sibling", "")
    importlib.invalidate_caches()
    return script_path
118
def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                       source=test_source, depth=1):
    """Build a zipped package via make_zip_pkg() and return its result.

    All arguments are passed straight through to make_zip_pkg(); callers
    in this file unpack the result as ``(zip_name, run_name)``.  Import
    system caches are invalidated afterwards so the new archive is visible.
    """
    zip_info = make_zip_pkg(
        zip_dir, zip_basename, pkg_name, script_basename, source, depth,
    )
    importlib.invalidate_caches()
    return zip_info
125
# There's no easy way to pass the script directory in to get
# -m to work (avoiding that is the whole point of making
# directories and zipfiles executable!)
# So we fake it for testing purposes with a custom launch script
#
# Template placeholders (filled in by _make_launch_script):
#   %s - a Python *expression* that evaluates to the sys.path entry to
#        prepend (inserted verbatim, not quoted)
#   %r - the name of the module to run as __main__ (inserted as a repr)
launch_source = """\
import sys, os.path, runpy
sys.path.insert(0, %s)
runpy._run_module_as_main(%r)
"""
135
def _make_launch_script(script_dir, script_basename, module_name, path=None):
    """Write a launcher that runs *module_name* as __main__; return its path.

    The launcher is generated from ``launch_source``.  If *path* is None
    the launcher prepends its own directory to sys.path (computed at run
    time from ``__file__``); otherwise the repr of *path* is embedded as a
    literal.  Import system caches are invalidated before returning.
    """
    # Either an expression evaluated inside the launcher, or a quoted literal.
    path_expr = "os.path.dirname(__file__)" if path is None else repr(path)
    launcher_code = launch_source % (path_expr, module_name)
    launcher = make_script(script_dir, script_basename, launcher_code)
    importlib.invalidate_caches()
    return launcher
145
class MultiProcessingCmdLineMixin:
    """Command-line invocation tests shared by all start methods.

    Every test writes a small multiprocessing script (or package, zipfile,
    launcher, ...) into a temporary directory, runs it through
    assert_python_ok() with the start method as the script's only
    argument, and checks what the script printed.

    Concrete test cases combine this mixin with unittest.TestCase and must
    define two class attributes:

    * ``start_method`` -- name of the start method handed to the script.
    * ``main_in_children_source`` -- script source used by the tests that
      execute a ``__main__`` module (directory, zipfile and package tests).
    """

    maxDiff = None # Show full tracebacks on subprocess failure

    def setUp(self):
        """Skip the test when this platform lacks ``self.start_method``."""
        if self.start_method not in AVAILABLE_START_METHODS:
            self.skipTest("%r start method not available" % self.start_method)

    def _check_output(self, script_name, exit_code, out, err):
        """Verify the outcome of one script run.

        *out* and *err* are the captured stdout/stderr as bytes.  The run
        passes when the exit code is 0, stderr is empty, and stdout
        (stripped) is exactly ``"<start_method> -> [1, 4, 9]"``.
        """
        if verbose > 1:
            print("Output from test script %r:" % script_name)
            print(repr(out))
        self.assertEqual(exit_code, 0)
        self.assertEqual(err.decode('utf-8'), '')
        expected_results = "%s -> [1, 4, 9]" % self.start_method
        self.assertEqual(out.decode('utf-8').strip(), expected_results)

    def _check_script(self, script_name, *cmd_line_switches):
        """Run *script_name* with the start method as argument and check it.

        *cmd_line_switches* are interpreter options placed before the
        script name on the command line.
        """
        if not __debug__:
            # This test run is itself optimized; pass the same -O level
            # on to the child interpreter.
            cmd_line_switches += ('-' + 'O' * sys.flags.optimize,)
        run_args = cmd_line_switches + (script_name, self.start_method)
        rc, out, err = assert_python_ok(*run_args, __isolated=False)
        self._check_output(script_name, rc, out, err)

    @staticmethod
    def _compile_to_legacy_pyc(script_name):
        """Byte-compile *script_name*, delete the source, return the pyc path.

        The return value is whatever support.make_legacy_pyc() returns for
        the (now removed) source file.
        """
        py_compile.compile(script_name, doraise=True)
        os.remove(script_name)
        return support.make_legacy_pyc(script_name)

    def test_basic_script(self):
        # Plain "python script.py <start_method>"
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            self._check_script(script_name)

    def test_basic_script_no_suffix(self):
        # Same as test_basic_script, but the file has no .py suffix
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script',
                                            omit_suffix=True)
            self._check_script(script_name)

    def test_ipython_workaround(self):
        # Some versions of the IPython launch script are missing the
        # __name__ == "__main__" guard, and multiprocessing has long had
        # a workaround for that case
        # See https://github.com/ipython/ipython/issues/4698
        source = test_source_main_skipped_in_children
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'ipython',
                                            source=source)
            self._check_script(script_name)
            script_no_suffix = _make_test_script(script_dir, 'ipython',
                                                 source=source,
                                                 omit_suffix=True)
            self._check_script(script_no_suffix)

    def test_script_compiled(self):
        # Run the legacy .pyc directly; the source file is gone
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            pyc_file = self._compile_to_legacy_pyc(script_name)
            self._check_script(pyc_file)

    def test_directory(self):
        # Run a directory containing __main__.py
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            _make_test_script(script_dir, '__main__', source=source)
            self._check_script(script_dir)

    def test_directory_compiled(self):
        # Run a directory whose __main__ exists only as a legacy .pyc
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            self._compile_to_legacy_pyc(script_name)
            self._check_script(script_dir)

    def test_zipfile(self):
        # Run a zipfile containing __main__.py
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            zip_name, _ = make_zip_script(script_dir, 'test_zip', script_name)
            self._check_script(zip_name)

    def test_zipfile_compiled(self):
        # Run a zipfile containing only the compiled __main__
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            compiled_name = py_compile.compile(script_name, doraise=True)
            zip_name, _ = make_zip_script(script_dir, 'test_zip', compiled_name)
            self._check_script(zip_name)

    def test_module_in_package(self):
        # Run test_pkg.check_sibling through the launcher; the script name
        # makes the test source exercise an explicit relative import
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, 'check_sibling')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.check_sibling')
            self._check_script(launch_name)

    def test_module_in_package_in_zipfile(self):
        # Run a module from a package stored inside a zipfile
        with support.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.script', zip_name)
            self._check_script(launch_name)

    def test_module_in_subpackage_in_zipfile(self):
        # Same as above, with the module one package level deeper
        with support.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script', depth=2)
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.test_pkg.script',
                                              zip_name)
            self._check_script(launch_name)

    def test_package(self):
        # Run a package (its __main__.py) through the launcher
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, '__main__', source=source)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)

    def test_package_compiled(self):
        # Run a package whose __main__ exists only as a legacy .pyc
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            script_name = _make_test_script(pkg_dir, '__main__',
                                            source=source)
            self._compile_to_legacy_pyc(script_name)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)
280
# Test all supported start methods (setUp skips as appropriate)
282
class SpawnCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    """Run the command-line tests with the 'spawn' start method."""
    start_method = 'spawn'
    # __main__ module tests use the source that raises RuntimeError if it
    # is ever executed under a name other than "__main__".
    main_in_children_source = test_source_main_skipped_in_children
286
class ForkCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    """Run the command-line tests with the 'fork' start method."""
    start_method = 'fork'
    # __main__ module tests use the regular source, whose Pool work calls
    # the __main__-defined f().
    # NOTE(review): presumably safe because forked children inherit the
    # parent's __main__ rather than re-importing it -- confirm.
    main_in_children_source = test_source
290
class ForkServerCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    """Run the command-line tests with the 'forkserver' start method."""
    start_method = 'forkserver'
    # __main__ module tests use the source that raises RuntimeError if it
    # is ever executed under a name other than "__main__".
    main_in_children_source = test_source_main_skipped_in_children
294
def tearDownModule():
    """unittest module-level fixture: call support.reap_children().

    Runs once after the tests in this module so that child processes
    started by the tests are not left behind.
    """
    support.reap_children()
297
298if __name__ == '__main__':
299    unittest.main()
300