1import compileall
2import contextlib
3import filecmp
4import importlib.util
5import io
6import itertools
7import os
8import pathlib
9import py_compile
10import shutil
11import struct
12import sys
13import tempfile
14import test.test_importlib.util
15import time
16import unittest
17
18from unittest import mock, skipUnless
19from concurrent.futures import ProcessPoolExecutor
# Probe once, at import time, whether process-based parallelism is usable.
# The resulting flag gates the tests in this file that are decorated with
# ``skipUnless(_have_multiprocessing, ...)``.
try:
    # compileall relies on ProcessPoolExecutor if ProcessPoolExecutor exists
    # and it can function.
    from concurrent.futures.process import _check_system_limits
    # NOTE(review): presumably raises NotImplementedError when the platform
    # cannot support a process pool -- confirm against concurrent.futures.
    _check_system_limits()
    _have_multiprocessing = True
except NotImplementedError:
    _have_multiprocessing = False
28
29from test import support
30from test.support import os_helper
31from test.support import script_helper
32
33from .test_py_compile import without_source_date_epoch
34from .test_py_compile import SourceDateEpochTestMeta
35
36
def get_pyc(script, opt):
    """Return the bytecode cache path of *script* for optimization *opt*.

    Any falsy *opt* (``None``, ``0``, ``''``) is normalized to ``''`` before
    it is handed to ``importlib.util.cache_from_source``.
    """
    optimization = opt if opt else ''
    return importlib.util.cache_from_source(script, optimization=optimization)
42
43
def get_pycs(script):
    """Return the cache paths of *script* for optimization levels 0, 1, 2.

    The list is ordered by optimization level; each entry comes from
    ``get_pyc``.
    """
    pycs = []
    for level in (0, 1, 2):
        pycs.append(get_pyc(script, level))
    return pycs
46
47
def is_hardlink(filename1, filename2):
    """Return True if both paths refer to the same file (e.g. hardlinks).

    Inode numbers are only unique within one device, so the device id is
    compared together with the inode number; comparing ``st_ino`` alone can
    report unrelated files on different filesystems as hardlinks.

    Both paths are examined with ``os.stat``; OSError propagates if either
    path cannot be stat'ed.
    """
    stat1 = os.stat(filename1)
    stat2 = os.stat(filename2)
    return (stat1.st_dev, stat1.st_ino) == (stat2.st_dev, stat2.st_ino)
53
54
55class CompileallTestsBase:
56
57    def setUp(self):
58        self.directory = tempfile.mkdtemp()
59        self.source_path = os.path.join(self.directory, '_test.py')
60        self.bc_path = importlib.util.cache_from_source(self.source_path)
61        with open(self.source_path, 'w', encoding="utf-8") as file:
62            file.write('x = 123\n')
63        self.source_path2 = os.path.join(self.directory, '_test2.py')
64        self.bc_path2 = importlib.util.cache_from_source(self.source_path2)
65        shutil.copyfile(self.source_path, self.source_path2)
66        self.subdirectory = os.path.join(self.directory, '_subdir')
67        os.mkdir(self.subdirectory)
68        self.source_path3 = os.path.join(self.subdirectory, '_test3.py')
69        shutil.copyfile(self.source_path, self.source_path3)
70
71    def tearDown(self):
72        shutil.rmtree(self.directory)
73
74    def add_bad_source_file(self):
75        self.bad_source_path = os.path.join(self.directory, '_test_bad.py')
76        with open(self.bad_source_path, 'w', encoding="utf-8") as file:
77            file.write('x (\n')
78
79    def timestamp_metadata(self):
80        with open(self.bc_path, 'rb') as file:
81            data = file.read(12)
82        mtime = int(os.stat(self.source_path).st_mtime)
83        compare = struct.pack('<4sLL', importlib.util.MAGIC_NUMBER, 0,
84                              mtime & 0xFFFF_FFFF)
85        return data, compare
86
87    def test_year_2038_mtime_compilation(self):
88        # Test to make sure we can handle mtimes larger than what a 32-bit
89        # signed number can hold as part of bpo-34990
90        try:
91            os.utime(self.source_path, (2**32 - 1, 2**32 - 1))
92        except (OverflowError, OSError):
93            self.skipTest("filesystem doesn't support timestamps near 2**32")
94        with contextlib.redirect_stdout(io.StringIO()):
95            self.assertTrue(compileall.compile_file(self.source_path))
96
97    def test_larger_than_32_bit_times(self):
98        # This is similar to the test above but we skip it if the OS doesn't
99        # support modification times larger than 32-bits.
100        try:
101            os.utime(self.source_path, (2**35, 2**35))
102        except (OverflowError, OSError):
103            self.skipTest("filesystem doesn't support large timestamps")
104        with contextlib.redirect_stdout(io.StringIO()):
105            self.assertTrue(compileall.compile_file(self.source_path))
106
107    def recreation_check(self, metadata):
108        """Check that compileall recreates bytecode when the new metadata is
109        used."""
110        if os.environ.get('SOURCE_DATE_EPOCH'):
111            raise unittest.SkipTest('SOURCE_DATE_EPOCH is set')
112        py_compile.compile(self.source_path)
113        self.assertEqual(*self.timestamp_metadata())
114        with open(self.bc_path, 'rb') as file:
115            bc = file.read()[len(metadata):]
116        with open(self.bc_path, 'wb') as file:
117            file.write(metadata)
118            file.write(bc)
119        self.assertNotEqual(*self.timestamp_metadata())
120        compileall.compile_dir(self.directory, force=False, quiet=True)
121        self.assertTrue(*self.timestamp_metadata())
122
123    def test_mtime(self):
124        # Test a change in mtime leads to a new .pyc.
125        self.recreation_check(struct.pack('<4sLL', importlib.util.MAGIC_NUMBER,
126                                          0, 1))
127
128    def test_magic_number(self):
129        # Test a change in mtime leads to a new .pyc.
130        self.recreation_check(b'\0\0\0\0')
131
132    def test_compile_files(self):
133        # Test compiling a single file, and complete directory
134        for fn in (self.bc_path, self.bc_path2):
135            try:
136                os.unlink(fn)
137            except:
138                pass
139        self.assertTrue(compileall.compile_file(self.source_path,
140                                                force=False, quiet=True))
141        self.assertTrue(os.path.isfile(self.bc_path) and
142                        not os.path.isfile(self.bc_path2))
143        os.unlink(self.bc_path)
144        self.assertTrue(compileall.compile_dir(self.directory, force=False,
145                                               quiet=True))
146        self.assertTrue(os.path.isfile(self.bc_path) and
147                        os.path.isfile(self.bc_path2))
148        os.unlink(self.bc_path)
149        os.unlink(self.bc_path2)
150        # Test against bad files
151        self.add_bad_source_file()
152        self.assertFalse(compileall.compile_file(self.bad_source_path,
153                                                 force=False, quiet=2))
154        self.assertFalse(compileall.compile_dir(self.directory,
155                                                force=False, quiet=2))
156
157    def test_compile_file_pathlike(self):
158        self.assertFalse(os.path.isfile(self.bc_path))
159        # we should also test the output
160        with support.captured_stdout() as stdout:
161            self.assertTrue(compileall.compile_file(pathlib.Path(self.source_path)))
162        self.assertRegex(stdout.getvalue(), r'Compiling ([^WindowsPath|PosixPath].*)')
163        self.assertTrue(os.path.isfile(self.bc_path))
164
165    def test_compile_file_pathlike_ddir(self):
166        self.assertFalse(os.path.isfile(self.bc_path))
167        self.assertTrue(compileall.compile_file(pathlib.Path(self.source_path),
168                                                ddir=pathlib.Path('ddir_path'),
169                                                quiet=2))
170        self.assertTrue(os.path.isfile(self.bc_path))
171
172    def test_compile_path(self):
173        with test.test_importlib.util.import_state(path=[self.directory]):
174            self.assertTrue(compileall.compile_path(quiet=2))
175
176        with test.test_importlib.util.import_state(path=[self.directory]):
177            self.add_bad_source_file()
178            self.assertFalse(compileall.compile_path(skip_curdir=False,
179                                                     force=True, quiet=2))
180
181    def test_no_pycache_in_non_package(self):
182        # Bug 8563 reported that __pycache__ directories got created by
183        # compile_file() for non-.py files.
184        data_dir = os.path.join(self.directory, 'data')
185        data_file = os.path.join(data_dir, 'file')
186        os.mkdir(data_dir)
187        # touch data/file
188        with open(data_file, 'wb'):
189            pass
190        compileall.compile_file(data_file)
191        self.assertFalse(os.path.exists(os.path.join(data_dir, '__pycache__')))
192
193
194    def test_compile_file_encoding_fallback(self):
195        # Bug 44666 reported that compile_file failed when sys.stdout.encoding is None
196        self.add_bad_source_file()
197        with contextlib.redirect_stdout(io.StringIO()):
198            self.assertFalse(compileall.compile_file(self.bad_source_path))
199
200
201    def test_optimize(self):
202        # make sure compiling with different optimization settings than the
203        # interpreter's creates the correct file names
204        optimize, opt = (1, 1) if __debug__ else (0, '')
205        compileall.compile_dir(self.directory, quiet=True, optimize=optimize)
206        cached = importlib.util.cache_from_source(self.source_path,
207                                                  optimization=opt)
208        self.assertTrue(os.path.isfile(cached))
209        cached2 = importlib.util.cache_from_source(self.source_path2,
210                                                   optimization=opt)
211        self.assertTrue(os.path.isfile(cached2))
212        cached3 = importlib.util.cache_from_source(self.source_path3,
213                                                   optimization=opt)
214        self.assertTrue(os.path.isfile(cached3))
215
216    def test_compile_dir_pathlike(self):
217        self.assertFalse(os.path.isfile(self.bc_path))
218        with support.captured_stdout() as stdout:
219            compileall.compile_dir(pathlib.Path(self.directory))
220        line = stdout.getvalue().splitlines()[0]
221        self.assertRegex(line, r'Listing ([^WindowsPath|PosixPath].*)')
222        self.assertTrue(os.path.isfile(self.bc_path))
223
224    @skipUnless(_have_multiprocessing, "requires multiprocessing")
225    @mock.patch('concurrent.futures.ProcessPoolExecutor')
226    def test_compile_pool_called(self, pool_mock):
227        compileall.compile_dir(self.directory, quiet=True, workers=5)
228        self.assertTrue(pool_mock.called)
229
230    def test_compile_workers_non_positive(self):
231        with self.assertRaisesRegex(ValueError,
232                                    "workers must be greater or equal to 0"):
233            compileall.compile_dir(self.directory, workers=-1)
234
235    @skipUnless(_have_multiprocessing, "requires multiprocessing")
236    @mock.patch('concurrent.futures.ProcessPoolExecutor')
237    def test_compile_workers_cpu_count(self, pool_mock):
238        compileall.compile_dir(self.directory, quiet=True, workers=0)
239        self.assertEqual(pool_mock.call_args[1]['max_workers'], None)
240
241    @skipUnless(_have_multiprocessing, "requires multiprocessing")
242    @mock.patch('concurrent.futures.ProcessPoolExecutor')
243    @mock.patch('compileall.compile_file')
244    def test_compile_one_worker(self, compile_file_mock, pool_mock):
245        compileall.compile_dir(self.directory, quiet=True)
246        self.assertFalse(pool_mock.called)
247        self.assertTrue(compile_file_mock.called)
248
249    @mock.patch('concurrent.futures.ProcessPoolExecutor', new=None)
250    @mock.patch('compileall.compile_file')
251    def test_compile_missing_multiprocessing(self, compile_file_mock):
252        compileall.compile_dir(self.directory, quiet=True, workers=5)
253        self.assertTrue(compile_file_mock.called)
254
255    def test_compile_dir_maxlevels(self):
256        # Test the actual impact of maxlevels parameter
257        depth = 3
258        path = self.directory
259        for i in range(1, depth + 1):
260            path = os.path.join(path, f"dir_{i}")
261            source = os.path.join(path, 'script.py')
262            os.mkdir(path)
263            shutil.copyfile(self.source_path, source)
264        pyc_filename = importlib.util.cache_from_source(source)
265
266        compileall.compile_dir(self.directory, quiet=True, maxlevels=depth - 1)
267        self.assertFalse(os.path.isfile(pyc_filename))
268
269        compileall.compile_dir(self.directory, quiet=True, maxlevels=depth)
270        self.assertTrue(os.path.isfile(pyc_filename))
271
272    def _test_ddir_only(self, *, ddir, parallel=True):
273        """Recursive compile_dir ddir must contain package paths; bpo39769."""
274        fullpath = ["test", "foo"]
275        path = self.directory
276        mods = []
277        for subdir in fullpath:
278            path = os.path.join(path, subdir)
279            os.mkdir(path)
280            script_helper.make_script(path, "__init__", "")
281            mods.append(script_helper.make_script(path, "mod",
282                                                  "def fn(): 1/0\nfn()\n"))
283        compileall.compile_dir(
284                self.directory, quiet=True, ddir=ddir,
285                workers=2 if parallel else 1)
286        self.assertTrue(mods)
287        for mod in mods:
288            self.assertTrue(mod.startswith(self.directory), mod)
289            modcode = importlib.util.cache_from_source(mod)
290            modpath = mod[len(self.directory+os.sep):]
291            _, _, err = script_helper.assert_python_failure(modcode)
292            expected_in = os.path.join(ddir, modpath)
293            mod_code_obj = test.test_importlib.util.get_code_from_pyc(modcode)
294            self.assertEqual(mod_code_obj.co_filename, expected_in)
295            self.assertIn(f'"{expected_in}"', os.fsdecode(err))
296
297    def test_ddir_only_one_worker(self):
298        """Recursive compile_dir ddir= contains package paths; bpo39769."""
299        return self._test_ddir_only(ddir="<a prefix>", parallel=False)
300
301    def test_ddir_multiple_workers(self):
302        """Recursive compile_dir ddir= contains package paths; bpo39769."""
303        return self._test_ddir_only(ddir="<a prefix>", parallel=True)
304
305    def test_ddir_empty_only_one_worker(self):
306        """Recursive compile_dir ddir='' contains package paths; bpo39769."""
307        return self._test_ddir_only(ddir="", parallel=False)
308
309    def test_ddir_empty_multiple_workers(self):
310        """Recursive compile_dir ddir='' contains package paths; bpo39769."""
311        return self._test_ddir_only(ddir="", parallel=True)
312
313    def test_strip_only(self):
314        fullpath = ["test", "build", "real", "path"]
315        path = os.path.join(self.directory, *fullpath)
316        os.makedirs(path)
317        script = script_helper.make_script(path, "test", "1 / 0")
318        bc = importlib.util.cache_from_source(script)
319        stripdir = os.path.join(self.directory, *fullpath[:2])
320        compileall.compile_dir(path, quiet=True, stripdir=stripdir)
321        rc, out, err = script_helper.assert_python_failure(bc)
322        expected_in = os.path.join(*fullpath[2:])
323        self.assertIn(
324            expected_in,
325            str(err, encoding=sys.getdefaultencoding())
326        )
327        self.assertNotIn(
328            stripdir,
329            str(err, encoding=sys.getdefaultencoding())
330        )
331
332    def test_prepend_only(self):
333        fullpath = ["test", "build", "real", "path"]
334        path = os.path.join(self.directory, *fullpath)
335        os.makedirs(path)
336        script = script_helper.make_script(path, "test", "1 / 0")
337        bc = importlib.util.cache_from_source(script)
338        prependdir = "/foo"
339        compileall.compile_dir(path, quiet=True, prependdir=prependdir)
340        rc, out, err = script_helper.assert_python_failure(bc)
341        expected_in = os.path.join(prependdir, self.directory, *fullpath)
342        self.assertIn(
343            expected_in,
344            str(err, encoding=sys.getdefaultencoding())
345        )
346
347    def test_strip_and_prepend(self):
348        fullpath = ["test", "build", "real", "path"]
349        path = os.path.join(self.directory, *fullpath)
350        os.makedirs(path)
351        script = script_helper.make_script(path, "test", "1 / 0")
352        bc = importlib.util.cache_from_source(script)
353        stripdir = os.path.join(self.directory, *fullpath[:2])
354        prependdir = "/foo"
355        compileall.compile_dir(path, quiet=True,
356                               stripdir=stripdir, prependdir=prependdir)
357        rc, out, err = script_helper.assert_python_failure(bc)
358        expected_in = os.path.join(prependdir, *fullpath[2:])
359        self.assertIn(
360            expected_in,
361            str(err, encoding=sys.getdefaultencoding())
362        )
363        self.assertNotIn(
364            stripdir,
365            str(err, encoding=sys.getdefaultencoding())
366        )
367
368    def test_strip_prepend_and_ddir(self):
369        fullpath = ["test", "build", "real", "path", "ddir"]
370        path = os.path.join(self.directory, *fullpath)
371        os.makedirs(path)
372        script_helper.make_script(path, "test", "1 / 0")
373        with self.assertRaises(ValueError):
374            compileall.compile_dir(path, quiet=True, ddir="/bar",
375                                   stripdir="/foo", prependdir="/bar")
376
377    def test_multiple_optimization_levels(self):
378        script = script_helper.make_script(self.directory,
379                                           "test_optimization",
380                                           "a = 0")
381        bc = []
382        for opt_level in "", 1, 2, 3:
383            bc.append(importlib.util.cache_from_source(script,
384                                                       optimization=opt_level))
385        test_combinations = [[0, 1], [1, 2], [0, 2], [0, 1, 2]]
386        for opt_combination in test_combinations:
387            compileall.compile_file(script, quiet=True,
388                                    optimize=opt_combination)
389            for opt_level in opt_combination:
390                self.assertTrue(os.path.isfile(bc[opt_level]))
391                try:
392                    os.unlink(bc[opt_level])
393                except Exception:
394                    pass
395
396    @os_helper.skip_unless_symlink
397    def test_ignore_symlink_destination(self):
398        # Create folders for allowed files, symlinks and prohibited area
399        allowed_path = os.path.join(self.directory, "test", "dir", "allowed")
400        symlinks_path = os.path.join(self.directory, "test", "dir", "symlinks")
401        prohibited_path = os.path.join(self.directory, "test", "dir", "prohibited")
402        os.makedirs(allowed_path)
403        os.makedirs(symlinks_path)
404        os.makedirs(prohibited_path)
405
406        # Create scripts and symlinks and remember their byte-compiled versions
407        allowed_script = script_helper.make_script(allowed_path, "test_allowed", "a = 0")
408        prohibited_script = script_helper.make_script(prohibited_path, "test_prohibited", "a = 0")
409        allowed_symlink = os.path.join(symlinks_path, "test_allowed.py")
410        prohibited_symlink = os.path.join(symlinks_path, "test_prohibited.py")
411        os.symlink(allowed_script, allowed_symlink)
412        os.symlink(prohibited_script, prohibited_symlink)
413        allowed_bc = importlib.util.cache_from_source(allowed_symlink)
414        prohibited_bc = importlib.util.cache_from_source(prohibited_symlink)
415
416        compileall.compile_dir(symlinks_path, quiet=True, limit_sl_dest=allowed_path)
417
418        self.assertTrue(os.path.isfile(allowed_bc))
419        self.assertFalse(os.path.isfile(prohibited_bc))
420
421
# Concrete TestCase that runs every test inherited from CompileallTestsBase.
# The ``source_date_epoch=True`` keyword is consumed by
# SourceDateEpochTestMeta (imported from test_py_compile).
# NOTE(review): presumably this makes the tests run with SOURCE_DATE_EPOCH
# set -- confirm in test_py_compile.
class CompileallTestsWithSourceEpoch(CompileallTestsBase,
                                     unittest.TestCase,
                                     metaclass=SourceDateEpochTestMeta,
                                     source_date_epoch=True):
    pass
427
428
# Concrete TestCase that runs every test inherited from CompileallTestsBase.
# The ``source_date_epoch=False`` keyword is consumed by
# SourceDateEpochTestMeta (imported from test_py_compile).
# NOTE(review): presumably this makes the tests run with SOURCE_DATE_EPOCH
# unset -- confirm in test_py_compile.
class CompileallTestsWithoutSourceEpoch(CompileallTestsBase,
                                        unittest.TestCase,
                                        metaclass=SourceDateEpochTestMeta,
                                        source_date_epoch=False):
    pass
434
435
class EncodingTest(unittest.TestCase):
    """Issue 6716: compileall should escape source code when printing errors
    to stdout."""

    def setUp(self):
        # A UTF-8 source file whose second line holds a non-ASCII character.
        self.directory = tempfile.mkdtemp()
        self.source_path = os.path.join(self.directory, '_test.py')
        source_lines = ('# -*- coding: utf-8 -*-\n', 'print u"\u20ac"\n')
        with open(self.source_path, 'w', encoding='utf-8') as source:
            for line in source_lines:
                source.write(line)

    def tearDown(self):
        shutil.rmtree(self.directory)

    def test_error(self):
        # Compile while stdout can only encode ASCII.  The test passes as
        # long as compile_dir() returns without raising.
        ascii_stdout = io.TextIOWrapper(io.BytesIO(), encoding='ascii')
        with contextlib.redirect_stdout(ascii_stdout):
            compileall.compile_dir(self.directory)
457
458
459class CommandLineTestsBase:
460    """Test compileall's CLI."""
461
462    @classmethod
463    def setUpClass(cls):
464        for path in filter(os.path.isdir, sys.path):
465            directory_created = False
466            directory = pathlib.Path(path) / '__pycache__'
467            path = directory / 'test.try'
468            try:
469                if not directory.is_dir():
470                    directory.mkdir()
471                    directory_created = True
472                path.write_text('# for test_compileall', encoding="utf-8")
473            except OSError:
474                sys_path_writable = False
475                break
476            finally:
477                os_helper.unlink(str(path))
478                if directory_created:
479                    directory.rmdir()
480        else:
481            sys_path_writable = True
482        cls._sys_path_writable = sys_path_writable
483
484    def _skip_if_sys_path_not_writable(self):
485        if not self._sys_path_writable:
486            raise unittest.SkipTest('not all entries on sys.path are writable')
487
488    def _get_run_args(self, args):
489        return [*support.optim_args_from_interpreter_flags(),
490                '-S', '-m', 'compileall',
491                *args]
492
493    def assertRunOK(self, *args, **env_vars):
494        rc, out, err = script_helper.assert_python_ok(
495                         *self._get_run_args(args), **env_vars,
496                         PYTHONIOENCODING='utf-8')
497        self.assertEqual(b'', err)
498        return out
499
500    def assertRunNotOK(self, *args, **env_vars):
501        rc, out, err = script_helper.assert_python_failure(
502                        *self._get_run_args(args), **env_vars,
503                        PYTHONIOENCODING='utf-8')
504        return rc, out, err
505
506    def assertCompiled(self, fn):
507        path = importlib.util.cache_from_source(fn)
508        self.assertTrue(os.path.exists(path))
509
510    def assertNotCompiled(self, fn):
511        path = importlib.util.cache_from_source(fn)
512        self.assertFalse(os.path.exists(path))
513
514    def setUp(self):
515        self.directory = tempfile.mkdtemp()
516        self.addCleanup(os_helper.rmtree, self.directory)
517        self.pkgdir = os.path.join(self.directory, 'foo')
518        os.mkdir(self.pkgdir)
519        self.pkgdir_cachedir = os.path.join(self.pkgdir, '__pycache__')
520        # Create the __init__.py and a package module.
521        self.initfn = script_helper.make_script(self.pkgdir, '__init__', '')
522        self.barfn = script_helper.make_script(self.pkgdir, 'bar', '')
523
524    def test_no_args_compiles_path(self):
525        # Note that -l is implied for the no args case.
526        self._skip_if_sys_path_not_writable()
527        bazfn = script_helper.make_script(self.directory, 'baz', '')
528        self.assertRunOK(PYTHONPATH=self.directory)
529        self.assertCompiled(bazfn)
530        self.assertNotCompiled(self.initfn)
531        self.assertNotCompiled(self.barfn)
532
533    @without_source_date_epoch  # timestamp invalidation test
534    def test_no_args_respects_force_flag(self):
535        self._skip_if_sys_path_not_writable()
536        bazfn = script_helper.make_script(self.directory, 'baz', '')
537        self.assertRunOK(PYTHONPATH=self.directory)
538        pycpath = importlib.util.cache_from_source(bazfn)
539        # Set atime/mtime backward to avoid file timestamp resolution issues
540        os.utime(pycpath, (time.time()-60,)*2)
541        mtime = os.stat(pycpath).st_mtime
542        # Without force, no recompilation
543        self.assertRunOK(PYTHONPATH=self.directory)
544        mtime2 = os.stat(pycpath).st_mtime
545        self.assertEqual(mtime, mtime2)
546        # Now force it.
547        self.assertRunOK('-f', PYTHONPATH=self.directory)
548        mtime2 = os.stat(pycpath).st_mtime
549        self.assertNotEqual(mtime, mtime2)
550
551    def test_no_args_respects_quiet_flag(self):
552        self._skip_if_sys_path_not_writable()
553        script_helper.make_script(self.directory, 'baz', '')
554        noisy = self.assertRunOK(PYTHONPATH=self.directory)
555        self.assertIn(b'Listing ', noisy)
556        quiet = self.assertRunOK('-q', PYTHONPATH=self.directory)
557        self.assertNotIn(b'Listing ', quiet)
558
    # Ensure that the default behavior of compileall's CLI is to create
    # PEP 3147/PEP 488 pyc files.
    #
    # This loop runs while the class body executes and defines one test
    # method per (name, expected pyc suffix, interpreter switches) row:
    # test_pep3147_paths_normal, test_pep3147_paths_optimize and
    # test_pep3147_paths_doubleoptimize.
    for name, ext, switch in [
        ('normal', 'pyc', []),
        ('optimize', 'opt-1.pyc', ['-O']),
        ('doubleoptimize', 'opt-2.pyc', ['-OO']),
    ]:
        # ``ext`` and ``switch`` are bound as default arguments so that each
        # generated function keeps the values of its own iteration instead
        # of seeing the loop's final values.
        def f(self, ext=ext, switch=switch):
            # Invoke ``-m compileall -q`` on the package through
            # script_helper.assert_python_ok, with the optimization switch
            # placed first.  Unlike the other tests, this does not go through
            # assertRunOK()/_get_run_args().
            script_helper.assert_python_ok(*(switch +
                ['-m', 'compileall', '-q', self.pkgdir]))
            # Verify the __pycache__ directory contents.
            self.assertTrue(os.path.exists(self.pkgdir_cachedir))
            # Expected names are "<module>.<cache_tag>.<ext>" for both
            # modules of the package.
            expected = sorted(base.format(sys.implementation.cache_tag, ext)
                              for base in ('__init__.{}.{}', 'bar.{}.{}'))
            self.assertEqual(sorted(os.listdir(self.pkgdir_cachedir)), expected)
            # Make sure there are no .pyc files in the source directory.
            self.assertFalse([fn for fn in os.listdir(self.pkgdir)
                              if fn.endswith(ext)])
        # Publish the function in the class namespace under its test name.
        locals()['test_pep3147_paths_' + name] = f
578
579    def test_legacy_paths(self):
580        # Ensure that with the proper switch, compileall leaves legacy
581        # pyc files, and no __pycache__ directory.
582        self.assertRunOK('-b', '-q', self.pkgdir)
583        # Verify the __pycache__ directory contents.
584        self.assertFalse(os.path.exists(self.pkgdir_cachedir))
585        expected = sorted(['__init__.py', '__init__.pyc', 'bar.py',
586                           'bar.pyc'])
587        self.assertEqual(sorted(os.listdir(self.pkgdir)), expected)
588
589    def test_multiple_runs(self):
590        # Bug 8527 reported that multiple calls produced empty
591        # __pycache__/__pycache__ directories.
592        self.assertRunOK('-q', self.pkgdir)
593        # Verify the __pycache__ directory contents.
594        self.assertTrue(os.path.exists(self.pkgdir_cachedir))
595        cachecachedir = os.path.join(self.pkgdir_cachedir, '__pycache__')
596        self.assertFalse(os.path.exists(cachecachedir))
597        # Call compileall again.
598        self.assertRunOK('-q', self.pkgdir)
599        self.assertTrue(os.path.exists(self.pkgdir_cachedir))
600        self.assertFalse(os.path.exists(cachecachedir))
601
602    @without_source_date_epoch  # timestamp invalidation test
603    def test_force(self):
604        self.assertRunOK('-q', self.pkgdir)
605        pycpath = importlib.util.cache_from_source(self.barfn)
606        # set atime/mtime backward to avoid file timestamp resolution issues
607        os.utime(pycpath, (time.time()-60,)*2)
608        mtime = os.stat(pycpath).st_mtime
609        # without force, no recompilation
610        self.assertRunOK('-q', self.pkgdir)
611        mtime2 = os.stat(pycpath).st_mtime
612        self.assertEqual(mtime, mtime2)
613        # now force it.
614        self.assertRunOK('-q', '-f', self.pkgdir)
615        mtime2 = os.stat(pycpath).st_mtime
616        self.assertNotEqual(mtime, mtime2)
617
618    def test_recursion_control(self):
619        subpackage = os.path.join(self.pkgdir, 'spam')
620        os.mkdir(subpackage)
621        subinitfn = script_helper.make_script(subpackage, '__init__', '')
622        hamfn = script_helper.make_script(subpackage, 'ham', '')
623        self.assertRunOK('-q', '-l', self.pkgdir)
624        self.assertNotCompiled(subinitfn)
625        self.assertFalse(os.path.exists(os.path.join(subpackage, '__pycache__')))
626        self.assertRunOK('-q', self.pkgdir)
627        self.assertCompiled(subinitfn)
628        self.assertCompiled(hamfn)
629
630    def test_recursion_limit(self):
631        subpackage = os.path.join(self.pkgdir, 'spam')
632        subpackage2 = os.path.join(subpackage, 'ham')
633        subpackage3 = os.path.join(subpackage2, 'eggs')
634        for pkg in (subpackage, subpackage2, subpackage3):
635            script_helper.make_pkg(pkg)
636
637        subinitfn = os.path.join(subpackage, '__init__.py')
638        hamfn = script_helper.make_script(subpackage, 'ham', '')
639        spamfn = script_helper.make_script(subpackage2, 'spam', '')
640        eggfn = script_helper.make_script(subpackage3, 'egg', '')
641
642        self.assertRunOK('-q', '-r 0', self.pkgdir)
643        self.assertNotCompiled(subinitfn)
644        self.assertFalse(
645            os.path.exists(os.path.join(subpackage, '__pycache__')))
646
647        self.assertRunOK('-q', '-r 1', self.pkgdir)
648        self.assertCompiled(subinitfn)
649        self.assertCompiled(hamfn)
650        self.assertNotCompiled(spamfn)
651
652        self.assertRunOK('-q', '-r 2', self.pkgdir)
653        self.assertCompiled(subinitfn)
654        self.assertCompiled(hamfn)
655        self.assertCompiled(spamfn)
656        self.assertNotCompiled(eggfn)
657
658        self.assertRunOK('-q', '-r 5', self.pkgdir)
659        self.assertCompiled(subinitfn)
660        self.assertCompiled(hamfn)
661        self.assertCompiled(spamfn)
662        self.assertCompiled(eggfn)
663
664    @os_helper.skip_unless_symlink
665    def test_symlink_loop(self):
666        # Currently, compileall ignores symlinks to directories.
667        # If that limitation is ever lifted, it should protect against
668        # recursion in symlink loops.
669        pkg = os.path.join(self.pkgdir, 'spam')
670        script_helper.make_pkg(pkg)
671        os.symlink('.', os.path.join(pkg, 'evil'))
672        os.symlink('.', os.path.join(pkg, 'evil2'))
673        self.assertRunOK('-q', self.pkgdir)
674        self.assertCompiled(os.path.join(
675            self.pkgdir, 'spam', 'evil', 'evil2', '__init__.py'
676        ))
677
678    def test_quiet(self):
679        noisy = self.assertRunOK(self.pkgdir)
680        quiet = self.assertRunOK('-q', self.pkgdir)
681        self.assertNotEqual(b'', noisy)
682        self.assertEqual(b'', quiet)
683
684    def test_silent(self):
685        script_helper.make_script(self.pkgdir, 'crunchyfrog', 'bad(syntax')
686        _, quiet, _ = self.assertRunNotOK('-q', self.pkgdir)
687        _, silent, _ = self.assertRunNotOK('-qq', self.pkgdir)
688        self.assertNotEqual(b'', quiet)
689        self.assertEqual(b'', silent)
690
691    def test_regexp(self):
692        self.assertRunOK('-q', '-x', r'ba[^\\/]*$', self.pkgdir)
693        self.assertNotCompiled(self.barfn)
694        self.assertCompiled(self.initfn)
695
696    def test_multiple_dirs(self):
697        pkgdir2 = os.path.join(self.directory, 'foo2')
698        os.mkdir(pkgdir2)
699        init2fn = script_helper.make_script(pkgdir2, '__init__', '')
700        bar2fn = script_helper.make_script(pkgdir2, 'bar2', '')
701        self.assertRunOK('-q', self.pkgdir, pkgdir2)
702        self.assertCompiled(self.initfn)
703        self.assertCompiled(self.barfn)
704        self.assertCompiled(init2fn)
705        self.assertCompiled(bar2fn)
706
707    def test_d_compile_error(self):
708        script_helper.make_script(self.pkgdir, 'crunchyfrog', 'bad(syntax')
709        rc, out, err = self.assertRunNotOK('-q', '-d', 'dinsdale', self.pkgdir)
710        self.assertRegex(out, b'File "dinsdale')
711
712    def test_d_runtime_error(self):
713        bazfn = script_helper.make_script(self.pkgdir, 'baz', 'raise Exception')
714        self.assertRunOK('-q', '-d', 'dinsdale', self.pkgdir)
715        fn = script_helper.make_script(self.pkgdir, 'bing', 'import baz')
716        pyc = importlib.util.cache_from_source(bazfn)
717        os.rename(pyc, os.path.join(self.pkgdir, 'baz.pyc'))
718        os.remove(bazfn)
719        rc, out, err = script_helper.assert_python_failure(fn, __isolated=False)
720        self.assertRegex(err, b'File "dinsdale')
721
722    def test_include_bad_file(self):
723        rc, out, err = self.assertRunNotOK(
724            '-i', os.path.join(self.directory, 'nosuchfile'), self.pkgdir)
725        self.assertRegex(out, b'rror.*nosuchfile')
726        self.assertNotRegex(err, b'Traceback')
727        self.assertFalse(os.path.exists(importlib.util.cache_from_source(
728                                            self.pkgdir_cachedir)))
729
730    def test_include_file_with_arg(self):
731        f1 = script_helper.make_script(self.pkgdir, 'f1', '')
732        f2 = script_helper.make_script(self.pkgdir, 'f2', '')
733        f3 = script_helper.make_script(self.pkgdir, 'f3', '')
734        f4 = script_helper.make_script(self.pkgdir, 'f4', '')
735        with open(os.path.join(self.directory, 'l1'), 'w', encoding="utf-8") as l1:
736            l1.write(os.path.join(self.pkgdir, 'f1.py')+os.linesep)
737            l1.write(os.path.join(self.pkgdir, 'f2.py')+os.linesep)
738        self.assertRunOK('-i', os.path.join(self.directory, 'l1'), f4)
739        self.assertCompiled(f1)
740        self.assertCompiled(f2)
741        self.assertNotCompiled(f3)
742        self.assertCompiled(f4)
743
744    def test_include_file_no_arg(self):
745        f1 = script_helper.make_script(self.pkgdir, 'f1', '')
746        f2 = script_helper.make_script(self.pkgdir, 'f2', '')
747        f3 = script_helper.make_script(self.pkgdir, 'f3', '')
748        f4 = script_helper.make_script(self.pkgdir, 'f4', '')
749        with open(os.path.join(self.directory, 'l1'), 'w', encoding="utf-8") as l1:
750            l1.write(os.path.join(self.pkgdir, 'f2.py')+os.linesep)
751        self.assertRunOK('-i', os.path.join(self.directory, 'l1'))
752        self.assertNotCompiled(f1)
753        self.assertCompiled(f2)
754        self.assertNotCompiled(f3)
755        self.assertNotCompiled(f4)
756
757    def test_include_on_stdin(self):
758        f1 = script_helper.make_script(self.pkgdir, 'f1', '')
759        f2 = script_helper.make_script(self.pkgdir, 'f2', '')
760        f3 = script_helper.make_script(self.pkgdir, 'f3', '')
761        f4 = script_helper.make_script(self.pkgdir, 'f4', '')
762        p = script_helper.spawn_python(*(self._get_run_args(()) + ['-i', '-']))
763        p.stdin.write((f3+os.linesep).encode('ascii'))
764        script_helper.kill_python(p)
765        self.assertNotCompiled(f1)
766        self.assertNotCompiled(f2)
767        self.assertCompiled(f3)
768        self.assertNotCompiled(f4)
769
770    def test_compiles_as_much_as_possible(self):
771        bingfn = script_helper.make_script(self.pkgdir, 'bing', 'syntax(error')
772        rc, out, err = self.assertRunNotOK('nosuchfile', self.initfn,
773                                           bingfn, self.barfn)
774        self.assertRegex(out, b'rror')
775        self.assertNotCompiled(bingfn)
776        self.assertCompiled(self.initfn)
777        self.assertCompiled(self.barfn)
778
779    def test_invalid_arg_produces_message(self):
780        out = self.assertRunOK('badfilename')
781        self.assertRegex(out, b"Can't list 'badfilename'")
782
783    def test_pyc_invalidation_mode(self):
784        script_helper.make_script(self.pkgdir, 'f1', '')
785        pyc = importlib.util.cache_from_source(
786            os.path.join(self.pkgdir, 'f1.py'))
787        self.assertRunOK('--invalidation-mode=checked-hash', self.pkgdir)
788        with open(pyc, 'rb') as fp:
789            data = fp.read()
790        self.assertEqual(int.from_bytes(data[4:8], 'little'), 0b11)
791        self.assertRunOK('--invalidation-mode=unchecked-hash', self.pkgdir)
792        with open(pyc, 'rb') as fp:
793            data = fp.read()
794        self.assertEqual(int.from_bytes(data[4:8], 'little'), 0b01)
795
796    @skipUnless(_have_multiprocessing, "requires multiprocessing")
797    def test_workers(self):
798        bar2fn = script_helper.make_script(self.directory, 'bar2', '')
799        files = []
800        for suffix in range(5):
801            pkgdir = os.path.join(self.directory, 'foo{}'.format(suffix))
802            os.mkdir(pkgdir)
803            fn = script_helper.make_script(pkgdir, '__init__', '')
804            files.append(script_helper.make_script(pkgdir, 'bar2', ''))
805
806        self.assertRunOK(self.directory, '-j', '0')
807        self.assertCompiled(bar2fn)
808        for file in files:
809            self.assertCompiled(file)
810
811    @mock.patch('compileall.compile_dir')
812    def test_workers_available_cores(self, compile_dir):
813        with mock.patch("sys.argv",
814                        new=[sys.executable, self.directory, "-j0"]):
815            compileall.main()
816            self.assertTrue(compile_dir.called)
817            self.assertEqual(compile_dir.call_args[-1]['workers'], 0)
818
819    def test_strip_and_prepend(self):
820        fullpath = ["test", "build", "real", "path"]
821        path = os.path.join(self.directory, *fullpath)
822        os.makedirs(path)
823        script = script_helper.make_script(path, "test", "1 / 0")
824        bc = importlib.util.cache_from_source(script)
825        stripdir = os.path.join(self.directory, *fullpath[:2])
826        prependdir = "/foo"
827        self.assertRunOK("-s", stripdir, "-p", prependdir, path)
828        rc, out, err = script_helper.assert_python_failure(bc)
829        expected_in = os.path.join(prependdir, *fullpath[2:])
830        self.assertIn(
831            expected_in,
832            str(err, encoding=sys.getdefaultencoding())
833        )
834        self.assertNotIn(
835            stripdir,
836            str(err, encoding=sys.getdefaultencoding())
837        )
838
839    def test_multiple_optimization_levels(self):
840        path = os.path.join(self.directory, "optimizations")
841        os.makedirs(path)
842        script = script_helper.make_script(path,
843                                           "test_optimization",
844                                           "a = 0")
845        bc = []
846        for opt_level in "", 1, 2, 3:
847            bc.append(importlib.util.cache_from_source(script,
848                                                       optimization=opt_level))
849        test_combinations = [["0", "1"],
850                             ["1", "2"],
851                             ["0", "2"],
852                             ["0", "1", "2"]]
853        for opt_combination in test_combinations:
854            self.assertRunOK(path, *("-o" + str(n) for n in opt_combination))
855            for opt_level in opt_combination:
856                self.assertTrue(os.path.isfile(bc[int(opt_level)]))
857                try:
858                    os.unlink(bc[opt_level])
859                except Exception:
860                    pass
861
862    @os_helper.skip_unless_symlink
863    def test_ignore_symlink_destination(self):
864        # Create folders for allowed files, symlinks and prohibited area
865        allowed_path = os.path.join(self.directory, "test", "dir", "allowed")
866        symlinks_path = os.path.join(self.directory, "test", "dir", "symlinks")
867        prohibited_path = os.path.join(self.directory, "test", "dir", "prohibited")
868        os.makedirs(allowed_path)
869        os.makedirs(symlinks_path)
870        os.makedirs(prohibited_path)
871
872        # Create scripts and symlinks and remember their byte-compiled versions
873        allowed_script = script_helper.make_script(allowed_path, "test_allowed", "a = 0")
874        prohibited_script = script_helper.make_script(prohibited_path, "test_prohibited", "a = 0")
875        allowed_symlink = os.path.join(symlinks_path, "test_allowed.py")
876        prohibited_symlink = os.path.join(symlinks_path, "test_prohibited.py")
877        os.symlink(allowed_script, allowed_symlink)
878        os.symlink(prohibited_script, prohibited_symlink)
879        allowed_bc = importlib.util.cache_from_source(allowed_symlink)
880        prohibited_bc = importlib.util.cache_from_source(prohibited_symlink)
881
882        self.assertRunOK(symlinks_path, "-e", allowed_path)
883
884        self.assertTrue(os.path.isfile(allowed_bc))
885        self.assertFalse(os.path.isfile(prohibited_bc))
886
887    def test_hardlink_bad_args(self):
888        # Bad arguments combination, hardlink deduplication make sense
889        # only for more than one optimization level
890        self.assertRunNotOK(self.directory, "-o 1", "--hardlink-dupes")
891
892    def test_hardlink(self):
893        # 'a = 0' code produces the same bytecode for the 3 optimization
894        # levels. All three .pyc files must have the same inode (hardlinks).
895        #
896        # If deduplication is disabled, all pyc files must have different
897        # inodes.
898        for dedup in (True, False):
899            with tempfile.TemporaryDirectory() as path:
900                with self.subTest(dedup=dedup):
901                    script = script_helper.make_script(path, "script", "a = 0")
902                    pycs = get_pycs(script)
903
904                    args = ["-q", "-o 0", "-o 1", "-o 2"]
905                    if dedup:
906                        args.append("--hardlink-dupes")
907                    self.assertRunOK(path, *args)
908
909                    self.assertEqual(is_hardlink(pycs[0], pycs[1]), dedup)
910                    self.assertEqual(is_hardlink(pycs[1], pycs[2]), dedup)
911                    self.assertEqual(is_hardlink(pycs[0], pycs[2]), dedup)
912
913
class CommandLineTestsWithSourceEpoch(
        CommandLineTestsBase,
        unittest.TestCase,
        metaclass=SourceDateEpochTestMeta,
        source_date_epoch=True):
    # Concrete test case: the command-line tests, built through
    # SourceDateEpochTestMeta with source_date_epoch=True.
    pass
919
920
class CommandLineTestsNoSourceEpoch(
        CommandLineTestsBase,
        unittest.TestCase,
        metaclass=SourceDateEpochTestMeta,
        source_date_epoch=False):
    # Concrete test case: the command-line tests, built through
    # SourceDateEpochTestMeta with source_date_epoch=False.
    pass
926
927
928
class HardlinkDedupTestsBase:
    # Test hardlink_dupes parameter of compileall.compile_dir()

    def setUp(self):
        # Directory used by make_script() and compile_dir(); it is only
        # set while inside a temporary_directory() block.
        self.path = None

    @contextlib.contextmanager
    def temporary_directory(self):
        # Create a temporary directory, expose it as self.path for the
        # duration of the "with" block and yield its name.
        with tempfile.TemporaryDirectory() as path:
            self.path = path
            try:
                yield path
            finally:
                # Reset even when the body raises (e.g. a failed
                # assertion), so self.path never keeps naming a directory
                # that has already been removed.
                self.path = None

    def make_script(self, code, name="script"):
        # Create the script *name* containing *code* in self.path and
        # return what script_helper.make_script() returns (the tests use
        # it as the path of the script).
        return script_helper.make_script(self.path, name, code)

    def compile_dir(self, *, dedup=True, optimize=(0, 1, 2), force=False):
        # Quietly compile self.path for the given optimization levels,
        # with hardlink deduplication switched on or off by *dedup*.
        compileall.compile_dir(self.path, quiet=True, optimize=optimize,
                               hardlink_dupes=dedup, force=force)

    def test_bad_args(self):
        # Bad arguments combination: hardlink deduplication makes sense
        # only for more than one optimization level
        with self.temporary_directory():
            self.make_script("pass")
            with self.assertRaises(ValueError):
                compileall.compile_dir(self.path, quiet=True, optimize=0,
                                       hardlink_dupes=True)
            with self.assertRaises(ValueError):
                # same optimization level specified twice:
                # compile_dir() removes duplicates
                compileall.compile_dir(self.path, quiet=True, optimize=[0, 0],
                                       hardlink_dupes=True)

    def create_code(self, docstring=False, assertion=False):
        # Build a small module source: always "x = 1", optionally preceded
        # by a module docstring and/or followed by an assert statement.
        lines = []
        if docstring:
            lines.append("'module docstring'")
        lines.append('x = 1')
        if assertion:
            lines.append("assert x == 1")
        return '\n'.join(lines)

    def iter_codes(self):
        # Yield (code, docstring, assertion) for the four combinations of
        # the two create_code() flags.
        for docstring, assertion in itertools.product((False, True), repeat=2):
            code = self.create_code(docstring=docstring, assertion=assertion)
            yield (code, docstring, assertion)

    def test_disabled(self):
        # Deduplication disabled, no hardlinks
        for code, docstring, assertion in self.iter_codes():
            with self.subTest(docstring=docstring, assertion=assertion):
                with self.temporary_directory():
                    script = self.make_script(code)
                    pycs = get_pycs(script)
                    self.compile_dir(dedup=False)
                    self.assertFalse(is_hardlink(pycs[0], pycs[1]))
                    self.assertFalse(is_hardlink(pycs[0], pycs[2]))
                    self.assertFalse(is_hardlink(pycs[1], pycs[2]))

    def check_hardlinks(self, script, docstring=False, assertion=False):
        # Optimization level 1 removes assert statements and level 2 also
        # removes docstrings.  Two levels are expected to share an inode
        # only when the source contains nothing that one of them removes
        # and the other keeps.
        pycs = get_pycs(script)
        self.assertEqual(is_hardlink(pycs[0], pycs[1]),
                         not assertion)
        self.assertEqual(is_hardlink(pycs[0], pycs[2]),
                         not assertion and not docstring)
        self.assertEqual(is_hardlink(pycs[1], pycs[2]),
                         not docstring)

    def test_hardlink(self):
        # Test deduplication on all combinations
        for code, docstring, assertion in self.iter_codes():
            with self.subTest(docstring=docstring, assertion=assertion):
                with self.temporary_directory():
                    script = self.make_script(code)
                    self.compile_dir()
                    self.check_hardlinks(script, docstring, assertion)

    def test_only_two_levels(self):
        # Don't build the 3 optimization levels, but only 2
        for opts in ((0, 1), (1, 2), (0, 2)):
            with self.subTest(opts=opts):
                with self.temporary_directory():
                    # code with no docstring and no assertion:
                    # same bytecode for all optimization levels
                    script = self.make_script(self.create_code())
                    self.compile_dir(optimize=opts)
                    pyc1 = get_pyc(script, opts[0])
                    pyc2 = get_pyc(script, opts[1])
                    self.assertTrue(is_hardlink(pyc1, pyc2))

    def test_duplicated_levels(self):
        # compile_dir() must not fail if optimize contains duplicated
        # optimization levels and/or if optimization levels are not sorted.
        with self.temporary_directory():
            # code with no docstring and no assertion:
            # same bytecode for all optimization levels
            script = self.make_script(self.create_code())
            self.compile_dir(optimize=[1, 0, 1, 0])
            pyc1 = get_pyc(script, 0)
            pyc2 = get_pyc(script, 1)
            self.assertTrue(is_hardlink(pyc1, pyc2))

    def test_recompilation(self):
        # Test compile_dir() when pyc files already exist and the script
        # content changed
        with self.temporary_directory():
            script = self.make_script("a = 0")
            self.compile_dir()
            # All three levels have the same inode
            self.check_hardlinks(script)

            pycs = get_pycs(script)
            inode = os.stat(pycs[0]).st_ino

            # Change of the module content
            self.make_script("print(0)")

            # Recompilation without -o 1
            self.compile_dir(optimize=[0, 2], force=True)

            # opt-1.pyc should have the same inode as before and others should not
            self.assertEqual(inode, os.stat(pycs[1]).st_ino)
            self.assertTrue(is_hardlink(pycs[0], pycs[2]))
            self.assertNotEqual(inode, os.stat(pycs[2]).st_ino)
            # opt-1.pyc and opt-2.pyc have different content.  Compare the
            # bytes themselves: a shallow comparison may settle for the
            # os.stat() signatures alone.
            self.assertFalse(filecmp.cmp(pycs[1], pycs[2], shallow=False))

    def test_import(self):
        # Test that import updates a single pyc file when pyc files already
        # exist and the script content changed
        with self.temporary_directory():
            script = self.make_script(self.create_code(), name="module")
            self.compile_dir()
            # All three levels have the same inode
            self.check_hardlinks(script)

            pycs = get_pycs(script)
            inode = os.stat(pycs[0]).st_ino

            # Change of the module content
            self.make_script("print(0)", name="module")

            # Import the module in Python with -O (optimization level 1)
            script_helper.assert_python_ok(
                "-O", "-c", "import module", __isolated=False, PYTHONPATH=self.path
            )

            # Only opt-1.pyc is changed
            self.assertEqual(inode, os.stat(pycs[0]).st_ino)
            self.assertEqual(inode, os.stat(pycs[2]).st_ino)
            self.assertFalse(is_hardlink(pycs[1], pycs[2]))
            # opt-1.pyc and opt-2.pyc have different content.  Compare the
            # bytes themselves: a shallow comparison may settle for the
            # os.stat() signatures alone.
            self.assertFalse(filecmp.cmp(pycs[1], pycs[2], shallow=False))
1084
1085
class HardlinkDedupTestsWithSourceEpoch(
        HardlinkDedupTestsBase,
        unittest.TestCase,
        metaclass=SourceDateEpochTestMeta,
        source_date_epoch=True):
    # Concrete test case: the hardlink deduplication tests, built through
    # SourceDateEpochTestMeta with source_date_epoch=True.
    pass
1091
1092
class HardlinkDedupTestsNoSourceEpoch(
        HardlinkDedupTestsBase,
        unittest.TestCase,
        metaclass=SourceDateEpochTestMeta,
        source_date_epoch=False):
    # Concrete test case: the hardlink deduplication tests, built through
    # SourceDateEpochTestMeta with source_date_epoch=False.
    pass
1098
1099
# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
1102