1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2012-2013 Vinay Sajip.
4# Licensed to the Python Software Foundation under a contributor agreement.
5# See LICENSE.txt and CONTRIBUTORS.txt.
6#
7from io import BytesIO
8from itertools import islice
9import os
10import re
11import shutil
12try:
13    import ssl
14except ImportError:
15    ssl = None
16import sys
17import tempfile
18import textwrap
19import time
20
21from compat import unittest
22from support import TempdirManager, DistlibTestCase, in_github_workflow
23
24from distlib import DistlibException
25from distlib.compat import cache_from_source
26from distlib.util import (get_export_entry, ExportEntry, resolve,
27                          get_cache_base, path_to_cache_dir, zip_dir,
28                          parse_credentials, ensure_slash, split_filename,
29                          EventMixin, Sequencer, unarchive, Progress,
30                          iglob, RICH_GLOB, parse_requirement, get_extras,
31                          Configurator, read_exports, write_exports,
32                          FileOperator, is_string_sequence, get_package_data,
33                          convert_path)
34
35
# Absolute path of the directory holding this test module; the archive and
# JSON fixtures used by the tests below are located relative to it.
HERE = os.path.split(os.path.abspath(__file__))[0]
# Evaluated once at import time and used in skip conditions further down.
IN_GITHUB_WORKFLOW = in_github_workflow()
38
class TestContainer(object):
    """Record the arguments passed to the constructor.

    Positional arguments are stored as the tuple ``args`` and keyword
    arguments as the dict ``kwargs``.
    """

    def __init__(self, *args, **kwargs):
        self.args, self.kwargs = args, kwargs
43
44
45class UtilTestCase(DistlibTestCase):
46    def check_entry(self, entry, name, prefix, suffix, flags):
47        self.assertEqual(entry.name, name)
48        self.assertEqual(entry.prefix, prefix)
49        self.assertEqual(entry.suffix, suffix)
50        self.assertEqual(entry.flags, flags)
51
52    def test_export_entry(self):
53        self.assertIsNone(get_export_entry('foo.py'))
54        self.assertIsNone(get_export_entry('foo.py='))
55        for spec in ('foo=foo:main', 'foo =foo:main', 'foo= foo:main',
56                     'foo = foo:main'):
57            self.check_entry(get_export_entry(spec),
58                             'foo', 'foo', 'main', [])
59        self.check_entry(get_export_entry('foo=foo.bar:main'),
60                         'foo', 'foo.bar', 'main', [])
61        self.check_entry(get_export_entry('foo=foo.bar:main [a]'),
62                         'foo', 'foo.bar', 'main', ['a'])
63        # See issue #127 - allow hyphens
64        self.check_entry(get_export_entry('foo=foo.bar:main [with-foo]'),
65                         'foo', 'foo.bar', 'main', ['with-foo'])
66        self.check_entry(get_export_entry('foo=foo.bar:main [ a ]'),
67                         'foo', 'foo.bar', 'main', ['a'])
68        self.check_entry(get_export_entry('foo=foo.bar:main [a=b, c=d,e, f=g]'),
69                         'foo', 'foo.bar', 'main', ['a=b', 'c=d', 'e', 'f=g'])
70        self.check_entry(get_export_entry('foo=foo.bar:main [a=9, 9=8,e, f9=g8]'),
71                         'foo', 'foo.bar', 'main', ['a=9', '9=8', 'e', 'f9=g8'])
72        self.check_entry(get_export_entry('foo=foo.bar:main[x]'),
73                         'foo', 'foo.bar', 'main', ['x'])
74        self.check_entry(get_export_entry('foo=abc'), 'foo', 'abc', None, [])
75        self.check_entry(get_export_entry('smc++ = smcpp.frontend:console'), 'smc++',
76                                          'smcpp.frontend', 'console', [])
77        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x:y')
78        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [')
79        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x ]')
80        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x []')
81        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [\\]')
82        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [a=]')
83        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [a,]')
84        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [a,,b]')
85        self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [a b]')
86
87    def test_resolve(self):
88        import logging
89        import logging.handlers
90        self.assertIs(resolve('logging', None), logging)
91        self.assertIs(resolve('logging.handlers', None), logging.handlers)
92        self.assertIs(resolve('logging', 'root'), logging.root)
93        self.assertEqual(resolve('logging', 'root.debug'), logging.root.debug)
94
95    def test_cache_base(self):
96        actual = get_cache_base()
97        if os.name == 'nt' and 'LOCALAPPDATA' in os.environ:
98            expected = os.path.expandvars('$localappdata')
99        else:
100            expected = os.path.expanduser('~')
101        expected = os.path.join(expected, '.distlib')
102        self.assertEqual(expected, actual)
103        self.assertTrue(os.path.isdir(expected))
104
105    @unittest.skipIf(os.name != 'posix', 'Test is only valid for POSIX')
106    def test_path_to_cache_dir_posix(self):
107        self.assertEqual(path_to_cache_dir('/home/user/some-file.zip'),
108                        '--home--user--some-file.zip.cache')
109
110    @unittest.skipIf(os.name != 'nt', 'Test is only valid for Windows')
111    def test_path_to_cache_dir_nt(self):
112        self.assertEqual(path_to_cache_dir(r'c:\Users\User\Some-File.zip'),
113                        'c-----Users--User--Some-File.zip.cache')
114
115    def test_parse_credentials(self):
116        cases = (
117            ('example.com', (None, None, 'example.com')),
118            ('user@example.com',  ('user', None, 'example.com')),
119            ('user:pwd@example.com', ('user', 'pwd', 'example.com')),
120            ('user:@example.com', ('user', '', 'example.com')),
121            ('user:pass@word@example.com', ('user', 'pass@word', 'example.com')),
122            ('user:pass:word@example.com', ('user', 'pass:word', 'example.com')),
123            ('user%3Aname:%23%5E%40@example.com', ('user:name', '#^@', 'example.com')),
124        )
125
126        for s, expected in cases:
127            self.assertEqual(parse_credentials(s), expected)
128
129    def test_ensure_slash(self):
130        self.assertEqual(ensure_slash(''), '/')
131        self.assertEqual(ensure_slash('/'), '/')
132        self.assertEqual(ensure_slash('abc'), 'abc/')
133        self.assertEqual(ensure_slash('def/'), 'def/')
134
135    def test_split_filename(self):
136        self.assertIsNone(split_filename('abl.jquery'))
137        self.assertEqual(split_filename('abl.jquery-1.4.2-2'),
138                         ('abl.jquery', '1.4.2-2', None))
139        self.assertEqual(split_filename('python-gnupg-0.1'),
140                         ('python-gnupg', '0.1', None))
141        self.assertEqual(split_filename('baklabel-1.0.3-2729-py3.2'),
142                         ('baklabel', '1.0.3-2729', '3.2'))
143        self.assertEqual(split_filename('baklabel-1.0.3-2729-py27'),
144                         ('baklabel', '1.0.3-2729', '27'))
145        self.assertEqual(split_filename('advpy-0.99b'),
146                         ('advpy', '0.99b', None))
147        self.assertEqual(split_filename('asv_files-dev-20120501-01', 'asv_files'),
148                         ('asv_files', 'dev-20120501-01', None))
149        self.assertEqual(split_filename('greenlet-0.4.0-py27-win32'),
150                         ('greenlet', '0.4.0', '27'))
151        self.assertEqual(split_filename('greenlet-0.4.0-py27-linux_x86_64'),
152                         ('greenlet', '0.4.0', '27'))
153        self.assertEqual(split_filename('django-altuser-v0.6.8'),
154                         ('django-altuser', 'v0.6.8', None))
155        self.assertEqual(split_filename('youtube_dl_server-alpha.1'),
156                         ('youtube_dl_server', 'alpha.1', None))
157        self.assertEqual(split_filename('pytest-xdist-dev'),
158                         ('pytest-xdist', 'dev', None))
159        self.assertEqual(split_filename('pytest_xdist-0.1_myfork', None),
160                         ('pytest_xdist', '0.1_myfork', None))
161        self.assertEqual(split_filename('pytest_xdist-0.1_myfork',
162                                        'pytest-xdist'),
163                         ('pytest_xdist', '0.1_myfork', None))
164        self.assertEqual(split_filename('pytest_xdist-0.1_myfork',
165                                        'pytest_dist'),
166                         ('pytest_xdist', '0.1_myfork', None))
167
168    def test_convert_path(self):
169        CP = convert_path
170        if os.sep == '/':
171            d = os.path.dirname(__file__)
172            self.assertEqual(CP(d), d)
173        else:
174            self.assertEqual(CP(''), '')
175            self.assertRaises(ValueError, CP, '/foo')
176            self.assertRaises(ValueError, CP, 'foo/')
177
178    def test_events(self):
179        collected = []
180
181        def handler1(e, *args, **kwargs):
182            collected.append((1, e, args, kwargs))
183
184        def handler2(e, *args, **kwargs):
185            collected.append((2, e, args, kwargs))
186
187        def handler3(e, *args, **kwargs):
188            if not args:
189                raise NotImplementedError('surprise!')
190            collected.append((3, e, args, kwargs))
191            return (args, kwargs)
192
193        e = EventMixin()
194        e.add('A', handler1)
195        self.assertRaises(ValueError, e.remove, 'B', handler1)
196
197        cases = (
198            ((1, 2), {'buckle': 'my shoe'}),
199            ((3, 4), {'shut': 'the door'}),
200        )
201
202        for case in cases:
203            e.publish('A', *case[0], **case[1])
204            e.publish('B', *case[0], **case[1])
205
206        for actual, source in zip(collected, cases):
207            self.assertEqual(actual, (1, 'A') + source[:1] + source[1:])
208
209        collected = []
210        e.add('B', handler2)
211
212        self.assertEqual(tuple(e.get_subscribers('A')), (handler1,))
213        self.assertEqual(tuple(e.get_subscribers('B')), (handler2,))
214        self.assertEqual(tuple(e.get_subscribers('C')), ())
215
216        for case in cases:
217            e.publish('A', *case[0], **case[1])
218            e.publish('B', *case[0], **case[1])
219
220        actuals = islice(collected, 0, None, 2)
221        for actual, source in zip(actuals, cases):
222            self.assertEqual(actual, (1, 'A') + source[:1] + source[1:])
223
224        actuals = islice(collected, 1, None, 2)
225        for actual, source in zip(actuals, cases):
226            self.assertEqual(actual, (2, 'B') + source[:1] + source[1:])
227
228        e.remove('B', handler2)
229
230        collected = []
231
232        for case in cases:
233            e.publish('A', *case[0], **case[1])
234            e.publish('B', *case[0], **case[1])
235
236        for actual, source in zip(collected, cases):
237            self.assertEqual(actual, (1, 'A') + source[:1] + source[1:])
238
239        e.add('C', handler3)
240
241        collected = []
242        returned = []
243
244        for case in cases:
245            returned.extend(e.publish('C', *case[0], **case[1]))
246            returned.extend(e.publish('C'))
247
248        for actual, source in zip(collected, cases):
249            self.assertEqual(actual, (3, 'C') + source[:1] + source[1:])
250
251        self.assertEqual(tuple(islice(returned, 1, None, 2)), (None, None))
252        actuals = islice(returned, 0, None, 2)
253        for actual, expected in zip(actuals, cases):
254            self.assertEqual(actual, expected)
255
256    def test_sequencer_basic(self):
257        seq = Sequencer()
258
259        steps = (
260            ('check', 'sdist'),
261            ('check', 'register'),
262            ('check', 'sdist'),
263            ('check', 'register'),
264            ('register', 'upload_sdist'),
265            ('sdist', 'upload_sdist'),
266            ('check', 'build_clibs'),
267            ('build_clibs', 'build_ext'),
268            ('build_ext', 'build_py'),
269            ('build_py', 'build_scripts'),
270            ('build_scripts', 'build'),
271            ('build', 'test'),
272            ('register', 'upload_bdist'),
273            ('build', 'upload_bdist'),
274            ('build', 'install_headers'),
275            ('install_headers', 'install_lib'),
276            ('install_lib', 'install_scripts'),
277            ('install_scripts', 'install_data'),
278            ('install_data', 'install_distinfo'),
279            ('install_distinfo', 'install')
280        )
281
282        for pred, succ in steps:
283            seq.add(pred, succ)
284
285        # Note: these tests are sensitive to dictionary ordering
286        # but work under Python 2.6, 2.7, 3.2, 3.3, 3.4 and PyPy 2.5
287        cases = (
288            ('check', ['check']),
289            ('register', ['check', 'register']),
290            ('sdist', ['check', 'sdist']),
291            ('build_clibs', ['check', 'build_clibs']),
292            ('build_ext', ['check', 'build_clibs', 'build_ext']),
293            ('build_py', ['check', 'build_clibs', 'build_ext', 'build_py']),
294            ('build_scripts', ['check', 'build_clibs', 'build_ext', 'build_py',
295                               'build_scripts']),
296            ('build', ['check', 'build_clibs', 'build_ext', 'build_py',
297                       'build_scripts', 'build']),
298            ('test', ['check', 'build_clibs', 'build_ext', 'build_py',
299                      'build_scripts', 'build', 'test']),
300            ('install_headers', ['check', 'build_clibs', 'build_ext',
301                                 'build_py', 'build_scripts', 'build',
302                                 'install_headers']),
303            ('install_lib', ['check', 'build_clibs', 'build_ext', 'build_py',
304                             'build_scripts', 'build', 'install_headers',
305                             'install_lib']),
306            ('install_scripts', ['check', 'build_clibs', 'build_ext',
307                                 'build_py', 'build_scripts', 'build',
308                                 'install_headers', 'install_lib',
309                                 'install_scripts']),
310            ('install_data', ['check', 'build_clibs', 'build_ext', 'build_py',
311                              'build_scripts', 'build', 'install_headers',
312                              'install_lib', 'install_scripts',
313                              'install_data']),
314            ('install_distinfo', ['check', 'build_clibs', 'build_ext',
315                                  'build_py', 'build_scripts', 'build',
316                                  'install_headers', 'install_lib',
317                                  'install_scripts', 'install_data',
318                                  'install_distinfo']),
319            ('install', ['check', 'build_clibs', 'build_ext', 'build_py',
320                         'build_scripts', 'build', 'install_headers',
321                         'install_lib', 'install_scripts', 'install_data',
322                         'install_distinfo', 'install']),
323            ('upload_sdist', (['check', 'register', 'sdist', 'upload_sdist'],
324                              ['check', 'sdist', 'register', 'upload_sdist'])),
325            ('upload_bdist', (['check', 'build_clibs', 'build_ext', 'build_py',
326                               'build_scripts', 'build', 'register',
327                               'upload_bdist'],
328                              ['check', 'build_clibs', 'build_ext', 'build_py',
329                               'build_scripts', 'register', 'build',
330                               'upload_bdist'])),
331        )
332
333        for final, expected in cases:
334            actual = list(seq.get_steps(final))
335            if isinstance(expected, tuple):
336                self.assertIn(actual, expected)
337            else:
338                self.assertEqual(actual, expected)
339
340        dot = seq.dot
341        expected = '''
342        digraph G {
343          check -> build_clibs;
344          install_lib -> install_scripts;
345          register -> upload_bdist;
346          build -> upload_bdist;
347          build_ext -> build_py;
348          install_scripts -> install_data;
349          check -> sdist;
350          check -> register;
351          build -> install_headers;
352          install_data -> install_distinfo;
353          sdist -> upload_sdist;
354          register -> upload_sdist;
355          install_distinfo -> install;
356          build -> test;
357          install_headers -> install_lib;
358          build_py -> build_scripts;
359          build_clibs -> build_ext;
360          build_scripts -> build;
361        }
362        '''
363        expected = textwrap.dedent(expected).strip().splitlines()
364        actual = dot.splitlines()
365        self.assertEqual(expected[0], actual[0])
366        self.assertEqual(expected[-1], actual[-1])
367        self.assertEqual(set(expected[1:-1]), set(actual[1:-1]))
368        actual = seq.strong_connections
369        expected = (
370            [
371                ('test',), ('upload_bdist',), ('install',),
372                ('install_distinfo',), ('install_data',), ('install_scripts',),
373                ('install_lib',), ('install_headers',), ('build',),
374                ('build_scripts',), ('build_py',), ('build_ext',),
375                ('build_clibs',), ('upload_sdist',), ('sdist',), ('register',),
376                ('check',)
377            ],
378            [
379                ('install',), ('install_distinfo',), ('install_data',),
380                ('install_scripts',), ('install_lib',), ('install_headers',),
381                ('test',), ('upload_bdist',), ('build',), ('build_scripts',),
382                ('build_py',), ('build_ext',), ('build_clibs',),
383                ('upload_sdist',), ('sdist',), ('register',), ('check',)
384            ],
385            [
386                ('upload_sdist',), ('sdist',), ('install',),
387                ('install_distinfo',), ('install_data',), ('upload_bdist',),
388                ('register',), ('install_scripts',), ('install_lib',),
389                ('install_headers',), ('test',), ('build',),
390                ('build_scripts',), ('build_py',), ('build_ext',),
391                ('build_clibs',), ('check',)
392            ],
393            # Next case added for PyPy
394            [
395                ('upload_sdist',), ('sdist',), ('upload_bdist',), ('register',),
396                ('test',), ('install',), ('install_distinfo',),
397                ('install_data',), ('install_scripts',), ('install_lib',),
398                ('install_headers',), ('build',), ('build_scripts',),
399                ('build_py',), ('build_ext',), ('build_clibs',), ('check',)
400            ],
401            # Next case added for Python 3.6
402            [
403                ('upload_sdist',), ('sdist',), ('upload_bdist',), ('register',),
404                ('install',), ('install_distinfo',), ('install_data',),
405                ('install_scripts',), ('install_lib',), ('install_headers',),
406                ('test',), ('build',), ('build_scripts',), ('build_py',),
407                ('build_ext',), ('build_clibs',), ('check',)
408            ]
409        )
410        self.assertIn(actual, expected)
411
412    def test_sequencer_cycle(self):
413        seq = Sequencer()
414        seq.add('A', 'B')
415        seq.add('B', 'C')
416        seq.add('C', 'D')
417        self.assertEqual(list(seq.get_steps('D')), ['A', 'B', 'C', 'D'])
418        seq.add('C', 'A')
419        self.assertEqual(list(seq.get_steps('D')), ['C', 'A', 'B', 'D'])
420        self.assertFalse(seq.is_step('E'))
421        self.assertRaises(ValueError, seq.get_steps, 'E')
422        seq.add_node('E')
423        self.assertTrue(seq.is_step('E'))
424        self.assertEqual(list(seq.get_steps('E')), ['E'])
425        seq.remove_node('E')
426        self.assertFalse(seq.is_step('E'))
427        self.assertRaises(ValueError, seq.get_steps, 'E')
428        seq.remove('C', 'A')
429        self.assertEqual(list(seq.get_steps('D')), ['A', 'B', 'C', 'D'])
430
431    def test_sequencer_removal(self):
432        seq = Sequencer()
433        seq.add('A', 'B')
434        seq.add('B', 'C')
435        seq.add('C', 'D')
436        preds = {
437            'B': set(['A']),
438            'C': set(['B']),
439            'D': set(['C'])
440        }
441        succs =  {
442            'A': set(['B']),
443            'B': set(['C']),
444            'C': set(['D'])
445        }
446        self.assertEqual(seq._preds, preds)
447        self.assertEqual(seq._succs, succs)
448        seq.remove_node('C')
449        self.assertEqual(seq._preds, preds)
450        self.assertEqual(seq._succs, succs)
451        seq.remove_node('C', True)
452        self.assertEqual(seq._preds, {'B': set(['A'])})
453        self.assertEqual(seq._succs, {'A': set(['B'])})
454
455    def test_unarchive(self):
456        import zipfile, tarfile
457
458        good_archives = (
459            ('good.zip', zipfile.ZipFile, 'r', 'namelist'),
460            ('good.tar', tarfile.open, 'r', 'getnames'),
461            ('good.tar.gz', tarfile.open, 'r:gz', 'getnames'),
462            ('good.tar.bz2', tarfile.open, 'r:bz2', 'getnames'),
463        )
464        bad_archives = ('bad.zip', 'bad.tar', 'bad.tar.gz', 'bad.tar.bz2')
465
466        for name, cls, mode, lister in good_archives:
467            td = tempfile.mkdtemp()
468            archive = None
469            try:
470                name = os.path.join(HERE, name)
471                unarchive(name, td)
472                archive = cls(name, mode)
473                names = getattr(archive, lister)()
474                for name in names:
475                    p = os.path.join(td, name)
476                    self.assertTrue(os.path.exists(p))
477            finally:
478                shutil.rmtree(td)
479                if archive:
480                    archive.close()
481
482        for name in bad_archives:
483            name = os.path.join(HERE, name)
484            td = tempfile.mkdtemp()
485            try:
486                self.assertRaises(ValueError, unarchive, name, td)
487            finally:
488                shutil.rmtree(td)
489
490    def test_string_sequence(self):
491        self.assertTrue(is_string_sequence(['a']))
492        self.assertTrue(is_string_sequence(['a', 'b']))
493        self.assertFalse(is_string_sequence(['a', 'b', None]))
494        self.assertRaises(AssertionError, is_string_sequence, [])
495
496    @unittest.skipIf('SKIP_ONLINE' in os.environ, 'Skipping online test')
497    @unittest.skipUnless(ssl, 'SSL required for this test.')
498    def test_package_data(self):
499        data = get_package_data(name='config', version='0.3.6')
500        self.assertTrue(data)
501        self.assertTrue('index-metadata' in data)
502        metadata = data['index-metadata']
503        self.assertEqual(metadata['name'], 'config')
504        self.assertEqual(metadata['version'], '0.3.6')
505        data = get_package_data(name='config', version='0.3.5')
506        self.assertFalse(data)
507
508    def test_zip_dir(self):
509        d = os.path.join(HERE, 'foofoo')
510        data = zip_dir(d)
511        self.assertIsInstance(data, BytesIO)
512
513    def test_configurator(self):
514        d = {
515            'a': 1,
516            'b': 2.0,
517            'c': 'xyz',
518            'd': 'inc://' + os.path.join(HERE, 'included.json'),
519            'e': 'inc://' + 'included.json',
520            'stderr': 'ext://sys.stderr',
521            'list_o_stuff': [
522                'cfg://stderr',
523                'ext://sys.stdout',
524                'ext://logging.NOTSET',
525            ],
526            'dict_o_stuff': {
527                'k1': 'cfg://list_o_stuff[1]',
528                'k2': 'abc',
529                'k3': 'cfg://list_o_stuff',
530            },
531            'another_dict_o_stuff': {
532                'k1': 'cfg://dict_o_stuff[k2]',
533                'k2': 'ext://re.I',
534                'k3': 'cfg://dict_o_stuff[k3][0]',
535            },
536            'custom': {
537                '()': __name__ + '.TestContainer',
538                '[]': [1, 'a', 2.0, ('b', 'c', 'd')],
539                '.': {
540                    'p1': 'a',
541                    'p2': 'b',
542                    'p3': {
543                        '()' : __name__ + '.TestContainer',
544                        '[]': [1, 2],
545                        '.': {
546                            'p1': 'c',
547                        },
548                    },
549                },
550                'k1': 'v1',
551                'k2': 'v2',
552            }
553        }
554
555        cfg = Configurator(d, HERE)
556        self.assertEqual(cfg['a'], 1)
557        self.assertEqual(cfg['b'], 2.0)
558        self.assertEqual(cfg['c'], 'xyz')
559        self.assertIs(cfg['stderr'], sys.stderr)
560        self.assertIs(cfg['list_o_stuff'][0], sys.stderr)
561        self.assertIs(cfg['list_o_stuff'][1], sys.stdout)
562        self.assertIs(cfg['list_o_stuff'][-1], 0)   # logging.NOTSET == 0
563        self.assertIs(cfg['dict_o_stuff']['k1'], sys.stdout)
564        self.assertIs(cfg['another_dict_o_stuff']['k1'], 'abc')
565        self.assertIs(cfg['another_dict_o_stuff']['k2'], re.I)
566        self.assertIs(cfg['another_dict_o_stuff']['k3'], sys.stderr)
567        custom = cfg['custom']
568        self.assertIsInstance(custom, TestContainer)
569        self.assertEqual(custom.args, (1, 'a', 2.0, ('b', 'c', 'd')))
570        self.assertEqual(custom.kwargs, {'k1': 'v1', 'k2': 'v2'})
571        self.assertEqual(custom.p1, 'a')
572        self.assertEqual(custom.p2, 'b')
573        self.assertIsInstance(custom.p3, TestContainer)
574        self.assertEqual(custom.p3.args, (1, 2))
575        self.assertEqual(custom.p3.kwargs, {})
576        self.assertEqual(custom.p3.p1, 'c')
577        self.assertEqual(cfg['d'], {'foo': 'bar', 'bar': 'baz'})
578        self.assertEqual(cfg['e'], {'foo': 'bar', 'bar': 'baz'})
579
580
581def _speed_range(min_speed, max_speed):
582    return tuple(['%d KB/s' % v for v in range(min_speed,
583                                               max_speed + 1)])
584
585def _eta_range(min_eta, max_eta, prefix='ETA '):
586    msg = prefix + ': 00:00:%02d'
587    return tuple([msg % v for v in range(min_eta, max_eta + 1)])
588
589class ProgressTestCase(DistlibTestCase):
590    # Of late, the speed tests keep failing on AppVeyor and Windows
591    @unittest.skipIf(IN_GITHUB_WORKFLOW or (os.name == 'nt' and
592                                            os.environ.get('APPVEYOR') == 'True'),
593                     'Test disabled on some environments due to performance')
594    def test_basic(self):
595
596        # These ranges may need tweaking to cater for especially slow
597        # machines
598        if os.name == 'nt':
599            speed1 = _speed_range(18, 20)
600            speed2 = _speed_range(20, 22)
601        else:
602            speed1 = _speed_range(16, 19)
603            speed2 = _speed_range(20, 22)
604        expected = (
605            (' 10 %', _eta_range(4, 7), speed1),
606            (' 20 %', _eta_range(4, 7), speed1),
607            (' 30 %', _eta_range(3, 4), speed1),
608            (' 40 %', _eta_range(3, 3), speed1),
609            (' 50 %', _eta_range(2, 2), speed1),
610            (' 60 %', _eta_range(2, 2), speed1),
611            (' 70 %', _eta_range(1, 1), speed1),
612            (' 80 %', _eta_range(1, 1), speed1),
613            (' 90 %', _eta_range(0, 0), speed1),
614            ('100 %', _eta_range(4, 5, 'Done'), speed2),
615        )
616        bar = Progress(maxval=100000).start()
617        for i, v in enumerate(range(10000, 100000, 10000)):
618            time.sleep(0.5)
619            bar.update(v)
620            p, e, s = expected[i]
621            self.assertEqual(bar.percentage, p)
622            self.assertIn(bar.ETA, e, p)
623            self.assertIn(bar.speed, s)
624        bar.stop()
625        p, e, s = expected[i + 1]
626        self.assertEqual(bar.percentage, p)
627        self.assertIn(bar.ETA, e, p)
628        self.assertIn(bar.speed, s)
629
630    # Of late, the speed tests keep failing on AppVeyor and Windows
631    @unittest.skipIf(IN_GITHUB_WORKFLOW or (os.name == 'nt' and
632                                            os.environ.get('APPVEYOR') == 'True'),
633                     'Test disabled on some environments due to performance')
634    def test_unknown(self):
635        if os.name == 'nt':
636            speed = _speed_range(17, 20)
637        else:
638            speed = _speed_range(17, 19)
639        expected = (
640            (' ?? %', 'ETA : ??:??:??', speed),
641            (' ?? %', 'ETA : ??:??:??', speed),
642            (' ?? %', 'ETA : ??:??:??', speed),
643            (' ?? %', 'ETA : ??:??:??', speed),
644            (' ?? %', 'ETA : ??:??:??', speed),
645            (' ?? %', 'ETA : ??:??:??', speed),
646            (' ?? %', 'ETA : ??:??:??', speed),
647            (' ?? %', 'ETA : ??:??:??', speed),
648            (' ?? %', 'ETA : ??:??:??', speed),
649            ('100 %', 'Done: 00:00:04', speed),
650        )
651        bar = Progress(maxval=None).start()
652        for i, v in enumerate(range(10000, 100000, 10000)):
653            time.sleep(0.5)
654            bar.update(v)
655            p, e, s = expected[i]
656            self.assertEqual(bar.percentage, p)
657            self.assertEqual(bar.ETA, e)
658            self.assertIn(bar.speed, s)
659        bar.stop()
660        p, e, s = expected[i + 1]
661        self.assertEqual(bar.percentage, p)
662        self.assertEqual(bar.ETA, e)
663        self.assertIn(bar.speed, s)
664
class FileOpsTestCase(DistlibTestCase):
    """Tests for FileOperator, run against a scratch directory.

    Every test gets a fresh FileOperator and a fresh temporary working
    directory; the directory is removed afterwards if it still exists.
    """

    def setUp(self):
        """Create the FileOperator under test and a scratch directory."""
        self.fileop = FileOperator()
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory, if the test left one behind."""
        # Some tests delete the working directory themselves, so only
        # clean up when it is still present.
        if os.path.isdir(self.workdir):
            shutil.rmtree(self.workdir)

    def test_ensure_dir(self):
        """ensure_dir creates a missing directory, except in dry-run mode."""
        td = self.workdir
        os.rmdir(td)
        self.fileop.ensure_dir(td)
        self.assertTrue(os.path.exists(td))
        # In dry-run mode nothing must be created on disk.
        self.fileop.dry_run = True
        os.rmdir(td)
        self.fileop.ensure_dir(td)
        self.assertFalse(os.path.exists(td))

    def test_ensure_removed(self):
        """ensure_removed deletes a path, except in dry-run mode."""
        td = self.workdir
        self.assertTrue(os.path.exists(td))
        # In dry-run mode the path must be left alone.
        self.fileop.dry_run = True
        self.fileop.ensure_removed(td)
        self.assertTrue(os.path.exists(td))
        self.fileop.dry_run = False
        self.fileop.ensure_removed(td)
        self.assertFalse(os.path.exists(td))

    def test_is_writable(self):
        """is_writable accepts a not-yet-existing path in a writable dir."""
        sd = 'subdir'
        ssd = 'subsubdir'
        path = os.path.join(self.workdir, sd, ssd)
        os.makedirs(path)
        # 'test' does not exist; only its parent directories do.
        path = os.path.join(path, 'test')
        self.assertTrue(self.fileop.is_writable(path))
        # The negative check is only meaningful for an unprivileged user:
        # the superuser normally passes write-permission checks on /etc,
        # which would make this assertion fail spuriously.
        if os.name == 'posix' and os.geteuid() != 0:
            self.assertFalse(self.fileop.is_writable('/etc'))

    def test_byte_compile(self):
        """byte_compile produces the expected cached bytecode file."""
        path = os.path.join(self.workdir, 'hello.py')
        dpath = cache_from_source(path, True)
        self.fileop.write_text_file(path, 'print("Hello, world!")', 'utf-8')
        self.fileop.byte_compile(path, optimize=False)
        self.assertTrue(os.path.exists(dpath))

    def write_some_files(self):
        """Write two files and create one directory via the FileOperator.

        Returns a 2-tuple ``(files, dirs)`` of sets holding the paths of
        the files written and of the directory created.
        """
        path = os.path.join(self.workdir, 'file1')
        written = []
        self.fileop.write_text_file(path, 'test', 'utf-8')
        written.append(path)
        path = os.path.join(self.workdir, 'file2')
        self.fileop.copy_file(written[0], path)
        written.append(path)
        path = os.path.join(self.workdir, 'dir1')
        self.fileop.ensure_dir(path)
        return set(written), set([path])

    def test_copy_check(self):
        """copy_file raises ValueError for a directory or symlink target."""
        srcpath = os.path.join(self.workdir, 'file1')
        self.fileop.write_text_file(srcpath, 'test', 'utf-8')
        dstpath = os.path.join(self.workdir, 'file2')
        # Destination is an existing directory.
        os.mkdir(dstpath)
        self.assertRaises(ValueError, self.fileop.copy_file, srcpath,
                          dstpath)
        os.rmdir(dstpath)
        if os.name == 'posix':      # symlinks available
            # Destination is a symlink to another file.
            linkpath = os.path.join(self.workdir, 'file3')
            self.fileop.write_text_file(linkpath, 'linkdest', 'utf-8')
            os.symlink(linkpath, dstpath)
            self.assertRaises(ValueError, self.fileop.copy_file, srcpath,
                              dstpath)

    def test_commit(self):
        """commit returns what was recorded and switches recording off."""
        # will assert if record isn't set
        self.assertRaises(AssertionError, self.fileop.commit)
        self.fileop.record = True
        expected = self.write_some_files()
        actual = self.fileop.commit()
        self.assertEqual(actual, expected)
        self.assertFalse(self.fileop.record)

    def test_rollback(self):
        """rollback removes everything recorded and switches recording off."""
        # commit will assert if record isn't set
        self.assertRaises(AssertionError, self.fileop.commit)
        self.fileop.record = True
        self.write_some_files()
        self.fileop.rollback()
        # Everything written while recording must be gone again.
        self.assertEqual(os.listdir(self.workdir), [])
        self.assertFalse(self.fileop.record)
756
757
class GlobTestCaseBase(TempdirManager, DistlibTestCase):
    """Shared helpers for glob tests that need a file tree on disk."""

    def build_files_tree(self, files):
        """Create the entries named in *files* under a new temp directory.

        Entries are '/'-separated relative paths. An entry ending in '/'
        is created as a directory; any other entry is written as a file
        (missing parent directories are created first). Returns the path
        of the temporary root directory.
        """
        root = self.mkdtemp()
        for entry in files:
            wants_dir = entry.endswith('/')
            target = os.path.join(root, *entry.split('/'))
            # For a file entry it is the parent that has to exist.
            folder = target if wants_dir else os.path.dirname(target)
            if folder and not os.path.exists(folder):
                os.makedirs(folder)
            if not wants_dir:
                self.write_file(target, 'babar')
        return root

    @staticmethod
    def os_dependent_path(path):
        """Convert a '/'-separated path to one using the OS separator."""
        return os.path.join(*path.rstrip('/').split('/'))

    def clean_tree(self, spec):
        """Return the sorted OS-specific paths whose *spec* value is true."""
        return sorted([self.os_dependent_path(name)
                       for name, wanted in spec.items() if wanted])
786
787
class GlobTestCase(GlobTestCaseBase):
    """Tests for iglob's extended glob syntax and assorted util helpers.

    Besides the glob tests, this class also holds the tests for
    parse_requirement, write_exports/read_exports and get_extras.
    """

    def assertGlobMatch(self, glob, spec):
        """Check that *glob* matches exactly the true entries of *spec*.

        *spec* maps '/'-separated relative paths to a boolean saying
        whether the glob is expected to match that path. The tree is
        built in a temporary directory and the (relative) glob is
        evaluated from there.
        """
        tempdir = self.build_files_tree(spec)
        expected = self.clean_tree(spec)
        # The glob patterns are relative, so they have to be evaluated
        # from the root of the tree. Put the previous working directory
        # back afterwards so this helper does not leak process-global
        # state into whatever runs next.
        olddir = os.getcwd()
        os.chdir(tempdir)
        try:
            result = sorted(iglob(glob))
        finally:
            os.chdir(olddir)
        self.assertEqual(expected, result)

    def test_regex_rich_glob(self):
        """RICH_GLOB extracts the contents of each {...} group."""
        matches = RICH_GLOB.findall(
            r"babar aime les {fraises} est les {huitres}")
        self.assertEqual(["fraises", "huitres"], matches)

    def test_simple_glob(self):
        """'*' and '?' wildcards in the current directory."""
        glob = '*.tp?'
        spec = {'coucou.tpl': True,
                'coucou.tpj': True,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_simple_glob_in_dir(self):
        """'*' and '?' wildcards inside a named subdirectory."""
        glob = os.path.join('babar', '*.tp?')
        spec = {'babar/coucou.tpl': True,
                'babar/coucou.tpj': True,
                'babar/toto.bin': False,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_recursive_glob_head(self):
        """'**' as the first path component."""
        glob = os.path.join('**', 'tip', '*.t?l')
        spec = {'babar/zaza/zuzu/tip/coucou.tpl': True,
                'babar/z/tip/coucou.tpl': True,
                'babar/tip/coucou.tpl': True,
                'babar/zeop/tip/babar/babar.tpl': False,
                'babar/z/tip/coucou.bin': False,
                'babar/toto.bin': False,
                'zozo/zuzu/tip/babar.tpl': True,
                'zozo/tip/babar.tpl': True,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_recursive_glob_tail(self):
        """'**' as the last path component matches files and directories."""
        glob = os.path.join('babar', '**')
        spec = {'babar/zaza/': True,
                'babar/zaza/zuzu/': True,
                'babar/zaza/zuzu/babar.xml': True,
                'babar/zaza/zuzu/toto.xml': True,
                'babar/zaza/zuzu/toto.csv': True,
                'babar/zaza/coucou.tpl': True,
                'babar/bubu.tpl': True,
                'zozo/zuzu/tip/babar.tpl': False,
                'zozo/tip/babar.tpl': False,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_recursive_glob_middle(self):
        """'**' between two fixed path components."""
        glob = os.path.join('babar', '**', 'tip', '*.t?l')
        spec = {'babar/zaza/zuzu/tip/coucou.tpl': True,
                'babar/z/tip/coucou.tpl': True,
                'babar/tip/coucou.tpl': True,
                'babar/zeop/tip/babar/babar.tpl': False,
                'babar/z/tip/coucou.bin': False,
                'babar/toto.bin': False,
                'zozo/zuzu/tip/babar.tpl': False,
                'zozo/tip/babar.tpl': False,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_glob_set_tail(self):
        """A {a,b,c} alternative set at the end of the pattern."""
        glob = os.path.join('bin', '*.{bin,sh,exe}')
        spec = {'bin/babar.bin': True,
                'bin/zephir.sh': True,
                'bin/celestine.exe': True,
                'bin/cornelius.bat': False,
                'bin/cornelius.xml': False,
                'toto/yurg': False,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_glob_set_middle(self):
        """A {a,b} alternative set in the middle of the pattern."""
        glob = os.path.join('xml', '{babar,toto}.xml')
        spec = {'xml/babar.xml': True,
                'xml/toto.xml': True,
                'xml/babar.xslt': False,
                'xml/cornelius.sgml': False,
                'xml/zephir.xml': False,
                'toto/yurg.xml': False,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_glob_set_head(self):
        """A {a,b} alternative set as the first path component."""
        glob = os.path.join('{xml,xslt}', 'babar.*')
        spec = {'xml/babar.xml': True,
                'xml/toto.xml': False,
                'xslt/babar.xslt': True,
                'xslt/toto.xslt': False,
                'toto/yurg.xml': False,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_glob_all(self):
        """Alternative sets whose members themselves contain '*' and '**'."""
        dirs = '{%s,%s}' % (os.path.join('xml', '*'),
                            os.path.join('xslt', '**'))
        glob = os.path.join(dirs, 'babar.xml')
        spec = {'xml/a/babar.xml': True,
                'xml/b/babar.xml': True,
                'xml/a/c/babar.xml': False,
                'xslt/a/babar.xml': True,
                'xslt/b/babar.xml': True,
                'xslt/a/c/babar.xml': True,
                'toto/yurg.xml': False,
                'Donotwant': False}
        self.assertGlobMatch(glob, spec)

    def test_invalid_glob_pattern(self):
        """Malformed '**' usage and unbalanced braces raise ValueError."""
        invalids = [
            'ppooa**',
            'azzaeaz4**/',
            '/**ddsfs',
            '**##1e"&e',
            'DSFb**c009',
            '{',
            '{aaQSDFa',
            '}',
            'aQSDFSaa}',
            '{**a,',
            ',**a}',
            '{a**,',
            ',b**}',
            '{a**a,babar}',
            '{bob,b**z}',
        ]
        for pattern in invalids:
            self.assertRaises(ValueError, iglob, pattern)

    def test_parse_requirement(self):
        """parse_requirement handles empty, invalid and valid inputs."""
        # Empty requirements
        for empty in ('', '#this should be ignored'):
            self.assertIsNone(parse_requirement(empty))

        # Invalid requirements
        for invalid in ('a (', 'a/', 'a$', 'a [', 'a () [],', 'a 1.2'):
            self.assertRaises(SyntaxError, parse_requirement, invalid)

        # Valid requirements
        def validate(r, values):
            # values is (name, constraints, extras, requirement, url)
            self.assertEqual(r.name, values[0])
            self.assertEqual(r.constraints, values[1])
            self.assertEqual(r.extras, values[2])
            self.assertEqual(r.requirement, values[3])
            self.assertEqual(r.url, values[4])

        r = parse_requirement('a')
        validate(r, ('a', None, None, 'a', None))
        r = parse_requirement('a >= 1.2, <2.0,!=1.7')
        validate(r, ('a', [('>=', '1.2'), ('<', '2.0'), ('!=', '1.7')], None,
                     'a >= 1.2, < 2.0, != 1.7', None))
        r = parse_requirement('a [ab,cd , ef] >= 1.2, <2.0')
        validate(r, ('a', [('>=', '1.2'), ('<', '2.0')], ['ab', 'cd', 'ef'],
                     'a >= 1.2, < 2.0', None))
        r = parse_requirement('a[]')
        validate(r, ('a', None, None, 'a', None))
        r = parse_requirement('a (== 1.2.*, != 1.2.1.*)')
        validate(r, ('a', [('==', '1.2.*'), ('!=', '1.2.1.*')], None,
                     'a == 1.2.*, != 1.2.1.*', None))
        r = parse_requirement('a @ http://domain.com/path#abc=def')
        validate(r, ('a', None, None, 'a', 'http://domain.com/path#abc=def'))
        # See issue #148
        r = parse_requirement('a >=3.6')
        validate(r, ('a', [('>=', '3.6')], None, 'a >= 3.6', None))
        r = parse_requirement('a >=3.6,')
        validate(r, ('a', [('>=', '3.6')], None, 'a >= 3.6', None))

        # Deliberately disabled: checks for special extras markers that
        # are kept here until they can be switched back on.
        if False: # TODO re-enable
            for e in ('*', ':*:', ':meta:', '-', '-abc'):
                r = parse_requirement('a [%s]' % e)
                validate(r, ('a', None, [e], 'a', None))

    def test_write_exports(self):
        """Exports written by write_exports are read back unchanged."""
        exports = {
            'foo': {
                'v1': ExportEntry('v1', 'p1', 's1', []),
                'v2': ExportEntry('v2', 'p2', 's2', ['f2=a', 'g2']),
            },
            'bar': {
                'v3': ExportEntry('v3', 'p3', 's3', ['f3', 'g3=h']),
                'v4': ExportEntry('v4', 'p4', 's4', ['f4', 'g4']),
            },
        }

        fd, fn = tempfile.mkstemp()
        try:
            # Only the name is needed; the file is reopened below.
            os.close(fd)
            with open(fn, 'wb') as f:
                write_exports(exports, f)
            with open(fn, 'rb') as f:
                actual = read_exports(f)
            self.assertEqual(actual, exports)
        finally:
            os.remove(fn)

    def test_get_extras(self):
        """get_extras expands '*' and honours '-name' exclusions."""
        # Each case is (requested, available, expected result).
        cases = (
            (['*'], ['i18n'], set(['i18n'])),
            (['*', '-bar'], ['foo', 'bar'], set(['foo'])),
        )
        for requested, available, expected in cases:
            actual = get_extras(requested, available)
            self.assertEqual(actual, expected)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':  # pragma: no cover
    unittest.main()
1000