1from io import StringIO
2from pprint import pprint
3from typing import Any
4from typing import cast
5from typing import Dict
6from typing import Iterable
7from typing import Iterator
8from typing import List
9from typing import Optional
10from typing import Tuple
11from typing import TypeVar
12from typing import Union
13
14import attr
15import py
16
17from _pytest._code.code import ExceptionChainRepr
18from _pytest._code.code import ExceptionInfo
19from _pytest._code.code import ExceptionRepr
20from _pytest._code.code import ReprEntry
21from _pytest._code.code import ReprEntryNative
22from _pytest._code.code import ReprExceptionInfo
23from _pytest._code.code import ReprFileLocation
24from _pytest._code.code import ReprFuncArgs
25from _pytest._code.code import ReprLocals
26from _pytest._code.code import ReprTraceback
27from _pytest._code.code import TerminalRepr
28from _pytest._io import TerminalWriter
29from _pytest.compat import final
30from _pytest.compat import TYPE_CHECKING
31from _pytest.config import Config
32from _pytest.nodes import Collector
33from _pytest.nodes import Item
34from _pytest.outcomes import skip
35from _pytest.pathlib import Path
36
37if TYPE_CHECKING:
38    from typing import NoReturn
39    from typing_extensions import Type
40    from typing_extensions import Literal
41
42    from _pytest.runner import CallInfo
43
44
def getworkerinfoline(node):
    """Return a one-line description of the worker behind ``node``.

    The line has the form
    ``[<id>] <sysplatform> -- Python <X.Y.Z> <executable>`` and is built from
    the ``node.workerinfo`` mapping. The formatted line is stored on the node
    as ``_workerinfocache`` so that later calls return it directly.
    """
    try:
        # Fast path: the line was already formatted for this node.
        return node._workerinfocache
    except AttributeError:
        info = node.workerinfo
        # Only major.minor.micro of the version info are shown.
        python_version = "%s.%s.%s" % info["version_info"][:3]
        line = "[{}] {} -- Python {} {}".format(
            info["id"], info["sysplatform"], python_version, info["executable"]
        )
        node._workerinfocache = line
        return line
55
56
# Type variable bound to BaseReport; BaseReport._from_json() uses it so that
# the classmethod is typed as returning an instance of the calling class.
_R = TypeVar("_R", bound="BaseReport")
58
59
class BaseReport:
    """Behaviour shared by the report classes of this module.

    Keyword arguments given to the constructor are stored as instance
    attributes, so a report may carry arbitrary extra fields.
    """

    when = None  # type: Optional[str]
    location = None  # type: Optional[Tuple[str, Optional[int], str]]
    longrepr = (
        None
    )  # type: Union[None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr]
    sections = []  # type: List[Tuple[str, str]]
    nodeid = None  # type: str

    def __init__(self, **kw: Any) -> None:
        # Every keyword argument becomes an attribute of the report.
        vars(self).update(kw)

    if TYPE_CHECKING:
        # Can have arbitrary fields given to __init__().
        def __getattr__(self, key: str) -> Any:
            ...

    def toterminal(self, out: TerminalWriter) -> None:
        """Write this report's ``longrepr`` to ``out``.

        If the report has a ``node`` attribute, a worker info line is written
        first. Nothing else is written when ``longrepr`` is ``None``.
        """
        if hasattr(self, "node"):
            out.line(getworkerinfoline(self.node))

        representation = self.longrepr
        if representation is None:
            return

        if not hasattr(representation, "toterminal"):
            # No terminal support: fall back to the plain string form.
            try:
                text = str(representation)
            except UnicodeEncodeError:
                text = "<unprintable longrepr>"
            out.line(text)
            return

        cast(TerminalRepr, representation).toterminal(out)

    def get_sections(self, prefix: str) -> Iterator[Tuple[str, str]]:
        """Yield ``(prefix, content)`` for every section whose name starts
        with ``prefix``."""
        for section_name, text in self.sections:
            if not section_name.startswith(prefix):
                continue
            yield prefix, text

    @property
    def longreprtext(self) -> str:
        """Read-only property that returns the full string representation of
        ``longrepr``.

        .. versionadded:: 3.0
        """
        buffer = StringIO()
        writer = TerminalWriter(buffer)
        # Render without markup.
        writer.hasmarkup = False
        self.toterminal(writer)
        return buffer.getvalue().strip()

    @property
    def caplog(self) -> str:
        """Return captured log lines, if log capturing is enabled.

        .. versionadded:: 3.5
        """
        log_sections = self.get_sections("Captured log")
        return "\n".join(text for _, text in log_sections)

    @property
    def capstdout(self) -> str:
        """Return captured text from stdout, if capturing is enabled.

        .. versionadded:: 3.0
        """
        stdout_sections = self.get_sections("Captured stdout")
        return "".join(text for _, text in stdout_sections)

    @property
    def capstderr(self) -> str:
        """Return captured text from stderr, if capturing is enabled.

        .. versionadded:: 3.0
        """
        stderr_sections = self.get_sections("Captured stderr")
        return "".join(text for _, text in stderr_sections)

    @property
    def passed(self):
        """Whether ``outcome`` equals ``"passed"``."""
        return self.outcome == "passed"

    @property
    def failed(self):
        """Whether ``outcome`` equals ``"failed"``."""
        return self.outcome == "failed"

    @property
    def skipped(self):
        """Whether ``outcome`` equals ``"skipped"``."""
        return self.outcome == "skipped"

    @property
    def fspath(self) -> str:
        """The part of ``nodeid`` that precedes the first ``"::"``."""
        head, _sep, _tail = self.nodeid.partition("::")
        return head

    @property
    def count_towards_summary(self) -> bool:
        """**Experimental** Whether this report should be counted towards the
        totals shown at the end of the test session: "1 passed, 1 failure, etc".

        .. note::

            This function is considered **experimental**, so beware that it is subject to changes
            even in patch releases.
        """
        return True

    @property
    def head_line(self) -> Optional[str]:
        """**Experimental** The head line shown with longrepr output for this
        report, more commonly during traceback representation during
        failures::

            ________ Test.foo ________


        In the example above, the head_line is "Test.foo".

        .. note::

            This function is considered **experimental**, so beware that it is subject to changes
            even in patch releases.
        """
        if self.location is None:
            return None
        # The head line is the third element of the location tuple.
        _fspath, _lineno, domain = self.location
        return domain

    def _get_verbose_word(self, config: Config):
        """Return the third element of the ``pytest_report_teststatus`` hook
        result for this report."""
        status = config.hook.pytest_report_teststatus(report=self, config=config)
        _category, _short, verbose = status
        return verbose

    def _to_json(self) -> Dict[str, Any]:
        """Return the contents of this report as a dict of builtin entries,
        suitable for serialization.

        This was originally the serialize_report() function from xdist (ca03269).

        Experimental method.
        """
        return _report_to_json(self)

    @classmethod
    def _from_json(cls: "Type[_R]", reportdict: Dict[str, object]) -> _R:
        """Create either a TestReport or CollectReport, depending on the calling class.

        It is the callers responsibility to know which class to pass here.

        This was originally the serialize_report() function from xdist (ca03269).

        Experimental method.
        """
        return cls(**_report_kwargs_from_json(reportdict))
213
214
215def _report_unserialization_failure(
216    type_name: str, report_class: "Type[BaseReport]", reportdict
217) -> "NoReturn":
218    url = "https://github.com/pytest-dev/pytest/issues"
219    stream = StringIO()
220    pprint("-" * 100, stream=stream)
221    pprint("INTERNALERROR: Unknown entry type returned: %s" % type_name, stream=stream)
222    pprint("report_name: %s" % report_class, stream=stream)
223    pprint(reportdict, stream=stream)
224    pprint("Please report this bug at %s" % url, stream=stream)
225    pprint("-" * 100, stream=stream)
226    raise RuntimeError(stream.getvalue())
227
228
@final
class TestReport(BaseReport):
    """Basic test report object (also used for setup and teardown calls if
    they fail)."""

    __test__ = False

    def __init__(
        self,
        nodeid: str,
        location: Tuple[str, Optional[int], str],
        keywords,
        outcome: "Literal['passed', 'failed', 'skipped']",
        longrepr: Union[
            None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr
        ],
        when: "Literal['setup', 'call', 'teardown']",
        sections: Iterable[Tuple[str, str]] = (),
        duration: float = 0,
        user_properties: Optional[Iterable[Tuple[str, object]]] = None,
        **extra
    ) -> None:
        #: Normalized collection nodeid.
        self.nodeid = nodeid

        #: A (filesystempath, lineno, domaininfo) tuple indicating the
        #: actual location of a test item - it might be different from the
        #: collected one e.g. if a method is inherited from a different module.
        self.location = location  # type: Tuple[str, Optional[int], str]

        #: A name -> value dictionary containing all keywords and
        #: markers associated with a test invocation.
        self.keywords = keywords

        #: Test outcome, always one of "passed", "failed", "skipped".
        self.outcome = outcome

        #: None or a failure representation.
        self.longrepr = longrepr

        #: One of 'setup', 'call', 'teardown' to indicate runtest phase.
        self.when = when

        #: User properties is a list of tuples (name, value) that holds user
        #: defined properties of the test.
        self.user_properties = list(user_properties or [])

        #: List of pairs ``(str, str)`` of extra information which needs to
        #: be marshallable. Used by pytest to add captured text
        #: from ``stdout`` and ``stderr``, but may be used by other plugins
        #: to add arbitrary information to reports.
        self.sections = list(sections)

        #: Time it took to run just the test.
        self.duration = duration

        # Any additional keyword argument becomes an attribute as well.
        vars(self).update(extra)

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return "<{} {!r} when={!r} outcome={!r}>".format(
            class_name, self.nodeid, self.when, self.outcome
        )

    @classmethod
    def from_item_and_call(cls, item: Item, call: "CallInfo[None]") -> "TestReport":
        """Create and fill a TestReport with standard item and call info."""
        when = call.when
        # Remove "collect" from the Literal type -- only for collection calls.
        assert when != "collect"
        duration = call.duration
        keywords = dict.fromkeys(item.keywords, 1)
        excinfo = call.excinfo
        if not excinfo:
            outcome = "passed"  # type: Literal["passed", "failed", "skipped"]
            longrepr = (
                None
            )  # type: Union[None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr]
        elif not isinstance(excinfo, ExceptionInfo):
            # Not an ExceptionInfo: used as the failure representation as-is.
            outcome = "failed"
            longrepr = excinfo
        elif isinstance(excinfo.value, skip.Exception):
            # Skips are reported as a (path, lineno, message) tuple.
            outcome = "skipped"
            crash = excinfo._getreprcrash()
            longrepr = (str(crash.path), crash.lineno, crash.message)
        elif call.when == "call":
            outcome = "failed"
            longrepr = item.repr_failure(excinfo)
        else:
            # Exception in setup or teardown.
            outcome = "failed"
            longrepr = item._repr_failure_py(
                excinfo, style=item.config.getoption("tbstyle", "auto")
            )
        sections = [
            ("Captured {} {}".format(key, rwhen), content)
            for rwhen, key, content in item._report_sections
        ]
        return cls(
            item.nodeid,
            item.location,
            keywords,
            outcome,
            longrepr,
            when,
            sections,
            duration,
            user_properties=item.user_properties,
        )
336
337
@final
class CollectReport(BaseReport):
    """Collection report object."""

    when = "collect"

    def __init__(
        self,
        nodeid: str,
        outcome: "Literal['passed', 'skipped', 'failed']",
        longrepr,
        result: Optional[List[Union[Item, Collector]]],
        sections: Iterable[Tuple[str, str]] = (),
        **extra
    ) -> None:
        #: Normalized collection nodeid.
        self.nodeid = nodeid

        #: Test outcome, always one of "passed", "failed", "skipped".
        self.outcome = outcome

        #: None or a failure representation.
        self.longrepr = longrepr

        #: The collected items and collection nodes.
        self.result = result or []

        #: List of pairs ``(str, str)`` of extra information which needs to
        #: be marshallable. Used by pytest to add captured text
        #: from ``stdout`` and ``stderr``, but may be used by other plugins
        #: to add arbitrary information to reports.
        self.sections = list(sections)

        # Any additional keyword argument becomes an attribute as well.
        vars(self).update(extra)

    @property
    def location(self):
        """A ``(fspath, None, fspath)`` tuple derived from the node id."""
        path = self.fspath
        return (path, None, path)

    def __repr__(self) -> str:
        result_count = len(self.result)
        return "<CollectReport {!r} lenresult={} outcome={!r}>".format(
            self.nodeid, result_count, self.outcome
        )
382
383
class CollectErrorRepr(TerminalRepr):
    """A ``TerminalRepr`` that wraps a plain message string.

    NOTE(review): judging by the class name this is meant for collection
    errors -- confirm against the callers.
    """

    def __init__(self, msg: str) -> None:
        # The message is kept verbatim; toterminal() writes it out.
        self.longrepr = msg

    def toterminal(self, out: TerminalWriter) -> None:
        """Write the stored message to ``out`` as one line, with ``red=True``."""
        out.line(self.longrepr, red=True)
390
391
def pytest_report_to_serializable(
    report: Union[CollectReport, TestReport]
) -> Optional[Dict[str, Any]]:
    """Serialize a ``TestReport`` or ``CollectReport`` into a dict.

    The dict is the report's ``_to_json()`` output plus a ``"$report_type"``
    entry holding the report's class name. ``None`` is returned for any other
    kind of object.
    """
    if not isinstance(report, (TestReport, CollectReport)):
        # TODO: Check if this is actually reachable.
        return None  # type: ignore[unreachable]
    serialized = report._to_json()
    serialized["$report_type"] = report.__class__.__name__
    return serialized
401
402
def pytest_report_from_serializable(
    data: Dict[str, Any],
) -> Optional[Union[CollectReport, TestReport]]:
    """Rebuild a report from a dict carrying a ``"$report_type"`` entry.

    Returns ``None`` when ``data`` has no ``"$report_type"`` key. A
    ``"$report_type"`` other than ``"TestReport"`` or ``"CollectReport"``
    trips an assertion.
    """
    if "$report_type" not in data:
        return None
    report_type = data["$report_type"]
    if report_type == "TestReport":
        return TestReport._from_json(data)
    if report_type == "CollectReport":
        return CollectReport._from_json(data)
    # When assertions are disabled this falls through and returns None.
    assert False, "Unknown report_type unserialize data: {}".format(report_type)
415
416
def _report_to_json(report: BaseReport) -> Dict[str, Any]:
    """Convert ``report`` into a dict of builtin entries, suitable for
    serialization.

    The result starts as a copy of the report's ``__dict__``. Its
    ``"longrepr"`` entry is replaced by a serialized form, path-like values
    are turned into strings and the ``"result"`` entry is set to ``None``.

    This was originally the serialize_report() function from xdist (ca03269).
    """

    def serialize_repr_entry(
        entry: Union[ReprEntry, ReprEntryNative]
    ) -> Dict[str, Any]:
        # Tag the entry data with the entry's class name.
        fields = attr.asdict(entry)
        for field_name, field_value in fields.items():
            if hasattr(field_value, "__dict__"):
                fields[field_name] = attr.asdict(field_value)
        return {"type": type(entry).__name__, "data": fields}

    def serialize_repr_traceback(reprtraceback: ReprTraceback) -> Dict[str, Any]:
        serialized = attr.asdict(reprtraceback)
        # Entries get the tagged form produced by serialize_repr_entry().
        serialized["reprentries"] = [
            serialize_repr_entry(entry) for entry in reprtraceback.reprentries
        ]
        return serialized

    def serialize_repr_crash(
        reprcrash: Optional[ReprFileLocation],
    ) -> Optional[Dict[str, Any]]:
        if reprcrash is None:
            return None
        return attr.asdict(reprcrash)

    def serialize_exception_longrepr(rep: BaseReport) -> Dict[str, Any]:
        assert rep.longrepr is not None
        # TODO: Investigate whether the duck typing is really necessary here.
        longrepr = cast(ExceptionRepr, rep.longrepr)
        serialized = {
            "reprcrash": serialize_repr_crash(longrepr.reprcrash),
            "reprtraceback": serialize_repr_traceback(longrepr.reprtraceback),
            "sections": longrepr.sections,
        }  # type: Dict[str, Any]
        # Only exception chains carry a "chain"; otherwise it is None.
        serialized["chain"] = (
            [
                (
                    serialize_repr_traceback(repr_traceback),
                    serialize_repr_crash(repr_crash),
                    description,
                )
                for repr_traceback, repr_crash, description in longrepr.chain
            ]
            if isinstance(longrepr, ExceptionChainRepr)
            else None
        )
        return serialized

    d = report.__dict__.copy()
    longrepr = report.longrepr
    if not hasattr(longrepr, "toterminal"):
        # Stored unchanged.
        d["longrepr"] = longrepr
    elif hasattr(longrepr, "reprtraceback") and hasattr(longrepr, "reprcrash"):
        d["longrepr"] = serialize_exception_longrepr(report)
    else:
        d["longrepr"] = str(longrepr)
    for name, value in d.items():
        if isinstance(value, (py.path.local, Path)):
            d[name] = str(value)
        elif name == "result":
            d[name] = None  # for now
    return d
488
489
490def _report_kwargs_from_json(reportdict: Dict[str, Any]) -> Dict[str, Any]:
491    """Return **kwargs that can be used to construct a TestReport or
492    CollectReport instance.
493
494    This was originally the serialize_report() function from xdist (ca03269).
495    """
496
497    def deserialize_repr_entry(entry_data):
498        data = entry_data["data"]
499        entry_type = entry_data["type"]
500        if entry_type == "ReprEntry":
501            reprfuncargs = None
502            reprfileloc = None
503            reprlocals = None
504            if data["reprfuncargs"]:
505                reprfuncargs = ReprFuncArgs(**data["reprfuncargs"])
506            if data["reprfileloc"]:
507                reprfileloc = ReprFileLocation(**data["reprfileloc"])
508            if data["reprlocals"]:
509                reprlocals = ReprLocals(data["reprlocals"]["lines"])
510
511            reprentry = ReprEntry(
512                lines=data["lines"],
513                reprfuncargs=reprfuncargs,
514                reprlocals=reprlocals,
515                reprfileloc=reprfileloc,
516                style=data["style"],
517            )  # type: Union[ReprEntry, ReprEntryNative]
518        elif entry_type == "ReprEntryNative":
519            reprentry = ReprEntryNative(data["lines"])
520        else:
521            _report_unserialization_failure(entry_type, TestReport, reportdict)
522        return reprentry
523
524    def deserialize_repr_traceback(repr_traceback_dict):
525        repr_traceback_dict["reprentries"] = [
526            deserialize_repr_entry(x) for x in repr_traceback_dict["reprentries"]
527        ]
528        return ReprTraceback(**repr_traceback_dict)
529
530    def deserialize_repr_crash(repr_crash_dict: Optional[Dict[str, Any]]):
531        if repr_crash_dict is not None:
532            return ReprFileLocation(**repr_crash_dict)
533        else:
534            return None
535
536    if (
537        reportdict["longrepr"]
538        and "reprcrash" in reportdict["longrepr"]
539        and "reprtraceback" in reportdict["longrepr"]
540    ):
541
542        reprtraceback = deserialize_repr_traceback(
543            reportdict["longrepr"]["reprtraceback"]
544        )
545        reprcrash = deserialize_repr_crash(reportdict["longrepr"]["reprcrash"])
546        if reportdict["longrepr"]["chain"]:
547            chain = []
548            for repr_traceback_data, repr_crash_data, description in reportdict[
549                "longrepr"
550            ]["chain"]:
551                chain.append(
552                    (
553                        deserialize_repr_traceback(repr_traceback_data),
554                        deserialize_repr_crash(repr_crash_data),
555                        description,
556                    )
557                )
558            exception_info = ExceptionChainRepr(
559                chain
560            )  # type: Union[ExceptionChainRepr,ReprExceptionInfo]
561        else:
562            exception_info = ReprExceptionInfo(reprtraceback, reprcrash)
563
564        for section in reportdict["longrepr"]["sections"]:
565            exception_info.addsection(*section)
566        reportdict["longrepr"] = exception_info
567
568    return reportdict
569