1#
2#  subunit: extensions to Python unittest to get test results from subprocesses.
3#  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4#
5#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6#  license at the users choice. A copy of both licenses are available in the
7#  project source as Apache-2.0 and BSD. You may not use this file except in
8#  compliance with one of these two licences.
9#
10#  Unless required by applicable law or agreed to in writing, software
11#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13#  license you chose for the specific language governing permissions and
14#  limitations under that license.
15#
16
17import csv
18import datetime
19import sys
20import unittest
21
22from testtools import TestCase
23from testtools.compat import StringIO
24from testtools.content import (
25    text_content,
26    TracebackContent,
27    )
28from testtools.testresult.doubles import ExtendedTestResult
29
30import subunit
31import subunit.iso8601 as iso8601
32import subunit.test_results
33
34import testtools
35
36
37class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
38
39    def __init__(self, decorated):
40        self._calls = 0
41        super(LoggingDecorator, self).__init__(decorated)
42
43    def _before_event(self):
44        self._calls += 1
45
46
47class AssertBeforeTestResult(LoggingDecorator):
48    """A TestResult for checking preconditions."""
49
50    def __init__(self, decorated, test):
51        self.test = test
52        super(AssertBeforeTestResult, self).__init__(decorated)
53
54    def _before_event(self):
55        self.test.assertEqual(1, self.earlier._calls)
56        super(AssertBeforeTestResult, self)._before_event()
57
58
59class TimeCapturingResult(unittest.TestResult):
60
61    def __init__(self):
62        super(TimeCapturingResult, self).__init__()
63        self._calls = []
64        self.failfast = False
65
66    def time(self, a_datetime):
67        self._calls.append(a_datetime)
68
69
70class TestHookedTestResultDecorator(unittest.TestCase):
71
72    def setUp(self):
73        # An end to the chain
74        terminal = unittest.TestResult()
75        # Asserts that the call was made to self.result before asserter was
76        # called.
77        asserter = AssertBeforeTestResult(terminal, self)
78        # The result object we call, which much increase its call count.
79        self.result = LoggingDecorator(asserter)
80        asserter.earlier = self.result
81        self.decorated = asserter
82
83    def tearDown(self):
84        # The hook in self.result must have been called
85        self.assertEqual(1, self.result._calls)
86        # The hook in asserter must have been called too, otherwise the
87        # assertion about ordering won't have completed.
88        self.assertEqual(1, self.decorated._calls)
89
90    def test_startTest(self):
91        self.result.startTest(self)
92
93    def test_startTestRun(self):
94        self.result.startTestRun()
95
96    def test_stopTest(self):
97        self.result.stopTest(self)
98
99    def test_stopTestRun(self):
100        self.result.stopTestRun()
101
102    def test_addError(self):
103        self.result.addError(self, subunit.RemoteError())
104
105    def test_addError_details(self):
106        self.result.addError(self, details={})
107
108    def test_addFailure(self):
109        self.result.addFailure(self, subunit.RemoteError())
110
111    def test_addFailure_details(self):
112        self.result.addFailure(self, details={})
113
114    def test_addSuccess(self):
115        self.result.addSuccess(self)
116
117    def test_addSuccess_details(self):
118        self.result.addSuccess(self, details={})
119
120    def test_addSkip(self):
121        self.result.addSkip(self, "foo")
122
123    def test_addSkip_details(self):
124        self.result.addSkip(self, details={})
125
126    def test_addExpectedFailure(self):
127        self.result.addExpectedFailure(self, subunit.RemoteError())
128
129    def test_addExpectedFailure_details(self):
130        self.result.addExpectedFailure(self, details={})
131
132    def test_addUnexpectedSuccess(self):
133        self.result.addUnexpectedSuccess(self)
134
135    def test_addUnexpectedSuccess_details(self):
136        self.result.addUnexpectedSuccess(self, details={})
137
138    def test_progress(self):
139        self.result.progress(1, subunit.PROGRESS_SET)
140
141    def test_wasSuccessful(self):
142        self.result.wasSuccessful()
143
144    def test_shouldStop(self):
145        self.result.shouldStop
146
147    def test_stop(self):
148        self.result.stop()
149
150    def test_time(self):
151        self.result.time(None)
152
153
154class TestAutoTimingTestResultDecorator(unittest.TestCase):
155
156    def setUp(self):
157        # And end to the chain which captures time events.
158        terminal = TimeCapturingResult()
159        # The result object under test.
160        self.result = subunit.test_results.AutoTimingTestResultDecorator(
161            terminal)
162        self.decorated = terminal
163
164    def test_without_time_calls_time_is_called_and_not_None(self):
165        self.result.startTest(self)
166        self.assertEqual(1, len(self.decorated._calls))
167        self.assertNotEqual(None, self.decorated._calls[0])
168
169    def test_no_time_from_progress(self):
170        self.result.progress(1, subunit.PROGRESS_CUR)
171        self.assertEqual(0, len(self.decorated._calls))
172
173    def test_no_time_from_shouldStop(self):
174        self.decorated.stop()
175        self.result.shouldStop
176        self.assertEqual(0, len(self.decorated._calls))
177
178    def test_calling_time_inhibits_automatic_time(self):
179        # Calling time() outputs a time signal immediately and prevents
180        # automatically adding one when other methods are called.
181        time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())
182        self.result.time(time)
183        self.result.startTest(self)
184        self.result.stopTest(self)
185        self.assertEqual(1, len(self.decorated._calls))
186        self.assertEqual(time, self.decorated._calls[0])
187
188    def test_calling_time_None_enables_automatic_time(self):
189        time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())
190        self.result.time(time)
191        self.assertEqual(1, len(self.decorated._calls))
192        self.assertEqual(time, self.decorated._calls[0])
193        # Calling None passes the None through, in case other results care.
194        self.result.time(None)
195        self.assertEqual(2, len(self.decorated._calls))
196        self.assertEqual(None, self.decorated._calls[1])
197        # Calling other methods doesn't generate an automatic time event.
198        self.result.startTest(self)
199        self.assertEqual(3, len(self.decorated._calls))
200        self.assertNotEqual(None, self.decorated._calls[2])
201
202    def test_set_failfast_True(self):
203        self.assertFalse(self.decorated.failfast)
204        self.result.failfast = True
205        self.assertTrue(self.decorated.failfast)
206
207
208class TestTagCollapsingDecorator(TestCase):
209
210    def test_tags_collapsed_outside_of_tests(self):
211        result = ExtendedTestResult()
212        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
213        tag_collapser.tags(set(['a']), set())
214        tag_collapser.tags(set(['b']), set())
215        tag_collapser.startTest(self)
216        self.assertEquals(
217            [('tags', set(['a', 'b']), set([])),
218             ('startTest', self),
219             ], result._events)
220
221    def test_tags_collapsed_outside_of_tests_are_flushed(self):
222        result = ExtendedTestResult()
223        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
224        tag_collapser.startTestRun()
225        tag_collapser.tags(set(['a']), set())
226        tag_collapser.tags(set(['b']), set())
227        tag_collapser.startTest(self)
228        tag_collapser.addSuccess(self)
229        tag_collapser.stopTest(self)
230        tag_collapser.stopTestRun()
231        self.assertEquals(
232            [('startTestRun',),
233             ('tags', set(['a', 'b']), set([])),
234             ('startTest', self),
235             ('addSuccess', self),
236             ('stopTest', self),
237             ('stopTestRun',),
238             ], result._events)
239
240    def test_tags_forwarded_after_tests(self):
241        test = subunit.RemotedTestCase('foo')
242        result = ExtendedTestResult()
243        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
244        tag_collapser.startTestRun()
245        tag_collapser.startTest(test)
246        tag_collapser.addSuccess(test)
247        tag_collapser.stopTest(test)
248        tag_collapser.tags(set(['a']), set(['b']))
249        tag_collapser.stopTestRun()
250        self.assertEqual(
251            [('startTestRun',),
252             ('startTest', test),
253             ('addSuccess', test),
254             ('stopTest', test),
255             ('tags', set(['a']), set(['b'])),
256             ('stopTestRun',),
257             ],
258            result._events)
259
260    def test_tags_collapsed_inside_of_tests(self):
261        result = ExtendedTestResult()
262        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
263        test = subunit.RemotedTestCase('foo')
264        tag_collapser.startTest(test)
265        tag_collapser.tags(set(['a']), set())
266        tag_collapser.tags(set(['b']), set(['a']))
267        tag_collapser.tags(set(['c']), set())
268        tag_collapser.stopTest(test)
269        self.assertEquals(
270            [('startTest', test),
271             ('tags', set(['b', 'c']), set(['a'])),
272             ('stopTest', test)],
273            result._events)
274
275    def test_tags_collapsed_inside_of_tests_different_ordering(self):
276        result = ExtendedTestResult()
277        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
278        test = subunit.RemotedTestCase('foo')
279        tag_collapser.startTest(test)
280        tag_collapser.tags(set(), set(['a']))
281        tag_collapser.tags(set(['a', 'b']), set())
282        tag_collapser.tags(set(['c']), set())
283        tag_collapser.stopTest(test)
284        self.assertEquals(
285            [('startTest', test),
286             ('tags', set(['a', 'b', 'c']), set()),
287             ('stopTest', test)],
288            result._events)
289
290    def test_tags_sent_before_result(self):
291        # Because addSuccess and friends tend to send subunit output
292        # immediately, and because 'tags:' before a result line means
293        # something different to 'tags:' after a result line, we need to be
294        # sure that tags are emitted before 'addSuccess' (or whatever).
295        result = ExtendedTestResult()
296        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
297        test = subunit.RemotedTestCase('foo')
298        tag_collapser.startTest(test)
299        tag_collapser.tags(set(['a']), set())
300        tag_collapser.addSuccess(test)
301        tag_collapser.stopTest(test)
302        self.assertEquals(
303            [('startTest', test),
304             ('tags', set(['a']), set()),
305             ('addSuccess', test),
306             ('stopTest', test)],
307            result._events)
308
309
310class TestTimeCollapsingDecorator(TestCase):
311
312    def make_time(self):
313        # Heh heh.
314        return datetime.datetime(
315            2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
316
317    def test_initial_time_forwarded(self):
318        # We always forward the first time event we see.
319        result = ExtendedTestResult()
320        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
321        a_time = self.make_time()
322        tag_collapser.time(a_time)
323        self.assertEquals([('time', a_time)], result._events)
324
325    def test_time_collapsed_to_first_and_last(self):
326        # If there are many consecutive time events, only the first and last
327        # are sent through.
328        result = ExtendedTestResult()
329        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
330        times = [self.make_time() for i in range(5)]
331        for a_time in times:
332            tag_collapser.time(a_time)
333        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
334        self.assertEquals(
335            [('time', times[0]), ('time', times[-1])], result._events[:-1])
336
337    def test_only_one_time_sent(self):
338        # If we receive a single time event followed by a non-time event, we
339        # send exactly one time event.
340        result = ExtendedTestResult()
341        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
342        a_time = self.make_time()
343        tag_collapser.time(a_time)
344        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
345        self.assertEquals([('time', a_time)], result._events[:-1])
346
347    def test_duplicate_times_not_sent(self):
348        # Many time events with the exact same time are collapsed into one
349        # time event.
350        result = ExtendedTestResult()
351        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
352        a_time = self.make_time()
353        for i in range(5):
354            tag_collapser.time(a_time)
355        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
356        self.assertEquals([('time', a_time)], result._events[:-1])
357
358    def test_no_times_inserted(self):
359        result = ExtendedTestResult()
360        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
361        a_time = self.make_time()
362        tag_collapser.time(a_time)
363        foo = subunit.RemotedTestCase('foo')
364        tag_collapser.startTest(foo)
365        tag_collapser.addSuccess(foo)
366        tag_collapser.stopTest(foo)
367        self.assertEquals(
368            [('time', a_time),
369             ('startTest', foo),
370             ('addSuccess', foo),
371             ('stopTest', foo)], result._events)
372
373
374class TestByTestResultTests(testtools.TestCase):
375
376    def setUp(self):
377        super(TestByTestResultTests, self).setUp()
378        self.log = []
379        self.result = subunit.test_results.TestByTestResult(self.on_test)
380        if sys.version_info >= (3, 0):
381            self.result._now = iter(range(5)).__next__
382        else:
383            self.result._now = iter(range(5)).next
384
385    def assertCalled(self, **kwargs):
386        defaults = {
387            'test': self,
388            'tags': set(),
389            'details': None,
390            'start_time': 0,
391            'stop_time': 1,
392            }
393        defaults.update(kwargs)
394        self.assertEqual([defaults], self.log)
395
396    def on_test(self, **kwargs):
397        self.log.append(kwargs)
398
399    def test_no_tests_nothing_reported(self):
400        self.result.startTestRun()
401        self.result.stopTestRun()
402        self.assertEqual([], self.log)
403
404    def test_add_success(self):
405        self.result.startTest(self)
406        self.result.addSuccess(self)
407        self.result.stopTest(self)
408        self.assertCalled(status='success')
409
410    def test_add_success_details(self):
411        self.result.startTest(self)
412        details = {'foo': 'bar'}
413        self.result.addSuccess(self, details=details)
414        self.result.stopTest(self)
415        self.assertCalled(status='success', details=details)
416
417    def test_tags(self):
418        if not getattr(self.result, 'tags', None):
419            self.skipTest("No tags in testtools")
420        self.result.tags(['foo'], [])
421        self.result.startTest(self)
422        self.result.addSuccess(self)
423        self.result.stopTest(self)
424        self.assertCalled(status='success', tags=set(['foo']))
425
426    def test_add_error(self):
427        self.result.startTest(self)
428        try:
429            1/0
430        except ZeroDivisionError:
431            error = sys.exc_info()
432        self.result.addError(self, error)
433        self.result.stopTest(self)
434        self.assertCalled(
435            status='error',
436            details={'traceback': TracebackContent(error, self)})
437
438    def test_add_error_details(self):
439        self.result.startTest(self)
440        details = {"foo": text_content("bar")}
441        self.result.addError(self, details=details)
442        self.result.stopTest(self)
443        self.assertCalled(status='error', details=details)
444
445    def test_add_failure(self):
446        self.result.startTest(self)
447        try:
448            self.fail("intentional failure")
449        except self.failureException:
450            failure = sys.exc_info()
451        self.result.addFailure(self, failure)
452        self.result.stopTest(self)
453        self.assertCalled(
454            status='failure',
455            details={'traceback': TracebackContent(failure, self)})
456
457    def test_add_failure_details(self):
458        self.result.startTest(self)
459        details = {"foo": text_content("bar")}
460        self.result.addFailure(self, details=details)
461        self.result.stopTest(self)
462        self.assertCalled(status='failure', details=details)
463
464    def test_add_xfail(self):
465        self.result.startTest(self)
466        try:
467            1/0
468        except ZeroDivisionError:
469            error = sys.exc_info()
470        self.result.addExpectedFailure(self, error)
471        self.result.stopTest(self)
472        self.assertCalled(
473            status='xfail',
474            details={'traceback': TracebackContent(error, self)})
475
476    def test_add_xfail_details(self):
477        self.result.startTest(self)
478        details = {"foo": text_content("bar")}
479        self.result.addExpectedFailure(self, details=details)
480        self.result.stopTest(self)
481        self.assertCalled(status='xfail', details=details)
482
483    def test_add_unexpected_success(self):
484        self.result.startTest(self)
485        details = {'foo': 'bar'}
486        self.result.addUnexpectedSuccess(self, details=details)
487        self.result.stopTest(self)
488        self.assertCalled(status='success', details=details)
489
490    def test_add_skip_reason(self):
491        self.result.startTest(self)
492        reason = self.getUniqueString()
493        self.result.addSkip(self, reason)
494        self.result.stopTest(self)
495        self.assertCalled(
496            status='skip', details={'reason': text_content(reason)})
497
498    def test_add_skip_details(self):
499        self.result.startTest(self)
500        details = {'foo': 'bar'}
501        self.result.addSkip(self, details=details)
502        self.result.stopTest(self)
503        self.assertCalled(status='skip', details=details)
504
505    def test_twice(self):
506        self.result.startTest(self)
507        self.result.addSuccess(self, details={'foo': 'bar'})
508        self.result.stopTest(self)
509        self.result.startTest(self)
510        self.result.addSuccess(self)
511        self.result.stopTest(self)
512        self.assertEqual(
513            [{'test': self,
514              'status': 'success',
515              'start_time': 0,
516              'stop_time': 1,
517              'tags': set(),
518              'details': {'foo': 'bar'}},
519             {'test': self,
520              'status': 'success',
521              'start_time': 2,
522              'stop_time': 3,
523              'tags': set(),
524              'details': None},
525             ],
526            self.log)
527
528
529class TestCsvResult(testtools.TestCase):
530
531    def parse_stream(self, stream):
532        stream.seek(0)
533        reader = csv.reader(stream)
534        return list(reader)
535
536    def test_csv_output(self):
537        stream = StringIO()
538        result = subunit.test_results.CsvResult(stream)
539        if sys.version_info >= (3, 0):
540            result._now = iter(range(5)).__next__
541        else:
542            result._now = iter(range(5)).next
543        result.startTestRun()
544        result.startTest(self)
545        result.addSuccess(self)
546        result.stopTest(self)
547        result.stopTestRun()
548        self.assertEqual(
549            [['test', 'status', 'start_time', 'stop_time'],
550             [self.id(), 'success', '0', '1'],
551             ],
552            self.parse_stream(stream))
553
554    def test_just_header_when_no_tests(self):
555        stream = StringIO()
556        result = subunit.test_results.CsvResult(stream)
557        result.startTestRun()
558        result.stopTestRun()
559        self.assertEqual(
560            [['test', 'status', 'start_time', 'stop_time']],
561            self.parse_stream(stream))
562
563    def test_no_output_before_events(self):
564        stream = StringIO()
565        subunit.test_results.CsvResult(stream)
566        self.assertEqual([], self.parse_stream(stream))
567