# -*- coding: utf-8 -*-
"""Tests for `wcmatch`."""
import unittest
import os
import wcmatch.wcmatch as wcmatch
import shutil
from wcmatch import _wcparse


# Below is general helper stuff that Python uses in `unittests`. As these are
# not meant for users, and could change without notice, include them
# ourselves so we aren't surprised later.
TESTFN = '@test'

# Disambiguate `TESTFN` for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())


def create_empty_file(filename):
    """Create an empty file. If the file already exists, truncate it."""

    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
    os.close(fd)


# Cached result of `can_symlink()`; `None` means "not probed yet".
_can_symlink = None


def can_symlink():
    """
    Check if we can symlink.

    The probe is performed once by attempting to create (and then removing)
    a symlink; the outcome is cached in `_can_symlink` for later calls.
    """

    global _can_symlink
    if _can_symlink is not None:
        return _can_symlink
    symlink_path = TESTFN + "can_symlink"
    try:
        os.symlink(TESTFN, symlink_path)
        can = True
    except (OSError, NotImplementedError, AttributeError):
        can = False
    else:
        # Only reached when the symlink was actually created.
        os.remove(symlink_path)
    _can_symlink = can
    return can


def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink."""

    ok = can_symlink()
    msg = "Requires functional symlink implementation"
    return test if ok else unittest.skip(msg)(test)


class _TestWcmatch(unittest.TestCase):
    """
    Shared fixture helpers for the `WcMatch` tests.

    This base class defines no tests itself; subclasses extend `setUp` to
    populate the temp directory and add the actual test methods.
    """

    def mktemp(self, *parts):
        """Create an empty file at `parts` (relative to the temp directory), creating parent directories as needed."""

        filename = self.norm(*parts)
        base = os.path.dirname(filename)
        # Best effort: try a few times, then let `create_empty_file` surface
        # the real problem if the directory still could not be created.
        for _ in range(3):
            try:
                os.makedirs(base, exist_ok=True)
                break
            except OSError:
                continue
        create_empty_file(filename)

    def _make_default_files(self):
        """Create the file layout shared by the test classes."""

        self.mktemp('.hidden', 'a.txt')
        self.mktemp('.hidden', 'b.file')
        self.mktemp('.hidden_file')
        self.mktemp('a.txt')
        self.mktemp('b.file')
        self.mktemp('c.txt.bak')

    def force_err(self):
        """Force an error."""

        raise TypeError

    def norm(self, *parts):
        """
        Normalizes file path (in relation to temp directory).

        The temp directory is encoded to bytes when the first part is bytes
        so that the joined path has a consistent type.
        """

        tempdir = os.fsencode(self.tempdir) if isinstance(parts[0], bytes) else self.tempdir
        return os.path.join(tempdir, *parts)

    def norm_list(self, files):
        """Normalize file list and return it sorted."""

        return sorted([self.norm(os.path.normpath(x)) for x in files])

    def setUp(self):
        """Setup: reset the per-test bookkeeping and default flags."""

        self.tempdir = TESTFN + "_dir"
        self.default_flags = wcmatch.R | wcmatch.I | wcmatch.M | wcmatch.SL
        self.errors = []
        self.skipped = 0
        self.skip_records = []
        self.error_records = []
        self.files = []

    def tearDown(self):
        """Cleanup: remove the temp directory (best effort, a few attempts)."""

        for _ in range(3):
            try:
                shutil.rmtree(self.tempdir)
                break
            except OSError:
                continue

    def crawl_files(self, walker):
        """
        Crawl the files.

        Results equal to the `'<SKIPPED>'` / `'<ERROR>'` sentinels (returned
        by the overridden hooks in some tests) are recorded separately from
        regular file results. The walker's skip count is stored afterwards.
        """

        for f in walker.match():
            if f == '<SKIPPED>':
                self.skip_records.append(f)
            elif f == '<ERROR>':
                self.error_records.append(f)
            else:
                self.files.append(f)
        self.skipped = walker.get_skipped()


class TestWcmatch(_TestWcmatch):
    """Test the `WcMatch` class."""

    def setUp(self):
        """Setup: reset shared state and create the test files."""

        super().setUp()
        self._make_default_files()

    def test_full_path_exclude(self):
        """Test full path exclude."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt', exclude_pattern='**/.hidden',
            flags=self.default_flags | wcmatch.DIRPATHNAME | wcmatch.GLOBSTAR | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        self.crawl_files(walker)

        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_full_file(self):
        """Test full file."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '**/*.txt|-**/.hidden/*',
            flags=self.default_flags | wcmatch.FILEPATHNAME | wcmatch.GLOBSTAR | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        self.crawl_files(walker)

        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_non_recursive(self):
        """Test non-recursive search."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags
        )

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 3)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_non_recursive_inverse(self):
        """Test non-recursive inverse search."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.*|-*.file',
            flags=self.default_flags
        )

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 2)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt', 'c.txt.bak']))

    def test_recursive(self):
        """Test recursive search."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE
        )

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 3)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_recursive_bytes(self):
        """Test recursive search with byte strings."""

        walker = wcmatch.WcMatch(
            os.fsencode(self.tempdir),
            b'*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE
        )

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 3)
        self.assertEqual(sorted(self.files), self.norm_list([b'a.txt']))

    def test_recursive_hidden(self):
        """Test recursive search with the `HIDDEN` flag."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 4)
        self.assertEqual(sorted(self.files), self.norm_list(['.hidden/a.txt', 'a.txt']))

    def test_recursive_hidden_bytes(self):
        """Test recursive search with the `HIDDEN` flag and byte strings."""

        walker = wcmatch.WcMatch(
            os.fsencode(self.tempdir),
            b'*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 4)
        self.assertEqual(sorted(self.files), self.norm_list([b'.hidden/a.txt', b'a.txt']))

    def test_recursive_hidden_folder_exclude(self):
        """Test recursive search with the `HIDDEN` flag and a folder exclude pattern."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt', exclude_pattern='.hidden',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 3)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_recursive_hidden_folder_exclude_inverse(self):
        """Test recursive search with the `HIDDEN` flag and an inverse folder exclude pattern."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt', exclude_pattern='*|-.hidden',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 4)
        self.assertEqual(sorted(self.files), self.norm_list(['.hidden/a.txt', 'a.txt']))

    def test_abort(self):
        """Test aborting."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        # Kill the walker as soon as the first record arrives; iteration is
        # expected to stop after exactly that one record.
        records = 0
        for _ in walker.imatch():
            records += 1
            walker.kill()
        self.assertEqual(records, 1)

        # Reset our test tracker along with the walker object
        self.errors = []
        self.skipped = 0
        self.files = []
        walker.reset()

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['.hidden/a.txt', 'a.txt']))

    def test_abort_early(self):
        """Test aborting early."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt*',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        # Kill before iteration even starts.
        walker.kill()
        records = 0
        for _ in walker.imatch():
            records += 1

        self.assertTrue(records == 0 or walker.get_skipped() == 0)

    def test_empty_string_dir(self):
        """Test when directory is an empty string."""

        target = '.' + os.sep
        walker = wcmatch.WcMatch(
            '',
            '*.txt*',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )
        self.assertEqual(walker._root_dir, target)

        walker = wcmatch.WcMatch(
            b'',
            b'*.txt*',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )
        self.assertEqual(walker._root_dir, os.fsencode(target))

    def test_empty_string_file(self):
        """Test when file pattern is an empty string."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )
        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['a.txt', '.hidden/a.txt', '.hidden/b.file', 'b.file', '.hidden_file', 'c.txt.bak']
            )
        )

    def test_skip_override(self):
        """Test `on_skip` override."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        walker.on_skip = lambda base, name: '<SKIPPED>'

        self.crawl_files(walker)
        self.assertEqual(len(self.skip_records), 4)

    def test_errors(self):
        """Test errors."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        # Every file validation raises, so no file is expected to be returned.
        walker.on_validate_file = lambda base, name: self.force_err()

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list([]))

        self.errors = []
        self.skipped = 0
        self.files = []

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        # Every directory validation raises; only the top-level match is expected.
        walker.on_validate_directory = lambda base, name: self.force_err()

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_error_override(self):
        """Test `on_error` override."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        walker.on_validate_file = lambda base, name: self.force_err()
        walker.on_error = lambda base, name: '<ERROR>'

        self.crawl_files(walker)
        self.assertEqual(len(self.error_records), 2)

    def test_match_base_filepath(self):
        """Test `MATCHBASE` with filepath."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN | wcmatch.FILEPATHNAME | wcmatch.MATCHBASE
        )
        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['a.txt', '.hidden/a.txt']
            )
        )

    def test_match_base_absolute_filepath(self):
        """Test `MATCHBASE` with filepath and an absolute path."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '.hidden/*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN | wcmatch.FILEPATHNAME | wcmatch.MATCHBASE
        )
        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['.hidden/a.txt']
            )
        )

    def test_match_base_anchored_filepath(self):
        """Test `MATCHBASE` with filepath and an anchored pattern."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '/*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN | wcmatch.FILEPATHNAME | wcmatch.MATCHBASE
        )
        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['a.txt']
            )
        )

    def test_match_insensitive(self):
        """Test case insensitive."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            'A.TXT',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.FILEPATHNAME | wcmatch.IGNORECASE
        )
        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['a.txt']
            )
        )

    def test_nomatch_sensitive(self):
        """Test case sensitive does not match."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            'A.TXT',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.FILEPATHNAME | wcmatch.CASE
        )
        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                []
            )
        )

    def test_match_sensitive(self):
        """Test case sensitive."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            'a.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.FILEPATHNAME | wcmatch.CASE
        )
        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['a.txt']
            )
        )


@skip_unless_symlink
class TestWcmatchSymlink(_TestWcmatch):
    """Test symlinks."""

    def mksymlink(self, original, link):
        """Make symlink, unless something already exists at `link`."""

        if not os.path.lexists(link):
            os.symlink(original, link)

    def setUp(self):
        """Setup: reset shared state, create the test files and the symlinks."""

        super().setUp()
        self._make_default_files()
        self.can_symlink = can_symlink()
        if self.can_symlink:
            self.mksymlink('.hidden', self.norm('sym1'))
            self.mksymlink(os.path.join('.hidden', 'a.txt'), self.norm('sym2'))

        # Unlike the base class, these tests do not include `wcmatch.SL` by
        # default; `test_symlinks` opts in explicitly with `wcmatch.SYMLINKS`.
        self.default_flags = wcmatch.R | wcmatch.I | wcmatch.M

    def test_symlinks(self):
        """Test symlinks."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN | wcmatch.SYMLINKS
        )

        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['a.txt', '.hidden/a.txt', 'sym1/a.txt']
            )
        )

    def test_avoid_symlinks(self):
        """Test avoiding symlinks."""

        walker = wcmatch.WcMatch(
            self.tempdir,
            '*.txt',
            flags=self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        )

        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['a.txt', '.hidden/a.txt']
            )
        )


class TestExpansionLimit(unittest.TestCase):
    """Test expansion limits."""

    def test_limit_wcmatch(self):
        """Test expansion limit of `wcmatch`."""

        # `{1..11}` with `limit=10` is expected to raise.
        with self.assertRaises(_wcparse.PatternLimitException):
            wcmatch.WcMatch('.', '{1..11}', flags=wcmatch.BRACE, limit=10)