"""Tests for the compileall module: its Python API and its command line."""

import sys
import compileall
import importlib.util
import test.test_importlib.util
import os
import pathlib
import py_compile
import shutil
import struct
import tempfile
import time
import unittest
import io

from unittest import mock, skipUnless
try:
    from concurrent.futures import ProcessPoolExecutor
    _have_multiprocessing = True
except ImportError:
    _have_multiprocessing = False

from test import support
from test.support import script_helper

from .test_py_compile import without_source_date_epoch
from .test_py_compile import SourceDateEpochTestMeta


class CompileallTestsBase:
    """Tests for the compileall Python API (compile_file/dir/path).

    Mixed into concrete TestCase subclasses below, once per
    SOURCE_DATE_EPOCH setting.
    """

    def setUp(self):
        # Layout created for every test:
        #   <tmp>/_test.py
        #   <tmp>/_test2.py          (copy of _test.py)
        #   <tmp>/_subdir/_test3.py  (copy of _test.py)
        self.directory = tempfile.mkdtemp()
        self.source_path = os.path.join(self.directory, '_test.py')
        self.bc_path = importlib.util.cache_from_source(self.source_path)
        with open(self.source_path, 'w') as file:
            file.write('x = 123\n')
        self.source_path2 = os.path.join(self.directory, '_test2.py')
        self.bc_path2 = importlib.util.cache_from_source(self.source_path2)
        shutil.copyfile(self.source_path, self.source_path2)
        self.subdirectory = os.path.join(self.directory, '_subdir')
        os.mkdir(self.subdirectory)
        self.source_path3 = os.path.join(self.subdirectory, '_test3.py')
        shutil.copyfile(self.source_path, self.source_path3)

    def tearDown(self):
        shutil.rmtree(self.directory)

    def add_bad_source_file(self):
        """Create a source file with a syntax error in the test directory."""
        self.bad_source_path = os.path.join(self.directory, '_test_bad.py')
        with open(self.bad_source_path, 'w') as file:
            file.write('x (\n')

    def timestamp_metadata(self):
        """Return (actual, expected) 12-byte headers for self.bc_path.

        ``actual`` is the first 12 bytes of the bytecode file; ``expected``
        is the magic number, a zero flags word and the source file's mtime
        packed the same way.
        """
        with open(self.bc_path, 'rb') as file:
            data = file.read(12)
        mtime = int(os.stat(self.source_path).st_mtime)
        compare = struct.pack('<4sll', importlib.util.MAGIC_NUMBER, 0, mtime)
        return data, compare

    def recreation_check(self, metadata):
        """Check that compileall recreates bytecode when the new metadata is
        used.

        *metadata* is a bytes prefix written over the start of the existing
        bytecode file to make its header stale.
        """
        if os.environ.get('SOURCE_DATE_EPOCH'):
            raise unittest.SkipTest('SOURCE_DATE_EPOCH is set')
        py_compile.compile(self.source_path)
        self.assertEqual(*self.timestamp_metadata())
        with open(self.bc_path, 'rb') as file:
            bc = file.read()[len(metadata):]
        with open(self.bc_path, 'wb') as file:
            file.write(metadata)
            file.write(bc)
        self.assertNotEqual(*self.timestamp_metadata())
        compileall.compile_dir(self.directory, force=False, quiet=True)
        # assertEqual, not assertTrue: assertTrue(a, b) would treat the
        # expected header as the failure message and only check that the
        # actual header is truthy.
        self.assertEqual(*self.timestamp_metadata())

    def test_mtime(self):
        # Test a change in mtime leads to a new .pyc.
        self.recreation_check(struct.pack('<4sll', importlib.util.MAGIC_NUMBER,
                                          0, 1))

    def test_magic_number(self):
        # Test a change in magic number leads to a new .pyc.
        self.recreation_check(b'\0\0\0\0')

    def test_compile_files(self):
        # Test compiling a single file, and complete directory
        for fn in (self.bc_path, self.bc_path2):
            # Best-effort cleanup: the bytecode file may simply not exist.
            try:
                os.unlink(fn)
            except OSError:
                pass
        self.assertTrue(compileall.compile_file(self.source_path,
                                                force=False, quiet=True))
        self.assertTrue(os.path.isfile(self.bc_path) and
                        not os.path.isfile(self.bc_path2))
        os.unlink(self.bc_path)
        self.assertTrue(compileall.compile_dir(self.directory, force=False,
                                               quiet=True))
        self.assertTrue(os.path.isfile(self.bc_path) and
                        os.path.isfile(self.bc_path2))
        os.unlink(self.bc_path)
        os.unlink(self.bc_path2)
        # Test against bad files
        self.add_bad_source_file()
        self.assertFalse(compileall.compile_file(self.bad_source_path,
                                                 force=False, quiet=2))
        self.assertFalse(compileall.compile_dir(self.directory,
                                                force=False, quiet=2))

    def test_compile_file_pathlike(self):
        self.assertFalse(os.path.isfile(self.bc_path))
        # we should also test the output
        with support.captured_stdout() as stdout:
            self.assertTrue(compileall.compile_file(pathlib.Path(self.source_path)))
        # The printed path must not be the repr of a pathlib object.  A
        # negative lookahead is required here; a negated character class
        # would only reject a set of single characters.
        self.assertRegex(stdout.getvalue(),
                         r'Compiling (?!WindowsPath|PosixPath)(.*)')
        self.assertTrue(os.path.isfile(self.bc_path))

    def test_compile_file_pathlike_ddir(self):
        self.assertFalse(os.path.isfile(self.bc_path))
        self.assertTrue(compileall.compile_file(pathlib.Path(self.source_path),
                                                ddir=pathlib.Path('ddir_path'),
                                                quiet=2))
        self.assertTrue(os.path.isfile(self.bc_path))

    def test_compile_path(self):
        with test.test_importlib.util.import_state(path=[self.directory]):
            self.assertTrue(compileall.compile_path(quiet=2))

        with test.test_importlib.util.import_state(path=[self.directory]):
            self.add_bad_source_file()
            self.assertFalse(compileall.compile_path(skip_curdir=False,
                                                     force=True, quiet=2))

    def test_no_pycache_in_non_package(self):
        # Bug 8563 reported that __pycache__ directories got created by
        # compile_file() for non-.py files.
        data_dir = os.path.join(self.directory, 'data')
        data_file = os.path.join(data_dir, 'file')
        os.mkdir(data_dir)
        # touch data/file
        with open(data_file, 'w'):
            pass
        compileall.compile_file(data_file)
        self.assertFalse(os.path.exists(os.path.join(data_dir, '__pycache__')))

    def test_optimize(self):
        # make sure compiling with different optimization settings than the
        # interpreter's creates the correct file names
        optimize, opt = (1, 1) if __debug__ else (0, '')
        compileall.compile_dir(self.directory, quiet=True, optimize=optimize)
        cached = importlib.util.cache_from_source(self.source_path,
                                                  optimization=opt)
        self.assertTrue(os.path.isfile(cached))
        cached2 = importlib.util.cache_from_source(self.source_path2,
                                                   optimization=opt)
        self.assertTrue(os.path.isfile(cached2))
        cached3 = importlib.util.cache_from_source(self.source_path3,
                                                   optimization=opt)
        self.assertTrue(os.path.isfile(cached3))

    def test_compile_dir_pathlike(self):
        self.assertFalse(os.path.isfile(self.bc_path))
        with support.captured_stdout() as stdout:
            compileall.compile_dir(pathlib.Path(self.directory))
        line = stdout.getvalue().splitlines()[0]
        # See test_compile_file_pathlike for why a lookahead is used.
        self.assertRegex(line, r'Listing (?!WindowsPath|PosixPath)(.*)')
        self.assertTrue(os.path.isfile(self.bc_path))

    @mock.patch('concurrent.futures.ProcessPoolExecutor')
    def test_compile_pool_called(self, pool_mock):
        compileall.compile_dir(self.directory, quiet=True, workers=5)
        self.assertTrue(pool_mock.called)

    def test_compile_workers_non_positive(self):
        with self.assertRaisesRegex(ValueError,
                                    "workers must be greater or equal to 0"):
            compileall.compile_dir(self.directory, workers=-1)

    @mock.patch('concurrent.futures.ProcessPoolExecutor')
    def test_compile_workers_cpu_count(self, pool_mock):
        compileall.compile_dir(self.directory, quiet=True, workers=0)
        self.assertEqual(pool_mock.call_args[1]['max_workers'], None)

    @mock.patch('concurrent.futures.ProcessPoolExecutor')
    @mock.patch('compileall.compile_file')
    def test_compile_one_worker(self, compile_file_mock, pool_mock):
        compileall.compile_dir(self.directory, quiet=True)
        self.assertFalse(pool_mock.called)
        self.assertTrue(compile_file_mock.called)

    @mock.patch('concurrent.futures.ProcessPoolExecutor', new=None)
    @mock.patch('compileall.compile_file')
    def test_compile_missing_multiprocessing(self, compile_file_mock):
        compileall.compile_dir(self.directory, quiet=True, workers=5)
        self.assertTrue(compile_file_mock.called)


class CompileallTestsWithSourceEpoch(CompileallTestsBase,
                                     unittest.TestCase,
                                     metaclass=SourceDateEpochTestMeta,
                                     source_date_epoch=True):
    pass


class CompileallTestsWithoutSourceEpoch(CompileallTestsBase,
                                        unittest.TestCase,
                                        metaclass=SourceDateEpochTestMeta,
                                        source_date_epoch=False):
    pass


class EncodingTest(unittest.TestCase):
    """Issue 6716: compileall should escape source code when printing errors
    to stdout."""

    def setUp(self):
        self.directory = tempfile.mkdtemp()
        self.source_path = os.path.join(self.directory, '_test.py')
        with open(self.source_path, 'w', encoding='utf-8') as file:
            file.write('# -*- coding: utf-8 -*-\n')
            file.write('print u"\u20ac"\n')

    def tearDown(self):
        shutil.rmtree(self.directory)

    def test_error(self):
        # Remember the real stdout before entering the try block so the
        # finally clause always has something to restore.
        orig_stdout = sys.stdout
        try:
            sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding='ascii')
            compileall.compile_dir(self.directory)
        finally:
            sys.stdout = orig_stdout


class CommandLineTestsBase:
    """Test compileall's CLI."""

    @classmethod
    def setUpClass(cls):
        # Probe every directory on sys.path by creating (and removing) a
        # scratch file in its __pycache__; record whether all are writable.
        for path in filter(os.path.isdir, sys.path):
            directory_created = False
            directory = pathlib.Path(path) / '__pycache__'
            path = directory / 'test.try'
            try:
                if not directory.is_dir():
                    directory.mkdir()
                    directory_created = True
                with path.open('w') as file:
                    file.write('# for test_compileall')
            except OSError:
                sys_path_writable = False
                break
            finally:
                support.unlink(str(path))
                if directory_created:
                    directory.rmdir()
        else:
            # Loop finished without hitting 'break': every entry was writable.
            sys_path_writable = True
        cls._sys_path_writable = sys_path_writable

    def _skip_if_sys_path_not_writable(self):
        if not self._sys_path_writable:
            raise unittest.SkipTest('not all entries on sys.path are writable')

    def _get_run_args(self, args):
        """Return the interpreter arguments that run compileall with *args*."""
        return [*support.optim_args_from_interpreter_flags(),
                '-S', '-m', 'compileall',
                *args]

    def assertRunOK(self, *args, **env_vars):
        """Run compileall expecting success and empty stderr; return stdout."""
        rc, out, err = script_helper.assert_python_ok(
                         *self._get_run_args(args), **env_vars)
        self.assertEqual(b'', err)
        return out

    def assertRunNotOK(self, *args, **env_vars):
        """Run compileall expecting failure; return (rc, stdout, stderr)."""
        rc, out, err = script_helper.assert_python_failure(
                        *self._get_run_args(args), **env_vars)
        return rc, out, err

    def assertCompiled(self, fn):
        path = importlib.util.cache_from_source(fn)
        self.assertTrue(os.path.exists(path))

    def assertNotCompiled(self, fn):
        path = importlib.util.cache_from_source(fn)
        self.assertFalse(os.path.exists(path))

    def setUp(self):
        self.directory = tempfile.mkdtemp()
        self.addCleanup(support.rmtree, self.directory)
        self.pkgdir = os.path.join(self.directory, 'foo')
        os.mkdir(self.pkgdir)
        self.pkgdir_cachedir = os.path.join(self.pkgdir, '__pycache__')
        # Create the __init__.py and a package module.
        self.initfn = script_helper.make_script(self.pkgdir, '__init__', '')
        self.barfn = script_helper.make_script(self.pkgdir, 'bar', '')

    def test_no_args_compiles_path(self):
        # Note that -l is implied for the no args case.
        self._skip_if_sys_path_not_writable()
        bazfn = script_helper.make_script(self.directory, 'baz', '')
        self.assertRunOK(PYTHONPATH=self.directory)
        self.assertCompiled(bazfn)
        self.assertNotCompiled(self.initfn)
        self.assertNotCompiled(self.barfn)

    @without_source_date_epoch  # timestamp invalidation test
    def test_no_args_respects_force_flag(self):
        self._skip_if_sys_path_not_writable()
        bazfn = script_helper.make_script(self.directory, 'baz', '')
        self.assertRunOK(PYTHONPATH=self.directory)
        pycpath = importlib.util.cache_from_source(bazfn)
        # Set atime/mtime backward to avoid file timestamp resolution issues
        os.utime(pycpath, (time.time()-60,)*2)
        mtime = os.stat(pycpath).st_mtime
        # Without force, no recompilation
        self.assertRunOK(PYTHONPATH=self.directory)
        mtime2 = os.stat(pycpath).st_mtime
        self.assertEqual(mtime, mtime2)
        # Now force it.
        self.assertRunOK('-f', PYTHONPATH=self.directory)
        mtime2 = os.stat(pycpath).st_mtime
        self.assertNotEqual(mtime, mtime2)

    def test_no_args_respects_quiet_flag(self):
        self._skip_if_sys_path_not_writable()
        script_helper.make_script(self.directory, 'baz', '')
        noisy = self.assertRunOK(PYTHONPATH=self.directory)
        self.assertIn(b'Listing ', noisy)
        quiet = self.assertRunOK('-q', PYTHONPATH=self.directory)
        self.assertNotIn(b'Listing ', quiet)

    # Ensure that the default behavior of compileall's CLI is to create
    # PEP 3147/PEP 488 pyc files.
    # One test method is generated per optimization level; ext and switch
    # are bound as default arguments so each method keeps its own values.
    for name, ext, switch in [
        ('normal',   'pyc',       []),
        ('optimize', 'opt-1.pyc', ['-O']),
        ('doubleoptimize', 'opt-2.pyc', ['-OO']),
    ]:
        def f(self, ext=ext, switch=switch):
            script_helper.assert_python_ok(*(switch +
                ['-m', 'compileall', '-q', self.pkgdir]))
            # Verify the __pycache__ directory contents.
            self.assertTrue(os.path.exists(self.pkgdir_cachedir))
            expected = sorted(base.format(sys.implementation.cache_tag, ext)
                              for base in ('__init__.{}.{}', 'bar.{}.{}'))
            self.assertEqual(sorted(os.listdir(self.pkgdir_cachedir)), expected)
            # Make sure there are no .pyc files in the source directory.
            self.assertFalse([fn for fn in os.listdir(self.pkgdir)
                              if fn.endswith(ext)])
        locals()['test_pep3147_paths_' + name] = f

    def test_legacy_paths(self):
        # Ensure that with the proper switch, compileall leaves legacy
        # pyc files, and no __pycache__ directory.
        self.assertRunOK('-b', '-q', self.pkgdir)
        # Verify the __pycache__ directory contents.
        self.assertFalse(os.path.exists(self.pkgdir_cachedir))
        expected = sorted(['__init__.py', '__init__.pyc', 'bar.py',
                           'bar.pyc'])
        self.assertEqual(sorted(os.listdir(self.pkgdir)), expected)

    def test_multiple_runs(self):
        # Bug 8527 reported that multiple calls produced empty
        # __pycache__/__pycache__ directories.
        self.assertRunOK('-q', self.pkgdir)
        # Verify the __pycache__ directory contents.
        self.assertTrue(os.path.exists(self.pkgdir_cachedir))
        cachecachedir = os.path.join(self.pkgdir_cachedir, '__pycache__')
        self.assertFalse(os.path.exists(cachecachedir))
        # Call compileall again.
        self.assertRunOK('-q', self.pkgdir)
        self.assertTrue(os.path.exists(self.pkgdir_cachedir))
        self.assertFalse(os.path.exists(cachecachedir))

    @without_source_date_epoch  # timestamp invalidation test
    def test_force(self):
        self.assertRunOK('-q', self.pkgdir)
        pycpath = importlib.util.cache_from_source(self.barfn)
        # set atime/mtime backward to avoid file timestamp resolution issues
        os.utime(pycpath, (time.time()-60,)*2)
        mtime = os.stat(pycpath).st_mtime
        # without force, no recompilation
        self.assertRunOK('-q', self.pkgdir)
        mtime2 = os.stat(pycpath).st_mtime
        self.assertEqual(mtime, mtime2)
        # now force it.
        self.assertRunOK('-q', '-f', self.pkgdir)
        mtime2 = os.stat(pycpath).st_mtime
        self.assertNotEqual(mtime, mtime2)

    def test_recursion_control(self):
        subpackage = os.path.join(self.pkgdir, 'spam')
        os.mkdir(subpackage)
        subinitfn = script_helper.make_script(subpackage, '__init__', '')
        hamfn = script_helper.make_script(subpackage, 'ham', '')
        self.assertRunOK('-q', '-l', self.pkgdir)
        self.assertNotCompiled(subinitfn)
        self.assertFalse(os.path.exists(os.path.join(subpackage, '__pycache__')))
        self.assertRunOK('-q', self.pkgdir)
        self.assertCompiled(subinitfn)
        self.assertCompiled(hamfn)

    def test_recursion_limit(self):
        # Nested layout: foo/spam/ham/eggs, each level a package.
        subpackage = os.path.join(self.pkgdir, 'spam')
        subpackage2 = os.path.join(subpackage, 'ham')
        subpackage3 = os.path.join(subpackage2, 'eggs')
        for pkg in (subpackage, subpackage2, subpackage3):
            script_helper.make_pkg(pkg)

        subinitfn = os.path.join(subpackage, '__init__.py')
        hamfn = script_helper.make_script(subpackage, 'ham', '')
        spamfn = script_helper.make_script(subpackage2, 'spam', '')
        eggfn = script_helper.make_script(subpackage3, 'egg', '')

        self.assertRunOK('-q', '-r 0', self.pkgdir)
        self.assertNotCompiled(subinitfn)
        self.assertFalse(
            os.path.exists(os.path.join(subpackage, '__pycache__')))

        self.assertRunOK('-q', '-r 1', self.pkgdir)
        self.assertCompiled(subinitfn)
        self.assertCompiled(hamfn)
        self.assertNotCompiled(spamfn)

        self.assertRunOK('-q', '-r 2', self.pkgdir)
        self.assertCompiled(subinitfn)
        self.assertCompiled(hamfn)
        self.assertCompiled(spamfn)
        self.assertNotCompiled(eggfn)

        self.assertRunOK('-q', '-r 5', self.pkgdir)
        self.assertCompiled(subinitfn)
        self.assertCompiled(hamfn)
        self.assertCompiled(spamfn)
        self.assertCompiled(eggfn)

    def test_quiet(self):
        noisy = self.assertRunOK(self.pkgdir)
        quiet = self.assertRunOK('-q', self.pkgdir)
        self.assertNotEqual(b'', noisy)
        self.assertEqual(b'', quiet)

    def test_silent(self):
        script_helper.make_script(self.pkgdir, 'crunchyfrog', 'bad(syntax')
        _, quiet, _ = self.assertRunNotOK('-q', self.pkgdir)
        _, silent, _ = self.assertRunNotOK('-qq', self.pkgdir)
        self.assertNotEqual(b'', quiet)
        self.assertEqual(b'', silent)

    def test_regexp(self):
        self.assertRunOK('-q', '-x', r'ba[^\\/]*$', self.pkgdir)
        self.assertNotCompiled(self.barfn)
        self.assertCompiled(self.initfn)

    def test_multiple_dirs(self):
        pkgdir2 = os.path.join(self.directory, 'foo2')
        os.mkdir(pkgdir2)
        init2fn = script_helper.make_script(pkgdir2, '__init__', '')
        bar2fn = script_helper.make_script(pkgdir2, 'bar2', '')
        self.assertRunOK('-q', self.pkgdir, pkgdir2)
        self.assertCompiled(self.initfn)
        self.assertCompiled(self.barfn)
        self.assertCompiled(init2fn)
        self.assertCompiled(bar2fn)

    def test_d_compile_error(self):
        script_helper.make_script(self.pkgdir, 'crunchyfrog', 'bad(syntax')
        rc, out, err = self.assertRunNotOK('-q', '-d', 'dinsdale', self.pkgdir)
        self.assertRegex(out, b'File "dinsdale')

    def test_d_runtime_error(self):
        bazfn = script_helper.make_script(self.pkgdir, 'baz', 'raise Exception')
        self.assertRunOK('-q', '-d', 'dinsdale', self.pkgdir)
        fn = script_helper.make_script(self.pkgdir, 'bing', 'import baz')
        pyc = importlib.util.cache_from_source(bazfn)
        # Leave only the legacy-located .pyc so that importing baz has to
        # use the bytecode compiled with the -d prefix.
        os.rename(pyc, os.path.join(self.pkgdir, 'baz.pyc'))
        os.remove(bazfn)
        rc, out, err = script_helper.assert_python_failure(fn, __isolated=False)
        self.assertRegex(err, b'File "dinsdale')

    def test_include_bad_file(self):
        rc, out, err = self.assertRunNotOK(
            '-i', os.path.join(self.directory, 'nosuchfile'), self.pkgdir)
        self.assertRegex(out, b'rror.*nosuchfile')
        self.assertNotRegex(err, b'Traceback')
        self.assertFalse(os.path.exists(importlib.util.cache_from_source(
                                            self.pkgdir_cachedir)))

    def test_include_file_with_arg(self):
        f1 = script_helper.make_script(self.pkgdir, 'f1', '')
        f2 = script_helper.make_script(self.pkgdir, 'f2', '')
        f3 = script_helper.make_script(self.pkgdir, 'f3', '')
        f4 = script_helper.make_script(self.pkgdir, 'f4', '')
        with open(os.path.join(self.directory, 'l1'), 'w') as l1:
            l1.write(os.path.join(self.pkgdir, 'f1.py')+os.linesep)
            l1.write(os.path.join(self.pkgdir, 'f2.py')+os.linesep)
        self.assertRunOK('-i', os.path.join(self.directory, 'l1'), f4)
        self.assertCompiled(f1)
        self.assertCompiled(f2)
        self.assertNotCompiled(f3)
        self.assertCompiled(f4)

    def test_include_file_no_arg(self):
        f1 = script_helper.make_script(self.pkgdir, 'f1', '')
        f2 = script_helper.make_script(self.pkgdir, 'f2', '')
        f3 = script_helper.make_script(self.pkgdir, 'f3', '')
        f4 = script_helper.make_script(self.pkgdir, 'f4', '')
        with open(os.path.join(self.directory, 'l1'), 'w') as l1:
            l1.write(os.path.join(self.pkgdir, 'f2.py')+os.linesep)
        self.assertRunOK('-i', os.path.join(self.directory, 'l1'))
        self.assertNotCompiled(f1)
        self.assertCompiled(f2)
        self.assertNotCompiled(f3)
        self.assertNotCompiled(f4)

    def test_include_on_stdin(self):
        f1 = script_helper.make_script(self.pkgdir, 'f1', '')
        f2 = script_helper.make_script(self.pkgdir, 'f2', '')
        f3 = script_helper.make_script(self.pkgdir, 'f3', '')
        f4 = script_helper.make_script(self.pkgdir, 'f4', '')
        p = script_helper.spawn_python(*(self._get_run_args(()) + ['-i', '-']))
        p.stdin.write((f3+os.linesep).encode('ascii'))
        script_helper.kill_python(p)
        self.assertNotCompiled(f1)
        self.assertNotCompiled(f2)
        self.assertCompiled(f3)
        self.assertNotCompiled(f4)

    def test_compiles_as_much_as_possible(self):
        bingfn = script_helper.make_script(self.pkgdir, 'bing', 'syntax(error')
        rc, out, err = self.assertRunNotOK('nosuchfile', self.initfn,
                                           bingfn, self.barfn)
        self.assertRegex(out, b'rror')
        self.assertNotCompiled(bingfn)
        self.assertCompiled(self.initfn)
        self.assertCompiled(self.barfn)

    def test_invalid_arg_produces_message(self):
        out = self.assertRunOK('badfilename')
        self.assertRegex(out, b"Can't list 'badfilename'")

    def test_pyc_invalidation_mode(self):
        script_helper.make_script(self.pkgdir, 'f1', '')
        pyc = importlib.util.cache_from_source(
            os.path.join(self.pkgdir, 'f1.py'))
        self.assertRunOK('--invalidation-mode=checked-hash', self.pkgdir)
        with open(pyc, 'rb') as fp:
            data = fp.read()
        # Bytes 4-8 of the pyc header hold the flags word.
        self.assertEqual(int.from_bytes(data[4:8], 'little'), 0b11)
        self.assertRunOK('--invalidation-mode=unchecked-hash', self.pkgdir)
        with open(pyc, 'rb') as fp:
            data = fp.read()
        self.assertEqual(int.from_bytes(data[4:8], 'little'), 0b01)

    @skipUnless(_have_multiprocessing, "requires multiprocessing")
    def test_workers(self):
        bar2fn = script_helper.make_script(self.directory, 'bar2', '')
        files = []
        for suffix in range(5):
            pkgdir = os.path.join(self.directory, 'foo{}'.format(suffix))
            os.mkdir(pkgdir)
            script_helper.make_script(pkgdir, '__init__', '')
            files.append(script_helper.make_script(pkgdir, 'bar2', ''))

        self.assertRunOK(self.directory, '-j', '0')
        self.assertCompiled(bar2fn)
        for file in files:
            self.assertCompiled(file)

    @mock.patch('compileall.compile_dir')
    def test_workers_available_cores(self, compile_dir):
        with mock.patch("sys.argv",
                        new=[sys.executable, self.directory, "-j0"]):
            compileall.main()
            self.assertTrue(compile_dir.called)
            self.assertEqual(compile_dir.call_args[-1]['workers'], 0)

    def _test_ddir_only(self, *, ddir, parallel=True):
        """Recursive compile_dir ddir must contain package paths; bpo39769."""
        fullpath = ["test", "foo"]
        path = self.directory
        mods = []
        for subdir in fullpath:
            path = os.path.join(path, subdir)
            os.mkdir(path)
            script_helper.make_script(path, "__init__", "")
            mods.append(script_helper.make_script(path, "mod",
                                                  "def fn(): 1/0\nfn()\n"))
        compileall.compile_dir(
                self.directory, quiet=True, ddir=ddir,
                workers=2 if parallel else 1)
        self.assertTrue(mods)
        for mod in mods:
            self.assertTrue(mod.startswith(self.directory), mod)
            modcode = importlib.util.cache_from_source(mod)
            # Path of the module relative to the compiled directory.
            modpath = mod[len(self.directory+os.sep):]
            _, _, err = script_helper.assert_python_failure(modcode)
            expected_in = os.path.join(ddir, modpath)
            mod_code_obj = test.test_importlib.util._get_code_from_pyc(modcode)
            self.assertEqual(mod_code_obj.co_filename, expected_in)
            self.assertIn(f'"{expected_in}"', os.fsdecode(err))

    def test_ddir_only_one_worker(self):
        """Recursive compile_dir ddir= contains package paths; bpo39769."""
        return self._test_ddir_only(ddir="<a prefix>", parallel=False)

    def test_ddir_multiple_workers(self):
        """Recursive compile_dir ddir= contains package paths; bpo39769."""
        return self._test_ddir_only(ddir="<a prefix>", parallel=True)

    def test_ddir_empty_only_one_worker(self):
        """Recursive compile_dir ddir='' contains package paths; bpo39769."""
        return self._test_ddir_only(ddir="", parallel=False)

    def test_ddir_empty_multiple_workers(self):
        """Recursive compile_dir ddir='' contains package paths; bpo39769."""
        return self._test_ddir_only(ddir="", parallel=True)


class CommmandLineTestsWithSourceEpoch(CommandLineTestsBase,
                                       unittest.TestCase,
                                       metaclass=SourceDateEpochTestMeta,
                                       source_date_epoch=True):
    pass


class CommmandLineTestsNoSourceEpoch(CommandLineTestsBase,
                                     unittest.TestCase,
                                     metaclass=SourceDateEpochTestMeta,
                                     source_date_epoch=False):
    pass



if __name__ == "__main__":
    unittest.main()