1# -*- coding: utf-8 -*-
2"""Tests for `wcmatch`."""
3import unittest
4import os
5import wcmatch.wcmatch as wcmatch
6import shutil
7from wcmatch import _wcparse
8
9
# Below is general helper stuff that Python uses in its own unit tests.  As these
# are not meant for users, and could change without notice, we include them
# ourselves so we aren't surprised later.
# Base name for temporary test paths.  The process ID is folded in so that
# parallel test runs do not collide on the same name.
TESTFN = "{}_{}_tmp".format('@test', os.getpid())
18
19
def create_empty_file(filename):
    """Ensure `filename` exists as an empty file, truncating any existing content."""

    # Write-only, create when missing, truncate when present.
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    os.close(os.open(filename, open_flags))
25
26
# Cached result of the symlink probe; `None` means "not probed yet".
_can_symlink = None


def can_symlink():
    """Return whether `os.symlink` works here, probing once and caching the answer."""

    global _can_symlink
    if _can_symlink is None:
        probe = TESTFN + "can_symlink"
        try:
            os.symlink(TESTFN, probe)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            # Clean up the probe link before recording success.
            os.remove(probe)
            _can_symlink = True
    return _can_symlink
46
47
def skip_unless_symlink(test):
    """Decorate `test` so it is skipped when symlinks are not functional."""

    if can_symlink():
        return test
    return unittest.skip("Requires functional symlink implementation")(test)
54
55
class _TestWcmatch(unittest.TestCase):
    """Shared fixture helpers for the `WcMatch` test cases."""

    def mktemp(self, *parts):
        """Create an empty file at `parts` under the temp directory, creating its parent folder if missing."""

        target = self.norm(*parts)
        parent = os.path.split(target)[0]
        if not os.path.exists(parent):
            # Best effort: up to three attempts, failures are ignored.
            for _ in range(3):
                try:
                    os.makedirs(parent)
                except Exception:
                    continue
                break
        create_empty_file(target)

    def force_err(self):
        """Raise a `TypeError` unconditionally."""

        raise TypeError

    def norm(self, *parts):
        """Join `parts` onto the temp directory (as bytes when the first part is bytes)."""

        if isinstance(parts[0], bytes):
            root = os.fsencode(self.tempdir)
        else:
            root = self.tempdir
        return os.path.join(root, *parts)

    def norm_list(self, files):
        """Return the sorted list of `files` normalized and placed under the temp directory."""

        normalized = [self.norm(os.path.normpath(name)) for name in files]
        normalized.sort()
        return normalized

    def setUp(self):
        """Initialize the temp directory name, default flags, and result trackers."""

        self.tempdir = TESTFN + "_dir"
        self.default_flags = wcmatch.R | wcmatch.I | wcmatch.M | wcmatch.SL
        self.files = []
        self.errors = []
        self.skip_records = []
        self.error_records = []
        self.skipped = 0

    def tearDown(self):
        """Remove the temp directory (best effort, up to three attempts)."""

        for _ in range(3):
            try:
                shutil.rmtree(self.tempdir)
            except Exception:
                continue
            break

    def crawl_files(self, walker):
        """Run `walker.match()` and sort each result into the skip, error, or file tracker."""

        for entry in walker.match():
            if entry == '<SKIPPED>':
                bucket = self.skip_records
            elif entry == '<ERROR>':
                bucket = self.error_records
            else:
                bucket = self.files
            bucket.append(entry)
        self.skipped = walker.get_skipped()
122
123
class TestWcmatch(_TestWcmatch):
    """Test the `WcMatch` class."""

    def setUp(self):
        """Create the fixture file tree on top of the shared base-class state."""

        # The base class sets `tempdir`, `default_flags`, and the result trackers.
        super().setUp()
        self.mktemp('.hidden', 'a.txt')
        self.mktemp('.hidden', 'b.file')
        self.mktemp('.hidden_file')
        self.mktemp('a.txt')
        self.mktemp('b.file')
        self.mktemp('c.txt.bak')

    def test_full_path_exclude(self):
        """Test full path exclude."""

        flags = self.default_flags | wcmatch.DIRPATHNAME | wcmatch.GLOBSTAR | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', exclude_pattern='**/.hidden', flags=flags)

        self.crawl_files(walker)

        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_full_file(self):
        """Test full file path matching."""

        flags = self.default_flags | wcmatch.FILEPATHNAME | wcmatch.GLOBSTAR | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '**/*.txt|-**/.hidden/*', flags=flags)

        self.crawl_files(walker)

        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_non_recursive(self):
        """Test non-recursive search."""

        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=self.default_flags)

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 3)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_non_recursive_inverse(self):
        """Test non-recursive inverse search."""

        walker = wcmatch.WcMatch(self.tempdir, '*.*|-*.file', flags=self.default_flags)

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 2)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt', 'c.txt.bak']))

    def test_recursive(self):
        """Test recursive search."""

        flags = self.default_flags | wcmatch.RECURSIVE
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 3)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_recursive_bytes(self):
        """Test recursive search with byte strings."""

        flags = self.default_flags | wcmatch.RECURSIVE
        walker = wcmatch.WcMatch(os.fsencode(self.tempdir), b'*.txt', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 3)
        self.assertEqual(sorted(self.files), self.norm_list([b'a.txt']))

    def test_recursive_hidden(self):
        """Test recursive search with the `HIDDEN` flag."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 4)
        self.assertEqual(sorted(self.files), self.norm_list(['.hidden/a.txt', 'a.txt']))

    def test_recursive_hidden_bytes(self):
        """Test recursive search with the `HIDDEN` flag and byte strings."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(os.fsencode(self.tempdir), b'*.txt', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 4)
        self.assertEqual(sorted(self.files), self.norm_list([b'.hidden/a.txt', b'a.txt']))

    def test_recursive_hidden_folder_exclude(self):
        """Test recursive search with the `HIDDEN` flag and a folder exclude pattern."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', exclude_pattern='.hidden', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 3)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_recursive_hidden_folder_exclude_inverse(self):
        """Test recursive search with the `HIDDEN` flag and an inverse folder exclude pattern."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', exclude_pattern='*|-.hidden', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(self.skipped, 4)
        self.assertEqual(sorted(self.files), self.norm_list(['.hidden/a.txt', 'a.txt']))

    def test_abort(self):
        """Test aborting."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)

        # Kill the walker as soon as the first record arrives.
        records = 0
        for _ in walker.imatch():
            records += 1
            walker.kill()
        self.assertEqual(records, 1)

        # Reset our test tracker along with the walker object
        self.errors = []
        self.skipped = 0
        self.files = []
        walker.reset()

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['.hidden/a.txt', 'a.txt']))

    def test_abort_early(self):
        """Test aborting early."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt*', flags=flags)

        # Kill before iteration even starts.
        walker.kill()
        records = 0
        for _ in walker.imatch():
            records += 1

        self.assertTrue(records == 0 or walker.get_skipped() == 0)

    def test_empty_string_dir(self):
        """Test when directory is an empty string."""

        target = '.' + os.sep
        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN

        walker = wcmatch.WcMatch('', '*.txt*', flags=flags)
        self.assertEqual(walker._root_dir, target)

        walker = wcmatch.WcMatch(b'', b'*.txt*', flags=flags)
        self.assertEqual(walker._root_dir, os.fsencode(target))

    def test_empty_string_file(self):
        """Test when file pattern is an empty string."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(
            sorted(self.files),
            self.norm_list(
                ['a.txt', '.hidden/a.txt', '.hidden/b.file', 'b.file', '.hidden_file', 'c.txt.bak']
            )
        )

    def test_skip_override(self):
        """Test `on_skip` override."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)

        walker.on_skip = lambda base, name: '<SKIPPED>'

        self.crawl_files(walker)
        self.assertEqual(len(self.skip_records), 4)

    def test_errors(self):
        """Test errors raised from the file and directory validation hooks."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN

        # Error raised while validating files.
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)
        walker.on_validate_file = lambda base, name: self.force_err()

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list([]))

        self.errors = []
        self.skipped = 0
        self.files = []

        # Error raised while validating directories.
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)
        walker.on_validate_directory = lambda base, name: self.force_err()

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_error_override(self):
        """Test `on_error` override."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)

        walker.on_validate_file = lambda base, name: self.force_err()
        walker.on_error = lambda base, name: '<ERROR>'

        self.crawl_files(walker)
        self.assertEqual(len(self.error_records), 2)

    def test_match_base_filepath(self):
        """Test `MATCHBASE` with filepath."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN | wcmatch.FILEPATHNAME | wcmatch.MATCHBASE
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt', '.hidden/a.txt']))

    def test_match_base_absolute_filepath(self):
        """Test `MATCHBASE` with filepath and a pattern that contains a directory separator."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN | wcmatch.FILEPATHNAME | wcmatch.MATCHBASE
        walker = wcmatch.WcMatch(self.tempdir, '.hidden/*.txt', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['.hidden/a.txt']))

    def test_match_base_anchored_filepath(self):
        """Test `MATCHBASE` with filepath and an anchored pattern."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN | wcmatch.FILEPATHNAME | wcmatch.MATCHBASE
        walker = wcmatch.WcMatch(self.tempdir, '/*.txt', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_match_insensitive(self):
        """Test case insensitive."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.FILEPATHNAME | wcmatch.IGNORECASE
        walker = wcmatch.WcMatch(self.tempdir, 'A.TXT', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))

    def test_nomatch_sensitive(self):
        """Test case sensitive does not match."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.FILEPATHNAME | wcmatch.CASE
        walker = wcmatch.WcMatch(self.tempdir, 'A.TXT', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list([]))

    def test_match_sensitive(self):
        """Test case sensitive."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.FILEPATHNAME | wcmatch.CASE
        walker = wcmatch.WcMatch(self.tempdir, 'a.txt', flags=flags)

        self.crawl_files(walker)
        self.assertEqual(sorted(self.files), self.norm_list(['a.txt']))
504
@skip_unless_symlink
class TestWcmatchSymlink(_TestWcmatch):
    """Test `WcMatch` against a tree that contains symlinks."""

    def mksymlink(self, original, link):
        """Create `link` pointing at `original` unless something already exists at `link`."""

        if os.path.lexists(link):
            return
        os.symlink(original, link)

    def setUp(self):
        """Build the fixture tree, add the symlinks, and reset the trackers."""

        self.tempdir = TESTFN + "_dir"
        fixture_files = (
            ('.hidden', 'a.txt'),
            ('.hidden', 'b.file'),
            ('.hidden_file',),
            ('a.txt',),
            ('b.file',),
            ('c.txt.bak',),
        )
        for parts in fixture_files:
            self.mktemp(*parts)

        self.can_symlink = can_symlink()
        if self.can_symlink:
            # `sym1` links to the hidden folder, `sym2` to a file inside it.
            links = (
                ('.hidden', 'sym1'),
                (os.path.join('.hidden', 'a.txt'), 'sym2'),
            )
            for original, link_name in links:
                self.mksymlink(original, self.norm(link_name))

        self.default_flags = wcmatch.R | wcmatch.I | wcmatch.M
        self.files = []
        self.errors = []
        self.skip_records = []
        self.error_records = []
        self.skipped = 0

    def test_symlinks(self):
        """Test that the `SYMLINKS` flag yields matches under the symlinked folder."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN | wcmatch.SYMLINKS
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)

        self.crawl_files(walker)
        found = sorted(self.files)
        expected = self.norm_list(['a.txt', '.hidden/a.txt', 'sym1/a.txt'])
        self.assertEqual(found, expected)

    def test_avoid_symlinks(self):
        """Test that, without the `SYMLINKS` flag, nothing under the symlinked folder is matched."""

        flags = self.default_flags | wcmatch.RECURSIVE | wcmatch.HIDDEN
        walker = wcmatch.WcMatch(self.tempdir, '*.txt', flags=flags)

        self.crawl_files(walker)
        found = sorted(self.files)
        expected = self.norm_list(['a.txt', '.hidden/a.txt'])
        self.assertEqual(found, expected)
570
571
class TestExpansionLimit(unittest.TestCase):
    """Test pattern expansion limits."""

    def test_limit_wcmatch(self):
        """Test expansion limit of `WcMatch`."""

        # `{1..11}` is constructed with `limit=10`; construction is expected to raise.
        self.assertRaises(
            _wcparse.PatternLimitException,
            wcmatch.WcMatch,
            '.', '{1..11}', flags=wcmatch.BRACE, limit=10
        )
580