#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2011 Robert Collins <robertc@robertcollins.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# license you chose for the specific language governing permissions and
# limitations under that license.
#

import io
import unittest

from testtools import PlaceHolder, TestCase
from testtools.compat import _b
from testtools.matchers import StartsWith
from testtools.testresult.doubles import StreamResult

import subunit
from subunit import run
from subunit.run import SubunitTestRunner


class TestSubunitTestRunner(TestCase):
    """Tests for ``SubunitTestRunner`` and the ``subunit.run.main`` entry point."""

    def test_includes_timing_output(self):
        """Running a test writes at least one event that carries a timestamp."""
        bytestream = io.BytesIO()
        runner = SubunitTestRunner(stream=bytestream)
        test = PlaceHolder('name')
        runner.run(test)
        bytestream.seek(0)
        eventstream = StreamResult()
        subunit.ByteStreamToStreamResult(bytestream).run(eventstream)
        # The timestamp is the last field of a recorded status event. Filter
        # on that field itself: the event tuple is never None, so testing the
        # tuple would let this pass even when no timing data was emitted.
        timestamps = [
            event[-1] for event in eventstream._events
            if event[0] == 'status' and event[-1] is not None
        ]
        self.assertNotEqual([], timestamps)

    def test_enumerates_tests_before_run(self):
        """The first events in the stream are 'exists' records for each test."""
        bytestream = io.BytesIO()
        runner = SubunitTestRunner(stream=bytestream)
        test1 = PlaceHolder('name1')
        test2 = PlaceHolder('name2')
        case = unittest.TestSuite([test1, test2])
        runner.run(case)
        bytestream.seek(0)
        eventstream = StreamResult()
        subunit.ByteStreamToStreamResult(bytestream).run(eventstream)
        # Only the leading (kind, test id, status) fields of the first two
        # events are compared.
        self.assertEqual([
            ('status', 'name1', 'exists'),
            ('status', 'name2', 'exists'),
            ], [event[:3] for event in eventstream._events[:2]])

    def test_list_errors_if_errors_from_list_test(self):
        """``runner.list`` exits with status 2 when ``list_test`` reports errors."""
        bytestream = io.BytesIO()
        runner = SubunitTestRunner(stream=bytestream)

        def list_test(test):
            # Stand-in for run.list_test: no test ids, one error entry.
            return [], ['failed import']

        self.patch(run, 'list_test', list_test)
        exc = self.assertRaises(SystemExit, runner.list, None)
        self.assertEqual((2,), exc.args)

    def test_list_includes_loader_errors(self):
        """``runner.list`` exits with status 2 when the loader carries errors."""
        bytestream = io.BytesIO()
        runner = SubunitTestRunner(stream=bytestream)

        def list_test(test):
            # Stand-in for run.list_test reporting no ids and no errors, so
            # the only error source left is the loader below.
            return [], []

        class Loader(object):
            errors = ['failed import']

        loader = Loader()
        self.patch(run, 'list_test', list_test)
        exc = self.assertRaises(SystemExit, runner.list, None, loader=loader)
        self.assertEqual((2,), exc.args)

    # Referenced by dotted name from test_exits_zero_when_tests_fail; the
    # class name and its nesting must stay in sync with that argv string.
    class FailingTest(TestCase):
        def test_fail(self):
            1/0

    def test_exits_zero_when_tests_fail(self):
        """``run.main`` returns None rather than exiting when a test fails."""
        bytestream = io.BytesIO()
        stream = io.TextIOWrapper(bytestream, encoding="utf8")
        try:
            self.assertEqual(None, run.main(
                argv=["progName", "subunit.tests.test_run.TestSubunitTestRunner.FailingTest"],
                stdout=stream))
        except SystemExit:
            self.fail("SystemExit raised")
        # The captured output must begin with the byte produced by
        # _b('\xb3').
        self.assertThat(bytestream.getvalue(), StartsWith(_b('\xb3')))

    # Referenced by dotted name from test_exits_nonzero_when_execution_errors;
    # the class name and its nesting must stay in sync with that argv string.
    class ExitingTest(TestCase):
        def test_exit(self):
            raise SystemExit(0)

    def test_exits_nonzero_when_execution_errors(self):
        """A ``SystemExit`` raised while running a test escapes ``run.main``.

        NOTE(review): the test name says "nonzero", but the assertion pins the
        exit code to 0, which is the code ``ExitingTest`` itself raises.
        Confirm which of the two reflects the intended contract.
        """
        bytestream = io.BytesIO()
        stream = io.TextIOWrapper(bytestream, encoding="utf8")
        exc = self.assertRaises(SystemExit, run.main,
                argv=["progName", "subunit.tests.test_run.TestSubunitTestRunner.ExitingTest"],
                stdout=stream)
        self.assertEqual(0, exc.args[0])