1# -*- coding: UTF-8 -*-
2# pylint: disable=line-too-long
3"""
4This module provides a reporter with JUnit XML output.
5
6Mapping of behave model elements to XML elements::
7
8    feature     -> xml_element:testsuite
9    scenario    -> xml_element:testcase
10
11XML document structure::
12
13    # -- XML elements:
14    # CARDINALITY SUFFIX:
15    #   ?   optional (zero or one)
16    #   *   many0 (zero or more)
17    #   +   many (one or more)
18    testsuites := sequence<testsuite>
19    testsuite:
20        properties? : sequence<property>
21        testcase* :
22            error?      : text
23            failure?    : text
24            system-out  : text
25            system-err  : text
26
27    testsuite:
28        @name       : TokenString
29        @tests      : int
30        @failures   : int
31        @errors     : int
32        @skipped    : int
33        @time       : Decimal       # Duration in seconds
34        # -- SINCE: behave-1.2.6
35        @timestamp  : IsoDateTime
36        @hostname   : string
37
38    testcase:
39        @name       : TokenString
40        @classname  : TokenString
41        @status     : string        # Status enum
42        @time       : Decimal       # Elapsed seconds
43
44    error:
45        @message    : string
46        @type       : string
47
48    failure:
49        @message    : string
50        @type       : string
51
52    # -- HINT: Not used
53    property:
54        @name  : TokenString
55        @value : string
56
57    type Status : Enum("passed", "failed", "skipped", "untested")
58
Note that there is no clearly defined specification for the JUnit XML output format.
The best available sources are:
61
62* `JUnit XML`_ (for PDF)
63* JUnit XML (`ant spec 1`_, `ant spec 2`_)
64
65
66.. _`JUnit XML`:  http://junitpdfreport.sourceforge.net/managedcontent/PdfTranslation
67.. _`ant spec 1`: https://github.com/windyroad/JUnit-Schema
68.. _`ant spec 2`: http://svn.apache.org/repos/asf/ant/core/trunk/src/main/org/apache/tools/ant/taskdefs/optional/junit/XMLJUnitResultFormatter.java
69"""
70# pylint: enable=line-too-long
71
72from __future__ import absolute_import
73import os.path
74import codecs
75from xml.etree import ElementTree
76from datetime import datetime
77from behave.reporter.base import Reporter
78from behave.model import Scenario, ScenarioOutline, Step
79from behave.model_core import Status
80from behave.formatter import ansi_escapes
81from behave.model_describe import ModelDescriptor
82from behave.textutil import indent, make_indentation, text as _text
83from behave.userdata import UserDataNamespace
84import six
85if six.PY2:
86    # -- USE: Python3 backport for better unicode compatibility.
87    import traceback2 as traceback
88else:
89    import traceback
90
91
def CDATA(text=None):   # pylint: disable=invalid-name
    """Build a pseudo XML element that carries text for a CDATA section.

    The element uses the marker tag ``![CDATA[`` that the serializer hooks
    in this module look for.

    :param text:  Text to store; it is passed through
        ``ansi_escapes.strip_escapes()`` first (issue #70).
    :return: Element with the marker tag and the processed text.
    """
    # -- issue #70: remove_ansi_escapes(text)
    cdata_text = ansi_escapes.strip_escapes(text)
    cdata_node = ElementTree.Element('![CDATA[')
    cdata_node.text = cdata_text
    return cdata_node
97
98
class ElementTreeWithCDATA(ElementTree.ElementTree):
    """ElementTree whose legacy writer understands CDATA pseudo-elements."""
    # pylint: disable=redefined-builtin, no-member
    def _write(self, file, node, encoding, namespaces):
        """This method is for ElementTree <= 1.2.6

        Nodes tagged ``![CDATA[`` are written as a raw CDATA section;
        every other node is delegated to the base class writer.
        """
        if node.tag != '![CDATA[':
            # -- NORMAL ELEMENT: Let the base class do the work.
            ElementTree.ElementTree._write(self, file, node, encoding,
                                           namespaces)
            return

        encoded_text = node.text.encode(encoding)
        file.write("\n<![CDATA[%s]]>\n" % encoded_text)
110
if hasattr(ElementTree, '_serialize'):
    # -- MONKEY-PATCH: Replace the ElementTree XML serializer with a wrapper
    # that writes the pseudo-elements created by CDATA() as raw
    # "<![CDATA[...]]>" sections and delegates all other elements to the
    # original serializer (bound once as default argument "orig").
    # pylint: disable=protected-access
    def _serialize_xml2(write, elem, encoding, qnames, namespaces,
                        orig=ElementTree._serialize_xml):
        """Serializer hook with the Python2 signature (has ``encoding``).

        :param write:   Callable that receives the serialized output.
        :param elem:    Element to serialize.
        :param orig:    Original serializer, used for non-CDATA elements.
        """
        if elem.tag == '![CDATA[':
            write("\n<%s%s]]>\n" % \
                  (elem.tag, elem.text.encode(encoding, "xmlcharrefreplace")))
            return
        return orig(write, elem, encoding, qnames, namespaces)

    def _serialize_xml3(write, elem, qnames, namespaces,
                        short_empty_elements=None,
                        orig=ElementTree._serialize_xml):
        """Serializer hook with the Python3 signature (no ``encoding``).

        :param write:   Callable that receives the serialized output.
        :param elem:    Element to serialize.
        :param short_empty_elements:  Passed through to the original
            serializer whenever the caller supplied it (None: not supplied).
        :param orig:    Original serializer, used for non-CDATA elements.
        """
        if elem.tag == '![CDATA[':
            write("\n<{tag}{text}]]>\n".format(
                tag=elem.tag, text=elem.text))
            return
        # -- NOTE: Test against None, not truthiness. An explicit
        # short_empty_elements=False must be forwarded, too; otherwise the
        # original serializer is called without an argument it requires.
        if short_empty_elements is not None:
            # python >=3.3
            return orig(write, elem, qnames, namespaces, short_empty_elements)
        # python <3.3
        return orig(write, elem, qnames, namespaces)

    if six.PY3:
        ElementTree._serialize_xml = \
            ElementTree._serialize['xml'] = _serialize_xml3
    elif six.PY2:
        ElementTree._serialize_xml = \
            ElementTree._serialize['xml'] = _serialize_xml2
141
142
class FeatureReportData(object):
    """
    Value object that accumulates the JUnit report data of one Feature:
    the collected testcases and the test/error/failure/skipped counters.
    """
    def __init__(self, feature, filename, classname=None):
        """Store the feature info and start with empty report data.

        :param feature:   Feature that is reported.
        :param filename:  Filename used for the feature in the report.
        :param classname: Optional classname; if missing, it is derived
            from the filename by replacing each '/' with '.'.
        """
        self.feature = feature
        self.filename = filename
        if filename and not classname:
            classname = filename.replace('/', '.')
        self.classname = classname
        self.reset()

    def reset(self):
        """Forget all collected testcases and set all counters to zero."""
        self.testcases = []
        self.counts_tests = self.counts_errors = 0
        self.counts_failed = self.counts_skipped = 0
165
166
class JUnitReporter(Reporter):
    """Generates JUnit-like XML test report for behave.

    For each reported feature, one file ``TESTS-<feature_filename>.xml`` is
    written into the directory ``config.junit_directory``.
    A feature is mapped to a ``testsuite`` XML element and each of its
    scenarios to a ``testcase`` XML element.
    """
    # -- XML REPORT:
    userdata_scope = "behave.reporter.junit"
    show_timings = True     # -- Show step timings.
    show_skipped_always = False
    show_timestamp = True
    show_hostname = True
    # -- XML REPORT PART: Describe scenarios
    show_scenarios = True   # Show scenario descriptions.
    show_tags = True
    show_multiline = True

    def __init__(self, config):
        super(JUnitReporter, self).__init__(config)
        self.setup_with_userdata(config.userdata)

    def setup_with_userdata(self, userdata):
        """Setup JUnit reporter with userdata information.
        A user can now tweak the output format of this reporter.

        EXAMPLE:
        .. code-block:: ini

            # -- FILE: behave.ini
            [behave.userdata]
            behave.reporter.junit.show_hostname = false

        :param userdata:  Userdata to read the ``show_*`` settings from
            (looked up in the scope ``userdata_scope``). Settings that are
            missing keep their current value.
        """
        # -- EXPERIMENTAL:
        config = UserDataNamespace(self.userdata_scope, userdata)
        self.show_hostname = config.getbool("show_hostname", self.show_hostname)
        self.show_multiline = config.getbool("show_multiline", self.show_multiline)
        self.show_scenarios = config.getbool("show_scenarios", self.show_scenarios)
        self.show_tags = config.getbool("show_tags", self.show_tags)
        self.show_timings = config.getbool("show_timings", self.show_timings)
        self.show_timestamp = config.getbool("show_timestamp", self.show_timestamp)
        self.show_skipped_always = config.getbool("show_skipped_always",
                                                  self.show_skipped_always)

    def make_feature_filename(self, feature):
        """Build the dotted name that is used for a feature in the report.

        The feature filename is taken relative to the first entry of
        ``config.paths`` that it starts with (otherwise relative to
        ``config.base_dir``). Then, its file extension is removed and
        the path separators are replaced by dots.

        :param feature:  Feature to build the name for.
        :return: Dotted feature filename (as text).
        """
        filename = None
        for path in self.config.paths:
            if feature.filename.startswith(path):
                # -- STRIP: path prefix and the separator that follows it.
                filename = feature.filename[len(path) + 1:]
                break
        if not filename:
            # -- NOTE: Directory path (subdirs) are taken into account.
            filename = feature.location.relpath(self.config.base_dir)
        filename = filename.rsplit('.', 1)[0]
        filename = filename.replace('\\', '/').replace('/', '.')
        return _text(filename)

    @property
    def show_skipped(self):
        """True, if skipped features/scenarios should appear in the report
        (enabled by ``config.show_skipped`` or ``show_skipped_always``).
        """
        return self.config.show_skipped or self.show_skipped_always

    # -- REPORTER-API:
    def feature(self, feature):
        """Write the JUnit XML report file for a feature.

        Builds a ``testsuite`` element with one ``testcase`` per scenario
        (scenario outlines contribute one per contained scenario) and
        writes it as ``TESTS-<feature_filename>.xml`` into
        ``config.junit_directory`` (the directory is created if needed).
        Nothing is written for a skipped feature unless skipped elements
        should be shown.

        :param feature:  Feature to report.
        """
        if feature.status == Status.skipped and not self.show_skipped:
            # -- SKIP-OUTPUT: If skipped features should not be shown.
            return

        feature_filename = self.make_feature_filename(feature)
        classname = feature_filename
        report = FeatureReportData(feature, feature_filename)
        now = datetime.now()

        suite = ElementTree.Element(u'testsuite')
        feature_name = feature.name or feature_filename
        suite.set(u'name', u'%s.%s' % (classname, feature_name))

        # -- BUILD-TESTCASES: From scenarios
        for scenario in feature:
            if isinstance(scenario, ScenarioOutline):
                scenario_outline = scenario
                self._process_scenario_outline(scenario_outline, report)
            else:
                self._process_scenario(scenario, report)

        # -- ADD TESTCASES to testsuite:
        for testcase in report.testcases:
            suite.append(testcase)

        suite.set(u'tests', _text(report.counts_tests))
        suite.set(u'errors', _text(report.counts_errors))
        suite.set(u'failures', _text(report.counts_failed))
        suite.set(u'skipped', _text(report.counts_skipped))  # WAS: skips
        suite.set(u'time', _text(round(feature.duration, 6)))
        # -- SINCE: behave-1.2.6.dev0
        if self.show_timestamp:
            suite.set(u'timestamp', _text(now.isoformat()))
        if self.show_hostname:
            suite.set(u'hostname', _text(gethostname()))

        if not os.path.exists(self.config.junit_directory):
            # -- ENSURE: Create multiple directory levels at once.
            os.makedirs(self.config.junit_directory)

        tree = ElementTreeWithCDATA(suite)
        report_dirname = self.config.junit_directory
        report_basename = u'TESTS-%s.xml' % feature_filename
        report_filename = os.path.join(report_dirname, report_basename)
        # -- ENSURE: Report file is closed after writing (even on errors),
        # instead of leaking one open file handle per reported feature.
        with codecs.open(report_filename, "wb") as report_file:
            tree.write(report_file, "UTF-8")

    # -- MORE:
    # pylint: disable=line-too-long
    @staticmethod
    def select_step_with_status(status, steps):
        """Helper function to find the first step that has the given
        step.status.

        EXAMPLE: Search for a failing step in a scenario (all steps).
            >>> scenario = ...
            >>> failed_step = select_step_with_status(Status.failed, scenario)
            >>> failed_step = select_step_with_status(Status.failed, scenario.all_steps)
            >>> assert failed_step.status == Status.failed

        EXAMPLE: Search only scenario steps, skip background steps.
            >>> failed_step = select_step_with_status(Status.failed, scenario.steps)

        :param status:  Step status to search for (as enum value).
        :param steps:   List of steps to search in (or scenario).
        :returns: Step object, if found.
        :returns: None, otherwise.

        .. versionchanged:: 1.2.6
            status: Use enum value instead of string (or string).
        """
        for step in steps:
            assert isinstance(step, Step), \
                "TYPE-MISMATCH: step.class=%s"  % step.__class__.__name__
            if step.status == status:
                return step
        # -- OTHERWISE: No step with the given status found.
        # KeyError("Step with status={0} not found".format(status))
        return None
    # pylint: enable=line-too-long

    def describe_step(self, step):
        """Describe a step and its status as text.

        The first line contains the step keyword, step name and status name
        (plus the step duration, if ``show_timings`` is enabled).
        If ``show_multiline`` is enabled, the step's multi-line text or
        its table follows.

        :param step:  Step to describe.
        :return: Textual description of the step.
        """
        status_text = _text(step.status.name)
        if self.show_timings:
            status_text += u" in %0.3fs" % step.duration
        text = u'%s %s ... ' % (step.keyword, step.name)
        text += u'%s\n' % status_text
        if self.show_multiline:
            prefix = make_indentation(2)
            if step.text:
                text += ModelDescriptor.describe_docstring(step.text, prefix)
            elif step.table:
                text += ModelDescriptor.describe_table(step.table, prefix)
        return text

    @classmethod
    def describe_tags(cls, tags):
        """Describe tags as text, like: "@one @two".

        :param tags:  Sequence of tag names (without leading '@').
        :return: Tags as text (or empty string, if there are no tags).
        """
        text = u''
        if tags:
            text = u'@'+ u' @'.join(tags)
        return text

    def describe_scenario(self, scenario):
        """Describe the scenario and the test status.
        The description is enclosed in the marker lines ``@scenario.begin``
        and ``@scenario.end`` and contains the tags (if ``show_tags`` is
        enabled), the scenario title and the description of each step.

        :param scenario:  Scenario that was tested.
        :return: Textual description of the scenario.
        """
        header_line = u'\n@scenario.begin\n'
        if self.show_tags and scenario.tags:
            header_line += u'\n  %s\n' % self.describe_tags(scenario.tags)
        header_line += u'  %s: %s\n' % (scenario.keyword, scenario.name)
        footer_line = u'\n@scenario.end\n' + u'-' * 80 + '\n'
        text = u''
        for step in scenario:
            text += self.describe_step(step)
        step_indentation = make_indentation(4)
        return header_line + indent(text, step_indentation) + footer_line

    def _process_scenario(self, scenario, report):
        """Process a scenario and append information to JUnit report object.
        This corresponds to a JUnit testcase:

          * testcase.@classname = f(filename) +'.'+ feature.name
          * testcase.@name   = scenario.name
          * testcase.@status = scenario.status
          * testcase.@time   = scenario.duration

        Distinguishes now between failures and errors.
        Failures are AssertionErrors: expectation is violated/not met.
        Errors are unexpected RuntimeErrors (all other exceptions).

        If a failure/error occurs, the step, that caused the failure,
        and its location are provided now.

        A skipped scenario is only counted and added to the report,
        if skipped elements should be shown.

        :param scenario:  Scenario to process.
        :param report:    Context object to store/add info to (outgoing param).
        """
        # pylint: disable=too-many-locals, too-many-branches, too-many-statements
        assert isinstance(scenario, Scenario)
        assert not isinstance(scenario, ScenarioOutline)
        if scenario.status != Status.skipped or self.show_skipped:
            # -- NOTE: Count only if not-skipped or skipped should be shown.
            report.counts_tests += 1
        classname = report.classname
        feature = report.feature
        feature_name = feature.name
        if not feature_name:
            feature_name = self.make_feature_filename(feature)

        case = ElementTree.Element('testcase')
        case.set(u"classname", u"%s.%s" % (classname, feature_name))
        case.set(u"name", scenario.name or "")
        case.set(u"status", scenario.status.name)
        case.set(u"time", _text(round(scenario.duration, 6)))

        step = None
        failing_step = None
        if scenario.status == Status.failed:
            # -- FIND: First failed step; otherwise, first undefined step.
            for status in (Status.failed, Status.undefined):
                step = self.select_step_with_status(status, scenario)
                if step:
                    break
            # -- NOTE: Scenario may fail now due to hook-errors.
            element_name = "failure"
            if step and isinstance(step.exception, (AssertionError, type(None))):
                # -- FAILURE: AssertionError
                assert step.status in (Status.failed, Status.undefined)
                report.counts_failed += 1
            else:
                # -- UNEXPECTED RUNTIME-ERROR:
                report.counts_errors += 1
                element_name = "error"
            # -- COMMON-PART:
            failure = ElementTree.Element(element_name)
            if step:
                step_text = self.describe_step(step).rstrip()
                text = u"\nFailing step: %s\nLocation: %s\n" % \
                       (step_text, step.location)
                message = _text(step.exception)
                failure.set(u'type', step.exception.__class__.__name__)
                failure.set(u'message', message)
                text += _text(step.error_message)
            else:
                # -- MAYBE: Hook failure before any step is executed.
                failure_type = "UnknownError"
                if scenario.exception:
                    failure_type = scenario.exception.__class__.__name__
                failure.set(u'type', failure_type)
                failure.set(u'message', scenario.error_message or "")
                traceback_lines = traceback.format_tb(scenario.exc_traceback)
                traceback_lines.insert(0, u"Traceback:\n")
                text = _text(u"".join(traceback_lines))
            failure.append(CDATA(text))
            case.append(failure)
        elif (scenario.status in (Status.skipped, Status.untested)
              and self.show_skipped):
            report.counts_skipped += 1
            step = self.select_step_with_status(Status.undefined, scenario)
            if step:
                # -- UNDEFINED-STEP:
                report.counts_failed += 1
                failure = ElementTree.Element(u"failure")
                failure.set(u"type", u"undefined")
                failure.set(u"message", (u"Undefined Step: %s" % step.name))
                case.append(failure)
            else:
                skip = ElementTree.Element(u'skipped')
                case.append(skip)

        # Create stdout section for each test case
        stdout = ElementTree.Element(u"system-out")
        text = u""
        if self.show_scenarios:
            text = self.describe_scenario(scenario)

        # Append the captured standard output
        if scenario.captured.stdout:
            output = _text(scenario.captured.stdout)
            text += u"\nCaptured stdout:\n%s\n" % output
        stdout.append(CDATA(text))
        case.append(stdout)

        # Create stderr section for each test case
        if scenario.captured.stderr:
            stderr = ElementTree.Element(u"system-err")
            output = _text(scenario.captured.stderr)
            text = u"\nCaptured stderr:\n%s\n" % output
            stderr.append(CDATA(text))
            case.append(stderr)

        if scenario.status != Status.skipped or self.show_skipped:
            report.testcases.append(case)

    def _process_scenario_outline(self, scenario_outline, report):
        """Process each scenario of a scenario outline as own testcase.

        :param scenario_outline:  ScenarioOutline to process.
        :param report:    Context object to store/add info to (outgoing param).
        """
        assert isinstance(scenario_outline, ScenarioOutline)
        for scenario in scenario_outline:
            assert isinstance(scenario, Scenario)
            self._process_scenario(scenario, report)
465
466# -----------------------------------------------------------------------------
467# SUPPORT:
468# -----------------------------------------------------------------------------
def gethostname():
    """Look up the hostname of the local host.

    :return: Hostname as reported by :func:`socket.gethostname()`.
    """
    # -- LOCAL IMPORT: Module "socket" is only needed here.
    from socket import gethostname as _socket_gethostname
    hostname = _socket_gethostname()
    return hostname
473