1# Copyright (C) 2005-2013, 2015, 2016 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""Testing framework extensions"""
18
19# NOTE: Some classes in here use camelCaseNaming() rather than
20# underscore_naming().  That's for consistency with unittest; it's not the
21# general style of breezy.  Please continue that consistency when adding e.g.
22# new assertFoo() methods.
23
24import atexit
25import codecs
26import copy
27import difflib
28import doctest
29import errno
30import functools
31from io import (
32    BytesIO,
33    StringIO,
34    TextIOWrapper,
35    )
36import itertools
37import logging
38import math
39import os
40import platform
41import pprint
42import random
43import re
44import shlex
45import site
46import stat
47import subprocess
48import sys
49import tempfile
50import threading
51import time
52import traceback
53import unittest
54import warnings
55
56import testtools
# nb: check this before importing anything else from within it
# A testtools module without a __version__ attribute yields the empty tuple
# here, which compares lower than any real version tuple and is therefore
# rejected by the check below as being too old.
_testtools_version = getattr(testtools, '__version__', ())
if _testtools_version < (0, 9, 5):
    # Report both where testtools was loaded from and the version found.
    raise ImportError("need at least testtools 0.9.5: %s is %r"
                      % (testtools.__file__, _testtools_version))
62from testtools import content
63
64import breezy
65from .. import (
66    branchbuilder,
67    controldir,
68    commands as _mod_commands,
69    config,
70    i18n,
71    debug,
72    errors,
73    hooks,
74    lock as _mod_lock,
75    lockdir,
76    osutils,
77    plugin as _mod_plugin,
78    pyutils,
79    ui,
80    urlutils,
81    registry,
82    symbol_versioning,
83    trace,
84    transport as _mod_transport,
85    workingtree,
86    )
87from breezy.bzr import (
88    chk_map,
89    )
90try:
91    import breezy.lsprof
92except ImportError:
93    # lsprof not available
94    pass
95from ..bzr.smart import client, request
96from ..transport import (
97    memory,
98    pathfilter,
99    )
100from ..tests import (
101    fixtures,
102    test_server,
103    TestUtil,
104    treeshape,
105    ui_testing,
106    )
107
# Mark this python module as being part of the implementation
# of unittest: this gives us better tracebacks where the last
# shown frame is the test code, not our assertXYZ.
__unittest = 1

default_transport = test_server.LocalURLServer


# NOTE: the name is misspelled ("unitialized"); it is left as-is because code
# outside this section may refer to it.
_unitialized_attr = object()
"""A sentinel needed to act as a default value in a method signature."""


# Subunit 'whence' codes accepted by ExtendedTestResult.progress(): SEEK_SET
# replaces the expected test count, SEEK_CUR adjusts it by an offset. They are
# defined here to prevent a hard dependency on subunit.
SUBUNIT_SEEK_SET = 0
SUBUNIT_SEEK_CUR = 1

# These are intentionally brought into this namespace. That way plugins, etc
# can just "from breezy.tests import TestCase, TestLoader, etc"
TestSuite = TestUtil.TestSuite
TestLoader = TestUtil.TestLoader
128
# Tests should run in a clean and clearly defined environment. The goal is to
# keep them isolated from the running environment as much as possible. The test
# framework ensures the variables defined below are set (or deleted if the
# value is None) before a test is run and reset to their original value after
# the test is run. Generally if some code depends on an environment variable,
# the tests should start without this variable in the environment. There are a
# few exceptions but you shouldn't violate this rule lightly.
isolated_environ = {
    'BRZ_HOME': None,
    'HOME': None,
    'GNUPGHOME': None,
    'XDG_CONFIG_HOME': None,
    # brz now uses the Win32 API and doesn't rely on APPDATA, but the
    # tests do check our impls match APPDATA
    'BRZ_EDITOR': None,  # test_msgeditor manipulates this variable
    'VISUAL': None,
    'EDITOR': None,
    'BRZ_EMAIL': None,
    'BZREMAIL': None,  # may still be present in the environment
    'EMAIL': 'jrandom@example.com',  # set EMAIL as brz does not guess
    'BRZ_PROGRESS_BAR': None,
    # Trap leaks to $XDG_CACHE_HOME/breezy/brz.log. This occurs when tests use
    # TestCase as a base class instead of TestCaseInTempDir. Tests inheriting
    # from TestCase should not use disk resources, BRZ_LOG is one.
    'BRZ_LOG': '/you-should-use-TestCaseInTempDir-if-you-need-a-log-file',
    'BRZ_PLUGIN_PATH': '-site',
    'BRZ_DISABLE_PLUGINS': None,
    'BRZ_PLUGINS_AT': None,
    'BRZ_CONCURRENCY': None,
    # Make sure that any text ui tests are consistent regardless of
    # the environment the test case is run in; you may want tests that
    # test other combinations.  'dumb' is a reasonable guess for tests
    # going to a pipe or a BytesIO.
    'TERM': 'dumb',
    'LINES': '25',
    'COLUMNS': '80',
    'BRZ_COLUMNS': '80',
    # Disable SSH Agent
    'SSH_AUTH_SOCK': None,
    # Proxies
    'http_proxy': None,
    'HTTP_PROXY': None,
    'https_proxy': None,
    'HTTPS_PROXY': None,
    'no_proxy': None,
    'NO_PROXY': None,
    'all_proxy': None,
    'ALL_PROXY': None,
    'BZR_REMOTE_PATH': None,
    # Generally speaking, we don't want apport reporting on crashes in
    # the test environment unless we're specifically testing apport,
    # so that it doesn't leak into the real system environment.  We
    # use an env var so it propagates to subprocesses.
    'APPORT_DISABLE': '1',
    }
184
185
def override_os_environ(test, env=None):
    """Install environment overrides, remembering the previous values.

    A snapshot of os.environ is stored on the test as
    ``_original_os_environ`` so that restore_os_environ() can undo the
    changes later.

    :param test: A test instance

    :param env: A dict mapping variable names to the value to install, or to
        None to remove the variable. Defaults to ``isolated_environ``.
    """
    overrides = isolated_environ if env is None else env
    snapshot = dict(**os.environ)
    test._original_os_environ = snapshot
    for name in overrides:
        osutils.set_or_unset_env(name, overrides[name])
        if name in snapshot:
            continue
        # The variable did not exist before: record None for it so that
        # restore_os_environ deletes it again.
        snapshot[name] = None
202
203
def restore_os_environ(test):
    """Put os.environ back the way override_os_environ() found it.

    :param test: A test instance previously passed to override_os_environ.
    """
    snapshot = test._original_os_environ
    for name in snapshot:
        # A recorded value of None means the variable was absent before the
        # override, so it gets removed rather than set.
        osutils.set_or_unset_env(name, snapshot[name])
213
214
def _clear__type_equality_funcs(test):
    """Empty the ``_type_equality_funcs`` container of a test, if it has one.

    The container holds bound methods of the TestCase instance; clearing it
    breaks a few (mostly) harmless reference cycles in the unittest versions
    released with Python 2.6 and the initial Python 2.7 versions.

    For a few revisions between Python 2.7.1 and Python 2.7.2 that annoyingly
    shipped in Oneiric, the container was an object without a clear method;
    in that case its instance ``__dict__`` is cleared instead. See bug 809048
    for details.
    """
    container = getattr(test, "_type_equality_funcs", None)
    if container is None:
        return
    clear = getattr(container, "clear", None)
    if clear is None:
        # No clear() on the container itself: fall back to emptying its
        # instance dictionary, when it has one.
        instance_dict = getattr(container, "__dict__", None)
        if instance_dict is None:
            return
        clear = instance_dict.clear
    clear()
234
235
class ExtendedTestResult(testtools.TextTestResult):
    """Accepts, reports and accumulates the results of running tests.

    Compared to the unittest version this class adds support for
    profiling, benchmarking, stopping as soon as a test fails,  and
    skipping tests.  There are further-specialized subclasses for
    different types of display.

    When a test finishes, in whatever way, it calls one of the addSuccess,
    addFailure or addError methods.  These in turn may redirect to a more
    specific case for the special test results supported by our extended
    tests.

    Note that just one of these objects is fed the results from many tests.

    The report_error, report_failure, report_known_failure,
    report_unexpected_success, report_skip, report_not_applicable and
    report_unsupported hooks are called by this class but not defined on it;
    subclasses are expected to provide them.
    """

    # When true, the run is stopped after the first error, failure or
    # unexpected success.
    stop_early = False

    def __init__(self, stream, descriptions, verbosity,
                 bench_history=None,
                 strict=False,
                 ):
        """Construct new TestResult.

        :param stream: Stream the report is written to; passed on to
            testtools.TextTestResult.
        :param descriptions: Accepted for signature compatibility; not used
            by this class.
        :param verbosity: Accepted for signature compatibility; not used by
            this class.
        :param bench_history: Optionally, a writable file object to accumulate
            benchmark results.
        :param strict: Recorded as ``_strict``. See wasStrictlySuccessful()
            for the strict definition of success.
        """
        testtools.TextTestResult.__init__(self, stream)
        if bench_history is not None:
            from breezy.version import _get_bzr_source_tree
            src_tree = _get_bzr_source_tree()
            if src_tree:
                try:
                    revision_id = src_tree.get_parent_ids()[0]
                except IndexError:
                    # XXX: if this is a brand new tree, do the same as if there
                    # is no branch.
                    revision_id = b''
            else:
                # XXX: If there's no branch, what should we do?
                revision_id = b''
            # Revision ids are bytes; formatting bytes with %s on Python 3
            # would write "b'...'" into the history. On Python 2 bytes is
            # str, so nothing is decoded there.
            if (isinstance(revision_id, bytes)
                    and not isinstance(revision_id, str)):
                revision_id = revision_id.decode('utf-8')
            bench_history.write("--date %s %s\n" % (time.time(), revision_id))
        self._bench_history = bench_history
        self.ui = ui.ui_factory
        self.num_tests = 0
        self.error_count = 0
        self.failure_count = 0
        self.known_failure_count = 0
        self.skip_count = 0
        self.not_applicable_count = 0
        # Maps str(feature) to the number of tests skipped because of it.
        self.unsupported = {}
        self.count = 0
        self._overall_start_time = time.time()
        self._strict = strict
        self._first_thread_leaker_id = None
        self._tests_leaking_threads_count = 0
        self._traceback_from_test = None

    def stopTestRun(self):
        """Write the end-of-run summary to the stream.

        The summary lists the stored errors and failures, the number of tests
        run and the time taken, an overall FAILED/OK verdict, and counts of
        skipped tests, missing features and leaked threads where relevant.
        """
        run = self.testsRun
        timeTaken = time.time() - self.startTime
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
        #                the parent class method is similar have to duplicate
        self._show_list('ERROR', self.errors)
        self._show_list('FAIL', self.failures)
        self.stream.write(self.sep2)
        self.stream.write("%s %d test%s in %.3fs\n\n" % (
            "Ran", run, "" if run == 1 else "s", timeTaken))
        if not self.wasSuccessful():
            self.stream.write("FAILED (")
            failed, errored = map(len, (self.failures, self.errors))
            if failed:
                self.stream.write("failures=%d" % failed)
            if errored:
                if failed:
                    self.stream.write(", ")
                self.stream.write("errors=%d" % errored)
            if self.known_failure_count:
                if failed or errored:
                    self.stream.write(", ")
                self.stream.write("known_failure_count=%d" %
                                  self.known_failure_count)
            self.stream.write(")\n")
        else:
            if self.known_failure_count:
                self.stream.write("OK (known_failures=%d)\n" %
                                  self.known_failure_count)
            else:
                self.stream.write("OK\n")
        if self.skip_count > 0:
            skipped = self.skip_count
            self.stream.write('%d test%s skipped\n' %
                              (skipped, "" if skipped == 1 else "s"))
        if self.unsupported:
            for feature, count in sorted(self.unsupported.items()):
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
                                  (feature, count))
        if self._first_thread_leaker_id:
            self.stream.write(
                '%s is leaking threads among %d leaking tests.\n' % (
                    self._first_thread_leaker_id,
                    self._tests_leaking_threads_count))
            # We don't report the main thread as an active one.
            self.stream.write(
                '%d non-main threads were left active in the end.\n'
                % (len(self._active_threads) - 1))

    def getDescription(self, test):
        """Return the description used for a test: its id."""
        return test.id()

    def _extractBenchmarkTime(self, testCase, details=None):
        """Return the benchmark time recorded for a test case, if any.

        :param testCase: The test; its ``_benchtime`` attribute is used when
            the details carry no 'benchtime' entry.
        :param details: Optional details dict; a 'benchtime' entry holds the
            time as the text of a float.
        :return: The time as a float, whatever ``_benchtime`` holds, or None
            when neither source is available.
        """
        if details and 'benchtime' in details:
            # iter_bytes() yields bytes chunks, so they have to be joined
            # with a bytes separator and decoded before conversion.
            raw = b''.join(details['benchtime'].iter_bytes())
            return float(raw.decode('utf-8'))
        return getattr(testCase, "_benchtime", None)

    def _delta_to_float(self, a_timedelta, precision):
        """Convert a timedelta to seconds, rounded up to precision digits."""
        # This calls ceiling to ensure that the most pessimistic view of time
        # taken is shown (rather than leaving it to the Python %f operator
        # to decide whether to round/floor/ceiling. This was added when we
        # had pyp3 test failures that suggest a floor was happening.
        shift = 10 ** precision
        return math.ceil(
            (a_timedelta.days * 86400.0 + a_timedelta.seconds +
             a_timedelta.microseconds / 1000000.0) * shift) / shift

    def _elapsedTestTimeString(self):
        """Return time string for overall time the current test has taken."""
        return self._formatTime(self._delta_to_float(
            self._now() - self._start_datetime, 3))

    def _testTimeString(self, testCase):
        """Return the time string for a test.

        A benchmark time, when the test recorded one, is preferred and marked
        with a trailing "*"; otherwise the elapsed time is used.
        """
        benchmark_time = self._extractBenchmarkTime(testCase)
        if benchmark_time is not None:
            return self._formatTime(benchmark_time) + "*"
        return self._elapsedTestTimeString()

    def _formatTime(self, seconds):
        """Format seconds as milliseconds with leading spaces."""
        # some benchmarks can take thousands of seconds to run, so we need 8
        # places
        return "%8dms" % (1000 * seconds)

    def _shortened_test_description(self, test):
        """Return the test id without a leading 'breezy.tests.' prefix."""
        return re.sub(r'^breezy\.tests\.', '', test.id())

    # GZ 2010-10-04: Cloned tests may end up harmlessly calling this method
    #                multiple times in a row, because the handler is added for
    #                each test but the container list is shared between cases.
    #                See lp:498869 lp:625574 and lp:637725 for background.
    def _record_traceback_from_test(self, exc_info):
        """Store the traceback from passed exc_info tuple till the test stops.

        stopTest() resets the stored traceback to None.
        """
        self._traceback_from_test = exc_info[2]

    def startTest(self, test):
        """Record the start of a test and hook up per-test bookkeeping."""
        super(ExtendedTestResult, self).startTest(test)
        if self.count == 0:
            # First test of the run.
            self.startTests()
        self.count += 1
        self.report_test_start(test)
        test.number = self.count
        self._recordTestStartTime()
        # Make testtools cases give us the real traceback on failure
        addOnException = getattr(test, "addOnException", None)
        if addOnException is not None:
            addOnException(self._record_traceback_from_test)
        # Only check for thread leaks on breezy derived test cases
        if isinstance(test, TestCase):
            test.addCleanup(self._check_leaked_threads, test)

    def stopTest(self, test):
        """Record the end of a test and release what it kept alive."""
        super(ExtendedTestResult, self).stopTest(test)
        # Manually break cycles, means touching various private things but hey
        getDetails = getattr(test, "getDetails", None)
        if getDetails is not None:
            getDetails().clear()
        _clear__type_equality_funcs(test)
        self._traceback_from_test = None

    def startTests(self):
        """Report the run header and sample the currently live threads."""
        self.report_tests_starting()
        self._active_threads = threading.enumerate()

    def _check_leaked_threads(self, test):
        """See if any threads have leaked since last call

        A sample of live threads is stored in the _active_threads attribute,
        when this method runs it compares the current live threads and any not
        in the previous sample are treated as having leaked.
        """
        now_active_threads = set(threading.enumerate())
        threads_leaked = now_active_threads.difference(self._active_threads)
        if threads_leaked:
            self._report_thread_leak(test, threads_leaked, now_active_threads)
            self._tests_leaking_threads_count += 1
            if self._first_thread_leaker_id is None:
                self._first_thread_leaker_id = test.id()
            # The sample is only refreshed when a leak was seen.
            self._active_threads = now_active_threads

    def _recordTestStartTime(self):
        """Record that a test has started."""
        self._start_datetime = self._now()

    def addError(self, test, err):
        """Tell result that test finished with an error.

        Called from the TestCase run() method when the test
        fails with an unexpected error.
        """
        self._post_mortem(self._traceback_from_test or err[2])
        super(ExtendedTestResult, self).addError(test, err)
        self.error_count += 1
        self.report_error(test, err)
        if self.stop_early:
            self.stop()

    def addFailure(self, test, err):
        """Tell result that test failed.

        Called from the TestCase run() method when the test
        fails because e.g. an assert() method failed.
        """
        self._post_mortem(self._traceback_from_test or err[2])
        super(ExtendedTestResult, self).addFailure(test, err)
        self.failure_count += 1
        self.report_failure(test, err)
        if self.stop_early:
            self.stop()

    def addSuccess(self, test, details=None):
        """Tell result that test completed successfully.

        Called from the TestCase run(). When a benchmark history file was
        given and the test recorded a benchmark time, a line with that time
        and the test id is appended to the history.
        """
        if self._bench_history is not None:
            benchmark_time = self._extractBenchmarkTime(test, details)
            if benchmark_time is not None:
                self._bench_history.write("%s %s\n" % (
                    self._formatTime(benchmark_time),
                    test.id()))
        self.report_success(test)
        super(ExtendedTestResult, self).addSuccess(test)
        test._log_contents = ''

    def addExpectedFailure(self, test, err):
        """Count and report a test that failed as it was expected to."""
        self.known_failure_count += 1
        self.report_known_failure(test, err)

    def addUnexpectedSuccess(self, test, details=None):
        """Tell result the test unexpectedly passed, counting as a failure

        When the minimum version of testtools required becomes 0.9.8 this
        can be updated to use the new handling there.
        """
        super(ExtendedTestResult, self).addFailure(test, details=details)
        self.failure_count += 1
        self.report_unexpected_success(test,
                                       "".join(details["reason"].iter_text()))
        if self.stop_early:
            self.stop()

    def addNotSupported(self, test, feature):
        """The test will not be run because of a missing feature.
        """
        # this can be called in two different ways: it may be that the
        # test started running, and then raised (through requireFeature)
        # UnavailableFeature.  Alternatively this method can be called
        # while probing for features before running the test code proper; in
        # that case we will see startTest and stopTest, but the test will
        # never actually run.
        key = str(feature)
        self.unsupported[key] = self.unsupported.get(key, 0) + 1
        self.report_unsupported(test, feature)

    def addSkip(self, test, reason):
        """A test has not run for 'reason'."""
        self.skip_count += 1
        self.report_skip(test, reason)

    def addNotApplicable(self, test, reason):
        """Count and report a test that does not apply, for 'reason'."""
        self.not_applicable_count += 1
        self.report_not_applicable(test, reason)

    def _count_stored_tests(self):
        """Count of tests instances kept alive due to not succeeding"""
        return self.error_count + self.failure_count + self.known_failure_count

    def _post_mortem(self, tb=None):
        """Start a PDB post mortem session.

        Nothing happens unless the BRZ_TEST_PDB environment variable is set
        to a non-empty value.
        """
        if os.environ.get('BRZ_TEST_PDB', None):
            import pdb
            pdb.post_mortem(tb)

    def progress(self, offset, whence):
        """The test is adjusting the count of tests to run.

        :param offset: The new count (SUBUNIT_SEEK_SET) or the amount to add
            to the current count (SUBUNIT_SEEK_CUR).
        :param whence: SUBUNIT_SEEK_SET or SUBUNIT_SEEK_CUR.
        :raises errors.BzrError: For any other whence value.
        """
        if whence == SUBUNIT_SEEK_SET:
            self.num_tests = offset
        elif whence == SUBUNIT_SEEK_CUR:
            self.num_tests += offset
        else:
            raise errors.BzrError("Unknown whence %r" % whence)

    def report_tests_starting(self):
        """Display information before the test run begins"""
        if getattr(sys, 'frozen', None) is None:
            bzr_path = osutils.realpath(sys.argv[0])
        else:
            # Frozen executable: report the executable itself.
            bzr_path = sys.executable
        self.stream.write(
            'brz selftest: %s\n' % (bzr_path,))
        self.stream.write(
            '   %s\n' % (
                breezy.__path__[0],))
        self.stream.write(
            '   bzr-%s python-%s %s\n' % (
                breezy.version_string,
                breezy._format_version_tuple(sys.version_info),
                platform.platform(aliased=1),
                ))
        self.stream.write('\n')

    def report_test_start(self, test):
        """Display information on the test just about to be run"""

    def _report_thread_leak(self, test, leaked_threads, active_threads):
        """Display information on a test that leaked one or more threads"""
        # GZ 2010-09-09: A leak summary reported separately from the general
        #                thread debugging would be nice. Tests under subunit
        #                need something not using stream, perhaps adding a
        #                testtools details object would be fitting.
        if 'threads' in selftest_debug_flags:
            self.stream.write('%s is leaking, active is now %d\n' %
                              (test.id(), len(active_threads)))

    def startTestRun(self):
        """Record the time at which the whole run started."""
        self.startTime = time.time()

    def report_success(self, test):
        """Display information on a test that passed; a no-op here."""
        pass

    def wasStrictlySuccessful(self):
        """Return True only for a successful run without caveats.

        Any unsupported feature or known failure makes the run not strictly
        successful.
        """
        if self.unsupported or self.known_failure_count:
            return False
        return self.wasSuccessful()
589
590
class TextTestResult(ExtendedTestResult):
    """Displays progress and results of tests in text form

    Progress is shown through a progress bar obtained from the ui; errors,
    failures and unexpected successes are also written to the stream as they
    happen.
    """

    def __init__(self, stream, descriptions, verbosity,
                 bench_history=None,
                 strict=None,
                 ):
        """Construct the result and configure its progress bar.

        All parameters are passed on to ExtendedTestResult.
        """
        ExtendedTestResult.__init__(self, stream, descriptions, verbosity,
                                    bench_history, strict)
        self.pb = self.ui.nested_progress_bar()
        self.pb.show_pct = False
        self.pb.show_spinner = False
        # This used to read "False," - the trailing comma made the value the
        # tuple (False,), which is truthy.
        self.pb.show_eta = False
        self.pb.show_count = False
        self.pb.show_bar = False
        self.pb.update_latency = 0
        self.pb.show_transport_activity = False

    def stopTestRun(self):
        """Tear down the progress bar, then write the run summary."""
        # called when the tests that are going to run have run
        self.pb.clear()
        self.pb.finished()
        super(TextTestResult, self).stopTestRun()

    def report_tests_starting(self):
        """Write the run header and show the initial progress message."""
        super(TextTestResult, self).report_tests_starting()
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))

    def _progress_prefix_text(self):
        """Return the '[run/total in time, N failed]' progress prefix."""
        # the longer this text, the less space we have to show the test
        # name...
        a = '[%d' % self.count              # total that have been run
        # tests skipped as known not to be relevant are not important enough
        # to show here
        # if self.skip_count:
        #     a += ', %d skip' % self.skip_count
        # if self.known_failure_count:
        #     a += '+%dX' % self.known_failure_count
        if self.num_tests:
            a += '/%d' % self.num_tests
        a += ' in '
        runtime = time.time() - self._overall_start_time
        if runtime >= 60:
            a += '%dm%ds' % (runtime / 60, runtime % 60)
        else:
            a += '%ds' % runtime
        total_fail_count = self.error_count + self.failure_count
        if total_fail_count:
            a += ', %d failed' % total_fail_count
        # if self.unsupported:
        #     a += ', %d missing' % len(self.unsupported)
        a += ']'
        return a

    def report_test_start(self, test):
        """Show the progress prefix and the shortened test name."""
        self.pb.update(
            self._progress_prefix_text() +
            ' ' +
            self._shortened_test_description(test))

    def _test_description(self, test):
        """Return the description used in ERROR/FAIL lines."""
        return self._shortened_test_description(test)

    def report_error(self, test, err):
        """Write an ERROR line with the exception value (err[1])."""
        self.stream.write('ERROR: %s\n    %s\n' % (
            self._test_description(test),
            err[1],
            ))

    def report_failure(self, test, err):
        """Write a FAIL line with the exception value (err[1])."""
        self.stream.write('FAIL: %s\n    %s\n' % (
            self._test_description(test),
            err[1],
            ))

    def report_known_failure(self, test, err):
        """Known failures are not displayed in this mode."""
        pass

    def report_unexpected_success(self, test, reason):
        """Write a FAIL line for a test that was expected to fail."""
        self.stream.write('FAIL: %s\n    %s: %s\n' % (
            self._test_description(test),
            "Unexpected success. Should have failed",
            reason,
            ))

    def report_skip(self, test, reason):
        """Skipped tests are not displayed in this mode."""
        pass

    def report_not_applicable(self, test, reason):
        """Not-applicable tests are not displayed in this mode."""
        pass

    def report_unsupported(self, test, feature):
        """test cannot be run because feature is missing."""
684
685
class VerboseTestResult(ExtendedTestResult):
    """Produce long output, with one line per test run plus times"""

    def _ellipsize_to_right(self, a_string, final_width):
        """Truncate and pad a string, keeping the right hand side

        :param a_string: The text to fit.
        :param final_width: The width of the result.
        :return: A string exactly final_width wide for positive widths. A
            string that is too long has its left side replaced by '...'.
            When final_width leaves no room for the ellipsis plus at least
            one character, only the rightmost final_width characters are
            kept. The empty string for non-positive widths.
        """
        if final_width <= 0:
            return ''
        if len(a_string) <= final_width:
            return a_string.ljust(final_width)
        if final_width <= 3:
            # '...' + a_string[3 - final_width:] would be wider than asked
            # for here, since the slice start would not be negative.
            return a_string[-final_width:]
        return '...' + a_string[3 - final_width:]

    def report_tests_starting(self):
        """Announce the number of tests, then write the usual run header."""
        self.stream.write('running %d tests...\n' % self.num_tests)
        super(VerboseTestResult, self).report_tests_starting()

    def report_test_start(self, test):
        """Write the test name, fitted to leave room for status and time."""
        name = self._shortened_test_description(test)
        width = osutils.terminal_width()
        if width is not None and width > 18:
            # width needs space for 6 char status, plus 1 for slash, plus an
            # 11-char time string, plus a trailing blank
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
            # space
            self.stream.write(self._ellipsize_to_right(name, width - 18))
        else:
            # Unknown width, or a terminal too narrow to fit anything next
            # to the status columns: write the name untouched.
            self.stream.write(name)
        self.stream.flush()

    def _error_summary(self, err):
        """Return the exception value (err[1]) indented by four spaces."""
        indent = ' ' * 4
        return '%s%s' % (indent, err[1])

    def report_error(self, test, err):
        """Finish the test's line with ERROR, its time and the error."""
        self.stream.write('ERROR %s\n%s\n'
                          % (self._testTimeString(test),
                             self._error_summary(err)))

    def report_failure(self, test, err):
        """Finish the test's line with FAIL, its time and the failure."""
        self.stream.write(' FAIL %s\n%s\n'
                          % (self._testTimeString(test),
                             self._error_summary(err)))

    def report_known_failure(self, test, err):
        """Finish the test's line with XFAIL, its time and the failure."""
        self.stream.write('XFAIL %s\n%s\n'
                          % (self._testTimeString(test),
                             self._error_summary(err)))

    def report_unexpected_success(self, test, reason):
        """Finish the test's line with FAIL for an unexpected success."""
        self.stream.write(' FAIL %s\n%s: %s\n'
                          % (self._testTimeString(test),
                             "Unexpected success. Should have failed",
                             reason))

    def report_success(self, test):
        """Finish the test's line with OK and its time.

        Any (call, stats) pairs found in the test's ``_benchcalls``
        attribute are printed after it.
        """
        self.stream.write('   OK %s\n' % self._testTimeString(test))
        for bench_called, stats in getattr(test, '_benchcalls', []):
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
            stats.pprint(file=self.stream)
        # flush the stream so that we get smooth output. This verbose mode is
        # used to show the output in PQM.
        self.stream.flush()

    def report_skip(self, test, reason):
        """Finish the test's line with SKIP, its time and the reason."""
        self.stream.write(' SKIP %s\n%s\n'
                          % (self._testTimeString(test), reason))

    def report_not_applicable(self, test, reason):
        """Finish the test's line with N/A, its time and the reason."""
        self.stream.write('  N/A %s\n    %s\n'
                          % (self._testTimeString(test), reason))

    def report_unsupported(self, test, feature):
        """test cannot be run because feature is missing."""
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
                          % (self._testTimeString(test), feature))
760
761
class TextTestRunner(object):
    """Run tests, reporting to a text stream through a result object."""

    # Copied onto the result objects as ``stop_early`` by run().
    stop_on_failure = False

    def __init__(self,
                 stream=sys.stderr,
                 descriptions=0,
                 verbosity=1,
                 bench_history=None,
                 strict=False,
                 result_decorators=None,
                 ):
        """Create a TextTestRunner.

        :param stream: The stream to report to; it is wrapped so that text
            is encoded with the terminal encoding before being written.
        :param descriptions: Passed on to the result object.
        :param verbosity: Selects the result class, see run(). Also passed
            on to the result object.
        :param bench_history: Passed on to the result object.
        :param strict: Passed on to the result object.
        :param result_decorators: An optional list of decorators to apply
            to the result object being used by the runner. Decorators are
            applied left to right - the first element in the list is the
            innermost decorator.
        """
        # The stream may claim to know how to write unicode strings, but in
        # older pythons this goes sufficiently wrong that it is a bad idea
        # (specifically a built in file with encoding 'UTF-8' will still try
        # to encode using ascii).
        new_encoding = osutils.get_terminal_encoding()
        codec = codecs.lookup(new_encoding)
        encode = codec.encode
        # GZ 2010-09-08: Really we don't want to be writing arbitrary bytes,
        #                so should swap to the plain codecs.StreamWriter
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream,
                                                     "backslashreplace")
        stream.encoding = new_encoding
        self.stream = stream
        self.descriptions = descriptions
        self.verbosity = verbosity
        self._bench_history = bench_history
        self._strict = strict
        self._result_decorators = result_decorators or []

    def run(self, test):
        """Run the given test case or test suite.

        A verbosity of 2 or more reports through VerboseTestResult, anything
        lower through TextTestResult.

        :param test: The test case or suite; its run() method is called with
            the (possibly decorated) result.
        :return: The undecorated result object.
        """
        # A verbosity below 1 used to leave result_class unbound and fail
        # with UnboundLocalError; it now gets the terse result.
        if self.verbosity >= 2:
            result_class = VerboseTestResult
        else:
            result_class = TextTestResult
        original_result = result_class(self.stream,
                                       self.descriptions,
                                       self.verbosity,
                                       bench_history=self._bench_history,
                                       strict=self._strict,
                                       )
        # Signal to result objects that look at stop early policy to stop,
        original_result.stop_early = self.stop_on_failure
        result = original_result
        for decorator in self._result_decorators:
            result = decorator(result)
            result.stop_early = self.stop_on_failure
        result.startTestRun()
        try:
            test.run(result)
        finally:
            result.stopTestRun()
        # higher level code uses our extended protocol to determine
        # what exit code to give.
        return original_result
825
826
def iter_suite_tests(suite):
    """Yield every test case in suite, descending into nested suites.

    :param suite: A unittest.TestCase (yielded as-is) or a
        unittest.TestSuite (flattened recursively).
    :raises Exception: If an object of any other type is encountered.
    """
    if isinstance(suite, unittest.TestCase):
        yield suite
        return
    if not isinstance(suite, unittest.TestSuite):
        raise Exception('unknown type %r for object %r'
                        % (type(suite), suite))
    for member in suite:
        yield from iter_suite_tests(member)
838
839
840TestSkipped = testtools.testcase.TestSkipped
841
842
class TestNotApplicable(TestSkipped):
    """A test is not applicable to the situation where it was run.

    This is only normally raised by parameterized tests, if they find that
    the instance they're constructed upon does not support one aspect
    of its interface.

    It is a subclass of TestSkipped; TestCase.__init__ registers a dedicated
    exception handler (``_do_not_applicable``) for it.
    """
850
851
# traceback._some_str fails to format exceptions that have the default
# __str__ which does an implicit ascii conversion. However, repr() on those
# objects works, for all that it's not quite what the doctor may have ordered.
855def _clever_some_str(value):
856    try:
857        return str(value)
858    except BaseException:
859        try:
860            return repr(value).replace('\\n', '\n')
861        except BaseException:
862            return '<unprintable %s object>' % type(value).__name__
863
864
865traceback._some_str = _clever_some_str
866
867
868# deprecated - use self.knownFailure(), or self.expectFailure.
869KnownFailure = testtools.testcase._ExpectedFailure
870
871
class UnavailableFeature(Exception):
    """A feature required for this test was not available.

    This can be considered a specialised form of SkippedTest.

    The feature should be used to construct the exception.

    TestCase.__init__ registers a dedicated exception handler
    (``_do_unsupported_or_skip``) for this exception.
    """
879
880
class StringIOWrapper(ui_testing.BytesIOWithEncoding):
    """Deprecated (since 3.0) alias for ui_testing.BytesIOWithEncoding."""

    @symbol_versioning.deprecated_method(
        symbol_versioning.deprecated_in((3, 0)))
    def __init__(self, s=None):
        # Nothing extra happens here; the override exists only so the
        # deprecation decorator wraps construction.
        super().__init__(s)
887
888
889TestUIFactory = ui_testing.TestUIFactory
890
891
def isolated_doctest_setUp(test):
    """Isolate a doctest from the ambient environment and UI.

    :param test: The doctest being set up. The ui factory in force before
        the test is stored on it as ``_orig_ui_factory`` so that
        isolated_doctest_tearDown can put it back.
    """
    override_os_environ(test)
    # Point BRZ_HOME at a path that is not expected to exist.
    osutils.set_or_unset_env('BRZ_HOME', '/nonexistent')
    # Remember the current ui factory, then silence the UI for the test.
    test._orig_ui_factory = ui.ui_factory
    ui.ui_factory = ui.SilentUIFactory()
897
898
def isolated_doctest_tearDown(test):
    """Undo isolated_doctest_setUp for a doctest.

    :param test: The doctest being torn down; its ``_orig_ui_factory``
        attribute (set by isolated_doctest_setUp) is reinstated as
        ``ui.ui_factory``.
    """
    restore_os_environ(test)
    ui.ui_factory = test._orig_ui_factory
902
903
def IsolatedDocTestSuite(*args, **kwargs):
    """Build a doctest.DocTestSuite whose tests run isolated.

    This is a factory, not a class: any ``setUp``/``tearDown`` supplied by
    the caller is replaced by the isolating versions before the arguments
    are handed on to doctest.DocTestSuite.
    """
    kwargs.update(setUp=isolated_doctest_setUp,
                  tearDown=isolated_doctest_tearDown)
    return doctest.DocTestSuite(*args, **kwargs)
913
914
915class TestCase(testtools.TestCase):
916    """Base class for brz unit tests.
917
918    Tests that need access to disk resources should subclass
919    TestCaseInTempDir not TestCase.
920
921    Error and debug log messages are redirected from their usual
922    location into a temporary file, the contents of which can be
923    retrieved by _get_log().  We use a real OS file, not an in-memory object,
924    so that it can also capture file IO.  When the test completes this file
925    is read into memory and removed from disk.
926
927    There are also convenience functions to invoke bzr's command-line
928    routine, and to build and check brz trees.
929
930    In addition to the usual method of overriding tearDown(), this class also
931    allows subclasses to register cleanup functions via addCleanup, which are
932    run in order as the object is torn down.  It's less likely this will be
933    accidentally overlooked.
934    """
935
936    _log_file = None
937    # record lsprof data when performing benchmark calls.
938    _gather_lsprof_in_benchmarks = False
939
940    def __init__(self, methodName='testMethod'):
941        super(TestCase, self).__init__(methodName)
942        self._directory_isolation = True
943        self.exception_handlers.insert(
944            0, (UnavailableFeature, self._do_unsupported_or_skip))
945        self.exception_handlers.insert(
946            0, (TestNotApplicable, self._do_not_applicable))
947
    def setUp(self):
        """Prepare the per-test state used by brz tests.

        In order: optionally arm a timeout, check required features, clean
        the environment, reset command line config overrides, silence the
        UI, start the log file, reset benchmark bookkeeping, clear hooks,
        start tracking transports and locks, reset debug flags and the
        trace verbosity level, set up counters and disable i18n.
        """
        super(TestCase, self).setUp()

        # At this point we're still accessing the config files in $BRZ_HOME (as
        # set by the user running selftest).
        timeout = config.GlobalStack().get('selftest.timeout')
        if timeout:
            timeout_fixture = fixtures.TimeoutFixture(timeout)
            timeout_fixture.setUp()
            self.addCleanup(timeout_fixture.cleanUp)

        # Features listed in an optional _test_needs_features attribute are
        # required before anything else is set up.
        for feature in getattr(self, '_test_needs_features', []):
            self.requireFeature(feature)
        self._cleanEnvironment()

        # Give the test its own, fresh command line override store.
        self.overrideAttr(breezy.get_global_state(), 'cmdline_overrides',
                          config.CommandLineStore())

        self._silenceUI()
        self._startLogFile()
        self._benchcalls = []
        self._benchtime = None
        self._clear_hooks()
        self._track_transports()
        self._track_locks()
        self._clear_debug_flags()
        # Isolate global verbosity level, to make sure it's reproducible
        # between tests.  We should get rid of this altogether: bug 656694. --
        # mbp 20101008
        self.overrideAttr(breezy.trace, '_verbosity_level', 0)
        self._log_files = set()
        # Each key in the ``_counters`` dict holds a value for a different
        # counter. When the test ends, addDetail() should be used to output the
        # counter values. This happens in install_counter_hook().
        self._counters = {}
        if 'config_stats' in selftest_debug_flags:
            self._install_config_stats_hooks()
        # Do not use i18n for tests (unless the test reverses this)
        i18n.disable_i18n()
987
988    def debug(self):
989        # debug a frame up.
990        import pdb
991        # The sys preserved stdin/stdout should allow blackbox tests debugging
992        pdb.Pdb(stdin=sys.__stdin__, stdout=sys.__stdout__
993                ).set_trace(sys._getframe().f_back)
994
995    def discardDetail(self, name):
996        """Extend the addDetail, getDetails api so we can remove a detail.
997
998        eg. brz always adds the 'log' detail at startup, but we don't want to
999        include it for skipped, xfail, etc tests.
1000
1001        It is safe to call this for a detail that doesn't exist, in case this
1002        gets called multiple times.
1003        """
1004        # We cheat. details is stored in __details which means we shouldn't
1005        # touch it. but getDetails() returns the dict directly, so we can
1006        # mutate it.
1007        details = self.getDetails()
1008        if name in details:
1009            del details[name]
1010
1011    def install_counter_hook(self, hooks, name, counter_name=None):
1012        """Install a counting hook.
1013
1014        Any hook can be counted as long as it doesn't need to return a value.
1015
1016        :param hooks: Where the hook should be installed.
1017
1018        :param name: The hook name that will be counted.
1019
1020        :param counter_name: The counter identifier in ``_counters``, defaults
1021            to ``name``.
1022        """
1023        _counters = self._counters  # Avoid closing over self
1024        if counter_name is None:
1025            counter_name = name
1026        if counter_name in _counters:
1027            raise AssertionError('%s is already used as a counter name'
1028                                 % (counter_name,))
1029        _counters[counter_name] = 0
1030        self.addDetail(counter_name, content.Content(
1031            content.UTF8_TEXT,
1032            lambda: [b'%d' % (_counters[counter_name],)]))
1033
1034        def increment_counter(*args, **kwargs):
1035            _counters[counter_name] += 1
1036        label = 'count %s calls' % (counter_name,)
1037        hooks.install_named_hook(name, increment_counter, label)
1038        self.addCleanup(hooks.uninstall_named_hook, name, label)
1039
1040    def _install_config_stats_hooks(self):
1041        """Install config hooks to count hook calls.
1042
1043        """
1044        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1045            self.install_counter_hook(config.ConfigHooks, hook_name,
1046                                      'config.%s' % (hook_name,))
1047
1048        # The OldConfigHooks are private and need special handling to protect
1049        # against recursive tests (tests that run other tests), so we just do
1050        # manually what registering them into _builtin_known_hooks will provide
1051        # us.
1052        self.overrideAttr(config, 'OldConfigHooks', config._OldConfigHooks())
1053        for hook_name in ('get', 'set', 'remove', 'load', 'save'):
1054            self.install_counter_hook(config.OldConfigHooks, hook_name,
1055                                      'old_config.%s' % (hook_name,))
1056
    def _clear_debug_flags(self):
        """Prevent externally set debug flags affecting tests.

        Tests that want to use debug flags can just set them in the
        debug_flags set during setup/teardown.

        Two selftest debug flags alter this: 'allow_debug' keeps the
        externally set flags, and 'disable_lock_checks' stops 'strict_locks'
        being added.
        """
        # Start with a copy of the current debug flags we can safely modify.
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
        if 'allow_debug' not in selftest_debug_flags:
            debug.debug_flags.clear()
        if 'disable_lock_checks' not in selftest_debug_flags:
            debug.debug_flags.add('strict_locks')
1069
    def _clear_hooks(self):
        """Replace every known hook point with a fresh one for this test.

        The current hook objects and the lazy hook registry are remembered
        on the test, and ``_restoreHooks`` is registered as a cleanup.
        """
        # prevent hooks affecting tests
        known_hooks = hooks.known_hooks
        self._preserved_hooks = {}
        for key, (parent, name) in known_hooks.iter_parent_objects():
            current_hooks = getattr(parent, name)
            # NOTE(review): this is keyed by parent only, so two hook points
            # on the same parent would overwrite each other - confirm that
            # cannot happen.
            self._preserved_hooks[parent] = (name, current_hooks)
        self._preserved_lazy_hooks = hooks._lazy_hooks
        hooks._lazy_hooks = {}
        self.addCleanup(self._restoreHooks)
        # Install a newly built hooks object at every known hook point.
        for key, (parent, name) in known_hooks.iter_parent_objects():
            factory = known_hooks.get(key)
            setattr(parent, name, factory())
        # this hook should always be installed
        request._install_hook()
1085
    def disable_directory_isolation(self):
        """Turn off directory isolation checks.

        While disabled, _preopen_isolate_url() accepts any url unchecked.
        """
        self._directory_isolation = False
1089
    def enable_directory_isolation(self):
        """Enable directory isolation checks.

        While enabled, _preopen_isolate_url() rejects urls that are not
        inside one of the permitted roots.
        """
        self._directory_isolation = True
1093
    def _silenceUI(self):
        """Turn off UI for duration of test

        ``ui.ui_factory`` is overridden with a new ui.SilentUIFactory.
        """
        # by default the UI is off; tests can turn it on if they want it.
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
1098
1099    def _check_locks(self):
1100        """Check that all lock take/release actions have been paired."""
1101        # We always check for mismatched locks. If a mismatch is found, we
1102        # fail unless -Edisable_lock_checks is supplied to selftest, in which
1103        # case we just print a warning.
1104        # unhook:
1105        acquired_locks = [lock for action, lock in self._lock_actions
1106                          if action == 'acquired']
1107        released_locks = [lock for action, lock in self._lock_actions
1108                          if action == 'released']
1109        broken_locks = [lock for action, lock in self._lock_actions
1110                        if action == 'broken']
1111        # trivially, given the tests for lock acquistion and release, if we
1112        # have as many in each list, it should be ok. Some lock tests also
1113        # break some locks on purpose and should be taken into account by
1114        # considering that breaking a lock is just a dirty way of releasing it.
1115        if len(acquired_locks) != (len(released_locks) + len(broken_locks)):
1116            message = (
1117                'Different number of acquired and '
1118                'released or broken locks.\n'
1119                'acquired=%s\n'
1120                'released=%s\n'
1121                'broken=%s\n' %
1122                (acquired_locks, released_locks, broken_locks))
1123            if not self._lock_check_thorough:
1124                # Rather than fail, just warn
1125                print("Broken test %s: %s" % (self, message))
1126                return
1127            self.fail(message)
1128
1129    def _track_locks(self):
1130        """Track lock activity during tests."""
1131        self._lock_actions = []
1132        if 'disable_lock_checks' in selftest_debug_flags:
1133            self._lock_check_thorough = False
1134        else:
1135            self._lock_check_thorough = True
1136
1137        self.addCleanup(self._check_locks)
1138        _mod_lock.Lock.hooks.install_named_hook('lock_acquired',
1139                                                self._lock_acquired, None)
1140        _mod_lock.Lock.hooks.install_named_hook('lock_released',
1141                                                self._lock_released, None)
1142        _mod_lock.Lock.hooks.install_named_hook('lock_broken',
1143                                                self._lock_broken, None)
1144
    def _lock_acquired(self, result):
        """Record an 'acquired' action (the 'lock_acquired' hook callback).

        :param result: The value passed by the hook; stored for later
            inspection by _check_locks().
        """
        self._lock_actions.append(('acquired', result))
1147
    def _lock_released(self, result):
        """Record a 'released' action (the 'lock_released' hook callback).

        :param result: The value passed by the hook; stored for later
            inspection by _check_locks().
        """
        self._lock_actions.append(('released', result))
1150
    def _lock_broken(self, result):
        """Record a 'broken' action (the 'lock_broken' hook callback).

        :param result: The value passed by the hook; stored for later
            inspection by _check_locks().
        """
        self._lock_actions.append(('broken', result))
1153
1154    def permit_dir(self, name):
1155        """Permit a directory to be used by this test. See permit_url."""
1156        name_transport = _mod_transport.get_transport_from_path(name)
1157        self.permit_url(name)
1158        self.permit_url(name_transport.base)
1159
1160    def permit_url(self, url):
1161        """Declare that url is an ok url to use in this test.
1162
1163        Do this for memory transports, temporary test directory etc.
1164
1165        Do not do this for the current working directory, /tmp, or any other
1166        preexisting non isolated url.
1167        """
1168        if not url.endswith('/'):
1169            url += '/'
1170        self._bzr_selftest_roots.append(url)
1171
1172    def permit_source_tree_branch_repo(self):
1173        """Permit the source tree brz is running from to be opened.
1174
1175        Some code such as breezy.version attempts to read from the brz branch
1176        that brz is executing from (if any). This method permits that directory
1177        to be used in the test suite.
1178        """
1179        path = self.get_source_path()
1180        self.record_directory_isolation()
1181        try:
1182            try:
1183                workingtree.WorkingTree.open(path)
1184            except (errors.NotBranchError, errors.NoWorkingTree):
1185                raise TestSkipped('Needs a working tree of brz sources')
1186        finally:
1187            self.enable_directory_isolation()
1188
1189    def _preopen_isolate_transport(self, transport):
1190        """Check that all transport openings are done in the test work area."""
1191        while isinstance(transport, pathfilter.PathFilteringTransport):
1192            # Unwrap pathfiltered transports
1193            transport = transport.server.backing_transport.clone(
1194                transport._filter('.'))
1195        url = transport.base
1196        # ReadonlySmartTCPServer_for_testing decorates the backing transport
1197        # urls it is given by prepending readonly+. This is appropriate as the
1198        # client shouldn't know that the server is readonly (or not readonly).
1199        # We could register all servers twice, with readonly+ prepending, but
1200        # that makes for a long list; this is about the same but easier to
1201        # read.
1202        if url.startswith('readonly+'):
1203            url = url[len('readonly+'):]
1204        self._preopen_isolate_url(url)
1205
1206    def _preopen_isolate_url(self, url):
1207        if not self._directory_isolation:
1208            return
1209        if self._directory_isolation == 'record':
1210            self._bzr_selftest_roots.append(url)
1211            return
1212        # This prevents all transports, including e.g. sftp ones backed on disk
1213        # from working unless they are explicitly granted permission. We then
1214        # depend on the code that sets up test transports to check that they
1215        # are appropriately isolated and enable their use by calling
1216        # self.permit_transport()
1217        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
1218            raise errors.BzrError("Attempt to escape test isolation: %r %r"
1219                                  % (url, self._bzr_selftest_roots))
1220
    def record_directory_isolation(self):
        """Gather accessed directories to permit later access.

        This is used for tests that access the branch brz is running from.
        While in this mode, _preopen_isolate_url() adds each url it sees to
        the permitted roots instead of checking it.
        """
        self._directory_isolation = "record"
1227
    def start_server(self, transport_server, backing_server=None):
        """Start transport_server for this test.

        This starts the server, registers a cleanup for it and permits the
        server's urls to be used.

        :param transport_server: The server to start; its ``stop_server`` is
            registered as a cleanup.
        :param backing_server: Optional server handed to
            ``transport_server.start_server()``; when None the server is
            started without arguments.
        """
        if backing_server is None:
            transport_server.start_server()
        else:
            transport_server.start_server(backing_server)
        self.addCleanup(transport_server.stop_server)
        # Obtain a real transport because if the server supplies a password, it
        # will be hidden from the base on the client side.
        t = _mod_transport.get_transport_from_url(transport_server.get_url())
        # Some transport servers effectively chroot the backing transport;
        # others like SFTPServer don't - users of the transport can walk up the
        # transport to read the entire backing transport. This wouldn't matter
        # except that the workdir tests are given - and that they expect the
        # server's url to point at - is one directory under the safety net. So
        # Branch operations into the transport will attempt to walk up one
        # directory. Chrooting all servers would avoid this but also mean that
        # we wouldn't be testing directly against non-root urls. Alternatively
        # getting the test framework to start the server with a backing server
        # at the actual safety net directory would work too, but this then
        # means that the self.get_url/self.get_transport methods would need
        # to transform all their results. On balance its cleaner to handle it
        # here, and permit a higher url when we have one of these transports.
        if t.base.endswith('/work/'):
            # we have safety net/test root/work
            t = t.clone('../..')
        elif isinstance(transport_server,
                        test_server.SmartTCPServer_for_testing):
            # The smart server adds a path similar to work, which is traversed
            # up from by the client. But the server is chrooted - the actual
            # backing transport is not escaped from, and VFS requests to the
            # root will error (because they try to escape the chroot).
            t2 = t.clone('..')
            # Keep walking up until cloning '..' no longer changes the base.
            while t2.base != t.base:
                t = t2
                t2 = t.clone('..')
        self.permit_url(t.base)
1269
    def _track_transports(self):
        """Install checks for transport usage.

        Resets the list of permitted roots and hooks
        _preopen_isolate_transport into ControlDir's "pre_open" hook.
        """
        # TestCase has no safe place it can write to.
        self._bzr_selftest_roots = []
        # Currently the easiest way to be sure that nothing is going on is to
        # hook into brz dir opening. This leaves a small window of error for
        # transport tests, but they are well known, and we can improve on this
        # step.
        controldir.ControlDir.hooks.install_named_hook(
            "pre_open", self._preopen_isolate_transport,
            "Check brz directories are safe.")
1281
1282    def _ndiff_strings(self, a, b):
1283        """Return ndiff between two strings containing lines.
1284
1285        A trailing newline is added if missing to make the strings
1286        print properly."""
1287        if b and not b.endswith('\n'):
1288            b += '\n'
1289        if a and not a.endswith('\n'):
1290            a += '\n'
1291        difflines = difflib.ndiff(a.splitlines(True),
1292                                  b.splitlines(True),
1293                                  linejunk=lambda x: False,
1294                                  charjunk=lambda x: False)
1295        return ''.join(difflines)
1296
1297    def assertEqual(self, a, b, message=''):
1298        try:
1299            if a == b:
1300                return
1301        except UnicodeError as e:
1302            # If we can't compare without getting a UnicodeError, then
1303            # obviously they are different
1304            trace.mutter('UnicodeError: %s', e)
1305        if message:
1306            message += '\n'
1307        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1308                             % (message,
1309                                pprint.pformat(a), pprint.pformat(b)))
1310
1311    # FIXME: This is deprecated in unittest2 but plugins may still use it so we
1312    # need a deprecation period for them -- vila 2016-02-01
1313    assertEquals = assertEqual
1314
1315    def assertEqualDiff(self, a, b, message=None):
1316        """Assert two texts are equal, if not raise an exception.
1317
1318        This is intended for use with multi-line strings where it can
1319        be hard to find the differences by eye.
1320        """
1321        # TODO: perhaps override assertEqual to call this for strings?
1322        if a == b:
1323            return
1324        if message is None:
1325            message = "texts not equal:\n"
1326        if a + ('\n' if isinstance(a, str) else b'\n') == b:
1327            message = 'first string is missing a final newline.\n'
1328        if a == b + ('\n' if isinstance(b, str) else b'\n'):
1329            message = 'second string is missing a final newline.\n'
1330        raise AssertionError(message
1331                             + self._ndiff_strings(
1332                                 a if isinstance(a, str) else a.decode(),
1333                                 b if isinstance(b, str) else b.decode()))
1334
1335    def assertEqualMode(self, mode, mode_test):
1336        self.assertEqual(mode, mode_test,
1337                         'mode mismatch %o != %o' % (mode, mode_test))
1338
1339    def assertEqualStat(self, expected, actual):
1340        """assert that expected and actual are the same stat result.
1341
1342        :param expected: A stat result.
1343        :param actual: A stat result.
1344        :raises AssertionError: If the expected and actual stat values differ
1345            other than by atime.
1346        """
1347        self.assertEqual(expected.st_size, actual.st_size,
1348                         'st_size did not match')
1349        self.assertEqual(expected.st_mtime, actual.st_mtime,
1350                         'st_mtime did not match')
1351        self.assertEqual(expected.st_ctime, actual.st_ctime,
1352                         'st_ctime did not match')
1353        if sys.platform == 'win32':
1354            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1355            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1356            # odd. We just force it to always be 0 to avoid any problems.
1357            self.assertEqual(0, expected.st_dev)
1358            self.assertEqual(0, actual.st_dev)
1359            self.assertEqual(0, expected.st_ino)
1360            self.assertEqual(0, actual.st_ino)
1361        else:
1362            self.assertEqual(expected.st_dev, actual.st_dev,
1363                             'st_dev did not match')
1364            self.assertEqual(expected.st_ino, actual.st_ino,
1365                             'st_ino did not match')
1366        self.assertEqual(expected.st_mode, actual.st_mode,
1367                         'st_mode did not match')
1368
1369    def assertLength(self, length, obj_with_len):
1370        """Assert that obj_with_len is of length length."""
1371        if len(obj_with_len) != length:
1372            self.fail("Incorrect length: wanted %d, got %d for %r" % (
1373                length, len(obj_with_len), obj_with_len))
1374
1375    def assertLogsError(self, exception_class, func, *args, **kwargs):
1376        """Assert that `func(*args, **kwargs)` quietly logs a specific error.
1377        """
1378        captured = []
1379        orig_log_exception_quietly = trace.log_exception_quietly
1380        try:
1381            def capture():
1382                orig_log_exception_quietly()
1383                captured.append(sys.exc_info()[1])
1384            trace.log_exception_quietly = capture
1385            func(*args, **kwargs)
1386        finally:
1387            trace.log_exception_quietly = orig_log_exception_quietly
1388        self.assertLength(1, captured)
1389        err = captured[0]
1390        self.assertIsInstance(err, exception_class)
1391        return err
1392
1393    def assertPositive(self, val):
1394        """Assert that val is greater than 0."""
1395        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1396
1397    def assertNegative(self, val):
1398        """Assert that val is less than 0."""
1399        self.assertTrue(val < 0, 'expected a negative value, but got %s' % val)
1400
1401    def assertStartsWith(self, s, prefix):
1402        if not s.startswith(prefix):
1403            raise AssertionError(
1404                'string %r does not start with %r' % (s, prefix))
1405
1406    def assertEndsWith(self, s, suffix):
1407        """Asserts that s ends with suffix."""
1408        if not s.endswith(suffix):
1409            raise AssertionError(
1410                'string %r does not end with %r' % (s, suffix))
1411
1412    def assertContainsRe(self, haystack, needle_re, flags=0):
1413        """Assert that a contains something matching a regular expression."""
1414        if not re.search(needle_re, haystack, flags):
1415            if (('\n' if isinstance(haystack, str) else b'\n') in haystack or
1416                    len(haystack) > 60):
1417                # a long string, format it in a more readable way
1418                raise AssertionError(
1419                    'pattern "%s" not found in\n"""\\\n%s"""\n'
1420                    % (needle_re, haystack))
1421            else:
1422                raise AssertionError('pattern "%s" not found in "%s"'
1423                                     % (needle_re, haystack))
1424
1425    def assertNotContainsRe(self, haystack, needle_re, flags=0):
1426        """Assert that a does not match a regular expression"""
1427        if re.search(needle_re, haystack, flags):
1428            raise AssertionError('pattern "%s" found in "%s"'
1429                                 % (needle_re, haystack))
1430
1431    def assertContainsString(self, haystack, needle):
1432        if haystack.find(needle) == -1:
1433            self.fail("string %r not found in '''%s'''" % (needle, haystack))
1434
1435    def assertNotContainsString(self, haystack, needle):
1436        if haystack.find(needle) != -1:
1437            self.fail("string %r found in '''%s'''" % (needle, haystack))
1438
1439    def assertSubset(self, sublist, superlist):
1440        """Assert that every entry in sublist is present in superlist."""
1441        missing = set(sublist) - set(superlist)
1442        if len(missing) > 0:
1443            raise AssertionError("value(s) %r not present in container %r" %
1444                                 (missing, superlist))
1445
1446    def assertListRaises(self, excClass, func, *args, **kwargs):
1447        """Fail unless excClass is raised when the iterator from func is used.
1448
1449        Many functions can return generators this makes sure
1450        to wrap them in a list() call to make sure the whole generator
1451        is run, and that the proper exception is raised.
1452        """
1453        try:
1454            list(func(*args, **kwargs))
1455        except excClass as e:
1456            return e
1457        else:
1458            if getattr(excClass, '__name__', None) is not None:
1459                excName = excClass.__name__
1460            else:
1461                excName = str(excClass)
1462            raise self.failureException("%s not raised" % excName)
1463
1464    def assertRaises(self, excClass, callableObj, *args, **kwargs):
1465        """Assert that a callable raises a particular exception.
1466
1467        :param excClass: As for the except statement, this may be either an
1468            exception class, or a tuple of classes.
1469        :param callableObj: A callable, will be passed ``*args`` and
1470            ``**kwargs``.
1471
1472        Returns the exception so that you can examine it.
1473        """
1474        try:
1475            callableObj(*args, **kwargs)
1476        except excClass as e:
1477            return e
1478        else:
1479            if getattr(excClass, '__name__', None) is not None:
1480                excName = excClass.__name__
1481            else:
1482                # probably a tuple
1483                excName = str(excClass)
1484            raise self.failureException("%s not raised" % excName)
1485
1486    def assertIs(self, left, right, message=None):
1487        if not (left is right):
1488            if message is not None:
1489                raise AssertionError(message)
1490            else:
1491                raise AssertionError("%r is not %r." % (left, right))
1492
1493    def assertIsNot(self, left, right, message=None):
1494        if (left is right):
1495            if message is not None:
1496                raise AssertionError(message)
1497            else:
1498                raise AssertionError("%r is %r." % (left, right))
1499
1500    def assertTransportMode(self, transport, path, mode):
1501        """Fail if a path does not have mode "mode".
1502
1503        If modes are not supported on this transport, the assertion is ignored.
1504        """
1505        if not transport._can_roundtrip_unix_modebits():
1506            return
1507        path_stat = transport.stat(path)
1508        actual_mode = stat.S_IMODE(path_stat.st_mode)
1509        self.assertEqual(mode, actual_mode,
1510                         'mode of %r incorrect (%s != %s)'
1511                         % (path, oct(mode), oct(actual_mode)))
1512
1513    def assertIsSameRealPath(self, path1, path2):
1514        """Fail if path1 and path2 points to different files"""
1515        self.assertEqual(osutils.realpath(path1),
1516                         osutils.realpath(path2),
1517                         "apparent paths:\na = %s\nb = %s\n," % (path1, path2))
1518
1519    def assertIsInstance(self, obj, kls, msg=None):
1520        """Fail if obj is not an instance of kls
1521
1522        :param msg: Supplementary message to show if the assertion fails.
1523        """
1524        if not isinstance(obj, kls):
1525            m = "%r is an instance of %s rather than %s" % (
1526                obj, obj.__class__, kls)
1527            if msg:
1528                m += ": " + msg
1529            self.fail(m)
1530
1531    def assertFileEqual(self, content, path):
1532        """Fail if path does not contain 'content'."""
1533        self.assertPathExists(path)
1534
1535        mode = 'r' + ('b' if isinstance(content, bytes) else '')
1536        with open(path, mode) as f:
1537            s = f.read()
1538        self.assertEqualDiff(content, s)
1539
1540    def assertDocstring(self, expected_docstring, obj):
1541        """Fail if obj does not have expected_docstring"""
1542        if __doc__ is None:
1543            # With -OO the docstring should be None instead
1544            self.assertIs(obj.__doc__, None)
1545        else:
1546            self.assertEqual(expected_docstring, obj.__doc__)
1547
1548    def assertPathExists(self, path):
1549        """Fail unless path or paths, which may be abs or relative, exist."""
1550        # TODO(jelmer): Clean this up for pad.lv/1696545
1551        if not isinstance(path, (bytes, str)):
1552            for p in path:
1553                self.assertPathExists(p)
1554        else:
1555            self.assertTrue(osutils.lexists(path),
1556                            path + " does not exist")
1557
1558    def assertPathDoesNotExist(self, path):
1559        """Fail if path or paths, which may be abs or relative, exist."""
1560        if not isinstance(path, (str, str)):
1561            for p in path:
1562                self.assertPathDoesNotExist(p)
1563        else:
1564            self.assertFalse(osutils.lexists(path),
1565                             path + " exists")
1566
1567    def _capture_deprecation_warnings(self, a_callable, *args, **kwargs):
1568        """A helper for callDeprecated and applyDeprecated.
1569
1570        :param a_callable: A callable to call.
1571        :param args: The positional arguments for the callable
1572        :param kwargs: The keyword arguments for the callable
1573        :return: A tuple (warnings, result). result is the result of calling
1574            a_callable(``*args``, ``**kwargs``).
1575        """
1576        local_warnings = []
1577
1578        def capture_warnings(msg, cls=None, stacklevel=None):
1579            # we've hooked into a deprecation specific callpath,
1580            # only deprecations should getting sent via it.
1581            self.assertEqual(cls, DeprecationWarning)
1582            local_warnings.append(msg)
1583        original_warning_method = symbol_versioning.warn
1584        symbol_versioning.set_warning_method(capture_warnings)
1585        try:
1586            result = a_callable(*args, **kwargs)
1587        finally:
1588            symbol_versioning.set_warning_method(original_warning_method)
1589        return (local_warnings, result)
1590
1591    def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
1592        """Call a deprecated callable without warning the user.
1593
1594        Note that this only captures warnings raised by symbol_versioning.warn,
1595        not other callers that go direct to the warning module.
1596
1597        To test that a deprecated method raises an error, do something like
1598        this (remember that both assertRaises and applyDeprecated delays *args
1599        and **kwargs passing)::
1600
1601            self.assertRaises(errors.ReservedId,
1602                self.applyDeprecated,
1603                deprecated_in((1, 5, 0)),
1604                br.append_revision,
1605                'current:')
1606
1607        :param deprecation_format: The deprecation format that the callable
1608            should have been deprecated with. This is the same type as the
1609            parameter to deprecated_method/deprecated_function. If the
1610            callable is not deprecated with this format, an assertion error
1611            will be raised.
1612        :param a_callable: A callable to call. This may be a bound method or
1613            a regular function. It will be called with ``*args`` and
1614            ``**kwargs``.
1615        :param args: The positional arguments for the callable
1616        :param kwargs: The keyword arguments for the callable
1617        :return: The result of a_callable(``*args``, ``**kwargs``)
1618        """
1619        call_warnings, result = self._capture_deprecation_warnings(
1620            a_callable, *args, **kwargs)
1621        expected_first_warning = symbol_versioning.deprecation_string(
1622            a_callable, deprecation_format)
1623        if len(call_warnings) == 0:
1624            self.fail("No deprecation warning generated by call to %s" %
1625                      a_callable)
1626        self.assertEqual(expected_first_warning, call_warnings[0])
1627        return result
1628
1629    def callCatchWarnings(self, fn, *args, **kw):
1630        """Call a callable that raises python warnings.
1631
1632        The caller's responsible for examining the returned warnings.
1633
1634        If the callable raises an exception, the exception is not
1635        caught and propagates up to the caller.  In that case, the list
1636        of warnings is not available.
1637
1638        :returns: ([warning_object, ...], fn_result)
1639        """
1640        # XXX: This is not perfect, because it completely overrides the
1641        # warnings filters, and some code may depend on suppressing particular
1642        # warnings.  It's the easiest way to insulate ourselves from -Werror,
1643        # though.  -- Andrew, 20071062
1644        wlist = []
1645
1646        def _catcher(message, category, filename, lineno, file=None, line=None):
1647            # despite the name, 'message' is normally(?) a Warning subclass
1648            # instance
1649            wlist.append(message)
1650        saved_showwarning = warnings.showwarning
1651        saved_filters = warnings.filters
1652        try:
1653            warnings.showwarning = _catcher
1654            warnings.filters = []
1655            result = fn(*args, **kw)
1656        finally:
1657            warnings.showwarning = saved_showwarning
1658            warnings.filters = saved_filters
1659        return wlist, result
1660
1661    def callDeprecated(self, expected, callable, *args, **kwargs):
1662        """Assert that a callable is deprecated in a particular way.
1663
1664        This is a very precise test for unusual requirements. The
1665        applyDeprecated helper function is probably more suited for most tests
1666        as it allows you to simply specify the deprecation format being used
1667        and will ensure that that is issued for the function being called.
1668
1669        Note that this only captures warnings raised by symbol_versioning.warn,
1670        not other callers that go direct to the warning module.  To catch
1671        general warnings, use callCatchWarnings.
1672
1673        :param expected: a list of the deprecation warnings expected, in order
1674        :param callable: The callable to call
1675        :param args: The positional arguments for the callable
1676        :param kwargs: The keyword arguments for the callable
1677        """
1678        call_warnings, result = self._capture_deprecation_warnings(
1679            callable, *args, **kwargs)
1680        self.assertEqual(expected, call_warnings)
1681        return result
1682
1683    def _startLogFile(self):
1684        """Setup a in-memory target for bzr and testcase log messages"""
1685        pseudo_log_file = BytesIO()
1686
1687        def _get_log_contents_for_weird_testtools_api():
1688            return [pseudo_log_file.getvalue().decode(
1689                "utf-8", "replace").encode("utf-8")]
1690        self.addDetail(
1691            "log", content.Content(
1692                content.ContentType("text", "plain", {"charset": "utf8"}),
1693                _get_log_contents_for_weird_testtools_api))
1694        self._log_file = pseudo_log_file
1695        self._log_memento = trace.push_log_file(self._log_file)
1696        self.addCleanup(self._finishLogFile)
1697
1698    def _finishLogFile(self):
1699        """Flush and dereference the in-memory log for this testcase"""
1700        if trace._trace_file:
1701            # flush the log file, to get all content
1702            trace._trace_file.flush()
1703        trace.pop_log_file(self._log_memento)
1704        # The logging module now tracks references for cleanup so discard ours
1705        del self._log_memento
1706
1707    def thisFailsStrictLockCheck(self):
1708        """It is known that this test would fail with -Dstrict_locks.
1709
1710        By default, all tests are run with strict lock checking unless
1711        -Edisable_lock_checks is supplied. However there are some tests which
1712        we know fail strict locks at this point that have not been fixed.
1713        They should call this function to disable the strict checking.
1714
1715        This should be used sparingly, it is much better to fix the locking
1716        issues rather than papering over the problem by calling this function.
1717        """
1718        debug.debug_flags.discard('strict_locks')
1719
1720    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1721        """Overrides an object attribute restoring it after the test.
1722
1723        :note: This should be used with discretion; you should think about
1724        whether it's better to make the code testable without monkey-patching.
1725
1726        :param obj: The object that will be mutated.
1727
1728        :param attr_name: The attribute name we want to preserve/override in
1729            the object.
1730
1731        :param new: The optional value we want to set the attribute to.
1732
1733        :returns: The actual attr value.
1734        """
1735        # The actual value is captured by the call below
1736        value = getattr(obj, attr_name, _unitialized_attr)
1737        if value is _unitialized_attr:
1738            # When the test completes, the attribute should not exist, but if
1739            # we aren't setting a value, we don't need to do anything.
1740            if new is not _unitialized_attr:
1741                self.addCleanup(delattr, obj, attr_name)
1742        else:
1743            self.addCleanup(setattr, obj, attr_name, value)
1744        if new is not _unitialized_attr:
1745            setattr(obj, attr_name, new)
1746        return value
1747
1748    def overrideEnv(self, name, new):
1749        """Set an environment variable, and reset it after the test.
1750
1751        :param name: The environment variable name.
1752
1753        :param new: The value to set the variable to. If None, the
1754            variable is deleted from the environment.
1755
1756        :returns: The actual variable value.
1757        """
1758        value = osutils.set_or_unset_env(name, new)
1759        self.addCleanup(osutils.set_or_unset_env, name, value)
1760        return value
1761
1762    def recordCalls(self, obj, attr_name):
1763        """Monkeypatch in a wrapper that will record calls.
1764
1765        The monkeypatch is automatically removed when the test concludes.
1766
1767        :param obj: The namespace holding the reference to be replaced;
1768            typically a module, class, or object.
1769        :param attr_name: A string for the name of the attribute to patch.
1770        :returns: A list that will be extended with one item every time the
1771            function is called, with a tuple of (args, kwargs).
1772        """
1773        calls = []
1774
1775        def decorator(*args, **kwargs):
1776            calls.append((args, kwargs))
1777            return orig(*args, **kwargs)
1778        orig = self.overrideAttr(obj, attr_name, decorator)
1779        return calls
1780
1781    def _cleanEnvironment(self):
1782        for name, value in isolated_environ.items():
1783            self.overrideEnv(name, value)
1784
1785    def _restoreHooks(self):
1786        for klass, (name, hooks) in self._preserved_hooks.items():
1787            setattr(klass, name, hooks)
1788        self._preserved_hooks.clear()
1789        breezy.hooks._lazy_hooks = self._preserved_lazy_hooks
1790        self._preserved_lazy_hooks.clear()
1791
1792    def knownFailure(self, reason):
1793        """Declare that this test fails for a known reason
1794
1795        Tests that are known to fail should generally be using expectedFailure
1796        with an appropriate reverse assertion if a change could cause the test
1797        to start passing. Conversely if the test has no immediate prospect of
1798        succeeding then using skip is more suitable.
1799
1800        When this method is called while an exception is being handled, that
1801        traceback will be used, otherwise a new exception will be thrown to
1802        provide one but won't be reported.
1803        """
1804        self._add_reason(reason)
1805        try:
1806            exc_info = sys.exc_info()
1807            if exc_info != (None, None, None):
1808                self._report_traceback(exc_info)
1809            else:
1810                try:
1811                    raise self.failureException(reason)
1812                except self.failureException:
1813                    exc_info = sys.exc_info()
1814            # GZ 02-08-2011: Maybe cleanup this err.exc_info attribute too?
1815            raise testtools.testcase._ExpectedFailure(exc_info)
1816        finally:
1817            del exc_info
1818
1819    def _suppress_log(self):
1820        """Remove the log info from details."""
1821        self.discardDetail('log')
1822
1823    def _do_skip(self, result, reason):
1824        self._suppress_log()
1825        addSkip = getattr(result, 'addSkip', None)
1826        if not callable(addSkip):
1827            result.addSuccess(result)
1828        else:
1829            addSkip(self, str(reason))
1830
1831    @staticmethod
1832    def _do_known_failure(self, result, e):
1833        self._suppress_log()
1834        err = sys.exc_info()
1835        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1836        if addExpectedFailure is not None:
1837            addExpectedFailure(self, err)
1838        else:
1839            result.addSuccess(self)
1840
1841    @staticmethod
1842    def _do_not_applicable(self, result, e):
1843        if not e.args:
1844            reason = 'No reason given'
1845        else:
1846            reason = e.args[0]
1847        self._suppress_log()
1848        addNotApplicable = getattr(result, 'addNotApplicable', None)
1849        if addNotApplicable is not None:
1850            result.addNotApplicable(self, reason)
1851        else:
1852            self._do_skip(result, reason)
1853
1854    @staticmethod
1855    def _report_skip(self, result, err):
1856        """Override the default _report_skip.
1857
1858        We want to strip the 'log' detail. If we waint until _do_skip, it has
1859        already been formatted into the 'reason' string, and we can't pull it
1860        out again.
1861        """
1862        self._suppress_log()
1863        super(TestCase, self)._report_skip(self, result, err)
1864
1865    @staticmethod
1866    def _report_expected_failure(self, result, err):
1867        """Strip the log.
1868
1869        See _report_skip for motivation.
1870        """
1871        self._suppress_log()
1872        super(TestCase, self)._report_expected_failure(self, result, err)
1873
1874    @staticmethod
1875    def _do_unsupported_or_skip(self, result, e):
1876        reason = e.args[0]
1877        self._suppress_log()
1878        addNotSupported = getattr(result, 'addNotSupported', None)
1879        if addNotSupported is not None:
1880            result.addNotSupported(self, reason)
1881        else:
1882            self._do_skip(result, reason)
1883
1884    def time(self, callable, *args, **kwargs):
1885        """Run callable and accrue the time it takes to the benchmark time.
1886
1887        If lsprofiling is enabled (i.e. by --lsprof-time to brz selftest) then
1888        this will cause lsprofile statistics to be gathered and stored in
1889        self._benchcalls.
1890        """
1891        if self._benchtime is None:
1892            self.addDetail('benchtime', content.Content(
1893                content.UTF8_TEXT,
1894                lambda: [str(self._benchtime).encode('utf-8')]))
1895            self._benchtime = 0
1896        start = time.time()
1897        try:
1898            if not self._gather_lsprof_in_benchmarks:
1899                return callable(*args, **kwargs)
1900            else:
1901                # record this benchmark
1902                ret, stats = breezy.lsprof.profile(callable, *args, **kwargs)
1903                stats.sort()
1904                self._benchcalls.append(((callable, args, kwargs), stats))
1905                return ret
1906        finally:
1907            self._benchtime += time.time() - start
1908
1909    def log(self, *args):
1910        trace.mutter(*args)
1911
1912    def get_log(self):
1913        """Get a unicode string containing the log from breezy.trace.
1914
1915        Undecodable characters are replaced.
1916        """
1917        return u"".join(self.getDetails()['log'].iter_text())
1918
1919    def requireFeature(self, feature):
1920        """This test requires a specific feature is available.
1921
1922        :raises UnavailableFeature: When feature is not available.
1923        """
1924        if not feature.available():
1925            raise UnavailableFeature(feature)
1926
1927    def _run_bzr_core(self, args, encoding, stdin, stdout, stderr,
1928                      working_dir):
1929        # Clear chk_map page cache, because the contents are likely to mask
1930        # locking errors.
1931        chk_map.clear_cache()
1932
1933        self.log('run brz: %r', args)
1934
1935        self._last_cmd_stdout = stdout
1936        self._last_cmd_stderr = stderr
1937
1938        old_ui_factory = ui.ui_factory
1939        ui.ui_factory = ui_testing.TestUIFactory(
1940            stdin=stdin,
1941            stdout=self._last_cmd_stdout,
1942            stderr=self._last_cmd_stderr)
1943
1944        cwd = None
1945        if working_dir is not None:
1946            cwd = osutils.getcwd()
1947            os.chdir(working_dir)
1948
1949        try:
1950            with ui.ui_factory:
1951                result = self.apply_redirected(
1952                    ui.ui_factory.stdin,
1953                    stdout, stderr,
1954                    _mod_commands.run_bzr_catch_user_errors,
1955                    args)
1956        finally:
1957            ui.ui_factory = old_ui_factory
1958            if cwd is not None:
1959                os.chdir(cwd)
1960
1961        return result
1962
1963    def run_bzr_raw(self, args, retcode=0, stdin=None, encoding=None,
1964                    working_dir=None, error_regexes=[]):
1965        """Invoke brz, as if it were run from the command line.
1966
1967        The argument list should not include the brz program name - the
1968        first argument is normally the brz command.  Arguments may be
1969        passed in three ways:
1970
1971        1- A list of strings, eg ["commit", "a"].  This is recommended
1972        when the command contains whitespace or metacharacters, or
1973        is built up at run time.
1974
1975        2- A single string, eg "add a".  This is the most convenient
1976        for hardcoded commands.
1977
1978        This runs brz through the interface that catches and reports
1979        errors, and with logging set to something approximating the
1980        default, so that error reporting can be checked.
1981
1982        This should be the main method for tests that want to exercise the
1983        overall behavior of the brz application (rather than a unit test
1984        or a functional test of the library.)
1985
1986        This sends the stdout/stderr results into the test's log,
1987        where it may be useful for debugging.  See also run_captured.
1988
1989        :keyword stdin: A string to be used as stdin for the command.
1990        :keyword retcode: The status code the command should return;
1991            default 0.
1992        :keyword working_dir: The directory to run the command in
1993        :keyword error_regexes: A list of expected error messages.  If
1994            specified they must be seen in the error output of the command.
1995        """
1996        if isinstance(args, str):
1997            args = shlex.split(args)
1998
1999        if encoding is None:
2000            encoding = osutils.get_user_encoding()
2001
2002        stdout = BytesIO()
2003        stderr = BytesIO()
2004        wrapped_stdout = TextIOWrapper(stdout, encoding)
2005        wrapped_stderr = TextIOWrapper(stderr, encoding)
2006        handler = logging.StreamHandler(wrapped_stderr)
2007        handler.setLevel(logging.INFO)
2008
2009        logger = logging.getLogger('')
2010        logger.addHandler(handler)
2011        try:
2012            result = self._run_bzr_core(
2013                args, encoding=encoding, stdin=stdin, stdout=wrapped_stdout,
2014                stderr=wrapped_stderr, working_dir=working_dir,
2015                )
2016        finally:
2017            logger.removeHandler(handler)
2018
2019        wrapped_stdout.flush()
2020        wrapped_stderr.flush()
2021
2022        out = stdout.getvalue()
2023        err = stderr.getvalue()
2024        if out:
2025            self.log('output:\n%r', out)
2026        if err:
2027            self.log('errors:\n%r', err)
2028        if retcode is not None:
2029            self.assertEqual(retcode, result,
2030                             message='Unexpected return code')
2031        self.assertIsInstance(error_regexes, (list, tuple))
2032        for regex in error_regexes:
2033            self.assertContainsRe(err, regex)
2034        return out, err
2035
2036    def run_bzr(self, args, retcode=0, stdin=None, encoding=None,
2037                working_dir=None, error_regexes=[]):
2038        """Invoke brz, as if it were run from the command line.
2039
2040        The argument list should not include the brz program name - the
2041        first argument is normally the brz command.  Arguments may be
2042        passed in three ways:
2043
2044        1- A list of strings, eg ["commit", "a"].  This is recommended
2045        when the command contains whitespace or metacharacters, or
2046        is built up at run time.
2047
2048        2- A single string, eg "add a".  This is the most convenient
2049        for hardcoded commands.
2050
2051        This runs brz through the interface that catches and reports
2052        errors, and with logging set to something approximating the
2053        default, so that error reporting can be checked.
2054
2055        This should be the main method for tests that want to exercise the
2056        overall behavior of the brz application (rather than a unit test
2057        or a functional test of the library.)
2058
2059        This sends the stdout/stderr results into the test's log,
2060        where it may be useful for debugging.  See also run_captured.
2061
2062        :keyword stdin: A string to be used as stdin for the command.
2063        :keyword retcode: The status code the command should return;
2064            default 0.
2065        :keyword working_dir: The directory to run the command in
2066        :keyword error_regexes: A list of expected error messages.  If
2067            specified they must be seen in the error output of the command.
2068        """
2069        if isinstance(args, str):
2070            args = shlex.split(args)
2071
2072        if encoding is None:
2073            encoding = osutils.get_user_encoding()
2074
2075        stdout = ui_testing.StringIOWithEncoding()
2076        stderr = ui_testing.StringIOWithEncoding()
2077        stdout.encoding = stderr.encoding = encoding
2078        handler = logging.StreamHandler(stream=stderr)
2079        handler.setLevel(logging.INFO)
2080
2081        logger = logging.getLogger('')
2082        logger.addHandler(handler)
2083
2084        try:
2085            result = self._run_bzr_core(
2086                args, encoding=encoding, stdin=stdin, stdout=stdout,
2087                stderr=stderr, working_dir=working_dir)
2088        finally:
2089            logger.removeHandler(handler)
2090
2091        out = stdout.getvalue()
2092        err = stderr.getvalue()
2093        if out:
2094            self.log('output:\n%r', out)
2095        if err:
2096            self.log('errors:\n%r', err)
2097        if retcode is not None:
2098            self.assertEqual(retcode, result,
2099                             message='Unexpected return code')
2100        self.assertIsInstance(error_regexes, (list, tuple))
2101        for regex in error_regexes:
2102            self.assertContainsRe(err, regex)
2103        return out, err
2104
2105    def run_bzr_error(self, error_regexes, *args, **kwargs):
2106        """Run brz, and check that stderr contains the supplied regexes
2107
2108        :param error_regexes: Sequence of regular expressions which
2109            must each be found in the error output. The relative ordering
2110            is not enforced.
2111        :param args: command-line arguments for brz
2112        :param kwargs: Keyword arguments which are interpreted by run_brz
2113            This function changes the default value of retcode to be 3,
2114            since in most cases this is run when you expect brz to fail.
2115
2116        :return: (out, err) The actual output of running the command (in case
2117            you want to do more inspection)
2118
2119        Examples of use::
2120
2121            # Make sure that commit is failing because there is nothing to do
2122            self.run_bzr_error(['no changes to commit'],
2123                               ['commit', '-m', 'my commit comment'])
2124            # Make sure --strict is handling an unknown file, rather than
2125            # giving us the 'nothing to do' error
2126            self.build_tree(['unknown'])
2127            self.run_bzr_error(
2128                ['Commit refused because there are unknown files'],
2129                ['commit', --strict', '-m', 'my commit comment'])
2130        """
2131        kwargs.setdefault('retcode', 3)
2132        kwargs['error_regexes'] = error_regexes
2133        out, err = self.run_bzr(*args, **kwargs)
2134        return out, err
2135
2136    def run_bzr_subprocess(self, *args, **kwargs):
2137        """Run brz in a subprocess for testing.
2138
2139        This starts a new Python interpreter and runs brz in there.
2140        This should only be used for tests that have a justifiable need for
2141        this isolation: e.g. they are testing startup time, or signal
2142        handling, or early startup code, etc.  Subprocess code can't be
2143        profiled or debugged so easily.
2144
2145        :keyword retcode: The status code that is expected.  Defaults to 0.  If
2146            None is supplied, the status code is not checked.
2147        :keyword env_changes: A dictionary which lists changes to environment
2148            variables. A value of None will unset the env variable.
2149            The values must be strings. The change will only occur in the
2150            child, so you don't need to fix the environment after running.
2151        :keyword universal_newlines: Convert CRLF => LF
2152        :keyword allow_plugins: By default the subprocess is run with
2153            --no-plugins to ensure test reproducibility. Also, it is possible
2154            for system-wide plugins to create unexpected output on stderr,
2155            which can cause unnecessary test failures.
2156        """
2157        env_changes = kwargs.get('env_changes', {})
2158        working_dir = kwargs.get('working_dir', None)
2159        allow_plugins = kwargs.get('allow_plugins', False)
2160        if len(args) == 1:
2161            if isinstance(args[0], list):
2162                args = args[0]
2163            elif isinstance(args[0], str):
2164                args = list(shlex.split(args[0]))
2165        else:
2166            raise ValueError("passing varargs to run_bzr_subprocess")
2167        process = self.start_bzr_subprocess(args, env_changes=env_changes,
2168                                            working_dir=working_dir,
2169                                            allow_plugins=allow_plugins)
2170        # We distinguish between retcode=None and retcode not passed.
2171        supplied_retcode = kwargs.get('retcode', 0)
2172        return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
2173                                          universal_newlines=kwargs.get(
2174                                              'universal_newlines', False),
2175                                          process_args=args)
2176
2177    def start_bzr_subprocess(self, process_args, env_changes=None,
2178                             skip_if_plan_to_signal=False,
2179                             working_dir=None,
2180                             allow_plugins=False, stderr=subprocess.PIPE):
2181        """Start brz in a subprocess for testing.
2182
2183        This starts a new Python interpreter and runs brz in there.
2184        This should only be used for tests that have a justifiable need for
2185        this isolation: e.g. they are testing startup time, or signal
2186        handling, or early startup code, etc.  Subprocess code can't be
2187        profiled or debugged so easily.
2188
2189        :param process_args: a list of arguments to pass to the brz executable,
2190            for example ``['--version']``.
2191        :param env_changes: A dictionary which lists changes to environment
2192            variables. A value of None will unset the env variable.
2193            The values must be strings. The change will only occur in the
2194            child, so you don't need to fix the environment after running.
2195        :param skip_if_plan_to_signal: raise TestSkipped when true and system
2196            doesn't support signalling subprocesses.
2197        :param allow_plugins: If False (default) pass --no-plugins to brz.
2198        :param stderr: file to use for the subprocess's stderr.  Valid values
2199            are those valid for the stderr argument of `subprocess.Popen`.
2200            Default value is ``subprocess.PIPE``.
2201
2202        :returns: Popen object for the started process.
2203        """
2204        if skip_if_plan_to_signal:
2205            if os.name != "posix":
2206                raise TestSkipped("Sending signals not supported")
2207
2208        if env_changes is None:
2209            env_changes = {}
2210        # Because $HOME is set to a tempdir for the context of a test, modules
2211        # installed in the user dir will not be found unless $PYTHONUSERBASE
2212        # gets set to the computed directory of this parent process.
2213        if site.USER_BASE is not None:
2214            env_changes["PYTHONUSERBASE"] = site.USER_BASE
2215        old_env = {}
2216
2217        def cleanup_environment():
2218            for env_var, value in env_changes.items():
2219                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
2220
2221        def restore_environment():
2222            for env_var, value in old_env.items():
2223                osutils.set_or_unset_env(env_var, value)
2224
2225        bzr_path = self.get_brz_path()
2226
2227        cwd = None
2228        if working_dir is not None:
2229            cwd = osutils.getcwd()
2230            os.chdir(working_dir)
2231
2232        try:
2233            # win32 subprocess doesn't support preexec_fn
2234            # so we will avoid using it on all platforms, just to
2235            # make sure the code path is used, and we don't break on win32
2236            cleanup_environment()
2237            # Include the subprocess's log file in the test details, in case
2238            # the test fails due to an error in the subprocess.
2239            self._add_subprocess_log(trace._get_brz_log_filename())
2240            command = [sys.executable]
2241            # frozen executables don't need the path to bzr
2242            if getattr(sys, "frozen", None) is None:
2243                command.append(bzr_path)
2244            if not allow_plugins:
2245                command.append('--no-plugins')
2246            command.extend(process_args)
2247            process = self._popen(command, stdin=subprocess.PIPE,
2248                                  stdout=subprocess.PIPE,
2249                                  stderr=stderr, bufsize=0)
2250        finally:
2251            restore_environment()
2252            if cwd is not None:
2253                os.chdir(cwd)
2254
2255        return process
2256
2257    def _add_subprocess_log(self, log_file_path):
2258        if len(self._log_files) == 0:
2259            # Register an addCleanup func.  We do this on the first call to
2260            # _add_subprocess_log rather than in TestCase.setUp so that this
2261            # addCleanup is registered after any cleanups for tempdirs that
2262            # subclasses might create, which will probably remove the log file
2263            # we want to read.
2264            self.addCleanup(self._subprocess_log_cleanup)
2265        # self._log_files is a set, so if a log file is reused we won't grab it
2266        # twice.
2267        self._log_files.add(log_file_path)
2268
2269    def _subprocess_log_cleanup(self):
2270        for count, log_file_path in enumerate(self._log_files):
2271            # We use buffer_now=True to avoid holding the file open beyond
2272            # the life of this function, which might interfere with e.g.
2273            # cleaning tempdirs on Windows.
2274            # XXX: Testtools 0.9.5 doesn't have the content_from_file helper
2275            # detail_content = content.content_from_file(
2276            #    log_file_path, buffer_now=True)
2277            with open(log_file_path, 'rb') as log_file:
2278                log_file_bytes = log_file.read()
2279            detail_content = content.Content(
2280                content.ContentType("text", "plain", {"charset": "utf8"}),
2281                lambda: [log_file_bytes])
2282            self.addDetail("start_bzr_subprocess-log-%d" % (count,),
2283                           detail_content)
2284
2285    def _popen(self, *args, **kwargs):
2286        """Place a call to Popen.
2287
2288        Allows tests to override this method to intercept the calls made to
2289        Popen for introspection.
2290        """
2291        return subprocess.Popen(*args, **kwargs)
2292
2293    def get_source_path(self):
2294        """Return the path of the directory containing breezy."""
2295        return os.path.dirname(os.path.dirname(breezy.__file__))
2296
2297    def get_brz_path(self):
2298        """Return the path of the 'brz' executable for this test suite."""
2299        brz_path = os.path.join(self.get_source_path(), "brz")
2300        if not os.path.isfile(brz_path):
2301            # We are probably installed. Assume sys.argv is the right file
2302            brz_path = sys.argv[0]
2303        return brz_path
2304
2305    def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
2306                              universal_newlines=False, process_args=None):
2307        """Finish the execution of process.
2308
2309        :param process: the Popen object returned from start_bzr_subprocess.
2310        :param retcode: The status code that is expected.  Defaults to 0.  If
2311            None is supplied, the status code is not checked.
2312        :param send_signal: an optional signal to send to the process.
2313        :param universal_newlines: Convert CRLF => LF
2314        :returns: (stdout, stderr)
2315        """
2316        if send_signal is not None:
2317            os.kill(process.pid, send_signal)
2318        out, err = process.communicate()
2319
2320        if universal_newlines:
2321            out = out.replace(b'\r\n', b'\n')
2322            err = err.replace(b'\r\n', b'\n')
2323
2324        if retcode is not None and retcode != process.returncode:
2325            if process_args is None:
2326                process_args = "(unknown args)"
2327            trace.mutter('Output of brz %r:\n%s', process_args, out)
2328            trace.mutter('Error for brz %r:\n%s', process_args, err)
2329            self.fail('Command brz %r failed with retcode %d != %d'
2330                      % (process_args, retcode, process.returncode))
2331        return [out, err]
2332
2333    def check_tree_shape(self, tree, shape):
2334        """Compare a tree to a list of expected names.
2335
2336        Fail if they are not precisely equal.
2337        """
2338        extras = []
2339        shape = list(shape)             # copy
2340        for path, ie in tree.iter_entries_by_dir():
2341            name = path.replace('\\', '/')
2342            if ie.kind == 'directory':
2343                name = name + '/'
2344            if name == "/":
2345                pass  # ignore root entry
2346            elif name in shape:
2347                shape.remove(name)
2348            else:
2349                extras.append(name)
2350        if shape:
2351            self.fail("expected paths not found in inventory: %r" % shape)
2352        if extras:
2353            self.fail("unexpected paths found in inventory: %r" % extras)
2354
2355    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2356                         a_callable=None, *args, **kwargs):
2357        """Call callable with redirected std io pipes.
2358
2359        Returns the return code."""
2360        if not callable(a_callable):
2361            raise ValueError("a_callable must be callable.")
2362        if stdin is None:
2363            stdin = BytesIO(b"")
2364        if stdout is None:
2365            if getattr(self, "_log_file", None) is not None:
2366                stdout = self._log_file
2367            else:
2368                stdout = StringIO()
2369        if stderr is None:
2370            if getattr(self, "_log_file", None is not None):
2371                stderr = self._log_file
2372            else:
2373                stderr = StringIO()
2374        real_stdin = sys.stdin
2375        real_stdout = sys.stdout
2376        real_stderr = sys.stderr
2377        try:
2378            sys.stdout = stdout
2379            sys.stderr = stderr
2380            sys.stdin = stdin
2381            return a_callable(*args, **kwargs)
2382        finally:
2383            sys.stdout = real_stdout
2384            sys.stderr = real_stderr
2385            sys.stdin = real_stdin
2386
2387    def reduceLockdirTimeout(self):
2388        """Reduce the default lock timeout for the duration of the test, so that
2389        if LockContention occurs during a test, it does so quickly.
2390
2391        Tests that expect to provoke LockContention errors should call this.
2392        """
2393        self.overrideAttr(lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2394
2395    def make_utf8_encoded_stringio(self, encoding_type=None):
2396        """Return a wrapped BytesIO, that will encode text input to UTF-8."""
2397        if encoding_type is None:
2398            encoding_type = 'strict'
2399        bio = BytesIO()
2400        output_encoding = 'utf-8'
2401        sio = codecs.getwriter(output_encoding)(bio, errors=encoding_type)
2402        sio.encoding = output_encoding
2403        return sio
2404
2405    def disable_verb(self, verb):
2406        """Disable a smart server verb for one test."""
2407        from breezy.bzr.smart import request
2408        request_handlers = request.request_handlers
2409        orig_method = request_handlers.get(verb)
2410        orig_info = request_handlers.get_info(verb)
2411        request_handlers.remove(verb)
2412        self.addCleanup(request_handlers.register, verb, orig_method,
2413                        info=orig_info)
2414
2415    def __hash__(self):
2416        return id(self)
2417
2418
class CapturedCall(object):
    """A helper for capturing smart server calls for easy debug analysis.

    Instances expose two attributes:

    * ``call``: the params object the instance was created with.
    * ``stack``: the formatted stack trace (a single string) of the code
      that triggered the call.
    """

    def __init__(self, params, prefix_length):
        """Capture the call with params and skip prefix_length stack frames.

        :param params: the call parameters; ``params.method`` is expected to
            be a UTF-8 encoded byte string (it is decoded by ``__str__``).
        :param prefix_length: number of outermost stack frames to leave out
            of the recorded stack.
        """
        self.call = params
        # The last 5 frames are the __init__, the hook frame, and 3 smart
        # client frames. Beyond this we could get more clever, but this is good
        # enough for now.
        stack = traceback.extract_stack()[prefix_length:-5]
        # NOTE: this instance attribute is the public way to read the stack.
        # A method of the same name could never be reached through an
        # instance, so none is defined.
        self.stack = ''.join(traceback.format_list(stack))

    def __str__(self):
        return self.call.method.decode('utf-8')

    def __repr__(self):
        return self.call.method.decode('utf-8')
2440
2441
class TestCaseWithMemoryTransport(TestCase):
    """Common test class for tests that do not need disk resources.

    Tests that need disk resources should derive from TestCaseInTempDir
    or TestCaseWithTransport.

    TestCaseWithMemoryTransport sets the TEST_ROOT variable for all brz tests.

    For TestCaseWithMemoryTransport the ``test_home_dir`` is set to the name of
    a directory which does not exist. This serves to help ensure test isolation
    is preserved. ``test_dir`` is set to the TEST_ROOT, as is cwd, because they
    must exist. However, TestCaseWithMemoryTransport does not offer local file
    defaults for the transport in tests, nor does it obey the command line
    override, so tests that accidentally write to the common directory should
    be rare.

    :cvar TEST_ROOT: Directory containing all temporary directories, plus a
        ``.bzr`` directory that stops us ascending higher into the filesystem.
    """

    TEST_ROOT = None
    _TEST_NAME = 'test'

    def __init__(self, methodName='runTest'):
        # allow test parameterization after test construction and before test
        # execution. Variables that the parameterizer sets need to be
        # ones that are not set by setUp, or setUp will trash them.
        super(TestCaseWithMemoryTransport, self).__init__(methodName)
        self.vfs_transport_factory = default_transport
        self.transport_server = None
        self.transport_readonly_server = None
        self.__vfs_server = None

    def setUp(self):
        super(TestCaseWithMemoryTransport, self).setUp()

        def _add_disconnect_cleanup(transport):
            """Schedule disconnection of given transport at test cleanup

            This needs to happen for all connected transports or leaks occur.

            Note reconnections may mean we call disconnect multiple times per
            transport which is suboptimal but seems harmless.
            """
            self.addCleanup(transport.disconnect)

        _mod_transport.Transport.hooks.install_named_hook(
            'post_connect', _add_disconnect_cleanup, None)

        self._make_test_root()
        self.addCleanup(os.chdir, osutils.getcwd())
        self.makeAndChdirToTestDir()
        self.overrideEnvironmentForTesting()
        self.__readonly_server = None
        self.__server = None
        self.reduceLockdirTimeout()
        # Each test may use its own config files even if the local config files
        # don't actually exist. They'll rightly fail if they try to create them
        # though.
        self.overrideAttr(config, '_shared_stores', {})

    def get_transport(self, relpath=None):
        """Return a writeable transport.

        This transport is for the test scratch space relative to
        "self._test_root"

        :param relpath: a path relative to the base url.
        """
        t = _mod_transport.get_transport_from_url(self.get_url(relpath))
        self.assertFalse(t.is_readonly())
        return t

    def get_readonly_transport(self, relpath=None):
        """Return a readonly transport for the test scratch space

        This can be used to test that operations which should only need
        readonly access in fact do not try to write.

        :param relpath: a path relative to the base url.
        """
        t = _mod_transport.get_transport_from_url(
            self.get_readonly_url(relpath))
        self.assertTrue(t.is_readonly())
        return t

    def create_transport_readonly_server(self):
        """Create a transport server from class defined at init.

        This is mostly a hook for daughter classes.
        """
        return self.transport_readonly_server()

    def get_readonly_server(self):
        """Get the server instance for the readonly transport

        This is useful for some tests with specific servers to do diagnostics.
        The server is created and started on first use, then reused.
        """
        if self.__readonly_server is None:
            if self.transport_readonly_server is None:
                # readonly decorator requested
                self.__readonly_server = test_server.ReadonlyServer()
            else:
                # explicit readonly transport.
                self.__readonly_server = (
                    self.create_transport_readonly_server())
            self.start_server(self.__readonly_server,
                              self.get_vfs_only_server())
        return self.__readonly_server

    def get_readonly_url(self, relpath=None):
        """Get a URL for the readonly transport.

        This will either be backed by '.' or a decorator to the transport
        used by self.get_url()
        relpath provides for clients to get a path relative to the base url.
        These should only be downwards relative, not upwards.
        """
        base = self.get_readonly_server().get_url()
        return self._adjust_url(base, relpath)

    def get_vfs_only_server(self):
        """Get the vfs only read/write server instance.

        This is useful for some tests with specific servers that need
        diagnostics.

        For TestCaseWithMemoryTransport this is always a MemoryServer, and there
        is no means to override it.
        """
        if self.__vfs_server is None:
            self.__vfs_server = memory.MemoryServer()
            self.start_server(self.__vfs_server)
        return self.__vfs_server

    def get_server(self):
        """Get the read/write server instance.

        This is useful for some tests with specific servers that need
        diagnostics.

        This is built from the self.transport_server factory. If that is None
        (or is the vfs transport factory), then the server from
        self.get_vfs_only_server is returned.
        """
        if self.__server is None:
            if (self.transport_server is None or self.transport_server is
                    self.vfs_transport_factory):
                self.__server = self.get_vfs_only_server()
            else:
                # bring up a decorated means of access to the vfs only server.
                self.__server = self.transport_server()
                self.start_server(self.__server, self.get_vfs_only_server())
        return self.__server

    def _adjust_url(self, base, relpath):
        """Get a URL (or maybe a path) for the readwrite transport.

        This will either be backed by '.' or to an equivalent non-file based
        facility.
        relpath provides for clients to get a path relative to the base url.
        These should only be downwards relative, not upwards.

        :param base: the base url (or path) to extend.
        :param relpath: the relative path to append; None and '.' leave
            base untouched.
        """
        if relpath is not None and relpath != '.':
            if not base.endswith('/'):
                base = base + '/'
            # XXX: Really base should be a url; we did after all call
            # get_url()!  But sometimes it's just a path (from
            # LocalAbspathServer), and it'd be wrong to append urlescaped data
            # to a non-escaped local path.
            if base.startswith('./') or base.startswith('/'):
                base += relpath
            else:
                base += urlutils.escape(relpath)
        return base

    def get_url(self, relpath=None):
        """Get a URL (or maybe a path) for the readwrite transport.

        This will either be backed by '.' or to an equivalent non-file based
        facility.
        relpath provides for clients to get a path relative to the base url.
        These should only be downwards relative, not upwards.
        """
        base = self.get_server().get_url()
        return self._adjust_url(base, relpath)

    def get_vfs_only_url(self, relpath=None):
        """Get a URL (or maybe a path) for the plain old vfs transport.

        This will never be a smart protocol.  It always has all the
        capabilities of the local filesystem, but it might actually be a
        MemoryTransport or some other similar virtual filesystem.

        This is the backing transport (if any) of the server returned by
        get_url and get_readonly_url.

        :param relpath: provides for clients to get a path relative to the base
            url.  These should only be downwards relative, not upwards.
        :return: A URL
        """
        base = self.get_vfs_only_server().get_url()
        return self._adjust_url(base, relpath)

    def _create_safety_net(self):
        """Make a fake bzr directory.

        This prevents any tests propagating up onto the TEST_ROOT directory's
        real branch.
        """
        root = TestCaseWithMemoryTransport.TEST_ROOT
        try:
            # Make sure we get a readable and accessible home for brz.log
            # and/or config files, and not fallback to weird defaults (see
            # http://pad.lv/825027).
            self.assertIs(None, os.environ.get('BRZ_HOME', None))
            os.environ['BRZ_HOME'] = root
            try:
                from breezy.bzr.bzrdir import BzrDirMetaFormat1
                wt = controldir.ControlDir.create_standalone_workingtree(
                    root, format=BzrDirMetaFormat1())
            finally:
                # Never leave our temporary BRZ_HOME behind, even when the
                # tree could not be created.
                os.environ.pop('BRZ_HOME', None)
        except Exception as e:
            self.fail("Fail to initialize the safety net: %r\n" % (e,))
        # Hack for speed: remember the raw bytes of the dirstate file so that
        # we don't need to re-open the wt to check it hasn't changed.
        TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE = (
            wt.control_transport.get_bytes('dirstate'))

    def _check_safety_net(self):
        """Check that the safety .bzr directory has not been touched.

        _make_test_root has created a .bzr directory to prevent tests from
        propagating. This method ensures that a test did not leak.
        """
        root = TestCaseWithMemoryTransport.TEST_ROOT
        t = _mod_transport.get_transport_from_path(root)
        self.permit_url(t.base)
        if (t.get_bytes('.bzr/checkout/dirstate') !=
                TestCaseWithMemoryTransport._SAFETY_NET_PRISTINE_DIRSTATE):
            # The current test has modified the .bzr directory, we need to
            # recreate a new one or all the following tests will fail.
            # If you need to inspect its content uncomment the following line
            # import pdb; pdb.set_trace()
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
            self._create_safety_net()
            raise AssertionError('%s/.bzr should not be modified' % root)

    def _make_test_root(self):
        """Create the shared TEST_ROOT on first use and guard it.

        The directory is created once per process; every test then permits
        access to it and schedules a check of the safety net.
        """
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
                                                    suffix='.tmp'))
            TestCaseWithMemoryTransport.TEST_ROOT = root

            self._create_safety_net()

            # The same directory is used by all tests, and we're not
            # specifically told when all tests are finished.  This will do.
            atexit.register(_rmtree_temp_dir, root)

        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
        self.addCleanup(self._check_safety_net)

    def makeAndChdirToTestDir(self):
        """Create the temporary directories for this one test.

        This must set self.test_home_dir and self.test_dir and chdir to
        self.test_dir.

        For TestCaseWithMemoryTransport we chdir to the TEST_ROOT for this
        test.
        """
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
        self.permit_dir(self.test_dir)

    def make_branch(self, relpath, format=None, name=None):
        """Create a branch on the transport at relpath."""
        repo = self.make_repository(relpath, format=format)
        return repo.controldir.create_branch(append_revisions_only=False, name=name)

    def get_default_format(self):
        """Return the name of the control dir format used by default."""
        return 'default'

    def resolve_format(self, format):
        """Resolve an object to a ControlDir format object.

        The initial format object can either already be
        a ControlDirFormat, None (for the default format),
        or a string with the name of the control dir format.

        :param format: Object to resolve
        :return: A ControlDirFormat instance
        """
        if format is None:
            format = self.get_default_format()
        if isinstance(format, str):
            format = controldir.format_registry.make_controldir(format)
        return format

    def make_controldir(self, relpath, format=None):
        """Create a control directory at relpath on the readwrite transport.

        :param relpath: path relative to the test url.
        :param format: anything accepted by resolve_format.
        :raises TestSkipped: if the format cannot be initialized.
        """
        try:
            # might be a relative or absolute path
            maybe_a_url = self.get_url(relpath)
            segments = maybe_a_url.rsplit('/', 1)
            t = _mod_transport.get_transport(maybe_a_url)
            if len(segments) > 1 and segments[-1] not in ('', '.'):
                t.ensure_base()
            format = self.resolve_format(format)
            return format.initialize_on_transport(t)
        except errors.UninitializableFormat:
            raise TestSkipped("Format %s is not initializable." % format)

    def make_repository(self, relpath, shared=None, format=None):
        """Create a repository on our default transport at relpath.

        Note that relpath must be a relative path, not a full url.
        """
        # FIXME: If you create a remoterepository this returns the underlying
        # real format, which is incorrect.  Actually we should make sure that
        # RemoteBzrDir returns a RemoteRepository.
        # maybe  mbp 20070410
        made_control = self.make_controldir(relpath, format=format)
        return made_control.create_repository(shared=shared)

    def make_smart_server(self, path, backing_server=None):
        """Start a smart TCP server and return a transport to path on it.

        :param path: the path to clone the server's root transport to.
        :param backing_server: the server to back the smart server with;
            self.get_server() when None.
        """
        if backing_server is None:
            backing_server = self.get_server()
        smart_server = test_server.SmartTCPServer_for_testing()
        self.start_server(smart_server, backing_server)
        remote_transport = _mod_transport.get_transport_from_url(smart_server.get_url()
                                                                 ).clone(path)
        return remote_transport

    def make_branch_and_memory_tree(self, relpath, format=None):
        """Create a branch on the default transport and a MemoryTree for it."""
        b = self.make_branch(relpath, format=format)
        return b.create_memorytree()

    def make_branch_builder(self, relpath, format=None):
        """Create a branch at relpath and return a BranchBuilder for it."""
        branch = self.make_branch(relpath, format=format)
        return branchbuilder.BranchBuilder(branch=branch)

    def overrideEnvironmentForTesting(self):
        """Point HOME, BRZ_HOME and GNUPGHOME at the test home directory."""
        test_home_dir = self.test_home_dir
        self.overrideEnv('HOME', test_home_dir)
        self.overrideEnv('BRZ_HOME', test_home_dir)
        self.overrideEnv('GNUPGHOME', os.path.join(test_home_dir, '.gnupg'))

    def setup_smart_server_with_call_log(self):
        """Sets up a smart server as the transport server with a call log.

        Calls are collected in ``self.hpss_calls`` (as CapturedCall objects)
        and connected transports in ``self.hpss_connections``.
        """
        self.transport_server = test_server.SmartTCPServer_for_testing
        self.hpss_connections = []
        self.hpss_calls = []
        # Skip the current stack down to the caller of
        # setup_smart_server_with_call_log
        prefix_length = len(traceback.extract_stack()) - 2

        def capture_hpss_call(params):
            self.hpss_calls.append(
                CapturedCall(params, prefix_length))

        def capture_connect(transport):
            self.hpss_connections.append(transport)
        client._SmartClient.hooks.install_named_hook(
            'call', capture_hpss_call, None)
        _mod_transport.Transport.hooks.install_named_hook(
            'post_connect', capture_connect, None)

    def reset_smart_call_log(self):
        """Forget the smart calls and connections recorded so far."""
        self.hpss_calls = []
        self.hpss_connections = []
2815
2816
2817class TestCaseInTempDir(TestCaseWithMemoryTransport):
2818    """Derived class that runs a test within a temporary directory.
2819
2820    This is useful for tests that need to create a branch, etc.
2821
2822    The directory is created in a slightly complex way: for each
2823    Python invocation, a new temporary top-level directory is created.
2824    All test cases create their own directory within that.  If the
2825    tests complete successfully, the directory is removed.
2826
2827    :ivar test_base_dir: The path of the top-level directory for this
2828    test, which contains a home directory and a work directory.
2829
2830    :ivar test_home_dir: An initially empty directory under test_base_dir
2831    which is used as $HOME for this test.
2832
2833    :ivar test_dir: A directory under test_base_dir used as the current
2834    directory when the test proper is run.
2835    """
2836
2837    OVERRIDE_PYTHON = 'python'
2838
2839    def setUp(self):
2840        super(TestCaseInTempDir, self).setUp()
2841        # Remove the protection set in isolated_environ, we have a proper
2842        # access to disk resources now.
2843        self.overrideEnv('BRZ_LOG', None)
2844
2845    def check_file_contents(self, filename, expect):
2846        self.log("check contents of file %s" % filename)
2847        with open(filename, 'rb') as f:
2848            contents = f.read()
2849        if contents != expect:
2850            self.log("expected: %r" % expect)
2851            self.log("actually: %r" % contents)
2852            self.fail("contents of %s not as expected" % filename)
2853
2854    def _getTestDirPrefix(self):
2855        # create a directory within the top level test directory
2856        if sys.platform in ('win32', 'cygwin'):
2857            name_prefix = re.sub('[<>*=+",:;_/\\-]', '_', self.id())
2858            # windows is likely to have path-length limits so use a short name
2859            name_prefix = name_prefix[-30:]
2860        else:
2861            name_prefix = re.sub('[/]', '_', self.id())
2862        return name_prefix
2863
2864    def makeAndChdirToTestDir(self):
2865        """See TestCaseWithMemoryTransport.makeAndChdirToTestDir().
2866
2867        For TestCaseInTempDir we create a temporary directory based on the test
2868        name and then create two subdirs - test and home under it.
2869        """
2870        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
2871                                       self._getTestDirPrefix())
2872        name = name_prefix
2873        for i in range(100):
2874            if os.path.exists(name):
2875                name = name_prefix + '_' + str(i)
2876            else:
2877                # now create test and home directories within this dir
2878                self.test_base_dir = name
2879                self.addCleanup(self.deleteTestDir)
2880                os.mkdir(self.test_base_dir)
2881                break
2882        self.permit_dir(self.test_base_dir)
2883        # 'sprouting' and 'init' of a branch both walk up the tree to find
2884        # stacking policy to honour; create a bzr dir with an unshared
2885        # repository (but not a branch - our code would be trying to escape
2886        # then!) to stop them, and permit it to be read.
2887        # control = controldir.ControlDir.create(self.test_base_dir)
2888        # control.create_repository()
2889        self.test_home_dir = self.test_base_dir + '/home'
2890        os.mkdir(self.test_home_dir)
2891        self.test_dir = self.test_base_dir + '/work'
2892        os.mkdir(self.test_dir)
2893        os.chdir(self.test_dir)
2894        # put name of test inside
2895        with open(self.test_base_dir + '/name', 'w') as f:
2896            f.write(self.id())
2897
2898    def deleteTestDir(self):
2899        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2900        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
2901
2902    def build_tree(self, shape, line_endings='binary', transport=None):
2903        """Build a test tree according to a pattern.
2904
2905        shape is a sequence of file specifications.  If the final
2906        character is '/', a directory is created.
2907
2908        This assumes that all the elements in the tree being built are new.
2909
2910        This doesn't add anything to a branch.
2911
2912        :type shape:    list or tuple.
2913        :param line_endings: Either 'binary' or 'native'
2914            in binary mode, exact contents are written in native mode, the
2915            line endings match the default platform endings.
2916        :param transport: A transport to write to, for building trees on VFS's.
2917            If the transport is readonly or None, "." is opened automatically.
2918        :return: None
2919        """
2920        if type(shape) not in (list, tuple):
2921            raise AssertionError("Parameter 'shape' should be "
2922                                 "a list or a tuple. Got %r instead" % (shape,))
2923        # It's OK to just create them using forward slashes on windows.
2924        if transport is None or transport.is_readonly():
2925            transport = _mod_transport.get_transport_from_path(".")
2926        for name in shape:
2927            self.assertIsInstance(name, str)
2928            if name[-1] == '/':
2929                transport.mkdir(urlutils.escape(name[:-1]))
2930            else:
2931                if line_endings == 'binary':
2932                    end = b'\n'
2933                elif line_endings == 'native':
2934                    end = os.linesep.encode('ascii')
2935                else:
2936                    raise errors.BzrError(
2937                        'Invalid line ending request %r' % line_endings)
2938                content = b"contents of %s%s" % (name.encode('utf-8'), end)
2939                transport.put_bytes_non_atomic(urlutils.escape(name), content)
2940
    # Expose treeshape.build_tree_contents on the class as a static method, so
    # calling it through an instance does not pass the instance as an argument.
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
2942
2943    def assertInWorkingTree(self, path, root_path='.', tree=None):
2944        """Assert whether path or paths are in the WorkingTree"""
2945        if tree is None:
2946            tree = workingtree.WorkingTree.open(root_path)
2947        if not isinstance(path, str):
2948            for p in path:
2949                self.assertInWorkingTree(p, tree=tree)
2950        else:
2951            self.assertTrue(tree.is_versioned(path),
2952                            path + ' not in working tree.')
2953
2954    def assertNotInWorkingTree(self, path, root_path='.', tree=None):
2955        """Assert whether path or paths are not in the WorkingTree"""
2956        if tree is None:
2957            tree = workingtree.WorkingTree.open(root_path)
2958        if not isinstance(path, str):
2959            for p in path:
2960                self.assertNotInWorkingTree(p, tree=tree)
2961        else:
2962            self.assertFalse(tree.is_versioned(
2963                path), path + ' in working tree.')
2964
2965
class TestCaseWithTransport(TestCaseInTempDir):
    """A test case that provides get_url and get_readonly_url facilities.

    These back onto two transport servers, one for readonly access and one for
    read write access.

    If no explicit class is provided for readonly access, a
    ReadonlyTransportDecorator is used instead which allows the use of non disk
    based read write transports.

    If an explicit class is provided for readonly access, that server and the
    readwrite one must both define get_url() as resolving to os.getcwd().
    """

    def setUp(self):
        super(TestCaseWithTransport, self).setUp()
        # Created on demand by get_vfs_only_server().
        self.__vfs_server = None

    def get_vfs_only_server(self):
        """See TestCaseWithMemoryTransport.

        The server is built from self.vfs_transport_factory and started the
        first time this is called; later calls return the same server.

        This is useful for some tests with specific servers that need
        diagnostics.
        """
        if self.__vfs_server is not None:
            return self.__vfs_server
        self.__vfs_server = self.vfs_transport_factory()
        self.start_server(self.__vfs_server)
        return self.__vfs_server

    def make_branch_and_tree(self, relpath, format=None):
        """Create a branch on the transport and a tree locally.

        If the transport is not a LocalTransport, the Tree can't be created on
        the transport.  In that case if the vfs_transport_factory is
        LocalURLServer the working tree is created in the local
        directory backing the transport, and the returned tree's branch and
        repository will also be accessed locally. Otherwise a lightweight
        checkout is created and returned.

        We do this because we can't physically create a tree in the local
        path, with a branch reference to the transport_factory url, and
        a branch + repository in the vfs_transport, unless the vfs_transport
        namespace is distinct from the local disk - the two branch objects
        would collide. While we could construct a tree with its branch object
        pointing at the transport_factory transport in memory, reopening it
        would behave unexpectedly, and has in the past caused testing bugs
        when we tried to do it that way.

        If the resolved format does not support working trees, the branch is
        created at relpath + '.branch' and a lightweight checkout of it is
        created at relpath.

        :param format: The BzrDirFormat.
        :returns: the WorkingTree.
        """
        # TODO: always use the local disk path for the working tree,
        # this obviously requires a format that supports branch references
        # so check for that by checking bzrdir.BzrDirFormat.get_default_format()
        # RBC 20060208
        format = self.resolve_format(format=format)
        if not format.supports_workingtrees:
            branch = self.make_branch(relpath + '.branch', format=format)
            return branch.create_checkout(relpath, lightweight=True)
        branch = self.make_branch(relpath, format=format)
        try:
            return branch.controldir.create_workingtree()
        except errors.NotLocalUrl:
            # We can only make working trees locally at the moment.  If the
            # transport can't support them, then we keep the non-disk-backed
            # branch and create a local checkout.
            if self.vfs_transport_factory is not test_server.LocalURLServer:
                return branch.create_checkout(relpath, lightweight=True)
            # the branch is colocated on disk, we cannot create a checkout.
            # hopefully callers will expect this.
            local_controldir = controldir.ControlDir.open(
                self.get_vfs_only_url(relpath))
            tree = local_controldir.create_workingtree()
            if tree.branch._format != branch._format:
                tree._branch = branch
                # Make sure that assigning to tree._branch fixes tree.branch,
                # in case the implementation details of workingtree objects
                # change.
                self.assertIs(branch, tree.branch)
            return tree

    def assertIsDirectory(self, relpath, transport):
        """Assert that relpath within transport is a directory.

        Fails the test when the path does not exist or when its mode is not
        that of a directory.

        This may not be possible on all transports; in that case it propagates
        a TransportNotPossible.
        """
        try:
            mode = transport.stat(relpath).st_mode
        except errors.NoSuchFile:
            self.fail("path %s is not a directory; no such file"
                      % (relpath))
        if stat.S_ISDIR(mode):
            return
        self.fail("path %s is not a directory; has mode %#o"
                  % (relpath, mode))

    def assertTreesEqual(self, left, right):
        """Check that left and right have the same content and properties."""
        # The parents list is compared directly; equality of the content is
        # checked through a tree delta.
        self.assertEqual(left.get_parent_ids(), right.get_parent_ids())
        delta = left.changes_from(right)
        changed = delta.has_changed()
        self.assertFalse(
            changed,
            "Trees %r and %r are different: %r" % (left, right, delta))

    def disable_missing_extensions_warning(self):
        """Some tests expect a precise stderr content.

        There is no point in forcing them to duplicate the extension related
        warning, so 'missing_extensions' is stored in the 'suppress_warnings'
        user option of the global configuration.
        """
        global_config = config.GlobalConfig()
        global_config.set_user_option(
            'suppress_warnings', 'missing_extensions')
3080
3081
class ChrootedTestCase(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    During setUp, self.vfs_transport_factory is compared with
    memory.MemoryServer.  If they are equal we are chrooted already; if not,
    an HttpServer is used for readonly urls.

    TODO RBC 20060127: make this an option to TestCaseWithTransport so it can
                       be used without needed to redo it when a different
                       subclass is in use ?
    """

    def setUp(self):
        from breezy.tests import http_server
        super(ChrootedTestCase, self).setUp()
        if self.vfs_transport_factory != memory.MemoryServer:
            self.transport_readonly_server = http_server.HttpServer
3099
3100
def condition_id_re(pattern):
    """Build a condition that searches a test's id for a regular expression.

    :param pattern: A regular expression string.
    :return: A callable taking a test; it returns the result of searching
        the test's id for pattern, which is true if the re matches.
    """
    search = re.compile(pattern).search

    def condition(test):
        return search(test.id())
    return condition
3113
3114
def condition_isinstance(klass_or_klass_list):
    """Build a condition that checks isinstance(obj, klass_or_klass_list).

    :param klass_or_klass_list: Passed unchanged as the second argument of
        isinstance().
    :return: A callable which when called with one parameter obj returns the
        result of isinstance(obj, klass_or_klass_list).
    """
    return lambda obj: isinstance(obj, klass_or_klass_list)
3124
3125
def condition_id_in_list(id_list):
    """Build a condition that checks whether a test's id is in id_list.

    :param id_list: A TestIdList object; only its includes() method is used.
    :return: A callable taking a test and returning
        id_list.includes(test.id()).
    """
    return lambda test: id_list.includes(test.id())
3135
3136
def condition_id_startswith(starts):
    """Build a condition that checks the beginning of a test's id.

    :param starts: A list of strings.
    :return: A callable that returns True if the test's id starts with one of
        the given strings, and False otherwise.
    """
    def condition(test):
        return any(test.id().startswith(prefix) for prefix in starts)
    return condition
3150
3151
def exclude_tests_by_condition(suite, condition):
    """Build a suite from the tests of suite that condition rejects.

    :param suite: The suite to get tests from.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be excluded from the result.
    :return: A suite which contains the tests found in suite that fail
        condition.
    """
    kept = [test for test in iter_suite_tests(suite) if not condition(test)]
    return TestUtil.TestSuite(kept)
3166
3167
def filter_suite_by_condition(suite, condition):
    """Build a suite from the tests of suite that condition accepts.

    :param suite: The source suite.
    :param condition: A callable whose result evaluates True when called with a
        test case which should be included in the result.
    :return: A suite which contains the tests found in suite that pass
        condition.
    """
    kept = [test for test in iter_suite_tests(suite) if condition(test)]
    return TestUtil.TestSuite(kept)
3182
3183
def filter_suite_by_re(suite, pattern):
    """Build a suite from the tests of suite whose id matches pattern.

    :param suite:           the source suite
    :param pattern:         regular expression that test ids must match
    :returns: the newly created suite
    """
    return filter_suite_by_condition(suite, condition_id_re(pattern))
3194
3195
def filter_suite_by_id_list(suite, test_id_list):
    """Build a suite from the tests of suite whose id is in test_id_list.

    :param suite: The source suite.
    :param test_id_list: The test ids to keep; it is passed on unchanged to
        condition_id_in_list().
    :returns: the newly created suite
    """
    return filter_suite_by_condition(
        suite, condition_id_in_list(test_id_list))
3206
3207
def filter_suite_by_id_startswith(suite, start):
    """Build a suite from the tests of suite whose id has a given prefix.

    :param suite: The source suite.
    :param start: A list of strings; a test is kept when its id starts with
        one of them.
    :returns: the newly created suite
    """
    return filter_suite_by_condition(suite, condition_id_startswith(start))
3218
3219
def exclude_tests_by_re(suite, pattern):
    """Build a suite from the tests of suite whose id does not match pattern.

    :param suite: The suite to get tests from.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be excluded from the result.
    :return: A TestSuite that contains all the tests from suite without the
        tests that matched pattern. The order of tests is the same as it was in
        suite.
    """
    matches_pattern = condition_id_re(pattern)
    return exclude_tests_by_condition(suite, matches_pattern)
3231
3232
def preserve_input(something):
    """Return the argument unchanged.

    A helper for performing test suite transformation chains.

    :param something: Anything you want to preserve.
    :return: The very object that was passed in.
    """
    return something
3240
3241
def randomize_suite(suite):
    """Return a new TestSuite with suite's tests in random order.

    Every test yielded by iter_suite_tests(suite) is collected into one list,
    which is shuffled with random.shuffle() and wrapped in a single new
    suite, so the nesting of the input suite is not preserved.
    """
    shuffled = list(iter_suite_tests(suite))
    random.shuffle(shuffled)
    return TestUtil.TestSuite(shuffled)
3252
3253
def split_suite_by_condition(suite, condition):
    """Split a test suite into two by a condition.

    :param suite: The suite to split.
    :param condition: The condition to match on. Tests that match this
        condition are returned in the first test suite, ones that do not match
        are in the second suite.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching the condition, and the second contains the remainder
        from suite. The order within each output suite is the same as it was in
        suite.
    """
    matched, did_not_match = [], []
    for test in iter_suite_tests(suite):
        bucket = matched if condition(test) else did_not_match
        bucket.append(test)
    return TestUtil.TestSuite(matched), TestUtil.TestSuite(did_not_match)
3274
3275
def split_suite_by_re(suite, pattern):
    """Split a test suite into two by a regular expression.

    :param suite: The suite to split.
    :param pattern: A regular expression string. Test ids that match this
        pattern will be in the first test suite returned, and the others in the
        second test suite returned.
    :return: A tuple of two test suites, where the first contains tests from
        suite matching pattern, and the second contains the remainder from
        suite. The order within each output suite is the same as it was in
        suite.
    """
    matches_pattern = condition_id_re(pattern)
    return split_suite_by_condition(suite, matches_pattern)
3289
3290
def run_suite(suite, name='test', verbose=False, pattern=".*",
              stop_on_failure=False,
              transport=None, lsprof_timed=None, bench_history=None,
              matching_tests_first=None,
              list_only=False,
              random_seed=None,
              exclude_pattern=None,
              strict=False,
              runner_class=None,
              suite_decorators=None,
              stream=None,
              result_decorators=None,
              ):
    """Run a test suite for brz selftest.

    The suite is wrapped by a chain of decorators (random ordering, exclusion,
    filtering or "matching tests first", then any caller supplied ones) before
    it is either listed or handed to the runner.

    :param suite: The suite to run.
    :param name: Not used by this function.
    :param verbose: If true the runner is created with verbosity 2, else 1.
    :param pattern: Regular expression used to filter the tests, or to move
        the matching tests to the front when matching_tests_first is true.
    :param stop_on_failure: Stored on the runner as stop_on_failure.
    :param transport: Not used by this function.
    :param lsprof_timed: Stored as TestCase._gather_lsprof_in_benchmarks.
    :param bench_history: Passed to the runner constructor.
    :param matching_tests_first: If true, tests matching pattern are run
        first instead of being the only ones run.
    :param list_only: If true the tests are listed rather than run, and True
        is returned.
    :param random_seed: Seed for randomising the test order; None keeps the
        order.
    :param exclude_pattern: Regular expression of test ids to exclude, or
        None.
    :param strict: Passed to the runner; if true and the result provides
        wasStrictlySuccessful(), that decides the return value.
    :param runner_class: The class of runner to use. Must support the
        constructor arguments passed by run_suite which are more than standard
        python uses. Defaults to TextTestRunner.
    :param suite_decorators: Extra suite decorators, applied after the built
        in ones.
    :param stream: Where to write output; defaults to sys.stdout.
    :param result_decorators: Passed to the runner constructor.
    :return: A boolean indicating success.
    """
    TestCase._gather_lsprof_in_benchmarks = lsprof_timed
    verbosity = 2 if verbose else 1
    if runner_class is None:
        runner_class = TextTestRunner
    if stream is None:
        stream = sys.stdout
    runner = runner_class(
        stream=stream,
        descriptions=0,
        verbosity=verbosity,
        bench_history=bench_history,
        strict=strict,
        result_decorators=result_decorators,
        )
    runner.stop_on_failure = stop_on_failure
    if isinstance(suite, unittest.TestSuite):
        # Populate a new TestSuite from the passed suite, then empty out the
        # _tests list of the passed suite.
        repopulated = TestSuite(suite)
        suite._tests[:] = []
        suite = repopulated
    # built in decorator factories:
    if matching_tests_first:
        pattern_decorator = tests_first(pattern)
    else:
        pattern_decorator = filter_tests(pattern)
    decorators = [
        random_order(random_seed, runner),
        exclude_tests(exclude_pattern),
        pattern_decorator,
        ]
    if suite_decorators:
        decorators.extend(suite_decorators)
    # tell the result object how many tests will be running: (except if
    # --parallel=fork is being used. Robert said he will provide a better
    # progress design later -- vila 20090817)
    if fork_decorator not in decorators:
        decorators.append(CountingDecorator)
    for decorate in decorators:
        suite = decorate(suite)
    if list_only:
        # Done after test suite decoration to allow randomisation etc
        # to take effect, though that is of marginal benefit.
        if verbosity >= 2:
            stream.write("Listing tests only ...\n")
        if getattr(runner, 'list', None) is None:
            for listed_test in iter_suite_tests(suite):
                stream.write("%s\n" % (listed_test.id()))
        else:
            runner.list(suite)
        return True
    result = runner.run(suite)
    if strict and getattr(result, 'wasStrictlySuccessful', False):
        return result.wasStrictlySuccessful()
    return result.wasSuccessful()
3365
3366
# A registry where get() returns a suite decorator.
# The 'fork' and 'subprocess' entries are registered further down in this
# module, right after the decorators they refer to.
parallel_registry = registry.Registry()
3369
3370
def fork_decorator(suite):
    """Suite decorator running the tests in forked child processes.

    :param suite: The suite to decorate.
    :return: suite itself when osutils.local_concurrency() is 1, otherwise a
        testtools ConcurrentTestSuite that splits it with fork_for_tests.
    :raises errors.CommandError: if the os module has no fork().
    """
    fork = getattr(os, "fork", None)
    if fork is None:
        raise errors.CommandError("platform does not support fork,"
                                  " try --parallel=subprocess instead.")
    if osutils.local_concurrency() == 1:
        return suite
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, fork_for_tests)
3380
3381
# Make fork_decorator available from parallel_registry under the key 'fork'.
parallel_registry.register('fork', fork_decorator)
3383
3384
def subprocess_decorator(suite):
    """Suite decorator running the tests in separate subprocesses.

    :param suite: The suite to decorate.
    :return: suite itself when osutils.local_concurrency() is 1, otherwise a
        testtools ConcurrentTestSuite that splits it with reinvoke_for_tests.
    """
    if osutils.local_concurrency() == 1:
        return suite
    from testtools import ConcurrentTestSuite
    return ConcurrentTestSuite(suite, reinvoke_for_tests)
3391
3392
# Make subprocess_decorator available from parallel_registry under the key
# 'subprocess'.
parallel_registry.register('subprocess', subprocess_decorator)
3394
3395
def exclude_tests(exclude_pattern):
    """Return a test suite decorator that excludes tests.

    :param exclude_pattern: Regular expression handed to ExcludeDecorator, or
        None to exclude nothing.
    :return: identity_decorator when exclude_pattern is None, otherwise a
        callable wrapping a suite in an ExcludeDecorator.
    """
    if exclude_pattern is not None:
        return lambda suite: ExcludeDecorator(suite, exclude_pattern)
    return identity_decorator
3404
3405
def filter_tests(pattern):
    """Return a test suite decorator that keeps only matching tests.

    :param pattern: Regular expression handed to FilterTestsDecorator.
    :return: identity_decorator when pattern is exactly '.*', otherwise a
        callable wrapping a suite in a FilterTestsDecorator.
    """
    if pattern != '.*':
        return lambda suite: FilterTestsDecorator(suite, pattern)
    return identity_decorator
3413
3414
def random_order(random_seed, runner):
    """Return a test suite decorator factory for randomising tests order.

    :param random_seed: now, a string which casts to an integer, or an integer.
        None disables randomisation.
    :param runner: A test runner with a stream attribute to report on.
    :return: identity_decorator when random_seed is None, otherwise a callable
        wrapping a suite in a RandomDecorator.
    """
    if random_seed is not None:
        return lambda suite: RandomDecorator(suite, random_seed, runner.stream)
    return identity_decorator
3427
3428
3429def tests_first(pattern):
3430    if pattern == '.*':
3431        return identity_decorator
3432
3433    def decorator(suite):
3434        return TestFirstDecorator(suite, pattern)
3435    return decorator
3436
3437
def identity_decorator(suite):
    """Suite decorator that does nothing: return suite unchanged."""
    return suite
3441
3442
class TestDecorator(TestUtil.TestSuite):
    """A decorator for TestCase/TestSuite objects.

    The suite passed on construction is added as a single member: it is
    contained rather than flattened.
    """

    def __init__(self, suite=None):
        super(TestDecorator, self).__init__()
        if suite is None:
            return
        self.addTest(suite)

    # Don't need subclass run method with suite emptying
    run = unittest.TestSuite.run
3456
3457
class CountingDecorator(TestDecorator):
    """A decorator which calls result.progress(self.countTestCases)."""

    def run(self, result):
        # Results without a callable 'progress' attribute are simply not
        # told the number of tests.
        report_progress = getattr(result, 'progress', None)
        if callable(report_progress):
            report_progress(self.countTestCases(), SUBUNIT_SEEK_SET)
        return super(CountingDecorator, self).run(result)
3466
3467
class ExcludeDecorator(TestDecorator):
    """A decorator which excludes tests matching an exclude pattern."""

    def __init__(self, suite, exclude_pattern):
        remaining = exclude_tests_by_re(suite, exclude_pattern)
        super(ExcludeDecorator, self).__init__(remaining)
3474
3475
class FilterTestsDecorator(TestDecorator):
    """A decorator which filters tests to those matching a pattern."""

    def __init__(self, suite, pattern):
        matching = filter_suite_by_re(suite, pattern)
        super(FilterTestsDecorator, self).__init__(matching)
3482
3483
class RandomDecorator(TestDecorator):
    """A decorator which randomises the order of its tests."""

    def __init__(self, suite, random_seed, stream):
        """Seed the random module, report the seed and shuffle suite.

        :param suite: The suite whose tests are randomised.
        :param random_seed: The requested seed; see actual_seed().
        :param stream: The seed that is actually used is written here.
        """
        random_seed = self.actual_seed(random_seed)
        announcement = "Randomizing test order using seed %s\n\n" % (
            random_seed,)
        stream.write(announcement)
        # Initialise the random number generator.
        random.seed(random_seed)
        super(RandomDecorator, self).__init__(randomize_suite(suite))

    @staticmethod
    def actual_seed(seed):
        """Turn the requested seed into the value to seed with.

        :param seed: "now", or a value that int() may be able to convert.
        :return: The current time as an integer for "now"; int(seed) when the
            conversion works; seed itself otherwise.
        """
        if seed == "now":
            # We convert the seed to an integer to make it reuseable across
            # invocations (because the user can reenter it).
            return int(time.time())
        # Convert the seed to an integer if we can
        try:
            return int(seed)
        except (TypeError, ValueError):
            return seed
3508
3509
class TestFirstDecorator(TestDecorator):
    """A decorator which moves named tests to the front."""

    def __init__(self, suite, pattern):
        super(TestFirstDecorator, self).__init__()
        # split_suite_by_re returns (matching, remainder); adding both in
        # that order puts the matching tests first.
        matching_then_rest = split_suite_by_re(suite, pattern)
        self.addTests(matching_then_rest)
3516
3517
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: The suite whose tests are distributed.
    :param count: The number of lists to create.
    :return: A list of count lists; the tests of suite are dealt out to them
        in turn.
    """
    # This just assigns tests in a round-robin fashion.  On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition.  So the slowest partition shouldn't be much slower
    # than the fastest.
    partitions = [[] for _ in range(count)]
    tests = iter_suite_tests(suite)
    if partitions:
        # Without any partition there is nowhere to put a test, so the tests
        # are only walked when at least one partition exists.
        for position, test in enumerate(tests):
            partitions[position % len(partitions)].append(test)
    return partitions
3530
3531
def workaround_zealous_crypto_random():
    """Crypto.Random want to help us being secure, but we don't care here.

    This works around some test failures related to the sftp server. Once
    paramiko stops using the controversial API in Crypto.Random, we may get
    rid of it.

    Crypto.Random.atfork() is called when it can be imported; when the import
    fails nothing is done.
    """
    try:
        from Crypto.Random import atfork
    except ImportError:
        return
    atfork()
3543
3544
def fork_for_tests(suite):
    """Take suite and start up one runner per CPU by forking()

    The tests of suite are split with partition_tests() into
    osutils.local_concurrency() blocks.  One child process is forked per
    block; it runs its tests and reports the results as a subunit stream over
    a pipe that the parent reads.

    :param suite: The suite to distribute. Its _tests list is emptied.
    :return: An iterable of TestCase-like objects which can each have
        run(result) called on them to feed tests to result.
    """
    concurrency = osutils.local_concurrency()
    result = []
    from subunit import ProtocolTestCase
    from subunit.test_results import AutoTimingTestResultDecorator

    class TestInOtherProcess(ProtocolTestCase):
        # Should be in subunit, I think. RBC.
        def __init__(self, stream, pid):
            ProtocolTestCase.__init__(self, stream)
            self.pid = pid

        def run(self, result):
            try:
                ProtocolTestCase.run(self, result)
            finally:
                # Always wait for the child, even if reading its stream
                # failed.
                os.waitpid(self.pid, 0)
            # GZ 2011-10-18: If status is nonzero, should report to the result
            #                that something went wrong.

    test_blocks = partition_tests(suite, concurrency)
    # Clear the tests from the original suite so it doesn't keep them alive
    suite._tests[:] = []
    for process_tests in test_blocks:
        process_suite = TestUtil.TestSuite(process_tests)
        # Also clear each split list so new suite has only reference
        process_tests[:] = []
        c2pread, c2pwrite = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child process: it must never return from this branch, otherwise
            # it would go on executing the parent's code.
            try:
                stream = os.fdopen(c2pwrite, 'wb', 0)
                workaround_zealous_crypto_random()
                try:
                    import coverage
                except ImportError:
                    pass
                else:
                    coverage.process_startup()
                os.close(c2pread)
                # Leave stderr and stdout open so we can see test noise
                # Close stdin so that the child goes away if it decides to
                # read from stdin (otherwise its a roulette to see what
                # child actually gets keystrokes for pdb etc).
                sys.stdin.close()
                subunit_result = AutoTimingTestResultDecorator(
                    SubUnitBzrProtocolClientv1(stream))
                process_suite.run(subunit_result)
            except:
                # Deliberately catches everything: whatever happened, the
                # child has to terminate here.
                # Try and report traceback on stream, but exit with error even
                # if stream couldn't be created or something else goes wrong.
                # The traceback is formatted to a string and written in one go
                # to avoid interleaving lines from multiple failing children.
                tb = traceback.format_exc()
                if isinstance(tb, str):
                    tb = tb.encode('utf-8')
                try:
                    try:
                        stream.write(tb)
                    finally:
                        stream.flush()
                finally:
                    # Kept in its own finally so that a failing write or
                    # flush (or a stream that was never created) cannot
                    # prevent the child from exiting.
                    os._exit(1)
            os._exit(0)
        else:
            # Parent process: keep only the read end of the pipe.
            os.close(c2pwrite)
            stream = os.fdopen(c2pread, 'rb', 0)
            test = TestInOtherProcess(stream, pid)
            result.append(test)
    return result
3618
3619
def reinvoke_for_tests(suite):
    """Take suite and start up one runner per CPU using subprocess().

    The tests of suite are split with partition_tests() into
    osutils.local_concurrency() blocks.  For each block the test ids are
    written, one per line, to a temporary file and a
    ``selftest --load-list <file> --subunit`` subprocess is started.

    :param suite: The suite to distribute.
    :return: An iterable of TestCase-like objects which can each have
        run(result) called on them to feed tests to result.
    """
    concurrency = osutils.local_concurrency()
    result = []
    from subunit import ProtocolTestCase

    class TestInSubprocess(ProtocolTestCase):
        def __init__(self, process, name):
            ProtocolTestCase.__init__(self, process.stdout)
            self.process = process
            self.process.stdin.close()
            # Name of the temporary test list file, removed after the run.
            self.name = name

        def run(self, result):
            try:
                ProtocolTestCase.run(self, result)
            finally:
                self.process.wait()
                os.unlink(self.name)

    # The command used to start a child is the same for every block.
    # ugly; currently reimplement rather than reuses TestCase methods.
    bzr_path = os.path.dirname(os.path.dirname(breezy.__file__)) + '/bzr'
    if not os.path.isfile(bzr_path):
        # We are probably installed. Assume sys.argv is the right file
        bzr_path = sys.argv[0]
    bzr_path = [bzr_path]
    if sys.platform == "win32":
        # if we're on windows, we can't execute the bzr script directly
        bzr_path = [sys.executable] + bzr_path
    test_blocks = partition_tests(suite, concurrency)
    for process_tests in test_blocks:
        fd, test_list_file_name = tempfile.mkstemp()
        try:
            # The file is opened in binary mode, so the ids have to be
            # written as bytes.
            with os.fdopen(fd, 'wb') as test_list_file:
                for test in process_tests:
                    test_list_file.write(test.id().encode('utf-8') + b'\n')
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
                               '--subunit']
            if '--no-plugins' in sys.argv:
                argv.append('--no-plugins')
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
            # noise on stderr it can interrupt the subunit protocol.
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       bufsize=1)
            test = TestInSubprocess(process, test_list_file_name)
            result.append(test)
        except:
            # Whatever went wrong, don't leave the temporary file behind;
            # the error itself is re-raised.
            os.unlink(test_list_file_name)
            raise
    return result
3677
3678
class ProfileResult(testtools.ExtendedToOriginalDecorator):
    """Generate profiling data for all activity between start and success.

    The profile data is appended to the test's _benchcalls attribute and can
    be accessed by the forwarded-to TestResult.

    While it might be cleaner to accumulate this in stopTest, addSuccess is
    where our existing output support for lsprof is, and this class aims to
    fit in with that: while it could be moved it's not necessary to accomplish
    test profiling, nor would it be dramatically cleaner.
    """

    def startTest(self, test):
        """Start a fresh profiler, then forward startTest."""
        self.profiler = breezy.lsprof.BzrProfiler()
        # Prevent deadlocks in tests that use lsprof: those tests will
        # unavoidably fail.
        breezy.lsprof.BzrProfiler.profiler_block = 0
        self.profiler.start()
        testtools.ExtendedToOriginalDecorator.startTest(self, test)

    def addSuccess(self, test):
        """Stop the profiler, record its stats on test, forward addSuccess."""
        stats = self.profiler.stop()
        # The list of recorded calls is created the first time it is needed.
        if not hasattr(test, '_benchcalls'):
            test._benchcalls = []
        test._benchcalls.append(((test.id(), "", ""), stats))
        testtools.ExtendedToOriginalDecorator.addSuccess(self, test)

    def stopTest(self, test):
        """Forward stopTest, then drop the reference to the profiler."""
        testtools.ExtendedToOriginalDecorator.stopTest(self, test)
        self.profiler = None
3712
3713
# Set of debug flag names for the test run.
# Controlled by "brz selftest -E=..." option
# Currently supported:
#   -Eallow_debug           Will no longer clear debug.debug_flags() so it
#                           preserves any flags supplied at the command line.
#   -Edisable_lock_checks   Turns errors in mismatched locks into simple prints
#                           rather than failing tests. And no longer raise
#                           LockContention when fcntl locks are not being used
#                           with proper exclusion rules.
#   -Ethreads               Will display thread ident at creation/join time to
#                           help track thread leaks
#   -Euncollected_cases     Display the identity of any test cases that weren't
#                           deallocated after being completed.
#   -Econfig_stats          Will collect statistics using addDetail
selftest_debug_flags = set()
3728
3729
def selftest(verbose=False, pattern=".*", stop_on_failure=True,
             transport=None,
             test_suite_factory=None,
             lsprof_timed=None,
             bench_history=None,
             matching_tests_first=None,
             list_only=False,
             random_seed=None,
             exclude_pattern=None,
             strict=False,
             load_list=None,
             debug_flags=None,
             starting_with=None,
             runner_class=None,
             suite_decorators=None,
             stream=None,
             lsprof_tests=False,
             ):
    """Run the whole test suite under the enhanced runner.

    The module-level ``default_transport`` and ``selftest_debug_flags`` are
    overridden while the suite runs and put back afterwards, whatever the
    outcome.

    :param transport: Transport to install as ``default_transport`` for the
        run; None keeps the current default.
    :param test_suite_factory: Optional callable returning the suite to run;
        when None, test_suite() builds it.
    :param load_list: Optional name of a file of test ids (see
        load_test_id_list) restricting the suite built by test_suite().
    :param debug_flags: Optional iterable of flag names replacing
        ``selftest_debug_flags`` for the run.
    :param starting_with: Optional list of test id prefixes; aliases are
        resolved through test_prefix_alias_registry.
    :param lsprof_tests: When true, ProfileResult is used as a result
        decorator.
    :return: The value returned by run_suite(); the remaining parameters
        are handed to it unchanged.
    """
    # XXX: Very ugly way to do this...
    # Pretend the old-format deprecation warning was already issued so that
    # it cannot disturb any blackbox tests.
    from breezy import repository
    repository._deprecation_warning_done = True

    global default_transport, selftest_debug_flags
    saved_transport = default_transport
    if transport is None:
        transport = saved_transport
    default_transport = transport
    saved_debug_flags = selftest_debug_flags
    if debug_flags is not None:
        selftest_debug_flags = set(debug_flags)
    try:
        keep_only = None if load_list is None else load_test_id_list(load_list)
        if starting_with:
            starting_with = [test_prefix_alias_registry.resolve_alias(prefix)
                             for prefix in starting_with]
            # 'unittest' is always interesting: it lets failed suites that
            # were wrapped as test cases show up in the output.
            starting_with.append('unittest')
        if test_suite_factory is not None:
            suite = test_suite_factory()
        else:
            # Loading only the modules matching the starting_with patterns
            # keeps the loading time down.
            suite = test_suite(keep_only, starting_with)
        if starting_with:
            # Whatever was loaded, filter as requested.
            suite = filter_suite_by_id_startswith(suite, starting_with)
        result_decorators = [ProfileResult] if lsprof_tests else []
        return run_suite(suite, 'testbzr',
                         verbose=verbose,
                         pattern=pattern,
                         stop_on_failure=stop_on_failure,
                         transport=transport,
                         lsprof_timed=lsprof_timed,
                         bench_history=bench_history,
                         matching_tests_first=matching_tests_first,
                         list_only=list_only,
                         random_seed=random_seed,
                         exclude_pattern=exclude_pattern,
                         strict=strict,
                         runner_class=runner_class,
                         suite_decorators=suite_decorators,
                         stream=stream,
                         result_decorators=result_decorators,
                         )
    finally:
        default_transport = saved_transport
        selftest_debug_flags = saved_debug_flags
3805
3806
def load_test_id_list(file_name):
    """Load a test id list from a text file.

    The format is one test id per line.  No special care is taken to impose
    strict rules: these test ids are used to filter the test suite, so a test
    id that does not match an existing test does no harm. This allows users
    to add comments, leave blank lines, etc.

    :param file_name: Path of the text file to read.
    :return: A list with one entry per line of the file, each stripped of
        surrounding whitespace (blank lines yield empty strings).
    :raises errors.NoSuchFile: If file_name does not exist. Other I/O errors
        raised while opening the file propagate unchanged.
    """
    try:
        ftest = open(file_name, 'rt')
    except IOError as e:
        if e.errno != errno.ENOENT:
            raise
        raise errors.NoSuchFile(file_name)
    # The context manager closes the file even if reading it fails.
    with ftest:
        return [test_name.strip() for test_name in ftest]
3828
3829
def suite_matches_id_list(test_suite, id_list):
    """Find listed test ids that are missing from, or repeated in, a suite.

    :param test_suite: A TestSuite object.
    :param id_list: The list of test ids that should be found in
         test_suite.

    :return: (absents, duplicates) absents is a list containing the ids
        found in id_list but not in test_suite, duplicates is a list
        containing the ids of id_list used by several tests of test_suite.

    When using a predefined test id list, it may happen that some tests do
    not exist anymore or that some tests use the same id. The result lets
    the caller warn the tester about potential problems in their workflow
    (test lists are volatile) or in the test suite itself (using the same id
    for several tests does not help to localize defects).
    """
    # How many tests of the suite carry each id.
    occurrences = {}
    for test in iter_suite_tests(test_suite):
        test_id = test.id()
        occurrences[test_id] = occurrences.get(test_id, 0) + 1

    absents = [wanted for wanted in id_list if wanted not in occurrences]
    duplicates = [wanted for wanted in id_list
                  if occurrences.get(wanted, 0) > 1]
    return absents, duplicates
3863
3864
class TestIdList(object):
    """Test id list to filter a test suite.

    Test ids are assumed to look like
    <module>[.<class>.<method>][(<param>+)], with <module> in python dotted
    notation. On that basis this class makes it possible to:
    - avoid building a test suite for modules not referred to in the list,
    - keep only the listed tests from a module test suite.
    """

    def __init__(self, test_id_list):
        # Filtering a suite against us only needs equality lookups on test
        # ids, so a plain dict keyed by id is enough.
        self.tests = dict.fromkeys(test_id_list, True)

        # unittest.TestCase ids look like:
        # <module>.<class>.<method>[(<param+)],
        # whereas doctest.DocTestCase ids can be any of:
        # <module>
        # <module>.<class>
        # <module>.<function>
        # <module>.<class>.<method>

        # A name alone cannot tell which part is the class, so the only
        # constraint relied upon is that an id begins with its module name:
        # every dotted prefix of every id is recorded as a candidate module.
        known_modules = {}
        for test_id in test_id_list:
            components = test_id.split('.')
            for end in range(1, len(components) + 1):
                known_modules['.'.join(components[:end])] = True
        self.modules = known_modules

    def refers_to(self, module_name):
        """Are there tests for the module or one of its sub modules?"""
        return module_name in self.modules

    def includes(self, test_id):
        """Is test_id one of the listed test ids?"""
        return test_id in self.tests
3907
3908
class TestPrefixAliasRegistry(registry.Registry):
    """A registry for test prefix aliases.

    The aliases are shortcuts for the --starting-with selftest option.
    Overriding an existing alias is not allowed, but attempting it is not
    fatal either: a note is emitted and the new value is ignored.
    """

    def register(self, key, obj, help=None, info=None,
                 override_existing=False):
        """See Registry.register.

        override_existing is accepted but never forwarded: the parent is
        always called with override_existing=False, and the KeyError it
        raises for an existing alias is turned into a note instead of a
        fatal exception.
        """
        parent = super(TestPrefixAliasRegistry, self)
        try:
            parent.register(key, obj, help=help, info=info,
                            override_existing=False)
        except KeyError:
            current = self.get(key)
            message = (
                'Test prefix alias %s is already used for %s, ignoring %s'
                % (key, current, obj))
            trace.note(message)

    def resolve_alias(self, id_start):
        """Replace the alias by the prefix in the given string.

        Only the part before the first dot is treated as an alias. Using an
        unknown alias is an error (errors.CommandError), which helps
        catching typos.
        """
        components = id_start.split('.')
        alias, remainder = components[0], components[1:]
        try:
            prefix = self.get(alias)
        except KeyError:
            raise errors.CommandError(
                '%s is not a known test prefix alias' % alias)
        return '.'.join([prefix] + remainder)
3945
3946
# Module-level registry used to resolve aliases given to the
# --starting-with selftest option.
test_prefix_alias_registry = TestPrefixAliasRegistry()
"""Registry of test prefix aliases."""


# This identity alias allows typos ('bzrlin.') to be detected: every valid
# test id then appears prefixed by a known alias ('breezy.' is "replaced" by
# 'breezy.'), so an unknown first component can be reported as an error.
test_prefix_alias_registry.register('breezy', 'breezy')

# Obvious top-level prefixes; feel free to add your own via a plugin.
test_prefix_alias_registry.register('bd', 'breezy.doc')
test_prefix_alias_registry.register('bu', 'breezy.utils')
test_prefix_alias_registry.register('bt', 'breezy.tests')
test_prefix_alias_registry.register('bgt', 'breezy.git.tests')
test_prefix_alias_registry.register('bbt', 'breezy.bzr.tests')
test_prefix_alias_registry.register('bb', 'breezy.tests.blackbox')
test_prefix_alias_registry.register('bp', 'breezy.plugins')
3963
3964
def _test_suite_testmod_names():
    """Return the standard list of test module names to test.

    :return: A new list of dotted module names (test packages first, then
        individual ``breezy.tests.test_*`` modules). test_suite() passes it
        to the loader's loadTestsFromModuleNames().
    """
    return [
        'breezy.bzr.tests',
        'breezy.git.tests',
        'breezy.tests.blackbox',
        'breezy.tests.commands',
        'breezy.tests.per_branch',
        'breezy.tests.per_controldir',
        'breezy.tests.per_controldir_colo',
        'breezy.tests.per_foreign_vcs',
        'breezy.tests.per_interrepository',
        'breezy.tests.per_intertree',
        'breezy.tests.per_interbranch',
        'breezy.tests.per_lock',
        'breezy.tests.per_merger',
        'breezy.tests.per_transport',
        'breezy.tests.per_tree',
        'breezy.tests.per_repository',
        'breezy.tests.per_repository_reference',
        'breezy.tests.per_uifactory',
        'breezy.tests.per_workingtree',
        'breezy.tests.test__annotator',
        'breezy.tests.test__bencode',
        'breezy.tests.test__known_graph',
        'breezy.tests.test__simple_set',
        'breezy.tests.test__static_tuple',
        'breezy.tests.test__walkdirs_win32',
        'breezy.tests.test_ancestry',
        'breezy.tests.test_annotate',
        'breezy.tests.test_atomicfile',
        'breezy.tests.test_bad_files',
        'breezy.tests.test_bisect',
        'breezy.tests.test_bisect_multi',
        'breezy.tests.test_branch',
        'breezy.tests.test_branchbuilder',
        'breezy.tests.test_bugtracker',
        'breezy.tests.test__chunks_to_lines',
        'breezy.tests.test_cache_utf8',
        'breezy.tests.test_chunk_writer',
        'breezy.tests.test_clean_tree',
        'breezy.tests.test_cmdline',
        'breezy.tests.test_commands',
        'breezy.tests.test_commit',
        'breezy.tests.test_commit_merge',
        'breezy.tests.test_config',
        'breezy.tests.test_bedding',
        'breezy.tests.test_conflicts',
        'breezy.tests.test_controldir',
        'breezy.tests.test_counted_lock',
        'breezy.tests.test_crash',
        'breezy.tests.test_decorators',
        'breezy.tests.test_delta',
        'breezy.tests.test_debug',
        'breezy.tests.test_diff',
        'breezy.tests.test_directory_service',
        'breezy.tests.test_dirty_tracker',
        'breezy.tests.test_email_message',
        'breezy.tests.test_eol_filters',
        'breezy.tests.test_errors',
        'breezy.tests.test_estimate_compressed_size',
        'breezy.tests.test_export',
        'breezy.tests.test_export_pot',
        'breezy.tests.test_extract',
        'breezy.tests.test_features',
        'breezy.tests.test_fetch',
        'breezy.tests.test_fetch_ghosts',
        'breezy.tests.test_fixtures',
        'breezy.tests.test_fifo_cache',
        'breezy.tests.test_filters',
        'breezy.tests.test_filter_tree',
        'breezy.tests.test_foreign',
        'breezy.tests.test_generate_docs',
        'breezy.tests.test_globbing',
        'breezy.tests.test_gpg',
        'breezy.tests.test_graph',
        'breezy.tests.test_grep',
        'breezy.tests.test_hashcache',
        'breezy.tests.test_help',
        'breezy.tests.test_hooks',
        'breezy.tests.test_http',
        'breezy.tests.test_http_response',
        'breezy.tests.test_https_ca_bundle',
        'breezy.tests.test_https_urllib',
        'breezy.tests.test_i18n',
        'breezy.tests.test_identitymap',
        'breezy.tests.test_ignores',
        'breezy.tests.test_import_tariff',
        'breezy.tests.test_info',
        'breezy.tests.test_lazy_import',
        'breezy.tests.test_lazy_regex',
        'breezy.tests.test_library_state',
        'breezy.tests.test_location',
        'breezy.tests.test_lock',
        'breezy.tests.test_lockable_files',
        'breezy.tests.test_lockdir',
        'breezy.tests.test_log',
        'breezy.tests.test_lru_cache',
        'breezy.tests.test_lsprof',
        'breezy.tests.test_mail_client',
        'breezy.tests.test_matchers',
        'breezy.tests.test_memorybranch',
        'breezy.tests.test_memorytree',
        'breezy.tests.test_merge',
        'breezy.tests.test_merge3',
        'breezy.tests.test_mergeable',
        'breezy.tests.test_merge_core',
        'breezy.tests.test_merge_directive',
        'breezy.tests.test_mergetools',
        'breezy.tests.test_missing',
        'breezy.tests.test_msgeditor',
        'breezy.tests.test_multiparent',
        'breezy.tests.test_multiwalker',
        'breezy.tests.test_mutabletree',
        'breezy.tests.test_nonascii',
        'breezy.tests.test_options',
        'breezy.tests.test_osutils',
        'breezy.tests.test_osutils_encodings',
        'breezy.tests.test_patch',
        'breezy.tests.test_patches',
        'breezy.tests.test_permissions',
        'breezy.tests.test_plugins',
        'breezy.tests.test_progress',
        'breezy.tests.test_propose',
        'breezy.tests.test_pyutils',
        'breezy.tests.test_reconcile',
        'breezy.tests.test_reconfigure',
        'breezy.tests.test_registry',
        'breezy.tests.test_rename_map',
        'breezy.tests.test_revert',
        'breezy.tests.test_revision',
        'breezy.tests.test_revisionspec',
        'breezy.tests.test_revisiontree',
        'breezy.tests.test_rio',
        'breezy.tests.test__rio',
        'breezy.tests.test_rules',
        'breezy.tests.test_url_policy_open',
        'breezy.tests.test_sampler',
        'breezy.tests.test_scenarios',
        'breezy.tests.test_script',
        'breezy.tests.test_selftest',
        'breezy.tests.test_setup',
        'breezy.tests.test_sftp_transport',
        'breezy.tests.test_shelf',
        'breezy.tests.test_shelf_ui',
        'breezy.tests.test_smart_add',
        'breezy.tests.test_smtp_connection',
        'breezy.tests.test_source',
        'breezy.tests.test_ssh_transport',
        'breezy.tests.test_status',
        'breezy.tests.test_strace',
        'breezy.tests.test_subsume',
        'breezy.tests.test_switch',
        'breezy.tests.test_symbol_versioning',
        'breezy.tests.test_tag',
        'breezy.tests.test_test_server',
        'breezy.tests.test_textfile',
        'breezy.tests.test_textmerge',
        'breezy.tests.test_cethread',
        'breezy.tests.test_timestamp',
        'breezy.tests.test_trace',
        'breezy.tests.test_transactions',
        'breezy.tests.test_transform',
        'breezy.tests.test_transport',
        'breezy.tests.test_transport_log',
        'breezy.tests.test_tree',
        'breezy.tests.test_treebuilder',
        'breezy.tests.test_treeshape',
        'breezy.tests.test_tsort',
        'breezy.tests.test_tuned_gzip',
        'breezy.tests.test_ui',
        'breezy.tests.test_uncommit',
        'breezy.tests.test_upgrade',
        'breezy.tests.test_upgrade_stacked',
        'breezy.tests.test_upstream_import',
        'breezy.tests.test_urlutils',
        'breezy.tests.test_utextwrap',
        'breezy.tests.test_version',
        'breezy.tests.test_version_info',
        'breezy.tests.test_views',
        'breezy.tests.test_whitebox',
        'breezy.tests.test_win32utils',
        'breezy.tests.test_workspace',
        'breezy.tests.test_workingtree',
        'breezy.tests.test_wsgi',
        ]
4151
4152
4153def _test_suite_modules_to_doctest():
4154    """Return the list of modules to doctest."""
4155    if __doc__ is None:
4156        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
4157        return []
4158    return [
4159        'breezy',
4160        'breezy.branchbuilder',
4161        'breezy.bzr.inventory',
4162        'breezy.decorators',
4163        'breezy.iterablefile',
4164        'breezy.lockdir',
4165        'breezy.merge3',
4166        'breezy.option',
4167        'breezy.pyutils',
4168        'breezy.symbol_versioning',
4169        'breezy.tests',
4170        'breezy.tests.fixtures',
4171        'breezy.timestamp',
4172        'breezy.transport.http.urllib',
4173        'breezy.version_info_formats.format_custom',
4174        ]
4175
4176
def test_suite(keep_only=None, starting_with=None):
    """Build and return TestSuite for the whole of breezy.

    The suite is assembled from the standard test modules, 'breezy.doc',
    the doctests of the modules listed by _test_suite_modules_to_doctest()
    and the tests provided by the loaded plugins.

    :param keep_only: A list of test ids limiting the suite returned.

    :param starting_with: An iterable of id prefixes limiting the modules
         loaded to those that may contain tests starting with one of them.

    :return: The suite built with the loader's suiteClass, filtered by
        keep_only when it is given.

    This function can be replaced if you need to change the default test
    suite on a global basis, but it is not encouraged.
    """

    # Default loader; every branch of the selection below rebinds it.
    loader = TestUtil.TestLoader()

    if keep_only is not None:
        id_filter = TestIdList(keep_only)
    if starting_with:
        # We take precedence over keep_only because *at loading time* using
        # both options means we will load less tests for the same final result.
        def interesting_module(name):
            for start in starting_with:
                # Either the module name starts with the specified string
                # or it may contain tests starting with the specified string
                if name.startswith(start) or start.startswith(name):
                    return True
            return False
        loader = TestUtil.FilteredByModuleTestLoader(interesting_module)

    elif keep_only is not None:
        loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to)

        def interesting_module(name):
            return id_filter.refers_to(name)

    else:
        loader = TestUtil.TestLoader()

        def interesting_module(name):
            # No filtering, all modules are interesting
            return True

    suite = loader.suiteClass()

    # modules building their suite with loadTestsFromModuleNames
    suite.addTest(loader.loadTestsFromModuleNames(_test_suite_testmod_names()))

    suite.addTest(loader.loadTestsFromModuleNames(['breezy.doc']))

    # Doctests are not loaded through the loader, so the module filter is
    # applied by hand here.
    for mod in _test_suite_modules_to_doctest():
        if not interesting_module(mod):
            # No tests to keep here, move along
            continue
        try:
            # note that this really does mean "report only" -- doctest
            # still runs the rest of the examples
            doc_suite = IsolatedDocTestSuite(
                mod, optionflags=doctest.REPORT_ONLY_FIRST_FAILURE)
        except ValueError as e:
            print('**failed to get doctest for: %s\n%s' % (mod, e))
            raise
        # A listed module without any doctest is treated as an error.
        if len(doc_suite._tests) == 0:
            raise errors.BzrError("no doctests found in %s" % (mod,))
        suite.addTest(doc_suite)

    # Remembered so that a plugin changing the default encoding while its
    # tests are loaded can be detected below.
    default_encoding = sys.getdefaultencoding()
    for name, plugin in _mod_plugin.plugins().items():
        if not interesting_module(plugin.module.__name__):
            continue
        plugin_suite = plugin.test_suite()
        # We used to catch ImportError here and turn it into just a warning,
        # but really if you don't have --no-plugins this should be a failure.
        # mbp 20080213 - see http://bugs.launchpad.net/bugs/189771
        if plugin_suite is None:
            plugin_suite = plugin.load_plugin_tests(loader)
        if plugin_suite is not None:
            suite.addTest(plugin_suite)
        if default_encoding != sys.getdefaultencoding():
            trace.warning(
                'Plugin "%s" tried to reset default encoding to: %s', name,
                sys.getdefaultencoding())
            # NOTE(review): reload() is a builtin only on Python 2 and
            # sys.setdefaultencoding() does not exist on Python 3; if this
            # file targets Python 3 only, this branch would raise NameError
            # if it were ever reached -- confirm and clean up.
            reload(sys)
            sys.setdefaultencoding(default_encoding)

    if keep_only is not None:
        # Now that the referred modules have loaded their tests, keep only the
        # requested ones.
        suite = filter_suite_by_id_list(suite, id_filter)
        # Do some sanity checks on the id_list filtering
        not_found, duplicates = suite_matches_id_list(suite, keep_only)
        if starting_with:
            # The tester has used both keep_only and starting_with, so he is
            # already aware that some tests are excluded from the list, there
            # is no need to tell him which.
            pass
        else:
            # Some tests mentioned in the list are not in the test suite. The
            # list may be out of date, report to the tester.
            for id in not_found:
                trace.warning('"%s" not found in the test suite', id)
        # Duplicated ids are always reported.
        for id in duplicates:
            trace.warning('"%s" is used as an id by several tests', id)

    return suite
4280
4281
4282def multiply_scenarios(*scenarios):
4283    """Multiply two or more iterables of scenarios.
4284
4285    It is safe to pass scenario generators or iterators.
4286
4287    :returns: A list of compound scenarios: the cross-product of all
4288        scenarios, with the names concatenated and the parameters
4289        merged together.
4290    """
4291    return functools.reduce(_multiply_two_scenarios, map(list, scenarios))
4292
4293
4294def _multiply_two_scenarios(scenarios_left, scenarios_right):
4295    """Multiply two sets of scenarios.
4296
4297    :returns: the cartesian product of the two sets of scenarios, that is
4298        a scenario for every possible combination of a left scenario and a
4299        right scenario.
4300    """
4301    return [
4302        ('%s,%s' % (left_name, right_name),
4303         dict(left_dict, **right_dict))
4304        for left_name, left_dict in scenarios_left
4305        for right_name, right_dict in scenarios_right]
4306
4307
def multiply_tests(tests, scenarios, result):
    """Multiply tests by scenarios into result.

    This is the core workhorse for test parameterisation.

    Typically the load_tests() method for a per-implementation test suite will
    call multiply_tests and return the result.

    :param tests: The tests to parameterise.
    :param scenarios: The scenarios to apply: pairs of (scenario_name,
        scenario_param_dict).
    :param result: A TestSuite to add created tests to.

    This returns the passed in result TestSuite with the cross product of all
    the tests repeated once for each scenario.  Each test is adapted by adding
    the scenario name at the end of its id(), and updating the test object's
    __dict__ with the scenario_param_dict.

    >>> import breezy.tests.test_sampler
    >>> r = multiply_tests(
    ...     breezy.tests.test_sampler.DemoTest('test_nothing'),
    ...     [('one', dict(param=1)),
    ...      ('two', dict(param=2))],
    ...     TestUtil.TestSuite())
    >>> tests = list(iter_suite_tests(r))
    >>> len(tests)
    2
    >>> tests[0].id()
    'breezy.tests.test_sampler.DemoTest.test_nothing(one)'
    >>> tests[0].param
    1
    >>> tests[1].param
    2
    """
    for original_test in iter_suite_tests(tests):
        # One adapted copy of the test is added per scenario.
        for scenario in scenarios:
            result.addTest(apply_scenario(original_test, scenario))
    return result
4345
4346
def apply_scenarios(test, scenarios, result):
    """Add to result one adapted copy of test per scenario.

    :param test: The test to apply scenarios to.
    :param scenarios: An iterable of scenarios to apply to test.
    :param result: The suite receiving, through addTest(), each test
        produced by apply_scenario.
    :return: result
    :seealso: apply_scenario
    """
    for one_scenario in scenarios:
        result.addTest(apply_scenario(test, one_scenario))
    return result
4358
4359
def apply_scenario(test, scenario):
    """Return a copy of test adapted to scenario.

    :param test: A test to adapt.
    :param scenario: A tuple describing the scenario.
        The first element of the tuple is the scenario name; it is appended
        in parentheses to the id of the test to form the new test id.
        The second element is a dict containing attributes to set on the
        test.
    :return: The adapted test.
    """
    scenario_name = scenario[0]
    adapted = clone_test(test, "%s(%s)" % (test.id(), scenario_name))
    attributes = scenario[1]
    for attribute, value in attributes.items():
        setattr(adapted, attribute, value)
    return adapted
4375
4376
def clone_test(test, new_id):
    """Make a shallow copy of a test and give the copy a new id.

    :param test: The test to clone.
    :param new_id: The id to assign to it.
    :return: The new test.
    """
    clone = copy.copy(test)
    clone.id = lambda: new_id
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>:
    # a shallow copy shares the 'details' dict with the original, which makes
    # the output of parameterized tests hard to read because tracebacks end
    # up associated with irrelevant tests.  Give the clone its own dict.
    # When the attribute is missing this must be a different version of
    # testtools than expected, and nothing is done.
    if hasattr(clone, '_TestCase__details'):
        clone._TestCase__details = {}
    return clone
4399
4400
4401
def permute_tests_for_extension(standard_tests, loader, py_module_name,
                                ext_module_name):
    """Helper for permuting tests against an extension module.

    This is meant to be used inside a module's 'load_tests()' function. The
    tests in 'standard_tests' are run against the python implementation and,
    when it is available, against the extension too; 'test.module' is set
    to the module under test. See breezy.tests.test__chk_map.load_tests as
    an example.

    :param standard_tests: A test suite to permute
    :param loader: A TestLoader
    :param py_module_name: The python path to a python module that can always
        be loaded, and will be considered the 'python' implementation. (eg
        'breezy._chk_map_py')
    :param ext_module_name: The python path to an extension module. If the
        module cannot be loaded, a single test will be added, which notes that
        the module is not available. If it can be loaded, all standard_tests
        will be run against that module.
    :return: (suite, feature) suite is a test-suite that has all the permuted
        tests. feature is the Feature object that can be used to determine if
        the module is available.
    """

    from .features import ModuleAvailableFeature
    python_impl = pyutils.get_named_object(py_module_name)
    scenarios = [('python', {'module': python_impl})]
    suite = loader.suiteClass()
    feature = ModuleAvailableFeature(ext_module_name)
    if not feature.available():
        # The compiled module isn't available: add a single test requiring
        # the feature, whose id is prefixed by the extension module name.
        class FailWithoutFeature(TestCase):

            def id(self):
                return ext_module_name + '.' + super(FailWithoutFeature, self).id()

            def test_fail(self):
                self.requireFeature(feature)

        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
    else:
        scenarios.append(('C', {'module': feature.module}))
    permuted = multiply_tests(standard_tests, scenarios, suite)
    return permuted, feature
4444
4445
def _rmtree_temp_dir(dirname, test_id=None):
    """Remove a temporary test directory without ever failing the caller.

    An OSError raised while removing the tree is reported on stderr rather
    than propagated.

    :param dirname: Path of the directory to remove, as text or bytes.
    :param test_id: Optional id of the test that owned the directory; when
        given it is included in the error report.
    """
    # If LANG=C we probably have created some bogus paths
    # which rmtree(unicode) will fail to delete
    # so make sure we are using rmtree(bytes) to delete everything
    # except on win32, where rmtree(bytes) will fail
    # since it doesn't have the property of byte-stream paths
    # (they are either ascii or mbcs)
    if sys.platform == 'win32':
        if isinstance(dirname, bytes):
            # make sure we are using the unicode win32 api
            dirname = dirname.decode('mbcs')
    elif not isinstance(dirname, bytes):
        # A path that is already bytes is used as is: bytes has no encode().
        dirname = dirname.encode(sys.getfilesystemencoding())
    try:
        osutils.rmtree(dirname)
    except OSError as e:
        # We don't want to fail here because some useful display will be lost
        # otherwise. Polluting the tmp dir is bad, but not giving all the
        # possible info to the test runner is even worse.
        if test_id is not None:
            ui.ui_factory.clear_term()
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
        # Ugly, but the last thing we want here is fail, so bear with it:
        # reduce both the error and the directory name to plain ascii text.
        message = str(e)
        if isinstance(message, bytes):
            # Only a byte-string str() needs decoding; text has no decode().
            message = message.decode(osutils.get_user_encoding(), 'replace')
        printable_e = message.encode('ascii', 'replace').decode('ascii')
        basename = os.path.basename(dirname)
        if isinstance(basename, bytes):
            basename = basename.decode('ascii', 'replace')
        sys.stderr.write('Unable to remove testing dir %s\n%s'
                         % (basename, printable_e))
4472
4473
def probe_unicode_in_user_encoding():
    """Find a non-ascii unicode string the user encoding can represent.

    A few candidate strings are tried in turn against
    osutils.get_user_encoding(); the first one that encodes successfully is
    returned for use in unicode-aware tests.

    :return: (unicode value, encoded bytes value), or (None, None) if none
        of the candidates can be encoded.
    """
    for candidate in (u'm\xb5', u'\xe1', u'\u0410'):
        try:
            encoded = candidate.encode(osutils.get_user_encoding())
        except UnicodeEncodeError:
            # Not representable in this encoding; move on to the next one.
            continue
        return candidate, encoded
    return None, None
4490
4491
def probe_bad_non_ascii(encoding):
    """Find a single byte in the range 128..255 that `encoding` rejects.

    :param encoding: Name of the codec to probe.
    :return: The first (lowest) one-byte bytes object that cannot be decoded
        with `encoding`, or None if every byte from 128 to 255 decodes.
    """
    def is_undecodable(raw):
        try:
            raw.decode(encoding)
        except UnicodeDecodeError:
            return True
        return False

    # Lazily walk the high half of the byte range and stop at the first miss.
    high_bytes = (bytes((code,)) for code in range(0x80, 0x100))
    return next(filter(is_undecodable, high_bytes), None)
4505
4506
# SubUnitBzrProtocolClientv1 and SubUnitBzrRunnerv1 are only defined when
# subunit can be imported.
try:
    from subunit import TestProtocolClient
    from subunit.test_results import AutoTimingTestResultDecorator

    class SubUnitBzrProtocolClientv1(TestProtocolClient):
        """TestProtocolClient that drops the 'log' detail from successes."""

        def stopTest(self, test):
            """Stop `test` in the base client, then clear its equality funcs."""
            super().stopTest(test)
            _clear__type_equality_funcs(test)

        def addSuccess(self, test, details=None):
            """Report a success, leaving any 'log' detail out of the stream."""
            # The subunit client always includes the details in the subunit
            # stream, but we don't want to include the log in ours.
            carries_log = details is not None and 'log' in details
            if carries_log:
                del details['log']
            return super().addSuccess(test, details)

    class SubUnitBzrRunnerv1(TextTestRunner):
        """Runner that feeds results to a SubUnitBzrProtocolClientv1."""

        def run(self, test):
            """Run `test` against a timing-decorated client on self.stream.

            :return: The AutoTimingTestResultDecorator the test was run with.
            """
            protocol_client = SubUnitBzrProtocolClientv1(self.stream)
            timed_result = AutoTimingTestResultDecorator(protocol_client)
            test.run(timed_result)
            return timed_result
except ImportError:
    pass
4535
4536
# SubUnitBzrRunnerv2 is only defined when subunit.run can be imported.
try:
    from subunit.run import SubunitTestRunner

    class SubUnitBzrRunnerv2(TextTestRunner, SubunitTestRunner):
        """Runner set up as a TextTestRunner that runs via SubunitTestRunner."""

        def __init__(self, stream=sys.stderr, descriptions=0, verbosity=1,
                     bench_history=None, strict=False, result_decorators=None):
            """Initialise both base classes explicitly.

            All arguments are passed to TextTestRunner.__init__; only
            `verbosity` and `stream` are passed to
            SubunitTestRunner.__init__.
            """
            text_runner_kwargs = dict(
                stream=stream,
                descriptions=descriptions,
                verbosity=verbosity,
                bench_history=bench_history,
                strict=strict,
                result_decorators=result_decorators,
                )
            TextTestRunner.__init__(self, **text_runner_kwargs)
            SubunitTestRunner.__init__(
                self, verbosity=verbosity, stream=stream)

        # Use SubunitTestRunner's run() rather than the one that would be
        # inherited from TextTestRunner.
        run = SubunitTestRunner.run
except ImportError:
    pass
4555