1#
2#  subunit: extensions to Python unittest to get test results from subprocesses.
3#  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4#
5#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6#  license at the users choice. A copy of both licenses are available in the
7#  project source as Apache-2.0 and BSD. You may not use this file except in
8#  compliance with one of these two licences.
9#
10#  Unless required by applicable law or agreed to in writing, software
11#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13#  license you chose for the specific language governing permissions and
14#  limitations under that license.
15#
16
17"""Subunit - a streaming test protocol
18
19Overview
20++++++++
21
22The ``subunit`` Python package provides a number of ``unittest`` extensions
23which can be used to cause tests to output Subunit, to parse Subunit streams
24into test activity, perform seamless test isolation within a regular test
25case and variously sort, filter and report on test runs.
26
27
28Key Classes
29-----------
30
31The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32extension which will translate a test run into a Subunit stream.
33
34The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36a stream into a test run, which regular ``unittest.TestResult`` objects can
37process and report/inspect.
38
39Subunit has support for non-blocking usage too, for use with asyncore or
40Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42Subunit includes extensions to the Python ``TestResult`` protocol. These are
43all done in a compatible manner: ``TestResult`` objects that do not implement
44the extension methods will not cause errors to be raised, instead the extension
45will either lose fidelity (for instance, folding expected failures to success
46in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47details, tags, timestamping and progress markers).
48
49The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51which can be used instead of the usual python unittest parameter.
52When used the value of details should be a dict from ``string`` to
53``testtools.content.Content`` objects. This is a draft API being worked on with
54the Python Testing In Python mail list, with the goal of permitting a common
55way to provide additional data beyond a traceback, such as captured data from
56disk, logging messages etc. The reference for this API is in testtools (0.9.0
57and newer).
58
59The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60remove tags in the test run that is currently executing. If called when no
61test is in progress (that is, if called outside of the ``startTest``,
``stopTest`` pair), the tags apply to all subsequent tests. If called
63when a test is in progress, then the tags only apply to that test.
64
65The ``time(a_datetime)`` method is called (if present) when a ``time:``
66directive is encountered in a Subunit stream. This is used to tell a TestResult
67about the time that events in the stream occurred at, to allow reconstructing
68test timing from a stream.
69
70The ``progress(offset, whence)`` method controls progress data for a stream.
71The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73ignore the offset parameter.
74
75
76Python test support
77-------------------
78
79``subunit.run`` is a convenience wrapper to run a Python test suite via
80the command line, reporting via Subunit::
81
82  $ python -m subunit.run mylib.tests.test_suite
83
84The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85tests, allowing isolation between the test runner and some tests.
86
87Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88tests that will fork() before that individual test is run.
89
``ExecTestCase`` is a convenience wrapper for running an external
91program to get a Subunit stream and then report that back to an arbitrary
92result object::
93
94 class AggregateTests(subunit.ExecTestCase):
95
96     def test_script_one(self):
97         './bin/script_one'
98
99     def test_script_two(self):
100         './bin/script_two'
101
 # Normally your normal test loading would take care of this automatically,
103 # It is only spelt out in detail here for clarity.
104 suite = unittest.TestSuite([AggregateTests("test_script_one"),
105     AggregateTests("test_script_two")])
106 # Create any TestResult class you like.
107 result = unittest._TextTestResult(sys.stdout)
108 # And run your suite as normal, Subunit will exec each external script as
109 # needed and report to your result object.
110 suite.run(result)
111
112Utility modules
113---------------
114
115* subunit.chunked contains HTTP chunked encoding/decoding logic.
116* subunit.test_results contains TestResult helper classes.
117"""
118
119import os
120import re
121import subprocess
122import sys
123import unittest
124try:
125    from io import UnsupportedOperation as _UnsupportedOperation
126except ImportError:
127    _UnsupportedOperation = AttributeError
128
129from extras import safe_hasattr
130from testtools import content, content_type, ExtendedToOriginalDecorator
131from testtools.content import TracebackContent
132from testtools.compat import _b, _u, BytesIO, StringIO
133try:
134    from testtools.testresult.real import _StringException
135    RemoteException = _StringException
136except ImportError:
137    raise ImportError ("testtools.testresult.real does not contain "
138        "_StringException, check your version.")
139from testtools import testresult, CopyStreamResult
140
141from subunit import chunked, details, iso8601, test_results
142from subunit.v2 import ByteStreamToStreamResult, StreamResultToBytes
143
144# same format as sys.version_info: "A tuple containing the five components of
145# the version number: major, minor, micro, releaselevel, and serial. All
146# values except releaselevel are integers; the release level is 'alpha',
147# 'beta', 'candidate', or 'final'. The version_info value corresponding to the
148# Python version 2.0 is (2, 0, 0, 'final', 0)."  Additionally we use a
149# releaselevel of 'dev' for unreleased under-development code.
150#
151# If the releaselevel is 'alpha' then the major/minor/micro components are not
152# established at this point, and setup.py will use a version of next-$(revno).
153# If the releaselevel is 'final', then the tarball will be major.minor.micro.
154# Otherwise it is major.minor.micro~$(revno).
155
# Release identifier, laid out like sys.version_info:
# (major, minor, micro, releaselevel, serial).
__version__ = (1, 3, 0, 'final', 0)

# ``whence`` values for the ``progress(offset, whence)`` result method.
# PROGRESS_SET: offset specifies exactly the remaining test count.
PROGRESS_SET = 0
# PROGRESS_CUR: offset increases/decreases the remaining test count.
PROGRESS_CUR = 1
# PROGRESS_PUSH / PROGRESS_POP: push and pop operations; the offset
# parameter is ignored for these.
PROGRESS_PUSH = 2
PROGRESS_POP = 3
162
163
def test_suite():
    """Return the test suite built by ``subunit.tests.test_suite()``.

    The ``subunit.tests`` package is imported when this function is called
    rather than when the module is loaded.
    """
    import subunit.tests
    build_suite = subunit.tests.test_suite
    return build_suite()
167
168
def join_dir(base_path, path):
    """Resolve C{path} against the directory that contains C{base_path}.

    @param base_path: A path to a file or directory; it is made absolute and
        its final component is dropped to obtain the containing directory.
    @param path: An absolute path, or a path relative to the containing
        directory of C{base_path}.

    @return: The containing directory of C{base_path} joined with C{path}
        using C{os.path.join}.
    """
    absolute_base = os.path.abspath(base_path)
    containing_dir = os.path.dirname(absolute_base)
    return os.path.join(containing_dir, path)
181
182
def tags_to_new_gone(tags):
    """Split a list of tags into a new_set and a gone_set.

    A tag whose first character is ``-`` names a tag that has gone away; the
    leading ``-`` is removed before it is added to the gone set. Every other
    tag is added to the new set unchanged.

    :param tags: An iterable of tag strings.
    :return: A ``(new_tags, gone_tags)`` tuple of sets.
    """
    new_tags = set()
    gone_tags = set()
    for tag in tags:
        # Slice instead of indexing so an empty tag cannot raise IndexError.
        if tag[:1] == '-':
            gone_tags.add(tag[1:])
        else:
            new_tags.add(tag)
    return new_tags, gone_tags
193
194
class DiscardStream(object):
    """A file-like sink: written data is dropped and reads return nothing."""

    def fileno(self):
        """Always raise ``_UnsupportedOperation``."""
        raise _UnsupportedOperation()

    def write(self, bytes):
        """Accept ``bytes`` and do nothing with it."""
        return None

    def read(self, len=0):
        """Return an empty byte string; ``len`` is not consulted."""
        nothing = _b('')
        return nothing
206
207
class _ParserState(object):
    """Base state for the subunit parser.

    ``lineReceived`` recognises protocol directives and dispatches them to
    handler methods. The handlers defined on this base class treat every
    outcome and ``test:`` directive as ordinary output by handing the line to
    ``parser.stdOutLineReceived``; subclasses override the handlers that are
    meaningful in their state.
    """

    def __init__(self, parser):
        """Create a state bound to ``parser``.

        :param parser: The parser whose callbacks (``stdOutLineReceived``,
            ``subunitLineReceived``, ``_handleProgress`` and so on) this
            state invokes.
        """
        self.parser = parser
        # Directive names are kept in tuples so that ``cmd in <sym>`` is an
        # exact-match test. A bare bytes value would make it a substring
        # test, so e.g. ``ip:`` would be mistaken for ``skip:``.
        self._test_sym = (_b('test'), _b('testing'))
        self._colon_sym = _b(':')
        self._error_sym = (_b('error'),)
        self._failure_sym = (_b('failure'),)
        self._progress_sym = (_b('progress'),)
        self._skip_sym = (_b('skip'),)
        self._success_sym = (_b('success'), _b('successful'))
        self._tags_sym = (_b('tags'),)
        self._time_sym = (_b('time'),)
        self._xfail_sym = (_b('xfail'),)
        self._uxsuccess_sym = (_b('uxsuccess'),)
        self._start_simple = _u(" [")
        self._start_multipart = _u(" [ multipart")

    def addError(self, offset, line):
        """An 'error:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addExpectedFail(self, offset, line):
        """An 'xfail:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addFailure(self, offset, line):
        """A 'failure:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addSkip(self, offset, line):
        """A 'skip:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addSuccess(self, offset, line):
        """A 'success:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addUnexpectedSuccess(self, offset, line):
        """A 'uxsuccess:' directive has been read.

        Defined on the base state so that ``lineReceived`` can dispatch to it
        from any state, matching the other outcome handlers.
        """
        self.parser.stdOutLineReceived(line)

    def lineReceived(self, line):
        """A line has been received.

        The line is split into a leading command word and the remainder. If
        there is no remainder, or the line starts with whitespace, it is
        passed to ``parser.stdOutLineReceived`` unchanged.

        :param line: The raw line, as bytes.
        """
        parts = line.split(None, 1)
        if len(parts) == 2 and line.startswith(parts[0]):
            cmd, rest = parts
            # Offset of the text following the command word and one
            # separator character; computed before the colon is stripped.
            offset = len(cmd) + 1
            cmd = cmd.rstrip(self._colon_sym)
            if cmd in self._test_sym:
                self.startTest(offset, line)
            elif cmd in self._error_sym:
                self.addError(offset, line)
            elif cmd in self._failure_sym:
                self.addFailure(offset, line)
            elif cmd in self._progress_sym:
                self.parser._handleProgress(offset, line)
            elif cmd in self._skip_sym:
                self.addSkip(offset, line)
            elif cmd in self._success_sym:
                self.addSuccess(offset, line)
            elif cmd in self._tags_sym:
                self.parser._handleTags(offset, line)
                self.parser.subunitLineReceived(line)
            elif cmd in self._time_sym:
                self.parser._handleTime(offset, line)
                self.parser.subunitLineReceived(line)
            elif cmd in self._xfail_sym:
                self.addExpectedFail(offset, line)
            elif cmd in self._uxsuccess_sym:
                self.addUnexpectedSuccess(offset, line)
            else:
                self.parser.stdOutLineReceived(line)
        else:
            self.parser.stdOutLineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        self.parser._lostConnectionInTest(_u('unknown state of '))

    def startTest(self, offset, line):
        """A test start command received."""
        self.parser.stdOutLineReceived(line)
288
289
class _InTest(_ParserState):
    """Parser state used after a ``test:`` directive has been read."""

    def _outcome(self, offset, line, no_details, details_state):
        """Handle an outcome directive for the test in progress.

        The text after the directive is compared with the current test
        description: an exact match reports the outcome immediately, a match
        followed by `` [`` or `` [ multipart`` switches to ``details_state``,
        and anything else is passed through as ordinary output.

        :param no_details: Callable to call when no details are presented.
        :param details_state: The state to switch to for details
            processing of this outcome.
        """
        test_name = line[offset:-1].decode('utf8')
        parser = self.parser
        described = parser.current_test_description
        if described == test_name:
            # Outcome without details: report it and close the test now.
            parser._state = parser._outside_test
            parser.current_test_description = None
            no_details()
            parser.client.stopTest(parser._current_test)
            parser._current_test = None
            parser.subunitLineReceived(line)
            return
        if described + self._start_simple == test_name:
            begin_details = details_state.set_simple
        elif described + self._start_multipart == test_name:
            begin_details = details_state.set_multipart
        else:
            # The name does not match the running test; not a directive.
            parser.stdOutLineReceived(line)
            return
        parser._state = details_state
        begin_details()
        parser.subunitLineReceived(line)

    def _error(self):
        """Report an error with empty details for the current test."""
        parser = self.parser
        parser.client.addError(parser._current_test, details={})

    def addError(self, offset, line):
        """An 'error:' directive has been read."""
        details_state = self.parser._reading_error_details
        self._outcome(offset, line, self._error, details_state)

    def _xfail(self):
        """Report an expected failure with empty details."""
        parser = self.parser
        parser.client.addExpectedFailure(parser._current_test, details={})

    def addExpectedFail(self, offset, line):
        """An 'xfail:' directive has been read."""
        details_state = self.parser._reading_xfail_details
        self._outcome(offset, line, self._xfail, details_state)

    def _uxsuccess(self):
        """Report an unexpected success for the current test."""
        parser = self.parser
        parser.client.addUnexpectedSuccess(parser._current_test)

    def addUnexpectedSuccess(self, offset, line):
        """A 'uxsuccess:' directive has been read."""
        details_state = self.parser._reading_uxsuccess_details
        self._outcome(offset, line, self._uxsuccess, details_state)

    def _failure(self):
        """Report a failure with empty details for the current test."""
        parser = self.parser
        parser.client.addFailure(parser._current_test, details={})

    def addFailure(self, offset, line):
        """A 'failure:' directive has been read."""
        details_state = self.parser._reading_failure_details
        self._outcome(offset, line, self._failure, details_state)

    def _skip(self):
        """Report a skip with empty details for the current test."""
        parser = self.parser
        parser.client.addSkip(parser._current_test, details={})

    def addSkip(self, offset, line):
        """A 'skip:' directive has been read."""
        details_state = self.parser._reading_skip_details
        self._outcome(offset, line, self._skip, details_state)

    def _succeed(self):
        """Report a success with empty details for the current test."""
        parser = self.parser
        parser.client.addSuccess(parser._current_test, details={})

    def addSuccess(self, offset, line):
        """A 'success:' directive has been read."""
        details_state = self.parser._reading_success_details
        self._outcome(offset, line, self._succeed, details_state)

    def lostConnection(self):
        """Connection lost."""
        no_prefix = _u('')
        self.parser._lostConnectionInTest(no_prefix)
374
375
class _OutSideTest(_ParserState):
    """Parser state used while no test is in progress."""

    def lostConnection(self):
        """Connection lost; this state takes no action."""

    def startTest(self, offset, line):
        """A test start command received.

        Switches the parser to its in-test state, records the new test and
        its description, notifies the client and forwards the line.
        """
        parser = self.parser
        parser._state = parser._in_test
        name = line[offset:-1].decode('utf8')
        remoted = RemotedTestCase(name)
        parser._current_test = remoted
        parser.current_test_description = name
        parser.client.startTest(remoted)
        parser.subunitLineReceived(line)
390
391
class _ReadingDetails(_ParserState):
    """Common logic for the states that read outcome details."""

    def endDetails(self):
        """The end of a details section has been reached.

        Returns the parser to its outside-test state, reports the outcome
        via ``_report_outcome`` and then stops the current test.
        """
        parser = self.parser
        parser._state = parser._outside_test
        parser.current_test_description = None
        self._report_outcome()
        parser.client.stopTest(parser._current_test)

    def lineReceived(self, line):
        """A line has been received.

        The line goes to the active details parser first and is then
        forwarded as a subunit line.
        """
        self.details_parser.lineReceived(line)
        self.parser.subunitLineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        state_string = _u('%s report of ') % self._outcome_label()
        self.parser._lostConnectionInTest(state_string)

    def _outcome_label(self):
        """The label to describe this outcome; subclasses must override."""
        raise NotImplementedError(self._outcome_label)

    def set_simple(self):
        """Start a simple details parser."""
        simple_parser = details.SimpleDetailsParser(self)
        self.details_parser = simple_parser

    def set_multipart(self):
        """Start a multipart details parser."""
        multipart_parser = details.MultipartDetailsParser(self)
        self.details_parser = multipart_parser
423
424
class _ReadingFailureDetails(_ReadingDetails):
    """Details-reading state for a ``failure:`` outcome."""

    def _report_outcome(self):
        """Report the failure, with the parsed details, to the client."""
        parser = self.parser
        report = parser.client.addFailure
        report(parser._current_test,
            details=self.details_parser.get_details())

    def _outcome_label(self):
        """Return the label used when describing this outcome."""
        label = "failure"
        return label
434
435
class _ReadingErrorDetails(_ReadingDetails):
    """Details-reading state for an ``error:`` outcome."""

    def _report_outcome(self):
        """Report the error, with the parsed details, to the client."""
        parser = self.parser
        report = parser.client.addError
        report(parser._current_test,
            details=self.details_parser.get_details())

    def _outcome_label(self):
        """Return the label used when describing this outcome."""
        label = "error"
        return label
445
446
class _ReadingExpectedFailureDetails(_ReadingDetails):
    """Details-reading state for an ``xfail:`` outcome."""

    def _report_outcome(self):
        """Report the expected failure, with parsed details, to the client."""
        parser = self.parser
        report = parser.client.addExpectedFailure
        report(parser._current_test,
            details=self.details_parser.get_details())

    def _outcome_label(self):
        """Return the label used when describing this outcome."""
        label = "xfail"
        return label
456
457
class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
    """Details-reading state for a ``uxsuccess:`` outcome."""

    def _report_outcome(self):
        """Report the unexpected success, with parsed details, to the client."""
        parser = self.parser
        report = parser.client.addUnexpectedSuccess
        report(parser._current_test,
            details=self.details_parser.get_details())

    def _outcome_label(self):
        """Return the label used when describing this outcome."""
        label = "uxsuccess"
        return label
467
468
class _ReadingSkipDetails(_ReadingDetails):
    """Details-reading state for a ``skip:`` outcome."""

    def _report_outcome(self):
        """Report the skip to the client.

        The details are requested from the details parser with the
        argument ``"skip"``.
        """
        parser = self.parser
        report = parser.client.addSkip
        report(parser._current_test,
            details=self.details_parser.get_details("skip"))

    def _outcome_label(self):
        """Return the label used when describing this outcome."""
        label = "skip"
        return label
478
479
class _ReadingSuccessDetails(_ReadingDetails):
    """Details-reading state for a ``success:`` outcome."""

    def _report_outcome(self):
        """Report the success to the client.

        The details are requested from the details parser with the
        argument ``"success"``.
        """
        parser = self.parser
        report = parser.client.addSuccess
        report(parser._current_test,
            details=self.details_parser.get_details("success"))

    def _outcome_label(self):
        """Return the label used when describing this outcome."""
        label = "success"
        return label
489
490
class TestProtocolServer(object):
    """A parser for subunit.

    Lines are fed in through ``lineReceived`` (or ``readFrom``) and handed to
    the current state object, which reports test events to ``client``, writes
    non-protocol lines to ``stream`` and forwards protocol lines to
    ``forward_stream``.

    :ivar client: The result object events are reported to, wrapped in
        ``ExtendedToOriginalDecorator``.
    :ivar current_test_description: The name of the test in progress, or
        None when no test is in progress.

    NOTE(review): earlier documentation listed a ``tags`` attribute, but this
    class does not set one; confirm whether subclasses rely on it.
    """

    def __init__(self, client, stream=None, forward_stream=None):
        """Create a TestProtocolServer instance.

        :param client: An object meeting the unittest.TestResult protocol.
        :param stream: The stream that lines received which are not part of the
            subunit protocol should be written to. This allows custom handling
            of mixed protocols. By default, sys.stdout will be used for
            convenience. It should accept bytes to its write() method.
        :param forward_stream: A stream to forward subunit lines to. This
            allows a filter to forward the entire stream while still parsing
            and acting on it. By default forward_stream is set to
            DiscardStream() and no forwarding happens.
        """
        self.client = ExtendedToOriginalDecorator(client)
        if stream is None:
            stream = sys.stdout
            if sys.version_info > (3, 0):
                # Lines are bytes, so write to the underlying binary buffer.
                stream = stream.buffer
        self._stream = stream
        self._forward_stream = forward_stream or DiscardStream()
        # No test is in progress yet. Initialising these here means they can
        # be read safely before the first ``test:`` directive arrives.
        self._current_test = None
        self.current_test_description = None
        # state objects we can switch to
        self._in_test = _InTest(self)
        self._outside_test = _OutSideTest(self)
        self._reading_error_details = _ReadingErrorDetails(self)
        self._reading_failure_details = _ReadingFailureDetails(self)
        self._reading_skip_details = _ReadingSkipDetails(self)
        self._reading_success_details = _ReadingSuccessDetails(self)
        self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
        self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
        # start with outside test.
        self._state = self._outside_test
        # Avoid casts on every call
        self._plusminus = _b('+-')
        self._push_sym = _b('push')
        self._pop_sym = _b('pop')

    def _handleProgress(self, offset, line):
        """Process a progress directive.

        A value with a leading ``+`` or ``-`` is reported as PROGRESS_CUR,
        ``push`` and ``pop`` as PROGRESS_PUSH and PROGRESS_POP with no
        offset, and any other value as PROGRESS_SET.

        :param offset: Index in ``line`` at which the directive value starts.
        :param line: The raw directive line.
        """
        line = line[offset:].strip()
        if line[0] in self._plusminus:
            whence = PROGRESS_CUR
            delta = int(line)
        elif line == self._push_sym:
            whence = PROGRESS_PUSH
            delta = None
        elif line == self._pop_sym:
            whence = PROGRESS_POP
            delta = None
        else:
            whence = PROGRESS_SET
            delta = int(line)
        self.client.progress(delta, whence)

    def _handleTags(self, offset, line):
        """Process a tags command.

        The whitespace-separated tags after ``offset`` are decoded as UTF-8,
        split into new and gone sets and passed to ``client.tags``.
        """
        tags = line[offset:].decode('utf8').split()
        new_tags, gone_tags = tags_to_new_gone(tags)
        self.client.tags(new_tags, gone_tags)

    def _handleTime(self, offset, line):
        """Process a time directive.

        The value after ``offset`` (minus the final character of the line) is
        parsed with ``iso8601.parse_date`` and passed to ``client.time``.

        :raises TypeError: If parsing raised TypeError; the message includes
            the offending line and the original exception.
        """
        try:
            event_time = iso8601.parse_date(line[offset:-1])
        except TypeError:
            raise TypeError(_u("Failed to parse %r, got %r")
                % (line, sys.exc_info()[1]))
        self.client.time(event_time)

    def lineReceived(self, line):
        """Call the appropriate local method for the received line."""
        self._state.lineReceived(line)

    def _lostConnectionInTest(self, state_string):
        """Report the current test as an error because the stream ended.

        :param state_string: Text inserted into the error message to describe
            what was being read when the connection was lost.
        """
        error_string = _u("lost connection during %stest '%s'") % (
            state_string, self.current_test_description)
        self.client.addError(self._current_test, RemoteError(error_string))
        self.client.stopTest(self._current_test)

    def lostConnection(self):
        """The input connection has finished."""
        self._state.lostConnection()

    def readFrom(self, pipe):
        """Blocking convenience API to parse an entire stream.

        :param pipe: A file-like object supporting readlines().
        :return: None.
        """
        for line in pipe.readlines():
            self.lineReceived(line)
        self.lostConnection()

    def _startTest(self, offset, line):
        """Internal call to change state machine. Override startTest()."""
        self._state.startTest(offset, line)

    def subunitLineReceived(self, line):
        """Forward a line that is part of the subunit protocol."""
        self._forward_stream.write(line)

    def stdOutLineReceived(self, line):
        """Write a line that is not part of the subunit protocol."""
        self._stream.write(line)
598
599
class TestProtocolClient(testresult.TestResult):
    """A TestResult which generates a subunit stream for a test run.

    # Get a TestSuite or TestCase to run
    suite = make_suite()
    # Create a stream (any object with a 'write' method). This should accept
    # bytes not strings: subunit is a byte orientated protocol.
    stream = open('tests.log', 'wb')
    # Create a subunit result object which will output to the stream
    result = subunit.TestProtocolClient(stream)
    # Optionally, to get timing data for performance analysis, wrap the
    # serialiser with a timing decorator
    result = subunit.test_results.AutoTimingTestResultDecorator(result)
    # Run the test suite reporting to the subunit result object
    suite.run(result)
    # Close the stream.
    stream.close()
    """

    def __init__(self, stream):
        """Create a client that serialises results to ``stream``.

        :param stream: The output stream; it is passed through
            ``make_stream_binary`` before use.
        """
        testresult.TestResult.__init__(self)
        stream = make_stream_binary(stream)
        self._stream = stream
        # Byte constants built once to avoid converting on every call.
        self._progress_fmt = _b("progress: ")
        self._bytes_eol = _b("\n")
        self._progress_plus = _b("+")
        self._progress_push = _b("push")
        self._progress_pop = _b("pop")
        self._empty_bytes = _b("")
        self._start_simple = _b(" [\n")
        self._end_simple = _b("]\n")

    def addError(self, test, error=None, details=None):
        """Report an error in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addError(self, test, error)
            addError(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("error", test, error=error, details=details)
        if self.failfast:
            self.stop()

    def addExpectedFailure(self, test, error=None, details=None):
        """Report an expected failure in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addExpectedFailure(self, test, error)
            addExpectedFailure(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("xfail", test, error=error, details=details)

    def addFailure(self, test, error=None, details=None):
        """Report a failure in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addFailure(self, test, error)
            addFailure(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("failure", test, error=error, details=details)
        if self.failfast:
            self.stop()

    def _addOutcome(self, outcome, test, error=None, details=None,
        error_permitted=True):
        """Write an outcome event for test to the stream.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addOutcome(self, test, error)
            addOutcome(self, test, details)

        :param outcome: A string describing the outcome - used as the
            event name in the subunit stream.
        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        :param error_permitted: If True then at least one of error or
            details must be supplied. If False then error must not be supplied
            and details is still optional.
        :raises ValueError: If the arguments violate ``error_permitted``.
            Nothing is written to the stream in that case.
        """
        # Validate before writing so that a rejected call does not leave a
        # partial outcome line in the stream.
        if error_permitted:
            if error is None and details is None:
                raise ValueError(
                    "one of error or details must be supplied for %r"
                    % (outcome,))
        elif error is not None:
            raise ValueError(
                "error must not be supplied for %r" % (outcome,))
        self._stream.write(_b("%s: " % outcome) + self._test_id(test))
        if error is not None:
            self._stream.write(self._start_simple)
            tb_content = TracebackContent(error, test)
            for chunk in tb_content.iter_bytes():
                self._stream.write(chunk)
        elif details is not None:
            self._write_details(details)
        else:
            # No details at all: the outcome line just ends.
            self._stream.write(_b("\n"))
        if details is not None or error is not None:
            self._stream.write(self._end_simple)

    def addSkip(self, test, reason=None, details=None):
        """Report a skipped test.

        :param reason: Optional text explaining the skip. When given it is
            written as a simple details block and ``details`` is not used.
        :param details: A details dict, used when ``reason`` is None.
        """
        if reason is None:
            self._addOutcome("skip", test, error=None, details=details)
        else:
            # Use _test_id so the id is encoded the same way as for every
            # other outcome.
            self._stream.write(
                _b("skip: ") + self._test_id(test) + self._start_simple)
            reason_line = "%s\n" % reason
            if not isinstance(reason_line, bytes):
                reason_line = reason_line.encode('utf8')
            self._stream.write(reason_line)
            self._stream.write(self._end_simple)

    def addSuccess(self, test, details=None):
        """Report a success in a test."""
        self._addOutcome("successful", test, details=details, error_permitted=False)

    def addUnexpectedSuccess(self, test, details=None):
        """Report an unexpected success in test test.

        Details can optionally be provided: conceptually there
        are two separate methods:
            addUnexpectedSuccess(self, test)
            addUnexpectedSuccess(self, test, details)

        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("uxsuccess", test, details=details,
            error_permitted=False)
        if self.failfast:
            self.stop()

    def _test_id(self, test):
        """Return ``test.id()`` as bytes, encoding text ids as UTF-8."""
        result = test.id()
        if not isinstance(result, bytes):
            result = result.encode('utf8')
        return result

    def startTest(self, test):
        """Mark a test as starting its test run."""
        super(TestProtocolClient, self).startTest(test)
        self._stream.write(_b("test: ") + self._test_id(test) + _b("\n"))
        self._stream.flush()

    def stopTest(self, test):
        """Mark a test as finished and flush the stream."""
        super(TestProtocolClient, self).stopTest(test)
        self._stream.flush()

    def progress(self, offset, whence):
        """Provide indication about the progress/length of the test run.

        :param offset: Information about the number of tests remaining. If
            whence is PROGRESS_CUR, then offset increases/decreases the
            remaining test count. If whence is PROGRESS_SET, then offset
            specifies exactly the remaining test count.
        :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
            PROGRESS_POP.
        """
        if whence == PROGRESS_CUR and offset > -1:
            # Non-negative relative offsets need an explicit '+' sign;
            # negative ones already carry their '-' from str().
            prefix = self._progress_plus
            offset = _b(str(offset))
        elif whence == PROGRESS_PUSH:
            prefix = self._empty_bytes
            offset = self._progress_push
        elif whence == PROGRESS_POP:
            prefix = self._empty_bytes
            offset = self._progress_pop
        else:
            prefix = self._empty_bytes
            offset = _b(str(offset))
        self._stream.write(self._progress_fmt + prefix + offset +
            self._bytes_eol)

    def tags(self, new_tags, gone_tags):
        """Inform the client about tags added/removed from the stream.

        Nothing is written when both collections are empty. Gone tags are
        written with a leading ``-``.
        """
        if not new_tags and not gone_tags:
            return
        tags = set([tag.encode('utf8') for tag in new_tags])
        tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags])
        tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n")
        self._stream.write(tag_line)

    def time(self, a_datetime):
        """Inform the client of the time.

        :param a_datetime: A datetime.datetime object. It is converted to
            UTC with ``astimezone`` before being written.
        """
        time = a_datetime.astimezone(iso8601.Utc())
        self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
            time.year, time.month, time.day, time.hour, time.minute,
            time.second, time.microsecond)))

    def _write_details(self, details):
        """Output details to the stream.

        Each entry is written in name order as a Content-Type line, the
        entry name, and the content bytes passed through ``chunked.Encoder``.

        :param details: An extended details dict for a test outcome.
        """
        self._stream.write(_b(" [ multipart\n"))
        for name, content in sorted(details.items()):
            self._stream.write(_b("Content-Type: %s/%s" %
                (content.content_type.type, content.content_type.subtype)))
            parameters = content.content_type.parameters
            if parameters:
                self._stream.write(_b(";"))
                param_strs = []
                for param, value in parameters.items():
                    param_strs.append("%s=%s" % (param, value))
                self._stream.write(_b(",".join(param_strs)))
            self._stream.write(_b("\n%s\n" % name))
            encoder = chunked.Encoder(self._stream)
            for chunk in content.iter_bytes():
                encoder.write(chunk)
            encoder.close()

    def done(self):
        """Obey the testtools result.done() interface."""
830
831
def RemoteError(description=_u("")):
    """Build a three-item error tuple around a ``_StringException``.

    :param description: Text used to construct the ``_StringException``.
    :return: ``(_StringException, <instance>, None)``.
    """
    error = _StringException(description)
    return (_StringException, error, None)
834
835
class RemotedTestCase(unittest.TestCase):
    """A class to represent test cases run in child processes.

    Instances of this class are used to provide the Python test API a TestCase
    that can be printed to the screen, introspected for metadata and so on.
    However, as they are simply a memoisation of a test that was actually
    run in the past by a separate process, they cannot perform any interactive
    actions.

    Two instances compare equal (and hash equal) when their descriptions are
    equal.
    """

    def __eq__(self, other):
        """Compare by description; objects without one are never equal."""
        try:
            return self.__description == other.__description
        except AttributeError:
            return False

    def __hash__(self):
        """Hash by description, consistent with ``__eq__``.

        Defining ``__eq__`` alone makes instances unhashable on Python 3, so
        the hash has to be supplied explicitly.
        """
        return hash(self.__description)

    def __init__(self, description):
        """Create a pseudo test case with description description."""
        # unittest.TestCase.__init__ is not called here: there is no test
        # method to look up on a remoted test.
        self.__description = description

    def error(self, label):
        """Raise NotImplementedError naming the unsupported operation."""
        raise NotImplementedError("%s on RemotedTestCases is not permitted." %
            label)

    def setUp(self):
        self.error("setUp")

    def tearDown(self):
        self.error("tearDown")

    def shortDescription(self):
        return self.__description

    def id(self):
        return "%s" % (self.__description,)

    def __str__(self):
        return "%s (%s)" % (self.__description, self._strclass())

    def __repr__(self):
        return "<%s description='%s'>" % \
               (self._strclass(), self.__description)

    def run(self, result=None):
        """Report an error to ``result``: remoted tests cannot be re-run."""
        if result is None:
            result = self.defaultTestResult()
        result.startTest(self)
        result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
        result.stopTest(self)

    def _strclass(self):
        """Return the dotted ``module.ClassName`` of this instance's class."""
        cls = self.__class__
        return "%s.%s" % (cls.__module__, cls.__name__)
888
889
class ExecTestCase(unittest.TestCase):
    """A test case which runs external scripts for test fixtures.

    The docstring of the selected test method is combined, via ``join_dir``,
    with the file of the module that defines the class; the result is stored
    as ``self.script`` and run through the shell when the test runs.
    """

    def __init__(self, methodName='runTest'):
        """Create an instance of the class that will use the named test
           method when executed. Raises a ValueError if the instance does
           not have a method with the specified name.
        """
        unittest.TestCase.__init__(self, methodName)
        script_name = getattr(self, methodName).__doc__
        defining_module = sys.modules[self.__class__.__module__]
        self.script = join_dir(defining_module.__file__, script_name)

    def countTestCases(self):
        """Each ExecTestCase counts as exactly one test case."""
        return 1

    def run(self, result=None):
        """Run the script, reporting into ``result`` (or a default result)."""
        if result is None:
            result = self.defaultTestResult()
        self._run(result)

    def debug(self):
        """Run the test without collecting errors in a TestResult"""
        throwaway = testresult.TestResult()
        self._run(throwaway)

    def _run(self, result):
        """Execute the script and feed its stdout to a TestProtocolServer."""
        protocol = TestProtocolServer(result)
        process = subprocess.Popen(
            self.script, shell=True, stdout=subprocess.PIPE)
        make_stream_binary(process.stdout)
        stdout_data = process.communicate()[0]
        protocol.readFrom(BytesIO(stdout_data))
921
922
class IsolatedTestCase(unittest.TestCase):
    """A TestCase which executes in a forked process.

    Every test runs in its own process. That costs some performance but
    gives excellent isolation from global state (such as django configs,
    zope utilities and so on).
    """

    def run(self, result=None):
        """Run this test via ``run_isolated`` using ``unittest.TestCase.run``."""
        if result is None:
            result = self.defaultTestResult()
        run_isolated(unittest.TestCase, self, result)
934
935
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process.

    The suite will fork() before running its tests and report the results
    from the child process using a Subunit stream.  This is useful for
    handling tests that mutate global state, or are testing C extensions that
    could crash the VM.
    """

    def run(self, result=None):
        """Run the suite via ``run_isolated`` using ``unittest.TestSuite.run``."""
        if result is None:
            result = testresult.TestResult()
        run_isolated(unittest.TestSuite, self, result)
948
949
def run_isolated(klass, self, result):
    """Run a test suite or case in a subprocess, using the run method on klass.

    The process is forked. The child runs ``klass.run(self, ...)`` against a
    ``TestProtocolClient`` writing to a pipe on fd 1; the parent reads that
    pipe into a ``TestProtocolServer`` wrapping ``result``.

    :param klass: The class whose ``run`` method is invoked in the child.
    :param self: The test case or suite to run.
    :param result: The result object that receives the activity parsed from
        the child's output.
    :return: ``result``.
    """
    c2pread, c2pwrite = os.pipe()
    # fixme - error -> result
    # now fork
    pid = os.fork()
    if pid == 0:
        # Child. Whatever happens below, the child must terminate through
        # os._exit: letting an exception propagate would leave a second copy
        # of the parent's program running in the child.
        exit_code = 1
        try:
            # Close parent's pipe ends
            os.close(c2pread)
            # Dup fds for child
            os.dup2(c2pwrite, 1)
            # Close pipe fds.
            os.close(c2pwrite)

            # at this point, sys.stdout is redirected, now we want
            # to filter it to escape ]'s.
            ### XXX: test and write that bit.
            stream = os.fdopen(1, 'wb')
            child_result = TestProtocolClient(stream)
            klass.run(self, child_result)
            stream.flush()
            sys.stderr.flush()
            exit_code = 0
        except BaseException:
            # Best-effort diagnostics only; exiting matters more than
            # reporting, so failures while reporting are ignored.
            import traceback
            try:
                traceback.print_exc()
                sys.stderr.flush()
            except Exception:
                pass
        finally:
            # exit HARD, exit NOW.
            os._exit(exit_code)
    else:
        # Parent
        # Close child pipe ends
        os.close(c2pwrite)
        # hookup a protocol engine
        protocol = TestProtocolServer(result)
        fileobj = os.fdopen(c2pread, 'rb')
        try:
            protocol.readFrom(fileobj)
        finally:
            # Release the pipe and reap the child even if parsing failed.
            try:
                fileobj.close()
            finally:
                os.waitpid(pid, 0)
        # TODO return code evaluation.
    return result
987
988
def TAP2SubUnit(tap, output_stream):
    """Filter a TAP pipe into a subunit pipe.

    This should be invoked once per TAP script, as TAP scripts get
    mapped to a single runnable case with multiple components.

    :param tap: A tap pipe/stream/file object - should emit unicode strings.
    :param output_stream: A pipe/stream/file object to write subunit results
        to.
    :return: The exit code to exit with (this function always returns 0).
    """
    output = StreamResultToBytes(output_stream)
    UTF8_TEXT = 'text/plain; charset=UTF8'
    BEFORE_PLAN = 0
    AFTER_PLAN = 1
    SKIP_STREAM = 2
    state = BEFORE_PLAN
    plan_start = 1
    plan_stop = 0
    # Line patterns, compiled once. They are raw strings so that the regex
    # escapes (\d, \s, \., ...) are not treated as (invalid) string escapes.
    # Every pattern requires the trailing newline of the line.
    plan_re = re.compile(r"(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n")
    test_re = re.compile(
        r"(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?"
        r"(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n")
    bail_re = re.compile(r"Bail out\!(?:\s*(.*))?\n")
    comment_re = re.compile(r"\#.*\n")
    # Test data for the next test to emit
    test_name = None
    log = []
    result = None
    def missing_test(plan_start):
        "report a test number that never appeared in the TAP output"
        output.status(test_id='test %d' % plan_start,
            test_status='fail', runnable=False,
            mime_type=UTF8_TEXT, eof=True, file_name="tap meta",
            file_bytes=b"test missing from TAP output")
    def _emit_test():
        "write out a test"
        if test_name is None:
            return
        if log:
            log_bytes = b'\n'.join(log_line.encode('utf8') for log_line in log)
            mime_type = UTF8_TEXT
            file_name = 'tap comment'
            eof = True
        else:
            log_bytes = None
            mime_type = None
            file_name = None
            eof = True
        del log[:]
        output.status(test_id=test_name, test_status=result,
            file_bytes=log_bytes, mime_type=mime_type, eof=eof,
            file_name=file_name, runnable=False)
    for line in tap:
        if state == BEFORE_PLAN:
            match = plan_re.match(line)
            if match:
                state = AFTER_PLAN
                _, plan_stop, comment = match.groups()
                plan_stop = int(plan_stop)
                if plan_start > plan_stop and plan_stop == 0:
                    # skipped file
                    # NOTE: SKIP_STREAM is recorded but nothing in this
                    # function checks for it afterwards.
                    state = SKIP_STREAM
                    # The comment after the plan is optional; a bare "1..0"
                    # yields None, which is reported as an empty comment.
                    skip_reason = comment or ''
                    output.status(test_id='file skip', test_status='skip',
                        file_bytes=skip_reason.encode('utf8'), eof=True,
                        file_name='tap comment')
                continue
        # not a plan line, or have seen one before
        match = test_re.match(line)
        if match:
            # new test, emit current one.
            _emit_test()
            status, number, description, directive, directive_comment = match.groups()
            if status == 'ok':
                result = 'success'
            else:
                result = "fail"
            if description is None:
                description = ''
            else:
                description = ' ' + description
            if directive is not None:
                if directive.upper() == 'TODO':
                    result = 'xfail'
                elif directive.upper() == 'SKIP':
                    result = 'skip'
                if directive_comment is not None:
                    log.append(directive_comment)
            if number is not None:
                number = int(number)
                # Report every test number skipped over as missing.
                while plan_start < number:
                    missing_test(plan_start)
                    plan_start += 1
            test_name = "test %d%s" % (plan_start, description)
            plan_start += 1
            continue
        match = bail_re.match(line)
        if match:
            reason, = match.groups()
            if reason is None:
                extra = ''
            else:
                extra = ' %s' % reason
            _emit_test()
            test_name = "Bail out!%s" % extra
            result = "fail"
            state = SKIP_STREAM
            continue
        match = comment_re.match(line)
        if match:
            # Keep the comment (minus its newline) for the pending test.
            log.append(line[:-1])
            continue
        # Should look at buffering status and binding this to the prior result.
        output.status(file_bytes=line.encode('utf8'), file_name='stdout',
            mime_type=UTF8_TEXT)
    _emit_test()
    while plan_start <= plan_stop:
        # record missed tests
        missing_test(plan_start)
        plan_start += 1
    return 0
1102
1103
def tag_stream(original, filtered, tags):
    """Alter tags on a stream.

    :param original: The input stream.
    :param filtered: The output stream.
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
        '-TAG' commands.

        A 'TAG' command will add the tag to the output stream,
        and override any existing '-TAG' command in that stream.
        Specifically:
         * A global 'tags: TAG' will be added to the start of the stream.
         * Any tags commands with -TAG will have the -TAG removed.

        A '-TAG' command will remove the TAG command from the stream.
        Specifically:
         * A 'tags: -TAG' command will be added to the start of the stream.
         * Any 'tags: TAG' command will have 'TAG' removed from it.
        Additionally, any redundant tagging commands (adding a tag globally
        present, or removing a tag globally removed) are stripped as a
        by-product of the filtering.
    :return: 0
    """
    new_tags, gone_tags = tags_to_new_gone(tags)
    source = ByteStreamToStreamResult(original, non_subunit_name='stdout')
    class Tagger(CopyStreamResult):
        """Forward each status event with its test_tags adjusted."""
        def status(self, **kwargs):
            # Work on a copy: the incoming tag set belongs to the caller and
            # may be shared or immutable, so it must not be edited in place.
            tags = set(kwargs.get('test_tags') or ())
            tags.update(new_tags)
            tags.difference_update(gone_tags)
            # An empty tag set is forwarded as None.
            kwargs['test_tags'] = tags or None
            super(Tagger, self).status(**kwargs)
    output = Tagger([StreamResultToBytes(filtered)])
    source.run(output)
    return 0
1144
1145
class ProtocolTestCase(object):
    """Subunit wire protocol to unittest.TestCase adapter.

    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
    calling a ProtocolTestCase or invoking the run() method will make a 'test
    run' happen. The 'test run' will simply be a replay of the test activity
    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
    and ``countTestCases`` methods are not supported because there isn't a
    sensible mapping for those methods.

    # Get a stream (any object with a readline() method), in this case the
    # stream output by the example from ``subunit.TestProtocolClient``.
    stream = open('tests.log', 'rb')
    # Create a parser which will read from the stream and emit
    # activity to a unittest.TestResult when run() is called.
    suite = subunit.ProtocolTestCase(stream)
    # Create a result object to accept the contents of that stream.
    result = unittest.TestResult()
    # 'run' the tests - process the stream and feed its contents to result.
    suite.run(result)
    stream.close()

    :seealso: TestProtocolServer (the subunit wire protocol parser).
    """

    def __init__(self, stream, passthrough=None, forward=None):
        """Create a ProtocolTestCase reading from stream.

        :param stream: A filelike object which a subunit stream can be read
            from.
        :param passthrough: A stream pass non subunit input on to. If not
            supplied, the TestProtocolServer default is used.
        :param forward: A stream to pass subunit input on to. If not supplied
            subunit input is not forwarded.
        """
        stream = make_stream_binary(stream)
        self._stream = stream
        self._passthrough = passthrough
        if forward is not None:
            forward = make_stream_binary(forward)
        self._forward = forward

    def __call__(self, result=None):
        return self.run(result)

    def defaultTestResult(self):
        """Return the result object used when ``run`` is given none.

        This class derives from ``object``, not ``unittest.TestCase``, so it
        has to provide this hook itself for ``run`` to call.
        """
        return testresult.TestResult()

    def run(self, result=None):
        """Replay the stream into ``result``, one line at a time.

        :param result: The result object to feed; when None, the object
            returned by ``defaultTestResult()`` is used.
        """
        if result is None:
            result = self.defaultTestResult()
        protocol = TestProtocolServer(result, self._passthrough, self._forward)
        line = self._stream.readline()
        # An empty read marks the end of the stream.
        while line:
            protocol.lineReceived(line)
            line = self._stream.readline()
        protocol.lostConnection()
1200
1201
class TestResultStats(testresult.TestResult):
    """A pyunit TestResult interface implementation for making statistics.

    :ivar total_tests: The total tests seen.
    :ivar passed_tests: The tests that passed.
    :ivar failed_tests: The tests that failed (errors and failures).
    :ivar skipped_tests: The tests that were skipped.
    :ivar seen_tags: The tags seen across all tests.
    """

    def __init__(self, stream):
        """Create a TestResultStats which outputs to stream."""
        testresult.TestResult.__init__(self)
        self.seen_tags = set()
        self.skipped_tests = 0
        self.failed_tests = 0
        self._stream = stream

    @property
    def total_tests(self):
        """The number of tests run, as counted by the base class."""
        return self.testsRun

    @property
    def passed_tests(self):
        """Tests that were neither counted as failed nor as skipped."""
        not_passed = self.failed_tests + self.skipped_tests
        return self.total_tests - not_passed

    def addError(self, test, err, details=None):
        """Count an error as a failed test."""
        self.failed_tests += 1

    def addFailure(self, test, err, details=None):
        """Count a failure as a failed test."""
        self.failed_tests += 1

    def addSkip(self, test, reason, details=None):
        """Count a skipped test."""
        self.skipped_tests += 1

    def tags(self, new_tags, gone_tags):
        """Accumulate the seen tags."""
        self.seen_tags.update(new_tags)

    def formatStats(self):
        """Write the collected counters and the sorted tags to the stream."""
        write = self._stream.write
        write("Total tests:   %5d\n" % self.total_tests)
        write("Passed tests:  %5d\n" % self.passed_tests)
        write("Failed tests:  %5d\n" % self.failed_tests)
        write("Skipped tests: %5d\n" % self.skipped_tests)
        ordered_tags = sorted(self.seen_tags)
        write("Seen tags: %s\n" % (", ".join(ordered_tags)))

    def wasSuccessful(self):
        """Tells whether or not this result was a success"""
        return self.failed_tests == 0
1251
1252
def read_test_list(path):
    """Read a list of test ids from a file on disk.

    The file is read in binary mode, one id per line; only the trailing
    newline of each line is removed. On Python 3 the ids are decoded as
    UTF-8 so that they are native strings, as they already are on Python 2.

    :param path: Path to the file
    :return: Sequence of test ids
    """
    with open(path, 'rb') as f:
        # The lines are bytes, so the newline to strip must be bytes too.
        test_ids = [line.rstrip(b"\n") for line in f.readlines()]
    if bytes is not str:
        test_ids = [test_id.decode('utf8') for test_id in test_ids]
    return test_ids
1264
1265
def make_stream_binary(stream):
    """Ensure that a stream will be binary safe. See _make_binary_on_windows.

    When the stream exposes a file descriptor, it is handed to
    ``_make_binary_on_windows``; the stream is then passed through
    ``_unwrap_text`` either way.

    :return: A binary version of the same stream (some streams cannot be
        'fixed' but can be unwrapped).
    """
    has_descriptor = True
    try:
        descriptor = stream.fileno()
    except (_UnsupportedOperation, AttributeError):
        # No usable file descriptor: skip the descriptor-level fix.
        has_descriptor = False
    if has_descriptor:
        _make_binary_on_windows(descriptor)
    return _unwrap_text(stream)
1279
1280
def _make_binary_on_windows(fileno):
    r"""Switch ``fileno`` to binary mode on win32; do nothing elsewhere.

    Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078.
    """
    if sys.platform != "win32":
        return
    # msvcrt only exists on Windows, hence the local import.
    import msvcrt
    msvcrt.setmode(fileno, os.O_BINARY)
1286
1287
def _unwrap_text(stream):
    """Unwrap stream if it is a text stream to get the original buffer.

    The stream is probed with a zero-length read; if that yields text, its
    ``buffer`` attribute is returned. When the read fails, an empty bytes
    write is tried instead, and a TypeError from it also selects ``buffer``.
    In every other case the stream is returned unchanged.
    """
    if sys.version_info > (3, 0):
        text_type = str
        read_failures = (_UnsupportedOperation, IOError)
    else:
        text_type = unicode
        read_failures = (_UnsupportedOperation, IOError, ValueError)
    try:
        # Read streams
        if type(stream.read(0)) is text_type:
            return stream.buffer
    except read_failures:
        # Cannot read from the stream: try via writes
        empty = _b('')
        try:
            stream.write(empty)
        except TypeError:
            return stream.buffer
    return stream
1307