1# 2# subunit: extensions to Python unittest to get test results from subprocesses. 3# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> 4# 5# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause 6# license at the users choice. A copy of both licenses are available in the 7# project source as Apache-2.0 and BSD. You may not use this file except in 8# compliance with one of these two licences. 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# license you chose for the specific language governing permissions and 14# limitations under that license. 15# 16 17"""Subunit - a streaming test protocol 18 19Overview 20++++++++ 21 22The ``subunit`` Python package provides a number of ``unittest`` extensions 23which can be used to cause tests to output Subunit, to parse Subunit streams 24into test activity, perform seamless test isolation within a regular test 25case and variously sort, filter and report on test runs. 26 27 28Key Classes 29----------- 30 31The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult`` 32extension which will translate a test run into a Subunit stream. 33 34The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire 35protocol and the ``unittest.TestCase`` object protocol. It is used to translate 36a stream into a test run, which regular ``unittest.TestResult`` objects can 37process and report/inspect. 38 39Subunit has support for non-blocking usage too, for use with asyncore or 40Twisted. See the ``TestProtocolServer`` parser class for more details. 41 42Subunit includes extensions to the Python ``TestResult`` protocol. These are 43all done in a compatible manner: ``TestResult`` objects that do not implement 44the extension methods will not cause errors to be raised, instead the extension 45will either lose fidelity (for instance, folding expected failures to success 46in Python versions < 2.7 or 3.1), or discard the extended data (for extra 47details, tags, timestamping and progress markers). 48 49The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``, 50``addFailure``, ``addSkip`` take an optional keyword parameter ``details`` 51which can be used instead of the usual python unittest parameter. 52When used the value of details should be a dict from ``string`` to 53``testtools.content.Content`` objects. This is a draft API being worked on with 54the Python Testing In Python mail list, with the goal of permitting a common 55way to provide additional data beyond a traceback, such as captured data from 56disk, logging messages etc. The reference for this API is in testtools (0.9.0 57and newer). 58 59The ``tags(new_tags, gone_tags)`` method is called (if present) to add or 60remove tags in the test run that is currently executing. If called when no 61test is in progress (that is, if called outside of the ``startTest``, 62``stopTest`` pair), the the tags apply to all subsequent tests. If called 63when a test is in progress, then the tags only apply to that test. 64 65The ``time(a_datetime)`` method is called (if present) when a ``time:`` 66directive is encountered in a Subunit stream. This is used to tell a TestResult 67about the time that events in the stream occurred at, to allow reconstructing 68test timing from a stream. 69 70The ``progress(offset, whence)`` method controls progress data for a stream. 71The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR, 72subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations 73ignore the offset parameter. 74 75 76Python test support 77------------------- 78 79``subunit.run`` is a convenience wrapper to run a Python test suite via 80the command line, reporting via Subunit:: 81 82 $ python -m subunit.run mylib.tests.test_suite 83 84The ``IsolatedTestSuite`` class is a TestSuite that forks before running its 85tests, allowing isolation between the test runner and some tests. 86 87Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get 88tests that will fork() before that individual test is run. 89 90`ExecTestCase`` is a convenience wrapper for running an external 91program to get a Subunit stream and then report that back to an arbitrary 92result object:: 93 94 class AggregateTests(subunit.ExecTestCase): 95 96 def test_script_one(self): 97 './bin/script_one' 98 99 def test_script_two(self): 100 './bin/script_two' 101 102 # Normally your normal test loading would take of this automatically, 103 # It is only spelt out in detail here for clarity. 104 suite = unittest.TestSuite([AggregateTests("test_script_one"), 105 AggregateTests("test_script_two")]) 106 # Create any TestResult class you like. 107 result = unittest._TextTestResult(sys.stdout) 108 # And run your suite as normal, Subunit will exec each external script as 109 # needed and report to your result object. 110 suite.run(result) 111 112Utility modules 113--------------- 114 115* subunit.chunked contains HTTP chunked encoding/decoding logic. 116* subunit.test_results contains TestResult helper classes. 117""" 118 119import os 120import re 121import subprocess 122import sys 123import unittest 124try: 125 from io import UnsupportedOperation as _UnsupportedOperation 126except ImportError: 127 _UnsupportedOperation = AttributeError 128 129from extras import safe_hasattr 130from testtools import content, content_type, ExtendedToOriginalDecorator 131from testtools.content import TracebackContent 132from testtools.compat import _b, _u, BytesIO, StringIO 133try: 134 from testtools.testresult.real import _StringException 135 RemoteException = _StringException 136except ImportError: 137 raise ImportError ("testtools.testresult.real does not contain " 138 "_StringException, check your version.") 139from testtools import testresult, CopyStreamResult 140 141from subunit import chunked, details, iso8601, test_results 142from subunit.v2 import ByteStreamToStreamResult, StreamResultToBytes 143 144# same format as sys.version_info: "A tuple containing the five components of 145# the version number: major, minor, micro, releaselevel, and serial. All 146# values except releaselevel are integers; the release level is 'alpha', 147# 'beta', 'candidate', or 'final'. The version_info value corresponding to the 148# Python version 2.0 is (2, 0, 0, 'final', 0)." Additionally we use a 149# releaselevel of 'dev' for unreleased under-development code. 150# 151# If the releaselevel is 'alpha' then the major/minor/micro components are not 152# established at this point, and setup.py will use a version of next-$(revno). 153# If the releaselevel is 'final', then the tarball will be major.minor.micro. 154# Otherwise it is major.minor.micro~$(revno). 155 156__version__ = (1, 3, 0, 'final', 0) 157 158PROGRESS_SET = 0 159PROGRESS_CUR = 1 160PROGRESS_PUSH = 2 161PROGRESS_POP = 3 162 163 164def test_suite(): 165 import subunit.tests 166 return subunit.tests.test_suite() 167 168 169def join_dir(base_path, path): 170 """ 171 Returns an absolute path to C{path}, calculated relative to the parent 172 of C{base_path}. 173 174 @param base_path: A path to a file or directory. 175 @param path: An absolute path, or a path relative to the containing 176 directory of C{base_path}. 177 178 @return: An absolute path to C{path}. 179 """ 180 return os.path.join(os.path.dirname(os.path.abspath(base_path)), path) 181 182 183def tags_to_new_gone(tags): 184 """Split a list of tags into a new_set and a gone_set.""" 185 new_tags = set() 186 gone_tags = set() 187 for tag in tags: 188 if tag[0] == '-': 189 gone_tags.add(tag[1:]) 190 else: 191 new_tags.add(tag) 192 return new_tags, gone_tags 193 194 195class DiscardStream(object): 196 """A filelike object which discards what is written to it.""" 197 198 def fileno(self): 199 raise _UnsupportedOperation() 200 201 def write(self, bytes): 202 pass 203 204 def read(self, len=0): 205 return _b('') 206 207 208class _ParserState(object): 209 """State for the subunit parser.""" 210 211 def __init__(self, parser): 212 self.parser = parser 213 self._test_sym = (_b('test'), _b('testing')) 214 self._colon_sym = _b(':') 215 self._error_sym = (_b('error'),) 216 self._failure_sym = (_b('failure'),) 217 self._progress_sym = (_b('progress'),) 218 self._skip_sym = _b('skip') 219 self._success_sym = (_b('success'), _b('successful')) 220 self._tags_sym = (_b('tags'),) 221 self._time_sym = (_b('time'),) 222 self._xfail_sym = (_b('xfail'),) 223 self._uxsuccess_sym = (_b('uxsuccess'),) 224 self._start_simple = _u(" [") 225 self._start_multipart = _u(" [ multipart") 226 227 def addError(self, offset, line): 228 """An 'error:' directive has been read.""" 229 self.parser.stdOutLineReceived(line) 230 231 def addExpectedFail(self, offset, line): 232 """An 'xfail:' directive has been read.""" 233 self.parser.stdOutLineReceived(line) 234 235 def addFailure(self, offset, line): 236 """A 'failure:' directive has been read.""" 237 self.parser.stdOutLineReceived(line) 238 239 def addSkip(self, offset, line): 240 """A 'skip:' directive has been read.""" 241 self.parser.stdOutLineReceived(line) 242 243 def addSuccess(self, offset, line): 244 """A 'success:' directive has been read.""" 245 self.parser.stdOutLineReceived(line) 246 247 def lineReceived(self, line): 248 """a line has been received.""" 249 parts = line.split(None, 1) 250 if len(parts) == 2 and line.startswith(parts[0]): 251 cmd, rest = parts 252 offset = len(cmd) + 1 253 cmd = cmd.rstrip(self._colon_sym) 254 if cmd in self._test_sym: 255 self.startTest(offset, line) 256 elif cmd in self._error_sym: 257 self.addError(offset, line) 258 elif cmd in self._failure_sym: 259 self.addFailure(offset, line) 260 elif cmd in self._progress_sym: 261 self.parser._handleProgress(offset, line) 262 elif cmd in self._skip_sym: 263 self.addSkip(offset, line) 264 elif cmd in self._success_sym: 265 self.addSuccess(offset, line) 266 elif cmd in self._tags_sym: 267 self.parser._handleTags(offset, line) 268 self.parser.subunitLineReceived(line) 269 elif cmd in self._time_sym: 270 self.parser._handleTime(offset, line) 271 self.parser.subunitLineReceived(line) 272 elif cmd in self._xfail_sym: 273 self.addExpectedFail(offset, line) 274 elif cmd in self._uxsuccess_sym: 275 self.addUnexpectedSuccess(offset, line) 276 else: 277 self.parser.stdOutLineReceived(line) 278 else: 279 self.parser.stdOutLineReceived(line) 280 281 def lostConnection(self): 282 """Connection lost.""" 283 self.parser._lostConnectionInTest(_u('unknown state of ')) 284 285 def startTest(self, offset, line): 286 """A test start command received.""" 287 self.parser.stdOutLineReceived(line) 288 289 290class _InTest(_ParserState): 291 """State for the subunit parser after reading a test: directive.""" 292 293 def _outcome(self, offset, line, no_details, details_state): 294 """An outcome directive has been read. 295 296 :param no_details: Callable to call when no details are presented. 297 :param details_state: The state to switch to for details 298 processing of this outcome. 299 """ 300 test_name = line[offset:-1].decode('utf8') 301 if self.parser.current_test_description == test_name: 302 self.parser._state = self.parser._outside_test 303 self.parser.current_test_description = None 304 no_details() 305 self.parser.client.stopTest(self.parser._current_test) 306 self.parser._current_test = None 307 self.parser.subunitLineReceived(line) 308 elif self.parser.current_test_description + self._start_simple == \ 309 test_name: 310 self.parser._state = details_state 311 details_state.set_simple() 312 self.parser.subunitLineReceived(line) 313 elif self.parser.current_test_description + self._start_multipart == \ 314 test_name: 315 self.parser._state = details_state 316 details_state.set_multipart() 317 self.parser.subunitLineReceived(line) 318 else: 319 self.parser.stdOutLineReceived(line) 320 321 def _error(self): 322 self.parser.client.addError(self.parser._current_test, 323 details={}) 324 325 def addError(self, offset, line): 326 """An 'error:' directive has been read.""" 327 self._outcome(offset, line, self._error, 328 self.parser._reading_error_details) 329 330 def _xfail(self): 331 self.parser.client.addExpectedFailure(self.parser._current_test, 332 details={}) 333 334 def addExpectedFail(self, offset, line): 335 """An 'xfail:' directive has been read.""" 336 self._outcome(offset, line, self._xfail, 337 self.parser._reading_xfail_details) 338 339 def _uxsuccess(self): 340 self.parser.client.addUnexpectedSuccess(self.parser._current_test) 341 342 def addUnexpectedSuccess(self, offset, line): 343 """A 'uxsuccess:' directive has been read.""" 344 self._outcome(offset, line, self._uxsuccess, 345 self.parser._reading_uxsuccess_details) 346 347 def _failure(self): 348 self.parser.client.addFailure(self.parser._current_test, details={}) 349 350 def addFailure(self, offset, line): 351 """A 'failure:' directive has been read.""" 352 self._outcome(offset, line, self._failure, 353 self.parser._reading_failure_details) 354 355 def _skip(self): 356 self.parser.client.addSkip(self.parser._current_test, details={}) 357 358 def addSkip(self, offset, line): 359 """A 'skip:' directive has been read.""" 360 self._outcome(offset, line, self._skip, 361 self.parser._reading_skip_details) 362 363 def _succeed(self): 364 self.parser.client.addSuccess(self.parser._current_test, details={}) 365 366 def addSuccess(self, offset, line): 367 """A 'success:' directive has been read.""" 368 self._outcome(offset, line, self._succeed, 369 self.parser._reading_success_details) 370 371 def lostConnection(self): 372 """Connection lost.""" 373 self.parser._lostConnectionInTest(_u('')) 374 375 376class _OutSideTest(_ParserState): 377 """State for the subunit parser outside of a test context.""" 378 379 def lostConnection(self): 380 """Connection lost.""" 381 382 def startTest(self, offset, line): 383 """A test start command received.""" 384 self.parser._state = self.parser._in_test 385 test_name = line[offset:-1].decode('utf8') 386 self.parser._current_test = RemotedTestCase(test_name) 387 self.parser.current_test_description = test_name 388 self.parser.client.startTest(self.parser._current_test) 389 self.parser.subunitLineReceived(line) 390 391 392class _ReadingDetails(_ParserState): 393 """Common logic for readin state details.""" 394 395 def endDetails(self): 396 """The end of a details section has been reached.""" 397 self.parser._state = self.parser._outside_test 398 self.parser.current_test_description = None 399 self._report_outcome() 400 self.parser.client.stopTest(self.parser._current_test) 401 402 def lineReceived(self, line): 403 """a line has been received.""" 404 self.details_parser.lineReceived(line) 405 self.parser.subunitLineReceived(line) 406 407 def lostConnection(self): 408 """Connection lost.""" 409 self.parser._lostConnectionInTest(_u('%s report of ') % 410 self._outcome_label()) 411 412 def _outcome_label(self): 413 """The label to describe this outcome.""" 414 raise NotImplementedError(self._outcome_label) 415 416 def set_simple(self): 417 """Start a simple details parser.""" 418 self.details_parser = details.SimpleDetailsParser(self) 419 420 def set_multipart(self): 421 """Start a multipart details parser.""" 422 self.details_parser = details.MultipartDetailsParser(self) 423 424 425class _ReadingFailureDetails(_ReadingDetails): 426 """State for the subunit parser when reading failure details.""" 427 428 def _report_outcome(self): 429 self.parser.client.addFailure(self.parser._current_test, 430 details=self.details_parser.get_details()) 431 432 def _outcome_label(self): 433 return "failure" 434 435 436class _ReadingErrorDetails(_ReadingDetails): 437 """State for the subunit parser when reading error details.""" 438 439 def _report_outcome(self): 440 self.parser.client.addError(self.parser._current_test, 441 details=self.details_parser.get_details()) 442 443 def _outcome_label(self): 444 return "error" 445 446 447class _ReadingExpectedFailureDetails(_ReadingDetails): 448 """State for the subunit parser when reading xfail details.""" 449 450 def _report_outcome(self): 451 self.parser.client.addExpectedFailure(self.parser._current_test, 452 details=self.details_parser.get_details()) 453 454 def _outcome_label(self): 455 return "xfail" 456 457 458class _ReadingUnexpectedSuccessDetails(_ReadingDetails): 459 """State for the subunit parser when reading uxsuccess details.""" 460 461 def _report_outcome(self): 462 self.parser.client.addUnexpectedSuccess(self.parser._current_test, 463 details=self.details_parser.get_details()) 464 465 def _outcome_label(self): 466 return "uxsuccess" 467 468 469class _ReadingSkipDetails(_ReadingDetails): 470 """State for the subunit parser when reading skip details.""" 471 472 def _report_outcome(self): 473 self.parser.client.addSkip(self.parser._current_test, 474 details=self.details_parser.get_details("skip")) 475 476 def _outcome_label(self): 477 return "skip" 478 479 480class _ReadingSuccessDetails(_ReadingDetails): 481 """State for the subunit parser when reading success details.""" 482 483 def _report_outcome(self): 484 self.parser.client.addSuccess(self.parser._current_test, 485 details=self.details_parser.get_details("success")) 486 487 def _outcome_label(self): 488 return "success" 489 490 491class TestProtocolServer(object): 492 """A parser for subunit. 493 494 :ivar tags: The current tags associated with the protocol stream. 495 """ 496 497 def __init__(self, client, stream=None, forward_stream=None): 498 """Create a TestProtocolServer instance. 499 500 :param client: An object meeting the unittest.TestResult protocol. 501 :param stream: The stream that lines received which are not part of the 502 subunit protocol should be written to. This allows custom handling 503 of mixed protocols. By default, sys.stdout will be used for 504 convenience. It should accept bytes to its write() method. 505 :param forward_stream: A stream to forward subunit lines to. This 506 allows a filter to forward the entire stream while still parsing 507 and acting on it. By default forward_stream is set to 508 DiscardStream() and no forwarding happens. 509 """ 510 self.client = ExtendedToOriginalDecorator(client) 511 if stream is None: 512 stream = sys.stdout 513 if sys.version_info > (3, 0): 514 stream = stream.buffer 515 self._stream = stream 516 self._forward_stream = forward_stream or DiscardStream() 517 # state objects we can switch too 518 self._in_test = _InTest(self) 519 self._outside_test = _OutSideTest(self) 520 self._reading_error_details = _ReadingErrorDetails(self) 521 self._reading_failure_details = _ReadingFailureDetails(self) 522 self._reading_skip_details = _ReadingSkipDetails(self) 523 self._reading_success_details = _ReadingSuccessDetails(self) 524 self._reading_xfail_details = _ReadingExpectedFailureDetails(self) 525 self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self) 526 # start with outside test. 527 self._state = self._outside_test 528 # Avoid casts on every call 529 self._plusminus = _b('+-') 530 self._push_sym = _b('push') 531 self._pop_sym = _b('pop') 532 533 def _handleProgress(self, offset, line): 534 """Process a progress directive.""" 535 line = line[offset:].strip() 536 if line[0] in self._plusminus: 537 whence = PROGRESS_CUR 538 delta = int(line) 539 elif line == self._push_sym: 540 whence = PROGRESS_PUSH 541 delta = None 542 elif line == self._pop_sym: 543 whence = PROGRESS_POP 544 delta = None 545 else: 546 whence = PROGRESS_SET 547 delta = int(line) 548 self.client.progress(delta, whence) 549 550 def _handleTags(self, offset, line): 551 """Process a tags command.""" 552 tags = line[offset:].decode('utf8').split() 553 new_tags, gone_tags = tags_to_new_gone(tags) 554 self.client.tags(new_tags, gone_tags) 555 556 def _handleTime(self, offset, line): 557 # Accept it, but do not do anything with it yet. 558 try: 559 event_time = iso8601.parse_date(line[offset:-1]) 560 except TypeError: 561 raise TypeError(_u("Failed to parse %r, got %r") 562 % (line, sys.exec_info[1])) 563 self.client.time(event_time) 564 565 def lineReceived(self, line): 566 """Call the appropriate local method for the received line.""" 567 self._state.lineReceived(line) 568 569 def _lostConnectionInTest(self, state_string): 570 error_string = _u("lost connection during %stest '%s'") % ( 571 state_string, self.current_test_description) 572 self.client.addError(self._current_test, RemoteError(error_string)) 573 self.client.stopTest(self._current_test) 574 575 def lostConnection(self): 576 """The input connection has finished.""" 577 self._state.lostConnection() 578 579 def readFrom(self, pipe): 580 """Blocking convenience API to parse an entire stream. 581 582 :param pipe: A file-like object supporting readlines(). 583 :return: None. 584 """ 585 for line in pipe.readlines(): 586 self.lineReceived(line) 587 self.lostConnection() 588 589 def _startTest(self, offset, line): 590 """Internal call to change state machine. Override startTest().""" 591 self._state.startTest(offset, line) 592 593 def subunitLineReceived(self, line): 594 self._forward_stream.write(line) 595 596 def stdOutLineReceived(self, line): 597 self._stream.write(line) 598 599 600class TestProtocolClient(testresult.TestResult): 601 """A TestResult which generates a subunit stream for a test run. 602 603 # Get a TestSuite or TestCase to run 604 suite = make_suite() 605 # Create a stream (any object with a 'write' method). This should accept 606 # bytes not strings: subunit is a byte orientated protocol. 607 stream = file('tests.log', 'wb') 608 # Create a subunit result object which will output to the stream 609 result = subunit.TestProtocolClient(stream) 610 # Optionally, to get timing data for performance analysis, wrap the 611 # serialiser with a timing decorator 612 result = subunit.test_results.AutoTimingTestResultDecorator(result) 613 # Run the test suite reporting to the subunit result object 614 suite.run(result) 615 # Close the stream. 616 stream.close() 617 """ 618 619 def __init__(self, stream): 620 testresult.TestResult.__init__(self) 621 stream = make_stream_binary(stream) 622 self._stream = stream 623 self._progress_fmt = _b("progress: ") 624 self._bytes_eol = _b("\n") 625 self._progress_plus = _b("+") 626 self._progress_push = _b("push") 627 self._progress_pop = _b("pop") 628 self._empty_bytes = _b("") 629 self._start_simple = _b(" [\n") 630 self._end_simple = _b("]\n") 631 632 def addError(self, test, error=None, details=None): 633 """Report an error in test test. 634 635 Only one of error and details should be provided: conceptually there 636 are two separate methods: 637 addError(self, test, error) 638 addError(self, test, details) 639 640 :param error: Standard unittest positional argument form - an 641 exc_info tuple. 642 :param details: New Testing-in-python drafted API; a dict from string 643 to subunit.Content objects. 644 """ 645 self._addOutcome("error", test, error=error, details=details) 646 if self.failfast: 647 self.stop() 648 649 def addExpectedFailure(self, test, error=None, details=None): 650 """Report an expected failure in test test. 651 652 Only one of error and details should be provided: conceptually there 653 are two separate methods: 654 addError(self, test, error) 655 addError(self, test, details) 656 657 :param error: Standard unittest positional argument form - an 658 exc_info tuple. 659 :param details: New Testing-in-python drafted API; a dict from string 660 to subunit.Content objects. 661 """ 662 self._addOutcome("xfail", test, error=error, details=details) 663 664 def addFailure(self, test, error=None, details=None): 665 """Report a failure in test test. 666 667 Only one of error and details should be provided: conceptually there 668 are two separate methods: 669 addFailure(self, test, error) 670 addFailure(self, test, details) 671 672 :param error: Standard unittest positional argument form - an 673 exc_info tuple. 674 :param details: New Testing-in-python drafted API; a dict from string 675 to subunit.Content objects. 676 """ 677 self._addOutcome("failure", test, error=error, details=details) 678 if self.failfast: 679 self.stop() 680 681 def _addOutcome(self, outcome, test, error=None, details=None, 682 error_permitted=True): 683 """Report a failure in test test. 684 685 Only one of error and details should be provided: conceptually there 686 are two separate methods: 687 addOutcome(self, test, error) 688 addOutcome(self, test, details) 689 690 :param outcome: A string describing the outcome - used as the 691 event name in the subunit stream. 692 :param error: Standard unittest positional argument form - an 693 exc_info tuple. 694 :param details: New Testing-in-python drafted API; a dict from string 695 to subunit.Content objects. 696 :param error_permitted: If True then one and only one of error or 697 details must be supplied. If False then error must not be supplied 698 and details is still optional. """ 699 self._stream.write(_b("%s: " % outcome) + self._test_id(test)) 700 if error_permitted: 701 if error is None and details is None: 702 raise ValueError 703 else: 704 if error is not None: 705 raise ValueError 706 if error is not None: 707 self._stream.write(self._start_simple) 708 tb_content = TracebackContent(error, test) 709 for bytes in tb_content.iter_bytes(): 710 self._stream.write(bytes) 711 elif details is not None: 712 self._write_details(details) 713 else: 714 self._stream.write(_b("\n")) 715 if details is not None or error is not None: 716 self._stream.write(self._end_simple) 717 718 def addSkip(self, test, reason=None, details=None): 719 """Report a skipped test.""" 720 if reason is None: 721 self._addOutcome("skip", test, error=None, details=details) 722 else: 723 self._stream.write(_b("skip: %s [\n" % test.id())) 724 self._stream.write(_b("%s\n" % reason)) 725 self._stream.write(self._end_simple) 726 727 def addSuccess(self, test, details=None): 728 """Report a success in a test.""" 729 self._addOutcome("successful", test, details=details, error_permitted=False) 730 731 def addUnexpectedSuccess(self, test, details=None): 732 """Report an unexpected success in test test. 733 734 Details can optionally be provided: conceptually there 735 are two separate methods: 736 addError(self, test) 737 addError(self, test, details) 738 739 :param details: New Testing-in-python drafted API; a dict from string 740 to subunit.Content objects. 741 """ 742 self._addOutcome("uxsuccess", test, details=details, 743 error_permitted=False) 744 if self.failfast: 745 self.stop() 746 747 def _test_id(self, test): 748 result = test.id() 749 if type(result) is not bytes: 750 result = result.encode('utf8') 751 return result 752 753 def startTest(self, test): 754 """Mark a test as starting its test run.""" 755 super(TestProtocolClient, self).startTest(test) 756 self._stream.write(_b("test: ") + self._test_id(test) + _b("\n")) 757 self._stream.flush() 758 759 def stopTest(self, test): 760 super(TestProtocolClient, self).stopTest(test) 761 self._stream.flush() 762 763 def progress(self, offset, whence): 764 """Provide indication about the progress/length of the test run. 765 766 :param offset: Information about the number of tests remaining. If 767 whence is PROGRESS_CUR, then offset increases/decreases the 768 remaining test count. If whence is PROGRESS_SET, then offset 769 specifies exactly the remaining test count. 770 :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH, 771 PROGRESS_POP. 772 """ 773 if whence == PROGRESS_CUR and offset > -1: 774 prefix = self._progress_plus 775 offset = _b(str(offset)) 776 elif whence == PROGRESS_PUSH: 777 prefix = self._empty_bytes 778 offset = self._progress_push 779 elif whence == PROGRESS_POP: 780 prefix = self._empty_bytes 781 offset = self._progress_pop 782 else: 783 prefix = self._empty_bytes 784 offset = _b(str(offset)) 785 self._stream.write(self._progress_fmt + prefix + offset + 786 self._bytes_eol) 787 788 def tags(self, new_tags, gone_tags): 789 """Inform the client about tags added/removed from the stream.""" 790 if not new_tags and not gone_tags: 791 return 792 tags = set([tag.encode('utf8') for tag in new_tags]) 793 tags.update([_b("-") + tag.encode('utf8') for tag in gone_tags]) 794 tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n") 795 self._stream.write(tag_line) 796 797 def time(self, a_datetime): 798 """Inform the client of the time. 799 800 ":param datetime: A datetime.datetime object. 801 """ 802 time = a_datetime.astimezone(iso8601.Utc()) 803 self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % ( 804 time.year, time.month, time.day, time.hour, time.minute, 805 time.second, time.microsecond))) 806 807 def _write_details(self, details): 808 """Output details to the stream. 809 810 :param details: An extended details dict for a test outcome. 811 """ 812 self._stream.write(_b(" [ multipart\n")) 813 for name, content in sorted(details.items()): 814 self._stream.write(_b("Content-Type: %s/%s" % 815 (content.content_type.type, content.content_type.subtype))) 816 parameters = content.content_type.parameters 817 if parameters: 818 self._stream.write(_b(";")) 819 param_strs = [] 820 for param, value in parameters.items(): 821 param_strs.append("%s=%s" % (param, value)) 822 self._stream.write(_b(",".join(param_strs))) 823 self._stream.write(_b("\n%s\n" % name)) 824 encoder = chunked.Encoder(self._stream) 825 list(map(encoder.write, content.iter_bytes())) 826 encoder.close() 827 828 def done(self): 829 """Obey the testtools result.done() interface.""" 830 831 832def RemoteError(description=_u("")): 833 return (_StringException, _StringException(description), None) 834 835 836class RemotedTestCase(unittest.TestCase): 837 """A class to represent test cases run in child processes. 838 839 Instances of this class are used to provide the Python test API a TestCase 840 that can be printed to the screen, introspected for metadata and so on. 841 However, as they are a simply a memoisation of a test that was actually 842 run in the past by a separate process, they cannot perform any interactive 843 actions. 844 """ 845 846 def __eq__ (self, other): 847 try: 848 return self.__description == other.__description 849 except AttributeError: 850 return False 851 852 def __init__(self, description): 853 """Create a psuedo test case with description description.""" 854 self.__description = description 855 856 def error(self, label): 857 raise NotImplementedError("%s on RemotedTestCases is not permitted." % 858 label) 859 860 def setUp(self): 861 self.error("setUp") 862 863 def tearDown(self): 864 self.error("tearDown") 865 866 def shortDescription(self): 867 return self.__description 868 869 def id(self): 870 return "%s" % (self.__description,) 871 872 def __str__(self): 873 return "%s (%s)" % (self.__description, self._strclass()) 874 875 def __repr__(self): 876 return "<%s description='%s'>" % \ 877 (self._strclass(), self.__description) 878 879 def run(self, result=None): 880 if result is None: result = self.defaultTestResult() 881 result.startTest(self) 882 result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n"))) 883 result.stopTest(self) 884 885 def _strclass(self): 886 cls = self.__class__ 887 return "%s.%s" % (cls.__module__, cls.__name__) 888 889 890class ExecTestCase(unittest.TestCase): 891 """A test case which runs external scripts for test fixtures.""" 892 893 def __init__(self, methodName='runTest'): 894 """Create an instance of the class that will use the named test 895 method when executed. Raises a ValueError if the instance does 896 not have a method with the specified name. 897 """ 898 unittest.TestCase.__init__(self, methodName) 899 testMethod = getattr(self, methodName) 900 self.script = join_dir(sys.modules[self.__class__.__module__].__file__, 901 testMethod.__doc__) 902 903 def countTestCases(self): 904 return 1 905 906 def run(self, result=None): 907 if result is None: result = self.defaultTestResult() 908 self._run(result) 909 910 def debug(self): 911 """Run the test without collecting errors in a TestResult""" 912 self._run(testresult.TestResult()) 913 914 def _run(self, result): 915 protocol = TestProtocolServer(result) 916 process = subprocess.Popen(self.script, shell=True, 917 stdout=subprocess.PIPE) 918 make_stream_binary(process.stdout) 919 output = process.communicate()[0] 920 protocol.readFrom(BytesIO(output)) 921 922 923class IsolatedTestCase(unittest.TestCase): 924 """A TestCase which executes in a forked process. 925 926 Each test gets its own process, which has a performance overhead but will 927 provide excellent isolation from global state (such as django configs, 928 zope utilities and so on). 929 """ 930 931 def run(self, result=None): 932 if result is None: result = self.defaultTestResult() 933 run_isolated(unittest.TestCase, self, result) 934 935 936class IsolatedTestSuite(unittest.TestSuite): 937 """A TestSuite which runs its tests in a forked process. 938 939 This decorator that will fork() before running the tests and report the 940 results from the child process using a Subunit stream. This is useful for 941 handling tests that mutate global state, or are testing C extensions that 942 could crash the VM. 943 """ 944 945 def run(self, result=None): 946 if result is None: result = testresult.TestResult() 947 run_isolated(unittest.TestSuite, self, result) 948 949 950def run_isolated(klass, self, result): 951 """Run a test suite or case in a subprocess, using the run method on klass. 952 """ 953 c2pread, c2pwrite = os.pipe() 954 # fixme - error -> result 955 # now fork 956 pid = os.fork() 957 if pid == 0: 958 # Child 959 # Close parent's pipe ends 960 os.close(c2pread) 961 # Dup fds for child 962 os.dup2(c2pwrite, 1) 963 # Close pipe fds. 964 os.close(c2pwrite) 965 966 # at this point, sys.stdin is redirected, now we want 967 # to filter it to escape ]'s. 968 ### XXX: test and write that bit. 969 stream = os.fdopen(1, 'wb') 970 result = TestProtocolClient(stream) 971 klass.run(self, result) 972 stream.flush() 973 sys.stderr.flush() 974 # exit HARD, exit NOW. 975 os._exit(0) 976 else: 977 # Parent 978 # Close child pipe ends 979 os.close(c2pwrite) 980 # hookup a protocol engine 981 protocol = TestProtocolServer(result) 982 fileobj = os.fdopen(c2pread, 'rb') 983 protocol.readFrom(fileobj) 984 os.waitpid(pid, 0) 985 # TODO return code evaluation. 986 return result 987 988 989def TAP2SubUnit(tap, output_stream): 990 """Filter a TAP pipe into a subunit pipe. 991 992 This should be invoked once per TAP script, as TAP scripts get 993 mapped to a single runnable case with multiple components. 994 995 :param tap: A tap pipe/stream/file object - should emit unicode strings. 996 :param subunit: A pipe/stream/file object to write subunit results to. 997 :return: The exit code to exit with. 998 """ 999 output = StreamResultToBytes(output_stream) 1000 UTF8_TEXT = 'text/plain; charset=UTF8' 1001 BEFORE_PLAN = 0 1002 AFTER_PLAN = 1 1003 SKIP_STREAM = 2 1004 state = BEFORE_PLAN 1005 plan_start = 1 1006 plan_stop = 0 1007 # Test data for the next test to emit 1008 test_name = None 1009 log = [] 1010 result = None 1011 def missing_test(plan_start): 1012 output.status(test_id='test %d' % plan_start, 1013 test_status='fail', runnable=False, 1014 mime_type=UTF8_TEXT, eof=True, file_name="tap meta", 1015 file_bytes=b"test missing from TAP output") 1016 def _emit_test(): 1017 "write out a test" 1018 if test_name is None: 1019 return 1020 if log: 1021 log_bytes = b'\n'.join(log_line.encode('utf8') for log_line in log) 1022 mime_type = UTF8_TEXT 1023 file_name = 'tap comment' 1024 eof = True 1025 else: 1026 log_bytes = None 1027 mime_type = None 1028 file_name = None 1029 eof = True 1030 del log[:] 1031 output.status(test_id=test_name, test_status=result, 1032 file_bytes=log_bytes, mime_type=mime_type, eof=eof, 1033 file_name=file_name, runnable=False) 1034 for line in tap: 1035 if state == BEFORE_PLAN: 1036 match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line) 1037 if match: 1038 state = AFTER_PLAN 1039 _, plan_stop, comment = match.groups() 1040 plan_stop = int(plan_stop) 1041 if plan_start > plan_stop and plan_stop == 0: 1042 # skipped file 1043 state = SKIP_STREAM 1044 output.status(test_id='file skip', test_status='skip', 1045 file_bytes=comment.encode('utf8'), eof=True, 1046 file_name='tap comment') 1047 continue 1048 # not a plan line, or have seen one before 1049 match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line) 1050 if match: 1051 # new test, emit current one. 1052 _emit_test() 1053 status, number, description, directive, directive_comment = match.groups() 1054 if status == 'ok': 1055 result = 'success' 1056 else: 1057 result = "fail" 1058 if description is None: 1059 description = '' 1060 else: 1061 description = ' ' + description 1062 if directive is not None: 1063 if directive.upper() == 'TODO': 1064 result = 'xfail' 1065 elif directive.upper() == 'SKIP': 1066 result = 'skip' 1067 if directive_comment is not None: 1068 log.append(directive_comment) 1069 if number is not None: 1070 number = int(number) 1071 while plan_start < number: 1072 missing_test(plan_start) 1073 plan_start += 1 1074 test_name = "test %d%s" % (plan_start, description) 1075 plan_start += 1 1076 continue 1077 match = re.match("Bail out\!(?:\s*(.*))?\n", line) 1078 if match: 1079 reason, = match.groups() 1080 if reason is None: 1081 extra = '' 1082 else: 1083 extra = ' %s' % reason 1084 _emit_test() 1085 test_name = "Bail out!%s" % extra 1086 result = "fail" 1087 state = SKIP_STREAM 1088 continue 1089 match = re.match("\#.*\n", line) 1090 if match: 1091 log.append(line[:-1]) 1092 continue 1093 # Should look at buffering status and binding this to the prior result. 1094 output.status(file_bytes=line.encode('utf8'), file_name='stdout', 1095 mime_type=UTF8_TEXT) 1096 _emit_test() 1097 while plan_start <= plan_stop: 1098 # record missed tests 1099 missing_test(plan_start) 1100 plan_start += 1 1101 return 0 1102 1103 1104def tag_stream(original, filtered, tags): 1105 """Alter tags on a stream. 1106 1107 :param original: The input stream. 1108 :param filtered: The output stream. 1109 :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or 1110 '-TAG' commands. 1111 1112 A 'TAG' command will add the tag to the output stream, 1113 and override any existing '-TAG' command in that stream. 1114 Specifically: 1115 * A global 'tags: TAG' will be added to the start of the stream. 1116 * Any tags commands with -TAG will have the -TAG removed. 1117 1118 A '-TAG' command will remove the TAG command from the stream. 1119 Specifically: 1120 * A 'tags: -TAG' command will be added to the start of the stream. 1121 * Any 'tags: TAG' command will have 'TAG' removed from it. 1122 Additionally, any redundant tagging commands (adding a tag globally 1123 present, or removing a tag globally removed) are stripped as a 1124 by-product of the filtering. 1125 :return: 0 1126 """ 1127 new_tags, gone_tags = tags_to_new_gone(tags) 1128 source = ByteStreamToStreamResult(original, non_subunit_name='stdout') 1129 class Tagger(CopyStreamResult): 1130 def status(self, **kwargs): 1131 tags = kwargs.get('test_tags') 1132 if not tags: 1133 tags = set() 1134 tags.update(new_tags) 1135 tags.difference_update(gone_tags) 1136 if tags: 1137 kwargs['test_tags'] = tags 1138 else: 1139 kwargs['test_tags'] = None 1140 super(Tagger, self).status(**kwargs) 1141 output = Tagger([StreamResultToBytes(filtered)]) 1142 source.run(output) 1143 return 0 1144 1145 1146class ProtocolTestCase(object): 1147 """Subunit wire protocol to unittest.TestCase adapter. 1148 1149 ProtocolTestCase honours the core of ``unittest.TestCase`` protocol - 1150 calling a ProtocolTestCase or invoking the run() method will make a 'test 1151 run' happen. The 'test run' will simply be a replay of the test activity 1152 that has been encoded into the stream. The ``unittest.TestCase`` ``debug`` 1153 and ``countTestCases`` methods are not supported because there isn't a 1154 sensible mapping for those methods. 1155 1156 # Get a stream (any object with a readline() method), in this case the 1157 # stream output by the example from ``subunit.TestProtocolClient``. 1158 stream = file('tests.log', 'rb') 1159 # Create a parser which will read from the stream and emit 1160 # activity to a unittest.TestResult when run() is called. 1161 suite = subunit.ProtocolTestCase(stream) 1162 # Create a result object to accept the contents of that stream. 1163 result = unittest._TextTestResult(sys.stdout) 1164 # 'run' the tests - process the stream and feed its contents to result. 1165 suite.run(result) 1166 stream.close() 1167 1168 :seealso: TestProtocolServer (the subunit wire protocol parser). 1169 """ 1170 1171 def __init__(self, stream, passthrough=None, forward=None): 1172 """Create a ProtocolTestCase reading from stream. 1173 1174 :param stream: A filelike object which a subunit stream can be read 1175 from. 1176 :param passthrough: A stream pass non subunit input on to. If not 1177 supplied, the TestProtocolServer default is used. 1178 :param forward: A stream to pass subunit input on to. If not supplied 1179 subunit input is not forwarded. 1180 """ 1181 stream = make_stream_binary(stream) 1182 self._stream = stream 1183 self._passthrough = passthrough 1184 if forward is not None: 1185 forward = make_stream_binary(forward) 1186 self._forward = forward 1187 1188 def __call__(self, result=None): 1189 return self.run(result) 1190 1191 def run(self, result=None): 1192 if result is None: 1193 result = self.defaultTestResult() 1194 protocol = TestProtocolServer(result, self._passthrough, self._forward) 1195 line = self._stream.readline() 1196 while line: 1197 protocol.lineReceived(line) 1198 line = self._stream.readline() 1199 protocol.lostConnection() 1200 1201 1202class TestResultStats(testresult.TestResult): 1203 """A pyunit TestResult interface implementation for making statistics. 1204 1205 :ivar total_tests: The total tests seen. 1206 :ivar passed_tests: The tests that passed. 1207 :ivar failed_tests: The tests that failed. 1208 :ivar seen_tags: The tags seen across all tests. 1209 """ 1210 1211 def __init__(self, stream): 1212 """Create a TestResultStats which outputs to stream.""" 1213 testresult.TestResult.__init__(self) 1214 self._stream = stream 1215 self.failed_tests = 0 1216 self.skipped_tests = 0 1217 self.seen_tags = set() 1218 1219 @property 1220 def total_tests(self): 1221 return self.testsRun 1222 1223 def addError(self, test, err, details=None): 1224 self.failed_tests += 1 1225 1226 def addFailure(self, test, err, details=None): 1227 self.failed_tests += 1 1228 1229 def addSkip(self, test, reason, details=None): 1230 self.skipped_tests += 1 1231 1232 def formatStats(self): 1233 self._stream.write("Total tests: %5d\n" % self.total_tests) 1234 self._stream.write("Passed tests: %5d\n" % self.passed_tests) 1235 self._stream.write("Failed tests: %5d\n" % self.failed_tests) 1236 self._stream.write("Skipped tests: %5d\n" % self.skipped_tests) 1237 tags = sorted(self.seen_tags) 1238 self._stream.write("Seen tags: %s\n" % (", ".join(tags))) 1239 1240 @property 1241 def passed_tests(self): 1242 return self.total_tests - self.failed_tests - self.skipped_tests 1243 1244 def tags(self, new_tags, gone_tags): 1245 """Accumulate the seen tags.""" 1246 self.seen_tags.update(new_tags) 1247 1248 def wasSuccessful(self): 1249 """Tells whether or not this result was a success""" 1250 return self.failed_tests == 0 1251 1252 1253def read_test_list(path): 1254 """Read a list of test ids from a file on disk. 1255 1256 :param path: Path to the file 1257 :return: Sequence of test ids 1258 """ 1259 f = open(path, 'rb') 1260 try: 1261 return [l.rstrip("\n") for l in f.readlines()] 1262 finally: 1263 f.close() 1264 1265 1266def make_stream_binary(stream): 1267 """Ensure that a stream will be binary safe. See _make_binary_on_windows. 1268 1269 :return: A binary version of the same stream (some streams cannot be 1270 'fixed' but can be unwrapped). 1271 """ 1272 try: 1273 fileno = stream.fileno() 1274 except (_UnsupportedOperation, AttributeError): 1275 pass 1276 else: 1277 _make_binary_on_windows(fileno) 1278 return _unwrap_text(stream) 1279 1280 1281def _make_binary_on_windows(fileno): 1282 """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078.""" 1283 if sys.platform == "win32": 1284 import msvcrt 1285 msvcrt.setmode(fileno, os.O_BINARY) 1286 1287 1288def _unwrap_text(stream): 1289 """Unwrap stream if it is a text stream to get the original buffer.""" 1290 exceptions = (_UnsupportedOperation, IOError) 1291 if sys.version_info > (3, 0): 1292 unicode_type = str 1293 else: 1294 unicode_type = unicode 1295 exceptions += (ValueError,) 1296 try: 1297 # Read streams 1298 if type(stream.read(0)) is unicode_type: 1299 return stream.buffer 1300 except exceptions: 1301 # Cannot read from the stream: try via writes 1302 try: 1303 stream.write(_b('')) 1304 except TypeError: 1305 return stream.buffer 1306 return stream 1307