from io import StringIO
from pprint import pprint
from typing import Any
from typing import cast
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union

import attr
import py

from _pytest._code.code import ExceptionChainRepr
from _pytest._code.code import ExceptionInfo
from _pytest._code.code import ExceptionRepr
from _pytest._code.code import ReprEntry
from _pytest._code.code import ReprEntryNative
from _pytest._code.code import ReprExceptionInfo
from _pytest._code.code import ReprFileLocation
from _pytest._code.code import ReprFuncArgs
from _pytest._code.code import ReprLocals
from _pytest._code.code import ReprTraceback
from _pytest._code.code import TerminalRepr
from _pytest._io import TerminalWriter
from _pytest.compat import final
from _pytest.compat import TYPE_CHECKING
from _pytest.config import Config
from _pytest.nodes import Collector
from _pytest.nodes import Item
from _pytest.outcomes import skip
from _pytest.pathlib import Path

if TYPE_CHECKING:
    from typing import NoReturn
    from typing_extensions import Type
    from typing_extensions import Literal

    from _pytest.runner import CallInfo


def getworkerinfoline(node):
    """Return a one-line description of the worker behind ``node``.

    The line is built from ``node.workerinfo`` (keys ``id``, ``sysplatform``,
    ``version_info`` and ``executable``) and cached on the node as
    ``_workerinfocache`` so it is only formatted once per node.
    """
    try:
        return node._workerinfocache
    except AttributeError:
        d = node.workerinfo
        ver = "%s.%s.%s" % d["version_info"][:3]
        node._workerinfocache = s = "[{}] {} -- Python {} {}".format(
            d["id"], d["sysplatform"], ver, d["executable"]
        )
        return s


_R = TypeVar("_R", bound="BaseReport")


class BaseReport:
    """Common behavior shared by :class:`TestReport` and :class:`CollectReport`.

    Arbitrary keyword arguments given to ``__init__`` become instance
    attributes.
    """

    when = None  # type: Optional[str]
    location = None  # type: Optional[Tuple[str, Optional[int], str]]
    longrepr = (
        None
    )  # type: Union[None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr]
    sections = []  # type: List[Tuple[str, str]]
    nodeid = None  # type: str

    def __init__(self, **kw: Any) -> None:
        self.__dict__.update(kw)

    if TYPE_CHECKING:
        # Can have arbitrary fields given to __init__().
        def __getattr__(self, key: str) -> Any:
            ...

    def toterminal(self, out: TerminalWriter) -> None:
        """Write the representation of ``longrepr`` to ``out``.

        If the report has a ``node`` attribute, a worker info line is written
        first. Nothing else is written when ``longrepr`` is None.
        """
        if hasattr(self, "node"):
            out.line(getworkerinfoline(self.node))

        longrepr = self.longrepr
        if longrepr is None:
            return

        if hasattr(longrepr, "toterminal"):
            longrepr_terminal = cast(TerminalRepr, longrepr)
            longrepr_terminal.toterminal(out)
        else:
            try:
                s = str(longrepr)
            except UnicodeEncodeError:
                s = "<unprintable longrepr>"
            out.line(s)

    def get_sections(self, prefix: str) -> Iterator[Tuple[str, str]]:
        """Yield ``(prefix, content)`` for every section whose name starts
        with ``prefix``.

        Note that the first element of each pair is the given ``prefix``,
        not the full section name.
        """
        for name, content in self.sections:
            if name.startswith(prefix):
                yield prefix, content

    @property
    def longreprtext(self) -> str:
        """Read-only property that returns the full string representation of
        ``longrepr``.

        .. versionadded:: 3.0
        """
        file = StringIO()
        tw = TerminalWriter(file)
        tw.hasmarkup = False
        self.toterminal(tw)
        exc = file.getvalue()
        return exc.strip()

    @property
    def caplog(self) -> str:
        """Return captured log lines, if log capturing is enabled.

        .. versionadded:: 3.5
        """
        return "\n".join(
            content for (prefix, content) in self.get_sections("Captured log")
        )

    @property
    def capstdout(self) -> str:
        """Return captured text from stdout, if capturing is enabled.

        .. versionadded:: 3.0
        """
        return "".join(
            content for (prefix, content) in self.get_sections("Captured stdout")
        )

    @property
    def capstderr(self) -> str:
        """Return captured text from stderr, if capturing is enabled.

        .. versionadded:: 3.0
        """
        return "".join(
            content for (prefix, content) in self.get_sections("Captured stderr")
        )

    passed = property(lambda x: x.outcome == "passed")
    failed = property(lambda x: x.outcome == "failed")
    skipped = property(lambda x: x.outcome == "skipped")

    @property
    def fspath(self) -> str:
        """The part of ``nodeid`` before the first ``::`` separator."""
        return self.nodeid.split("::")[0]

    @property
    def count_towards_summary(self) -> bool:
        """**Experimental** Whether this report should be counted towards the
        totals shown at the end of the test session: "1 passed, 1 failure, etc".

        .. note::

            This function is considered **experimental**, so beware that it is subject to changes
            even in patch releases.
        """
        return True

    @property
    def head_line(self) -> Optional[str]:
        """**Experimental** The head line shown with longrepr output for this
        report, more commonly during traceback representation during
        failures::

            ________ Test.foo ________


        In the example above, the head_line is "Test.foo".

        .. note::

            This function is considered **experimental**, so beware that it is subject to changes
            even in patch releases.
        """
        if self.location is not None:
            # Only the third element (the domain) of the location is used.
            _fspath, _lineno, domain = self.location
            return domain
        return None

    def _get_verbose_word(self, config: Config):
        """Return the verbose word that the ``pytest_report_teststatus`` hook
        reports for this report."""
        _category, _short, verbose = config.hook.pytest_report_teststatus(
            report=self, config=config
        )
        return verbose

    def _to_json(self) -> Dict[str, Any]:
        """Return the contents of this report as a dict of builtin entries,
        suitable for serialization.

        This was originally the serialize_report() function from xdist (ca03269).

        Experimental method.
        """
        return _report_to_json(self)

    @classmethod
    def _from_json(cls: "Type[_R]", reportdict: Dict[str, object]) -> _R:
        """Create either a TestReport or CollectReport, depending on the calling class.

        It is the callers responsibility to know which class to pass here.

        This was originally the serialize_report() function from xdist (ca03269).

        Experimental method.
        """
        kwargs = _report_kwargs_from_json(reportdict)
        return cls(**kwargs)


def _report_unserialization_failure(
    type_name: str, report_class: "Type[BaseReport]", reportdict
) -> "NoReturn":
    """Raise a RuntimeError describing an unknown serialized entry type.

    The message contains the offending type name, the report class, a
    pretty-printed dump of ``reportdict`` and the issue tracker URL.
    """
    url = "https://github.com/pytest-dev/pytest/issues"
    stream = StringIO()
    pprint("-" * 100, stream=stream)
    pprint("INTERNALERROR: Unknown entry type returned: %s" % type_name, stream=stream)
    pprint("report_name: %s" % report_class, stream=stream)
    pprint(reportdict, stream=stream)
    pprint("Please report this bug at %s" % url, stream=stream)
    pprint("-" * 100, stream=stream)
    raise RuntimeError(stream.getvalue())


@final
class TestReport(BaseReport):
    """Basic test report object (also used for setup and teardown calls if
    they fail)."""

    __test__ = False

    def __init__(
        self,
        nodeid: str,
        location: Tuple[str, Optional[int], str],
        keywords,
        outcome: "Literal['passed', 'failed', 'skipped']",
        longrepr: Union[
            None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr
        ],
        when: "Literal['setup', 'call', 'teardown']",
        sections: Iterable[Tuple[str, str]] = (),
        duration: float = 0,
        user_properties: Optional[Iterable[Tuple[str, object]]] = None,
        **extra
    ) -> None:
        #: Normalized collection nodeid.
        self.nodeid = nodeid

        #: A (filesystempath, lineno, domaininfo) tuple indicating the
        #: actual location of a test item - it might be different from the
        #: collected one e.g. if a method is inherited from a different module.
        self.location = location  # type: Tuple[str, Optional[int], str]

        #: A name -> value dictionary containing all keywords and
        #: markers associated with a test invocation.
        self.keywords = keywords

        #: Test outcome, always one of "passed", "failed", "skipped".
        self.outcome = outcome

        #: None or a failure representation.
        self.longrepr = longrepr

        #: One of 'setup', 'call', 'teardown' to indicate runtest phase.
        self.when = when

        #: User properties is a list of tuples (name, value) that holds user
        #: defined properties of the test.
        self.user_properties = list(user_properties or [])

        #: List of pairs ``(str, str)`` of extra information which needs to
        #: be marshallable. Used by pytest to add captured text
        #: from ``stdout`` and ``stderr``, but may be used by other plugins
        #: to add arbitrary information to reports.
        self.sections = list(sections)

        #: Time it took to run just the test.
        self.duration = duration

        self.__dict__.update(extra)

    def __repr__(self) -> str:
        return "<{} {!r} when={!r} outcome={!r}>".format(
            self.__class__.__name__, self.nodeid, self.when, self.outcome
        )

    @classmethod
    def from_item_and_call(cls, item: Item, call: "CallInfo[None]") -> "TestReport":
        """Create and fill a TestReport with standard item and call info."""
        when = call.when
        # Remove "collect" from the Literal type -- only for collection calls.
        assert when != "collect"
        duration = call.duration
        keywords = {x: 1 for x in item.keywords}
        excinfo = call.excinfo
        sections = []
        if not call.excinfo:
            outcome = "passed"  # type: Literal["passed", "failed", "skipped"]
            longrepr = (
                None
            )  # type: Union[None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr]
        else:
            if not isinstance(excinfo, ExceptionInfo):
                outcome = "failed"
                longrepr = excinfo
            elif isinstance(excinfo.value, skip.Exception):
                outcome = "skipped"
                r = excinfo._getreprcrash()
                longrepr = (str(r.path), r.lineno, r.message)
            else:
                outcome = "failed"
                if call.when == "call":
                    longrepr = item.repr_failure(excinfo)
                else:  # exception in setup or teardown
                    longrepr = item._repr_failure_py(
                        excinfo, style=item.config.getoption("tbstyle", "auto")
                    )
        for rwhen, key, content in item._report_sections:
            sections.append(("Captured {} {}".format(key, rwhen), content))
        return cls(
            item.nodeid,
            item.location,
            keywords,
            outcome,
            longrepr,
            when,
            sections,
            duration,
            user_properties=item.user_properties,
        )


@final
class CollectReport(BaseReport):
    """Collection report object."""

    when = "collect"

    def __init__(
        self,
        nodeid: str,
        outcome: "Literal['passed', 'skipped', 'failed']",
        longrepr,
        result: Optional[List[Union[Item, Collector]]],
        sections: Iterable[Tuple[str, str]] = (),
        **extra
    ) -> None:
        #: Normalized collection nodeid.
        self.nodeid = nodeid

        #: Test outcome, always one of "passed", "failed", "skipped".
        self.outcome = outcome

        #: None or a failure representation.
        self.longrepr = longrepr

        #: The collected items and collection nodes.
        self.result = result or []

        #: List of pairs ``(str, str)`` of extra information which needs to
        #: be marshallable. Used by pytest to add captured text
        #: from ``stdout`` and ``stderr``, but may be used by other plugins
        #: to add arbitrary information to reports.
        self.sections = list(sections)

        self.__dict__.update(extra)

    @property
    def location(self):
        """A ``(fspath, None, fspath)`` tuple derived from the nodeid."""
        return (self.fspath, None, self.fspath)

    def __repr__(self) -> str:
        return "<CollectReport {!r} lenresult={} outcome={!r}>".format(
            self.nodeid, len(self.result), self.outcome
        )


class CollectErrorRepr(TerminalRepr):
    """Terminal representation of a collection error message, shown in red."""

    def __init__(self, msg: str) -> None:
        self.longrepr = msg

    def toterminal(self, out: TerminalWriter) -> None:
        out.line(self.longrepr, red=True)


def pytest_report_to_serializable(
    report: Union[CollectReport, TestReport]
) -> Optional[Dict[str, Any]]:
    """Serialize a TestReport or CollectReport into a dict.

    The class name is stored under the ``$report_type`` key so that
    :func:`pytest_report_from_serializable` can pick the class to rebuild.
    Returns None for any other kind of object.
    """
    if isinstance(report, (TestReport, CollectReport)):
        data = report._to_json()
        data["$report_type"] = report.__class__.__name__
        return data
    # TODO: Check if this is actually reachable.
    return None  # type: ignore[unreachable]


def pytest_report_from_serializable(
    data: Dict[str, Any],
) -> Optional[Union[CollectReport, TestReport]]:
    """Rebuild a report from ``data`` based on its ``$report_type`` key.

    Returns None when ``data`` has no ``$report_type`` key.

    :raises AssertionError: If ``$report_type`` names an unknown report class.
    """
    if "$report_type" in data:
        if data["$report_type"] == "TestReport":
            return TestReport._from_json(data)
        elif data["$report_type"] == "CollectReport":
            return CollectReport._from_json(data)
        # Raised explicitly rather than via ``assert False`` so the check is
        # not stripped when Python runs with -O.
        raise AssertionError(
            "Unknown report_type unserialize data: {}".format(data["$report_type"])
        )
    return None


def _report_to_json(report: BaseReport) -> Dict[str, Any]:
    """Return the contents of this report as a dict of builtin entries,
    suitable for serialization.

    This was originally the serialize_report() function from xdist (ca03269).
    """

    def serialize_repr_entry(
        entry: Union[ReprEntry, ReprEntryNative]
    ) -> Dict[str, Any]:
        # The entry's class name is recorded so the matching class can be
        # chosen again when deserializing.
        data = attr.asdict(entry)
        for key, value in data.items():
            if hasattr(value, "__dict__"):
                data[key] = attr.asdict(value)
        entry_data = {"type": type(entry).__name__, "data": data}
        return entry_data

    def serialize_repr_traceback(reprtraceback: ReprTraceback) -> Dict[str, Any]:
        result = attr.asdict(reprtraceback)
        # Replace the entries with the tagged form produced above.
        result["reprentries"] = [
            serialize_repr_entry(x) for x in reprtraceback.reprentries
        ]
        return result

    def serialize_repr_crash(
        reprcrash: Optional[ReprFileLocation],
    ) -> Optional[Dict[str, Any]]:
        if reprcrash is not None:
            return attr.asdict(reprcrash)
        else:
            return None

    def serialize_exception_longrepr(rep: BaseReport) -> Dict[str, Any]:
        assert rep.longrepr is not None
        # TODO: Investigate whether the duck typing is really necessary here.
        longrepr = cast(ExceptionRepr, rep.longrepr)
        result = {
            "reprcrash": serialize_repr_crash(longrepr.reprcrash),
            "reprtraceback": serialize_repr_traceback(longrepr.reprtraceback),
            "sections": longrepr.sections,
        }  # type: Dict[str, Any]
        if isinstance(longrepr, ExceptionChainRepr):
            result["chain"] = []
            for repr_traceback, repr_crash, description in longrepr.chain:
                result["chain"].append(
                    (
                        serialize_repr_traceback(repr_traceback),
                        serialize_repr_crash(repr_crash),
                        description,
                    )
                )
        else:
            result["chain"] = None
        return result

    d = report.__dict__.copy()
    if hasattr(report.longrepr, "toterminal"):
        # Exception-like representations are serialized structurally; any
        # other terminal representation is reduced to its string form.
        if hasattr(report.longrepr, "reprtraceback") and hasattr(
            report.longrepr, "reprcrash"
        ):
            d["longrepr"] = serialize_exception_longrepr(report)
        else:
            d["longrepr"] = str(report.longrepr)
    else:
        d["longrepr"] = report.longrepr
    # Only values are reassigned below, so iterating the dict directly is safe.
    for name in d:
        if isinstance(d[name], (py.path.local, Path)):
            d[name] = str(d[name])
        elif name == "result":
            d[name] = None  # for now
    return d


def _report_kwargs_from_json(reportdict: Dict[str, Any]) -> Dict[str, Any]:
    """Return **kwargs that can be used to construct a TestReport or
    CollectReport instance.

    This was originally the serialize_report() function from xdist (ca03269).

    ``reportdict`` is modified in place and returned: a serialized exception
    ``longrepr`` is replaced by the rebuilt representation object.
    """

    def deserialize_repr_entry(entry_data):
        data = entry_data["data"]
        entry_type = entry_data["type"]
        if entry_type == "ReprEntry":
            reprfuncargs = None
            reprfileloc = None
            reprlocals = None
            if data["reprfuncargs"]:
                reprfuncargs = ReprFuncArgs(**data["reprfuncargs"])
            if data["reprfileloc"]:
                reprfileloc = ReprFileLocation(**data["reprfileloc"])
            if data["reprlocals"]:
                reprlocals = ReprLocals(data["reprlocals"]["lines"])

            reprentry = ReprEntry(
                lines=data["lines"],
                reprfuncargs=reprfuncargs,
                reprlocals=reprlocals,
                reprfileloc=reprfileloc,
                style=data["style"],
            )  # type: Union[ReprEntry, ReprEntryNative]
        elif entry_type == "ReprEntryNative":
            reprentry = ReprEntryNative(data["lines"])
        else:
            # Always raises, so ``reprentry`` is bound whenever we return.
            _report_unserialization_failure(entry_type, TestReport, reportdict)
        return reprentry

    def deserialize_repr_traceback(repr_traceback_dict):
        # Note: the given dict is updated in place before being unpacked.
        repr_traceback_dict["reprentries"] = [
            deserialize_repr_entry(x) for x in repr_traceback_dict["reprentries"]
        ]
        return ReprTraceback(**repr_traceback_dict)

    def deserialize_repr_crash(repr_crash_dict: Optional[Dict[str, Any]]):
        if repr_crash_dict is not None:
            return ReprFileLocation(**repr_crash_dict)
        else:
            return None

    longrepr = reportdict["longrepr"]
    # Only a dict can be a serialized exception representation. The type is
    # checked first because ``in`` on a plain string longrepr is a substring
    # test, which could match by accident and then fail on the key lookups
    # below.
    if (
        isinstance(longrepr, dict)
        and "reprcrash" in longrepr
        and "reprtraceback" in longrepr
    ):

        reprtraceback = deserialize_repr_traceback(longrepr["reprtraceback"])
        reprcrash = deserialize_repr_crash(longrepr["reprcrash"])
        if longrepr["chain"]:
            chain = []
            for repr_traceback_data, repr_crash_data, description in longrepr["chain"]:
                chain.append(
                    (
                        deserialize_repr_traceback(repr_traceback_data),
                        deserialize_repr_crash(repr_crash_data),
                        description,
                    )
                )
            exception_info = ExceptionChainRepr(
                chain
            )  # type: Union[ExceptionChainRepr,ReprExceptionInfo]
        else:
            exception_info = ReprExceptionInfo(reprtraceback, reprcrash)

        for section in longrepr["sections"]:
            exception_info.addsection(*section)
        reportdict["longrepr"] = exception_info

    return reportdict