1from pathlib import Path
2from test.support.import_helper import unload, CleanImport
3from test.support.warnings_helper import check_warnings
4import unittest
5import sys
6import importlib
7from importlib.util import spec_from_file_location
8import pkgutil
9import os
10import os.path
11import tempfile
12import shutil
13import zipfile
14
15# Note: pkgutil.walk_packages is currently tested in test_runpy. This is
16# a hack to get a major issue resolved for 3.3b2. Longer term, it should
17# be moved back here, perhaps by factoring out the helper code for
18# creating interesting package layouts to a separate module.
19# Issue #15348 declares this is indeed a dodgy hack ;)
20
21class PkgutilTests(unittest.TestCase):
22
23    def setUp(self):
24        self.dirname = tempfile.mkdtemp()
25        self.addCleanup(shutil.rmtree, self.dirname)
26        sys.path.insert(0, self.dirname)
27
28    def tearDown(self):
29        del sys.path[0]
30
31    def test_getdata_filesys(self):
32        pkg = 'test_getdata_filesys'
33
34        # Include a LF and a CRLF, to test that binary data is read back
35        RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'
36
37        # Make a package with some resources
38        package_dir = os.path.join(self.dirname, pkg)
39        os.mkdir(package_dir)
40        # Empty init.py
41        f = open(os.path.join(package_dir, '__init__.py'), "wb")
42        f.close()
43        # Resource files, res.txt, sub/res.txt
44        f = open(os.path.join(package_dir, 'res.txt'), "wb")
45        f.write(RESOURCE_DATA)
46        f.close()
47        os.mkdir(os.path.join(package_dir, 'sub'))
48        f = open(os.path.join(package_dir, 'sub', 'res.txt'), "wb")
49        f.write(RESOURCE_DATA)
50        f.close()
51
52        # Check we can read the resources
53        res1 = pkgutil.get_data(pkg, 'res.txt')
54        self.assertEqual(res1, RESOURCE_DATA)
55        res2 = pkgutil.get_data(pkg, 'sub/res.txt')
56        self.assertEqual(res2, RESOURCE_DATA)
57
58        del sys.modules[pkg]
59
60    def test_getdata_zipfile(self):
61        zip = 'test_getdata_zipfile.zip'
62        pkg = 'test_getdata_zipfile'
63
64        # Include a LF and a CRLF, to test that binary data is read back
65        RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'
66
67        # Make a package with some resources
68        zip_file = os.path.join(self.dirname, zip)
69        z = zipfile.ZipFile(zip_file, 'w')
70
71        # Empty init.py
72        z.writestr(pkg + '/__init__.py', "")
73        # Resource files, res.txt, sub/res.txt
74        z.writestr(pkg + '/res.txt', RESOURCE_DATA)
75        z.writestr(pkg + '/sub/res.txt', RESOURCE_DATA)
76        z.close()
77
78        # Check we can read the resources
79        sys.path.insert(0, zip_file)
80        res1 = pkgutil.get_data(pkg, 'res.txt')
81        self.assertEqual(res1, RESOURCE_DATA)
82        res2 = pkgutil.get_data(pkg, 'sub/res.txt')
83        self.assertEqual(res2, RESOURCE_DATA)
84
85        names = []
86        for moduleinfo in pkgutil.iter_modules([zip_file]):
87            self.assertIsInstance(moduleinfo, pkgutil.ModuleInfo)
88            names.append(moduleinfo.name)
89        self.assertEqual(names, ['test_getdata_zipfile'])
90
91        del sys.path[0]
92
93        del sys.modules[pkg]
94
95    def test_issue44061_iter_modules(self):
96        #see: issue44061
97        zip = 'test_getdata_zipfile.zip'
98        pkg = 'test_getdata_zipfile'
99
100        # Include a LF and a CRLF, to test that binary data is read back
101        RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'
102
103        # Make a package with some resources
104        zip_file = os.path.join(self.dirname, zip)
105        z = zipfile.ZipFile(zip_file, 'w')
106
107        # Empty init.py
108        z.writestr(pkg + '/__init__.py', "")
109        # Resource files, res.txt
110        z.writestr(pkg + '/res.txt', RESOURCE_DATA)
111        z.close()
112
113        # Check we can read the resources
114        sys.path.insert(0, zip_file)
115        try:
116            res = pkgutil.get_data(pkg, 'res.txt')
117            self.assertEqual(res, RESOURCE_DATA)
118
119            # make sure iter_modules accepts Path objects
120            names = []
121            for moduleinfo in pkgutil.iter_modules([Path(zip_file)]):
122                self.assertIsInstance(moduleinfo, pkgutil.ModuleInfo)
123                names.append(moduleinfo.name)
124            self.assertEqual(names, [pkg])
125        finally:
126            del sys.path[0]
127            sys.modules.pop(pkg, None)
128
129        # assert path must be None or list of paths
130        expected_msg = "path must be None or list of paths to look for modules in"
131        with self.assertRaisesRegex(ValueError, expected_msg):
132            list(pkgutil.iter_modules("invalid_path"))
133
134    def test_unreadable_dir_on_syspath(self):
135        # issue7367 - walk_packages failed if unreadable dir on sys.path
136        package_name = "unreadable_package"
137        d = os.path.join(self.dirname, package_name)
138        # this does not appear to create an unreadable dir on Windows
139        #   but the test should not fail anyway
140        os.mkdir(d, 0)
141        self.addCleanup(os.rmdir, d)
142        for t in pkgutil.walk_packages(path=[self.dirname]):
143            self.fail("unexpected package found")
144
145    def test_walkpackages_filesys(self):
146        pkg1 = 'test_walkpackages_filesys'
147        pkg1_dir = os.path.join(self.dirname, pkg1)
148        os.mkdir(pkg1_dir)
149        f = open(os.path.join(pkg1_dir, '__init__.py'), "wb")
150        f.close()
151        os.mkdir(os.path.join(pkg1_dir, 'sub'))
152        f = open(os.path.join(pkg1_dir, 'sub', '__init__.py'), "wb")
153        f.close()
154        f = open(os.path.join(pkg1_dir, 'sub', 'mod.py'), "wb")
155        f.close()
156
157        # Now, to juice it up, let's add the opposite packages, too.
158        pkg2 = 'sub'
159        pkg2_dir = os.path.join(self.dirname, pkg2)
160        os.mkdir(pkg2_dir)
161        f = open(os.path.join(pkg2_dir, '__init__.py'), "wb")
162        f.close()
163        os.mkdir(os.path.join(pkg2_dir, 'test_walkpackages_filesys'))
164        f = open(os.path.join(pkg2_dir, 'test_walkpackages_filesys', '__init__.py'), "wb")
165        f.close()
166        f = open(os.path.join(pkg2_dir, 'test_walkpackages_filesys', 'mod.py'), "wb")
167        f.close()
168
169        expected = [
170            'sub',
171            'sub.test_walkpackages_filesys',
172            'sub.test_walkpackages_filesys.mod',
173            'test_walkpackages_filesys',
174            'test_walkpackages_filesys.sub',
175            'test_walkpackages_filesys.sub.mod',
176        ]
177        actual= [e[1] for e in pkgutil.walk_packages([self.dirname])]
178        self.assertEqual(actual, expected)
179
180        for pkg in expected:
181            if pkg.endswith('mod'):
182                continue
183            del sys.modules[pkg]
184
185    def test_walkpackages_zipfile(self):
186        """Tests the same as test_walkpackages_filesys, only with a zip file."""
187
188        zip = 'test_walkpackages_zipfile.zip'
189        pkg1 = 'test_walkpackages_zipfile'
190        pkg2 = 'sub'
191
192        zip_file = os.path.join(self.dirname, zip)
193        z = zipfile.ZipFile(zip_file, 'w')
194        z.writestr(pkg2 + '/__init__.py', "")
195        z.writestr(pkg2 + '/' + pkg1 + '/__init__.py', "")
196        z.writestr(pkg2 + '/' + pkg1 + '/mod.py', "")
197        z.writestr(pkg1 + '/__init__.py', "")
198        z.writestr(pkg1 + '/' + pkg2 + '/__init__.py', "")
199        z.writestr(pkg1 + '/' + pkg2 + '/mod.py', "")
200        z.close()
201
202        sys.path.insert(0, zip_file)
203        expected = [
204            'sub',
205            'sub.test_walkpackages_zipfile',
206            'sub.test_walkpackages_zipfile.mod',
207            'test_walkpackages_zipfile',
208            'test_walkpackages_zipfile.sub',
209            'test_walkpackages_zipfile.sub.mod',
210        ]
211        actual= [e[1] for e in pkgutil.walk_packages([zip_file])]
212        self.assertEqual(actual, expected)
213        del sys.path[0]
214
215        for pkg in expected:
216            if pkg.endswith('mod'):
217                continue
218            del sys.modules[pkg]
219
220    def test_walk_packages_raises_on_string_or_bytes_input(self):
221
222        str_input = 'test_dir'
223        with self.assertRaises((TypeError, ValueError)):
224            list(pkgutil.walk_packages(str_input))
225
226        bytes_input = b'test_dir'
227        with self.assertRaises((TypeError, ValueError)):
228            list(pkgutil.walk_packages(bytes_input))
229
230    def test_name_resolution(self):
231        import logging
232        import logging.handlers
233
234        success_cases = (
235            ('os', os),
236            ('os.path', os.path),
237            ('os.path:pathsep', os.path.pathsep),
238            ('logging', logging),
239            ('logging:', logging),
240            ('logging.handlers', logging.handlers),
241            ('logging.handlers:', logging.handlers),
242            ('logging.handlers:SysLogHandler', logging.handlers.SysLogHandler),
243            ('logging.handlers.SysLogHandler', logging.handlers.SysLogHandler),
244            ('logging.handlers:SysLogHandler.LOG_ALERT',
245                logging.handlers.SysLogHandler.LOG_ALERT),
246            ('logging.handlers.SysLogHandler.LOG_ALERT',
247                logging.handlers.SysLogHandler.LOG_ALERT),
248            ('builtins.int', int),
249            ('builtins:int', int),
250            ('builtins.int.from_bytes', int.from_bytes),
251            ('builtins:int.from_bytes', int.from_bytes),
252            ('builtins.ZeroDivisionError', ZeroDivisionError),
253            ('builtins:ZeroDivisionError', ZeroDivisionError),
254            ('os:path', os.path),
255        )
256
257        failure_cases = (
258            (None, TypeError),
259            (1, TypeError),
260            (2.0, TypeError),
261            (True, TypeError),
262            ('', ValueError),
263            ('?abc', ValueError),
264            ('abc/foo', ValueError),
265            ('foo', ImportError),
266            ('os.foo', AttributeError),
267            ('os.foo:', ImportError),
268            ('os.pth:pathsep', ImportError),
269            ('logging.handlers:NoSuchHandler', AttributeError),
270            ('logging.handlers:SysLogHandler.NO_SUCH_VALUE', AttributeError),
271            ('logging.handlers.SysLogHandler.NO_SUCH_VALUE', AttributeError),
272            ('ZeroDivisionError', ImportError),
273            ('os.path.9abc', ValueError),
274            ('9abc', ValueError),
275        )
276
277        # add some Unicode package names to the mix.
278
279        unicode_words = ('\u0935\u092e\u0938',
280                         '\xe9', '\xc8',
281                         '\uc548\ub155\ud558\uc138\uc694',
282                         '\u3055\u3088\u306a\u3089',
283                         '\u3042\u308a\u304c\u3068\u3046',
284                         '\u0425\u043e\u0440\u043e\u0448\u043e',
285                         '\u0441\u043f\u0430\u0441\u0438\u0431\u043e',
286                         '\u73b0\u4ee3\u6c49\u8bed\u5e38\u7528\u5b57\u8868')
287
288        for uw in unicode_words:
289            d = os.path.join(self.dirname, uw)
290            try:
291                os.makedirs(d, exist_ok=True)
292            except  UnicodeEncodeError:
293                # When filesystem encoding cannot encode uw: skip this test
294                continue
295            # make an empty __init__.py file
296            f = os.path.join(d, '__init__.py')
297            with open(f, 'w') as f:
298                f.write('')
299                f.flush()
300            # now import the package we just created; clearing the caches is
301            # needed, otherwise the newly created package isn't found
302            importlib.invalidate_caches()
303            mod = importlib.import_module(uw)
304            success_cases += (uw, mod),
305            if len(uw) > 1:
306                failure_cases += (uw[:-1], ImportError),
307
308        # add an example with a Unicode digit at the start
309        failure_cases += ('\u0966\u0935\u092e\u0938', ValueError),
310
311        for s, expected in success_cases:
312            with self.subTest(s=s):
313                o = pkgutil.resolve_name(s)
314                self.assertEqual(o, expected)
315
316        for s, exc in failure_cases:
317            with self.subTest(s=s):
318                with self.assertRaises(exc):
319                    pkgutil.resolve_name(s)
320
321
class PkgutilPEP302Tests(unittest.TestCase):
    """Check pkgutil.get_data() against a dummy meta-path finder and loader."""

    class MyTestLoader(object):
        """Loader that counts module executions and serves a fixed string."""

        def create_module(self, spec):
            return None

        def exec_module(self, mod):
            # Count how many times the module is reloaded
            mod.__dict__['loads'] = mod.__dict__.get('loads', 0) + 1

        def get_data(self, path):
            return "Hello, world!"

    class MyTestImporter(object):
        """Finder that returns a MyTestLoader-backed spec for every name."""

        def find_spec(self, fullname, path=None, target=None):
            loader = PkgutilPEP302Tests.MyTestLoader()
            return spec_from_file_location(fullname,
                                           '<%s>' % loader.__class__.__name__,
                                           loader=loader,
                                           submodule_search_locations=[])

    def setUp(self):
        """Install the dummy finder at the front of sys.meta_path."""
        self.importer = self.MyTestImporter()
        sys.meta_path.insert(0, self.importer)
        # Both tests import the fake 'foo' module.  Dropping it through a
        # cleanup guarantees it is gone even when an assertion fails, so it
        # cannot shadow a real 'foo' package imported by a later test.
        self.addCleanup(sys.modules.pop, 'foo', None)

    def tearDown(self):
        """Uninstall exactly the finder that setUp() installed."""
        sys.meta_path.remove(self.importer)

    def test_getdata_pep302(self):
        # Use a dummy finder/loader
        self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!")

    def test_alreadyloaded(self):
        # Ensure that get_data works without reloading - the "loads" module
        # variable in the example loader should count how many times a reload
        # occurs.
        import foo
        self.assertEqual(foo.loads, 1)
        self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!")
        self.assertEqual(foo.loads, 1)
363
364
365# These tests, especially the setup and cleanup, are hideous. They
366# need to be cleaned up once issue 14715 is addressed.
367class ExtendPathTests(unittest.TestCase):
368    def create_init(self, pkgname):
369        dirname = tempfile.mkdtemp()
370        sys.path.insert(0, dirname)
371
372        pkgdir = os.path.join(dirname, pkgname)
373        os.mkdir(pkgdir)
374        with open(os.path.join(pkgdir, '__init__.py'), 'w') as fl:
375            fl.write('from pkgutil import extend_path\n__path__ = extend_path(__path__, __name__)\n')
376
377        return dirname
378
379    def create_submodule(self, dirname, pkgname, submodule_name, value):
380        module_name = os.path.join(dirname, pkgname, submodule_name + '.py')
381        with open(module_name, 'w') as fl:
382            print('value={}'.format(value), file=fl)
383
384    def test_simple(self):
385        pkgname = 'foo'
386        dirname_0 = self.create_init(pkgname)
387        dirname_1 = self.create_init(pkgname)
388        self.create_submodule(dirname_0, pkgname, 'bar', 0)
389        self.create_submodule(dirname_1, pkgname, 'baz', 1)
390        import foo.bar
391        import foo.baz
392        # Ensure we read the expected values
393        self.assertEqual(foo.bar.value, 0)
394        self.assertEqual(foo.baz.value, 1)
395
396        # Ensure the path is set up correctly
397        self.assertEqual(sorted(foo.__path__),
398                         sorted([os.path.join(dirname_0, pkgname),
399                                 os.path.join(dirname_1, pkgname)]))
400
401        # Cleanup
402        shutil.rmtree(dirname_0)
403        shutil.rmtree(dirname_1)
404        del sys.path[0]
405        del sys.path[0]
406        del sys.modules['foo']
407        del sys.modules['foo.bar']
408        del sys.modules['foo.baz']
409
410
411    # Another awful testing hack to be cleaned up once the test_runpy
412    # helpers are factored out to a common location
413    def test_iter_importers(self):
414        iter_importers = pkgutil.iter_importers
415        get_importer = pkgutil.get_importer
416
417        pkgname = 'spam'
418        modname = 'eggs'
419        dirname = self.create_init(pkgname)
420        pathitem = os.path.join(dirname, pkgname)
421        fullname = '{}.{}'.format(pkgname, modname)
422        sys.modules.pop(fullname, None)
423        sys.modules.pop(pkgname, None)
424        try:
425            self.create_submodule(dirname, pkgname, modname, 0)
426
427            importlib.import_module(fullname)
428
429            importers = list(iter_importers(fullname))
430            expected_importer = get_importer(pathitem)
431            for finder in importers:
432                spec = pkgutil._get_spec(finder, fullname)
433                loader = spec.loader
434                try:
435                    loader = loader.loader
436                except AttributeError:
437                    # For now we still allow raw loaders from
438                    # find_module().
439                    pass
440                self.assertIsInstance(finder, importlib.machinery.FileFinder)
441                self.assertEqual(finder, expected_importer)
442                self.assertIsInstance(loader,
443                                      importlib.machinery.SourceFileLoader)
444                self.assertIsNone(pkgutil._get_spec(finder, pkgname))
445
446            with self.assertRaises(ImportError):
447                list(iter_importers('invalid.module'))
448
449            with self.assertRaises(ImportError):
450                list(iter_importers('.spam'))
451        finally:
452            shutil.rmtree(dirname)
453            del sys.path[0]
454            try:
455                del sys.modules['spam']
456                del sys.modules['spam.eggs']
457            except KeyError:
458                pass
459
460
461    def test_mixed_namespace(self):
462        pkgname = 'foo'
463        dirname_0 = self.create_init(pkgname)
464        dirname_1 = self.create_init(pkgname)
465        self.create_submodule(dirname_0, pkgname, 'bar', 0)
466        # Turn this into a PEP 420 namespace package
467        os.unlink(os.path.join(dirname_0, pkgname, '__init__.py'))
468        self.create_submodule(dirname_1, pkgname, 'baz', 1)
469        import foo.bar
470        import foo.baz
471        # Ensure we read the expected values
472        self.assertEqual(foo.bar.value, 0)
473        self.assertEqual(foo.baz.value, 1)
474
475        # Ensure the path is set up correctly
476        self.assertEqual(sorted(foo.__path__),
477                         sorted([os.path.join(dirname_0, pkgname),
478                                 os.path.join(dirname_1, pkgname)]))
479
480        # Cleanup
481        shutil.rmtree(dirname_0)
482        shutil.rmtree(dirname_1)
483        del sys.path[0]
484        del sys.path[0]
485        del sys.modules['foo']
486        del sys.modules['foo.bar']
487        del sys.modules['foo.baz']
488
489    # XXX: test .pkg files
490
491
492class NestedNamespacePackageTest(unittest.TestCase):
493
494    def setUp(self):
495        self.basedir = tempfile.mkdtemp()
496        self.old_path = sys.path[:]
497
498    def tearDown(self):
499        sys.path[:] = self.old_path
500        shutil.rmtree(self.basedir)
501
502    def create_module(self, name, contents):
503        base, final = name.rsplit('.', 1)
504        base_path = os.path.join(self.basedir, base.replace('.', os.path.sep))
505        os.makedirs(base_path, exist_ok=True)
506        with open(os.path.join(base_path, final + ".py"), 'w') as f:
507            f.write(contents)
508
509    def test_nested(self):
510        pkgutil_boilerplate = (
511            'import pkgutil; '
512            '__path__ = pkgutil.extend_path(__path__, __name__)')
513        self.create_module('a.pkg.__init__', pkgutil_boilerplate)
514        self.create_module('b.pkg.__init__', pkgutil_boilerplate)
515        self.create_module('a.pkg.subpkg.__init__', pkgutil_boilerplate)
516        self.create_module('b.pkg.subpkg.__init__', pkgutil_boilerplate)
517        self.create_module('a.pkg.subpkg.c', 'c = 1')
518        self.create_module('b.pkg.subpkg.d', 'd = 2')
519        sys.path.insert(0, os.path.join(self.basedir, 'a'))
520        sys.path.insert(0, os.path.join(self.basedir, 'b'))
521        import pkg
522        self.addCleanup(unload, 'pkg')
523        self.assertEqual(len(pkg.__path__), 2)
524        import pkg.subpkg
525        self.addCleanup(unload, 'pkg.subpkg')
526        self.assertEqual(len(pkg.subpkg.__path__), 2)
527        from pkg.subpkg.c import c
528        from pkg.subpkg.d import d
529        self.assertEqual(c, 1)
530        self.assertEqual(d, 2)
531
532
533class ImportlibMigrationTests(unittest.TestCase):
534    # With full PEP 302 support in the standard import machinery, the
535    # PEP 302 emulation in this module is in the process of being
536    # deprecated in favour of importlib proper
537
538    def check_deprecated(self):
539        return check_warnings(
540            ("This emulation is deprecated and slated for removal in "
541             "Python 3.12; use 'importlib' instead",
542             DeprecationWarning))
543
544    def test_importer_deprecated(self):
545        with self.check_deprecated():
546            pkgutil.ImpImporter("")
547
548    def test_loader_deprecated(self):
549        with self.check_deprecated():
550            pkgutil.ImpLoader("", "", "", "")
551
552    def test_get_loader_avoids_emulation(self):
553        with check_warnings() as w:
554            self.assertIsNotNone(pkgutil.get_loader("sys"))
555            self.assertIsNotNone(pkgutil.get_loader("os"))
556            self.assertIsNotNone(pkgutil.get_loader("test.support"))
557            self.assertEqual(len(w.warnings), 0)
558
559    @unittest.skipIf(__name__ == '__main__', 'not compatible with __main__')
560    def test_get_loader_handles_missing_loader_attribute(self):
561        global __loader__
562        this_loader = __loader__
563        del __loader__
564        try:
565            with check_warnings() as w:
566                self.assertIsNotNone(pkgutil.get_loader(__name__))
567                self.assertEqual(len(w.warnings), 0)
568        finally:
569            __loader__ = this_loader
570
571    def test_get_loader_handles_missing_spec_attribute(self):
572        name = 'spam'
573        mod = type(sys)(name)
574        del mod.__spec__
575        with CleanImport(name):
576            sys.modules[name] = mod
577            loader = pkgutil.get_loader(name)
578        self.assertIsNone(loader)
579
580    def test_get_loader_handles_spec_attribute_none(self):
581        name = 'spam'
582        mod = type(sys)(name)
583        mod.__spec__ = None
584        with CleanImport(name):
585            sys.modules[name] = mod
586            loader = pkgutil.get_loader(name)
587        self.assertIsNone(loader)
588
589    def test_get_loader_None_in_sys_modules(self):
590        name = 'totally bogus'
591        sys.modules[name] = None
592        try:
593            loader = pkgutil.get_loader(name)
594        finally:
595            del sys.modules[name]
596        self.assertIsNone(loader)
597
598    def test_find_loader_missing_module(self):
599        name = 'totally bogus'
600        loader = pkgutil.find_loader(name)
601        self.assertIsNone(loader)
602
603    def test_find_loader_avoids_emulation(self):
604        with check_warnings() as w:
605            self.assertIsNotNone(pkgutil.find_loader("sys"))
606            self.assertIsNotNone(pkgutil.find_loader("os"))
607            self.assertIsNotNone(pkgutil.find_loader("test.support"))
608            self.assertEqual(len(w.warnings), 0)
609
610    def test_get_importer_avoids_emulation(self):
611        # We use an illegal path so *none* of the path hooks should fire
612        with check_warnings() as w:
613            self.assertIsNone(pkgutil.get_importer("*??"))
614            self.assertEqual(len(w.warnings), 0)
615
616    def test_issue44061(self):
617        try:
618            pkgutil.get_importer(Path("/home"))
619        except AttributeError:
620            self.fail("Unexpected AttributeError when calling get_importer")
621
622    def test_iter_importers_avoids_emulation(self):
623        with check_warnings() as w:
624            for importer in pkgutil.iter_importers(): pass
625            self.assertEqual(len(w.warnings), 0)
626
627
def tearDownModule():
    """Clear the zipimport directory cache, then invalidate importlib caches."""
    # this is necessary if test is run repeated (like when finding leaks)
    from zipimport import _zip_directory_cache as zip_directory_cache
    from importlib import invalidate_caches

    # Keep this order: the zip directory cache is emptied first and the
    # finder caches are invalidated afterwards.
    zip_directory_cache.clear()
    invalidate_caches()
634
635
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
638