1import sys 2import tempfile 3import os 4import zipfile 5import datetime 6import time 7import subprocess 8import stat 9import distutils.dist 10import distutils.command.install_egg_info 11 12try: 13 from unittest import mock 14except ImportError: 15 import mock 16 17from pkg_resources import ( 18 DistInfoDistribution, Distribution, EggInfoDistribution, 19) 20 21import pytest 22 23import pkg_resources 24 25 26def timestamp(dt): 27 """ 28 Return a timestamp for a local, naive datetime instance. 29 """ 30 try: 31 return dt.timestamp() 32 except AttributeError: 33 # Python 3.2 and earlier 34 return time.mktime(dt.timetuple()) 35 36 37class EggRemover(str): 38 def __call__(self): 39 if self in sys.path: 40 sys.path.remove(self) 41 if os.path.exists(self): 42 os.remove(self) 43 44 45class TestZipProvider: 46 finalizers = [] 47 48 ref_time = datetime.datetime(2013, 5, 12, 13, 25, 0) 49 "A reference time for a file modification" 50 51 @classmethod 52 def setup_class(cls): 53 "create a zip egg and add it to sys.path" 54 egg = tempfile.NamedTemporaryFile(suffix='.egg', delete=False) 55 zip_egg = zipfile.ZipFile(egg, 'w') 56 zip_info = zipfile.ZipInfo() 57 zip_info.filename = 'mod.py' 58 zip_info.date_time = cls.ref_time.timetuple() 59 zip_egg.writestr(zip_info, 'x = 3\n') 60 zip_info = zipfile.ZipInfo() 61 zip_info.filename = 'data.dat' 62 zip_info.date_time = cls.ref_time.timetuple() 63 zip_egg.writestr(zip_info, 'hello, world!') 64 zip_info = zipfile.ZipInfo() 65 zip_info.filename = 'subdir/mod2.py' 66 zip_info.date_time = cls.ref_time.timetuple() 67 zip_egg.writestr(zip_info, 'x = 6\n') 68 zip_info = zipfile.ZipInfo() 69 zip_info.filename = 'subdir/data2.dat' 70 zip_info.date_time = cls.ref_time.timetuple() 71 zip_egg.writestr(zip_info, 'goodbye, world!') 72 zip_egg.close() 73 egg.close() 74 75 sys.path.append(egg.name) 76 subdir = os.path.join(egg.name, 'subdir') 77 sys.path.append(subdir) 78 cls.finalizers.append(EggRemover(subdir)) 79 cls.finalizers.append(EggRemover(egg.name)) 80 81 @classmethod 82 def teardown_class(cls): 83 for finalizer in cls.finalizers: 84 finalizer() 85 86 def test_resource_listdir(self): 87 import mod 88 zp = pkg_resources.ZipProvider(mod) 89 90 expected_root = ['data.dat', 'mod.py', 'subdir'] 91 assert sorted(zp.resource_listdir('')) == expected_root 92 93 expected_subdir = ['data2.dat', 'mod2.py'] 94 assert sorted(zp.resource_listdir('subdir')) == expected_subdir 95 assert sorted(zp.resource_listdir('subdir/')) == expected_subdir 96 97 assert zp.resource_listdir('nonexistent') == [] 98 assert zp.resource_listdir('nonexistent/') == [] 99 100 import mod2 101 zp2 = pkg_resources.ZipProvider(mod2) 102 103 assert sorted(zp2.resource_listdir('')) == expected_subdir 104 105 assert zp2.resource_listdir('subdir') == [] 106 assert zp2.resource_listdir('subdir/') == [] 107 108 def test_resource_filename_rewrites_on_change(self): 109 """ 110 If a previous call to get_resource_filename has saved the file, but 111 the file has been subsequently mutated with different file of the 112 same size and modification time, it should not be overwritten on a 113 subsequent call to get_resource_filename. 114 """ 115 import mod 116 manager = pkg_resources.ResourceManager() 117 zp = pkg_resources.ZipProvider(mod) 118 filename = zp.get_resource_filename(manager, 'data.dat') 119 actual = datetime.datetime.fromtimestamp(os.stat(filename).st_mtime) 120 assert actual == self.ref_time 121 f = open(filename, 'w') 122 f.write('hello, world?') 123 f.close() 124 ts = timestamp(self.ref_time) 125 os.utime(filename, (ts, ts)) 126 filename = zp.get_resource_filename(manager, 'data.dat') 127 with open(filename) as f: 128 assert f.read() == 'hello, world!' 129 manager.cleanup_resources() 130 131 132class TestResourceManager: 133 def test_get_cache_path(self): 134 mgr = pkg_resources.ResourceManager() 135 path = mgr.get_cache_path('foo') 136 type_ = str(type(path)) 137 message = "Unexpected type from get_cache_path: " + type_ 138 assert isinstance(path, str), message 139 140 def test_get_cache_path_race(self, tmpdir): 141 # Patch to os.path.isdir to create a race condition 142 def patched_isdir(dirname, unpatched_isdir=pkg_resources.isdir): 143 patched_isdir.dirnames.append(dirname) 144 145 was_dir = unpatched_isdir(dirname) 146 if not was_dir: 147 os.makedirs(dirname) 148 return was_dir 149 150 patched_isdir.dirnames = [] 151 152 # Get a cache path with a "race condition" 153 mgr = pkg_resources.ResourceManager() 154 mgr.set_extraction_path(str(tmpdir)) 155 156 archive_name = os.sep.join(('foo', 'bar', 'baz')) 157 with mock.patch.object(pkg_resources, 'isdir', new=patched_isdir): 158 mgr.get_cache_path(archive_name) 159 160 # Because this test relies on the implementation details of this 161 # function, these assertions are a sentinel to ensure that the 162 # test suite will not fail silently if the implementation changes. 163 called_dirnames = patched_isdir.dirnames 164 assert len(called_dirnames) == 2 165 assert called_dirnames[0].split(os.sep)[-2:] == ['foo', 'bar'] 166 assert called_dirnames[1].split(os.sep)[-1:] == ['foo'] 167 168 """ 169 Tests to ensure that pkg_resources runs independently from setuptools. 170 """ 171 172 def test_setuptools_not_imported(self): 173 """ 174 In a separate Python environment, import pkg_resources and assert 175 that action doesn't cause setuptools to be imported. 176 """ 177 lines = ( 178 'import pkg_resources', 179 'import sys', 180 ( 181 'assert "setuptools" not in sys.modules, ' 182 '"setuptools was imported"' 183 ), 184 ) 185 cmd = [sys.executable, '-c', '; '.join(lines)] 186 subprocess.check_call(cmd) 187 188 189def make_test_distribution(metadata_path, metadata): 190 """ 191 Make a test Distribution object, and return it. 192 193 :param metadata_path: the path to the metadata file that should be 194 created. This should be inside a distribution directory that should 195 also be created. For example, an argument value might end with 196 "<project>.dist-info/METADATA". 197 :param metadata: the desired contents of the metadata file, as bytes. 198 """ 199 dist_dir = os.path.dirname(metadata_path) 200 os.mkdir(dist_dir) 201 with open(metadata_path, 'wb') as f: 202 f.write(metadata) 203 dists = list(pkg_resources.distributions_from_metadata(dist_dir)) 204 dist, = dists 205 206 return dist 207 208 209def test_get_metadata__bad_utf8(tmpdir): 210 """ 211 Test a metadata file with bytes that can't be decoded as utf-8. 212 """ 213 filename = 'METADATA' 214 # Convert the tmpdir LocalPath object to a string before joining. 215 metadata_path = os.path.join(str(tmpdir), 'foo.dist-info', filename) 216 # Encode a non-ascii string with the wrong encoding (not utf-8). 217 metadata = 'née'.encode('iso-8859-1') 218 dist = make_test_distribution(metadata_path, metadata=metadata) 219 220 with pytest.raises(UnicodeDecodeError) as excinfo: 221 dist.get_metadata(filename) 222 223 exc = excinfo.value 224 actual = str(exc) 225 expected = ( 226 # The error message starts with "'utf-8' codec ..." However, the 227 # spelling of "utf-8" can vary (e.g. "utf8") so we don't include it 228 "codec can't decode byte 0xe9 in position 1: " 229 'invalid continuation byte in METADATA file at path: ' 230 ) 231 assert expected in actual, 'actual: {}'.format(actual) 232 assert actual.endswith(metadata_path), 'actual: {}'.format(actual) 233 234 235def make_distribution_no_version(tmpdir, basename): 236 """ 237 Create a distribution directory with no file containing the version. 238 """ 239 dist_dir = tmpdir / basename 240 dist_dir.ensure_dir() 241 # Make the directory non-empty so distributions_from_metadata() 242 # will detect it and yield it. 243 dist_dir.join('temp.txt').ensure() 244 245 if sys.version_info < (3, 6): 246 dist_dir = str(dist_dir) 247 248 dists = list(pkg_resources.distributions_from_metadata(dist_dir)) 249 assert len(dists) == 1 250 dist, = dists 251 252 return dist, dist_dir 253 254 255@pytest.mark.parametrize( 256 'suffix, expected_filename, expected_dist_type', 257 [ 258 ('egg-info', 'PKG-INFO', EggInfoDistribution), 259 ('dist-info', 'METADATA', DistInfoDistribution), 260 ], 261) 262def test_distribution_version_missing( 263 tmpdir, suffix, expected_filename, expected_dist_type): 264 """ 265 Test Distribution.version when the "Version" header is missing. 266 """ 267 basename = 'foo.{}'.format(suffix) 268 dist, dist_dir = make_distribution_no_version(tmpdir, basename) 269 270 expected_text = ( 271 "Missing 'Version:' header and/or {} file at path: " 272 ).format(expected_filename) 273 metadata_path = os.path.join(dist_dir, expected_filename) 274 275 # Now check the exception raised when the "version" attribute is accessed. 276 with pytest.raises(ValueError) as excinfo: 277 dist.version 278 279 err = str(excinfo.value) 280 # Include a string expression after the assert so the full strings 281 # will be visible for inspection on failure. 282 assert expected_text in err, str((expected_text, err)) 283 284 # Also check the args passed to the ValueError. 285 msg, dist = excinfo.value.args 286 assert expected_text in msg 287 # Check that the message portion contains the path. 288 assert metadata_path in msg, str((metadata_path, msg)) 289 assert type(dist) == expected_dist_type 290 291 292def test_distribution_version_missing_undetected_path(): 293 """ 294 Test Distribution.version when the "Version" header is missing and 295 the path can't be detected. 296 """ 297 # Create a Distribution object with no metadata argument, which results 298 # in an empty metadata provider. 299 dist = Distribution('/foo') 300 with pytest.raises(ValueError) as excinfo: 301 dist.version 302 303 msg, dist = excinfo.value.args 304 expected = ( 305 "Missing 'Version:' header and/or PKG-INFO file at path: " 306 '[could not detect]' 307 ) 308 assert msg == expected 309 310 311@pytest.mark.parametrize('only', [False, True]) 312def test_dist_info_is_not_dir(tmp_path, only): 313 """Test path containing a file with dist-info extension.""" 314 dist_info = tmp_path / 'foobar.dist-info' 315 dist_info.touch() 316 assert not pkg_resources.dist_factory(str(tmp_path), str(dist_info), only) 317 318 319class TestDeepVersionLookupDistutils: 320 @pytest.fixture 321 def env(self, tmpdir): 322 """ 323 Create a package environment, similar to a virtualenv, 324 in which packages are installed. 325 """ 326 327 class Environment(str): 328 pass 329 330 env = Environment(tmpdir) 331 tmpdir.chmod(stat.S_IRWXU) 332 subs = 'home', 'lib', 'scripts', 'data', 'egg-base' 333 env.paths = dict( 334 (dirname, str(tmpdir / dirname)) 335 for dirname in subs 336 ) 337 list(map(os.mkdir, env.paths.values())) 338 return env 339 340 def create_foo_pkg(self, env, version): 341 """ 342 Create a foo package installed (distutils-style) to env.paths['lib'] 343 as version. 344 """ 345 ld = "This package has unicode metadata! ❄" 346 attrs = dict(name='foo', version=version, long_description=ld) 347 dist = distutils.dist.Distribution(attrs) 348 iei_cmd = distutils.command.install_egg_info.install_egg_info(dist) 349 iei_cmd.initialize_options() 350 iei_cmd.install_dir = env.paths['lib'] 351 iei_cmd.finalize_options() 352 iei_cmd.run() 353 354 def test_version_resolved_from_egg_info(self, env): 355 version = '1.11.0.dev0+2329eae' 356 self.create_foo_pkg(env, version) 357 358 # this requirement parsing will raise a VersionConflict unless the 359 # .egg-info file is parsed (see #419 on BitBucket) 360 req = pkg_resources.Requirement.parse('foo>=1.9') 361 dist = pkg_resources.WorkingSet([env.paths['lib']]).find(req) 362 assert dist.version == version 363 364 @pytest.mark.parametrize( 365 'unnormalized, normalized', 366 [ 367 ('foo', 'foo'), 368 ('foo/', 'foo'), 369 ('foo/bar', 'foo/bar'), 370 ('foo/bar/', 'foo/bar'), 371 ], 372 ) 373 def test_normalize_path_trailing_sep(self, unnormalized, normalized): 374 """Ensure the trailing slash is cleaned for path comparison. 375 376 See pypa/setuptools#1519. 377 """ 378 result_from_unnormalized = pkg_resources.normalize_path(unnormalized) 379 result_from_normalized = pkg_resources.normalize_path(normalized) 380 assert result_from_unnormalized == result_from_normalized 381 382 @pytest.mark.skipif( 383 os.path.normcase('A') != os.path.normcase('a'), 384 reason='Testing case-insensitive filesystems.', 385 ) 386 @pytest.mark.parametrize( 387 'unnormalized, normalized', 388 [ 389 ('MiXeD/CasE', 'mixed/case'), 390 ], 391 ) 392 def test_normalize_path_normcase(self, unnormalized, normalized): 393 """Ensure mixed case is normalized on case-insensitive filesystems. 394 """ 395 result_from_unnormalized = pkg_resources.normalize_path(unnormalized) 396 result_from_normalized = pkg_resources.normalize_path(normalized) 397 assert result_from_unnormalized == result_from_normalized 398 399 @pytest.mark.skipif( 400 os.path.sep != '\\', 401 reason='Testing systems using backslashes as path separators.', 402 ) 403 @pytest.mark.parametrize( 404 'unnormalized, expected', 405 [ 406 ('forward/slash', 'forward\\slash'), 407 ('forward/slash/', 'forward\\slash'), 408 ('backward\\slash\\', 'backward\\slash'), 409 ], 410 ) 411 def test_normalize_path_backslash_sep(self, unnormalized, expected): 412 """Ensure path seps are cleaned on backslash path sep systems. 413 """ 414 result = pkg_resources.normalize_path(unnormalized) 415 assert result.endswith(expected) 416