1# Copyright (C) 2005-2013, 2016 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""Tests for the test framework."""
18
19import gc
20import doctest
21from functools import reduce
22from io import BytesIO, StringIO, TextIOWrapper
23import os
24import signal
25import sys
26import threading
27import time
28import unittest
29import warnings
30
31from testtools import (
32    ExtendedToOriginalDecorator,
33    MultiTestResult,
34    )
35from testtools.content import Content
36from testtools.content_type import ContentType
37from testtools.matchers import (
38    DocTestMatches,
39    Equals,
40    )
41import testtools.testresult.doubles
42
43import breezy
44from .. import (
45    branchbuilder,
46    controldir,
47    errors,
48    hooks,
49    lockdir,
50    memorytree,
51    osutils,
52    repository,
53    symbol_versioning,
54    tests,
55    transport,
56    workingtree,
57    )
58from ..bzr import (
59    bzrdir,
60    remote,
61    workingtree_3,
62    workingtree_4,
63    )
64from ..bzr import (
65    groupcompress_repo,
66    )
67from ..git import (
68    workingtree as git_workingtree,
69    )
70from ..symbol_versioning import (
71    deprecated_function,
72    deprecated_in,
73    deprecated_method,
74    )
75from . import (
76    features,
77    test_lsprof,
78    test_server,
79    TestUtil,
80    )
81from ..trace import note, mutter
82from ..transport import memory
83
84
def _test_ids(test_suite):
    """Return a list with the id() of each test yielded for test_suite."""
    collected_ids = []
    for case in tests.iter_suite_tests(test_suite):
        collected_ids.append(case.id())
    return collected_ids
88
89
class MetaTestLog(tests.TestCase):
    """Checks that a message logged by a test is exposed as a test detail."""

    def test_logging(self):
        """A logged message appears in the 'log' detail and in get_log()."""
        self.log('a test message')
        log_detail = self.getDetails()['log']
        # The detail must be advertised as utf8 encoded plain text.
        expected_type = ContentType("text", "plain", {"charset": "utf8"})
        self.assertThat(log_detail.content_type, Equals(expected_type))
        # The text carried by the detail is exactly what get_log() reports.
        detail_text = u"".join(log_detail.iter_text())
        self.assertThat(detail_text, Equals(self.get_log()))
        # The log may contain other output before our message, hence the
        # ELLIPSIS match.
        self.assertThat(
            self.get_log(),
            DocTestMatches(u"...a test message\n", doctest.ELLIPSIS))
102
103
class TestTreeShape(tests.TestCaseInTempDir):
    """Checks build_tree_contents with a non-ASCII file name."""

    def test_unicode_paths(self):
        """A file with a non-ASCII name can be created and then found."""
        # Going by the feature name, this needs a platform that can store
        # unicode file names.
        self.requireFeature(features.UnicodeFilenameFeature)

        unicode_name = u'hell\u00d8'
        tree_shape = [(unicode_name, b'contents of hello')]
        self.build_tree_contents(tree_shape)
        self.assertPathExists(unicode_name)
112
113
class TestClassesAvailable(tests.TestCase):
    """As a convenience we expose Test* classes from breezy.tests"""

    # Each test below consists solely of an import statement: it passes when
    # the import succeeds. The imported names are deliberately left unused.

    def test_test_case(self):
        """TestCase can be imported from this package."""
        from . import TestCase

    def test_test_loader(self):
        """TestLoader can be imported from this package."""
        from . import TestLoader

    def test_test_suite(self):
        """TestSuite can be imported from this package."""
        from . import TestSuite
125
126
class TestTransportScenarios(tests.TestCase):
    """A group of tests that test the transport implementation adaption core.

    This is a meta test that the tests are applied to all available
    transports.

    This will be generalised in the future which is why it is in this
    test file even though it is specific to transport tests at the moment.
    """

    def test_get_transport_permutations(self):
        """get_transport_test_permutations returns the module's own list."""
        # this checks that get_test_permutations defined by the module is
        # called by the get_transport_test_permutations function.
        class MockModule(object):
            def get_test_permutations(self):
                return sample_permutation
        sample_permutation = [(1, 2), (3, 4)]
        from .per_transport import get_transport_test_permutations
        self.assertEqual(sample_permutation,
                         get_transport_test_permutations(MockModule()))

    def test_scenarios_include_all_modules(self):
        """There is one scenario per permutation of each transport module."""
        # this checks that the scenario generator returns as many permutations
        # as there are in all the registered transport modules - we assume if
        # this matches its probably doing the right thing especially in
        # combination with the tests for setting the right classes below.
        from importlib import import_module
        from .per_transport import transport_test_permutations
        from ..transport import _get_transport_modules
        permutation_count = 0
        for module_name in _get_transport_modules():
            try:
                # import_module returns the named submodule itself, so there
                # is no need to walk down from the top-level package.
                transport_module = import_module(module_name)
                permutation_count += len(
                    transport_module.get_test_permutations())
            except errors.DependencyNotPresent:
                # Deliberate: a transport whose dependency is missing simply
                # contributes no permutations to the count.
                pass
        scenarios = transport_test_permutations()
        self.assertEqual(permutation_count, len(scenarios))

    def test_scenarios_include_transport_class(self):
        """A scenario is a (name, params) pair naming transport classes."""
        # This test used to know about all the possible transports and the
        # order they were returned but that seems overly brittle (mbp
        # 20060307)
        from .per_transport import transport_test_permutations
        scenarios = transport_test_permutations()
        # there are at least that many builtin transports
        self.assertGreater(len(scenarios), 6)
        one_scenario = scenarios[0]
        self.assertIsInstance(one_scenario[0], str)
        self.assertTrue(issubclass(one_scenario[1]["transport_class"],
                                   breezy.transport.Transport))
        self.assertTrue(issubclass(one_scenario[1]["transport_server"],
                                   breezy.transport.Server))
182
183
class TestBranchScenarios(tests.TestCase):
    """Checks the scenario list built by per_branch.make_scenarios."""

    def test_scenarios(self):
        """Servers and format pairs are passed through to each scenario."""
        from .per_branch import make_scenarios
        transport_server = "a"
        readonly_server = "b"
        # (branch format, bzrdir format) stand-ins; plain strings suffice
        # because only the pass-through of parameters is checked.
        format_pairs = (("c", "C"), ("d", "D"))
        scenarios = make_scenarios(
            transport_server, readonly_server, list(format_pairs))
        self.assertEqual(2, len(scenarios))
        expected = [
            ('str',
             {'branch_format': branch_format,
              'bzrdir_format': bzrdir_format,
              'transport_readonly_server': 'b',
              'transport_server': 'a'})
            for branch_format, bzrdir_format in format_pairs]
        self.assertEqual(expected, scenarios)
207
208
class TestBzrDirScenarios(tests.TestCase):
    """Checks the scenario list built by per_controldir.make_scenarios."""

    def test_scenarios(self):
        """Factory, servers and formats are passed through to each scenario."""
        from .per_controldir import make_scenarios
        # Plain strings stand in for the real factory, servers and formats;
        # only the pass-through of parameters is checked.
        format_names = ("c", "d")
        scenarios = make_scenarios("v", "a", "b", list(format_names))
        expected = [
            ('str',
             {'bzrdir_format': format_name,
              'transport_readonly_server': 'b',
              'transport_server': 'a',
              'vfs_transport_factory': 'v'})
            for format_name in format_names]
        self.assertEqual(expected, scenarios)
232
233
class TestRepositoryScenarios(tests.TestCase):
    """Checks per_repository.formats_to_scenarios."""

    def test_formats_to_scenarios(self):
        """Scenarios carry vfs_transport_factory only when one is given."""
        from .per_repository import formats_to_scenarios
        remote_format = remote.RemoteRepositoryFormat()
        format_2a = repository.format_registry.get(
            b'Bazaar repository format 2a (needs bzr 1.16 or later)\n')
        formats = [("(c)", remote_format), ("(d)", format_2a)]
        no_vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", None)
        vfs_scenarios = formats_to_scenarios(
            formats, "server", "readonly", vfs_transport_factory="vfs")

        def expected_scenarios(**extra_params):
            # Build the expected list from fresh format instances, adding
            # any extra parameters to both scenarios.
            remote_params = {
                'bzrdir_format': remote.RemoteBzrDirFormat(),
                'repository_format': remote.RemoteRepositoryFormat(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server'}
            remote_params.update(extra_params)
            format_2a_params = {
                'bzrdir_format': bzrdir.BzrDirMetaFormat1(),
                'repository_format': groupcompress_repo.RepositoryFormat2a(),
                'transport_readonly_server': 'readonly',
                'transport_server': 'server'}
            format_2a_params.update(extra_params)
            return [('RemoteRepositoryFormat(c)', remote_params),
                    ('RepositoryFormat2a(d)', format_2a_params)]

        # no_vfs generate scenarios without vfs_transport_factory
        self.assertEqual(expected_scenarios(), no_vfs_scenarios)
        self.assertEqual(
            expected_scenarios(vfs_transport_factory='vfs'), vfs_scenarios)
272
273
class TestTestScenarioApplication(tests.TestCase):
    """Tests for the test adaption facilities."""

    def test_apply_scenario(self):
        """apply_scenario returns a new, renamed test per scenario.

        The scenario's parameters become attributes of the returned test,
        its name is appended to the test id, and the input test is left
        untouched.
        """
        from . import apply_scenario
        input_test = TestTestScenarioApplication("test_apply_scenario")
        # setup two adapted tests
        adapted_test1 = apply_scenario(input_test,
                                       ("new id",
                                        {"bzrdir_format": "bzr_format",
                                         "repository_format": "repo_fmt",
                                         "transport_server": "transport_server",
                                         "transport_readonly_server": "readonly-server"}))
        adapted_test2 = apply_scenario(input_test,
                                       ("new id 2", {"bzrdir_format": None}))
        # input_test should NOT have been altered: it must not have gained
        # the scenario attributes.
        self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format")
        # the new tests are mutually incompatible, ensuring it has
        # made new ones, and unspecified elements in the scenario
        # should not have been altered.
        self.assertEqual("bzr_format", adapted_test1.bzrdir_format)
        self.assertEqual("repo_fmt", adapted_test1.repository_format)
        self.assertEqual("transport_server", adapted_test1.transport_server)
        self.assertEqual("readonly-server",
                         adapted_test1.transport_readonly_server)
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id)",
            adapted_test1.id())
        self.assertIsNone(adapted_test2.bzrdir_format)
        self.assertEqual(
            "breezy.tests.test_selftest.TestTestScenarioApplication."
            "test_apply_scenario(new id 2)",
            adapted_test2.id())
308
309
class TestInterRepositoryScenarios(tests.TestCase):
    """Checks the scenario list built by per_interrepository.make_scenarios."""

    def test_scenarios(self):
        """Servers and format rows are passed through to each scenario."""
        from .per_interrepository import make_scenarios
        # Each row is (label, format from, format to, extra setup); plain
        # strings suffice because only the pass-through is checked.
        format_rows = (("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3"))
        scenarios = make_scenarios("a", "b", list(format_rows))
        expected = [
            (label + ',str,str',
             {'repository_format': format_from,
              'repository_format_to': format_to,
              'transport_readonly_server': 'b',
              'transport_server': 'a',
              'extra_setup': extra_setup})
            for label, format_from, format_to, extra_setup in format_rows]
        self.assertEqual(expected, scenarios)
334
335
class TestWorkingTreeScenarios(tests.TestCase):
    """Checks the scenario list built by per_workingtree.make_scenarios."""

    def test_scenarios(self):
        """One scenario per format, plus a remote WorkingTreeFormat6 one."""
        from .per_workingtree import make_scenarios
        wt4_format = workingtree_4.WorkingTreeFormat4()
        wt3_format = workingtree_3.WorkingTreeFormat3()
        wt6_format = workingtree_4.WorkingTreeFormat6()
        scenarios = make_scenarios(
            "a", "b", [wt4_format, wt3_format, wt6_format],
            remote_server='c', remote_readonly_server='d',
            remote_backing_server='e')

        def local_scenario(name, wt_format):
            # Expected scenario for a format served by the plain servers.
            return (name, {
                'bzrdir_format': wt_format._matchingcontroldir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt_format})

        expected = [
            local_scenario('WorkingTreeFormat4', wt4_format),
            local_scenario('WorkingTreeFormat3', wt3_format),
            local_scenario('WorkingTreeFormat6', wt6_format),
            # The remote scenario uses the remote_* servers instead.
            ('WorkingTreeFormat6,remote',
             {'bzrdir_format': wt6_format._matchingcontroldir,
              'repo_is_remote': True,
              'transport_readonly_server': 'd',
              'transport_server': 'c',
              'vfs_transport_factory': 'e',
              'workingtree_format': wt6_format}),
            ]
        self.assertEqual(expected, scenarios)
374
375
class TestTreeScenarios(tests.TestCase):
    """Checks the scenario list built by per_tree.make_scenarios."""

    def test_scenarios(self):
        """make_scenarios yields the nine expected tree scenarios."""
        # The tree implementation scenario generator is meant to set up one
        # instance for each working tree format, one additional instance
        # that uses the default wt format but creates a revision tree for
        # the tests, and one more that uses the default wt format as a
        # lightweight checkout of a remote repository.  So the working tree
        # scenarios should have _workingtree_to_test_tree set to
        # return_parameter and the revision tree one set to
        # revision_tree_from_workingtree.
        from .per_tree import (
            _dirstate_tree_from_workingtree,
            make_scenarios,
            preview_tree_pre,
            preview_tree_post,
            return_parameter,
            revision_tree_from_workingtree
            )
        smart_server = test_server.SmartTCPServer_for_testing
        smart_readonly_server = test_server.ReadonlySmartTCPServer_for_testing
        mem_server = memory.MemoryServer
        input_wt4 = workingtree_4.WorkingTreeFormat4()
        input_wt3 = workingtree_3.WorkingTreeFormat3()
        scenarios = make_scenarios("a", "b", [input_wt4, input_wt3])
        self.assertEqual(9, len(scenarios))
        default_wt_format = workingtree.format_registry.get_default()
        wt4_format = workingtree_4.WorkingTreeFormat4()
        wt5_format = workingtree_4.WorkingTreeFormat5()
        wt6_format = workingtree_4.WorkingTreeFormat6()
        git_wt_format = git_workingtree.GitWorkingTreeFormat()

        def expected(name, wt_format, to_test_tree, **overrides):
            # Expected (name, params) pair; overrides replace or extend the
            # parameters common to every scenario.
            params = {
                'bzrdir_format': wt_format._matchingcontroldir,
                'transport_readonly_server': 'b',
                'transport_server': 'a',
                'workingtree_format': wt_format,
                '_workingtree_to_test_tree': to_test_tree,
                }
            params.update(overrides)
            return (name, params)

        expected_scenarios = [
            expected('WorkingTreeFormat4', input_wt4, return_parameter),
            expected('WorkingTreeFormat3', input_wt3, return_parameter),
            expected('WorkingTreeFormat6,remote', wt6_format,
                     return_parameter,
                     repo_is_remote=True,
                     transport_readonly_server=smart_readonly_server,
                     transport_server=smart_server,
                     vfs_transport_factory=mem_server),
            expected('RevisionTree', default_wt_format,
                     revision_tree_from_workingtree),
            expected('GitRevisionTree', git_wt_format,
                     revision_tree_from_workingtree),
            expected('DirStateRevisionTree,WT4', wt4_format,
                     _dirstate_tree_from_workingtree),
            expected('DirStateRevisionTree,WT5', wt5_format,
                     _dirstate_tree_from_workingtree),
            expected('PreviewTree', default_wt_format, preview_tree_pre),
            expected('PreviewTreePost', default_wt_format,
                     preview_tree_post),
            ]
        self.assertEqual(expected_scenarios, scenarios)
477
478
class TestInterTreeScenarios(tests.TestCase):
    """A group of tests that test the InterTreeTestAdapter."""

    def test_scenarios(self):
        """Each (name, class, from, to, converter) row yields one scenario."""
        # check that constructor parameters are passed through to the adapted
        # test.
        # for InterTree tests we want the machinery to bring up two trees in
        # each instance: the base one, and the one we are interacting with.
        # because each optimiser can be direction specific, we need to test
        # each optimiser in its chosen direction.
        # unlike the TestProviderAdapter we dont want to automatically add a
        # parameterized one for WorkingTree - the optimisers will tell us what
        # ones to add.
        from .per_tree import (
            return_parameter,
            )
        from .per_intertree import (
            make_scenarios,
            )
        from ..bzr.workingtree_3 import WorkingTreeFormat3
        from ..bzr.workingtree_4 import WorkingTreeFormat4
        server1 = "a"
        server2 = "b"
        format1 = WorkingTreeFormat4()
        format2 = WorkingTreeFormat3()
        # str and int merely stand in for InterTree classes here.
        formats = [("1", str, format1, format2, "converter1"),
                   ("2", int, format2, format1, "converter2")]
        scenarios = make_scenarios(server1, server2, formats)
        self.assertEqual(2, len(scenarios))
        expected_scenarios = [
            ("1", {
                "bzrdir_format": format1._matchingcontroldir,
                "intertree_class": formats[0][1],
                "workingtree_format": formats[0][2],
                "workingtree_format_to": formats[0][3],
                "mutable_trees_to_test_trees": formats[0][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ("2", {
                "bzrdir_format": format2._matchingcontroldir,
                "intertree_class": formats[1][1],
                "workingtree_format": formats[1][2],
                "workingtree_format_to": formats[1][3],
                "mutable_trees_to_test_trees": formats[1][4],
                "_workingtree_to_test_tree": return_parameter,
                "transport_server": server1,
                "transport_readonly_server": server2,
                }),
            ]
        # Expected value first, matching the other scenario tests in this
        # file.
        self.assertEqual(expected_scenarios, scenarios)
533
534
class TestTestCaseInTempDir(tests.TestCaseInTempDir):
    """Tests for the directories and assertions of TestCaseInTempDir."""

    def test_home_is_not_working(self):
        """The home dir differs from the test dir, which is the cwd."""
        self.assertNotEqual(self.test_dir, self.test_home_dir)
        self.assertIsSameRealPath(self.test_dir, osutils.getcwd())
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_assertEqualStat_equal(self):
        """assertEqualStat accepts a fake stat copied from a real one."""
        from ..bzr.tests.test_dirstate import _FakeStat
        self.build_tree(["foo"])
        real_stat = os.lstat("foo")
        copied_fields = (
            real_stat.st_size, real_stat.st_mtime, real_stat.st_ctime,
            real_stat.st_dev, real_stat.st_ino, real_stat.st_mode)
        fake_stat = _FakeStat(*copied_fields)
        self.assertEqualStat(real_stat, fake_stat)

    def test_assertEqualStat_notequal(self):
        """assertEqualStat fails for the stats of two different files."""
        self.build_tree(["foo", "longname"])
        foo_stat = os.lstat("foo")
        longname_stat = os.lstat("longname")
        self.assertRaises(
            AssertionError, self.assertEqualStat, foo_stat, longname_stat)

    def test_assertPathExists(self):
        """Existing paths pass assertPathExists; a missing one does not exist."""
        self.assertPathExists('.')
        self.build_tree(['foo/', 'foo/bar'])
        self.assertPathExists('foo/bar')
        self.assertPathDoesNotExist('foo/foo')
561
562
class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport):
    """Tests for the environment and helpers of TestCaseWithMemoryTransport."""

    def test_home_is_non_existant_dir_under_root(self):
        """The test_home_dir for TestCaseWithMemoryTransport is missing.

        TestCaseWithMemoryTransport is meant for tests that need no disk
        resources: they should be hooked into breezy so that no global
        settings are changed by the test (only a few tests should need to do
        that). Using a missing directory as home is an effective way to
        ensure this.
        """
        expected_home = self.TEST_ROOT + "/MemoryTransportMissingHomeDir"
        self.assertIsSameRealPath(expected_home, self.test_home_dir)
        self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME'])

    def test_cwd_is_TEST_ROOT(self):
        """The test dir is TEST_ROOT and is the current working directory."""
        self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT)
        self.assertIsSameRealPath(self.test_dir, osutils.getcwd())

    def test_BRZ_HOME_and_HOME_are_bytestrings(self):
        """The $BRZ_HOME and $HOME environment variables must be `str`.

        NOTE(review): the test name and the linked bug talk about byte
        strings versus unicode; the check itself is only isinstance(..., str).

        See https://bugs.launchpad.net/bzr/+bug/464174
        """
        for variable in ('BRZ_HOME', 'HOME'):
            self.assertIsInstance(os.environ[variable], str)

    def test_make_branch_and_memory_tree(self):
        """In TestCaseWithMemoryTransport we should not make the branch on disk.

        Testing this comprehensively is hard, so we settle for making a
        branch and checking that nothing was created at its relpath.
        """
        memory_tree = self.make_branch_and_memory_tree('dir')
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)

    def test_make_branch_and_memory_tree_with_format(self):
        """make_branch_and_memory_tree should accept a format option."""
        dir_format = bzrdir.BzrDirMetaFormat1()
        dir_format.repository_format = repository.format_registry.get_default()
        memory_tree = self.make_branch_and_memory_tree(
            'dir', format=dir_format)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertIsInstance(memory_tree, memorytree.MemoryTree)
        self.assertEqual(
            dir_format.repository_format.__class__,
            memory_tree.branch.repository._format.__class__)

    def test_make_branch_builder(self):
        """make_branch_builder returns a BranchBuilder and writes no files."""
        branch_builder = self.make_branch_builder('dir')
        self.assertIsInstance(branch_builder, branchbuilder.BranchBuilder)
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))

    def test_make_branch_builder_with_format(self):
        """A format object given to make_branch_builder is honoured."""
        # Use a repo layout that doesn't conform to a 'named' layout, to ensure
        # that the format objects are used.
        dir_format = bzrdir.BzrDirMetaFormat1()
        repo_format = repository.format_registry.get_default()
        dir_format.repository_format = repo_format
        branch_builder = self.make_branch_builder('dir', format=dir_format)
        built_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        self.assertEqual(
            dir_format.repository_format.__class__,
            built_branch.repository._format.__class__)
        self.assertEqual(
            repo_format.get_format_string(),
            self.get_transport().get_bytes('dir/.bzr/repository/format'))

    def test_make_branch_builder_with_format_name(self):
        """A format name given to make_branch_builder is honoured."""
        branch_builder = self.make_branch_builder('dir', format='knit')
        built_branch = branch_builder.get_branch()
        # Guard against regression into MemoryTransport leaking
        # files to disk instead of keeping them in memory.
        self.assertFalse(osutils.lexists('dir'))
        knit_dir_format = controldir.format_registry.make_controldir('knit')
        self.assertEqual(
            knit_dir_format.repository_format.__class__,
            built_branch.repository._format.__class__)
        self.assertEqual(
            b'Bazaar-NG Knit Repository Format 1',
            self.get_transport().get_bytes('dir/.bzr/repository/format'))

    def test_dangling_locks_cause_failures(self):
        """A lock left held by a test fails it when lock checks are thorough."""
        class TestDanglingLock(tests.TestCaseWithMemoryTransport):
            def test_function(self):
                # Take a lock and never release it.
                root_transport = self.get_transport_from_path('.')
                dangling_lock = lockdir.LockDir(root_transport, 'lock')
                dangling_lock.create()
                dangling_lock.attempt_lock()
        inner_result = TestDanglingLock('test_function').run()
        total_failures = inner_result.errors + inner_result.failures
        if self._lock_check_thorough:
            expected_failure_count = 1
        else:
            # When _lock_check_thorough is disabled, then we don't trigger a
            # failure
            expected_failure_count = 0
        self.assertEqual(expected_failure_count, len(total_failures))
669
670
class TestTestCaseWithTransport(tests.TestCaseWithTransport):
    """Tests for the convenience functions TestCaseWithTransport introduces."""

    def test_get_readonly_url_none(self):
        """With no readonly server, the urls open as readonly decorators."""
        from ..transport.readonly import ReadonlyTransportDecorator
        self.vfs_transport_factory = memory.MemoryServer
        self.transport_readonly_server = None
        # calling get_readonly_transport() constructs a decorator on the url
        # for the server
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        for opened in (root_transport, nested_transport):
            self.assertIsInstance(opened, ReadonlyTransportDecorator)
        # The nested base, minus its last character, is the root's abspath.
        self.assertEqual(
            nested_transport.base[:-1], root_transport.abspath('foo/bar'))

    def test_get_readonly_url_http(self):
        """With an HTTP readonly server, the urls open as HTTP transports."""
        from .http_server import HttpServer
        from ..transport.http.urllib import HttpTransport
        self.transport_server = test_server.LocalURLServer
        self.transport_readonly_server = HttpServer
        # calling get_readonly_transport() gives us a HTTP server instance.
        root_url = self.get_readonly_url()
        nested_url = self.get_readonly_url('foo/bar')
        # the transport returned may be any HttpTransportBase subclass
        root_transport = transport.get_transport_from_url(root_url)
        nested_transport = transport.get_transport_from_url(nested_url)
        for opened in (root_transport, nested_transport):
            self.assertIsInstance(opened, HttpTransport)
        self.assertEqual(
            nested_transport.base[:-1], root_transport.abspath('foo/bar'))

    def test_is_directory(self):
        """Test assertIsDirectory assertion"""
        test_transport = self.get_transport()
        self.build_tree(['a_dir/', 'a_file'], transport=test_transport)
        self.assertIsDirectory('a_dir', test_transport)
        # Both a plain file and a missing path must be rejected.
        self.assertRaises(
            AssertionError, self.assertIsDirectory, 'a_file', test_transport)
        self.assertRaises(
            AssertionError, self.assertIsDirectory, 'not_here', test_transport)

    def test_make_branch_builder(self):
        """The builder's branch exists on disk and has no working tree."""
        branch_builder = self.make_branch_builder('dir')
        committed_id = branch_builder.build_commit()
        self.assertPathExists('dir')
        opened_dir = controldir.ControlDir.open('dir')
        self.assertRaises(errors.NoWorkingTree, opened_dir.open_workingtree)
        opened_branch = opened_dir.open_branch()
        builder_branch = branch_builder.get_branch()
        self.assertEqual(opened_branch.base, builder_branch.base)
        # Both views of the branch report the single commit as the tip.
        expected_tip = (1, committed_id)
        self.assertEqual(expected_tip, builder_branch.last_revision_info())
        self.assertEqual(expected_tip, opened_branch.last_revision_info())
723
724
class TestTestCaseTransports(tests.TestCaseWithTransport):
    """TestCaseWithTransport behaviour with a memory-backed VFS factory."""

    def setUp(self):
        """Select memory.MemoryServer as the VFS transport factory."""
        super().setUp()
        self.vfs_transport_factory = memory.MemoryServer

    def test_make_controldir_preserves_transport(self):
        """make_controldir must stay on the memory transport."""
        # NOTE(review): the returned transport is not used below; the call is
        # kept in case it has set-up side effects.
        self.get_transport()
        made_dir = self.make_controldir('subdir')
        self.assertIsInstance(made_dir.transport, memory.MemoryTransport)
        # Nothing may have been written to disk; it lives in memory only.
        self.assertPathDoesNotExist('subdir')
738
739
class TestChrootedTest(tests.ChrootedTestCase):
    """Checks on the readonly transport of a ChrootedTestCase."""

    def test_root_is_root(self):
        """Cloning '..' from the readonly root yields the same base URL."""
        root = transport.get_transport_from_url(self.get_readonly_url())
        root_url = root.base
        parent_url = root.clone('..').base
        self.assertEqual(root_url, parent_url)
746
747
class TestProfileResult(tests.TestCase):
    """Tests for tests.ProfileResult."""

    def test_profiles_tests(self):
        """A test run through ProfileResult carries one benchmark call."""
        self.requireFeature(features.lsprof_feature)

        class Sample(tests.TestCase):
            def a(self):
                self.sample_function()

            def sample_function(self):
                pass

        recorder = testtools.testresult.doubles.ExtendedTestResult()
        profiling_result = tests.ProfileResult(recorder)
        Sample("a").run(profiling_result)
        # The first recorded event holds the test object that was run.
        profiled_case = recorder._events[0][1]
        self.assertLength(1, profiled_case._benchcalls)
        # The test reporting code unpacks each entry in exactly this shape.
        (_, _, _), stats = profiled_case._benchcalls[0]
        self.assertTrue(callable(stats.pprint))
768
769
770class TestTestResult(tests.TestCase):
771
772    def check_timing(self, test_case, expected_re):
773        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
774        capture = testtools.testresult.doubles.ExtendedTestResult()
775        test_case.run(MultiTestResult(result, capture))
776        run_case = capture._events[0][1]
777        timed_string = result._testTimeString(run_case)
778        self.assertContainsRe(timed_string, expected_re)
779
780    def test_test_reporting(self):
781        class ShortDelayTestCase(tests.TestCase):
782            def test_short_delay(self):
783                time.sleep(0.003)
784
785            def test_short_benchmark(self):
786                self.time(time.sleep, 0.003)
787        self.check_timing(ShortDelayTestCase('test_short_delay'),
788                          r"^ +[0-9]+ms$")
789        # if a benchmark time is given, we now show just that time followed by
790        # a star
791        self.check_timing(ShortDelayTestCase('test_short_benchmark'),
792                          r"^ +[0-9]+ms\*$")
793
794    def test_unittest_reporting_unittest_class(self):
795        # getting the time from a non-breezy test works ok
796        class ShortDelayTestCase(unittest.TestCase):
797            def test_short_delay(self):
798                time.sleep(0.003)
799        self.check_timing(ShortDelayTestCase('test_short_delay'),
800                          r"^ +[0-9]+ms$")
801
802    def _time_hello_world_encoding(self):
803        """Profile two sleep calls
804
805        This is used to exercise the test framework.
806        """
807        self.time(str, b'hello', errors='replace')
808        self.time(str, b'world', errors='replace')
809
810    def test_lsprofiling(self):
811        """Verbose test result prints lsprof statistics from test cases."""
812        self.requireFeature(features.lsprof_feature)
813        result_stream = StringIO()
814        result = breezy.tests.VerboseTestResult(
815            result_stream,
816            descriptions=0,
817            verbosity=2,
818            )
819        # we want profile a call of some sort and check it is output by
820        # addSuccess. We dont care about addError or addFailure as they
821        # are not that interesting for performance tuning.
822        # make a new test instance that when run will generate a profile
823        example_test_case = TestTestResult("_time_hello_world_encoding")
824        example_test_case._gather_lsprof_in_benchmarks = True
825        # execute the test, which should succeed and record profiles
826        example_test_case.run(result)
827        # lsprofile_something()
828        # if this worked we want
829        # LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'})
830        #    CallCount    Recursive    Total(ms)   Inline(ms) module:lineno(function)
831        # (the lsprof header)
832        # ... an arbitrary number of lines
833        # and the function call which is time.sleep.
834        #           1        0            ???         ???       ???(sleep)
835        # and then repeated but with 'world', rather than 'hello'.
836        # this should appear in the output stream of our test result.
837        output = result_stream.getvalue()
838        self.assertContainsRe(output,
839                              r"LSProf output for <class 'str'>\(\(b'hello',\), {'errors': 'replace'}\)")
840        self.assertContainsRe(output,
841                              r"LSProf output for <class 'str'>\(\(b'world',\), {'errors': 'replace'}\)")
842        self.assertContainsRe(output,
843                              r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n")
844        self.assertContainsRe(output,
845                              r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?")
846
847    def test_uses_time_from_testtools(self):
848        """Test case timings in verbose results should use testtools times"""
849        import datetime
850
851        class TimeAddedVerboseTestResult(tests.VerboseTestResult):
852            def startTest(self, test):
853                self.time(datetime.datetime.utcfromtimestamp(1.145))
854                super(TimeAddedVerboseTestResult, self).startTest(test)
855
856            def addSuccess(self, test):
857                self.time(datetime.datetime.utcfromtimestamp(51.147))
858                super(TimeAddedVerboseTestResult, self).addSuccess(test)
859
860            def report_tests_starting(self): pass
861        sio = StringIO()
862        self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2))
863        self.assertEndsWith(sio.getvalue(), "OK    50002ms\n")
864
865    def test_known_failure(self):
866        """Using knownFailure should trigger several result actions."""
867        class InstrumentedTestResult(tests.ExtendedTestResult):
868            def stopTestRun(self): pass
869
870            def report_tests_starting(self): pass
871
872            def report_known_failure(self, test, err=None, details=None):
873                self._call = test, 'known failure'
874        result = InstrumentedTestResult(None, None, None, None)
875
876        class Test(tests.TestCase):
877            def test_function(self):
878                self.knownFailure('failed!')
879        test = Test("test_function")
880        test.run(result)
881        # it should invoke 'report_known_failure'.
882        self.assertEqual(2, len(result._call))
883        self.assertEqual(test.id(), result._call[0].id())
884        self.assertEqual('known failure', result._call[1])
885        # we dont introspec the traceback, if the rest is ok, it would be
886        # exceptional for it not to be.
887        # it should update the known_failure_count on the object.
888        self.assertEqual(1, result.known_failure_count)
889        # the result should be successful.
890        self.assertTrue(result.wasSuccessful())
891
892    def test_verbose_report_known_failure(self):
893        # verbose test output formatting
894        result_stream = StringIO()
895        result = breezy.tests.VerboseTestResult(
896            result_stream,
897            descriptions=0,
898            verbosity=2,
899            )
900        _get_test("test_xfail").run(result)
901        self.assertContainsRe(result_stream.getvalue(),
902                              "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n"
903                              "\\s*(?:Text attachment: )?reason"
904                              "(?:\n-+\n|: {{{)"
905                              "this_fails"
906                              "(?:\n-+\n|}}}\n)")
907
908    def get_passing_test(self):
909        """Return a test object that can't be run usefully."""
910        def passing_test():
911            pass
912        return unittest.FunctionTestCase(passing_test)
913
914    def test_add_not_supported(self):
915        """Test the behaviour of invoking addNotSupported."""
916        class InstrumentedTestResult(tests.ExtendedTestResult):
917            def stopTestRun(self): pass
918
919            def report_tests_starting(self): pass
920
921            def report_unsupported(self, test, feature):
922                self._call = test, feature
923        result = InstrumentedTestResult(None, None, None, None)
924        test = SampleTestCase('_test_pass')
925        feature = features.Feature()
926        result.startTest(test)
927        result.addNotSupported(test, feature)
928        # it should invoke 'report_unsupported'.
929        self.assertEqual(2, len(result._call))
930        self.assertEqual(test, result._call[0])
931        self.assertEqual(feature, result._call[1])
932        # the result should be successful.
933        self.assertTrue(result.wasSuccessful())
934        # it should record the test against a count of tests not run due to
935        # this feature.
936        self.assertEqual(1, result.unsupported['Feature'])
937        # and invoking it again should increment that counter
938        result.addNotSupported(test, feature)
939        self.assertEqual(2, result.unsupported['Feature'])
940
941    def test_verbose_report_unsupported(self):
942        # verbose test output formatting
943        result_stream = StringIO()
944        result = breezy.tests.VerboseTestResult(
945            result_stream,
946            descriptions=0,
947            verbosity=2,
948            )
949        test = self.get_passing_test()
950        feature = features.Feature()
951        result.startTest(test)
952        prefix = len(result_stream.getvalue())
953        result.report_unsupported(test, feature)
954        output = result_stream.getvalue()[prefix:]
955        lines = output.splitlines()
956        # We don't check for the final '0ms' since it may fail on slow hosts
957        self.assertStartsWith(lines[0], 'NODEP')
958        self.assertEqual(lines[1],
959                         "    The feature 'Feature' is not available.")
960
961    def test_unavailable_exception(self):
962        """An UnavailableFeature being raised should invoke addNotSupported."""
963        class InstrumentedTestResult(tests.ExtendedTestResult):
964            def stopTestRun(self):
965                pass
966
967            def report_tests_starting(self):
968                pass
969
970            def addNotSupported(self, test, feature):
971                self._call = test, feature
972        result = InstrumentedTestResult(None, None, None, None)
973        feature = features.Feature()
974
975        class Test(tests.TestCase):
976            def test_function(self):
977                raise tests.UnavailableFeature(feature)
978        test = Test("test_function")
979        test.run(result)
980        # it should invoke 'addNotSupported'.
981        self.assertEqual(2, len(result._call))
982        self.assertEqual(test.id(), result._call[0].id())
983        self.assertEqual(feature, result._call[1])
984        # and not count as an error
985        self.assertEqual(0, result.error_count)
986
987    def test_strict_with_unsupported_feature(self):
988        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
989        test = self.get_passing_test()
990        feature = "Unsupported Feature"
991        result.addNotSupported(test, feature)
992        self.assertFalse(result.wasStrictlySuccessful())
993        self.assertEqual(None, result._extractBenchmarkTime(test))
994
995    def test_strict_with_known_failure(self):
996        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
997        test = _get_test("test_xfail")
998        test.run(result)
999        self.assertFalse(result.wasStrictlySuccessful())
1000        self.assertEqual(None, result._extractBenchmarkTime(test))
1001
1002    def test_strict_with_success(self):
1003        result = tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
1004        test = self.get_passing_test()
1005        result.addSuccess(test)
1006        self.assertTrue(result.wasStrictlySuccessful())
1007        self.assertEqual(None, result._extractBenchmarkTime(test))
1008
1009    def test_startTests(self):
1010        """Starting the first test should trigger startTests."""
1011        class InstrumentedTestResult(tests.ExtendedTestResult):
1012            calls = 0
1013
1014            def startTests(self):
1015                self.calls += 1
1016        result = InstrumentedTestResult(None, None, None, None)
1017
1018        def test_function():
1019            pass
1020        test = unittest.FunctionTestCase(test_function)
1021        test.run(result)
1022        self.assertEqual(1, result.calls)
1023
1024    def test_startTests_only_once(self):
1025        """With multiple tests startTests should still only be called once"""
1026        class InstrumentedTestResult(tests.ExtendedTestResult):
1027            calls = 0
1028
1029            def startTests(self):
1030                self.calls += 1
1031        result = InstrumentedTestResult(None, None, None, None)
1032        suite = unittest.TestSuite([
1033            unittest.FunctionTestCase(lambda: None),
1034            unittest.FunctionTestCase(lambda: None)])
1035        suite.run(result)
1036        self.assertEqual(1, result.calls)
1037        self.assertEqual(2, result.count)
1038
1039
1040class TestRunner(tests.TestCase):
1041
1042    def dummy_test(self):
1043        pass
1044
1045    def run_test_runner(self, testrunner, test):
1046        """Run suite in testrunner, saving global state and restoring it.
1047
1048        This current saves and restores:
1049        TestCaseInTempDir.TEST_ROOT
1050
1051        There should be no tests in this file that use
1052        breezy.tests.TextTestRunner without using this convenience method,
1053        because of our use of global state.
1054        """
1055        old_root = tests.TestCaseInTempDir.TEST_ROOT
1056        try:
1057            tests.TestCaseInTempDir.TEST_ROOT = None
1058            return testrunner.run(test)
1059        finally:
1060            tests.TestCaseInTempDir.TEST_ROOT = old_root
1061
1062    def test_known_failure_failed_run(self):
1063        # run a test that generates a known failure which should be printed in
1064        # the final output when real failures occur.
1065        class Test(tests.TestCase):
1066            def known_failure_test(self):
1067                self.expectFailure('failed', self.assertTrue, False)
1068        test = unittest.TestSuite()
1069        test.addTest(Test("known_failure_test"))
1070
1071        def failing_test():
1072            raise AssertionError('foo')
1073        test.addTest(unittest.FunctionTestCase(failing_test))
1074        stream = StringIO()
1075        runner = tests.TextTestRunner(stream=stream)
1076        self.run_test_runner(runner, test)
1077        self.assertContainsRe(
1078            stream.getvalue(),
1079            '(?sm)^brz selftest.*$'
1080            '.*'
1081            '^======================================================================\n'
1082            '^FAIL: failing_test\n'
1083            '^----------------------------------------------------------------------\n'
1084            'Traceback \\(most recent call last\\):\n'
1085            '  .*'  # File .*, line .*, in failing_test' - but maybe not from .pyc
1086            '    raise AssertionError\\(\'foo\'\\)\n'
1087            '.*'
1088            '^----------------------------------------------------------------------\n'
1089            '.*'
1090            'FAILED \\(failures=1, known_failure_count=1\\)'
1091            )
1092
1093    def test_known_failure_ok_run(self):
1094        # run a test that generates a known failure which should be printed in
1095        # the final output.
1096        class Test(tests.TestCase):
1097            def known_failure_test(self):
1098                self.knownFailure("Never works...")
1099        test = Test("known_failure_test")
1100        stream = StringIO()
1101        runner = tests.TextTestRunner(stream=stream)
1102        self.run_test_runner(runner, test)
1103        self.assertContainsRe(stream.getvalue(),
1104                              '\n'
1105                              '-*\n'
1106                              'Ran 1 test in .*\n'
1107                              '\n'
1108                              'OK \\(known_failures=1\\)\n')
1109
1110    def test_unexpected_success_bad(self):
1111        class Test(tests.TestCase):
1112            def test_truth(self):
1113                self.expectFailure("No absolute truth", self.assertTrue, True)
1114        runner = tests.TextTestRunner(stream=StringIO())
1115        self.run_test_runner(runner, Test("test_truth"))
1116        self.assertContainsRe(runner.stream.getvalue(),
1117                              "=+\n"
1118                              "FAIL: \\S+\\.test_truth\n"
1119                              "-+\n"
1120                              "(?:.*\n)*"
1121                              "\\s*(?:Text attachment: )?reason"
1122                              "(?:\n-+\n|: {{{)"
1123                              "No absolute truth"
1124                              "(?:\n-+\n|}}}\n)"
1125                              "(?:.*\n)*"
1126                              "-+\n"
1127                              "Ran 1 test in .*\n"
1128                              "\n"
1129                              "FAILED \\(failures=1\\)\n\\Z")
1130
1131    def test_result_decorator(self):
1132        # decorate results
1133        calls = []
1134
1135        class LoggingDecorator(ExtendedToOriginalDecorator):
1136            def startTest(self, test):
1137                ExtendedToOriginalDecorator.startTest(self, test)
1138                calls.append('start')
1139        test = unittest.FunctionTestCase(lambda: None)
1140        stream = StringIO()
1141        runner = tests.TextTestRunner(stream=stream,
1142                                      result_decorators=[LoggingDecorator])
1143        self.run_test_runner(runner, test)
1144        self.assertLength(1, calls)
1145
1146    def test_skipped_test(self):
1147        # run a test that is skipped, and check the suite as a whole still
1148        # succeeds.
1149        # skipping_test must be hidden in here so it's not run as a real test
1150        class SkippingTest(tests.TestCase):
1151            def skipping_test(self):
1152                raise tests.TestSkipped('test intentionally skipped')
1153        runner = tests.TextTestRunner(stream=StringIO())
1154        test = SkippingTest("skipping_test")
1155        result = self.run_test_runner(runner, test)
1156        self.assertTrue(result.wasSuccessful())
1157
1158    def test_skipped_from_setup(self):
1159        calls = []
1160
1161        class SkippedSetupTest(tests.TestCase):
1162
1163            def setUp(self):
1164                calls.append('setUp')
1165                self.addCleanup(self.cleanup)
1166                raise tests.TestSkipped('skipped setup')
1167
1168            def test_skip(self):
1169                self.fail('test reached')
1170
1171            def cleanup(self):
1172                calls.append('cleanup')
1173
1174        runner = tests.TextTestRunner(stream=StringIO())
1175        test = SkippedSetupTest('test_skip')
1176        result = self.run_test_runner(runner, test)
1177        self.assertTrue(result.wasSuccessful())
1178        # Check if cleanup was called the right number of times.
1179        self.assertEqual(['setUp', 'cleanup'], calls)
1180
1181    def test_skipped_from_test(self):
1182        calls = []
1183
1184        class SkippedTest(tests.TestCase):
1185
1186            def setUp(self):
1187                super(SkippedTest, self).setUp()
1188                calls.append('setUp')
1189                self.addCleanup(self.cleanup)
1190
1191            def test_skip(self):
1192                raise tests.TestSkipped('skipped test')
1193
1194            def cleanup(self):
1195                calls.append('cleanup')
1196
1197        runner = tests.TextTestRunner(stream=StringIO())
1198        test = SkippedTest('test_skip')
1199        result = self.run_test_runner(runner, test)
1200        self.assertTrue(result.wasSuccessful())
1201        # Check if cleanup was called the right number of times.
1202        self.assertEqual(['setUp', 'cleanup'], calls)
1203
1204    def test_not_applicable(self):
1205        # run a test that is skipped because it's not applicable
1206        class Test(tests.TestCase):
1207            def not_applicable_test(self):
1208                raise tests.TestNotApplicable('this test never runs')
1209        out = StringIO()
1210        runner = tests.TextTestRunner(stream=out, verbosity=2)
1211        test = Test("not_applicable_test")
1212        result = self.run_test_runner(runner, test)
1213        self.log(out.getvalue())
1214        self.assertTrue(result.wasSuccessful())
1215        self.assertTrue(result.wasStrictlySuccessful())
1216        self.assertContainsRe(out.getvalue(),
1217                              r'(?m)not_applicable_test  * N/A')
1218        self.assertContainsRe(out.getvalue(),
1219                              r'(?m)^    this test never runs')
1220
1221    def test_unsupported_features_listed(self):
1222        """When unsupported features are encountered they are detailed."""
1223        class Feature1(features.Feature):
1224            def _probe(self):
1225                return False
1226
1227        class Feature2(features.Feature):
1228            def _probe(self):
1229                return False
1230        # create sample tests
1231        test1 = SampleTestCase('_test_pass')
1232        test1._test_needs_features = [Feature1()]
1233        test2 = SampleTestCase('_test_pass')
1234        test2._test_needs_features = [Feature2()]
1235        test = unittest.TestSuite()
1236        test.addTest(test1)
1237        test.addTest(test2)
1238        stream = StringIO()
1239        runner = tests.TextTestRunner(stream=stream)
1240        self.run_test_runner(runner, test)
1241        lines = stream.getvalue().splitlines()
1242        self.assertEqual([
1243            'OK',
1244            "Missing feature 'Feature1' skipped 1 tests.",
1245            "Missing feature 'Feature2' skipped 1 tests.",
1246            ],
1247            lines[-3:])
1248
1249    def test_verbose_test_count(self):
1250        """A verbose test run reports the right test count at the start"""
1251        suite = TestUtil.TestSuite([
1252            unittest.FunctionTestCase(lambda:None),
1253            unittest.FunctionTestCase(lambda:None)])
1254        self.assertEqual(suite.countTestCases(), 2)
1255        stream = StringIO()
1256        runner = tests.TextTestRunner(stream=stream, verbosity=2)
1257        # Need to use the CountingDecorator as that's what sets num_tests
1258        self.run_test_runner(runner, tests.CountingDecorator(suite))
1259        self.assertStartsWith(stream.getvalue(), "running 2 tests")
1260
1261    def test_startTestRun(self):
1262        """run should call result.startTestRun()"""
1263        calls = []
1264
1265        class LoggingDecorator(ExtendedToOriginalDecorator):
1266            def startTestRun(self):
1267                ExtendedToOriginalDecorator.startTestRun(self)
1268                calls.append('startTestRun')
1269        test = unittest.FunctionTestCase(lambda: None)
1270        stream = StringIO()
1271        runner = tests.TextTestRunner(stream=stream,
1272                                      result_decorators=[LoggingDecorator])
1273        self.run_test_runner(runner, test)
1274        self.assertLength(1, calls)
1275
1276    def test_stopTestRun(self):
1277        """run should call result.stopTestRun()"""
1278        calls = []
1279
1280        class LoggingDecorator(ExtendedToOriginalDecorator):
1281            def stopTestRun(self):
1282                ExtendedToOriginalDecorator.stopTestRun(self)
1283                calls.append('stopTestRun')
1284        test = unittest.FunctionTestCase(lambda: None)
1285        stream = StringIO()
1286        runner = tests.TextTestRunner(stream=stream,
1287                                      result_decorators=[LoggingDecorator])
1288        self.run_test_runner(runner, test)
1289        self.assertLength(1, calls)
1290
1291    def test_unicode_test_output_on_ascii_stream(self):
1292        """Showing results should always succeed even on an ascii console"""
1293        class FailureWithUnicode(tests.TestCase):
1294            def test_log_unicode(self):
1295                self.log(u"\u2606")
1296                self.fail("Now print that log!")
1297        bio = BytesIO()
1298        out = TextIOWrapper(bio, 'ascii', 'backslashreplace')
1299        self.overrideAttr(osutils, "get_terminal_encoding",
1300                          lambda trace=False: "ascii")
1301        self.run_test_runner(
1302            tests.TextTestRunner(stream=out),
1303            FailureWithUnicode("test_log_unicode"))
1304        out.flush()
1305        self.assertContainsRe(bio.getvalue(),
1306                              b"(?:Text attachment: )?log"
1307                              b"(?:\n-+\n|: {{{)"
1308                              b"\\d+\\.\\d+  \\\\u2606"
1309                              b"(?:\n-+\n|}}}\n)")
1310
1311
class SampleTestCase(tests.TestCase):
    """A test case with one trivial method, for tests that need a sample."""

    def _test_pass(self):
        """Do nothing, so that the test passes."""
1316
1317
1318class _TestException(Exception):
1319    pass
1320
1321
1322class TestTestCase(tests.TestCase):
1323    """Tests that test the core breezy TestCase."""
1324
1325    def test_assertLength_matches_empty(self):
1326        a_list = []
1327        self.assertLength(0, a_list)
1328
1329    def test_assertLength_matches_nonempty(self):
1330        a_list = [1, 2, 3]
1331        self.assertLength(3, a_list)
1332
1333    def test_assertLength_fails_different(self):
1334        a_list = []
1335        self.assertRaises(AssertionError, self.assertLength, 1, a_list)
1336
1337    def test_assertLength_shows_sequence_in_failure(self):
1338        a_list = [1, 2, 3]
1339        exception = self.assertRaises(AssertionError, self.assertLength, 2,
1340                                      a_list)
1341        self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]',
1342                         exception.args[0])
1343
1344    def test_base_setUp_not_called_causes_failure(self):
1345        class TestCaseWithBrokenSetUp(tests.TestCase):
1346            def setUp(self):
1347                pass  # does not call TestCase.setUp
1348
1349            def test_foo(self):
1350                pass
1351        test = TestCaseWithBrokenSetUp('test_foo')
1352        result = unittest.TestResult()
1353        test.run(result)
1354        self.assertFalse(result.wasSuccessful())
1355        self.assertEqual(1, result.testsRun)
1356
1357    def test_base_tearDown_not_called_causes_failure(self):
1358        class TestCaseWithBrokenTearDown(tests.TestCase):
1359            def tearDown(self):
1360                pass  # does not call TestCase.tearDown
1361
1362            def test_foo(self):
1363                pass
1364        test = TestCaseWithBrokenTearDown('test_foo')
1365        result = unittest.TestResult()
1366        test.run(result)
1367        self.assertFalse(result.wasSuccessful())
1368        self.assertEqual(1, result.testsRun)
1369
1370    def test_debug_flags_sanitised(self):
1371        """The breezy debug flags should be sanitised by setUp."""
1372        if 'allow_debug' in tests.selftest_debug_flags:
1373            raise tests.TestNotApplicable(
1374                '-Eallow_debug option prevents debug flag sanitisation')
1375        # we could set something and run a test that will check
1376        # it gets santised, but this is probably sufficient for now:
1377        # if someone runs the test with -Dsomething it will error.
1378        flags = set()
1379        if self._lock_check_thorough:
1380            flags.add('strict_locks')
1381        self.assertEqual(flags, breezy.debug.debug_flags)
1382
1383    def change_selftest_debug_flags(self, new_flags):
1384        self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags))
1385
1386    def test_allow_debug_flag(self):
1387        """The -Eallow_debug flag prevents breezy.debug.debug_flags from being
1388        sanitised (i.e. cleared) before running a test.
1389        """
1390        self.change_selftest_debug_flags({'allow_debug'})
1391        breezy.debug.debug_flags = {'a-flag'}
1392
1393        class TestThatRecordsFlags(tests.TestCase):
1394            def test_foo(nested_self):
1395                self.flags = set(breezy.debug.debug_flags)
1396        test = TestThatRecordsFlags('test_foo')
1397        test.run(self.make_test_result())
1398        flags = {'a-flag'}
1399        if 'disable_lock_checks' not in tests.selftest_debug_flags:
1400            flags.add('strict_locks')
1401        self.assertEqual(flags, self.flags)
1402
1403    def test_disable_lock_checks(self):
1404        """The -Edisable_lock_checks flag disables thorough checks."""
1405        class TestThatRecordsFlags(tests.TestCase):
1406            def test_foo(nested_self):
1407                self.flags = set(breezy.debug.debug_flags)
1408                self.test_lock_check_thorough = nested_self._lock_check_thorough
1409        self.change_selftest_debug_flags(set())
1410        test = TestThatRecordsFlags('test_foo')
1411        test.run(self.make_test_result())
1412        # By default we do strict lock checking and thorough lock/unlock
1413        # tracking.
1414        self.assertTrue(self.test_lock_check_thorough)
1415        self.assertEqual({'strict_locks'}, self.flags)
1416        # Now set the disable_lock_checks flag, and show that this changed.
1417        self.change_selftest_debug_flags({'disable_lock_checks'})
1418        test = TestThatRecordsFlags('test_foo')
1419        test.run(self.make_test_result())
1420        self.assertFalse(self.test_lock_check_thorough)
1421        self.assertEqual(set(), self.flags)
1422
1423    def test_this_fails_strict_lock_check(self):
1424        class TestThatRecordsFlags(tests.TestCase):
1425            def test_foo(nested_self):
1426                self.flags1 = set(breezy.debug.debug_flags)
1427                self.thisFailsStrictLockCheck()
1428                self.flags2 = set(breezy.debug.debug_flags)
1429        # Make sure lock checking is active
1430        self.change_selftest_debug_flags(set())
1431        test = TestThatRecordsFlags('test_foo')
1432        test.run(self.make_test_result())
1433        self.assertEqual({'strict_locks'}, self.flags1)
1434        self.assertEqual(set(), self.flags2)
1435
1436    def test_debug_flags_restored(self):
1437        """The breezy debug flags should be restored to their original state
1438        after the test was run, even if allow_debug is set.
1439        """
1440        self.change_selftest_debug_flags({'allow_debug'})
1441        # Now run a test that modifies debug.debug_flags.
1442        breezy.debug.debug_flags = {'original-state'}
1443
1444        class TestThatModifiesFlags(tests.TestCase):
1445            def test_foo(self):
1446                breezy.debug.debug_flags = {'modified'}
1447        test = TestThatModifiesFlags('test_foo')
1448        test.run(self.make_test_result())
1449        self.assertEqual({'original-state'}, breezy.debug.debug_flags)
1450
1451    def make_test_result(self):
1452        """Get a test result that writes to a StringIO."""
1453        return tests.TextTestResult(StringIO(), descriptions=0, verbosity=1)
1454
    def inner_test(self):
        """The inner child test: emits a single "inner_test" note."""
        note("inner_test")
1458
1459    def outer_child(self):
1460        # the outer child test
1461        note("outer_start")
1462        self.inner_test = TestTestCase("inner_child")
1463        result = self.make_test_result()
1464        self.inner_test.run(result)
1465        note("outer finish")
1466        self.addCleanup(osutils.delete_any, self._log_file_name)
1467
1468    def test_trace_nesting(self):
1469        # this tests that each test case nests its trace facility correctly.
1470        # we do this by running a test case manually. That test case (A)
1471        # should setup a new log, log content to it, setup a child case (B),
1472        # which should log independently, then case (A) should log a trailer
1473        # and return.
1474        # we do two nested children so that we can verify the state of the
1475        # logs after the outer child finishes is correct, which a bad clean
1476        # up routine in tearDown might trigger a fault in our test with only
1477        # one child, we should instead see the bad result inside our test with
1478        # the two children.
1479        # the outer child test
1480        original_trace = breezy.trace._trace_file
1481        outer_test = TestTestCase("outer_child")
1482        result = self.make_test_result()
1483        outer_test.run(result)
1484        self.assertEqual(original_trace, breezy.trace._trace_file)
1485
1486    def method_that_times_a_bit_twice(self):
1487        # call self.time twice to ensure it aggregates
1488        self.time(time.sleep, 0.007)
1489        self.time(time.sleep, 0.007)
1490
1491    def test_time_creates_benchmark_in_result(self):
1492        """The TestCase.time() method accumulates a benchmark time."""
1493        sample_test = TestTestCase("method_that_times_a_bit_twice")
1494        output_stream = StringIO()
1495        result = breezy.tests.VerboseTestResult(
1496            output_stream,
1497            descriptions=0,
1498            verbosity=2)
1499        sample_test.run(result)
1500        self.assertContainsRe(
1501            output_stream.getvalue(),
1502            r"\d+ms\*\n$")
1503
1504    def test_hooks_sanitised(self):
1505        """The breezy hooks should be sanitised by setUp."""
1506        # Note this test won't fail with hooks that the core library doesn't
1507        # use - but it trigger with a plugin that adds hooks, so its still a
1508        # useful warning in that case.
1509        self.assertEqual(breezy.branch.BranchHooks(),
1510                         breezy.branch.Branch.hooks)
1511        self.assertEqual(
1512            breezy.bzr.smart.server.SmartServerHooks(),
1513            breezy.bzr.smart.server.SmartTCPServer.hooks)
1514        self.assertEqual(
1515            breezy.commands.CommandHooks(), breezy.commands.Command.hooks)
1516
1517    def test__gather_lsprof_in_benchmarks(self):
1518        """When _gather_lsprof_in_benchmarks is on, accumulate profile data.
1519
1520        Each self.time() call is individually and separately profiled.
1521        """
1522        self.requireFeature(features.lsprof_feature)
1523        # overrides the class member with an instance member so no cleanup
1524        # needed.
1525        self._gather_lsprof_in_benchmarks = True
1526        self.time(time.sleep, 0.000)
1527        self.time(time.sleep, 0.003)
1528        self.assertEqual(2, len(self._benchcalls))
1529        self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0])
1530        self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0])
1531        self.assertIsInstance(self._benchcalls[0][1], breezy.lsprof.Stats)
1532        self.assertIsInstance(self._benchcalls[1][1], breezy.lsprof.Stats)
1533        del self._benchcalls[:]
1534
    def test_knownFailure(self):
        """self.knownFailure() should raise a KnownFailure exception."""
        # Only the exception type is checked; "A Failure" is just the reason
        # argument handed to knownFailure().
        self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure")
1538
1539    def test_open_bzrdir_safe_roots(self):
1540        # even a memory transport should fail to open when its url isn't
1541        # permitted.
1542        # Manually set one up (TestCase doesn't and shouldn't provide magic
1543        # machinery)
1544        transport_server = memory.MemoryServer()
1545        transport_server.start_server()
1546        self.addCleanup(transport_server.stop_server)
1547        t = transport.get_transport_from_url(transport_server.get_url())
1548        controldir.ControlDir.create(t.base)
1549        self.assertRaises(errors.BzrError,
1550                          controldir.ControlDir.open_from_transport, t)
1551        # But if we declare this as safe, we can open the bzrdir.
1552        self.permit_url(t.base)
1553        self._bzr_selftest_roots.append(t.base)
1554        controldir.ControlDir.open_from_transport(t)
1555
1556    def test_requireFeature_available(self):
1557        """self.requireFeature(available) is a no-op."""
1558        class Available(features.Feature):
1559            def _probe(self):
1560                return True
1561        feature = Available()
1562        self.requireFeature(feature)
1563
1564    def test_requireFeature_unavailable(self):
1565        """self.requireFeature(unavailable) raises UnavailableFeature."""
1566        class Unavailable(features.Feature):
1567            def _probe(self):
1568                return False
1569        feature = Unavailable()
1570        self.assertRaises(tests.UnavailableFeature,
1571                          self.requireFeature, feature)
1572
1573    def test_run_no_parameters(self):
1574        test = SampleTestCase('_test_pass')
1575        test.run()
1576
1577    def test_run_enabled_unittest_result(self):
1578        """Test we revert to regular behaviour when the test is enabled."""
1579        test = SampleTestCase('_test_pass')
1580
1581        class EnabledFeature(object):
1582            def available(self):
1583                return True
1584        test._test_needs_features = [EnabledFeature()]
1585        result = unittest.TestResult()
1586        test.run(result)
1587        self.assertEqual(1, result.testsRun)
1588        self.assertEqual([], result.errors)
1589        self.assertEqual([], result.failures)
1590
1591    def test_run_disabled_unittest_result(self):
1592        """Test our compatibility for disabled tests with unittest results."""
1593        test = SampleTestCase('_test_pass')
1594
1595        class DisabledFeature(object):
1596            def available(self):
1597                return False
1598        test._test_needs_features = [DisabledFeature()]
1599        result = unittest.TestResult()
1600        test.run(result)
1601        self.assertEqual(1, result.testsRun)
1602        self.assertEqual([], result.errors)
1603        self.assertEqual([], result.failures)
1604
1605    def test_run_disabled_supporting_result(self):
1606        """Test disabled tests behaviour with support aware results."""
1607        test = SampleTestCase('_test_pass')
1608
1609        class DisabledFeature(object):
1610            def __eq__(self, other):
1611                return isinstance(other, DisabledFeature)
1612
1613            def available(self):
1614                return False
1615        the_feature = DisabledFeature()
1616        test._test_needs_features = [the_feature]
1617
1618        class InstrumentedTestResult(unittest.TestResult):
1619            def __init__(self):
1620                unittest.TestResult.__init__(self)
1621                self.calls = []
1622
1623            def startTest(self, test):
1624                self.calls.append(('startTest', test))
1625
1626            def stopTest(self, test):
1627                self.calls.append(('stopTest', test))
1628
1629            def addNotSupported(self, test, feature):
1630                self.calls.append(('addNotSupported', test, feature))
1631        result = InstrumentedTestResult()
1632        test.run(result)
1633        case = result.calls[0][1]
1634        self.assertEqual([
1635            ('startTest', case),
1636            ('addNotSupported', case, the_feature),
1637            ('stopTest', case),
1638            ],
1639            result.calls)
1640
1641    def test_start_server_registers_url(self):
1642        transport_server = memory.MemoryServer()
1643        # A little strict, but unlikely to be changed soon.
1644        self.assertEqual([], self._bzr_selftest_roots)
1645        self.start_server(transport_server)
1646        self.assertSubset([transport_server.get_url()],
1647                          self._bzr_selftest_roots)
1648
1649    def test_assert_list_raises_on_generator(self):
1650        def generator_which_will_raise():
1651            # This will not raise until after the first yield
1652            yield 1
1653            raise _TestException()
1654
1655        e = self.assertListRaises(_TestException, generator_which_will_raise)
1656        self.assertIsInstance(e, _TestException)
1657
1658        e = self.assertListRaises(Exception, generator_which_will_raise)
1659        self.assertIsInstance(e, _TestException)
1660
1661    def test_assert_list_raises_on_plain(self):
1662        def plain_exception():
1663            raise _TestException()
1664            return []
1665
1666        e = self.assertListRaises(_TestException, plain_exception)
1667        self.assertIsInstance(e, _TestException)
1668
1669        e = self.assertListRaises(Exception, plain_exception)
1670        self.assertIsInstance(e, _TestException)
1671
1672    def test_assert_list_raises_assert_wrong_exception(self):
1673        class _NotTestException(Exception):
1674            pass
1675
1676        def wrong_exception():
1677            raise _NotTestException()
1678
1679        def wrong_exception_generator():
1680            yield 1
1681            yield 2
1682            raise _NotTestException()
1683
1684        # Wrong exceptions are not intercepted
1685        self.assertRaises(
1686            _NotTestException,
1687            self.assertListRaises, _TestException, wrong_exception)
1688        self.assertRaises(
1689            _NotTestException,
1690            self.assertListRaises, _TestException, wrong_exception_generator)
1691
1692    def test_assert_list_raises_no_exception(self):
1693        def success():
1694            return []
1695
1696        def success_generator():
1697            yield 1
1698            yield 2
1699
1700        self.assertRaises(AssertionError,
1701                          self.assertListRaises, _TestException, success)
1702
1703        self.assertRaises(
1704            AssertionError,
1705            self.assertListRaises, _TestException, success_generator)
1706
1707    def _run_successful_test(self, test):
1708        result = testtools.TestResult()
1709        test.run(result)
1710        self.assertTrue(result.wasSuccessful())
1711        return result
1712
1713    def test_overrideAttr_without_value(self):
1714        self.test_attr = 'original'  # Define a test attribute
1715        obj = self  # Make 'obj' visible to the embedded test
1716
1717        class Test(tests.TestCase):
1718
1719            def setUp(self):
1720                super(Test, self).setUp()
1721                self.orig = self.overrideAttr(obj, 'test_attr')
1722
1723            def test_value(self):
1724                self.assertEqual('original', self.orig)
1725                self.assertEqual('original', obj.test_attr)
1726                obj.test_attr = 'modified'
1727                self.assertEqual('modified', obj.test_attr)
1728
1729        self._run_successful_test(Test('test_value'))
1730        self.assertEqual('original', obj.test_attr)
1731
1732    def test_overrideAttr_with_value(self):
1733        self.test_attr = 'original'  # Define a test attribute
1734        obj = self  # Make 'obj' visible to the embedded test
1735
1736        class Test(tests.TestCase):
1737
1738            def setUp(self):
1739                super(Test, self).setUp()
1740                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1741
1742            def test_value(self):
1743                self.assertEqual('original', self.orig)
1744                self.assertEqual('modified', obj.test_attr)
1745
1746        self._run_successful_test(Test('test_value'))
1747        self.assertEqual('original', obj.test_attr)
1748
1749    def test_overrideAttr_with_no_existing_value_and_value(self):
1750        # Do not define the test_attribute
1751        obj = self  # Make 'obj' visible to the embedded test
1752
1753        class Test(tests.TestCase):
1754
1755            def setUp(self):
1756                tests.TestCase.setUp(self)
1757                self.orig = self.overrideAttr(obj, 'test_attr', new='modified')
1758
1759            def test_value(self):
1760                self.assertEqual(tests._unitialized_attr, self.orig)
1761                self.assertEqual('modified', obj.test_attr)
1762
1763        self._run_successful_test(Test('test_value'))
1764        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1765
1766    def test_overrideAttr_with_no_existing_value_and_no_value(self):
1767        # Do not define the test_attribute
1768        obj = self  # Make 'obj' visible to the embedded test
1769
1770        class Test(tests.TestCase):
1771
1772            def setUp(self):
1773                tests.TestCase.setUp(self)
1774                self.orig = self.overrideAttr(obj, 'test_attr')
1775
1776            def test_value(self):
1777                self.assertEqual(tests._unitialized_attr, self.orig)
1778                self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1779
1780        self._run_successful_test(Test('test_value'))
1781        self.assertRaises(AttributeError, getattr, obj, 'test_attr')
1782
1783    def test_recordCalls(self):
1784        from breezy.tests import test_selftest
1785        calls = self.recordCalls(
1786            test_selftest, '_add_numbers')
1787        self.assertEqual(test_selftest._add_numbers(2, 10),
1788                         12)
1789        self.assertEqual(calls, [((2, 10), {})])
1790
1791
1792def _add_numbers(a, b):
1793    return a + b
1794
1795
class _MissingFeature(features.Feature):
    """Feature whose probe always returns False."""

    def _probe(self):
        return False


# Shared instance; the ``test_missing_feature`` example test requires it and
# TestTestCaseLogDetails uses its str() as the expected skip reason.
missing_feature = _MissingFeature()
1802
1803
def _get_test(name):
    """Build one of the example tests, selected by method name.

    The test class lives inside this function so that its tests don't
    auto-run as part of the test suite.

    :param name: Name of the ExampleTests method to wrap.
    :return: An ExampleTests instance for that method.
    """

    class ExampleTests(tests.TestCase):
        """One test per outcome; each mutters a line to the log first."""

        def test_fail(self):
            mutter('this was a failing test')
            self.fail('this test will fail')

        def test_error(self):
            mutter('this test errored')
            raise RuntimeError('gotcha')

        def test_missing_feature(self):
            mutter('missing the feature')
            self.requireFeature(missing_feature)

        def test_skip(self):
            mutter('this test will be skipped')
            raise tests.TestSkipped('reason')

        def test_success(self):
            mutter('this test succeeds')

        def test_xfail(self):
            mutter('test with expected failure')
            self.knownFailure('this_fails')

        def test_unexpected_success(self):
            mutter('test with unexpected success')
            # The callable does not fail, although a failure is expected.
            self.expectFailure('should_fail', lambda: None)

    example = ExampleTests(name)
    return example
1841
1842
class TestTestCaseLogDetails(tests.TestCase):
    """Checks which test outcomes carry a 'log' detail."""

    def _run_test(self, test_name):
        """Run the named example test and return its testtools result."""
        outcome = testtools.TestResult()
        _get_test(test_name).run(outcome)
        return outcome

    def test_fail_has_log(self):
        failures = self._run_test('test_fail').failures
        self.assertEqual(1, len(failures))
        text = failures[0][1]
        self.assertContainsRe(
            text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(text, 'this was a failing test')

    def test_error_has_log(self):
        errors_seen = self._run_test('test_error').errors
        self.assertEqual(1, len(errors_seen))
        text = errors_seen[0][1]
        self.assertContainsRe(
            text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertContainsRe(text, 'this test errored')

    def test_skip_has_no_log(self):
        reasons = self._run_test('test_skip').skip_reasons
        self.assertEqual({'reason'}, set(reasons))
        skipped = reasons['reason']
        self.assertEqual(1, len(skipped))
        self.assertFalse('log' in skipped[0].getDetails())

    def test_missing_feature_has_no_log(self):
        # testtools doesn't know about addNotSupported, so it just gets
        # considered as a skip
        reasons = self._run_test('test_missing_feature').skip_reasons
        reason = str(missing_feature)
        self.assertEqual({reason}, set(reasons))
        skipped = reasons[str(missing_feature)]
        self.assertEqual(1, len(skipped))
        self.assertFalse('log' in skipped[0].getDetails())

    def test_xfail_has_no_log(self):
        expected_failures = self._run_test('test_xfail').expectedFailures
        self.assertEqual(1, len(expected_failures))
        text = expected_failures[0][1]
        self.assertNotContainsRe(
            text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        surprises = self._run_test(
            'test_unexpected_success').unexpectedSuccesses
        self.assertEqual(1, len(surprises))
        # Inconsistency, unexpectedSuccesses is a list of tests,
        # expectedFailures is a list of reasons?
        details = surprises[0].getDetails()
        self.assertTrue('log' in details)
1903
1904
class TestTestCloning(tests.TestCase):
    """Tests that test cloning of TestCases (as used by multiply_tests)."""

    def test_cloned_testcase_does_not_share_details(self):
        """A TestCase cloned with clone_test does not share mutable attributes
        such as details or cleanups.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                self.addDetail('foo', Content('text/plain', lambda: 'foo'))

        original = Test('test_foo')
        clone = tests.clone_test(original, original.id() + '(cloned)')
        # Only the original is run, so only it should gain the detail.
        original.run(unittest.TestResult())
        original_detail = original.getDetails()['foo']
        self.assertEqual('foo', original_detail.iter_bytes())
        self.assertEqual(None, clone.getDetails().get('foo'))

    def test_double_apply_scenario_preserves_first_scenario(self):
        """Applying two levels of scenarios to a test preserves the attributes
        added by both scenarios.
        """
        class Test(tests.TestCase):
            def test_foo(self):
                pass

        values = (1, 2)
        scenarios_x = [('x=%d' % v, {'x': v}) for v in values]
        scenarios_y = [('y=%d' % v, {'y': v}) for v in values]
        once = tests.multiply_tests(
            Test('test_foo'), scenarios_x, unittest.TestSuite())
        twice = tests.multiply_tests(once, scenarios_y, unittest.TestSuite())
        all_tests = list(tests.iter_suite_tests(twice))
        self.assertLength(4, all_tests)
        # Every (x, y) combination must show up exactly once.
        seen = sorted((t.x, t.y) for t in all_tests)
        self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], seen)
1937
1938
# NB: Don't delete this; it's not actually from 0.11!
# The 0.11 version is simply the deprecation label that the applyDeprecated
# tests in this module expect for this function.
@deprecated_function(deprecated_in((0, 11, 0)))
def sample_deprecated_function():
    """A deprecated function to test applyDeprecated with."""
    return 2
1944
1945
def sample_undeprecated_function(a_param):
    """An undeprecated function to test applyDeprecated with.

    It has no body beyond this docstring, so it ignores ``a_param`` and
    returns None.
    """
1948
1949
class ApplyDeprecatedHelper(object):
    """A helper class for ApplyDeprecated tests."""

    # Marked deprecated in 0.11; returns its argument unchanged.
    @deprecated_method(deprecated_in((0, 11, 0)))
    def sample_deprecated_method(self, param_one):
        """A deprecated method for testing with."""
        return param_one

    def sample_normal_method(self):
        """An undeprecated method."""

    # Marked deprecated in 0.10, and calls a function that is itself marked
    # deprecated in 0.11, giving a nested deprecation.
    @deprecated_method(deprecated_in((0, 10, 0)))
    def sample_nested_deprecation(self):
        return sample_deprecated_function()
1964
1965
class TestExtraAssertions(tests.TestCase):
    """Tests for new test assertions in breezy test suite"""

    def test_assert_isinstance(self):
        """assertIsInstance passes on a match and explains a mismatch."""
        self.assertIsInstance(2, int)
        self.assertIsInstance(u'', str)
        plain_error = self.assertRaises(
            AssertionError, self.assertIsInstance, None, int)
        # Either spelling of the type repr is accepted.
        accepted_messages = [
            "None is an instance of <type 'NoneType'> rather than "
            "<type 'int'>",
            "None is an instance of <class 'NoneType'> rather than "
            "<class 'int'>"]
        self.assertIn(str(plain_error), accepted_messages)
        self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int)
        # A caller-supplied message is appended after a colon.
        annotated_error = self.assertRaises(
            AssertionError, self.assertIsInstance, None, int,
            "it's just not")
        self.assertEqual(
            str(annotated_error),
            "None is an instance of <class 'NoneType'> rather "
            "than <class 'int'>: it's just not")

    def test_assertEndsWith(self):
        """assertEndsWith accepts a real suffix and rejects a longer one."""
        self.assertEndsWith('foo', 'oo')
        self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo')

    def test_assertEqualDiff(self):
        """assertEqualDiff reports which string lacks a final newline."""
        missing_in_first = self.assertRaises(
            AssertionError, self.assertEqualDiff, '', '\n')
        # Don't blink ! The '+' applies to the second string
        self.assertEqual(
            str(missing_in_first),
            'first string is missing a final newline.\n+ \n')
        missing_in_second = self.assertRaises(
            AssertionError, self.assertEqualDiff, '\n', '')
        # Don't blink ! The '-' applies to the second string
        self.assertEqual(
            str(missing_in_second),
            'second string is missing a final newline.\n- \n')
2003
2004
class TestDeprecations(tests.TestCase):
    """Tests for the applyDeprecated and callDeprecated helpers."""

    def test_applyDeprecated_not_deprecated(self):
        """applyDeprecated() accepts only the matching deprecation."""
        helper = ApplyDeprecatedHelper()
        in_0_10 = deprecated_in((0, 10, 0))
        in_0_11 = deprecated_in((0, 11, 0))
        # calling an undeprecated callable raises an assertion
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_11, helper.sample_normal_method)
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_11, sample_undeprecated_function, "a param value")
        # calling a deprecated callable (function or method) with the wrong
        # expected deprecation fails.
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_10, helper.sample_deprecated_method, "a param value")
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_10, sample_deprecated_function)
        # calling a deprecated callable (function or method) with the right
        # expected deprecation returns the functions result.
        method_result = self.applyDeprecated(
            in_0_11, helper.sample_deprecated_method, "a param value")
        self.assertEqual("a param value", method_result)
        function_result = self.applyDeprecated(
            in_0_11, sample_deprecated_function)
        self.assertEqual(2, function_result)
        # calling a nested deprecation with the wrong deprecation version
        # fails even if a deeper nested function was deprecated with the
        # supplied version.
        self.assertRaises(
            AssertionError, self.applyDeprecated,
            in_0_11, helper.sample_nested_deprecation)
        # calling a nested deprecation with the right deprecation value
        # returns the calls result.
        nested_result = self.applyDeprecated(
            in_0_10, helper.sample_nested_deprecation)
        self.assertEqual(2, nested_result)

    def test_callDeprecated(self):
        """callDeprecated() checks the warnings and returns the result."""
        def testfunc(be_deprecated, result=None):
            if be_deprecated is True:
                symbol_versioning.warn('i am deprecated', DeprecationWarning,
                                       stacklevel=1)
            return result

        warned = self.callDeprecated(['i am deprecated'], testfunc, True)
        self.assertIs(None, warned)
        silent = self.callDeprecated([], testfunc, False, 'result')
        self.assertEqual('result', silent)
        # The same two cases again, passing the flag by keyword.
        self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True)
        self.callDeprecated([], testfunc, be_deprecated=False)
2059
2060
class TestWarningTests(tests.TestCase):
    """Tests for calling methods that raise warnings."""

    def test_callCatchWarnings(self):
        """callCatchWarnings returns the warnings and the call's result."""
        def meth(a, b):
            warnings.warn("this is your last warning")
            return a + b

        caught, total = self.callCatchWarnings(meth, 1, 2)
        self.assertEqual(3, total)
        # would like just to compare them, but UserWarning doesn't implement
        # eq well
        [only_warning] = caught
        self.assertIsInstance(only_warning, UserWarning)
        self.assertEqual("this is your last warning", str(only_warning))
2075
2076
class TestConvenienceMakers(tests.TestCaseWithTransport):
    """Test for the make_* convenience functions."""

    def test_make_branch_and_tree_with_format(self):
        """make_branch_and_tree() accepts an explicit format."""
        meta_format = breezy.bzr.bzrdir.BzrDirMetaFormat1
        self.make_branch_and_tree('a', format=meta_format())
        opened = breezy.controldir.ControlDir.open('a')
        self.assertIsInstance(opened._format, meta_format)

    def test_make_branch_and_memory_tree(self):
        """make_branch_and_memory_tree() gives back a MemoryTree."""
        # we should be able to get a new branch and a mutable tree from
        # TestCaseWithTransport
        memory_tree = self.make_branch_and_memory_tree('a')
        self.assertIsInstance(memory_tree, breezy.memorytree.MemoryTree)

    def test_make_tree_for_local_vfs_backed_transport(self):
        """Tree, branch and repository share one file:// root transport."""
        # make_branch_and_tree has to use local branch and repositories
        # when the vfs transport and local disk are colocated, even if
        # a different transport is in use for url generation.
        self.transport_server = test_server.FakeVFATServer
        self.assertFalse(self.get_url('t1').startswith('file://'))
        tree = self.make_branch_and_tree('t1')
        self.assertStartsWith(
            tree.controldir.root_transport.base, 'file://')
        self.assertEqual(
            tree.controldir.root_transport,
            tree.branch.controldir.root_transport)
        self.assertEqual(
            tree.controldir.root_transport,
            tree.branch.repository.controldir.root_transport)
2106
2107
class SelfTestHelper(object):
    """Mixin that runs tests.selftest() and hands back what it wrote."""

    def run_selftest(self, **kwargs):
        """Run selftest returning its output.

        :param kwargs: Passed straight through to tests.selftest().
        :return: A BytesIO holding the utf-8 encoded output, rewound to the
            start.
        """
        raw = BytesIO()
        stream = TextIOWrapper(raw, 'utf-8')
        # Both module/class level settings are saved here and put back in
        # the finally clause, whatever selftest does.
        saved_transport = breezy.tests.default_transport
        saved_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
        tests.TestCaseWithMemoryTransport.TEST_ROOT = None
        try:
            succeeded = tests.selftest(stream=stream, **kwargs)
            self.assertEqual(True, succeeded)
        finally:
            breezy.tests.default_transport = saved_transport
            tests.TestCaseWithMemoryTransport.TEST_ROOT = saved_root
        # Push everything into the byte buffer, then detach the wrapper so
        # the buffer remains usable on its own.
        stream.flush()
        stream.detach()
        raw.seek(0)
        return raw
2126
2127
2128class TestSelftest(tests.TestCase, SelfTestHelper):
2129    """Tests of breezy.tests.selftest."""
2130
2131    def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(
2132            self):
2133        factory_called = []
2134
2135        def factory():
2136            factory_called.append(True)
2137            return TestUtil.TestSuite()
2138        out = StringIO()
2139        err = StringIO()
2140        self.apply_redirected(out, err, None, breezy.tests.selftest,
2141                              test_suite_factory=factory)
2142        self.assertEqual([True], factory_called)
2143
2144    def factory(self):
2145        """A test suite factory."""
2146        class Test(tests.TestCase):
2147            def id(self):
2148                return __name__ + ".Test." + self._testMethodName
2149
2150            def a(self):
2151                pass
2152
2153            def b(self):
2154                pass
2155
2156            def c(telf):
2157                pass
2158        return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
2159
2160    def test_list_only(self):
2161        output = self.run_selftest(test_suite_factory=self.factory,
2162                                   list_only=True)
2163        self.assertEqual(3, len(output.readlines()))
2164
2165    def test_list_only_filtered(self):
2166        output = self.run_selftest(test_suite_factory=self.factory,
2167                                   list_only=True, pattern="Test.b")
2168        self.assertEndsWith(output.getvalue(), b"Test.b\n")
2169        self.assertLength(1, output.readlines())
2170
2171    def test_list_only_excludes(self):
2172        output = self.run_selftest(test_suite_factory=self.factory,
2173                                   list_only=True, exclude_pattern="Test.b")
2174        self.assertNotContainsRe(b"Test.b", output.getvalue())
2175        self.assertLength(2, output.readlines())
2176
2177    def test_lsprof_tests(self):
2178        self.requireFeature(features.lsprof_feature)
2179        results = []
2180
2181        class Test(object):
2182            def __call__(test, result):
2183                test.run(result)
2184
2185            def run(test, result):
2186                results.append(result)
2187
2188            def countTestCases(self):
2189                return 1
2190        self.run_selftest(test_suite_factory=Test, lsprof_tests=True)
2191        self.assertLength(1, results)
2192        self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator)
2193
2194    def test_random(self):
2195        # test randomising by listing a number of tests.
2196        output_123 = self.run_selftest(test_suite_factory=self.factory,
2197                                       list_only=True, random_seed="123")
2198        output_234 = self.run_selftest(test_suite_factory=self.factory,
2199                                       list_only=True, random_seed="234")
2200        self.assertNotEqual(output_123, output_234)
2201        # "Randominzing test order..\n\n
2202        self.assertLength(5, output_123.readlines())
2203        self.assertLength(5, output_234.readlines())
2204
2205    def test_random_reuse_is_same_order(self):
2206        # test randomising by listing a number of tests.
2207        expected = self.run_selftest(test_suite_factory=self.factory,
2208                                     list_only=True, random_seed="123")
2209        repeated = self.run_selftest(test_suite_factory=self.factory,
2210                                     list_only=True, random_seed="123")
2211        self.assertEqual(expected.getvalue(), repeated.getvalue())
2212
2213    def test_runner_class(self):
2214        self.requireFeature(features.subunit)
2215        from subunit import ProtocolTestCase
2216        stream = self.run_selftest(
2217            runner_class=tests.SubUnitBzrRunnerv1,
2218            test_suite_factory=self.factory)
2219        test = ProtocolTestCase(stream)
2220        result = unittest.TestResult()
2221        test.run(result)
2222        self.assertEqual(3, result.testsRun)
2223
2224    def test_starting_with_single_argument(self):
2225        output = self.run_selftest(test_suite_factory=self.factory,
2226                                   starting_with=[
2227                                       'breezy.tests.test_selftest.Test.a'],
2228                                   list_only=True)
2229        self.assertEqual(b'breezy.tests.test_selftest.Test.a\n',
2230                         output.getvalue())
2231
2232    def test_starting_with_multiple_argument(self):
2233        output = self.run_selftest(
2234            test_suite_factory=self.factory,
2235            starting_with=['breezy.tests.test_selftest.Test.a',
2236                           'breezy.tests.test_selftest.Test.b'],
2237            list_only=True)
2238        self.assertEqual(b'breezy.tests.test_selftest.Test.a\n'
2239                         b'breezy.tests.test_selftest.Test.b\n',
2240                         output.getvalue())
2241
2242    def check_transport_set(self, transport_server):
2243        captured_transport = []
2244
2245        def seen_transport(a_transport):
2246            captured_transport.append(a_transport)
2247
2248        class Capture(tests.TestCase):
2249            def a(self):
2250                seen_transport(breezy.tests.default_transport)
2251
2252        def factory():
2253            return TestUtil.TestSuite([Capture("a")])
2254        self.run_selftest(transport=transport_server,
2255                          test_suite_factory=factory)
2256        self.assertEqual(transport_server, captured_transport[0])
2257
2258    def test_transport_sftp(self):
2259        self.requireFeature(features.paramiko)
2260        from breezy.tests import stub_sftp
2261        self.check_transport_set(stub_sftp.SFTPAbsoluteServer)
2262
2263    def test_transport_memory(self):
2264        self.check_transport_set(memory.MemoryServer)
2265
2266
class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
    """Tests for selftest's load_list option."""
    # Does IO: reads test.list

    def test_load_list(self):
        """Listing with a load_list file yields exactly the id it names."""
        # The list file names a single test: this one.
        only_id_line = self.id().encode('ascii') + b'\n'
        self.build_tree_contents([('test.list', only_id_line)])
        # Listing the suite restricted by that file echoes the id back.
        listing = self.run_selftest(list_only=True, load_list='test.list')
        self.assertEqual(only_id_line, listing.getvalue())

    def test_load_unknown(self):
        """Naming a load_list file that does not exist raises NoSuchFile."""
        selftest_kwargs = {'load_list': 'missing file name',
                           'list_only': True}
        self.assertRaises(errors.NoSuchFile, self.run_selftest,
                          **selftest_kwargs)
2283
2284
class TestSubunitLogDetails(tests.TestCase, SelfTestHelper):
    """Check which test outcomes carry a log in the subunit stream."""

    _test_needs_features = [features.subunit]

    # Regex matching a line consisting solely of "log" in the raw stream.
    _LOG_MARKER = b'(?m)^log$'

    def run_subunit_stream(self, test_name):
        """Run one sample test under the subunit runner and replay it.

        :param test_name: Name handed to ``_get_test`` to pick the test.
        :return: ``(content, result)``: the raw bytes of the subunit stream
            and the ``testtools.TestResult`` filled by replaying that stream.
        """
        from subunit import ProtocolTestCase

        stream = self.run_selftest(
            test_suite_factory=lambda: TestUtil.TestSuite(
                [_get_test(test_name)]),
            runner_class=tests.SubUnitBzrRunnerv1)
        replayer = ProtocolTestCase(stream)
        replayed = testtools.TestResult()
        replayer.run(replayed)
        return stream.getvalue(), replayed

    def test_fail_has_log(self):
        raw, replayed = self.run_subunit_stream('test_fail')
        self.assertEqual(1, len(replayed.failures))
        self.assertContainsRe(raw, self._LOG_MARKER)
        self.assertContainsRe(raw, b'this test will fail')

    def test_error_has_log(self):
        raw, replayed = self.run_subunit_stream('test_error')
        self.assertContainsRe(raw, self._LOG_MARKER)
        self.assertContainsRe(raw, b'this test errored')

    def test_skip_has_no_log(self):
        raw, replayed = self.run_subunit_stream('test_skip')
        self.assertNotContainsRe(raw, self._LOG_MARKER)
        self.assertNotContainsRe(raw, b'this test will be skipped')
        skip_reasons = replayed.skip_reasons
        self.assertEqual({'reason'}, set(skip_reasons))
        self.assertEqual(1, len(skip_reasons['reason']))
        # The skipped test's getDetails() is not checked for 'log':
        # RemotedTestCase doesn't preserve the "details".

    def test_missing_feature_has_no_log(self):
        raw, replayed = self.run_subunit_stream('test_missing_feature')
        self.assertNotContainsRe(raw, self._LOG_MARKER)
        self.assertNotContainsRe(raw, b'missing the feature')
        skip_reasons = replayed.skip_reasons
        self.assertEqual({'_MissingFeature\n'}, set(skip_reasons))
        self.assertEqual(1, len(skip_reasons['_MissingFeature\n']))
        # The skipped test's getDetails() is not checked for 'log':
        # RemotedTestCase doesn't preserve the "details".

    def test_xfail_has_no_log(self):
        raw, replayed = self.run_subunit_stream('test_xfail')
        self.assertNotContainsRe(raw, self._LOG_MARKER)
        self.assertNotContainsRe(raw, b'test with expected failure')
        self.assertEqual(1, len(replayed.expectedFailures))
        # Each entry is a (test, text) pair; inspect the text part.
        xfail_text = replayed.expectedFailures[0][1]
        self.assertNotContainsRe(
            xfail_text, '(?m)^(?:Text attachment: )?log(?:$|: )')
        self.assertNotContainsRe(xfail_text, 'test with expected failure')

    def test_unexpected_success_has_log(self):
        raw, replayed = self.run_subunit_stream('test_unexpected_success')
        self.assertContainsRe(raw, self._LOG_MARKER)
        self.assertContainsRe(raw, b'test with unexpected success')
        self.assertEqual(1, len(replayed.unexpectedSuccesses))
        # The test's getDetails() is not checked for 'log':
        # RemotedTestCase doesn't preserve the "details".

    def test_success_has_no_log(self):
        raw, replayed = self.run_subunit_stream('test_success')
        self.assertEqual(1, replayed.testsRun)
        self.assertNotContainsRe(raw, self._LOG_MARKER)
        self.assertNotContainsRe(raw, b'this test succeeds')
2362
2363
class TestRunBzr(tests.TestCase):
    """Check the arguments run_bzr and run_bzr_error give _run_bzr_core."""

    # Canned return code and output produced by the _run_bzr_core override
    # below; individual tests replace them on the instance.
    result = 0
    out = ''
    err = ''

    def _run_bzr_core(self, argv, encoding=None, stdin=None,
                      stdout=None, stderr=None, working_dir=None):
        """Record the call instead of running bzr.

        Running bzr from inside this class never really runs it; how bzr is
        actually invoked is tested elsewhere. Here the received parameters
        are stored on the instance so tests can inspect them.

        :return: ``self.result``, after writing ``self.out`` to stdout and
            ``self.err`` to stderr.
        """
        self.working_dir = working_dir
        self.stdin = stdin
        self.encoding = encoding
        self.argv = list(argv)
        stdout.write(self.out)
        stderr.write(self.err)
        return self.result

    def test_run_bzr_error(self):
        self.result = 34
        self.out = "It sure does!\n"
        got_out, got_err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
        self.assertEqual(['rocks'], self.argv)
        self.assertEqual('It sure does!\n', got_out)
        self.assertEqual(got_out, self.out)
        self.assertEqual('', got_err)
        self.assertEqual(got_err, self.err)

    def test_run_bzr_error_regexes(self):
        self.result = 3
        self.out = ''
        self.err = "bzr: ERROR: foobarbaz is not versioned"
        # Only the regex matching done inside run_bzr_error is under test,
        # so the returned output is not inspected.
        self.run_bzr_error(
            ["bzr: ERROR: foobarbaz is not versioned"],
            ['file-id', 'foobarbaz'])

    def test_encoding(self):
        """Test that run_bzr passes encoding to _run_bzr_core"""
        expected_argv = ['foo', 'bar']

        self.run_bzr('foo bar')
        self.assertEqual(osutils.get_user_encoding(), self.encoding)
        self.assertEqual(expected_argv, self.argv)

        self.run_bzr('foo bar', encoding='baz')
        self.assertEqual('baz', self.encoding)
        self.assertEqual(expected_argv, self.argv)

    def test_stdin(self):
        """The stdin keyword reaches _run_bzr_core unchanged."""
        # _run_bzr_core is overridden in this class, and run_bzr is a
        # convenience wrapper that should invoke it.
        for text in ('gam', 'zippy'):
            self.run_bzr('foo bar', stdin=text)
            self.assertEqual(text, self.stdin)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_working_dir(self):
        """Test that run_bzr passes working_dir to _run_bzr_core"""
        cases = (({}, None), ({'working_dir': 'baz'}, 'baz'))
        for extra_kwargs, expected_dir in cases:
            self.run_bzr('foo bar', **extra_kwargs)
            self.assertEqual(expected_dir, self.working_dir)
            self.assertEqual(['foo', 'bar'], self.argv)

    def test_reject_extra_keyword_arguments(self):
        unknown_kwargs = {'error_regex': ['error message']}
        self.assertRaises(TypeError, self.run_bzr, "foo bar",
                          **unknown_kwargs)
2442
2443
class TestRunBzrCaptured(tests.TestCaseWithTransport):
    """Check the environment run_bzr sets up around apply_redirected."""
    # Does IO when testing the working_dir parameter.

    def apply_redirected(self, stdin=None, stdout=None, stderr=None,
                         a_callable=None, *args, **kwargs):
        """Record the state at call time instead of invoking a_callable.

        Stores the stdin it was given, the current UI factory (and that
        factory's ``stdin``, if any) and the current working directory, then
        writes fixed text to stdout and stderr.

        :return: 0
        """
        self.stdin = stdin
        self.factory_stdin = getattr(breezy.ui.ui_factory, "stdin", None)
        self.factory = breezy.ui.ui_factory
        self.working_dir = osutils.getcwd()
        for stream, text in ((stdout, 'foo\n'), (stderr, 'bar\n')):
            stream.write(text)
        return 0

    def test_stdin(self):
        """The stdin text reaches apply_redirected as a readable stream."""
        # apply_redirected is overridden in this class, so calling run_bzr
        # lets us look at what it was handed.
        for text in ('gam', 'zippy'):
            self.run_bzr(['foo', 'bar'], stdin=text)
            self.assertEqual(text, self.stdin.read())
            self.assertIs(self.stdin, self.factory_stdin)

    def test_ui_factory(self):
        """Each run_bzr call gets its own TestUIFactory."""
        # The factory's stdout and stderr must be the streams of the invoked
        # run_bzr, not the process-wide ones.
        factory_before = breezy.ui.ui_factory
        self.run_bzr(['foo'])
        factory_during = self.factory
        self.assertIsNot(factory_before, factory_during)
        self.assertNotEqual(sys.stdout, factory_during.stdout)
        self.assertNotEqual(sys.stderr, factory_during.stderr)
        self.assertEqual('foo\n', factory_during.stdout.getvalue())
        self.assertEqual('bar\n', factory_during.stderr.getvalue())
        self.assertIsInstance(factory_during, tests.TestUIFactory)

    def test_working_dir(self):
        """working_dir applies during the call and is undone afterwards."""
        self.build_tree(['one/', 'two/'])
        cwd = osutils.getcwd()

        # Default is to work in the current directory
        self.run_bzr(['foo', 'bar'])
        self.assertEqual(cwd, self.working_dir)

        self.run_bzr(['foo', 'bar'], working_dir=None)
        self.assertEqual(cwd, self.working_dir)

        # The function should be run in the alternative directory
        # but afterwards the current working dir shouldn't be changed
        for subdir in ('one', 'two'):
            self.run_bzr(['foo', 'bar'], working_dir=subdir)
            self.assertNotEqual(cwd, self.working_dir)
            self.assertEndsWith(self.working_dir, subdir)
            self.assertEqual(cwd, osutils.getcwd())
2505
2506
class StubProcess(object):
    """A stub process for testing run_bzr_subprocess.

    Holds canned output, error text and a return code, and offers the
    ``communicate()`` method and ``returncode`` attribute.
    """

    def __init__(self, out="", err="", retcode=0):
        """Store the canned values.

        :param out: Value returned first by ``communicate()``.
        :param err: Value returned second by ``communicate()``.
        :param retcode: Value exposed as ``returncode``.
        """
        self.out, self.err, self.returncode = out, err, retcode

    def communicate(self):
        """Return the canned ``(out, err)`` pair."""
        return (self.out, self.err)
2517
2518
class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport):
    """Base class for tests testing how we might run bzr."""

    def setUp(self):
        super(TestWithFakedStartBzrSubprocess, self).setUp()
        # One dict per start_bzr_subprocess call, in call order.
        self.subprocess_calls = []

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             skip_if_plan_to_signal=False,
                             working_dir=None,
                             allow_plugins=False):
        """Capture what run_bzr_subprocess tries to do.

        Appends the received arguments, keyed by parameter name, to
        ``self.subprocess_calls``.

        :return: ``self.next_subprocess``.
        """
        call_record = dict(
            process_args=process_args,
            env_changes=env_changes,
            skip_if_plan_to_signal=skip_if_plan_to_signal,
            working_dir=working_dir,
            allow_plugins=allow_plugins)
        self.subprocess_calls.append(call_record)
        return self.next_subprocess
2537
2538
class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Check what run_bzr_subprocess hands to start_bzr_subprocess."""

    def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
        """Run run_bzr_subprocess with args and kwargs using a stubbed process.

        The inherited start_bzr_subprocess stub returns ``process`` and
        records the arguments it was called with. Whether or not
        run_bzr_subprocess raises, every key in ``expected_args`` is then
        compared with the most recently recorded call.

        :param expected_args: Dict of recorded-call key to expected value.
        :param process: Object the stubbed start_bzr_subprocess returns.
        :return: Whatever run_bzr_subprocess returned.
        """
        self.next_subprocess = process
        try:
            return self.run_bzr_subprocess(*args, **kwargs)
        finally:
            # Runs on success and on any exception alike; a failing
            # comparison here replaces the exception being propagated.
            self.next_subprocess = None
            for key in expected_args:
                self.assertEqual(expected_args[key],
                                 self.subprocess_calls[-1][key])

    def test_run_bzr_subprocess(self):
        """The run_bzr_helper_external command behaves nicely."""
        # A command given as a string or as a list yields the same argv.
        for command in ('--version', ['--version']):
            self.assertRunBzrSubprocess({'process_args': ['--version']},
                                        StubProcess(), command)
        # retcode=None disables retcode checking
        self.assertRunBzrSubprocess(
            {}, StubProcess(retcode=3), '--version', retcode=None)
        out_and_err = self.assertRunBzrSubprocess(
            {}, StubProcess(out="is free software"), '--version')
        self.assertContainsRe(out_and_err[0], 'is free software')
        # Running a subcommand that is missing errors
        self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
                          {'process_args': ['--versionn']},
                          StubProcess(retcode=3),
                          '--versionn')
        # Unless it is told to expect the error from the subprocess
        self.assertRunBzrSubprocess(
            {}, StubProcess(retcode=3), '--versionn', retcode=3)
        # Or to ignore retcode checking
        out_and_err = self.assertRunBzrSubprocess(
            {}, StubProcess(err="unknown command", retcode=3),
            '--versionn', retcode=None)
        self.assertContainsRe(out_and_err[1], 'unknown command')

    def test_env_change_passes_through(self):
        changes = {'new': 'value', 'changed': 'newvalue', 'deleted': None}
        # Compare against an independent copy of the dict passed in.
        self.assertRunBzrSubprocess(
            {'env_changes': dict(changes)},
            StubProcess(), '',
            env_changes=changes)

    def test_no_working_dir_passed_as_None(self):
        expected = {'working_dir': None}
        self.assertRunBzrSubprocess(expected, StubProcess(), '')

    def test_no_working_dir_passed_through(self):
        expected = {'working_dir': 'dir'}
        self.assertRunBzrSubprocess(expected, StubProcess(), '',
                                    working_dir='dir')

    def test_run_bzr_subprocess_no_plugins(self):
        expected = {'allow_plugins': False}
        self.assertRunBzrSubprocess(expected, StubProcess(), '')

    def test_allow_plugins(self):
        expected = {'allow_plugins': True}
        self.assertRunBzrSubprocess(expected, StubProcess(), '',
                                    allow_plugins=True)
2608
2609
class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess):
    """Check how finish_bzr_subprocess treats the process return code."""

    def test_finish_bzr_subprocess_with_error(self):
        """finish_bzr_subprocess allows specification of the desired exit code.
        """
        failing = StubProcess(err="unknown command", retcode=3)
        out_and_err = self.finish_bzr_subprocess(failing, retcode=3)
        self.assertEqual('', out_and_err[0])
        self.assertContainsRe(out_and_err[1], 'unknown command')

    def test_finish_bzr_subprocess_ignoring_retcode(self):
        """finish_bzr_subprocess allows the exit code to be ignored."""
        failing = StubProcess(err="unknown command", retcode=3)
        out_and_err = self.finish_bzr_subprocess(failing, retcode=None)
        self.assertEqual('', out_and_err[0])
        self.assertContainsRe(out_and_err[1], 'unknown command')

    def test_finish_subprocess_with_unexpected_retcode(self):
        """finish_bzr_subprocess raises self.failureException if the retcode is
        not the expected one.
        """
        failing = StubProcess(err="unknown command", retcode=3)
        # No retcode is passed, so the default expectation applies.
        self.assertRaises(self.failureException,
                          self.finish_bzr_subprocess, failing)
2634
2635
2636class _DontSpawnProcess(Exception):
2637    """A simple exception which just allows us to skip unnecessary steps"""
2638
2639
class TestStartBzrSubProcess(tests.TestCase):
    """Stub test start_bzr_subprocess.

    ``_popen`` is overridden to record its arguments and then raise
    ``_DontSpawnProcess``, so no test here starts a real child process.
    """

    def _subprocess_log_cleanup(self):
        """Inhibits the base version as we don't produce a log file."""

    def _popen(self, *args, **kwargs):
        """Override the base version to record the command that is run.

        From there we can ensure it is correct without spawning a real process.

        :raises _DontSpawnProcess: Always, once the arguments are recorded.
        """
        self.check_popen_state()
        self._popen_args = args
        self._popen_kwargs = kwargs
        raise _DontSpawnProcess()

    def check_popen_state(self):
        """Replace to make assertions when popen is called."""

    def test_run_bzr_subprocess_no_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
        command = self._popen_args[0]
        self.assertEqual(sys.executable, command[0])
        self.assertEqual(self.get_brz_path(), command[1])
        self.assertEqual(['--no-plugins'], command[2:])

    def test_allow_plugins(self):
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          allow_plugins=True)
        command = self._popen_args[0]
        self.assertEqual([], command[2:])

    def test_set_env(self):
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        # set in the child

        def check_environment():
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'EXISTANT_ENV_VAR': 'set variable'})
        # not set in the parent
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

    def test_run_bzr_subprocess_env_del(self):
        """run_bzr_subprocess can remove environment variables too."""
        self.assertFalse('EXISTANT_ENV_VAR' in os.environ)

        def check_environment():
            self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
        os.environ['EXISTANT_ENV_VAR'] = 'set variable'
        try:
            self.check_popen_state = check_environment
            self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess,
                              [], env_changes={'EXISTANT_ENV_VAR': None})
            # Still set in parent
            self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
        finally:
            # Always undo the change made above, even when an assertion
            # fails, so the variable cannot leak into later tests.
            os.environ.pop('EXISTANT_ENV_VAR', None)

    def test_env_del_missing(self):
        self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)

        def check_environment():
            self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
        self.check_popen_state = check_environment
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          env_changes={'NON_EXISTANT_ENV_VAR': None})

    def test_working_dir(self):
        """Test that we can specify the working dir for the child"""
        chdirs = []

        def chdir(path):
            chdirs.append(path)
        self.overrideAttr(os, 'chdir', chdir)

        def getcwd():
            return 'current'
        self.overrideAttr(osutils, 'getcwd', getcwd)
        self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
                          working_dir='foo')
        # First into the requested directory, then back to the one that the
        # faked getcwd reported.
        self.assertEqual(['foo', 'current'], chdirs)

    def test_get_brz_path_with_cwd_breezy(self):
        # Pretend the source path is empty and that every path is a file.
        self.get_source_path = lambda: ""
        self.overrideAttr(os.path, "isfile", lambda path: True)
        self.assertEqual(self.get_brz_path(), "brz")
2726
2727
class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport):
    """Tests that really need to do things with an external bzr."""

    def test_start_and_stop_bzr_subprocess_send_signal(self):
        """A started subprocess can be finished by sending it SIGINT.

        The child must first report that it is running; it is then finished
        with SIGINT and expected to return code 3, with empty stdout and an
        "interrupted" message on stderr.
        """
        self.disable_missing_extensions_warning()
        child = self.start_bzr_subprocess(['wait-until-signalled'],
                                          skip_if_plan_to_signal=True)
        first_line = child.stdout.readline()
        self.assertEqual(b'running\n', first_line)
        out_and_err = self.finish_bzr_subprocess(
            child, send_signal=signal.SIGINT, retcode=3)
        self.assertEqual(b'', out_and_err[0])
        self.assertEqual(b'brz: interrupted\n', out_and_err[1])
2743
2744
class TestSelftestFiltering(tests.TestCase):
    """Tests for the suite filtering, splitting and randomizing helpers.

    Every test works on a suite loaded from this very module, so expected
    ids are spelled as ids of tests in this class.
    """

    # Common prefix of the ids of the tests defined in this class.
    _ID_PREFIX = 'breezy.tests.test_selftest.TestSelftestFiltering.'

    def setUp(self):
        super(TestSelftestFiltering, self).setUp()
        self.suite = TestUtil.TestSuite()
        self.loader = TestUtil.TestLoader()
        this_module = sys.modules['breezy.tests.test_selftest']
        self.suite.addTest(self.loader.loadTestsFromModule(this_module))
        self.all_names = _test_ids(self.suite)

    def _all_names_without(self, name):
        """Return a copy of self.all_names with one occurrence of name removed.
        """
        remaining = list(self.all_names)
        remaining.remove(name)
        return remaining

    def test_condition_id_re(self):
        expected_id = self._ID_PREFIX + 'test_condition_id_re'
        condition = tests.condition_id_re('test_condition_id_re')
        filtered = tests.filter_suite_by_condition(self.suite, condition)
        self.assertEqual([expected_id], _test_ids(filtered))

    def test_condition_id_in_list(self):
        wanted_ids = [self._ID_PREFIX + 'test_condition_id_in_list']
        condition = tests.condition_id_in_list(tests.TestIdList(wanted_ids))
        filtered = tests.filter_suite_by_condition(self.suite, condition)
        # Filtering by an equivalent regex must select the same tests.
        re_filtered = tests.filter_suite_by_re(
            self.suite, 'TestSelftestFiltering.*test_condition_id_in_list')
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered))

    def test_condition_id_startswith(self):
        prefixes = [self._ID_PREFIX + 'test_condition_id_starts',
                    self._ID_PREFIX + 'test_condition_id_in']
        expected_ids = [self._ID_PREFIX + 'test_condition_id_in_list',
                        self._ID_PREFIX + 'test_condition_id_startswith',
                        ]
        filtered = tests.filter_suite_by_condition(
            self.suite, tests.condition_id_startswith(prefixes))
        self.assertEqual(expected_ids, _test_ids(filtered))

    def test_condition_isinstance(self):
        condition = tests.condition_isinstance(self.__class__)
        filtered = tests.filter_suite_by_condition(self.suite, condition)
        # Filtering by this class's id prefix must select the same tests.
        re_filtered = tests.filter_suite_by_re(self.suite, self._ID_PREFIX)
        self.assertEqual(_test_ids(re_filtered), _test_ids(filtered))

    def test_exclude_tests_by_condition(self):
        excluded_id = self._ID_PREFIX + 'test_exclude_tests_by_condition'
        filtered = tests.exclude_tests_by_condition(
            self.suite, lambda x: x.id() == excluded_id)
        self.assertEqual(len(self.all_names) - 1, filtered.countTestCases())
        self.assertNotIn(excluded_id, _test_ids(filtered))
        self.assertEqual(self._all_names_without(excluded_id),
                         _test_ids(filtered))

    def test_exclude_tests_by_re(self):
        self.all_names = _test_ids(self.suite)
        filtered = tests.exclude_tests_by_re(self.suite,
                                             'exclude_tests_by_re')
        excluded_id = self._ID_PREFIX + 'test_exclude_tests_by_re'
        self.assertEqual(len(self.all_names) - 1, filtered.countTestCases())
        self.assertNotIn(excluded_id, _test_ids(filtered))
        self.assertEqual(self._all_names_without(excluded_id),
                         _test_ids(filtered))

    def test_filter_suite_by_condition(self):
        wanted_id = self._ID_PREFIX + 'test_filter_suite_by_condition'
        filtered = tests.filter_suite_by_condition(
            self.suite, lambda x: x.id() == wanted_id)
        self.assertEqual([wanted_id], _test_ids(filtered))

    def test_filter_suite_by_re(self):
        filtered = tests.filter_suite_by_re(self.suite,
                                            'test_filter_suite_by_r')
        filtered_ids = _test_ids(filtered)
        self.assertEqual(
            filtered_ids, [self._ID_PREFIX + 'test_filter_suite_by_re'])

    def test_filter_suite_by_id_list(self):
        wanted_ids = [self._ID_PREFIX + 'test_filter_suite_by_id_list']
        filtered = tests.filter_suite_by_id_list(
            self.suite, tests.TestIdList(wanted_ids))
        filtered_ids = _test_ids(filtered)
        self.assertEqual(
            filtered_ids,
            [self._ID_PREFIX + 'test_filter_suite_by_id_list'])

    def test_filter_suite_by_id_startswith(self):
        # By design this test may fail if another test is added whose name
        # also begins with one of the start values used.
        prefixes = [self._ID_PREFIX + 'test_filter_suite_by_id_starts',
                    self._ID_PREFIX + 'test_filter_suite_by_id_li']
        expected_ids = [self._ID_PREFIX + 'test_filter_suite_by_id_list',
                        self._ID_PREFIX + 'test_filter_suite_by_id_startswith',
                        ]
        filtered = tests.filter_suite_by_id_startswith(self.suite, prefixes)
        self.assertEqual(expected_ids, _test_ids(filtered))

    def test_preserve_input(self):
        # NB: Surely there is something in the stdlib to do this?
        for value in (self.suite,):
            self.assertIs(value, tests.preserve_input(value))
        self.assertEqual("@#$", tests.preserve_input("@#$"))

    def test_randomize_suite(self):
        shuffled = tests.randomize_suite(self.suite)
        # randomizing should not add or remove test names.
        self.assertEqual(set(_test_ids(self.suite)),
                         set(_test_ids(shuffled)))
        # It should change the order. Technically, this *can* fail, because
        # random.shuffle(list) can be equal to list. Trying multiple times
        # just pushes the frequency back. As its len(self.all_names)!:1, the
        # failure frequency should be low enough to ignore. RBC 20071021.
        self.assertNotEqual(self.all_names, _test_ids(shuffled))
        # But not the length. (Possibly redundant with the set test, but not
        # necessarily.)
        self.assertEqual(len(self.all_names), len(_test_ids(shuffled)))

    def test_split_suit_by_condition(self):
        self.all_names = _test_ids(self.suite)
        condition = tests.condition_id_re('test_filter_suite_by_r')
        halves = tests.split_suite_by_condition(self.suite, condition)
        matched_id = self._ID_PREFIX + 'test_filter_suite_by_re'
        # First part holds the matches, second part everything else.
        self.assertEqual([matched_id], _test_ids(halves[0]))
        self.assertNotIn(matched_id, _test_ids(halves[1]))
        self.assertEqual(self._all_names_without(matched_id),
                         _test_ids(halves[1]))

    def test_split_suit_by_re(self):
        self.all_names = _test_ids(self.suite)
        halves = tests.split_suite_by_re(self.suite,
                                         'test_filter_suite_by_r')
        matched_id = self._ID_PREFIX + 'test_filter_suite_by_re'
        # First part holds the matches, second part everything else.
        self.assertEqual([matched_id], _test_ids(halves[0]))
        self.assertNotIn(matched_id, _test_ids(halves[1]))
        self.assertEqual(self._all_names_without(matched_id),
                         _test_ids(halves[1]))
2900
2901
class TestCheckTreeShape(tests.TestCaseWithTransport):
    """Exercise the check_tree_shape helper on a small working tree."""

    def test_check_tree_shape(self):
        # One file at the root plus a directory holding another file.
        shape = ['a', 'b/', 'b/c']
        wt = self.make_branch_and_tree('.')
        self.build_tree(shape)
        wt.add(shape)
        # The shape check runs under a read lock that is always released.
        wt.lock_read()
        try:
            self.check_tree_shape(wt, shape)
        finally:
            wt.unlock()
2914
2915
class TestBlackboxSupport(tests.TestCase):
    """Tests for testsuite blackbox features."""

    def test_run_bzr_failure_not_caught(self):
        # An unexpected error raised while running bzr in blackbox mode must
        # propagate up to the test suite, so that it is shown in the usual
        # way and we don't get a double traceback.
        argv = ['assert-fail']
        raised = self.assertRaises(AssertionError, self.run_bzr, argv)
        # Make sure we got the real thing, not an error from somewhere else
        # in the test framework.
        self.assertEqual('always fails', str(raised))
        # The test log must not contain a traceback.
        log_text = self.get_log()
        self.assertNotContainsRe(log_text, r'Traceback')

    def test_run_bzr_user_error_caught(self):
        # Normal/expected/user errors hit while running bzr in blackbox mode
        # should be caught in the regular way and turned into an error
        # message plus exit code.
        server = memory.MemoryServer()
        server.start_server()
        self.addCleanup(server.stop_server)
        base_url = server.get_url()
        self.permit_url(base_url)
        missing = "%s/nonexistantpath" % base_url
        out, err = self.run_bzr(["log", missing], retcode=3)
        self.assertEqual(out, '')
        self.assertContainsRe(
            err, 'brz: ERROR: Not a branch: ".*nonexistantpath/".\n')
2945
2946
class TestTestLoader(tests.TestCase):
    """Tests for the test loader."""

    def _get_loader_and_module(self):
        """Gets a TestLoader and a module with one test in it.

        :return: A (loader, module) pair. The module is a plain object named
            'fake_module' whose ``a_class`` attribute is a TestCase subclass
            defining a single test method.
        """
        loader = TestUtil.TestLoader()

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        class MyModule(object):
            pass
        MyModule.a_class = Stub
        module = MyModule()
        module.__name__ = 'fake_module'
        return loader, module

    def test_module_no_load_tests_attribute_loads_classes(self):
        # Without a load_tests hook, the one test defined by the module's
        # TestCase class is what gets loaded.
        loader, module = self._get_loader_and_module()
        self.assertEqual(1, loader.loadTestsFromModule(
            module).countTestCases())

    def test_module_load_tests_attribute_gets_called(self):
        loader, module = self._get_loader_and_module()

        def load_tests(loader, standard_tests, pattern):
            # Add every standard test twice so that the hook having been
            # called is observable in the loaded suite.
            result = loader.suiteClass()
            for test in tests.iter_suite_tests(standard_tests):
                result.addTests([test, test])
            return result
        # add a load_tests() method which multiplies the tests from the module.
        module.__class__.load_tests = staticmethod(load_tests)
        self.assertEqual(
            2 * [str(module.a_class('test_foo'))],
            list(map(str, loader.loadTestsFromModule(module))))

    def test_load_tests_from_module_name_smoke_test(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         _test_ids(suite))

    def test_load_tests_from_module_name_with_bogus_module_name(self):
        loader = TestUtil.TestLoader()
        self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus')
2994
2995
class TestTestIdList(tests.TestCase):
    """Tests for TestIdList and for matching suites against id lists."""

    def _create_id_list(self, test_list):
        """Build a tests.TestIdList from a list of test id strings."""
        return tests.TestIdList(test_list)

    def _create_suite(self, test_id_list):
        """Build a suite holding one stub test per id in test_id_list.

        Each stub has its ``id`` method replaced so that it reports the
        corresponding entry of ``test_id_list``.
        """

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        def _create_test_id(test_id):
            # A factory is needed so each lambda binds its own id value
            # rather than the loop variable.
            return lambda: test_id

        suite = TestUtil.TestSuite()
        for test_id in test_id_list:
            t = Stub('test_foo')
            t.id = _create_test_id(test_id)
            suite.addTest(t)
        return suite

    def _test_ids(self, test_suite):
        """Get the ids for the tests in a test suite."""
        return [t.id() for t in tests.iter_suite_tests(test_suite)]

    def test_empty_list(self):
        id_list = self._create_id_list([])
        self.assertEqual({}, id_list.tests)
        self.assertEqual({}, id_list.modules)

    def test_valid_list(self):
        id_list = self._create_id_list(
            ['mod1.cl1.meth1', 'mod1.cl1.meth2',
             'mod1.func1', 'mod1.cl2.meth2',
             'mod1.submod1',
             'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2',
             ])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.refers_to('mod1.submod1'))
        self.assertTrue(id_list.refers_to('mod1.submod2'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1'))
        self.assertTrue(id_list.includes('mod1.submod1'))
        self.assertTrue(id_list.includes('mod1.func1'))

    def test_bad_chars_in_params(self):
        # Dots inside the parenthesised parameters must not be treated as
        # module separators.
        id_list = self._create_id_list(['mod1.cl1.meth1(xx.yy)'])
        self.assertTrue(id_list.refers_to('mod1'))
        self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)'))

    def test_module_used(self):
        id_list = self._create_id_list(['mod.class.meth'])
        self.assertTrue(id_list.refers_to('mod'))
        self.assertTrue(id_list.refers_to('mod.class'))
        self.assertTrue(id_list.refers_to('mod.class.meth'))

    def test_test_suite_matches_id_list_with_unknown(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing',
                     'bogus']
        not_found, duplicates = tests.suite_matches_id_list(suite, test_list)
        self.assertEqual(['bogus'], not_found)
        self.assertEqual([], duplicates)

    def test_suite_matches_id_list_with_duplicates(self):
        loader = TestUtil.TestLoader()
        suite = loader.loadTestsFromModuleName('breezy.tests.test_sampler')
        dupes = loader.suiteClass()
        for test in tests.iter_suite_tests(suite):
            dupes.addTest(test)
            dupes.addTest(test)  # Add it again

        test_list = ['breezy.tests.test_sampler.DemoTest.test_nothing', ]
        not_found, duplicates = tests.suite_matches_id_list(
            dupes, test_list)
        self.assertEqual([], not_found)
        self.assertEqual(['breezy.tests.test_sampler.DemoTest.test_nothing'],
                         duplicates)
3074
3075
class TestTestSuite(tests.TestCase):
    """Tests for test_suite() and the helpers that feed it."""

    def test__test_suite_testmod_names(self):
        # A plausible list of test module names must be returned by
        # _test_suite_testmod_names.
        plausible = [
            'breezy.tests.blackbox',
            'breezy.tests.per_transport',
            'breezy.tests.test_selftest',
            ]
        self.assertSubset(plausible, tests._test_suite_testmod_names())

    def test__test_suite_modules_to_doctest(self):
        # A plausible list of modules to doctest must be returned by
        # _test_suite_modules_to_doctest.
        found = tests._test_suite_modules_to_doctest()
        if __doc__ is not None:
            self.assertSubset(['breezy.timestamp'], found)
        else:
            # When docstrings are stripped, there are no modules to doctest
            self.assertEqual([], found)

    def test_test_suite(self):
        # test_suite() loads the entire test suite to operate. To avoid this
        # overhead, and yet still be confident that things are happening,
        # we temporarily replace two functions used by test_suite with
        # test doubles that supply a few sample tests to load, and check they
        # are loaded.
        invoked = []

        def fake_testmod_names():
            invoked.append("testmod_names")
            return [
                'breezy.tests.blackbox.test_branch',
                'breezy.tests.per_transport',
                'breezy.tests.test_selftest',
                ]

        def fake_modules_to_doctest():
            invoked.append("modules_to_doctest")
            return [] if __doc__ is None else ['breezy.timestamp']

        self.overrideAttr(
            tests, '_test_suite_testmod_names', fake_testmod_names)
        self.overrideAttr(
            tests, '_test_suite_modules_to_doctest', fake_modules_to_doctest)
        must_be_loaded = [
            # testmod_names
            'breezy.tests.blackbox.test_branch.TestBranch.test_branch',
            ('breezy.tests.per_transport.TransportTests'
             '.test_abspath(LocalTransport,LocalURLServer)'),
            'breezy.tests.test_selftest.TestTestSuite.test_test_suite',
            # plugins can't be tested that way since selftest may be run with
            # --no-plugins
            ]
        loaded = tests.test_suite()
        # Both doubles must have been consulted while building the suite.
        self.assertEqual({"testmod_names", "modules_to_doctest"}, set(invoked))
        self.assertSubset(must_be_loaded, _test_ids(loaded))

    def test_test_suite_list_and_start(self):
        # We cannot test this at the same time as the main load, because we
        # want to know that starting_with == None works. So a second load is
        # incurred - note that the starting_with parameter causes a partial
        # load rather than a full load so this test should be pretty quick.
        wanted = ['breezy.tests.test_selftest.TestTestSuite.test_test_suite']
        starting_with = ['breezy.tests.test_selftest.TestTestSuite']
        loaded = tests.test_suite(wanted, starting_with)
        # test_test_suite_list_and_start is not included
        self.assertEqual(wanted, _test_ids(loaded))
3149
3150
class TestLoadTestIdList(tests.TestCaseInTempDir):
    """Tests for loading a list of test ids from a file."""

    def _create_test_list_file(self, file_name, content):
        """Write ``content`` to ``file_name``, opened in text mode."""
        # The context manager closes the file even if write() raises.
        with open(file_name, 'wt') as fl:
            fl.write(content)

    def test_load_unknown(self):
        self.assertRaises(errors.NoSuchFile,
                          tests.load_test_id_list, 'i_do_not_exist')

    def test_load_test_list(self):
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    'mod1.cl1.meth1\nmod2.cl2.meth2\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(2, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('mod2.cl2.meth2', tlist[1])

    def test_load_dirty_file(self):
        # Surrounding whitespace is expected to be stripped from each line,
        # while blank lines and inner spaces are kept as they are.
        test_list_fname = 'test.list'
        self._create_test_list_file(test_list_fname,
                                    '  mod1.cl1.meth1\n\nmod2.cl2.meth2  \n'
                                    'bar baz\n')
        tlist = tests.load_test_id_list(test_list_fname)
        self.assertEqual(4, len(tlist))
        self.assertEqual('mod1.cl1.meth1', tlist[0])
        self.assertEqual('', tlist[1])
        self.assertEqual('mod2.cl2.meth2', tlist[2])
        self.assertEqual('bar baz', tlist[3])
3182
3183
class TestFilteredByModuleTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a TestIdList."""

    def _create_loader(self, test_list):
        """Return a loader filtering modules via TestIdList.refers_to."""
        needs_module = tests.TestIdList(test_list).refers_to
        return TestUtil.FilteredByModuleTestLoader(needs_module)

    def test_load_tests(self):
        wanted = ['breezy.tests.test_sampler.DemoTest.test_nothing']
        loaded = self._create_loader(wanted).loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual(wanted, _test_ids(loaded))

    def test_exclude_tests(self):
        # An id list that does not refer to the module loads nothing from it.
        loaded = self._create_loader(['bogus']).loadTestsFromModuleName(
            'breezy.tests.test_sampler')
        self.assertEqual([], _test_ids(loaded))
3202
3203
class TestFilteredByNameStartTestLoader(tests.TestCase):
    """Tests for FilteredByModuleTestLoader driven by a name prefix."""

    _SAMPLER_MODULE = 'breezy.tests.test_sampler'
    _SAMPLER_IDS = ['breezy.tests.test_sampler.DemoTest.test_nothing']

    def _create_loader(self, name_start):
        """Return a loader keeping modules compatible with name_start."""
        def needs_module(name):
            # Keep the module when it lies under the prefix, or when the
            # prefix points somewhere inside the module.
            if name.startswith(name_start):
                return True
            return name_start.startswith(name)
        return TestUtil.FilteredByModuleTestLoader(needs_module)

    def test_load_tests(self):
        loader = self._create_loader('breezy.tests.test_samp')
        loaded = loader.loadTestsFromModuleName(self._SAMPLER_MODULE)
        self.assertEqual(self._SAMPLER_IDS, _test_ids(loaded))

    def test_load_tests_inside_module(self):
        loader = self._create_loader('breezy.tests.test_sampler.Demo')
        loaded = loader.loadTestsFromModuleName(self._SAMPLER_MODULE)
        self.assertEqual(self._SAMPLER_IDS, _test_ids(loaded))

    def test_exclude_tests(self):
        loader = self._create_loader('bogus')
        loaded = loader.loadTestsFromModuleName(self._SAMPLER_MODULE)
        self.assertEqual([], _test_ids(loaded))
3231
3232
class TestTestPrefixRegistry(tests.TestCase):
    """Tests for TestPrefixAliasRegistry and the predefined aliases."""

    def _get_registry(self):
        """Return a fresh, empty prefix alias registry."""
        return tests.TestPrefixAliasRegistry()

    def test_register_new_prefix(self):
        registry = self._get_registry()
        registry.register('foo', 'fff.ooo.ooo')
        self.assertEqual('fff.ooo.ooo', registry.get('foo'))

    def test_register_existing_prefix(self):
        registry = self._get_registry()
        registry.register('bar', 'bbb.aaa.rrr')
        registry.register('bar', 'bBB.aAA.rRR')
        # The first registration wins ...
        self.assertEqual('bbb.aaa.rrr', registry.get('bar'))
        # ... and the log mentions the alias and both values.
        log_matcher = DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR",
                                     doctest.ELLIPSIS)
        self.assertThat(self.get_log(), log_matcher)

    def test_get_unknown_prefix(self):
        registry = self._get_registry()
        self.assertRaises(KeyError, registry.get, 'I am not a prefix')

    def test_resolve_prefix(self):
        registry = self._get_registry()
        registry.register('bar', 'bb.aa.rr')
        self.assertEqual('bb.aa.rr', registry.resolve_alias('bar'))

    def test_resolve_unknown_alias(self):
        registry = self._get_registry()
        self.assertRaises(errors.CommandError,
                          registry.resolve_alias, 'I am not a prefix')

    def test_predefined_prefixes(self):
        registry = tests.test_prefix_alias_registry
        # (alias, expected expansion) pairs, checked in this order.
        predefined = (
            ('breezy', 'breezy'),
            ('bd', 'breezy.doc'),
            ('bu', 'breezy.utils'),
            ('bt', 'breezy.tests'),
            ('bb', 'breezy.tests.blackbox'),
            ('bp', 'breezy.plugins'),
            )
        for alias, expansion in predefined:
            self.assertEqual(expansion, registry.resolve_alias(alias))
3275
3276
class TestThreadLeakDetection(tests.TestCase):
    """Ensure when tests leak threads we detect and report it"""

    class LeakRecordingResult(tests.ExtendedTestResult):
        """Result that records reported thread leaks in ``self.leaks``."""

        def __init__(self):
            # Output goes to an in-memory stream so tests can inspect it.
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            # List of (test, leaks) pairs, in the order they were reported.
            self.leaks = []

        def _report_thread_leak(self, test, leaks, alive):
            # Only the test and its leaked threads are kept; ``alive`` is
            # ignored.
            self.leaks.append((test, leaks))

    def test_testcase_without_addCleanups(self):
        """Check old TestCase instances don't break with leak detection"""
        class Test(unittest.TestCase):
            def runTest(self):
                pass
        result = self.LeakRecordingResult()
        test = Test()
        result.startTestRun()
        test.run(result)
        result.stopTestRun()
        # A test that starts no threads must not be reported as leaking.
        self.assertEqual(result._tests_leaking_threads_count, 0)
        self.assertEqual(result.leaks, [])

    def test_thread_leak(self):
        """Ensure a thread that outlives the running of a test is reported

        Uses a thread that blocks on an event, and is started by the inner
        test case. As the thread outlives the inner case's run, it should be
        detected as a leak, but the event is then set so that the thread can
        be safely joined in cleanup so it's not leaked for real.
        """
        event = threading.Event()
        thread = threading.Thread(name="Leaker", target=event.wait)

        class Test(tests.TestCase):
            def test_leak(self):
                thread.start()
        result = self.LeakRecordingResult()
        test = Test("test_leak")
        # Cleanups run in reverse order of registration, so the event is set
        # (unblocking the thread) before the join is attempted.
        self.addCleanup(thread.join)
        self.addCleanup(event.set)
        result.startTestRun()
        test.run(result)
        result.stopTestRun()
        self.assertEqual(result._tests_leaking_threads_count, 1)
        self.assertEqual(result._first_thread_leaker_id, test.id())
        self.assertEqual(result.leaks, [(test, {thread})])
        self.assertContainsString(result.stream.getvalue(), "leaking threads")

    def test_multiple_leaks(self):
        """Check multiple leaks are blamed on the test cases at fault

        Same concept as the previous test, but has one inner test method that
        leaks two threads, and one that doesn't leak at all.
        """
        event = threading.Event()
        # All three threads block on the same event until cleanup sets it.
        thread_a = threading.Thread(name="LeakerA", target=event.wait)
        thread_b = threading.Thread(name="LeakerB", target=event.wait)
        thread_c = threading.Thread(name="LeakerC", target=event.wait)

        class Test(tests.TestCase):
            def test_first_leak(self):
                thread_b.start()

            def test_second_no_leak(self):
                pass

            def test_third_leak(self):
                thread_c.start()
                thread_a.start()
        result = self.LeakRecordingResult()
        first_test = Test("test_first_leak")
        third_test = Test("test_third_leak")
        # Cleanups run in reverse order of registration, so the event is set
        # (unblocking every thread) before any join is attempted.
        self.addCleanup(thread_a.join)
        self.addCleanup(thread_b.join)
        self.addCleanup(thread_c.join)
        self.addCleanup(event.set)
        result.startTestRun()
        unittest.TestSuite(
            [first_test, Test("test_second_no_leak"), third_test]
            ).run(result)
        result.stopTestRun()
        # Two of the three tests leak; the first one to leak is remembered.
        self.assertEqual(result._tests_leaking_threads_count, 2)
        self.assertEqual(result._first_thread_leaker_id, first_test.id())
        self.assertEqual(result.leaks, [
            (first_test, {thread_b}),
            (third_test, {thread_a, thread_c})])
        self.assertContainsString(result.stream.getvalue(), "leaking threads")
3366
3367
class TestPostMortemDebugging(tests.TestCase):
    """Check post mortem debugging works when tests fail or error"""

    class TracebackRecordingResult(tests.ExtendedTestResult):
        """Result recording where a post mortem session would have started."""

        def __init__(self):
            tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1)
            # Code object of the innermost traceback frame, once recorded.
            self.postcode = None

        def _post_mortem(self, tb=None):
            """Record the code object at the end of the current traceback

            :param tb: Traceback to inspect; when not given, the traceback of
                the exception currently being handled is used. If there is
                none either, nothing is recorded.
            """
            tb = tb or sys.exc_info()[2]
            if tb is not None:
                # Walk to the innermost frame, where the error was raised.
                while tb.tb_next is not None:
                    tb = tb.tb_next
                self.postcode = tb.tb_frame.f_code

        def report_error(self, test, err):
            # Reporting is irrelevant to these tests; stay silent.
            pass

        def report_failure(self, test, err):
            # Reporting is irrelevant to these tests; stay silent.
            pass

    def test_location_unittest_error(self):
        """Needs right post mortem traceback with erroring unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise RuntimeError
        result = self.TracebackRecordingResult()
        Test().run(result)
        self.assertEqual(result.postcode, Test.runTest.__code__)

    def test_location_unittest_failure(self):
        """Needs right post mortem traceback with failing unittest case"""
        class Test(unittest.TestCase):
            def runTest(self):
                raise self.failureException
        result = self.TracebackRecordingResult()
        Test().run(result)
        self.assertEqual(result.postcode, Test.runTest.__code__)

    def test_location_bt_error(self):
        """Needs right post mortem traceback with erroring breezy.tests case"""
        class Test(tests.TestCase):
            def test_error(self):
                raise RuntimeError
        result = self.TracebackRecordingResult()
        Test("test_error").run(result)
        self.assertEqual(result.postcode, Test.test_error.__code__)

    def test_location_bt_failure(self):
        """Needs right post mortem traceback with failing breezy.tests case"""
        class Test(tests.TestCase):
            def test_failure(self):
                raise self.failureException
        result = self.TracebackRecordingResult()
        Test("test_failure").run(result)
        self.assertEqual(result.postcode, Test.test_failure.__code__)

    def test_env_var_triggers_post_mortem(self):
        """Check pdb.post_mortem is called iff BRZ_TEST_PDB is set"""
        import pdb
        result = tests.ExtendedTestResult(StringIO(), 0, 1)
        post_mortem_calls = []
        # Record the argument of each call instead of starting a debugger.
        self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append)
        self.overrideEnv('BRZ_TEST_PDB', None)
        result._post_mortem(1)
        self.overrideEnv('BRZ_TEST_PDB', 'on')
        result._post_mortem(2)
        # Only the call made while the variable was set reaches pdb.
        self.assertEqual([2], post_mortem_calls)
3439
3440
class TestRunSuite(tests.TestCase):
    """Tests for tests.run_suite."""

    def test_runner_class(self):
        """run_suite accepts and uses a runner_class keyword argument."""
        seen = []

        class Stub(tests.TestCase):
            def test_foo(self):
                pass

        class MyRunner(tests.TextTestRunner):
            def run(self, test):
                # Note what we were asked to run, then hand back an empty
                # result without running anything.
                seen.append(test)
                return tests.ExtendedTestResult(
                    self.stream, self.descriptions, self.verbosity)

        single_test = Stub("test_foo")
        tests.run_suite(single_test, runner_class=MyRunner, stream=StringIO())
        self.assertLength(1, seen)
3458
3459
class _Selftest(object):
    """Mixin for tests needing full selftest output"""

    def _inject_stream_into_subunit(self, stream):
        """To be overridden by subclasses that run tests out of process"""

    def _run_selftest(self, **kwargs):
        """Run tests.selftest and return everything it wrote, as bytes.

        Keyword arguments are passed through to tests.selftest, which is
        always called with stop_on_failure=False.
        """
        raw_output = BytesIO()
        # selftest writes text; encode it as UTF-8 into the byte buffer.
        text_output = TextIOWrapper(raw_output, 'utf-8')
        self._inject_stream_into_subunit(raw_output)
        tests.selftest(stream=text_output, stop_on_failure=False, **kwargs)
        # Push any text still buffered in the wrapper through to the bytes.
        text_output.flush()
        return raw_output.getvalue()
3473
3474
class _ForkedSelftest(_Selftest):
    """Mixin for tests needing full selftest output with forked children"""

    _test_needs_features = [features.subunit]

    def _inject_stream_into_subunit(self, stream):
        """Monkey-patch subunit so the extra output goes to stream not stdout

        Some APIs need rewriting so this kind of bogus hackery can be replaced
        by passing the stream param from run_tests down into ProtocolTestCase.
        """
        from subunit import ProtocolTestCase
        wrapped_init = ProtocolTestCase.__init__

        def _init_with_passthrough(case, *args, **kwargs):
            # Initialise as usual, then redirect the passthrough output.
            wrapped_init(case, *args, **kwargs)
            case._passthrough = stream
        self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough)

    def _run_selftest(self, **kwargs):
        """Run selftest with the fork decorator added to suite_decorators."""
        # GZ 2011-05-26: Add a PosixSystem feature so this check can go away
        if getattr(os, "fork", None) is None:
            raise tests.TestNotApplicable("Platform doesn't support forking")
        # Make sure the fork code is actually invoked by claiming two cores
        self.overrideAttr(osutils, "local_concurrency", lambda: 2)
        decorators = kwargs.setdefault("suite_decorators", [])
        decorators.append(tests.fork_decorator)
        return super(_ForkedSelftest, self)._run_selftest(**kwargs)
3502
3503
class TestParallelFork(_ForkedSelftest, tests.TestCase):
    """Check operation of --parallel=fork selftest option"""

    def test_error_in_child_during_fork(self):
        """Error in a forked child during test setup should get reported"""
        class Test(tests.TestCase):
            def testMethod(self):
                pass
        # We don't care what, just break something that a child will run
        self.overrideAttr(tests, "workaround_zealous_crypto_random", None)
        output = self._run_selftest(test_suite_factory=Test)
        # Lines from the tracebacks of the two child processes may be mixed
        # together due to the way subunit parses and forwards the streams,
        # so permit extra lines between each part of the error output.
        expected_traceback_re = (
            b"Traceback.*:\n"
            b"(?:.*\n)*"
            b".+ in fork_for_tests\n"
            b"(?:.*\n)*"
            b"\\s*workaround_zealous_crypto_random\\(\\)\n"
            b"(?:.*\n)*"
            b"TypeError:")
        self.assertContainsRe(output, expected_traceback_re)
3526
3527
class TestUncollectedWarnings(_Selftest, tests.TestCase):
    """Check a test case still alive after being run emits a warning"""

    class Test(tests.TestCase):
        def test_pass(self):
            pass

        def test_self_ref(self):
            # Storing a bound method on the instance makes it refer to itself.
            self.also_self = self.test_self_ref

        def test_skip(self):
            self.skipTest("Don't need")

    def _get_suite(self):
        """Return a suite holding one instance of each inner test method."""
        method_names = ("test_pass", "test_self_ref", "test_skip")
        return TestUtil.TestSuite([self.Test(name) for name in method_names])

    def _run_selftest_with_suite(self, **kwargs):
        """Run selftest over the inner suite and check the warnings emitted.

        The run happens with the "uncollected_cases" debug flag added and the
        garbage collector disabled; both are restored afterwards.

        :return: The bytes output of the selftest run.
        """
        saved_flags = tests.selftest_debug_flags
        tests.selftest_debug_flags = saved_flags.union(["uncollected_cases"])
        gc_was_enabled = gc.isenabled()
        if gc_was_enabled:
            gc.disable()
        try:
            output = self._run_selftest(
                test_suite_factory=self._get_suite, **kwargs)
        finally:
            if gc_was_enabled:
                gc.enable()
            tests.selftest_debug_flags = saved_flags
        # Only the self-referencing case should be reported as uncollected.
        self.assertNotContainsRe(output, b"Uncollected test case.*test_pass")
        self.assertContainsRe(output, b"Uncollected test case.*test_self_ref")
        return output

    def test_testsuite(self):
        self._run_selftest_with_suite()

    def test_pattern(self):
        output = self._run_selftest_with_suite(
            pattern="test_(?:pass|self_ref)$")
        self.assertNotContainsRe(output, b"test_skip")

    def test_exclude_pattern(self):
        output = self._run_selftest_with_suite(exclude_pattern="test_skip$")
        self.assertNotContainsRe(output, b"test_skip")

    def test_random_seed(self):
        self._run_selftest_with_suite(random_seed="now")

    def test_matching_tests_first(self):
        self._run_selftest_with_suite(
            matching_tests_first=True, pattern="test_self_ref$")

    def test_starting_with_and_exclude(self):
        output = self._run_selftest_with_suite(
            starting_with=["bt."], exclude_pattern="test_skip$")
        self.assertNotContainsRe(output, b"test_skip")

    def test_additonal_decorator(self):
        self._run_selftest_with_suite(suite_decorators=[tests.TestDecorator])
3590
3591
class TestUncollectedWarningsSubunit(TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted with subunit"""

    _test_needs_features = [features.subunit]

    def _run_selftest_with_suite(self, **kwargs):
        """Run the parent checks using the subunit v1 runner class."""
        run_with_suite = TestUncollectedWarnings._run_selftest_with_suite
        return run_with_suite(
            self, runner_class=tests.SubUnitBzrRunnerv1, **kwargs)
3600
3601
class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings):
    """Check warnings from tests staying alive are emitted when forking.

    No tests are defined here: they are all inherited from
    TestUncollectedWarnings, with _ForkedSelftest mixed in first.
    """
3604
3605
class TestEnvironHandling(tests.TestCase):
    """Check repeated overrideEnv calls do not lose the original value."""

    def test_overrideEnv_None_called_twice_doesnt_leak(self):
        self.assertFalse('MYVAR' in os.environ)
        self.overrideEnv('MYVAR', '42')

        # We use an embedded test to make sure we fix the _captureVar bug
        class Test(tests.TestCase):
            def test_me(self):
                # The first call saves the 42 value
                self.overrideEnv('MYVAR', None)
                seen = os.environ.get('MYVAR')
                self.assertEqual(None, seen)
                # Make sure we can call it a second time
                self.overrideEnv('MYVAR', None)
                seen = os.environ.get('MYVAR')
                self.assertEqual(None, seen)

        stream = StringIO()
        outcome = tests.TextTestResult(stream, 0, 1)
        embedded = Test('test_me')
        embedded.run(outcome)
        if not outcome.wasStrictlySuccessful():
            # Surface whatever the embedded run reported
            self.fail(stream.getvalue())
        # Once the embedded test is done, we get our value back
        restored = os.environ.get('MYVAR')
        self.assertEqual('42', restored)
3628
3629
class TestIsolatedEnv(tests.TestCase):
    """Test isolating tests from os.environ.

    Since we use tests that are already isolated from os.environ a bit of care
    should be taken when designing the tests to avoid bootstrap side-effects.
    The tests start with an already clean os.environ, which allows making
    valid assertions about which variables are present or not and designing
    tests around these assertions.
    """

    class ScratchMonkey(tests.TestCase):
        # Throwaway test case: it is only used as the object handed to
        # tests.override_os_environ() and tests.restore_os_environ().

        def test_me(self):
            pass

    def test_basics(self):
        # Make sure we know the definition of BRZ_HOME: not part of os.environ
        # for tests.TestCase.
        self.assertIn('BRZ_HOME', tests.isolated_environ)
        self.assertIsNone(tests.isolated_environ['BRZ_HOME'])
        # Being part of isolated_environ, BRZ_HOME should not appear here
        self.assertNotIn('BRZ_HOME', os.environ)
        # Make sure we know the definition of LINES: part of os.environ for
        # tests.TestCase
        self.assertIn('LINES', tests.isolated_environ)
        self.assertEqual('25', tests.isolated_environ['LINES'])
        self.assertEqual('25', os.environ['LINES'])

    def test_injecting_unknown_variable(self):
        # BRZ_HOME is known to be absent from os.environ
        test = self.ScratchMonkey('test_me')
        tests.override_os_environ(test, {'BRZ_HOME': 'foo'})
        try:
            self.assertEqual('foo', os.environ['BRZ_HOME'])
        finally:
            # Undo the override even when the assertion above fails, so a
            # failure does not leave the injected value in place.
            tests.restore_os_environ(test)
        self.assertNotIn('BRZ_HOME', os.environ)

    def test_injecting_known_variable(self):
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': '42'})
        try:
            self.assertEqual('42', os.environ['LINES'])
        finally:
            # Undo the override even when the assertion above fails, so a
            # failure does not leave the modified value in place.
            tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])

    def test_deleting_variable(self):
        test = self.ScratchMonkey('test_me')
        # LINES is known to be present in os.environ
        tests.override_os_environ(test, {'LINES': None})
        try:
            self.assertNotIn('LINES', os.environ)
        finally:
            # Undo the override even when the assertion above fails, so a
            # failure does not leave the variable deleted.
            tests.restore_os_environ(test)
        self.assertEqual('25', os.environ['LINES'])
3681
3682
class TestDocTestSuiteIsolation(tests.TestCase):
    """Test that `tests.DocTestSuite` isolates doc tests from os.environ.

    Since tests.TestCase already provides an isolation from os.environ, we use
    the clean environment as a base for testing. To precisely capture the
    isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to
    compare against.

    We want to make sure `tests.DocTestSuite` respects
    `tests.isolated_environ`, not `os.environ`, so each test overrides it to
    suit its needs.

    """

    def get_doctest_suite_for_string(self, klass, string):
        class SingleStringFinder(doctest.DocTestFinder):
            # Whatever the suite asks us to search, always answer with one
            # doctest parsed from `string`.

            def find(self, *args, **kwargs):
                parser = doctest.DocTestParser()
                return [parser.get_doctest(string, {}, 'foo', 'foo.py', 0)]

        return klass(test_finder=SingleStringFinder())

    def run_doctest_suite_for_string(self, klass, string):
        # Returns both the result object and the stream it wrote to, so
        # callers can report the captured text on an unexpected outcome.
        doctests = self.get_doctest_suite_for_string(klass, string)
        stream = StringIO()
        outcome = tests.TextTestResult(stream, 0, 1)
        doctests.run(outcome)
        return outcome, stream

    def assertDocTestStringSucceds(self, klass, string):
        outcome, stream = self.run_doctest_suite_for_string(klass, string)
        if outcome.wasStrictlySuccessful():
            return
        self.fail(stream.getvalue())

    def assertDocTestStringFails(self, klass, string):
        outcome, stream = self.run_doctest_suite_for_string(klass, string)
        if not outcome.wasStrictlySuccessful():
            return
        self.fail(stream.getvalue())

    def test_injected_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'})
        doctest_source = """
            >>> import os
            >>> os.environ['LINES']
            '42'
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_source)
        # tests.DocTestSuite sees '42'
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite,
                                        doctest_source)

    def test_deleted_variable(self):
        self.overrideAttr(tests, 'isolated_environ', {'LINES': None})
        doctest_source = """
            >>> import os
            >>> os.environ.get('LINES')
            """
        # doctest.DocTestSuite fails as it sees '25'
        self.assertDocTestStringFails(doctest.DocTestSuite, doctest_source)
        # tests.DocTestSuite sees None
        self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite,
                                        doctest_source)
3746
3747
class TestSelftestExcludePatterns(tests.TestCase):
    """Check the -x option of 'selftest --list' against a tiny suite."""

    def setUp(self):
        super(TestSelftestExcludePatterns, self).setUp()
        # Swap in our own factory so the command lists our three tests
        self.overrideAttr(tests, 'test_suite', self.suite_factory)

    def suite_factory(self, keep_only=None, starting_with=None):
        """A test suite factory with only a few tests."""
        class Test(tests.TestCase):
            def id(self):
                # The bare method name is enough, no need for the class path
                return self._testMethodName

            def a(self):
                pass

            def b(self):
                pass

            def c(self):
                pass

        members = [Test(name) for name in ("a", "b", "c")]
        return TestUtil.TestSuite(members)

    def assertTestList(self, expected, *selftest_args):
        # We rely on setUp installing the right test suite factory so we can
        # test at the command level without loading the whole test suite
        command = ('selftest', '--list', *selftest_args)
        stdout, _stderr = self.run_bzr(command)
        self.assertEqual(expected, stdout.splitlines())

    def test_full_list(self):
        everything = ['a', 'b', 'c']
        self.assertTestList(everything)

    def test_single_exclude(self):
        remaining = ['b', 'c']
        self.assertTestList(remaining, '-x', 'a')

    def test_mutiple_excludes(self):
        remaining = ['c']
        self.assertTestList(remaining, '-x', 'a', '-x', 'b')
3786
3787
class TestCounterHooks(tests.TestCase, SelfTestHelper):
    """Check the per-test hook call counts kept in `_counters`."""

    _test_needs_features = [features.subunit]

    def setUp(self):
        super(TestCounterHooks, self).setUp()

        class Test(tests.TestCase):

            def setUp(self):
                super(Test, self).setUp()
                # A private hook point with a counter hook installed on it
                registry = self.hooks = hooks.Hooks()
                registry.add_hook('myhook', 'Foo bar blah', (2, 4))
                self.install_counter_hook(registry, 'myhook')

            def no_hook(self):
                pass

            def run_hook_once(self):
                # Fire each callable registered for 'myhook' exactly once
                for callback in self.hooks['myhook']:
                    callback(self)

        self.test_class = Test

    def assertHookCalls(self, expected_calls, test_name):
        # Run the named scratch test, then inspect the counters it is
        # expected to carry afterwards.
        scratch = self.test_class(test_name)
        scratch.run(unittest.TestResult())
        self.assertTrue(hasattr(scratch, '_counters'))
        counters = scratch._counters
        self.assertTrue('myhook' in counters)
        self.assertEqual(expected_calls, counters['myhook'])

    def test_no_hook(self):
        self.assertHookCalls(0, 'no_hook')

    def test_run_hook_once(self):
        testtools_version = features.testtools.module.__version__
        if testtools_version < (0, 9, 8):
            raise tests.TestSkipped('testtools-0.9.8 required for addDetail')
        self.assertHookCalls(1, 'run_hook_once')
3828