1import copy
2import functools
3import imp
4import io
5import os
6from collections import OrderedDict, defaultdict
7from datetime import datetime
8
9from mozlog import reader
10from mozlog.formatters import JSONFormatter
11from mozlog.handlers import BaseHandler, StreamHandler, LogLevelFilter
12
13here = os.path.dirname(__file__)
14localpaths = imp.load_source("localpaths", os.path.abspath(os.path.join(here, os.pardir, os.pardir, "localpaths.py")))
15from ci.tc.github_checks_output import get_gh_checks_outputter  # type: ignore
16from wpt.markdown import markdown_adjust, table  # type: ignore
17
18
19# If a test takes more than (FLAKY_THRESHOLD*timeout) and does not consistently
20# time out, it is considered slow (potentially flaky).
21FLAKY_THRESHOLD = 0.8
22
23
24class LogActionFilter(BaseHandler):  # type: ignore
25
26    """Handler that filters out messages not of a given set of actions.
27
28    Subclasses BaseHandler.
29
30    :param inner: Handler to use for messages that pass this filter
31    :param actions: List of actions for which to fire the handler
32    """
33
34    def __init__(self, inner, actions):
35        """Extend BaseHandler and set inner and actions props on self."""
36        BaseHandler.__init__(self, inner)
37        self.inner = inner
38        self.actions = actions
39
40    def __call__(self, item):
41        """Invoke handler if action is in list passed as constructor param."""
42        if item["action"] in self.actions:
43            return self.inner(item)
44
45
46class LogHandler(reader.LogHandler):  # type: ignore
47
48    """Handle updating test and subtest status in log.
49
50    Subclasses reader.LogHandler.
51    """
52    def __init__(self):
53        self.results = OrderedDict()
54
55    def find_or_create_test(self, data):
56        test_name = data["test"]
57        if self.results.get(test_name):
58            return self.results[test_name]
59
60        test = {
61            "subtests": OrderedDict(),
62            "status": defaultdict(int),
63            "longest_duration": defaultdict(float),
64        }
65        self.results[test_name] = test
66        return test
67
68    def find_or_create_subtest(self, data):
69        test = self.find_or_create_test(data)
70        subtest_name = data["subtest"]
71
72        if test["subtests"].get(subtest_name):
73            return test["subtests"][subtest_name]
74
75        subtest = {
76            "status": defaultdict(int),
77            "messages": set()
78        }
79        test["subtests"][subtest_name] = subtest
80
81        return subtest
82
83    def test_start(self, data):
84        test = self.find_or_create_test(data)
85        test["start_time"] = data["time"]
86
87    def test_status(self, data):
88        subtest = self.find_or_create_subtest(data)
89        subtest["status"][data["status"]] += 1
90        if data.get("message"):
91            subtest["messages"].add(data["message"])
92
93    def test_end(self, data):
94        test = self.find_or_create_test(data)
95        test["status"][data["status"]] += 1
96        # Timestamps are in ms since epoch.
97        duration = data["time"] - test.pop("start_time")
98        test["longest_duration"][data["status"]] = max(
99            duration, test["longest_duration"][data["status"]])
100        try:
101            # test_timeout is in seconds; convert it to ms.
102            test["timeout"] = data["extra"]["test_timeout"] * 1000
103        except KeyError:
104            # If a test is skipped, it won't have extra info.
105            pass
106
107
108def is_inconsistent(results_dict, iterations):
109    """Return whether or not a single test is inconsistent."""
110    if 'SKIP' in results_dict:
111        return False
112    return len(results_dict) > 1 or sum(results_dict.values()) != iterations
113
114
115def find_slow_status(test):
116    """Check if a single test almost times out.
117
118    We are interested in tests that almost time out (i.e. likely to be flaky).
119    Therefore, timeout statuses are ignored, including (EXTERNAL-)TIMEOUT.
120    CRASH & ERROR are also ignored because the they override TIMEOUT; a test
121    that both crashes and times out is marked as CRASH, so it won't be flaky.
122
123    Returns:
124        A result status produced by a run that almost times out; None, if no
125        runs almost time out.
126    """
127    if "timeout" not in test:
128        return None
129    threshold = test["timeout"] * FLAKY_THRESHOLD
130    for status in ['PASS', 'FAIL', 'OK']:
131        if (status in test["longest_duration"] and
132            test["longest_duration"][status] > threshold):
133            return status
134    return None
135
136
137def process_results(log, iterations):
138    """Process test log and return overall results and list of inconsistent tests."""
139    inconsistent = []
140    slow = []
141    handler = LogHandler()
142    reader.handle_log(reader.read(log), handler)
143    results = handler.results
144    for test_name, test in results.items():
145        if is_inconsistent(test["status"], iterations):
146            inconsistent.append((test_name, None, test["status"], []))
147        for subtest_name, subtest in test["subtests"].items():
148            if is_inconsistent(subtest["status"], iterations):
149                inconsistent.append((test_name, subtest_name, subtest["status"], subtest["messages"]))
150
151        slow_status = find_slow_status(test)
152        if slow_status is not None:
153            slow.append((
154                test_name,
155                slow_status,
156                test["longest_duration"][slow_status],
157                test["timeout"]
158            ))
159
160    return results, inconsistent, slow
161
162
163def err_string(results_dict, iterations):
164    """Create and return string with errors from test run."""
165    rv = []
166    total_results = sum(results_dict.values())
167    if total_results > iterations:
168        rv.append("Duplicate subtest name")
169    else:
170        for key, value in sorted(results_dict.items()):
171            rv.append("%s%s" %
172                      (key, ": %s/%s" % (value, iterations) if value != iterations else ""))
173    if total_results < iterations:
174        rv.append("MISSING: %s/%s" % (iterations - total_results, iterations))
175    rv = ", ".join(rv)
176    if is_inconsistent(results_dict, iterations):
177        rv = "**%s**" % rv
178    return rv
179
180
181def write_github_checks_summary_inconsistent(log, inconsistent, iterations):
182    """Outputs a summary of inconsistent tests for GitHub Checks."""
183    log("Some affected tests had inconsistent (flaky) results:\n")
184    write_inconsistent(log, inconsistent, iterations)
185    log("\n")
186    log("These may be pre-existing or new flakes. Please try to reproduce (see "
187        "the above WPT command, though some flags may not be needed when "
188        "running locally) and determine if your change introduced the flake. "
189        "If you are unable to reproduce the problem, please tag "
190        "`@web-platform-tests/wpt-core-team` in a comment for help.\n")
191
192
193def write_github_checks_summary_slow_tests(log, slow):
194    """Outputs a summary of slow tests for GitHub Checks."""
195    log("Some affected tests had slow results:\n")
196    write_slow_tests(log, slow)
197    log("\n")
198    log("These may be pre-existing or newly slow tests. Slow tests indicate "
199        "that a test ran very close to the test timeout limit and so may "
200        "become TIMEOUT-flaky in the future. Consider speeding up the test or "
201        "breaking it into multiple tests. For help, please tag "
202        "`@web-platform-tests/wpt-core-team` in a comment.\n")
203
204
205def write_inconsistent(log, inconsistent, iterations):
206    """Output inconsistent tests to the passed in logging function."""
207    log("## Unstable results ##\n")
208    strings = [(
209        "`%s`" % markdown_adjust(test),
210        ("`%s`" % markdown_adjust(subtest)) if subtest else "",
211        err_string(results, iterations),
212        ("`%s`" % markdown_adjust(";".join(messages))) if len(messages) else "")
213        for test, subtest, results, messages in inconsistent]
214    table(["Test", "Subtest", "Results", "Messages"], strings, log)
215
216
217def write_slow_tests(log, slow):
218    """Output slow tests to the passed in logging function."""
219    log("## Slow tests ##\n")
220    strings = [(
221        "`%s`" % markdown_adjust(test),
222        "`%s`" % status,
223        "`%.0f`" % duration,
224        "`%.0f`" % timeout)
225        for test, status, duration, timeout in slow]
226    table(["Test", "Result", "Longest duration (ms)", "Timeout (ms)"], strings, log)
227
228
229def write_results(log, results, iterations, pr_number=None, use_details=False):
230    log("## All results ##\n")
231    if use_details:
232        log("<details>\n")
233        log("<summary>%i %s ran</summary>\n\n" % (len(results),
234                                                  "tests" if len(results) > 1
235                                                  else "test"))
236
237    for test_name, test in results.items():
238        baseurl = "http://w3c-test.org/submissions"
239        if "https" in os.path.splitext(test_name)[0].split(".")[1:]:
240            baseurl = "https://w3c-test.org/submissions"
241        title = test_name
242        if use_details:
243            log("<details>\n")
244            if pr_number:
245                title = "<a href=\"%s/%s%s\">%s</a>" % (baseurl, pr_number, test_name, title)
246            log('<summary>%s</summary>\n\n' % title)
247        else:
248            log("### %s ###" % title)
249        strings = [("", err_string(test["status"], iterations), "")]
250
251        strings.extend(((
252            ("`%s`" % markdown_adjust(subtest_name)) if subtest else "",
253            err_string(subtest["status"], iterations),
254            ("`%s`" % markdown_adjust(';'.join(subtest["messages"]))) if len(subtest["messages"]) else "")
255            for subtest_name, subtest in test["subtests"].items()))
256        table(["Subtest", "Results", "Messages"], strings, log)
257        if use_details:
258            log("</details>\n")
259
260    if use_details:
261        log("</details>\n")
262
263
264def run_step(logger, iterations, restart_after_iteration, kwargs_extras, **kwargs):
265    from . import wptrunner
266    kwargs = copy.deepcopy(kwargs)
267
268    if restart_after_iteration:
269        kwargs["repeat"] = iterations
270    else:
271        kwargs["rerun"] = iterations
272
273    kwargs["pause_after_test"] = False
274    kwargs.update(kwargs_extras)
275
276    def wrap_handler(x):
277        if not kwargs["verify_log_full"]:
278            x = LogLevelFilter(x, "WARNING")
279            x = LogActionFilter(x, ["log", "process_output"])
280        return x
281
282    initial_handlers = logger._state.handlers
283    logger._state.handlers = [wrap_handler(handler)
284                              for handler in initial_handlers]
285
286    log = io.BytesIO()
287    # Setup logging for wptrunner that keeps process output and
288    # warning+ level logs only
289    logger.add_handler(StreamHandler(log, JSONFormatter()))
290
291    # Use the number of iterations of the test suite that were run to process the results.
292    # if the runs were stopped to avoid hitting the maximum run time.
293    _, test_status = wptrunner.run_tests(**kwargs)
294    iterations = test_status.repeated_runs
295
296    if not restart_after_iteration:
297        iterations = kwargs["rerun"]
298
299    all_skipped = test_status.all_skipped
300
301    logger._state.handlers = initial_handlers
302    logger._state.running_tests = set()
303    logger._state.suite_started = False
304
305    log.seek(0)
306    results, inconsistent, slow = process_results(log, iterations)
307    return results, inconsistent, slow, iterations, all_skipped
308
309
310def get_steps(logger, repeat_loop, repeat_restart, kwargs_extras):
311    steps = []
312    for kwargs_extra in kwargs_extras:
313        if kwargs_extra:
314            flags_string = " with flags %s" % " ".join(
315                "%s=%s" % item for item in kwargs_extra.items())
316        else:
317            flags_string = ""
318
319        if repeat_loop:
320            desc = "Running tests in a loop %d times%s" % (repeat_loop,
321                                                           flags_string)
322            steps.append((desc,
323                          functools.partial(run_step,
324                                            logger,
325                                            repeat_loop,
326                                            False,
327                                            kwargs_extra),
328                          repeat_loop))
329
330        if repeat_restart:
331            desc = "Running tests in a loop with restarts %s times%s" % (repeat_restart,
332                                                                         flags_string)
333            steps.append((desc,
334                          functools.partial(run_step,
335                                            logger,
336                                            repeat_restart,
337                                            True,
338                                            kwargs_extra),
339                          repeat_restart))
340
341    return steps
342
343
344def write_summary(logger, step_results, final_result):
345    for desc, result in step_results:
346        logger.info('::: %s : %s' % (desc, result))
347    logger.info(':::')
348    if final_result == "PASS":
349        log = logger.info
350    elif final_result == "TIMEOUT":
351        log = logger.warning
352    else:
353        log = logger.error
354    log('::: Test verification %s' % final_result)
355
356    logger.info(':::')
357
358
359def check_stability(logger, repeat_loop=10, repeat_restart=5, chaos_mode=True, max_time=None,
360                    output_results=True, **kwargs):
361    kwargs_extras = [{}]
362    if chaos_mode and kwargs["product"] == "firefox":
363        kwargs_extras.append({"chaos_mode_flags": "0xfb"})
364
365    steps = get_steps(logger, repeat_loop, repeat_restart, kwargs_extras)
366
367    start_time = datetime.now()
368    step_results = []
369
370    github_checks_outputter = get_gh_checks_outputter(kwargs["github_checks_text_file"])
371
372    for desc, step_func, expected_iterations in steps:
373        if max_time and datetime.now() - start_time > max_time:
374            logger.info("::: Test verification is taking too long: Giving up!")
375            logger.info("::: So far, all checks passed, but not all checks were run.")
376            write_summary(logger, step_results, "TIMEOUT")
377            return 2
378
379        logger.info(':::')
380        logger.info('::: Running test verification step "%s"...' % desc)
381        logger.info(':::')
382        results, inconsistent, slow, iterations, all_skipped = step_func(**kwargs)
383        logger.info(f"::: Ran {iterations} of expected {expected_iterations} iterations.")
384        if iterations <= 1 and expected_iterations > 1 and not all_skipped:
385            step_results.append((desc, "FAIL"))
386            logger.info("::: Reached iteration timeout before finishing 2 or more repeat runs.")
387            logger.info("::: At least 2 successful repeat runs are required to validate stability.")
388            write_summary(logger, step_results, "TIMEOUT")
389            return 1
390
391        if output_results:
392            write_results(logger.info, results, iterations)
393
394        if inconsistent:
395            step_results.append((desc, "FAIL"))
396            if github_checks_outputter:
397                write_github_checks_summary_inconsistent(github_checks_outputter.output, inconsistent, iterations)
398            write_inconsistent(logger.info, inconsistent, iterations)
399            write_summary(logger, step_results, "FAIL")
400            return 1
401
402        if slow:
403            step_results.append((desc, "FAIL"))
404            if github_checks_outputter:
405                write_github_checks_summary_slow_tests(github_checks_outputter.output, slow)
406            write_slow_tests(logger.info, slow)
407            write_summary(logger, step_results, "FAIL")
408            return 1
409
410        # If the tests passed but the number of iterations didn't match the number expected to run,
411        # it is likely that the runs were stopped early to avoid a timeout.
412        if iterations != expected_iterations:
413            result = f"PASS *  {iterations}/{expected_iterations} repeats completed"
414            step_results.append((desc, result))
415        else:
416            step_results.append((desc, "PASS"))
417
418    write_summary(logger, step_results, "PASS")
419