1# tests __main__ module handling in multiprocessing 2from test import support 3# Skip tests if _multiprocessing wasn't built. 4support.import_module('_multiprocessing') 5 6import importlib 7import importlib.machinery 8import unittest 9import sys 10import os 11import os.path 12import py_compile 13 14from test.support.script_helper import ( 15 make_pkg, make_script, make_zip_pkg, make_zip_script, 16 assert_python_ok) 17 18if support.PGO: 19 raise unittest.SkipTest("test is not helpful for PGO") 20 21# Look up which start methods are available to test 22import multiprocess as multiprocessing 23AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods()) 24 25# Issue #22332: Skip tests if sem_open implementation is broken. 26support.import_module('multiprocess.synchronize') 27 28verbose = support.verbose 29 30test_source = """\ 31# multiprocessing includes all sorts of shenanigans to make __main__ 32# attributes accessible in the subprocess in a pickle compatible way. 33 34# We run the "doesn't work in the interactive interpreter" example from 35# the docs to make sure it *does* work from an executed __main__, 36# regardless of the invocation mechanism 37 38import sys 39import time 40from multiprocess import Pool, set_start_method 41 42# We use this __main__ defined function in the map call below in order to 43# check that multiprocessing in correctly running the unguarded 44# code in child processes and then making it available as __main__ 45def f(x): 46 return x*x 47 48# Check explicit relative imports 49if "check_sibling" in __file__: 50 # We're inside a package and not in a __main__.py file 51 # so make sure explicit relative imports work correctly 52 from . import sibling 53 54if __name__ == '__main__': 55 start_method = sys.argv[1] 56 set_start_method(start_method) 57 results = [] 58 with Pool(5) as pool: 59 pool.map_async(f, [1, 2, 3], callback=results.extend) 60 start_time = getattr(time,'monotonic',time.time)() 61 while not results: 62 time.sleep(0.05) 63 # up to 1 min to report the results 64 dt = getattr(time,'monotonic',time.time)() - start_time 65 if dt > 60.0: 66 raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt) 67 results.sort() 68 print(start_method, "->", results) 69 70 pool.join() 71""" 72 73test_source_main_skipped_in_children = """\ 74# __main__.py files have an implied "if __name__ == '__main__'" so 75# multiprocessing should always skip running them in child processes 76 77# This means we can't use __main__ defined functions in child processes, 78# so we just use "int" as a passthrough operation below 79 80if __name__ != "__main__": 81 raise RuntimeError("Should only be called as __main__!") 82 83import sys 84import time 85from multiprocess import Pool, set_start_method 86 87start_method = sys.argv[1] 88set_start_method(start_method) 89results = [] 90with Pool(5) as pool: 91 pool.map_async(int, [1, 4, 9], callback=results.extend) 92 start_time = getattr(time,'monotonic',time.time)() 93 while not results: 94 time.sleep(0.05) 95 # up to 1 min to report the results 96 dt = getattr(time,'monotonic',time.time)() - start_time 97 if dt > 60.0: 98 raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt) 99 100results.sort() 101print(start_method, "->", results) 102 103pool.join() 104""" 105 106# These helpers were copied from test_cmd_line_script & tweaked a bit... 107 108def _make_test_script(script_dir, script_basename, 109 source=test_source, omit_suffix=False): 110 to_return = make_script(script_dir, script_basename, 111 source, omit_suffix) 112 # Hack to check explicit relative imports 113 if script_basename == "check_sibling": 114 make_script(script_dir, "sibling", "") 115 importlib.invalidate_caches() 116 return to_return 117 118def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, 119 source=test_source, depth=1): 120 to_return = make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, 121 source, depth) 122 importlib.invalidate_caches() 123 return to_return 124 125# There's no easy way to pass the script directory in to get 126# -m to work (avoiding that is the whole point of making 127# directories and zipfiles executable!) 128# So we fake it for testing purposes with a custom launch script 129launch_source = """\ 130import sys, os.path, runpy 131sys.path.insert(0, %s) 132runpy._run_module_as_main(%r) 133""" 134 135def _make_launch_script(script_dir, script_basename, module_name, path=None): 136 if path is None: 137 path = "os.path.dirname(__file__)" 138 else: 139 path = repr(path) 140 source = launch_source % (path, module_name) 141 to_return = make_script(script_dir, script_basename, source) 142 importlib.invalidate_caches() 143 return to_return 144 145class MultiProcessingCmdLineMixin(): 146 maxDiff = None # Show full tracebacks on subprocess failure 147 148 def setUp(self): 149 if self.start_method not in AVAILABLE_START_METHODS: 150 self.skipTest("%r start method not available" % self.start_method) 151 152 def _check_output(self, script_name, exit_code, out, err): 153 if verbose > 1: 154 print("Output from test script %r:" % script_name) 155 print(repr(out)) 156 self.assertEqual(exit_code, 0) 157 self.assertEqual(err.decode('utf-8'), '') 158 expected_results = "%s -> [1, 4, 9]" % self.start_method 159 self.assertEqual(out.decode('utf-8').strip(), expected_results) 160 161 def _check_script(self, script_name, *cmd_line_switches): 162 if not __debug__: 163 cmd_line_switches += ('-' + 'O' * sys.flags.optimize,) 164 run_args = cmd_line_switches + (script_name, self.start_method) 165 rc, out, err = assert_python_ok(*run_args, __isolated=False) 166 self._check_output(script_name, rc, out, err) 167 168 def test_basic_script(self): 169 with support.temp_dir() as script_dir: 170 script_name = _make_test_script(script_dir, 'script') 171 self._check_script(script_name) 172 173 def test_basic_script_no_suffix(self): 174 with support.temp_dir() as script_dir: 175 script_name = _make_test_script(script_dir, 'script', 176 omit_suffix=True) 177 self._check_script(script_name) 178 179 def test_ipython_workaround(self): 180 # Some versions of the IPython launch script are missing the 181 # __name__ = "__main__" guard, and multiprocessing has long had 182 # a workaround for that case 183 # See https://github.com/ipython/ipython/issues/4698 184 source = test_source_main_skipped_in_children 185 with support.temp_dir() as script_dir: 186 script_name = _make_test_script(script_dir, 'ipython', 187 source=source) 188 self._check_script(script_name) 189 script_no_suffix = _make_test_script(script_dir, 'ipython', 190 source=source, 191 omit_suffix=True) 192 self._check_script(script_no_suffix) 193 194 def test_script_compiled(self): 195 with support.temp_dir() as script_dir: 196 script_name = _make_test_script(script_dir, 'script') 197 py_compile.compile(script_name, doraise=True) 198 os.remove(script_name) 199 pyc_file = support.make_legacy_pyc(script_name) 200 self._check_script(pyc_file) 201 202 def test_directory(self): 203 source = self.main_in_children_source 204 with support.temp_dir() as script_dir: 205 script_name = _make_test_script(script_dir, '__main__', 206 source=source) 207 self._check_script(script_dir) 208 209 def test_directory_compiled(self): 210 source = self.main_in_children_source 211 with support.temp_dir() as script_dir: 212 script_name = _make_test_script(script_dir, '__main__', 213 source=source) 214 py_compile.compile(script_name, doraise=True) 215 os.remove(script_name) 216 pyc_file = support.make_legacy_pyc(script_name) 217 self._check_script(script_dir) 218 219 def test_zipfile(self): 220 source = self.main_in_children_source 221 with support.temp_dir() as script_dir: 222 script_name = _make_test_script(script_dir, '__main__', 223 source=source) 224 zip_name, run_name = make_zip_script(script_dir, 'test_zip', script_name) 225 self._check_script(zip_name) 226 227 def test_zipfile_compiled(self): 228 source = self.main_in_children_source 229 with support.temp_dir() as script_dir: 230 script_name = _make_test_script(script_dir, '__main__', 231 source=source) 232 compiled_name = py_compile.compile(script_name, doraise=True) 233 zip_name, run_name = make_zip_script(script_dir, 'test_zip', compiled_name) 234 self._check_script(zip_name) 235 236 def test_module_in_package(self): 237 with support.temp_dir() as script_dir: 238 pkg_dir = os.path.join(script_dir, 'test_pkg') 239 make_pkg(pkg_dir) 240 script_name = _make_test_script(pkg_dir, 'check_sibling') 241 launch_name = _make_launch_script(script_dir, 'launch', 242 'test_pkg.check_sibling') 243 self._check_script(launch_name) 244 245 def test_module_in_package_in_zipfile(self): 246 with support.temp_dir() as script_dir: 247 zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script') 248 launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.script', zip_name) 249 self._check_script(launch_name) 250 251 def test_module_in_subpackage_in_zipfile(self): 252 with support.temp_dir() as script_dir: 253 zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script', depth=2) 254 launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.test_pkg.script', zip_name) 255 self._check_script(launch_name) 256 257 def test_package(self): 258 source = self.main_in_children_source 259 with support.temp_dir() as script_dir: 260 pkg_dir = os.path.join(script_dir, 'test_pkg') 261 make_pkg(pkg_dir) 262 script_name = _make_test_script(pkg_dir, '__main__', 263 source=source) 264 launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg') 265 self._check_script(launch_name) 266 267 def test_package_compiled(self): 268 source = self.main_in_children_source 269 with support.temp_dir() as script_dir: 270 pkg_dir = os.path.join(script_dir, 'test_pkg') 271 make_pkg(pkg_dir) 272 script_name = _make_test_script(pkg_dir, '__main__', 273 source=source) 274 compiled_name = py_compile.compile(script_name, doraise=True) 275 os.remove(script_name) 276 pyc_file = support.make_legacy_pyc(script_name) 277 launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg') 278 self._check_script(launch_name) 279 280# Test all supported start methods (setupClass skips as appropriate) 281 282class SpawnCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase): 283 start_method = 'spawn' 284 main_in_children_source = test_source_main_skipped_in_children 285 286class ForkCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase): 287 start_method = 'fork' 288 main_in_children_source = test_source 289 290class ForkServerCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase): 291 start_method = 'forkserver' 292 main_in_children_source = test_source_main_skipped_in_children 293 294def tearDownModule(): 295 support.reap_children() 296 297if __name__ == '__main__': 298 unittest.main() 299