1# -*- coding: utf-8 -*- 2# 3# Copyright (C) 2012-2013 Vinay Sajip. 4# Licensed to the Python Software Foundation under a contributor agreement. 5# See LICENSE.txt and CONTRIBUTORS.txt. 6# 7from io import BytesIO 8from itertools import islice 9import os 10import re 11import shutil 12try: 13 import ssl 14except ImportError: 15 ssl = None 16import sys 17import tempfile 18import textwrap 19import time 20 21from compat import unittest 22from support import TempdirManager, DistlibTestCase, in_github_workflow 23 24from distlib import DistlibException 25from distlib.compat import cache_from_source 26from distlib.util import (get_export_entry, ExportEntry, resolve, 27 get_cache_base, path_to_cache_dir, zip_dir, 28 parse_credentials, ensure_slash, split_filename, 29 EventMixin, Sequencer, unarchive, Progress, 30 iglob, RICH_GLOB, parse_requirement, get_extras, 31 Configurator, read_exports, write_exports, 32 FileOperator, is_string_sequence, get_package_data, 33 convert_path) 34 35 36HERE = os.path.dirname(os.path.abspath(__file__)) 37IN_GITHUB_WORKFLOW = in_github_workflow() 38 39class TestContainer(object): 40 def __init__(self, *args, **kwargs): 41 self.args = args 42 self.kwargs = kwargs 43 44 45class UtilTestCase(DistlibTestCase): 46 def check_entry(self, entry, name, prefix, suffix, flags): 47 self.assertEqual(entry.name, name) 48 self.assertEqual(entry.prefix, prefix) 49 self.assertEqual(entry.suffix, suffix) 50 self.assertEqual(entry.flags, flags) 51 52 def test_export_entry(self): 53 self.assertIsNone(get_export_entry('foo.py')) 54 self.assertIsNone(get_export_entry('foo.py=')) 55 for spec in ('foo=foo:main', 'foo =foo:main', 'foo= foo:main', 56 'foo = foo:main'): 57 self.check_entry(get_export_entry(spec), 58 'foo', 'foo', 'main', []) 59 self.check_entry(get_export_entry('foo=foo.bar:main'), 60 'foo', 'foo.bar', 'main', []) 61 self.check_entry(get_export_entry('foo=foo.bar:main [a]'), 62 'foo', 'foo.bar', 'main', ['a']) 63 # See issue #127 - allow hyphens 64 self.check_entry(get_export_entry('foo=foo.bar:main [with-foo]'), 65 'foo', 'foo.bar', 'main', ['with-foo']) 66 self.check_entry(get_export_entry('foo=foo.bar:main [ a ]'), 67 'foo', 'foo.bar', 'main', ['a']) 68 self.check_entry(get_export_entry('foo=foo.bar:main [a=b, c=d,e, f=g]'), 69 'foo', 'foo.bar', 'main', ['a=b', 'c=d', 'e', 'f=g']) 70 self.check_entry(get_export_entry('foo=foo.bar:main [a=9, 9=8,e, f9=g8]'), 71 'foo', 'foo.bar', 'main', ['a=9', '9=8', 'e', 'f9=g8']) 72 self.check_entry(get_export_entry('foo=foo.bar:main[x]'), 73 'foo', 'foo.bar', 'main', ['x']) 74 self.check_entry(get_export_entry('foo=abc'), 'foo', 'abc', None, []) 75 self.check_entry(get_export_entry('smc++ = smcpp.frontend:console'), 'smc++', 76 'smcpp.frontend', 'console', []) 77 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x:y') 78 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [') 79 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x ]') 80 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x []') 81 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [\\]') 82 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [a=]') 83 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [a,]') 84 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [a,,b]') 85 self.assertRaises(DistlibException, get_export_entry, 'foo=foo.bar:x [a b]') 86 87 def test_resolve(self): 88 import logging 89 import logging.handlers 90 self.assertIs(resolve('logging', None), logging) 91 self.assertIs(resolve('logging.handlers', None), logging.handlers) 92 self.assertIs(resolve('logging', 'root'), logging.root) 93 self.assertEqual(resolve('logging', 'root.debug'), logging.root.debug) 94 95 def test_cache_base(self): 96 actual = get_cache_base() 97 if os.name == 'nt' and 'LOCALAPPDATA' in os.environ: 98 expected = os.path.expandvars('$localappdata') 99 else: 100 expected = os.path.expanduser('~') 101 expected = os.path.join(expected, '.distlib') 102 self.assertEqual(expected, actual) 103 self.assertTrue(os.path.isdir(expected)) 104 105 @unittest.skipIf(os.name != 'posix', 'Test is only valid for POSIX') 106 def test_path_to_cache_dir_posix(self): 107 self.assertEqual(path_to_cache_dir('/home/user/some-file.zip'), 108 '--home--user--some-file.zip.cache') 109 110 @unittest.skipIf(os.name != 'nt', 'Test is only valid for Windows') 111 def test_path_to_cache_dir_nt(self): 112 self.assertEqual(path_to_cache_dir(r'c:\Users\User\Some-File.zip'), 113 'c-----Users--User--Some-File.zip.cache') 114 115 def test_parse_credentials(self): 116 cases = ( 117 ('example.com', (None, None, 'example.com')), 118 ('user@example.com', ('user', None, 'example.com')), 119 ('user:pwd@example.com', ('user', 'pwd', 'example.com')), 120 ('user:@example.com', ('user', '', 'example.com')), 121 ('user:pass@word@example.com', ('user', 'pass@word', 'example.com')), 122 ('user:pass:word@example.com', ('user', 'pass:word', 'example.com')), 123 ('user%3Aname:%23%5E%40@example.com', ('user:name', '#^@', 'example.com')), 124 ) 125 126 for s, expected in cases: 127 self.assertEqual(parse_credentials(s), expected) 128 129 def test_ensure_slash(self): 130 self.assertEqual(ensure_slash(''), '/') 131 self.assertEqual(ensure_slash('/'), '/') 132 self.assertEqual(ensure_slash('abc'), 'abc/') 133 self.assertEqual(ensure_slash('def/'), 'def/') 134 135 def test_split_filename(self): 136 self.assertIsNone(split_filename('abl.jquery')) 137 self.assertEqual(split_filename('abl.jquery-1.4.2-2'), 138 ('abl.jquery', '1.4.2-2', None)) 139 self.assertEqual(split_filename('python-gnupg-0.1'), 140 ('python-gnupg', '0.1', None)) 141 self.assertEqual(split_filename('baklabel-1.0.3-2729-py3.2'), 142 ('baklabel', '1.0.3-2729', '3.2')) 143 self.assertEqual(split_filename('baklabel-1.0.3-2729-py27'), 144 ('baklabel', '1.0.3-2729', '27')) 145 self.assertEqual(split_filename('advpy-0.99b'), 146 ('advpy', '0.99b', None)) 147 self.assertEqual(split_filename('asv_files-dev-20120501-01', 'asv_files'), 148 ('asv_files', 'dev-20120501-01', None)) 149 self.assertEqual(split_filename('greenlet-0.4.0-py27-win32'), 150 ('greenlet', '0.4.0', '27')) 151 self.assertEqual(split_filename('greenlet-0.4.0-py27-linux_x86_64'), 152 ('greenlet', '0.4.0', '27')) 153 self.assertEqual(split_filename('django-altuser-v0.6.8'), 154 ('django-altuser', 'v0.6.8', None)) 155 self.assertEqual(split_filename('youtube_dl_server-alpha.1'), 156 ('youtube_dl_server', 'alpha.1', None)) 157 self.assertEqual(split_filename('pytest-xdist-dev'), 158 ('pytest-xdist', 'dev', None)) 159 self.assertEqual(split_filename('pytest_xdist-0.1_myfork', None), 160 ('pytest_xdist', '0.1_myfork', None)) 161 self.assertEqual(split_filename('pytest_xdist-0.1_myfork', 162 'pytest-xdist'), 163 ('pytest_xdist', '0.1_myfork', None)) 164 self.assertEqual(split_filename('pytest_xdist-0.1_myfork', 165 'pytest_dist'), 166 ('pytest_xdist', '0.1_myfork', None)) 167 168 def test_convert_path(self): 169 CP = convert_path 170 if os.sep == '/': 171 d = os.path.dirname(__file__) 172 self.assertEqual(CP(d), d) 173 else: 174 self.assertEqual(CP(''), '') 175 self.assertRaises(ValueError, CP, '/foo') 176 self.assertRaises(ValueError, CP, 'foo/') 177 178 def test_events(self): 179 collected = [] 180 181 def handler1(e, *args, **kwargs): 182 collected.append((1, e, args, kwargs)) 183 184 def handler2(e, *args, **kwargs): 185 collected.append((2, e, args, kwargs)) 186 187 def handler3(e, *args, **kwargs): 188 if not args: 189 raise NotImplementedError('surprise!') 190 collected.append((3, e, args, kwargs)) 191 return (args, kwargs) 192 193 e = EventMixin() 194 e.add('A', handler1) 195 self.assertRaises(ValueError, e.remove, 'B', handler1) 196 197 cases = ( 198 ((1, 2), {'buckle': 'my shoe'}), 199 ((3, 4), {'shut': 'the door'}), 200 ) 201 202 for case in cases: 203 e.publish('A', *case[0], **case[1]) 204 e.publish('B', *case[0], **case[1]) 205 206 for actual, source in zip(collected, cases): 207 self.assertEqual(actual, (1, 'A') + source[:1] + source[1:]) 208 209 collected = [] 210 e.add('B', handler2) 211 212 self.assertEqual(tuple(e.get_subscribers('A')), (handler1,)) 213 self.assertEqual(tuple(e.get_subscribers('B')), (handler2,)) 214 self.assertEqual(tuple(e.get_subscribers('C')), ()) 215 216 for case in cases: 217 e.publish('A', *case[0], **case[1]) 218 e.publish('B', *case[0], **case[1]) 219 220 actuals = islice(collected, 0, None, 2) 221 for actual, source in zip(actuals, cases): 222 self.assertEqual(actual, (1, 'A') + source[:1] + source[1:]) 223 224 actuals = islice(collected, 1, None, 2) 225 for actual, source in zip(actuals, cases): 226 self.assertEqual(actual, (2, 'B') + source[:1] + source[1:]) 227 228 e.remove('B', handler2) 229 230 collected = [] 231 232 for case in cases: 233 e.publish('A', *case[0], **case[1]) 234 e.publish('B', *case[0], **case[1]) 235 236 for actual, source in zip(collected, cases): 237 self.assertEqual(actual, (1, 'A') + source[:1] + source[1:]) 238 239 e.add('C', handler3) 240 241 collected = [] 242 returned = [] 243 244 for case in cases: 245 returned.extend(e.publish('C', *case[0], **case[1])) 246 returned.extend(e.publish('C')) 247 248 for actual, source in zip(collected, cases): 249 self.assertEqual(actual, (3, 'C') + source[:1] + source[1:]) 250 251 self.assertEqual(tuple(islice(returned, 1, None, 2)), (None, None)) 252 actuals = islice(returned, 0, None, 2) 253 for actual, expected in zip(actuals, cases): 254 self.assertEqual(actual, expected) 255 256 def test_sequencer_basic(self): 257 seq = Sequencer() 258 259 steps = ( 260 ('check', 'sdist'), 261 ('check', 'register'), 262 ('check', 'sdist'), 263 ('check', 'register'), 264 ('register', 'upload_sdist'), 265 ('sdist', 'upload_sdist'), 266 ('check', 'build_clibs'), 267 ('build_clibs', 'build_ext'), 268 ('build_ext', 'build_py'), 269 ('build_py', 'build_scripts'), 270 ('build_scripts', 'build'), 271 ('build', 'test'), 272 ('register', 'upload_bdist'), 273 ('build', 'upload_bdist'), 274 ('build', 'install_headers'), 275 ('install_headers', 'install_lib'), 276 ('install_lib', 'install_scripts'), 277 ('install_scripts', 'install_data'), 278 ('install_data', 'install_distinfo'), 279 ('install_distinfo', 'install') 280 ) 281 282 for pred, succ in steps: 283 seq.add(pred, succ) 284 285 # Note: these tests are sensitive to dictionary ordering 286 # but work under Python 2.6, 2.7, 3.2, 3.3, 3.4 and PyPy 2.5 287 cases = ( 288 ('check', ['check']), 289 ('register', ['check', 'register']), 290 ('sdist', ['check', 'sdist']), 291 ('build_clibs', ['check', 'build_clibs']), 292 ('build_ext', ['check', 'build_clibs', 'build_ext']), 293 ('build_py', ['check', 'build_clibs', 'build_ext', 'build_py']), 294 ('build_scripts', ['check', 'build_clibs', 'build_ext', 'build_py', 295 'build_scripts']), 296 ('build', ['check', 'build_clibs', 'build_ext', 'build_py', 297 'build_scripts', 'build']), 298 ('test', ['check', 'build_clibs', 'build_ext', 'build_py', 299 'build_scripts', 'build', 'test']), 300 ('install_headers', ['check', 'build_clibs', 'build_ext', 301 'build_py', 'build_scripts', 'build', 302 'install_headers']), 303 ('install_lib', ['check', 'build_clibs', 'build_ext', 'build_py', 304 'build_scripts', 'build', 'install_headers', 305 'install_lib']), 306 ('install_scripts', ['check', 'build_clibs', 'build_ext', 307 'build_py', 'build_scripts', 'build', 308 'install_headers', 'install_lib', 309 'install_scripts']), 310 ('install_data', ['check', 'build_clibs', 'build_ext', 'build_py', 311 'build_scripts', 'build', 'install_headers', 312 'install_lib', 'install_scripts', 313 'install_data']), 314 ('install_distinfo', ['check', 'build_clibs', 'build_ext', 315 'build_py', 'build_scripts', 'build', 316 'install_headers', 'install_lib', 317 'install_scripts', 'install_data', 318 'install_distinfo']), 319 ('install', ['check', 'build_clibs', 'build_ext', 'build_py', 320 'build_scripts', 'build', 'install_headers', 321 'install_lib', 'install_scripts', 'install_data', 322 'install_distinfo', 'install']), 323 ('upload_sdist', (['check', 'register', 'sdist', 'upload_sdist'], 324 ['check', 'sdist', 'register', 'upload_sdist'])), 325 ('upload_bdist', (['check', 'build_clibs', 'build_ext', 'build_py', 326 'build_scripts', 'build', 'register', 327 'upload_bdist'], 328 ['check', 'build_clibs', 'build_ext', 'build_py', 329 'build_scripts', 'register', 'build', 330 'upload_bdist'])), 331 ) 332 333 for final, expected in cases: 334 actual = list(seq.get_steps(final)) 335 if isinstance(expected, tuple): 336 self.assertIn(actual, expected) 337 else: 338 self.assertEqual(actual, expected) 339 340 dot = seq.dot 341 expected = ''' 342 digraph G { 343 check -> build_clibs; 344 install_lib -> install_scripts; 345 register -> upload_bdist; 346 build -> upload_bdist; 347 build_ext -> build_py; 348 install_scripts -> install_data; 349 check -> sdist; 350 check -> register; 351 build -> install_headers; 352 install_data -> install_distinfo; 353 sdist -> upload_sdist; 354 register -> upload_sdist; 355 install_distinfo -> install; 356 build -> test; 357 install_headers -> install_lib; 358 build_py -> build_scripts; 359 build_clibs -> build_ext; 360 build_scripts -> build; 361 } 362 ''' 363 expected = textwrap.dedent(expected).strip().splitlines() 364 actual = dot.splitlines() 365 self.assertEqual(expected[0], actual[0]) 366 self.assertEqual(expected[-1], actual[-1]) 367 self.assertEqual(set(expected[1:-1]), set(actual[1:-1])) 368 actual = seq.strong_connections 369 expected = ( 370 [ 371 ('test',), ('upload_bdist',), ('install',), 372 ('install_distinfo',), ('install_data',), ('install_scripts',), 373 ('install_lib',), ('install_headers',), ('build',), 374 ('build_scripts',), ('build_py',), ('build_ext',), 375 ('build_clibs',), ('upload_sdist',), ('sdist',), ('register',), 376 ('check',) 377 ], 378 [ 379 ('install',), ('install_distinfo',), ('install_data',), 380 ('install_scripts',), ('install_lib',), ('install_headers',), 381 ('test',), ('upload_bdist',), ('build',), ('build_scripts',), 382 ('build_py',), ('build_ext',), ('build_clibs',), 383 ('upload_sdist',), ('sdist',), ('register',), ('check',) 384 ], 385 [ 386 ('upload_sdist',), ('sdist',), ('install',), 387 ('install_distinfo',), ('install_data',), ('upload_bdist',), 388 ('register',), ('install_scripts',), ('install_lib',), 389 ('install_headers',), ('test',), ('build',), 390 ('build_scripts',), ('build_py',), ('build_ext',), 391 ('build_clibs',), ('check',) 392 ], 393 # Next case added for PyPy 394 [ 395 ('upload_sdist',), ('sdist',), ('upload_bdist',), ('register',), 396 ('test',), ('install',), ('install_distinfo',), 397 ('install_data',), ('install_scripts',), ('install_lib',), 398 ('install_headers',), ('build',), ('build_scripts',), 399 ('build_py',), ('build_ext',), ('build_clibs',), ('check',) 400 ], 401 # Next case added for Python 3.6 402 [ 403 ('upload_sdist',), ('sdist',), ('upload_bdist',), ('register',), 404 ('install',), ('install_distinfo',), ('install_data',), 405 ('install_scripts',), ('install_lib',), ('install_headers',), 406 ('test',), ('build',), ('build_scripts',), ('build_py',), 407 ('build_ext',), ('build_clibs',), ('check',) 408 ] 409 ) 410 self.assertIn(actual, expected) 411 412 def test_sequencer_cycle(self): 413 seq = Sequencer() 414 seq.add('A', 'B') 415 seq.add('B', 'C') 416 seq.add('C', 'D') 417 self.assertEqual(list(seq.get_steps('D')), ['A', 'B', 'C', 'D']) 418 seq.add('C', 'A') 419 self.assertEqual(list(seq.get_steps('D')), ['C', 'A', 'B', 'D']) 420 self.assertFalse(seq.is_step('E')) 421 self.assertRaises(ValueError, seq.get_steps, 'E') 422 seq.add_node('E') 423 self.assertTrue(seq.is_step('E')) 424 self.assertEqual(list(seq.get_steps('E')), ['E']) 425 seq.remove_node('E') 426 self.assertFalse(seq.is_step('E')) 427 self.assertRaises(ValueError, seq.get_steps, 'E') 428 seq.remove('C', 'A') 429 self.assertEqual(list(seq.get_steps('D')), ['A', 'B', 'C', 'D']) 430 431 def test_sequencer_removal(self): 432 seq = Sequencer() 433 seq.add('A', 'B') 434 seq.add('B', 'C') 435 seq.add('C', 'D') 436 preds = { 437 'B': set(['A']), 438 'C': set(['B']), 439 'D': set(['C']) 440 } 441 succs = { 442 'A': set(['B']), 443 'B': set(['C']), 444 'C': set(['D']) 445 } 446 self.assertEqual(seq._preds, preds) 447 self.assertEqual(seq._succs, succs) 448 seq.remove_node('C') 449 self.assertEqual(seq._preds, preds) 450 self.assertEqual(seq._succs, succs) 451 seq.remove_node('C', True) 452 self.assertEqual(seq._preds, {'B': set(['A'])}) 453 self.assertEqual(seq._succs, {'A': set(['B'])}) 454 455 def test_unarchive(self): 456 import zipfile, tarfile 457 458 good_archives = ( 459 ('good.zip', zipfile.ZipFile, 'r', 'namelist'), 460 ('good.tar', tarfile.open, 'r', 'getnames'), 461 ('good.tar.gz', tarfile.open, 'r:gz', 'getnames'), 462 ('good.tar.bz2', tarfile.open, 'r:bz2', 'getnames'), 463 ) 464 bad_archives = ('bad.zip', 'bad.tar', 'bad.tar.gz', 'bad.tar.bz2') 465 466 for name, cls, mode, lister in good_archives: 467 td = tempfile.mkdtemp() 468 archive = None 469 try: 470 name = os.path.join(HERE, name) 471 unarchive(name, td) 472 archive = cls(name, mode) 473 names = getattr(archive, lister)() 474 for name in names: 475 p = os.path.join(td, name) 476 self.assertTrue(os.path.exists(p)) 477 finally: 478 shutil.rmtree(td) 479 if archive: 480 archive.close() 481 482 for name in bad_archives: 483 name = os.path.join(HERE, name) 484 td = tempfile.mkdtemp() 485 try: 486 self.assertRaises(ValueError, unarchive, name, td) 487 finally: 488 shutil.rmtree(td) 489 490 def test_string_sequence(self): 491 self.assertTrue(is_string_sequence(['a'])) 492 self.assertTrue(is_string_sequence(['a', 'b'])) 493 self.assertFalse(is_string_sequence(['a', 'b', None])) 494 self.assertRaises(AssertionError, is_string_sequence, []) 495 496 @unittest.skipIf('SKIP_ONLINE' in os.environ, 'Skipping online test') 497 @unittest.skipUnless(ssl, 'SSL required for this test.') 498 def test_package_data(self): 499 data = get_package_data(name='config', version='0.3.6') 500 self.assertTrue(data) 501 self.assertTrue('index-metadata' in data) 502 metadata = data['index-metadata'] 503 self.assertEqual(metadata['name'], 'config') 504 self.assertEqual(metadata['version'], '0.3.6') 505 data = get_package_data(name='config', version='0.3.5') 506 self.assertFalse(data) 507 508 def test_zip_dir(self): 509 d = os.path.join(HERE, 'foofoo') 510 data = zip_dir(d) 511 self.assertIsInstance(data, BytesIO) 512 513 def test_configurator(self): 514 d = { 515 'a': 1, 516 'b': 2.0, 517 'c': 'xyz', 518 'd': 'inc://' + os.path.join(HERE, 'included.json'), 519 'e': 'inc://' + 'included.json', 520 'stderr': 'ext://sys.stderr', 521 'list_o_stuff': [ 522 'cfg://stderr', 523 'ext://sys.stdout', 524 'ext://logging.NOTSET', 525 ], 526 'dict_o_stuff': { 527 'k1': 'cfg://list_o_stuff[1]', 528 'k2': 'abc', 529 'k3': 'cfg://list_o_stuff', 530 }, 531 'another_dict_o_stuff': { 532 'k1': 'cfg://dict_o_stuff[k2]', 533 'k2': 'ext://re.I', 534 'k3': 'cfg://dict_o_stuff[k3][0]', 535 }, 536 'custom': { 537 '()': __name__ + '.TestContainer', 538 '[]': [1, 'a', 2.0, ('b', 'c', 'd')], 539 '.': { 540 'p1': 'a', 541 'p2': 'b', 542 'p3': { 543 '()' : __name__ + '.TestContainer', 544 '[]': [1, 2], 545 '.': { 546 'p1': 'c', 547 }, 548 }, 549 }, 550 'k1': 'v1', 551 'k2': 'v2', 552 } 553 } 554 555 cfg = Configurator(d, HERE) 556 self.assertEqual(cfg['a'], 1) 557 self.assertEqual(cfg['b'], 2.0) 558 self.assertEqual(cfg['c'], 'xyz') 559 self.assertIs(cfg['stderr'], sys.stderr) 560 self.assertIs(cfg['list_o_stuff'][0], sys.stderr) 561 self.assertIs(cfg['list_o_stuff'][1], sys.stdout) 562 self.assertIs(cfg['list_o_stuff'][-1], 0) # logging.NOTSET == 0 563 self.assertIs(cfg['dict_o_stuff']['k1'], sys.stdout) 564 self.assertIs(cfg['another_dict_o_stuff']['k1'], 'abc') 565 self.assertIs(cfg['another_dict_o_stuff']['k2'], re.I) 566 self.assertIs(cfg['another_dict_o_stuff']['k3'], sys.stderr) 567 custom = cfg['custom'] 568 self.assertIsInstance(custom, TestContainer) 569 self.assertEqual(custom.args, (1, 'a', 2.0, ('b', 'c', 'd'))) 570 self.assertEqual(custom.kwargs, {'k1': 'v1', 'k2': 'v2'}) 571 self.assertEqual(custom.p1, 'a') 572 self.assertEqual(custom.p2, 'b') 573 self.assertIsInstance(custom.p3, TestContainer) 574 self.assertEqual(custom.p3.args, (1, 2)) 575 self.assertEqual(custom.p3.kwargs, {}) 576 self.assertEqual(custom.p3.p1, 'c') 577 self.assertEqual(cfg['d'], {'foo': 'bar', 'bar': 'baz'}) 578 self.assertEqual(cfg['e'], {'foo': 'bar', 'bar': 'baz'}) 579 580 581def _speed_range(min_speed, max_speed): 582 return tuple(['%d KB/s' % v for v in range(min_speed, 583 max_speed + 1)]) 584 585def _eta_range(min_eta, max_eta, prefix='ETA '): 586 msg = prefix + ': 00:00:%02d' 587 return tuple([msg % v for v in range(min_eta, max_eta + 1)]) 588 589class ProgressTestCase(DistlibTestCase): 590 # Of late, the speed tests keep failing on AppVeyor and Windows 591 @unittest.skipIf(IN_GITHUB_WORKFLOW or (os.name == 'nt' and 592 os.environ.get('APPVEYOR') == 'True'), 593 'Test disabled on some environments due to performance') 594 def test_basic(self): 595 596 # These ranges may need tweaking to cater for especially slow 597 # machines 598 if os.name == 'nt': 599 speed1 = _speed_range(18, 20) 600 speed2 = _speed_range(20, 22) 601 else: 602 speed1 = _speed_range(16, 19) 603 speed2 = _speed_range(20, 22) 604 expected = ( 605 (' 10 %', _eta_range(4, 7), speed1), 606 (' 20 %', _eta_range(4, 7), speed1), 607 (' 30 %', _eta_range(3, 4), speed1), 608 (' 40 %', _eta_range(3, 3), speed1), 609 (' 50 %', _eta_range(2, 2), speed1), 610 (' 60 %', _eta_range(2, 2), speed1), 611 (' 70 %', _eta_range(1, 1), speed1), 612 (' 80 %', _eta_range(1, 1), speed1), 613 (' 90 %', _eta_range(0, 0), speed1), 614 ('100 %', _eta_range(4, 5, 'Done'), speed2), 615 ) 616 bar = Progress(maxval=100000).start() 617 for i, v in enumerate(range(10000, 100000, 10000)): 618 time.sleep(0.5) 619 bar.update(v) 620 p, e, s = expected[i] 621 self.assertEqual(bar.percentage, p) 622 self.assertIn(bar.ETA, e, p) 623 self.assertIn(bar.speed, s) 624 bar.stop() 625 p, e, s = expected[i + 1] 626 self.assertEqual(bar.percentage, p) 627 self.assertIn(bar.ETA, e, p) 628 self.assertIn(bar.speed, s) 629 630 # Of late, the speed tests keep failing on AppVeyor and Windows 631 @unittest.skipIf(IN_GITHUB_WORKFLOW or (os.name == 'nt' and 632 os.environ.get('APPVEYOR') == 'True'), 633 'Test disabled on some environments due to performance') 634 def test_unknown(self): 635 if os.name == 'nt': 636 speed = _speed_range(17, 20) 637 else: 638 speed = _speed_range(17, 19) 639 expected = ( 640 (' ?? %', 'ETA : ??:??:??', speed), 641 (' ?? %', 'ETA : ??:??:??', speed), 642 (' ?? %', 'ETA : ??:??:??', speed), 643 (' ?? %', 'ETA : ??:??:??', speed), 644 (' ?? %', 'ETA : ??:??:??', speed), 645 (' ?? %', 'ETA : ??:??:??', speed), 646 (' ?? %', 'ETA : ??:??:??', speed), 647 (' ?? %', 'ETA : ??:??:??', speed), 648 (' ?? %', 'ETA : ??:??:??', speed), 649 ('100 %', 'Done: 00:00:04', speed), 650 ) 651 bar = Progress(maxval=None).start() 652 for i, v in enumerate(range(10000, 100000, 10000)): 653 time.sleep(0.5) 654 bar.update(v) 655 p, e, s = expected[i] 656 self.assertEqual(bar.percentage, p) 657 self.assertEqual(bar.ETA, e) 658 self.assertIn(bar.speed, s) 659 bar.stop() 660 p, e, s = expected[i + 1] 661 self.assertEqual(bar.percentage, p) 662 self.assertEqual(bar.ETA, e) 663 self.assertIn(bar.speed, s) 664 665class FileOpsTestCase(DistlibTestCase): 666 667 def setUp(self): 668 self.fileop = FileOperator() 669 self.workdir = tempfile.mkdtemp() 670 671 def tearDown(self): 672 if os.path.isdir(self.workdir): 673 shutil.rmtree(self.workdir) 674 675 def test_ensure_dir(self): 676 td = self.workdir 677 os.rmdir(td) 678 self.fileop.ensure_dir(td) 679 self.assertTrue(os.path.exists(td)) 680 self.fileop.dry_run = True 681 os.rmdir(td) 682 self.fileop.ensure_dir(td) 683 self.assertFalse(os.path.exists(td)) 684 685 def test_ensure_removed(self): 686 td = self.workdir 687 self.assertTrue(os.path.exists(td)) 688 self.fileop.dry_run = True 689 self.fileop.ensure_removed(td) 690 self.assertTrue(os.path.exists(td)) 691 self.fileop.dry_run = False 692 self.fileop.ensure_removed(td) 693 self.assertFalse(os.path.exists(td)) 694 695 def test_is_writable(self): 696 sd = 'subdir' 697 ssd = 'subsubdir' 698 path = os.path.join(self.workdir, sd, ssd) 699 os.makedirs(path) 700 path = os.path.join(path, 'test') 701 self.assertTrue(self.fileop.is_writable(path)) 702 if os.name == 'posix': 703 self.assertFalse(self.fileop.is_writable('/etc')) 704 705 def test_byte_compile(self): 706 path = os.path.join(self.workdir, 'hello.py') 707 dpath = cache_from_source(path, True) 708 self.fileop.write_text_file(path, 'print("Hello, world!")', 'utf-8') 709 self.fileop.byte_compile(path, optimize=False) 710 self.assertTrue(os.path.exists(dpath)) 711 712 def write_some_files(self): 713 path = os.path.join(self.workdir, 'file1') 714 written = [] 715 self.fileop.write_text_file(path, 'test', 'utf-8') 716 written.append(path) 717 path = os.path.join(self.workdir, 'file2') 718 self.fileop.copy_file(written[0], path) 719 written.append(path) 720 path = os.path.join(self.workdir, 'dir1') 721 self.fileop.ensure_dir(path) 722 return set(written), set([path]) 723 724 def test_copy_check(self): 725 srcpath = os.path.join(self.workdir, 'file1') 726 self.fileop.write_text_file(srcpath, 'test', 'utf-8') 727 dstpath = os.path.join(self.workdir, 'file2') 728 os.mkdir(dstpath) 729 self.assertRaises(ValueError, self.fileop.copy_file, srcpath, 730 dstpath) 731 os.rmdir(dstpath) 732 if os.name == 'posix': # symlinks available 733 linkpath = os.path.join(self.workdir, 'file3') 734 self.fileop.write_text_file(linkpath, 'linkdest', 'utf-8') 735 os.symlink(linkpath, dstpath) 736 self.assertRaises(ValueError, self.fileop.copy_file, srcpath, 737 dstpath) 738 739 def test_commit(self): 740 # will assert if record isn't set 741 self.assertRaises(AssertionError, self.fileop.commit) 742 self.fileop.record = True 743 expected = self.write_some_files() 744 actual = self.fileop.commit() 745 self.assertEqual(actual, expected) 746 self.assertFalse(self.fileop.record) 747 748 def test_rollback(self): 749 # will assert if record isn't set 750 self.assertRaises(AssertionError, self.fileop.commit) 751 self.fileop.record = True 752 expected = self.write_some_files() 753 actual = self.fileop.rollback() 754 self.assertEqual(os.listdir(self.workdir), []) 755 self.assertFalse(self.fileop.record) 756 757 758class GlobTestCaseBase(TempdirManager, DistlibTestCase): 759 760 def build_files_tree(self, files): 761 tempdir = self.mkdtemp() 762 for filepath in files: 763 is_dir = filepath.endswith('/') 764 filepath = os.path.join(tempdir, *filepath.split('/')) 765 if is_dir: 766 dirname = filepath 767 else: 768 dirname = os.path.dirname(filepath) 769 if dirname and not os.path.exists(dirname): 770 os.makedirs(dirname) 771 if not is_dir: 772 self.write_file(filepath, 'babar') 773 return tempdir 774 775 @staticmethod 776 def os_dependent_path(path): 777 path = path.rstrip('/').split('/') 778 return os.path.join(*path) 779 780 def clean_tree(self, spec): 781 files = [] 782 for path, includes in spec.items(): 783 if includes: 784 files.append(self.os_dependent_path(path)) 785 return sorted(files) 786 787 788class GlobTestCase(GlobTestCaseBase): 789 790 def assertGlobMatch(self, glob, spec): 791 tempdir = self.build_files_tree(spec) 792 expected = self.clean_tree(spec) 793 os.chdir(tempdir) 794 result = sorted(iglob(glob)) 795 self.assertEqual(expected, result) 796 797 def test_regex_rich_glob(self): 798 matches = RICH_GLOB.findall( 799 r"babar aime les {fraises} est les {huitres}") 800 self.assertEqual(["fraises", "huitres"], matches) 801 802 def test_simple_glob(self): 803 glob = '*.tp?' 804 spec = {'coucou.tpl': True, 805 'coucou.tpj': True, 806 'Donotwant': False} 807 self.assertGlobMatch(glob, spec) 808 809 def test_simple_glob_in_dir(self): 810 glob = os.path.join('babar', '*.tp?') 811 spec = {'babar/coucou.tpl': True, 812 'babar/coucou.tpj': True, 813 'babar/toto.bin': False, 814 'Donotwant': False} 815 self.assertGlobMatch(glob, spec) 816 817 def test_recursive_glob_head(self): 818 glob = os.path.join('**', 'tip', '*.t?l') 819 spec = {'babar/zaza/zuzu/tip/coucou.tpl': True, 820 'babar/z/tip/coucou.tpl': True, 821 'babar/tip/coucou.tpl': True, 822 'babar/zeop/tip/babar/babar.tpl': False, 823 'babar/z/tip/coucou.bin': False, 824 'babar/toto.bin': False, 825 'zozo/zuzu/tip/babar.tpl': True, 826 'zozo/tip/babar.tpl': True, 827 'Donotwant': False} 828 self.assertGlobMatch(glob, spec) 829 830 def test_recursive_glob_tail(self): 831 glob = os.path.join('babar', '**') 832 spec = {'babar/zaza/': True, 833 'babar/zaza/zuzu/': True, 834 'babar/zaza/zuzu/babar.xml': True, 835 'babar/zaza/zuzu/toto.xml': True, 836 'babar/zaza/zuzu/toto.csv': True, 837 'babar/zaza/coucou.tpl': True, 838 'babar/bubu.tpl': True, 839 'zozo/zuzu/tip/babar.tpl': False, 840 'zozo/tip/babar.tpl': False, 841 'Donotwant': False} 842 self.assertGlobMatch(glob, spec) 843 844 def test_recursive_glob_middle(self): 845 glob = os.path.join('babar', '**', 'tip', '*.t?l') 846 spec = {'babar/zaza/zuzu/tip/coucou.tpl': True, 847 'babar/z/tip/coucou.tpl': True, 848 'babar/tip/coucou.tpl': True, 849 'babar/zeop/tip/babar/babar.tpl': False, 850 'babar/z/tip/coucou.bin': False, 851 'babar/toto.bin': False, 852 'zozo/zuzu/tip/babar.tpl': False, 853 'zozo/tip/babar.tpl': False, 854 'Donotwant': False} 855 self.assertGlobMatch(glob, spec) 856 857 def test_glob_set_tail(self): 858 glob = os.path.join('bin', '*.{bin,sh,exe}') 859 spec = {'bin/babar.bin': True, 860 'bin/zephir.sh': True, 861 'bin/celestine.exe': True, 862 'bin/cornelius.bat': False, 863 'bin/cornelius.xml': False, 864 'toto/yurg': False, 865 'Donotwant': False} 866 self.assertGlobMatch(glob, spec) 867 868 def test_glob_set_middle(self): 869 glob = os.path.join('xml', '{babar,toto}.xml') 870 spec = {'xml/babar.xml': True, 871 'xml/toto.xml': True, 872 'xml/babar.xslt': False, 873 'xml/cornelius.sgml': False, 874 'xml/zephir.xml': False, 875 'toto/yurg.xml': False, 876 'Donotwant': False} 877 self.assertGlobMatch(glob, spec) 878 879 def test_glob_set_head(self): 880 glob = os.path.join('{xml,xslt}', 'babar.*') 881 spec = {'xml/babar.xml': True, 882 'xml/toto.xml': False, 883 'xslt/babar.xslt': True, 884 'xslt/toto.xslt': False, 885 'toto/yurg.xml': False, 886 'Donotwant': False} 887 self.assertGlobMatch(glob, spec) 888 889 def test_glob_all(self): 890 dirs = '{%s,%s}' % (os.path.join('xml', '*'), 891 os.path.join('xslt', '**')) 892 glob = os.path.join(dirs, 'babar.xml') 893 spec = {'xml/a/babar.xml': True, 894 'xml/b/babar.xml': True, 895 'xml/a/c/babar.xml': False, 896 'xslt/a/babar.xml': True, 897 'xslt/b/babar.xml': True, 898 'xslt/a/c/babar.xml': True, 899 'toto/yurg.xml': False, 900 'Donotwant': False} 901 self.assertGlobMatch(glob, spec) 902 903 def test_invalid_glob_pattern(self): 904 invalids = [ 905 'ppooa**', 906 'azzaeaz4**/', 907 '/**ddsfs', 908 '**##1e"&e', 909 'DSFb**c009', 910 '{', 911 '{aaQSDFa', 912 '}', 913 'aQSDFSaa}', 914 '{**a,', 915 ',**a}', 916 '{a**,', 917 ',b**}', 918 '{a**a,babar}', 919 '{bob,b**z}', 920 ] 921 for pattern in invalids: 922 self.assertRaises(ValueError, iglob, pattern) 923 924 def test_parse_requirement(self): 925 # Empty requirements 926 for empty in ('', '#this should be ignored'): 927 self.assertIsNone(parse_requirement(empty)) 928 929 # Invalid requirements 930 for invalid in ('a (', 'a/', 'a$', 'a [', 'a () [],', 'a 1.2'): 931 self.assertRaises(SyntaxError, parse_requirement, invalid) 932 933 # Valid requirements 934 def validate(r, values): 935 self.assertEqual(r.name, values[0]) 936 self.assertEqual(r.constraints, values[1]) 937 self.assertEqual(r.extras, values[2]) 938 self.assertEqual(r.requirement, values[3]) 939 self.assertEqual(r.url, values[4]) 940 941 r = parse_requirement('a') 942 validate(r, ('a', None, None, 'a', None)) 943 r = parse_requirement('a >= 1.2, <2.0,!=1.7') 944 validate(r, ('a', [('>=', '1.2'), ('<', '2.0'), ('!=', '1.7')], None, 945 'a >= 1.2, < 2.0, != 1.7', None)) 946 r = parse_requirement('a [ab,cd , ef] >= 1.2, <2.0') 947 validate(r, ('a', [('>=', '1.2'), ('<', '2.0')], ['ab', 'cd', 'ef'], 948 'a >= 1.2, < 2.0', None)) 949 r = parse_requirement('a[]') 950 validate(r, ('a', None, None, 'a', None)) 951 r = parse_requirement('a (== 1.2.*, != 1.2.1.*)') 952 validate(r, ('a', [('==', '1.2.*'), ('!=', '1.2.1.*')], None, 953 'a == 1.2.*, != 1.2.1.*', None)) 954 r = parse_requirement('a @ http://domain.com/path#abc=def') 955 validate(r, ('a', None, None, 'a', 'http://domain.com/path#abc=def')) 956 # See issue #148 957 r = parse_requirement('a >=3.6') 958 validate(r, ('a', [('>=', '3.6')], None, 'a >= 3.6', None)) 959 r = parse_requirement('a >=3.6,') 960 validate(r, ('a', [('>=', '3.6')], None, 'a >= 3.6', None)) 961 962 if False: # TODO re-enable 963 for e in ('*', ':*:', ':meta:', '-', '-abc'): 964 r = parse_requirement('a [%s]' % e) 965 validate(r, ('a', None, [e], 'a', None)) 966 967 def test_write_exports(self): 968 exports = { 969 'foo': { 970 'v1': ExportEntry('v1', 'p1', 's1', []), 971 'v2': ExportEntry('v2', 'p2', 's2', ['f2=a', 'g2']), 972 }, 973 'bar': { 974 'v3': ExportEntry('v3', 'p3', 's3', ['f3', 'g3=h']), 975 'v4': ExportEntry('v4', 'p4', 's4', ['f4', 'g4']), 976 }, 977 } 978 979 fd, fn = tempfile.mkstemp() 980 try: 981 os.close(fd) 982 with open(fn, 'wb') as f: 983 write_exports(exports, f) 984 with open(fn, 'rb') as f: 985 actual = read_exports(f) 986 self.assertEqual(actual, exports) 987 finally: 988 os.remove(fn) 989 990 def test_get_extras(self): 991 cases = ( 992 (['*'], ['i18n'], set(['i18n'])), 993 (['*', '-bar'], ['foo', 'bar'], set(['foo'])), 994 ) 995 for requested, available, expected in cases: 996 actual = get_extras(requested, available) 997 self.assertEqual(actual, expected) 998if __name__ == '__main__': # pragma: no cover 999 unittest.main() 1000