1import abc
2import builtins
3import contextlib
4import errno
5import functools
6import importlib
7from importlib import machinery, util, invalidate_caches
8from importlib.abc import ResourceReader
9import io
10import marshal
11import os
12import os.path
13from pathlib import Path, PurePath
14from test import support
15import unittest
16import sys
17import tempfile
18import types
19
20from . import data01
21from . import zipdata01
22
23
# Names of modules used by tests that need a built-in module:
# 'good_name' is a module that is compiled into the interpreter and
# 'bad_name' is one that is not.  Either stays None when the running
# interpreter does not offer the expected candidate.
BUILTINS = types.SimpleNamespace(
    good_name='errno' if 'errno' in sys.builtin_module_names else None,
    bad_name=(None if 'importlib' in sys.builtin_module_names
              else 'importlib'),
)
31
# Location details of the '_testcapi' extension module.  Everything except
# 'name' stays None when no matching file is found on sys.path.
EXTENSIONS = types.SimpleNamespace(
    path=None,
    ext=None,
    filename=None,
    file_path=None,
    name='_testcapi',
)


def _extension_details():
    """Fill in EXTENSIONS with the first matching extension file on sys.path.

    Every sys.path entry is combined with every suffix listed in
    machinery.EXTENSION_SUFFIXES; the first combination that exists on disk
    is recorded and the search stops.  Nothing is recorded when no candidate
    exists.
    """
    candidates = (
        (directory, suffix)
        for directory in sys.path
        for suffix in machinery.EXTENSION_SUFFIXES
    )
    for directory, suffix in candidates:
        candidate_name = EXTENSIONS.name + suffix
        candidate_path = os.path.join(directory, candidate_name)
        if not os.path.exists(candidate_path):
            continue
        EXTENSIONS.path = directory
        EXTENSIONS.ext = suffix
        EXTENSIONS.filename = candidate_name
        EXTENSIONS.file_path = candidate_path
        return


_extension_details()
53
54
def import_importlib(module_name):
    """Import *module_name* freshly, both with and without _frozen_importlib.

    Returns a dict with two entries: 'Frozen' holds the module imported with
    no restrictions, 'Source' holds the module imported while
    '_frozen_importlib' and '_frozen_importlib_external' are blocked.  For a
    dotted name, 'importlib' itself is also re-imported freshly for the
    'Source' variant.
    """
    blocked = ('_frozen_importlib', '_frozen_importlib_external')
    if '.' in module_name:
        fresh = ('importlib',)
    else:
        fresh = ()
    variants = {}
    variants['Frozen'] = support.import_fresh_module(module_name)
    variants['Source'] = support.import_fresh_module(
        module_name, fresh=fresh, blocked=blocked)
    return variants
62
63
def specialize_class(cls, kind, base=None, **kwargs):
    """Create the *kind* ('Frozen' or 'Source') flavour of test class *cls*.

    The new class is named '<kind>_<cls name>' and derives from *cls* plus a
    second base: unittest.TestCase when *base* is None, *base* itself when it
    is a class, and otherwise ``base[kind]``.  It reports the module of *cls*
    and records the original class name in ``_NAME`` and the kind in
    ``_KIND``.  Each extra keyword argument must be a mapping indexed by
    kind; the entry for *kind* becomes a class attribute of that name.
    """
    # XXX Support passing in submodule names--load (and cache) them?
    # That would clean up the test modules a bit more.
    if base is None:
        second_base = unittest.TestCase
    elif isinstance(base, type):
        second_base = base
    else:
        second_base = base[kind]
    specialized = types.new_class(f'{kind}_{cls.__name__}',
                                  (cls, second_base))
    specialized.__module__ = cls.__module__
    specialized._NAME = cls.__name__
    specialized._KIND = kind
    for attr in kwargs:
        setattr(specialized, attr, kwargs[attr][kind])
    return specialized
81
82
def split_frozen(cls, base=None, **kwargs):
    """Return the ``(frozen, source)`` pair of specializations of *cls*.

    Both classes are produced by specialize_class() with the same *base* and
    keyword arguments, first for kind 'Frozen' and then for kind 'Source'.
    """
    frozen, source = (
        specialize_class(cls, kind, base, **kwargs)
        for kind in ('Frozen', 'Source')
    )
    return frozen, source
87
88
def test_both(test_class, base=None, **kwargs):
    """Return frozen and source variants of *test_class* as a 2-tuple.

    Thin alias for split_frozen(); all arguments are forwarded unchanged.
    """
    frozen, source = split_frozen(test_class, base, **kwargs)
    return frozen, source
91
92
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive), so on every other platform the file
# system is probed: this very file is looked up under a differently-cased
# spelling of its own path.
if sys.platform in ('win32', 'cygwin'):
    CASE_INSENSITIVE_FS = True
else:
    changed_name = __file__.upper()
    if changed_name == __file__:
        # The path was already all upper case; try all lower case instead.
        changed_name = __file__.lower()
    CASE_INSENSITIVE_FS = os.path.exists(changed_name)
102
# The pure-source build of importlib, and the two __import__ implementations
# keyed by kind.  Both are wrapped in staticmethod so they can be stored as
# class attributes without being bound to the instance on lookup.
source_importlib = import_importlib('importlib')['Source']
__import__ = dict(
    Frozen=staticmethod(builtins.__import__),
    Source=staticmethod(source_importlib.__import__),
)
106
107
def case_insensitive_tests(test):
    """Decorator that skips *test* unless the file system ignores case.

    The decision is taken from the module-level CASE_INSENSITIVE_FS flag at
    decoration time.
    """
    requires_insensitive = unittest.skipUnless(
        CASE_INSENSITIVE_FS, "requires a case-insensitive filesystem")
    return requires_insensitive(test)
113
114
def submodule(parent, name, pkg_dir, content=''):
    """Write '<name>.py' into *pkg_dir* and describe the resulting module.

    The file is created (or overwritten) with *content*.  Returns a 2-tuple
    of the dotted module name '<parent>.<name>' and the path of the file
    that was written.
    """
    source_path = os.path.join(pkg_dir, name + '.py')
    with open(source_path, 'w') as source_file:
        source_file.write(content)
    qualified_name = f'{parent}.{name}'
    return qualified_name, source_path
120
121
def _get_code_from_pyc(pyc_path):
    """Return the code object stored in the pyc file at *pyc_path*.

    The first 16 bytes of the file are skipped without being inspected, so
    no header validation takes place; the remainder is unmarshalled.
    """
    header_size = 16
    with open(pyc_path, 'rb') as stream:
        stream.seek(header_size)
        code = marshal.load(stream)
    return code
130
131
@contextlib.contextmanager
def uncache(*names):
    """Keep the given module names out of sys.modules around a block.

    Each name is removed from sys.modules (if present) before the block runs
    and again after it finishes, whether or not the block raised.

    As a basic sanity check, 'sys', 'marshal' and 'imp' are refused with a
    ValueError.  The check happens name by name, so names listed before the
    refused one have already been removed when the error is raised.

    """
    protected = ('sys', 'marshal', 'imp')
    for name in names:
        if name in protected:
            raise ValueError(
                "cannot uncache {0}".format(name))
        sys.modules.pop(name, None)
    try:
        yield
    finally:
        for name in names:
            sys.modules.pop(name, None)
156
157
@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
    """Create an importable module or package *name* in a temporary cwd.

    For the duration of the block the process runs in a temporary working
    directory that is also placed on sys.path, and *name* together with any
    already-loaded 'name.*' modules is kept out of sys.modules via uncache().

    With pkg=False, '<name>.py' is written with *content* (None is treated
    as an empty file).  With pkg=True, a directory '<name>' is created and
    '__init__.py' is written into it unless *content* is None, which leaves
    a directory without an __init__ (a namespace package).

    Yields the module location without any extension, i.e. the path of the
    temporary directory joined with *name*.
    """
    conflicts = [loaded for loaded in sys.modules
                 if loaded.partition('.')[0] == name]
    with support.temp_cwd(None) as cwd, \
            uncache(name, *conflicts), \
            support.DirsOnSysPath(cwd):
        invalidate_caches()

        location = os.path.join(cwd, name)
        if pkg:
            # Relative mkdir is fine: cwd is the temporary directory.
            os.mkdir(name)
            modpath = os.path.join(location, '__init__.py')
        else:
            modpath = location + '.py'
            if content is None:
                # Make sure the module file gets created.
                content = ''
        if content is not None:
            # not a namespace package
            with open(modpath, 'w') as modfile:
                modfile.write(content)
        yield location
180
181
@contextlib.contextmanager
def import_state(**kwargs):
    """Temporarily replace the import-related state stored on the sys module.

    Accepted keyword arguments are 'meta_path', 'path', 'path_hooks' and
    'path_importer_cache'.  Every one of these attributes is replaced for
    the duration of the block: by the supplied value, or by a fresh empty
    list/dict when not supplied.  The previous objects are put back on exit.
    Any other keyword raises ValueError (after the originals were restored).

    The 'modules' attribute is not supported as the interpreter state stores
    a pointer to the dict that the interpreter uses internally;
    reassigning to sys.modules does not have the desired effect.

    """
    defaults = {
        'meta_path': [],
        'path': [],
        'path_hooks': [],
        'path_importer_cache': {},
    }
    originals = {}
    try:
        for attr, default in defaults.items():
            originals[attr] = getattr(sys, attr)
            # pop() consumes recognised keywords so leftovers can be
            # reported below.
            setattr(sys, attr, kwargs.pop(attr, default))
        if kwargs:
            raise ValueError(
                    'unrecognized arguments: {0}'.format(kwargs.keys()))
        yield
    finally:
        for attr, value in originals.items():
            setattr(sys, attr, value)
211
212
class _ImporterMock:

    """Base class to help with creating importer mocks.

    An instance pre-builds one module object per requested name and exposes
    them through ``self.modules`` (and ``mock[name]``).  Used as a context
    manager, it keeps those module names out of sys.modules around the block
    by delegating to uncache().
    """

    def __init__(self, *names, module_code=None):
        """Create the mock modules.

        *names* are dotted module names; a name ending in '.__init__'
        denotes a package and is registered under the name without that
        suffix.  *module_code* optionally maps import names to callables;
        only entries for the created modules are copied into
        ``self.module_code``.
        """
        # None instead of a ``{}`` literal avoids sharing one dict object
        # between every call that omits the argument.
        if module_code is None:
            module_code = {}
        self.modules = {}
        self.module_code = {}
        for name in names:
            if not name.endswith('.__init__'):
                import_name = name
            else:
                import_name = name[:-len('.__init__')]
            if '.' not in name:
                # Top-level module.
                package = None
            elif import_name == name:
                # Submodule: its package is everything before the last dot.
                package = name.rsplit('.', 1)[0]
            else:
                # A package's __init__: the package is the module itself.
                package = import_name
            module = types.ModuleType(import_name)
            module.__loader__ = self
            module.__file__ = '<mock __file__>'
            module.__package__ = package
            # Remember the name exactly as requested, suffix included.
            module.attr = name
            if import_name != name:
                module.__path__ = ['<mock __path__>']
            self.modules[import_name] = module
            if import_name in module_code:
                self.module_code[import_name] = module_code[import_name]

    def __getitem__(self, name):
        """Return the mock module registered under *name*."""
        return self.modules[name]

    def __enter__(self):
        """Remove the mocked names from sys.modules and return the mock."""
        self._uncache = uncache(*self.modules.keys())
        self._uncache.__enter__()
        return self

    def __exit__(self, *exc_info):
        """Remove the mocked names again; exceptions are not suppressed."""
        self._uncache.__exit__(None, None, None)
252
253
class mock_modules(_ImporterMock):

    """Importer mock using PEP 302 APIs."""

    def find_module(self, fullname, path=None):
        """Return this mock as the loader for known names, else None."""
        return self if fullname in self.modules else None

    def load_module(self, fullname):
        """Install the mock module in sys.modules and return it.

        Raises ImportError for a name this mock does not provide.  When
        module code is registered for *fullname* it is called after the
        module has been installed; if it raises an Exception, the module is
        removed from sys.modules again and the exception propagates.
        """
        if fullname not in self.modules:
            raise ImportError
        sys.modules[fullname] = self.modules[fullname]
        if fullname in self.module_code:
            try:
                self.module_code[fullname]()
            except Exception:
                del sys.modules[fullname]
                raise
        return self.modules[fullname]
276
277
class mock_spec(_ImporterMock):

    """Importer mock using PEP 451 APIs."""

    def find_spec(self, fullname, path=None, parent=None):
        """Return a spec for a module this mock provides, else None.

        The spec uses the mock module's __file__ as location, this mock as
        loader, and the module's __path__ (when it has one) as the submodule
        search locations.
        """
        try:
            module = self.modules[fullname]
        except KeyError:
            return None
        spec = util.spec_from_file_location(
                fullname, module.__file__, loader=self,
                submodule_search_locations=getattr(module, '__path__', None))
        return spec

    def create_module(self, spec):
        """Return the pre-built mock module named by *spec*.

        Raises ImportError when the mock does not provide that module.
        """
        if spec.name not in self.modules:
            raise ImportError
        return self.modules[spec.name]

    def exec_module(self, module):
        """Run the code registered for *module*, if there is any."""
        # Only the lookup is guarded: a KeyError raised by the module code
        # itself is a genuine failure and must reach the caller instead of
        # being mistaken for "no code registered".
        try:
            code = self.module_code[module.__spec__.name]
        except KeyError:
            return
        code()
302
303
def writes_bytecode_files(fxn):
    """Decorator to protect sys.dont_write_bytecode from mutation and to skip
    tests that require it to be set to False.

    When sys.dont_write_bytecode is true at decoration time, the returned
    callable raises unittest.SkipTest, so the test is reported as skipped
    rather than silently counted as a pass.  Otherwise the wrapper forces
    sys.dont_write_bytecode to False while *fxn* runs, restores the value
    that was in effect when the wrapper was entered, and returns whatever
    *fxn* returned.
    """
    if sys.dont_write_bytecode:
        return unittest.skip("relies on writing bytecode")(fxn)

    @functools.wraps(fxn)
    def wrapper(*args, **kwargs):
        original = sys.dont_write_bytecode
        sys.dont_write_bytecode = False
        try:
            return fxn(*args, **kwargs)
        finally:
            sys.dont_write_bytecode = original
    return wrapper
319
320
def ensure_bytecode_path(bytecode_path):
    """Make sure the directory that will hold *bytecode_path* exists.

    Only the immediate parent directory (the __pycache__ directory of a
    PEP 3147 pyc path) is created; an already existing one is accepted
    silently.  Any other OSError propagates.

    :param bytecode_path: File system path to PEP 3147 pyc file.
    """
    cache_dir = os.path.dirname(bytecode_path)
    try:
        os.mkdir(cache_dir)
    except FileExistsError:
        # Already there -- nothing to do.
        pass
331
332
@contextlib.contextmanager
def temporary_pycache_prefix(prefix):
    """Set sys.pycache_prefix to *prefix* for the duration of the block.

    The value that was in effect beforehand is put back on exit, whether or
    not the block raised.
    """
    previous = sys.pycache_prefix
    with contextlib.ExitStack() as cleanup:
        cleanup.callback(setattr, sys, 'pycache_prefix', previous)
        sys.pycache_prefix = prefix
        yield
342
343
@contextlib.contextmanager
def create_modules(*names):
    """Temporarily create each named module with an attribute (named 'attr')
    that contains the name passed into the context manager that caused the
    creation of the module.

    All files are created in a temporary directory returned by
    tempfile.mkdtemp(). For the duration of the block this directory is the
    only entry of sys.path (the remaining import state is reset through
    import_state()), and the created module names are kept out of
    sys.modules through uncache(). When the context manager exits the whole
    temporary directory is removed.

    The value yielded is a dict mapping every name in *names* to the path of
    the source file written for it, plus the key '.root' mapped to the
    temporary directory.

    No magic is performed when creating packages! This means that if you create
    a module within a package you must also create the package's __init__ as
    well.

    """
    source = 'attr = {0!r}'
    # Create the directory *before* entering the try block: if mkdtemp()
    # itself fails there is nothing to clean up, and the finally clause must
    # not trip over an unbound name.
    temp_dir = tempfile.mkdtemp()
    try:
        mapping = {'.root': temp_dir}
        import_names = set()
        for name in names:
            # Test for the same suffix that is stripped, so that only a real
            # '<package>.__init__' name is shortened.
            if name.endswith('.__init__'):
                import_name = name[:-len('.__init__')]
            else:
                import_name = name
            import_names.add(import_name)
            *packages, module_file = name.split('.')
            directory = os.path.join(temp_dir, *packages)
            os.makedirs(directory, exist_ok=True)
            file_path = os.path.join(directory, module_file + '.py')
            with open(file_path, 'w') as file:
                file.write(source.format(name))
            mapping[name] = file_path
        # uncache() removes the names from sys.modules on entry and on exit.
        # The with statement guarantees that both managers are unwound even
        # if one of the exits raises.
        with uncache(*import_names), import_state(path=[temp_dir]):
            yield mapping
    finally:
        support.rmtree(temp_dir)
400
401
def mock_path_hook(*entries, importer):
    """Build a stand-in for a sys.path_hooks entry.

    The returned hook hands out *importer* for any path entry listed in
    *entries* and raises ImportError for every other entry.
    """
    def hook(entry):
        if entry in entries:
            return importer
        raise ImportError
    return hook
409
410
class CASEOKTestBase:

    """Mixin for tests that depend on the PYTHONCASEOK environment variable.

    Expects the concrete test class to provide ``self.importlib`` and
    ``self.skipTest``.
    """

    def caseok_env_changed(self, *, should_exist):
        """Skip the test unless importlib's view of the environment matches.

        The environment mapping used by ``_bootstrap_external`` is checked
        for 'PYTHONCASEOK' under both its bytes and its str spelling; the
        test is skipped when the presence of the variable differs from
        *should_exist*.
        """
        environ = self.importlib._bootstrap_external._os.environ
        present = b'PYTHONCASEOK' in environ or 'PYTHONCASEOK' in environ
        if present != should_exist:
            self.skipTest('os.environ changes not reflected in _os.environ')
418
419
def create_package(file, path, is_package=True, contents=()):
    """Build a fake module 'testingpackage' backed by a scripted reader.

    The module's loader (also stored on its spec) is a ResourceReader whose
    answers are fixed by the arguments:

    * open_resource() returns *file*, resource_path() returns *path*; if the
      respective argument is an Exception instance it is raised instead.
    * is_resource() and contents() raise *path* when it is an Exception;
      otherwise is_resource() is true only for entries of *contents*
      without a '/' that equal the requested name, and contents() yields
      the entries of *contents*.

    Every method that receives a resource name stores it in ``_path`` on the
    loader so tests can check what was asked for.  *is_package* is passed on
    to the module spec, whose origin is 'does-not-exist'.
    """
    def deliver(outcome):
        # Exception instances stand in for failures: raise, don't return.
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    class Reader(ResourceReader):
        def get_resource_reader(self, package):
            return self

        def open_resource(self, path):
            self._path = path
            return deliver(file)

        def resource_path(self, path_):
            self._path = path_
            return deliver(path)

        def is_resource(self, path_):
            self._path = path_
            deliver(path)
            # Only top-level entries (no '/' separator) count as resources.
            return any(entry.split('/') == [path_] for entry in contents)

        def contents(self):
            deliver(path)
            yield from contents

    name = 'testingpackage'
    loader = Reader()
    module = types.ModuleType(name)
    module.__spec__ = machinery.ModuleSpec(
        name, loader,
        origin='does-not-exist',
        is_package=is_package)
    module.__loader__ = loader
    return module
468
469
class CommonResourceTests(abc.ABC):
    """Checks shared by the tests of every resource-access function.

    Concrete test classes implement execute() so that it calls the function
    under test with the given package and resource path.
    """
    # The individual tests are described with comments rather than
    # docstrings because unittest prints a method's docstring in place of
    # the test name.

    @abc.abstractmethod
    def execute(self, package, path):
        raise NotImplementedError

    def test_package_name(self):
        # A package given by its dotted name is accepted.
        self.execute(data01.__name__, 'utf-8.file')

    def test_package_object(self):
        # A package given as the module object itself is accepted.
        self.execute(data01, 'utf-8.file')

    def test_string_path(self):
        # A plain string is accepted as the resource path.
        self.execute(data01, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
    def test_pathlib_path(self):
        # A pathlib.PurePath object is accepted as the resource path.
        self.execute(data01, PurePath('utf-8.file'))

    def test_absolute_path(self):
        # An absolute path is a ValueError.
        full_path = Path(__file__).parent / 'utf-8.file'
        with self.assertRaises(ValueError):
            self.execute(data01, full_path)

    def test_relative_path(self):
        # A relative path with directory components is a ValueError.
        with self.assertRaises(ValueError):
            self.execute(data01, '../data01/utf-8.file')

    def test_importing_module_as_side_effect(self):
        # The anchor package is dropped from sys.modules first, so the call
        # has to cope with a package that is not currently imported.
        del sys.modules[data01.__name__]
        self.execute(data01.__name__, 'utf-8.file')

    def test_non_package_by_name(self):
        # The anchor package cannot be a plain module (given by name).
        with self.assertRaises(TypeError):
            self.execute(__name__, 'utf-8.file')

    def test_non_package_by_package(self):
        # The anchor package cannot be a plain module (given as object).
        module = sys.modules['test.test_importlib.util']
        with self.assertRaises(TypeError):
            self.execute(module, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_opener(self):
        # The reader can open the resource but cannot provide a path; the
        # requested name must have been passed to the reader.
        package = create_package(file=io.BytesIO(b'Hello, world!'),
                                 path=FileNotFoundError())
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_path(self):
        # The reader can provide both a file object and a path; the
        # requested name must have been passed to the reader.
        package = create_package(file=io.BytesIO(b'Hello, world!'),
                                 path=__file__)
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    def test_useless_loader(self):
        # A reader that can neither open nor locate the resource makes the
        # call fail with FileNotFoundError.
        package = create_package(file=FileNotFoundError(),
                                 path=FileNotFoundError())
        with self.assertRaises(FileNotFoundError):
            self.execute(package, 'utf-8.file')
542
543
class ZipSetupBase:
    """Fixture mixin that makes the zipped 'ziptestdata' package importable.

    setUpClass() appends the archive 'ziptestdata.zip', located in the
    directory of ZIP_MODULE, to sys.path and imports 'ziptestdata' from it
    as ``cls.data``; tearDownClass() undoes those changes.
    """

    # Module whose directory contains 'ziptestdata.zip'; subclasses set it.
    ZIP_MODULE = None

    @classmethod
    def setUpClass(cls):
        data_path = Path(cls.ZIP_MODULE.__file__)
        data_dir = data_path.parent
        cls._zip_path = str(data_dir / 'ziptestdata.zip')
        sys.path.append(cls._zip_path)
        cls.data = importlib.import_module('ziptestdata')

    @classmethod
    def tearDownClass(cls):
        # Every piece of state is cleaned up independently, so that one
        # entry that is already gone does not leave the others behind.
        try:
            sys.path.remove(cls._zip_path)
        except ValueError:
            pass

        sys.path_importer_cache.pop(cls._zip_path, None)
        sys.modules.pop(cls.data.__name__, None)

        for attr in ('data', '_zip_path'):
            try:
                delattr(cls, attr)
            except AttributeError:
                pass

    def setUp(self):
        # Pair support.modules_setup() with support.modules_cleanup() as a
        # cleanup; presumably this restores sys.modules after each test --
        # confirm against test.support.
        modules = support.modules_setup()
        self.addCleanup(support.modules_cleanup, *modules)
577
578
class ZipSetup(ZipSetupBase):
    """Zip fixture whose archive is looked up next to the zipdata01 package."""
    ZIP_MODULE = zipdata01                          # type: ignore
581