1import sys
2import tempfile
3import os
4import zipfile
5import datetime
6import time
7import subprocess
8import stat
9import distutils.dist
10import distutils.command.install_egg_info
11
12try:
13    from unittest import mock
14except ImportError:
15    import mock
16
17from pkg_resources import (
18    DistInfoDistribution, Distribution, EggInfoDistribution,
19)
20
21import pytest
22
23import pkg_resources
24
25
def timestamp(dt):
    """
    Convert a local, naive datetime instance into a timestamp.

    :param dt: the datetime instance to convert.
    :return: the result of ``dt.timestamp()`` when that method exists,
        otherwise ``time.mktime`` applied to ``dt.timetuple()``.
    """
    try:
        seconds = dt.timestamp()
    except AttributeError:
        # Python 3.2 and earlier have no datetime.timestamp(); derive the
        # value from the time tuple instead.
        seconds = time.mktime(dt.timetuple())
    return seconds
35
36
class EggRemover(str):
    """
    A path string that doubles as its own cleanup callback.

    Calling the instance drops the path from ``sys.path`` when it is listed
    there and deletes the file at that path when it exists.
    """

    def __call__(self):
        try:
            sys.path.remove(self)
        except ValueError:
            # The path was not on sys.path, so there is nothing to undo.
            pass
        if os.path.exists(self):
            os.remove(self)
43
44
class TestZipProvider:
    """
    Exercise ``pkg_resources.ZipProvider`` against a zip egg built on the fly.

    ``setup_class`` writes a small egg to a temporary file and appends both
    the egg and its ``subdir`` entry to ``sys.path``; ``teardown_class`` runs
    the registered finalizers to undo that.
    """

    # Cleanup callables registered by setup_class and run by teardown_class.
    finalizers = []

    ref_time = datetime.datetime(2013, 5, 12, 13, 25, 0)
    "A reference time for a file modification"

    @classmethod
    def setup_class(cls):
        "create a zip egg and add it to sys.path"
        # (archive name, contents) for every member of the test egg.
        members = (
            ('mod.py', 'x = 3\n'),
            ('data.dat', 'hello, world!'),
            ('subdir/mod2.py', 'x = 6\n'),
            ('subdir/data2.dat', 'goodbye, world!'),
        )
        # ZipInfo documents date_time as a 6-tuple, so trim the 9-field
        # struct_time down to (year, month, day, hour, minute, second).
        date_time = cls.ref_time.timetuple()[:6]

        # delete=False keeps the egg on disk once the handle is closed; the
        # EggRemover finalizer registered below deletes it at teardown.
        with tempfile.NamedTemporaryFile(suffix='.egg', delete=False) as egg:
            with zipfile.ZipFile(egg, 'w') as zip_egg:
                for name, content in members:
                    zip_info = zipfile.ZipInfo(name, date_time)
                    zip_egg.writestr(zip_info, content)

        sys.path.append(egg.name)
        subdir = os.path.join(egg.name, 'subdir')
        sys.path.append(subdir)
        cls.finalizers.append(EggRemover(subdir))
        cls.finalizers.append(EggRemover(egg.name))

    @classmethod
    def teardown_class(cls):
        "run every finalizer registered by setup_class"
        for finalizer in cls.finalizers:
            finalizer()

    def test_resource_listdir(self):
        """
        resource_listdir should list the archive members relative to the
        provider's module, tolerate a trailing slash, and return an empty
        list for names that do not exist.
        """
        import mod
        zp = pkg_resources.ZipProvider(mod)

        expected_root = ['data.dat', 'mod.py', 'subdir']
        assert sorted(zp.resource_listdir('')) == expected_root

        expected_subdir = ['data2.dat', 'mod2.py']
        assert sorted(zp.resource_listdir('subdir')) == expected_subdir
        assert sorted(zp.resource_listdir('subdir/')) == expected_subdir

        assert zp.resource_listdir('nonexistent') == []
        assert zp.resource_listdir('nonexistent/') == []

        # mod2 is imported through the 'subdir' sys.path entry, so its
        # provider is rooted at that subdirectory of the egg.
        import mod2
        zp2 = pkg_resources.ZipProvider(mod2)

        assert sorted(zp2.resource_listdir('')) == expected_subdir

        assert zp2.resource_listdir('subdir') == []
        assert zp2.resource_listdir('subdir/') == []

    def test_resource_filename_rewrites_on_change(self):
        """
        If a previous call to get_resource_filename has saved the file, but
        the file has been subsequently mutated with a different file of the
        same size and modification time, it should be overwritten with the
        archived contents on a subsequent call to get_resource_filename.
        """
        import mod
        manager = pkg_resources.ResourceManager()
        zp = pkg_resources.ZipProvider(mod)
        filename = zp.get_resource_filename(manager, 'data.dat')
        actual = datetime.datetime.fromtimestamp(os.stat(filename).st_mtime)
        assert actual == self.ref_time
        # Replace the extracted file with same-length content, then restore
        # the reference mtime so only the contents differ from the archive.
        with open(filename, 'w') as f:
            f.write('hello, world?')
        ts = timestamp(self.ref_time)
        os.utime(filename, (ts, ts))
        filename = zp.get_resource_filename(manager, 'data.dat')
        with open(filename) as f:
            assert f.read() == 'hello, world!'
        manager.cleanup_resources()
130
131
class TestResourceManager:
    """
    Checks for ``pkg_resources.ResourceManager.get_cache_path`` plus a check
    that importing pkg_resources does not import setuptools.
    """

    def test_get_cache_path(self):
        """get_cache_path should return a ``str`` instance."""
        manager = pkg_resources.ResourceManager()
        cache_path = manager.get_cache_path('foo')
        assert isinstance(cache_path, str), (
            "Unexpected type from get_cache_path: " + str(type(cache_path))
        )

    def test_get_cache_path_race(self, tmpdir):
        """
        get_cache_path should cope with directories that appear between
        its isdir check and whatever it does next.
        """
        real_isdir = pkg_resources.isdir
        probed = []

        def racing_isdir(dirname):
            # Stand-in for pkg_resources.isdir that creates a race
            # condition: it reports the true answer, but creates the
            # directory whenever that answer was "missing".
            probed.append(dirname)
            existed = real_isdir(dirname)
            if not existed:
                os.makedirs(dirname)
            return existed

        # Get a cache path with a "race condition"
        manager = pkg_resources.ResourceManager()
        manager.set_extraction_path(str(tmpdir))

        archive_name = os.sep.join(('foo', 'bar', 'baz'))
        with mock.patch.object(pkg_resources, 'isdir', new=racing_isdir):
            manager.get_cache_path(archive_name)

        # This test leans on implementation details of get_cache_path, so
        # these assertions act as a sentinel: if the implementation changes,
        # the suite fails loudly rather than passing silently.
        assert len(probed) == 2
        first, second = probed
        assert first.split(os.sep)[-2:] == ['foo', 'bar']
        assert second.split(os.sep)[-1:] == ['foo']

    # Tests to ensure that pkg_resources runs independently from setuptools.

    def test_setuptools_not_imported(self):
        """
        Import pkg_resources in a fresh interpreter process and assert that
        doing so does not cause setuptools to be imported.
        """
        statements = [
            'import pkg_resources',
            'import sys',
            'assert "setuptools" not in sys.modules, '
            '"setuptools was imported"',
        ]
        script = '; '.join(statements)
        subprocess.check_call([sys.executable, '-c', script])
187
188
def make_test_distribution(metadata_path, metadata):
    """
    Build a distribution directory on disk and return its Distribution.

    :param metadata_path: where the metadata file should be written. Its
        parent directory is the distribution directory and is created here,
        so a typical value ends with "<project>.dist-info/METADATA".
    :param metadata: the bytes to write into the metadata file.
    :return: the single distribution that
        ``pkg_resources.distributions_from_metadata`` yields for the
        distribution directory.
    """
    dist_dir, _ = os.path.split(metadata_path)
    os.mkdir(dist_dir)
    with open(metadata_path, 'wb') as metadata_file:
        metadata_file.write(metadata)

    found = list(pkg_resources.distributions_from_metadata(dist_dir))
    # Unpacking fails unless exactly one distribution was found.
    [dist] = found
    return dist
207
208
def test_get_metadata__bad_utf8(tmpdir):
    """
    get_metadata should raise a UnicodeDecodeError naming the metadata file
    when that file holds bytes that are not valid utf-8.
    """
    filename = 'METADATA'
    # tmpdir is a LocalPath object; os.path.join needs a plain string.
    metadata_path = os.path.join(str(tmpdir), 'foo.dist-info', filename)
    dist = make_test_distribution(
        metadata_path,
        # A non-ascii string deliberately encoded as something other
        # than utf-8.
        metadata='née'.encode('iso-8859-1'),
    )

    with pytest.raises(UnicodeDecodeError) as excinfo:
        dist.get_metadata(filename)

    actual = str(excinfo.value)
    failure_note = 'actual: {}'.format(actual)
    # The message begins with "'utf-8' codec ...", but the codec may be
    # spelled differently (e.g. "utf8"), so match from "codec" onwards.
    expected = (
        "codec can't decode byte 0xe9 in position 1: "
        'invalid continuation byte in METADATA file at path: '
    )
    assert expected in actual, failure_note
    assert actual.endswith(metadata_path), failure_note
233
234
def make_distribution_no_version(tmpdir, basename):
    """
    Create a distribution directory that has no file containing a version.

    :param tmpdir: the directory in which to create the distribution
        directory.
    :param basename: the name of the distribution directory.
    :return: a ``(dist, dist_dir)`` pair, where ``dist_dir`` is the path
        handed to ``pkg_resources.distributions_from_metadata``.
    """
    dist_dir = tmpdir / basename
    dist_dir.ensure_dir()
    # distributions_from_metadata() only detects and yields the directory
    # if it is non-empty, so drop a placeholder file into it.
    placeholder = dist_dir.join('temp.txt')
    placeholder.ensure()

    # Before Python 3.6 the path is handed over as a plain string.
    search_path = str(dist_dir) if sys.version_info < (3, 6) else dist_dir

    found = list(pkg_resources.distributions_from_metadata(search_path))
    assert len(found) == 1
    return found[0], search_path
253
254
@pytest.mark.parametrize(
    'suffix, expected_filename, expected_dist_type',
    [
        ('egg-info', 'PKG-INFO', EggInfoDistribution),
        ('dist-info', 'METADATA', DistInfoDistribution),
    ],
)
def test_distribution_version_missing(
        tmpdir, suffix, expected_filename, expected_dist_type):
    """
    Accessing Distribution.version without a "Version" header should raise
    a ValueError that names the expected metadata file and its path.
    """
    dist, dist_dir = make_distribution_no_version(
        tmpdir, 'foo.{}'.format(suffix))

    metadata_path = os.path.join(dist_dir, expected_filename)
    template = "Missing 'Version:' header and/or {} file at path: "
    expected_text = template.format(expected_filename)

    # Reading the "version" attribute is what triggers the exception.
    with pytest.raises(ValueError) as excinfo:
        dist.version

    error = excinfo.value
    err = str(error)
    # The second assert operand puts both full strings into the failure
    # report for inspection.
    assert expected_text in err, str((expected_text, err))

    # The ValueError carries the message and the distribution as its args.
    msg, raised_for = error.args
    assert expected_text in msg
    # The message portion must contain the metadata path.
    assert metadata_path in msg, str((metadata_path, msg))
    assert type(raised_for) is expected_dist_type
290
291
def test_distribution_version_missing_undetected_path():
    """
    Accessing Distribution.version without a "Version" header should still
    raise a ValueError with a placeholder when the path can't be detected.
    """
    # Without a metadata argument the Distribution ends up with an empty
    # metadata provider.
    dist = Distribution('/foo')
    with pytest.raises(ValueError) as excinfo:
        dist.version

    msg, _raised_for = excinfo.value.args
    assert msg == (
        "Missing 'Version:' header and/or PKG-INFO file at path: "
        '[could not detect]'
    )
309
310
@pytest.mark.parametrize('only', [False, True])
def test_dist_info_is_not_dir(tmp_path, only):
    """dist_factory should yield nothing truthy for a dist-info *file*."""
    bogus_dist_info = tmp_path / 'foobar.dist-info'
    # Create a regular file, not a directory, with the dist-info extension.
    bogus_dist_info.touch()
    factory = pkg_resources.dist_factory(
        str(tmp_path), str(bogus_dist_info), only)
    assert not factory
317
318
class TestDeepVersionLookupDistutils:
    """
    Tests that resolve a version from a distutils-installed egg-info file,
    plus checks of ``pkg_resources.normalize_path``.
    """

    @pytest.fixture
    def env(self, tmpdir):
        """
        Build a package environment, similar to a virtualenv, into which
        packages can be installed.

        The returned object is a ``str`` of ``tmpdir`` carrying a ``paths``
        dict that maps each subdirectory name to its created path.
        """

        class Environment(str):
            pass

        tmpdir.chmod(stat.S_IRWXU)
        env = Environment(tmpdir)
        env.paths = {
            name: str(tmpdir / name)
            for name in ('home', 'lib', 'scripts', 'data', 'egg-base')
        }
        for path in env.paths.values():
            os.mkdir(path)
        return env

    def create_foo_pkg(self, env, version):
        """
        Install a ``foo`` package of the given version, distutils-style,
        into env.paths['lib'].
        """
        metadata = {
            'name': 'foo',
            'version': version,
            'long_description': "This package has unicode metadata! ❄",
        }
        command = distutils.command.install_egg_info.install_egg_info(
            distutils.dist.Distribution(metadata))
        command.initialize_options()
        command.install_dir = env.paths['lib']
        command.finalize_options()
        command.run()

    def test_version_resolved_from_egg_info(self, env):
        """The version should be read from the installed .egg-info file."""
        version = '1.11.0.dev0+2329eae'
        self.create_foo_pkg(env, version)

        working_set = pkg_resources.WorkingSet([env.paths['lib']])
        # Finding this requirement raises a VersionConflict unless the
        # .egg-info file is parsed (see #419 on BitBucket).
        requirement = pkg_resources.Requirement.parse('foo>=1.9')
        found = working_set.find(requirement)
        assert found.version == version

    @pytest.mark.parametrize(
        'unnormalized, normalized',
        [
            ('foo', 'foo'),
            ('foo/', 'foo'),
            ('foo/bar', 'foo/bar'),
            ('foo/bar/', 'foo/bar'),
        ],
    )
    def test_normalize_path_trailing_sep(self, unnormalized, normalized):
        """Ensure the trailing slash is cleaned for path comparison.

        See pypa/setuptools#1519.
        """
        normalize = pkg_resources.normalize_path
        assert normalize(unnormalized) == normalize(normalized)

    @pytest.mark.skipif(
        os.path.normcase('A') != os.path.normcase('a'),
        reason='Testing case-insensitive filesystems.',
    )
    @pytest.mark.parametrize(
        'unnormalized, normalized',
        [
            ('MiXeD/CasE', 'mixed/case'),
        ],
    )
    def test_normalize_path_normcase(self, unnormalized, normalized):
        """Ensure mixed case is normalized on case-insensitive filesystems.
        """
        normalize = pkg_resources.normalize_path
        assert normalize(unnormalized) == normalize(normalized)

    @pytest.mark.skipif(
        os.path.sep != '\\',
        reason='Testing systems using backslashes as path separators.',
    )
    @pytest.mark.parametrize(
        'unnormalized, expected',
        [
            ('forward/slash', 'forward\\slash'),
            ('forward/slash/', 'forward\\slash'),
            ('backward\\slash\\', 'backward\\slash'),
        ],
    )
    def test_normalize_path_backslash_sep(self, unnormalized, expected):
        """Ensure path seps are cleaned on backslash path sep systems.
        """
        cleaned = pkg_resources.normalize_path(unnormalized)
        assert cleaned.endswith(expected)
416