# ***** BEGIN LICENSE BLOCK ***** # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this file, # You can obtain one at http://mozilla.org/MPL/2.0/. # ***** END LICENSE BLOCK ***** from __future__ import absolute_import import json from mozharness.base import log from mozharness.base.log import OutputParser, WARNING, INFO, ERROR from mozharness.mozilla.automation import TBPL_WARNING, TBPL_FAILURE from mozharness.mozilla.automation import TBPL_SUCCESS, TBPL_WORST_LEVEL_TUPLE from mozharness.mozilla.automation import TBPL_RETRY from mozharness.mozilla.testing.errors import TinderBoxPrintRe from mozharness.mozilla.testing.unittest import tbox_print_summary from collections import ( defaultdict, namedtuple, ) class StructuredOutputParser(OutputParser): # The script class using this must inherit the MozbaseMixin to ensure # that mozlog is available. def __init__(self, **kwargs): """Object that tracks the overall status of the test run""" # The 'strict' argument dictates whether the presence of output # from the harness process other than line-delimited json indicates # failure. If it does not, the errors_list parameter may be used # to detect additional failure output from the harness process. if "strict" in kwargs: self.strict = kwargs.pop("strict") else: self.strict = True self.suite_category = kwargs.pop("suite_category", None) tbpl_compact = kwargs.pop("log_compact", False) super(StructuredOutputParser, self).__init__(**kwargs) self.allow_crashes = kwargs.pop("allow_crashes", False) mozlog = self._get_mozlog_module() self.formatter = mozlog.formatters.TbplFormatter(compact=tbpl_compact) self.handler = mozlog.handlers.StatusHandler() self.log_actions = mozlog.structuredlog.log_actions() self.worst_log_level = INFO self.tbpl_status = TBPL_SUCCESS self.harness_retry_re = TinderBoxPrintRe["harness_error"]["retry_regex"] self.prev_was_unstructured = False def _get_mozlog_module(self): try: import mozlog except ImportError: self.fatal( "A script class using structured logging must inherit " "from the MozbaseMixin to ensure that mozlog is available." ) return mozlog def _handle_unstructured_output(self, line, log_output=True): self.log_output = log_output return super(StructuredOutputParser, self).parse_single_line(line) def parse_single_line(self, line): """Parses a line of log output from the child process and passes it to mozlog to update the overall status of the run. Re-emits the logged line in human-readable format. """ level = INFO tbpl_level = TBPL_SUCCESS data = None try: candidate_data = json.loads(line) if ( isinstance(candidate_data, dict) and "action" in candidate_data and candidate_data["action"] in self.log_actions ): data = candidate_data except ValueError: pass if data is None: if self.strict: if not self.prev_was_unstructured: self.critical( ( "Test harness output was not a valid structured log message: " "\n%s" ) % line ) else: self.critical(line) self.update_levels(TBPL_FAILURE, log.CRITICAL) self.prev_was_unstructured = True else: self._handle_unstructured_output(line) return self.prev_was_unstructured = False self.handler(data) action = data["action"] if action in ("log", "process_output"): if action == "log": message = data["message"] level = getattr(log, data["level"].upper()) else: message = data["data"] # Run log and process_output actions through the error lists, but make sure # the super parser doesn't print them to stdout (they should go through the # log formatter). error_level = self._handle_unstructured_output(message, log_output=False) if error_level is not None: level = self.worst_level(error_level, level) if self.harness_retry_re.search(message): self.update_levels(TBPL_RETRY, log.CRITICAL) tbpl_level = TBPL_RETRY level = log.CRITICAL log_data = self.formatter(data) if log_data is not None: self.log(log_data, level=level) self.update_levels(tbpl_level, level) def _subtract_tuples(self, old, new): items = set(list(old.keys()) + list(new.keys())) merged = defaultdict(int) for item in items: merged[item] = new.get(item, 0) - old.get(item, 0) if merged[item] <= 0: del merged[item] return merged def evaluate_parser(self, return_code, success_codes=None, previous_summary=None): success_codes = success_codes or [0] summary = self.handler.summarize() """ We can run evaluate_parser multiple times, it will duplicate failures and status which can mean that future tests will fail if a previous test fails. When we have a previous summary, we want to do 2 things: 1) Remove previous data from the new summary to only look at new data 2) Build a joined summary to include the previous + new data """ RunSummary = namedtuple( "RunSummary", ( "unexpected_statuses", "expected_statuses", "known_intermittent_statuses", "log_level_counts", "action_counts", ), ) if previous_summary == {}: previous_summary = RunSummary( defaultdict(int), defaultdict(int), defaultdict(int), defaultdict(int), defaultdict(int), ) if previous_summary: # Always preserve retry status: if any failure triggers retry, the script # must exit with TBPL_RETRY to trigger task retry. if self.tbpl_status != TBPL_RETRY: self.tbpl_status = TBPL_SUCCESS joined_summary = summary # Remove previously known status messages if "ERROR" in summary.log_level_counts: summary.log_level_counts["ERROR"] -= self.handler.no_tests_run_count summary = RunSummary( self._subtract_tuples( previous_summary.unexpected_statuses, summary.unexpected_statuses ), self._subtract_tuples( previous_summary.expected_statuses, summary.expected_statuses ), self._subtract_tuples( previous_summary.known_intermittent_statuses, summary.known_intermittent_statuses, ), self._subtract_tuples( previous_summary.log_level_counts, summary.log_level_counts ), summary.action_counts, ) # If we have previous data to ignore, # cache it so we don't parse the log multiple times self.summary = summary else: joined_summary = summary fail_pair = TBPL_WARNING, WARNING error_pair = TBPL_FAILURE, ERROR # These are warning/orange statuses. failure_conditions = [ (sum(summary.unexpected_statuses.values()), 0, "statuses", False), ( summary.action_counts.get("crash", 0), summary.expected_statuses.get("CRASH", 0), "crashes", self.allow_crashes, ), ( summary.action_counts.get("valgrind_error", 0), 0, "valgrind errors", False, ), ] for value, limit, type_name, allow in failure_conditions: if value > limit: msg = "%d unexpected %s" % (value, type_name) if limit != 0: msg += " expected at most %d" % (limit) if not allow: self.update_levels(*fail_pair) msg = "Got " + msg self.error(msg) else: msg = "Ignored " + msg self.warning(msg) # These are error/red statuses. A message is output here every time something # wouldn't otherwise be highlighted in the UI. required_actions = { "suite_end": "No suite end message was emitted by this harness.", "test_end": "No checks run.", } for action, diagnostic_message in required_actions.items(): if action not in summary.action_counts: self.log(diagnostic_message, ERROR) self.update_levels(*error_pair) failure_log_levels = ["ERROR", "CRITICAL"] for level in failure_log_levels: if level in summary.log_level_counts: self.update_levels(*error_pair) # If a superclass was used to detect errors with a regex based output parser, # this will be reflected in the status here. if self.num_errors: self.update_levels(*error_pair) # Harnesses typically return non-zero on test failure, so don't promote # to error if we already have a failing status. if return_code not in success_codes and self.tbpl_status == TBPL_SUCCESS: self.update_levels(*error_pair) return self.tbpl_status, self.worst_log_level, joined_summary def update_levels(self, tbpl_level, log_level): self.worst_log_level = self.worst_level(log_level, self.worst_log_level) self.tbpl_status = self.worst_level( tbpl_level, self.tbpl_status, levels=TBPL_WORST_LEVEL_TUPLE ) def print_summary(self, suite_name): # Summary text provided for compatibility. Counts are currently # in the format //, # // will yield the # expected info from a structured log (fail count from the prior implementation # includes unexpected passes from "todo" assertions). try: summary = self.summary except AttributeError: summary = self.handler.summarize() unexpected_count = sum(summary.unexpected_statuses.values()) expected_count = sum(summary.expected_statuses.values()) expected_failures = summary.expected_statuses.get("FAIL", 0) if unexpected_count: fail_text = '%s' % unexpected_count else: fail_text = "0" text_summary = "%s/%s/%s" % (expected_count, fail_text, expected_failures) self.info("TinderboxPrint: %s
%s\n" % (suite_name, text_summary)) def append_tinderboxprint_line(self, suite_name): try: summary = self.summary except AttributeError: summary = self.handler.summarize() unexpected_count = sum(summary.unexpected_statuses.values()) expected_count = sum(summary.expected_statuses.values()) expected_failures = summary.expected_statuses.get("FAIL", 0) crashed = 0 if "crash" in summary.action_counts: crashed = summary.action_counts["crash"] text_summary = tbox_print_summary( expected_count, unexpected_count, expected_failures, crashed > 0, False ) self.info("TinderboxPrint: %s
%s\n" % (suite_name, text_summary))