1from pathlib import Path 2from test.support.import_helper import unload, CleanImport 3from test.support.warnings_helper import check_warnings 4import unittest 5import sys 6import importlib 7from importlib.util import spec_from_file_location 8import pkgutil 9import os 10import os.path 11import tempfile 12import shutil 13import zipfile 14 15# Note: pkgutil.walk_packages is currently tested in test_runpy. This is 16# a hack to get a major issue resolved for 3.3b2. Longer term, it should 17# be moved back here, perhaps by factoring out the helper code for 18# creating interesting package layouts to a separate module. 19# Issue #15348 declares this is indeed a dodgy hack ;) 20 21class PkgutilTests(unittest.TestCase): 22 23 def setUp(self): 24 self.dirname = tempfile.mkdtemp() 25 self.addCleanup(shutil.rmtree, self.dirname) 26 sys.path.insert(0, self.dirname) 27 28 def tearDown(self): 29 del sys.path[0] 30 31 def test_getdata_filesys(self): 32 pkg = 'test_getdata_filesys' 33 34 # Include a LF and a CRLF, to test that binary data is read back 35 RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line' 36 37 # Make a package with some resources 38 package_dir = os.path.join(self.dirname, pkg) 39 os.mkdir(package_dir) 40 # Empty init.py 41 f = open(os.path.join(package_dir, '__init__.py'), "wb") 42 f.close() 43 # Resource files, res.txt, sub/res.txt 44 f = open(os.path.join(package_dir, 'res.txt'), "wb") 45 f.write(RESOURCE_DATA) 46 f.close() 47 os.mkdir(os.path.join(package_dir, 'sub')) 48 f = open(os.path.join(package_dir, 'sub', 'res.txt'), "wb") 49 f.write(RESOURCE_DATA) 50 f.close() 51 52 # Check we can read the resources 53 res1 = pkgutil.get_data(pkg, 'res.txt') 54 self.assertEqual(res1, RESOURCE_DATA) 55 res2 = pkgutil.get_data(pkg, 'sub/res.txt') 56 self.assertEqual(res2, RESOURCE_DATA) 57 58 del sys.modules[pkg] 59 60 def test_getdata_zipfile(self): 61 zip = 'test_getdata_zipfile.zip' 62 pkg = 'test_getdata_zipfile' 63 64 # Include a LF and a CRLF, to test that binary data is read back 65 RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line' 66 67 # Make a package with some resources 68 zip_file = os.path.join(self.dirname, zip) 69 z = zipfile.ZipFile(zip_file, 'w') 70 71 # Empty init.py 72 z.writestr(pkg + '/__init__.py', "") 73 # Resource files, res.txt, sub/res.txt 74 z.writestr(pkg + '/res.txt', RESOURCE_DATA) 75 z.writestr(pkg + '/sub/res.txt', RESOURCE_DATA) 76 z.close() 77 78 # Check we can read the resources 79 sys.path.insert(0, zip_file) 80 res1 = pkgutil.get_data(pkg, 'res.txt') 81 self.assertEqual(res1, RESOURCE_DATA) 82 res2 = pkgutil.get_data(pkg, 'sub/res.txt') 83 self.assertEqual(res2, RESOURCE_DATA) 84 85 names = [] 86 for moduleinfo in pkgutil.iter_modules([zip_file]): 87 self.assertIsInstance(moduleinfo, pkgutil.ModuleInfo) 88 names.append(moduleinfo.name) 89 self.assertEqual(names, ['test_getdata_zipfile']) 90 91 del sys.path[0] 92 93 del sys.modules[pkg] 94 95 def test_issue44061_iter_modules(self): 96 #see: issue44061 97 zip = 'test_getdata_zipfile.zip' 98 pkg = 'test_getdata_zipfile' 99 100 # Include a LF and a CRLF, to test that binary data is read back 101 RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line' 102 103 # Make a package with some resources 104 zip_file = os.path.join(self.dirname, zip) 105 z = zipfile.ZipFile(zip_file, 'w') 106 107 # Empty init.py 108 z.writestr(pkg + '/__init__.py', "") 109 # Resource files, res.txt 110 z.writestr(pkg + '/res.txt', RESOURCE_DATA) 111 z.close() 112 113 # Check we can read the resources 114 sys.path.insert(0, zip_file) 115 try: 116 res = pkgutil.get_data(pkg, 'res.txt') 117 self.assertEqual(res, RESOURCE_DATA) 118 119 # make sure iter_modules accepts Path objects 120 names = [] 121 for moduleinfo in pkgutil.iter_modules([Path(zip_file)]): 122 self.assertIsInstance(moduleinfo, pkgutil.ModuleInfo) 123 names.append(moduleinfo.name) 124 self.assertEqual(names, [pkg]) 125 finally: 126 del sys.path[0] 127 sys.modules.pop(pkg, None) 128 129 # assert path must be None or list of paths 130 expected_msg = "path must be None or list of paths to look for modules in" 131 with self.assertRaisesRegex(ValueError, expected_msg): 132 list(pkgutil.iter_modules("invalid_path")) 133 134 def test_unreadable_dir_on_syspath(self): 135 # issue7367 - walk_packages failed if unreadable dir on sys.path 136 package_name = "unreadable_package" 137 d = os.path.join(self.dirname, package_name) 138 # this does not appear to create an unreadable dir on Windows 139 # but the test should not fail anyway 140 os.mkdir(d, 0) 141 self.addCleanup(os.rmdir, d) 142 for t in pkgutil.walk_packages(path=[self.dirname]): 143 self.fail("unexpected package found") 144 145 def test_walkpackages_filesys(self): 146 pkg1 = 'test_walkpackages_filesys' 147 pkg1_dir = os.path.join(self.dirname, pkg1) 148 os.mkdir(pkg1_dir) 149 f = open(os.path.join(pkg1_dir, '__init__.py'), "wb") 150 f.close() 151 os.mkdir(os.path.join(pkg1_dir, 'sub')) 152 f = open(os.path.join(pkg1_dir, 'sub', '__init__.py'), "wb") 153 f.close() 154 f = open(os.path.join(pkg1_dir, 'sub', 'mod.py'), "wb") 155 f.close() 156 157 # Now, to juice it up, let's add the opposite packages, too. 158 pkg2 = 'sub' 159 pkg2_dir = os.path.join(self.dirname, pkg2) 160 os.mkdir(pkg2_dir) 161 f = open(os.path.join(pkg2_dir, '__init__.py'), "wb") 162 f.close() 163 os.mkdir(os.path.join(pkg2_dir, 'test_walkpackages_filesys')) 164 f = open(os.path.join(pkg2_dir, 'test_walkpackages_filesys', '__init__.py'), "wb") 165 f.close() 166 f = open(os.path.join(pkg2_dir, 'test_walkpackages_filesys', 'mod.py'), "wb") 167 f.close() 168 169 expected = [ 170 'sub', 171 'sub.test_walkpackages_filesys', 172 'sub.test_walkpackages_filesys.mod', 173 'test_walkpackages_filesys', 174 'test_walkpackages_filesys.sub', 175 'test_walkpackages_filesys.sub.mod', 176 ] 177 actual= [e[1] for e in pkgutil.walk_packages([self.dirname])] 178 self.assertEqual(actual, expected) 179 180 for pkg in expected: 181 if pkg.endswith('mod'): 182 continue 183 del sys.modules[pkg] 184 185 def test_walkpackages_zipfile(self): 186 """Tests the same as test_walkpackages_filesys, only with a zip file.""" 187 188 zip = 'test_walkpackages_zipfile.zip' 189 pkg1 = 'test_walkpackages_zipfile' 190 pkg2 = 'sub' 191 192 zip_file = os.path.join(self.dirname, zip) 193 z = zipfile.ZipFile(zip_file, 'w') 194 z.writestr(pkg2 + '/__init__.py', "") 195 z.writestr(pkg2 + '/' + pkg1 + '/__init__.py', "") 196 z.writestr(pkg2 + '/' + pkg1 + '/mod.py', "") 197 z.writestr(pkg1 + '/__init__.py', "") 198 z.writestr(pkg1 + '/' + pkg2 + '/__init__.py', "") 199 z.writestr(pkg1 + '/' + pkg2 + '/mod.py', "") 200 z.close() 201 202 sys.path.insert(0, zip_file) 203 expected = [ 204 'sub', 205 'sub.test_walkpackages_zipfile', 206 'sub.test_walkpackages_zipfile.mod', 207 'test_walkpackages_zipfile', 208 'test_walkpackages_zipfile.sub', 209 'test_walkpackages_zipfile.sub.mod', 210 ] 211 actual= [e[1] for e in pkgutil.walk_packages([zip_file])] 212 self.assertEqual(actual, expected) 213 del sys.path[0] 214 215 for pkg in expected: 216 if pkg.endswith('mod'): 217 continue 218 del sys.modules[pkg] 219 220 def test_walk_packages_raises_on_string_or_bytes_input(self): 221 222 str_input = 'test_dir' 223 with self.assertRaises((TypeError, ValueError)): 224 list(pkgutil.walk_packages(str_input)) 225 226 bytes_input = b'test_dir' 227 with self.assertRaises((TypeError, ValueError)): 228 list(pkgutil.walk_packages(bytes_input)) 229 230 def test_name_resolution(self): 231 import logging 232 import logging.handlers 233 234 success_cases = ( 235 ('os', os), 236 ('os.path', os.path), 237 ('os.path:pathsep', os.path.pathsep), 238 ('logging', logging), 239 ('logging:', logging), 240 ('logging.handlers', logging.handlers), 241 ('logging.handlers:', logging.handlers), 242 ('logging.handlers:SysLogHandler', logging.handlers.SysLogHandler), 243 ('logging.handlers.SysLogHandler', logging.handlers.SysLogHandler), 244 ('logging.handlers:SysLogHandler.LOG_ALERT', 245 logging.handlers.SysLogHandler.LOG_ALERT), 246 ('logging.handlers.SysLogHandler.LOG_ALERT', 247 logging.handlers.SysLogHandler.LOG_ALERT), 248 ('builtins.int', int), 249 ('builtins:int', int), 250 ('builtins.int.from_bytes', int.from_bytes), 251 ('builtins:int.from_bytes', int.from_bytes), 252 ('builtins.ZeroDivisionError', ZeroDivisionError), 253 ('builtins:ZeroDivisionError', ZeroDivisionError), 254 ('os:path', os.path), 255 ) 256 257 failure_cases = ( 258 (None, TypeError), 259 (1, TypeError), 260 (2.0, TypeError), 261 (True, TypeError), 262 ('', ValueError), 263 ('?abc', ValueError), 264 ('abc/foo', ValueError), 265 ('foo', ImportError), 266 ('os.foo', AttributeError), 267 ('os.foo:', ImportError), 268 ('os.pth:pathsep', ImportError), 269 ('logging.handlers:NoSuchHandler', AttributeError), 270 ('logging.handlers:SysLogHandler.NO_SUCH_VALUE', AttributeError), 271 ('logging.handlers.SysLogHandler.NO_SUCH_VALUE', AttributeError), 272 ('ZeroDivisionError', ImportError), 273 ('os.path.9abc', ValueError), 274 ('9abc', ValueError), 275 ) 276 277 # add some Unicode package names to the mix. 278 279 unicode_words = ('\u0935\u092e\u0938', 280 '\xe9', '\xc8', 281 '\uc548\ub155\ud558\uc138\uc694', 282 '\u3055\u3088\u306a\u3089', 283 '\u3042\u308a\u304c\u3068\u3046', 284 '\u0425\u043e\u0440\u043e\u0448\u043e', 285 '\u0441\u043f\u0430\u0441\u0438\u0431\u043e', 286 '\u73b0\u4ee3\u6c49\u8bed\u5e38\u7528\u5b57\u8868') 287 288 for uw in unicode_words: 289 d = os.path.join(self.dirname, uw) 290 try: 291 os.makedirs(d, exist_ok=True) 292 except UnicodeEncodeError: 293 # When filesystem encoding cannot encode uw: skip this test 294 continue 295 # make an empty __init__.py file 296 f = os.path.join(d, '__init__.py') 297 with open(f, 'w') as f: 298 f.write('') 299 f.flush() 300 # now import the package we just created; clearing the caches is 301 # needed, otherwise the newly created package isn't found 302 importlib.invalidate_caches() 303 mod = importlib.import_module(uw) 304 success_cases += (uw, mod), 305 if len(uw) > 1: 306 failure_cases += (uw[:-1], ImportError), 307 308 # add an example with a Unicode digit at the start 309 failure_cases += ('\u0966\u0935\u092e\u0938', ValueError), 310 311 for s, expected in success_cases: 312 with self.subTest(s=s): 313 o = pkgutil.resolve_name(s) 314 self.assertEqual(o, expected) 315 316 for s, exc in failure_cases: 317 with self.subTest(s=s): 318 with self.assertRaises(exc): 319 pkgutil.resolve_name(s) 320 321 322class PkgutilPEP302Tests(unittest.TestCase): 323 324 class MyTestLoader(object): 325 def create_module(self, spec): 326 return None 327 328 def exec_module(self, mod): 329 # Count how many times the module is reloaded 330 mod.__dict__['loads'] = mod.__dict__.get('loads', 0) + 1 331 332 def get_data(self, path): 333 return "Hello, world!" 334 335 class MyTestImporter(object): 336 def find_spec(self, fullname, path=None, target=None): 337 loader = PkgutilPEP302Tests.MyTestLoader() 338 return spec_from_file_location(fullname, 339 '<%s>' % loader.__class__.__name__, 340 loader=loader, 341 submodule_search_locations=[]) 342 343 def setUp(self): 344 sys.meta_path.insert(0, self.MyTestImporter()) 345 346 def tearDown(self): 347 del sys.meta_path[0] 348 349 def test_getdata_pep302(self): 350 # Use a dummy finder/loader 351 self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!") 352 del sys.modules['foo'] 353 354 def test_alreadyloaded(self): 355 # Ensure that get_data works without reloading - the "loads" module 356 # variable in the example loader should count how many times a reload 357 # occurs. 358 import foo 359 self.assertEqual(foo.loads, 1) 360 self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!") 361 self.assertEqual(foo.loads, 1) 362 del sys.modules['foo'] 363 364 365# These tests, especially the setup and cleanup, are hideous. They 366# need to be cleaned up once issue 14715 is addressed. 367class ExtendPathTests(unittest.TestCase): 368 def create_init(self, pkgname): 369 dirname = tempfile.mkdtemp() 370 sys.path.insert(0, dirname) 371 372 pkgdir = os.path.join(dirname, pkgname) 373 os.mkdir(pkgdir) 374 with open(os.path.join(pkgdir, '__init__.py'), 'w') as fl: 375 fl.write('from pkgutil import extend_path\n__path__ = extend_path(__path__, __name__)\n') 376 377 return dirname 378 379 def create_submodule(self, dirname, pkgname, submodule_name, value): 380 module_name = os.path.join(dirname, pkgname, submodule_name + '.py') 381 with open(module_name, 'w') as fl: 382 print('value={}'.format(value), file=fl) 383 384 def test_simple(self): 385 pkgname = 'foo' 386 dirname_0 = self.create_init(pkgname) 387 dirname_1 = self.create_init(pkgname) 388 self.create_submodule(dirname_0, pkgname, 'bar', 0) 389 self.create_submodule(dirname_1, pkgname, 'baz', 1) 390 import foo.bar 391 import foo.baz 392 # Ensure we read the expected values 393 self.assertEqual(foo.bar.value, 0) 394 self.assertEqual(foo.baz.value, 1) 395 396 # Ensure the path is set up correctly 397 self.assertEqual(sorted(foo.__path__), 398 sorted([os.path.join(dirname_0, pkgname), 399 os.path.join(dirname_1, pkgname)])) 400 401 # Cleanup 402 shutil.rmtree(dirname_0) 403 shutil.rmtree(dirname_1) 404 del sys.path[0] 405 del sys.path[0] 406 del sys.modules['foo'] 407 del sys.modules['foo.bar'] 408 del sys.modules['foo.baz'] 409 410 411 # Another awful testing hack to be cleaned up once the test_runpy 412 # helpers are factored out to a common location 413 def test_iter_importers(self): 414 iter_importers = pkgutil.iter_importers 415 get_importer = pkgutil.get_importer 416 417 pkgname = 'spam' 418 modname = 'eggs' 419 dirname = self.create_init(pkgname) 420 pathitem = os.path.join(dirname, pkgname) 421 fullname = '{}.{}'.format(pkgname, modname) 422 sys.modules.pop(fullname, None) 423 sys.modules.pop(pkgname, None) 424 try: 425 self.create_submodule(dirname, pkgname, modname, 0) 426 427 importlib.import_module(fullname) 428 429 importers = list(iter_importers(fullname)) 430 expected_importer = get_importer(pathitem) 431 for finder in importers: 432 spec = pkgutil._get_spec(finder, fullname) 433 loader = spec.loader 434 try: 435 loader = loader.loader 436 except AttributeError: 437 # For now we still allow raw loaders from 438 # find_module(). 439 pass 440 self.assertIsInstance(finder, importlib.machinery.FileFinder) 441 self.assertEqual(finder, expected_importer) 442 self.assertIsInstance(loader, 443 importlib.machinery.SourceFileLoader) 444 self.assertIsNone(pkgutil._get_spec(finder, pkgname)) 445 446 with self.assertRaises(ImportError): 447 list(iter_importers('invalid.module')) 448 449 with self.assertRaises(ImportError): 450 list(iter_importers('.spam')) 451 finally: 452 shutil.rmtree(dirname) 453 del sys.path[0] 454 try: 455 del sys.modules['spam'] 456 del sys.modules['spam.eggs'] 457 except KeyError: 458 pass 459 460 461 def test_mixed_namespace(self): 462 pkgname = 'foo' 463 dirname_0 = self.create_init(pkgname) 464 dirname_1 = self.create_init(pkgname) 465 self.create_submodule(dirname_0, pkgname, 'bar', 0) 466 # Turn this into a PEP 420 namespace package 467 os.unlink(os.path.join(dirname_0, pkgname, '__init__.py')) 468 self.create_submodule(dirname_1, pkgname, 'baz', 1) 469 import foo.bar 470 import foo.baz 471 # Ensure we read the expected values 472 self.assertEqual(foo.bar.value, 0) 473 self.assertEqual(foo.baz.value, 1) 474 475 # Ensure the path is set up correctly 476 self.assertEqual(sorted(foo.__path__), 477 sorted([os.path.join(dirname_0, pkgname), 478 os.path.join(dirname_1, pkgname)])) 479 480 # Cleanup 481 shutil.rmtree(dirname_0) 482 shutil.rmtree(dirname_1) 483 del sys.path[0] 484 del sys.path[0] 485 del sys.modules['foo'] 486 del sys.modules['foo.bar'] 487 del sys.modules['foo.baz'] 488 489 # XXX: test .pkg files 490 491 492class NestedNamespacePackageTest(unittest.TestCase): 493 494 def setUp(self): 495 self.basedir = tempfile.mkdtemp() 496 self.old_path = sys.path[:] 497 498 def tearDown(self): 499 sys.path[:] = self.old_path 500 shutil.rmtree(self.basedir) 501 502 def create_module(self, name, contents): 503 base, final = name.rsplit('.', 1) 504 base_path = os.path.join(self.basedir, base.replace('.', os.path.sep)) 505 os.makedirs(base_path, exist_ok=True) 506 with open(os.path.join(base_path, final + ".py"), 'w') as f: 507 f.write(contents) 508 509 def test_nested(self): 510 pkgutil_boilerplate = ( 511 'import pkgutil; ' 512 '__path__ = pkgutil.extend_path(__path__, __name__)') 513 self.create_module('a.pkg.__init__', pkgutil_boilerplate) 514 self.create_module('b.pkg.__init__', pkgutil_boilerplate) 515 self.create_module('a.pkg.subpkg.__init__', pkgutil_boilerplate) 516 self.create_module('b.pkg.subpkg.__init__', pkgutil_boilerplate) 517 self.create_module('a.pkg.subpkg.c', 'c = 1') 518 self.create_module('b.pkg.subpkg.d', 'd = 2') 519 sys.path.insert(0, os.path.join(self.basedir, 'a')) 520 sys.path.insert(0, os.path.join(self.basedir, 'b')) 521 import pkg 522 self.addCleanup(unload, 'pkg') 523 self.assertEqual(len(pkg.__path__), 2) 524 import pkg.subpkg 525 self.addCleanup(unload, 'pkg.subpkg') 526 self.assertEqual(len(pkg.subpkg.__path__), 2) 527 from pkg.subpkg.c import c 528 from pkg.subpkg.d import d 529 self.assertEqual(c, 1) 530 self.assertEqual(d, 2) 531 532 533class ImportlibMigrationTests(unittest.TestCase): 534 # With full PEP 302 support in the standard import machinery, the 535 # PEP 302 emulation in this module is in the process of being 536 # deprecated in favour of importlib proper 537 538 def check_deprecated(self): 539 return check_warnings( 540 ("This emulation is deprecated and slated for removal in " 541 "Python 3.12; use 'importlib' instead", 542 DeprecationWarning)) 543 544 def test_importer_deprecated(self): 545 with self.check_deprecated(): 546 pkgutil.ImpImporter("") 547 548 def test_loader_deprecated(self): 549 with self.check_deprecated(): 550 pkgutil.ImpLoader("", "", "", "") 551 552 def test_get_loader_avoids_emulation(self): 553 with check_warnings() as w: 554 self.assertIsNotNone(pkgutil.get_loader("sys")) 555 self.assertIsNotNone(pkgutil.get_loader("os")) 556 self.assertIsNotNone(pkgutil.get_loader("test.support")) 557 self.assertEqual(len(w.warnings), 0) 558 559 @unittest.skipIf(__name__ == '__main__', 'not compatible with __main__') 560 def test_get_loader_handles_missing_loader_attribute(self): 561 global __loader__ 562 this_loader = __loader__ 563 del __loader__ 564 try: 565 with check_warnings() as w: 566 self.assertIsNotNone(pkgutil.get_loader(__name__)) 567 self.assertEqual(len(w.warnings), 0) 568 finally: 569 __loader__ = this_loader 570 571 def test_get_loader_handles_missing_spec_attribute(self): 572 name = 'spam' 573 mod = type(sys)(name) 574 del mod.__spec__ 575 with CleanImport(name): 576 sys.modules[name] = mod 577 loader = pkgutil.get_loader(name) 578 self.assertIsNone(loader) 579 580 def test_get_loader_handles_spec_attribute_none(self): 581 name = 'spam' 582 mod = type(sys)(name) 583 mod.__spec__ = None 584 with CleanImport(name): 585 sys.modules[name] = mod 586 loader = pkgutil.get_loader(name) 587 self.assertIsNone(loader) 588 589 def test_get_loader_None_in_sys_modules(self): 590 name = 'totally bogus' 591 sys.modules[name] = None 592 try: 593 loader = pkgutil.get_loader(name) 594 finally: 595 del sys.modules[name] 596 self.assertIsNone(loader) 597 598 def test_find_loader_missing_module(self): 599 name = 'totally bogus' 600 loader = pkgutil.find_loader(name) 601 self.assertIsNone(loader) 602 603 def test_find_loader_avoids_emulation(self): 604 with check_warnings() as w: 605 self.assertIsNotNone(pkgutil.find_loader("sys")) 606 self.assertIsNotNone(pkgutil.find_loader("os")) 607 self.assertIsNotNone(pkgutil.find_loader("test.support")) 608 self.assertEqual(len(w.warnings), 0) 609 610 def test_get_importer_avoids_emulation(self): 611 # We use an illegal path so *none* of the path hooks should fire 612 with check_warnings() as w: 613 self.assertIsNone(pkgutil.get_importer("*??")) 614 self.assertEqual(len(w.warnings), 0) 615 616 def test_issue44061(self): 617 try: 618 pkgutil.get_importer(Path("/home")) 619 except AttributeError: 620 self.fail("Unexpected AttributeError when calling get_importer") 621 622 def test_iter_importers_avoids_emulation(self): 623 with check_warnings() as w: 624 for importer in pkgutil.iter_importers(): pass 625 self.assertEqual(len(w.warnings), 0) 626 627 628def tearDownModule(): 629 # this is necessary if test is run repeated (like when finding leaks) 630 import zipimport 631 import importlib 632 zipimport._zip_directory_cache.clear() 633 importlib.invalidate_caches() 634 635 636if __name__ == '__main__': 637 unittest.main() 638