1from test.support import run_unittest, unload, check_warnings, CleanImport
2from pathlib import Path
3import unittest
4import sys
5import importlib
6from importlib.util import spec_from_file_location
7import pkgutil
8import os
9import os.path
10import tempfile
11import shutil
12import zipfile
13
14# Note: pkgutil.walk_packages is currently tested in test_runpy. This is
15# a hack to get a major issue resolved for 3.3b2. Longer term, it should
16# be moved back here, perhaps by factoring out the helper code for
17# creating interesting package layouts to a separate module.
18# Issue #15348 declares this is indeed a dodgy hack ;)
19
class PkgutilTests(unittest.TestCase):
    """Exercise pkgutil.get_data(), iter_modules() and walk_packages().

    Each test builds a throw-away package layout (on disk or inside a zip
    archive) below a fresh temporary directory that setUp() puts at the
    front of sys.path.  Everything a test changes globally (sys.path
    entries, sys.modules entries, files on disk) is undone through
    addCleanup(), so the state is restored even when an assertion fails.
    """

    # Include a LF and a CRLF, to test that binary data is read back
    RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'

    def setUp(self):
        self.dirname = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.dirname)
        self._prepend_sys_path(self.dirname)

    def _prepend_sys_path(self, entry):
        """Insert *entry* at the front of sys.path until the test is over."""
        sys.path.insert(0, entry)
        # Remove by value, not by position: another entry may have been
        # pushed in front of this one by the time the cleanup runs.
        self.addCleanup(sys.path.remove, entry)

    def _forget_modules(self, *names):
        """Drop each of *names* from sys.modules once the test is over."""
        for name in names:
            self.addCleanup(sys.modules.pop, name, None)

    @staticmethod
    def _write_file(path, data=b''):
        """Create the binary file *path* holding *data* (empty by default)."""
        with open(path, 'wb') as f:
            f.write(data)

    def test_getdata_filesys(self):
        pkg = 'test_getdata_filesys'
        self._forget_modules(pkg)

        # Make a package with some resources
        package_dir = os.path.join(self.dirname, pkg)
        os.mkdir(package_dir)
        # Empty init.py
        self._write_file(os.path.join(package_dir, '__init__.py'))
        # Resource files, res.txt, sub/res.txt
        self._write_file(os.path.join(package_dir, 'res.txt'),
                         self.RESOURCE_DATA)
        os.mkdir(os.path.join(package_dir, 'sub'))
        self._write_file(os.path.join(package_dir, 'sub', 'res.txt'),
                         self.RESOURCE_DATA)

        # Check we can read the resources
        res1 = pkgutil.get_data(pkg, 'res.txt')
        self.assertEqual(res1, self.RESOURCE_DATA)
        res2 = pkgutil.get_data(pkg, 'sub/res.txt')
        self.assertEqual(res2, self.RESOURCE_DATA)

    def test_getdata_zipfile(self):
        zip_name = 'test_getdata_zipfile.zip'
        pkg = 'test_getdata_zipfile'
        self._forget_modules(pkg)

        # Make a package with some resources
        zip_file = os.path.join(self.dirname, zip_name)
        with zipfile.ZipFile(zip_file, 'w') as z:
            # Empty init.py
            z.writestr(pkg + '/__init__.py', "")
            # Resource files, res.txt, sub/res.txt
            z.writestr(pkg + '/res.txt', self.RESOURCE_DATA)
            z.writestr(pkg + '/sub/res.txt', self.RESOURCE_DATA)

        # Check we can read the resources
        self._prepend_sys_path(zip_file)
        res1 = pkgutil.get_data(pkg, 'res.txt')
        self.assertEqual(res1, self.RESOURCE_DATA)
        res2 = pkgutil.get_data(pkg, 'sub/res.txt')
        self.assertEqual(res2, self.RESOURCE_DATA)

        names = []
        for moduleinfo in pkgutil.iter_modules([zip_file]):
            self.assertIsInstance(moduleinfo, pkgutil.ModuleInfo)
            names.append(moduleinfo.name)
        self.assertEqual(names, ['test_getdata_zipfile'])

    def test_issue44061_iter_modules(self):
        #see: issue44061
        zip_name = 'test_getdata_zipfile.zip'
        pkg = 'test_getdata_zipfile'
        self._forget_modules(pkg)

        # Make a package with some resources
        zip_file = os.path.join(self.dirname, zip_name)
        with zipfile.ZipFile(zip_file, 'w') as z:
            # Empty init.py
            z.writestr(pkg + '/__init__.py', "")
            # Resource files, res.txt
            z.writestr(pkg + '/res.txt', self.RESOURCE_DATA)

        # Check we can read the resources
        self._prepend_sys_path(zip_file)
        res = pkgutil.get_data(pkg, 'res.txt')
        self.assertEqual(res, self.RESOURCE_DATA)

        # make sure iter_modules accepts Path objects
        names = []
        for moduleinfo in pkgutil.iter_modules([Path(zip_file)]):
            self.assertIsInstance(moduleinfo, pkgutil.ModuleInfo)
            names.append(moduleinfo.name)
        self.assertEqual(names, [pkg])

        # assert path must be None or list of paths
        expected_msg = "path must be None or list of paths to look for modules in"
        with self.assertRaisesRegex(ValueError, expected_msg):
            list(pkgutil.iter_modules("invalid_path"))

    def test_unreadable_dir_on_syspath(self):
        # issue7367 - walk_packages failed if unreadable dir on sys.path
        package_name = "unreadable_package"
        d = os.path.join(self.dirname, package_name)
        # this does not appear to create an unreadable dir on Windows
        #   but the test should not fail anyway
        os.mkdir(d, 0)
        self.addCleanup(os.rmdir, d)
        for t in pkgutil.walk_packages(path=[self.dirname]):
            self.fail("unexpected package found")

    def test_walkpackages_filesys(self):
        pkg1 = 'test_walkpackages_filesys'
        # Now, to juice it up, let's add the opposite packages, too.
        pkg2 = 'sub'
        expected = [
            'sub',
            'sub.test_walkpackages_filesys',
            'sub.test_walkpackages_filesys.mod',
            'test_walkpackages_filesys',
            'test_walkpackages_filesys.sub',
            'test_walkpackages_filesys.sub.mod',
        ]
        # Whatever the walk ends up importing must not outlive the test.
        self._forget_modules(*expected)

        # Build <outer>/__init__.py, <outer>/<inner>/__init__.py and
        # <outer>/<inner>/mod.py for both nestings of the two names.
        for outer, inner in ((pkg1, pkg2), (pkg2, pkg1)):
            outer_dir = os.path.join(self.dirname, outer)
            inner_dir = os.path.join(outer_dir, inner)
            os.makedirs(inner_dir)
            self._write_file(os.path.join(outer_dir, '__init__.py'))
            self._write_file(os.path.join(inner_dir, '__init__.py'))
            self._write_file(os.path.join(inner_dir, 'mod.py'))

        actual = [info.name for info in pkgutil.walk_packages([self.dirname])]
        self.assertEqual(actual, expected)

    def test_walkpackages_zipfile(self):
        """Tests the same as test_walkpackages_filesys, only with a zip file."""

        zip_name = 'test_walkpackages_zipfile.zip'
        pkg1 = 'test_walkpackages_zipfile'
        pkg2 = 'sub'
        expected = [
            'sub',
            'sub.test_walkpackages_zipfile',
            'sub.test_walkpackages_zipfile.mod',
            'test_walkpackages_zipfile',
            'test_walkpackages_zipfile.sub',
            'test_walkpackages_zipfile.sub.mod',
        ]
        # Whatever the walk ends up importing must not outlive the test.
        self._forget_modules(*expected)

        zip_file = os.path.join(self.dirname, zip_name)
        with zipfile.ZipFile(zip_file, 'w') as z:
            z.writestr(pkg2 + '/__init__.py', "")
            z.writestr(pkg2 + '/' + pkg1 + '/__init__.py', "")
            z.writestr(pkg2 + '/' + pkg1 + '/mod.py', "")
            z.writestr(pkg1 + '/__init__.py', "")
            z.writestr(pkg1 + '/' + pkg2 + '/__init__.py', "")
            z.writestr(pkg1 + '/' + pkg2 + '/mod.py', "")

        self._prepend_sys_path(zip_file)
        actual = [info.name for info in pkgutil.walk_packages([zip_file])]
        self.assertEqual(actual, expected)

    def test_walk_packages_raises_on_string_or_bytes_input(self):
        # A bare str/bytes is not a list of paths and must be rejected.
        str_input = 'test_dir'
        with self.assertRaises((TypeError, ValueError)):
            list(pkgutil.walk_packages(str_input))

        bytes_input = b'test_dir'
        with self.assertRaises((TypeError, ValueError)):
            list(pkgutil.walk_packages(bytes_input))
228
229
class PkgutilPEP302Tests(unittest.TestCase):
    """Check pkgutil.get_data() against a dummy meta path finder/loader.

    setUp() installs MyTestImporter at the front of sys.meta_path, so any
    module name imported during a test (the tests use 'foo') is served by
    MyTestLoader.  Both the importer and the 'foo' module are removed via
    addCleanup(), so they cannot leak into other tests when an assertion
    fails.
    """

    class MyTestLoader(object):
        def create_module(self, spec):
            # None asks the import machinery for its default module object.
            return None

        def exec_module(self, mod):
            # Count how many times the module is reloaded
            mod.__dict__['loads'] = mod.__dict__.get('loads', 0) + 1

        def get_data(self, path):
            # Same payload whatever resource path is requested.
            return "Hello, world!"

    class MyTestImporter(object):
        def find_spec(self, fullname, path=None, target=None):
            # Claim every module name and hand it to a fresh MyTestLoader.
            loader = PkgutilPEP302Tests.MyTestLoader()
            return spec_from_file_location(fullname,
                                           '<%s>' % loader.__class__.__name__,
                                           loader=loader,
                                           submodule_search_locations=[])

    def setUp(self):
        importer = self.MyTestImporter()
        sys.meta_path.insert(0, importer)
        # Remove exactly the object that was installed rather than whatever
        # happens to sit at index 0 when the test finishes.
        self.addCleanup(sys.meta_path.remove, importer)
        # Both tests import the dummy module 'foo'; make sure it is gone
        # afterwards, whether the test passed or not.
        self.addCleanup(sys.modules.pop, 'foo', None)

    def test_getdata_pep302(self):
        # Use a dummy finder/loader
        self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!")

    def test_alreadyloaded(self):
        # Ensure that get_data works without reloading - the "loads" module
        # variable in the example loader should count how many times a reload
        # occurs.
        import foo
        self.assertEqual(foo.loads, 1)
        self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!")
        self.assertEqual(foo.loads, 1)
271
272
273# These tests, especially the setup and cleanup, are hideous. They
274# need to be cleaned up once issue 14715 is addressed.
class ExtendPathTests(unittest.TestCase):
    """Tests for pkgutil.extend_path() and pkgutil.iter_importers().

    The helpers below create package directories in temporary locations
    and put those locations on sys.path.  Every temporary directory,
    sys.path entry and sys.modules entry is released through addCleanup(),
    so a failing assertion cannot leave them behind.
    """

    def create_init(self, pkgname):
        """Create package *pkgname* in a new temp dir placed first on sys.path.

        The package's __init__.py extends __path__ with
        pkgutil.extend_path().  Returns the temporary directory that
        contains the package directory; the directory and its sys.path
        entry are removed again when the test finishes.
        """
        dirname = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, dirname)
        sys.path.insert(0, dirname)
        self.addCleanup(sys.path.remove, dirname)

        pkgdir = os.path.join(dirname, pkgname)
        os.mkdir(pkgdir)
        with open(os.path.join(pkgdir, '__init__.py'), 'w') as fl:
            fl.write('from pkgutil import extend_path\n'
                     '__path__ = extend_path(__path__, __name__)\n')

        return dirname

    def create_submodule(self, dirname, pkgname, submodule_name, value):
        """Write <dirname>/<pkgname>/<submodule_name>.py defining ``value``."""
        module_name = os.path.join(dirname, pkgname, submodule_name + '.py')
        with open(module_name, 'w') as fl:
            print('value={}'.format(value), file=fl)

    def _forget_modules(self, *names):
        """Drop each of *names* from sys.modules once the test is over."""
        for name in names:
            self.addCleanup(sys.modules.pop, name, None)

    def test_simple(self):
        pkgname = 'foo'
        self._forget_modules('foo', 'foo.bar', 'foo.baz')
        dirname_0 = self.create_init(pkgname)
        dirname_1 = self.create_init(pkgname)
        self.create_submodule(dirname_0, pkgname, 'bar', 0)
        self.create_submodule(dirname_1, pkgname, 'baz', 1)
        import foo.bar
        import foo.baz
        # Ensure we read the expected values
        self.assertEqual(foo.bar.value, 0)
        self.assertEqual(foo.baz.value, 1)

        # Ensure the path is set up correctly
        self.assertEqual(sorted(foo.__path__),
                         sorted([os.path.join(dirname_0, pkgname),
                                 os.path.join(dirname_1, pkgname)]))

    # Another awful testing hack to be cleaned up once the test_runpy
    # helpers are factored out to a common location
    def test_iter_importers(self):
        iter_importers = pkgutil.iter_importers
        get_importer = pkgutil.get_importer

        pkgname = 'spam'
        modname = 'eggs'
        fullname = '{}.{}'.format(pkgname, modname)
        # Start from a clean slate and guarantee one afterwards; each name
        # is dropped independently of the other.
        sys.modules.pop(fullname, None)
        sys.modules.pop(pkgname, None)
        self._forget_modules(pkgname, fullname)

        dirname = self.create_init(pkgname)
        pathitem = os.path.join(dirname, pkgname)
        self.create_submodule(dirname, pkgname, modname, 0)

        importlib.import_module(fullname)

        importers = list(iter_importers(fullname))
        expected_importer = get_importer(pathitem)
        for finder in importers:
            self.assertIsInstance(finder, importlib.machinery.FileFinder)
            self.assertEqual(finder, expected_importer)
            # Query the finder through its public find_spec() API instead
            # of going through a private pkgutil helper.
            spec = finder.find_spec(fullname)
            self.assertIsInstance(spec.loader,
                                  importlib.machinery.SourceFileLoader)
            # The finder for the package directory must not find the
            # package itself.
            self.assertIsNone(finder.find_spec(pkgname))

        with self.assertRaises(ImportError):
            list(iter_importers('invalid.module'))

        with self.assertRaises(ImportError):
            list(iter_importers('.spam'))

    def test_mixed_namespace(self):
        pkgname = 'foo'
        self._forget_modules('foo', 'foo.bar', 'foo.baz')
        dirname_0 = self.create_init(pkgname)
        dirname_1 = self.create_init(pkgname)
        self.create_submodule(dirname_0, pkgname, 'bar', 0)
        # Turn this into a PEP 420 namespace package
        os.unlink(os.path.join(dirname_0, pkgname, '__init__.py'))
        self.create_submodule(dirname_1, pkgname, 'baz', 1)
        import foo.bar
        import foo.baz
        # Ensure we read the expected values
        self.assertEqual(foo.bar.value, 0)
        self.assertEqual(foo.baz.value, 1)

        # Ensure the path is set up correctly
        self.assertEqual(sorted(foo.__path__),
                         sorted([os.path.join(dirname_0, pkgname),
                                 os.path.join(dirname_1, pkgname)]))
396
397    # XXX: test .pkg files
398
399
class NestedNamespacePackageTest(unittest.TestCase):
    """Check pkgutil.extend_path() for a package nested inside another.

    Two directory trees ('a' and 'b' below a temporary base directory)
    each hold a portion of 'pkg' and of 'pkg.subpkg'; both levels extend
    their __path__ with pkgutil.extend_path().
    """

    def setUp(self):
        self.basedir = tempfile.mkdtemp()
        # Snapshot sys.path so tearDown() can put it back verbatim.
        self.old_path = sys.path[:]

    def tearDown(self):
        sys.path[:] = self.old_path
        shutil.rmtree(self.basedir)

    def create_module(self, name, contents):
        """Write *contents* to the .py file for dotted *name* under basedir.

        Everything before the last dot is mapped to nested directories,
        which are created when missing; the last component becomes the
        file name.
        """
        base, final = name.rsplit('.', 1)
        base_path = os.path.join(self.basedir, base.replace('.', os.path.sep))
        os.makedirs(base_path, exist_ok=True)
        with open(os.path.join(base_path, final + ".py"), 'w') as f:
            f.write(contents)

    def test_nested(self):
        pkgutil_boilerplate = (
            'import pkgutil; '
            '__path__ = pkgutil.extend_path(__path__, __name__)')
        self.create_module('a.pkg.__init__', pkgutil_boilerplate)
        self.create_module('b.pkg.__init__', pkgutil_boilerplate)
        self.create_module('a.pkg.subpkg.__init__', pkgutil_boilerplate)
        self.create_module('b.pkg.subpkg.__init__', pkgutil_boilerplate)
        self.create_module('a.pkg.subpkg.c', 'c = 1')
        self.create_module('b.pkg.subpkg.d', 'd = 2')
        sys.path.insert(0, os.path.join(self.basedir, 'a'))
        sys.path.insert(0, os.path.join(self.basedir, 'b'))

        # Register the removal of every module this test imports before
        # importing anything, so nothing stays in sys.modules even if an
        # import or assertion fails part-way through.
        for name in ('pkg', 'pkg.subpkg', 'pkg.subpkg.c', 'pkg.subpkg.d'):
            self.addCleanup(sys.modules.pop, name, None)

        import pkg
        self.assertEqual(len(pkg.__path__), 2)
        import pkg.subpkg
        self.assertEqual(len(pkg.subpkg.__path__), 2)
        from pkg.subpkg.c import c
        from pkg.subpkg.d import d
        self.assertEqual(c, 1)
        self.assertEqual(d, 2)
439
440
class ImportlibMigrationTests(unittest.TestCase):
    # The standard import machinery now supports PEP 302 in full, so the
    # PEP 302 emulation that lives in pkgutil is on its way to being
    # deprecated in favour of importlib proper

    def check_deprecated(self):
        # Context manager that expects the emulation DeprecationWarning.
        expected = ("This emulation is deprecated, use 'importlib' instead",
                    DeprecationWarning)
        return check_warnings(expected)

    def test_importer_deprecated(self):
        with self.check_deprecated():
            pkgutil.ImpImporter("")

    def test_loader_deprecated(self):
        with self.check_deprecated():
            pkgutil.ImpLoader("", "", "", "")

    def test_get_loader_avoids_emulation(self):
        with check_warnings() as caught:
            for modname in ("sys", "os", "test.support"):
                self.assertIsNotNone(pkgutil.get_loader(modname))
            self.assertEqual(len(caught.warnings), 0)

    @unittest.skipIf(__name__ == '__main__', 'not compatible with __main__')
    def test_get_loader_handles_missing_loader_attribute(self):
        # Temporarily strip this module's __loader__ global and check that
        # get_loader() still finds a loader without emitting warnings.
        global __loader__
        saved_loader = __loader__
        del __loader__
        try:
            with check_warnings() as caught:
                self.assertIsNotNone(pkgutil.get_loader(__name__))
                self.assertEqual(len(caught.warnings), 0)
        finally:
            __loader__ = saved_loader

    def test_get_loader_handles_missing_spec_attribute(self):
        modname = 'spam'
        module = type(sys)(modname)
        del module.__spec__
        with CleanImport(modname):
            sys.modules[modname] = module
            found = pkgutil.get_loader(modname)
        self.assertIsNone(found)

    def test_get_loader_handles_spec_attribute_none(self):
        modname = 'spam'
        module = type(sys)(modname)
        module.__spec__ = None
        with CleanImport(modname):
            sys.modules[modname] = module
            found = pkgutil.get_loader(modname)
        self.assertIsNone(found)

    def test_get_loader_None_in_sys_modules(self):
        modname = 'totally bogus'
        sys.modules[modname] = None
        try:
            found = pkgutil.get_loader(modname)
        finally:
            del sys.modules[modname]
        self.assertIsNone(found)

    def test_find_loader_missing_module(self):
        self.assertIsNone(pkgutil.find_loader('totally bogus'))

    def test_find_loader_avoids_emulation(self):
        with check_warnings() as caught:
            for modname in ("sys", "os", "test.support"):
                self.assertIsNotNone(pkgutil.find_loader(modname))
            self.assertEqual(len(caught.warnings), 0)

    def test_get_importer_avoids_emulation(self):
        # We use an illegal path so *none* of the path hooks should fire
        with check_warnings() as caught:
            importer = pkgutil.get_importer("*??")
            self.assertIsNone(importer)
            self.assertEqual(len(caught.warnings), 0)

    def test_issue44061(self):
        # get_importer() must accept a pathlib.Path argument.
        location = Path("/home")
        try:
            pkgutil.get_importer(location)
        except AttributeError:
            self.fail("Unexpected AttributeError when calling get_importer")

    def test_iter_importers_avoids_emulation(self):
        with check_warnings() as caught:
            # Exhaust the iterator; only the absence of warnings matters.
            for _ in pkgutil.iter_importers():
                pass
            self.assertEqual(len(caught.warnings), 0)
533
534
def test_main():
    """Run every test case of this module, then reset the import caches."""
    test_cases = (
        PkgutilTests,
        PkgutilPEP302Tests,
        ExtendPathTests,
        NestedNamespacePackageTest,
        ImportlibMigrationTests,
    )
    run_unittest(*test_cases)
    # this is necessary if test is run repeated (like when finding leaks)
    import importlib
    import zipimport
    zipimport._zip_directory_cache.clear()
    importlib.invalidate_caches()
543
544
# Run the whole module's tests when executed directly as a script.
if __name__ == "__main__":
    test_main()
547